diff --git a/GZipTest.sln b/GZipTest.sln
new file mode 100644
index 0000000..4f8f60f
--- /dev/null
+++ b/GZipTest.sln
@@ -0,0 +1,25 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.29009.5
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GZipTest", "GZipTest\GZipTest.csproj", "{BE563CBF-0E92-4BD8-8157-D6EEBFE9535F}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Release|Any CPU = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{BE563CBF-0E92-4BD8-8157-D6EEBFE9535F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{BE563CBF-0E92-4BD8-8157-D6EEBFE9535F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{BE563CBF-0E92-4BD8-8157-D6EEBFE9535F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{BE563CBF-0E92-4BD8-8157-D6EEBFE9535F}.Release|Any CPU.Build.0 = Release|Any CPU
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+	GlobalSection(ExtensibilityGlobals) = postSolution
+		SolutionGuid = {ED3D3F32-26C1-434B-B48B-1EE336E8B1F0}
+	EndGlobalSection
+EndGlobal
diff --git a/GZipTest/App.config b/GZipTest/App.config
new file mode 100644
index 0000000..d0f8440
--- /dev/null
+++ b/GZipTest/App.config
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+    <startup> 
+        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.7.2" />
+    </startup>
+</configuration>
diff --git a/GZipTest/CompressionModule.cs b/GZipTest/CompressionModule.cs
new file mode 100644
index 0000000..42600dd
--- /dev/null
+++ b/GZipTest/CompressionModule.cs
@@ -0,0 +1,74 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+
+namespace GZipTest
+{
+    class CompressionModule : ProcessingModule
+    {
+        /// <summary>
+        /// Reading uncompressed source file in 1 MB blocks and queueing them for compression
+        /// </summary>
+        internal override void Read()
+        {
+            try
+            {
+                using (FileStream input = File.OpenRead(source)) //Opening reading stream
+                {
+                    segmentCount = (long)Math.Ceiling((double)input.Length / 1048576); //segmentCount field will be used to display progress bar
+                    length = input.Length; //This variable will be used in post analysis
+
+                    for (int i = 0; input.Position < input.Length; i++)
+                    {
+                        if (readBuffer.Count >= 5 * Environment.ProcessorCount) //Helping compression thread if there's too many unprocessed blocks
+                        {
+                            ProcessOne();
+                            i--;
+                            continue;
+                        }
+
+                        int blockSize = (int)Math.Min(1048576, input.Length - input.Position); //Determining new block size. Either 1MB or count of the last bytes
+
+                        byte[] block = new byte[blockSize]; //Instantiating empty block
+                        int offset = 0; //BUGFIX: Stream.Read may return fewer bytes than requested, so read in a loop until the block is full
+                        while (offset < blockSize)
+                        {
+                            int bytesRead = input.Read(block, offset, blockSize - offset);
+                            if (bytesRead == 0)
+                                throw new EndOfStreamException("Unexpected end of source file");
+                            offset += bytesRead;
+                        }
+
+                        readBuffer.Enqueue(new KeyValuePair<int, byte[]>(i, block)); //Adding read block to compression queue. Each block must contain its position number since compression is multi thread
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                ReportError(this, $"Error occured in Reading thread. Served blocks: {served}", e);
+            }
+        }
+
+        /// <summary>
+        /// Compresses one queued block and hands it to the writer, keeping its position number
+        /// </summary>
+        internal override void ProcessOne()
+        {
+            if (!readBuffer.TryDequeue(out KeyValuePair<int, byte[]> block)) //Extracting read block
+                return;
+
+            processed.WaitOne(); //Waiting for empty place for compressed block
+
+            using (MemoryStream stream = new MemoryStream()) //Instatiating memory stream which will contain compressed block
+            using (GZipStream compressor = new GZipStream(stream, CompressionMode.Compress)) //Instantiating compression stream
+            {
+                compressor.Write(block.Value, 0, block.Value.Length); //Compressing block
+                compressor.Close(); //Must be closed before ToArray so the gzip trailer is flushed
+
+                byte[] compressedBlock = stream.ToArray(); //Getting compressed block
+                byte[] fileMeta = block.Key == 0 ? BitConverter.GetBytes(segmentCount) : new byte[0]; //If it's the first block in a file we write info about total block count (that will be used to count progress)
+                byte[] zippedMeta = BitConverter.GetBytes(compressedBlock.Length); //Creating compressed block length info
+
+                byte[] newBlock = new byte[fileMeta.Length + 4 + compressedBlock.Length]; //Merging arrays
+                fileMeta.CopyTo(newBlock, 0);
+                zippedMeta.CopyTo(newBlock, fileMeta.Length);
+                compressedBlock.CopyTo(newBlock, fileMeta.Length + 4);
+
+                processedBuffer.TryAdd( //Processing block and adding it to write queue keeping its position number
+                    block.Key,
+                    newBlock);
+            }
+        }
+    }
+}
diff --git a/GZipTest/DecompressionModule.cs b/GZipTest/DecompressionModule.cs
new file mode 100644
index 0000000..3c01ded
--- /dev/null
+++ b/GZipTest/DecompressionModule.cs
@@ -0,0 +1,68 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Compression;
+
+namespace GZipTest
+{
+    class DecompressionModule : ProcessingModule
+    {
+        /// <summary>
+        /// Reading compressed source file
+        /// </summary>
+        internal override void Read()
+        {
+            try
+            {
+                using (FileStream input = File.OpenRead(source)) //Opening reading stream
+                {
+                    byte[] segmentMeta = new byte[8]; //Reading first 8 bytes to determine total count of blocks
+                    int headerRead = 0; //BUGFIX: Stream.Read may return fewer bytes than requested, so read in a loop
+                    while (headerRead < 8)
+                    {
+                        int n = input.Read(segmentMeta, headerRead, 8 - headerRead);
+                        if (n == 0)
+                            throw new EndOfStreamException("Unexpected end of compressed file");
+                        headerRead += n;
+                    }
+                    segmentCount = BitConverter.ToInt64(segmentMeta, 0); //segmentCount field will be used to display progress bar
+
+                    for (int i = 0; input.Position < input.Length; i++)
+                    {
+                        if (readBuffer.Count >= 5 * Environment.ProcessorCount) //Helping decompression thread if there's too many unprocessed blocks
+                        {
+                            ProcessOne();
+                            i--;
+                            continue;
+                        }
+
+                        byte[] meta = new byte[4]; //Reading first 4 bytes to determine block's length
+                        int metaRead = 0; //BUGFIX: loop until all 4 length bytes are read
+                        while (metaRead < 4)
+                        {
+                            int n = input.Read(meta, metaRead, 4 - metaRead);
+                            if (n == 0)
+                                throw new EndOfStreamException("Unexpected end of compressed file");
+                            metaRead += n;
+                        }
+                        int blockSize = BitConverter.ToInt32(meta, 0);
+
+                        byte[] block = new byte[blockSize]; //Instantiating empty block
+                        int offset = 0; //BUGFIX: loop until the whole compressed block is read
+                        while (offset < blockSize)
+                        {
+                            int n = input.Read(block, offset, blockSize - offset);
+                            if (n == 0)
+                                throw new EndOfStreamException("Unexpected end of compressed file");
+                            offset += n;
+                        }
+
+                        readBuffer.Enqueue(new KeyValuePair<int, byte[]>(i, block)); 
+                        //Adding read block to compression queue. Each block must contain its position number since compression is multi thread
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                ReportError(this, $"Error occured in Reading thread. Served blocks: {served}", e);
+            }
+        }
+
+        /// <summary>
+        /// Decompresses one queued block and hands it to the writer, keeping its position number
+        /// </summary>
+        internal override void ProcessOne()
+        {
+            if (!readBuffer.TryDequeue(out KeyValuePair<int, byte[]> block)) //Extracting read block
+                return;
+
+            processed.WaitOne(); //Waiting for empty place for decompressed block
+
+            using (MemoryStream stream = new MemoryStream(block.Value)) //Instantiating memory stream with compressed block data
+            using (GZipStream decompressor = new GZipStream(stream, CompressionMode.Decompress)) //Instantiating decompressor stream
+            using (MemoryStream destination = new MemoryStream()) //Instantiating memory stream which will contain decompressed block
+            {
+                decompressor.CopyTo(destination); //Decompressing block
+
+                processedBuffer.TryAdd( //Processing block and adding it to write queue keeping its position number
+                    block.Key,
+                    destination.ToArray());
+            }
+        }
+    }
+}
diff --git a/GZipTest/GZipTest.csproj b/GZipTest/GZipTest.csproj
new file mode 100644
index 0000000..f7b227b
--- /dev/null
+++ b/GZipTest/GZipTest.csproj
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProjectGuid>{BE563CBF-0E92-4BD8-8157-D6EEBFE9535F}</ProjectGuid>
+    <OutputType>Exe</OutputType>
+    <RootNamespace>GZipTest</RootNamespace>
+    <AssemblyName>GZipTest</AssemblyName>
+    <TargetFrameworkVersion>v4.7.2</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
+    <Deterministic>true</Deterministic>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <Prefer32Bit>false</Prefer32Bit>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <Prefer32Bit>false</Prefer32Bit>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Net.Http" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="CompressionModule.cs" />
+    <Compile Include="DecompressionModule.cs" />
+    <Compile Include="IProcessingModule.cs" />
+    <Compile Include="ProcessingModule.cs" />
+    <Compile Include="Program.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="App.config" />
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+</Project>
\ No newline at end of file
diff --git a/GZipTest/IProcessingModule.cs b/GZipTest/IProcessingModule.cs
new file mode 100644
index 0000000..a6428c8
--- /dev/null
+++ b/GZipTest/IProcessingModule.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace GZipTest
+{
+    interface IProcessingModule
+    {
+        void Run(string input, string output);
+        void 
+            Stop();
+        event ProgressChangedEventHandler ProgressChanged;
+        event EventHandler Complete;
+        event ErrorEventHandler ErrorOccured;
+    }
+}
diff --git a/GZipTest/ProcessingModule.cs b/GZipTest/ProcessingModule.cs
new file mode 100644
index 0000000..bd11067
--- /dev/null
+++ b/GZipTest/ProcessingModule.cs
@@ -0,0 +1,146 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+
+namespace GZipTest
+{
+    /// <summary>
+    /// Delegate void used to inform UI thread about changed progress
+    /// </summary>
+    /// <param name="done">Amount of blocks that have been done</param>
+    /// <param name="totalSegments">Amount of total blocks</param>
+    public delegate void ProgressChangedEventHandler(long done, long totalSegments);
+
+    public abstract class ProcessingModule : IProcessingModule
+    {
+        public event ProgressChangedEventHandler ProgressChanged;
+        public event EventHandler Complete;
+        public event ErrorEventHandler ErrorOccured;
+
+        internal Thread readingThread, writingThread;
+        internal Thread[] compressionThreads = new Thread[Math.Max(1, Environment.ProcessorCount - 2)]; //If we have ability to use more than 3 threads we add more threads that will proccess blocks (because this operation takes the biggest amount of resources)
+
+        internal Semaphore processed; //Semaphore will help us to maintain RAM and use minimum of it
+
+        internal ConcurrentQueue<KeyValuePair<int, byte[]>> readBuffer = new ConcurrentQueue<KeyValuePair<int, byte[]>>(); //We use queue for reading and processing blocks since FIFO method is more efficient here
+        internal ConcurrentDictionary<int, byte[]> processedBuffer = new ConcurrentDictionary<int, byte[]>(); //And use dictionary for writing since we need blocks to be placed in order
+
+        //These variables are used for tracking progress
+        internal long segmentCount = 0;
+        internal long served = 0;
+        internal long length;
+
+        //Source and output file paths
+        internal string source, result;
+
+        /// <summary>
+        /// Initializing workflow
+        /// </summary>
+        /// <param name="input">Source file path</param>
+        /// <param name="output">Destination file path</param>
+        public void Run(string input, string 
+            output)
+        {
+            //Setting files paths
+            source = input;
+            result = output;
+
+            //Instantiating threads
+            readingThread = new Thread(Read);
+            writingThread = new Thread(Write);
+
+            for (int i = 0; i < compressionThreads.Length; i++)
+                compressionThreads[i] = new Thread(Process);
+
+            foreach (Thread i in compressionThreads)
+                i.Priority = ThreadPriority.Highest; //Since compression is the slowest operation it must be marked as high priority task
+
+            //Semaphore will indicate how many blocks can be now written.
+            //There can be max 5 blocks for each compression thread because there's no reason for more.
+            //5 block in a row mean that compressing algorithm is faster than writing algorithm so there's no need to process more block until these are done
+            processed = new Semaphore(compressionThreads.Length * 5, compressionThreads.Length * 5);
+
+            //Starting threads
+            readingThread.Start();
+            foreach (Thread i in compressionThreads)
+                i.Start();
+            writingThread.Start();
+        }
+
+        /// <summary>
+        /// Instantly terminates all threads and cleans up stuff
+        /// </summary>
+        public void Stop()
+        {
+            //Terminating threads
+            //NOTE(review): Thread.Abort works on .NET Framework 4.7.2 (this project's target) but is unsafe in general; cooperative cancellation would be preferable
+            readingThread.Abort();
+            foreach (Thread thread in compressionThreads)
+                thread.Abort();
+            writingThread.Abort();
+
+            //Collecting garbage (Yours' Cap)
+            GC.Collect();
+        }
+
+        /// <summary>
+        /// Wraps the exception with a context message and raises the ErrorOccured event
+        /// </summary>
+        internal void ReportError(object sender, string message, Exception ex) => ErrorOccured?.Invoke(sender, new ErrorEventArgs(new Exception(message, ex)));
+
+        /// <summary>
+        /// Reading source file
+        /// </summary>
+        internal abstract void Read();
+
+        /// <summary>
+        /// Processes one block. This method is used in Read and Write threads
+        /// </summary>
+        internal abstract void ProcessOne();
+
+        /// <summary>
+        /// Processing read block
+        /// </summary>
+        internal void Process()
+        {
+            try
+            {
+                while (readingThread.IsAlive || readBuffer.Count > 0) //The task will be alive as long as reading is in progress or there's still any unprocessed blocks
+                    ProcessOne();
+            }
+            catch (Exception e)
+            {
+                ReportError(this, $"Error occured in Compression thread. Served blocks: {served}", e);
+            }
+        }
+
+        /// <summary>
+        /// Writing processed block to disk
+        /// </summary>
+        internal void Write()
+        {
+            try
+            {
+                using (FileStream stream = new FileStream(result, FileMode.Create, FileAccess.Write)) //Instantiating writing stream
+                {
+                    while (compressionThreads.Any(i => i.IsAlive) || processedBuffer.Count > 0) //The task will be alive as long as compression is in progress or there's still any unwritten block
+                    {
+                        if (!processedBuffer.TryRemove((int)served, out byte[] block)) //Extracting block that need to be written next
+                        {
+                            if (readBuffer.Count > 0) //Helping processing thread to do its job
+                                ProcessOne();
+                            continue; //NOTE(review): this busy-waits when both buffers are empty; a short sleep or wait handle would reduce CPU usage
+                        }
+
+                        stream.Write(block, 0, block.Length); //Writing block to the file
+                        processed.Release(); //Informing compression threads that they can continue
+
+                        ProgressChanged?.Invoke(++served, segmentCount); //Updating progress bar
+                    }
+                }
+                Complete?.Invoke(length / 1024 / 1024, null);
+            }
+            catch (Exception e)
+            {
+                ReportError(this, $"Error occured in writing thread. Blocks served: {served}", e);
+            }
+        }
+    }
+}
diff --git a/GZipTest/Program.cs b/GZipTest/Program.cs
new file mode 100644
index 0000000..c081b16
--- /dev/null
+++ b/GZipTest/Program.cs
@@ -0,0 +1,101 @@
+using System;
+using System.IO;
+
+namespace GZipTest
+{
+    class Program
+    {
+        static DateTime start = DateTime.Now;
+        static IProcessingModule module;
+
+        static int Main(string[] args)
+        {
+            try
+            {
+                //Validating input parameters
+                if (args.Length != 3)
+                    throw new InvalidDataException("Invalid parameters set.\nUsage: NewWinRar.exe [compress|decompress] [source file name] [destination file name]");
+                if (!File.Exists(args[1]))
+                    throw new FileNotFoundException("The source file not found. Make sure it is place in the program's directory and has the same name. Stating extension is required");
+
+                //Instatiating module
+                switch (args[0].ToLower())
+                {
+                    case "compress":
+                        Console.WriteLine("Compressing file...");
+                        module = new CompressionModule();
+                        break;
+                    case "decompress":
+                        Console.WriteLine("Unpacking file...");
+                        module = new DecompressionModule();
+                        break;
+                    default:
+                        throw new InvalidDataException("Invalid parameter. The first parameter must be 'compress' or 'decompress'");
+                }
+
+                //Subscribing to events
+                module.ProgressChanged += SetProgress;
+                module.Complete += Complete;
+                module.ErrorOccured += Module_ErrorOccured;
+
+                //Executing module
+                module.Run(args[1], args[2]);
+
+                return 0;
+            }
+            //Catching errors and displaying them
+            catch (Exception e)
+            {
+                //BUGFIX: '+' binds tighter than '!=' and '&&', so the original expression compared the
+                //concatenated string to null and the exception text was never printed; compute the
+                //inner-exception part separately and concatenate explicitly
+                string innerInfo = e.InnerException != null && e.InnerException != e ? $"\n{e.InnerException.ToString()}\n" : "";
+                Console.Error.WriteLine($"\n\n{e.ToString()}\n" + innerInfo);
+                return 1;
+            }
+        }
+
+        private static void Module_ErrorOccured(object sender, ErrorEventArgs e)
+        {
+            Console.Error.WriteLine("Error has occured. Threads termination initiated"); //BUGFIX: typo "tremination" in user-facing message
+            Console.Error.WriteLine($"\n\n{e.GetException().ToString()}\n");
+            module.Complete -= Complete;
+            Console.WriteLine("Press any key to continue...");
+            Console.ReadKey();
+            module.Stop();
+        }
+
+        /// <summary>
+        /// Displays complete message and post analysis
+        /// </summary>
+        /// <param name="size">Represents original file size in MB</param>
+        /// <param name="e">Not used</param>
+        private static void Complete(object size, EventArgs e)
+        {
+            TimeSpan elapsed = DateTime.Now - start;
+            Console.WriteLine($"\nDone\nProcessed {size} MB within {elapsed.Minutes} minutes {elapsed.Seconds} seconds\nPress any key to continue...");
+            Console.ReadKey();
+        }
+
+        /// <summary>
+        /// Displaying progress bar which represents current workflow position
+        /// </summary>
+        /// <param name="done">Integer from 0 to 100. 
+        /// Represents amount of completed work</param>
+        /// <param name="totalSegments">Amount of total blocks</param>
+        public static void SetProgress(long done, long totalSegments)
+        {
+            if (totalSegments <= 0) //BUGFIX: guard against DivideByZeroException when the segment header is zero (empty or corrupted file)
+                return;
+
+            TimeSpan elapsed = DateTime.Now - start;
+            //Border braces
+            Console.CursorLeft = 0;
+            Console.Write("[");
+            Console.CursorLeft = 21;
+            Console.Write("]");
+
+            //Progress bar
+            for (int i = 0; i < done * 20 / totalSegments; i++)
+            {
+                Console.CursorLeft = i + 1;
+                Console.Write("■");
+            }
+
+            //Percentage
+            //NOTE(review): Console.CursorLeft throws when output is redirected - confirm console-only usage
+            Console.CursorLeft = 23;
+            Console.Write($"{done * 100 / totalSegments}% {done} of {totalSegments} blocks [{elapsed.ToString(@"hh\:mm\:ss")}]");
+        }
+    }
+}
diff --git a/GZipTest/Properties/AssemblyInfo.cs b/GZipTest/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..8880b2f
--- /dev/null
+++ b/GZipTest/Properties/AssemblyInfo.cs
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("NewWinRar")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("NewWinRar")]
+[assembly: AssemblyCopyright("Copyright © 2019")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type. 
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("c4bc63bf-c2a8-4057-b746-662f9dcf5a6b")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]