123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401 |
- //#define Trace
- // ParallelDeflateOutputStream.cs
- // ------------------------------------------------------------------
- //
- // A DeflateStream that does compression only, it uses a
- // divide-and-conquer approach with multiple threads to exploit multiple
- // CPUs for the DEFLATE computation.
- //
- // last saved: <2011-July-31 14:49:40>
- //
- // ------------------------------------------------------------------
- //
- // Copyright (c) 2009-2011 by Dino Chiesa
- // All rights reserved!
- //
- // This code module is part of DotNetZip, a zipfile class library.
- //
- // ------------------------------------------------------------------
- //
- // This code is licensed under the Microsoft Public License.
- // See the file License.txt for the license details.
- // More info on: http://dotnetzip.codeplex.com
- //
- // ------------------------------------------------------------------
- using System;
- using System.Collections.Generic;
- using System.Threading;
- using Ionic.Zlib;
- using System.IO;
- namespace Ionic.Zlib
- {
- internal class WorkItem
- {
- public byte[] buffer;
- public byte[] compressed;
- public int crc;
- public int index;
- public int ordinal;
- public int inputBytesAvailable;
- public int compressedBytesAvailable;
- public ZlibCodec compressor;
- public WorkItem(int size,
- Ionic.Zlib.CompressionLevel compressLevel,
- CompressionStrategy strategy,
- int ix)
- {
- this.buffer= new byte[size];
- // alloc 5 bytes overhead for every block (margin of safety= 2)
- int n = size + ((size / 32768)+1) * 5 * 2;
- this.compressed = new byte[n];
- this.compressor = new ZlibCodec();
- this.compressor.InitializeDeflate(compressLevel, false);
- this.compressor.OutputBuffer = this.compressed;
- this.compressor.InputBuffer = this.buffer;
- this.index = ix;
- }
- }
- /// <summary>
- /// A class for compressing streams using the
- /// Deflate algorithm with multiple threads.
- /// </summary>
- ///
- /// <remarks>
- /// <para>
- /// This class performs DEFLATE compression through writing. For
- /// more information on the Deflate algorithm, see IETF RFC 1951,
- /// "DEFLATE Compressed Data Format Specification version 1.3."
- /// </para>
- ///
- /// <para>
- /// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, except
- /// that this class is for compression only, and this implementation uses an
- /// approach that employs multiple worker threads to perform the DEFLATE. On
- /// a multi-cpu or multi-core computer, the performance of this class can be
- /// significantly higher than the single-threaded DeflateStream, particularly
- /// for larger streams. How large? Anything over 10mb is a good candidate
- /// for parallel compression.
- /// </para>
- ///
- /// <para>
- /// The tradeoff is that this class uses more memory and more CPU than the
- /// vanilla DeflateStream, and also is less efficient as a compressor. For
- /// large files the size of the compressed data stream can be less than 1%
- /// larger than the size of a compressed data stream from the vanialla
- /// DeflateStream. For smaller files the difference can be larger. The
- /// difference will also be larger if you set the BufferSize to be lower than
- /// the default value. Your mileage may vary. Finally, for small files, the
- /// ParallelDeflateOutputStream can be much slower than the vanilla
- /// DeflateStream, because of the overhead associated to using the thread
- /// pool.
- /// </para>
- ///
- /// </remarks>
- /// <seealso cref="Ionic.Zlib.DeflateStream" />
- public class ParallelDeflateOutputStream : System.IO.Stream
- {
- private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024; // 128k
- private static readonly int BufferPairsPerCore = 4;
- private System.Collections.Generic.List<WorkItem> _pool;
- private bool _leaveOpen;
- private bool emitting;
- private System.IO.Stream _outStream;
- private int _maxBufferPairs;
- private int _bufferSize = IO_BUFFER_SIZE_DEFAULT;
- private AutoResetEvent _newlyCompressedBlob;
- //private ManualResetEvent _writingDone;
- //private ManualResetEvent _sessionReset;
- private object _outputLock = new object();
- private bool _isClosed;
- private bool _firstWriteDone;
- private int _currentlyFilling;
- private int _lastFilled;
- private int _lastWritten;
- private int _latestCompressed;
- private int _Crc32;
- private Ionic.Crc.CRC32 _runningCrc;
- private object _latestLock = new object();
- private System.Collections.Generic.Queue<int> _toWrite;
- private System.Collections.Generic.Queue<int> _toFill;
- private Int64 _totalBytesProcessed;
- private Ionic.Zlib.CompressionLevel _compressLevel;
- private volatile Exception _pendingException;
- private bool _handlingException;
- private object _eLock = new Object(); // protects _pendingException
- // This bitfield is used only when Trace is defined.
- //private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin |
- //TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush |
- //TraceBits.Session;
- //private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle | TraceBits.Session ;
- private TraceBits _DesiredTrace =
- TraceBits.Session |
- TraceBits.Compress |
- TraceBits.WriteTake |
- TraceBits.WriteEnter |
- TraceBits.EmitEnter |
- TraceBits.EmitDone |
- TraceBits.EmitLock |
- TraceBits.EmitSkip |
- TraceBits.EmitBegin;
- /// <summary>
- /// Create a ParallelDeflateOutputStream.
- /// </summary>
- /// <remarks>
- ///
- /// <para>
- /// This stream compresses data written into it via the DEFLATE
- /// algorithm (see RFC 1951), and writes out the compressed byte stream.
- /// </para>
- ///
- /// <para>
- /// The instance will use the default compression level, the default
- /// buffer sizes and the default number of threads and buffers per
- /// thread.
- /// </para>
- ///
- /// <para>
- /// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>,
- /// except that this implementation uses an approach that employs
- /// multiple worker threads to perform the DEFLATE. On a multi-cpu or
- /// multi-core computer, the performance of this class can be
- /// significantly higher than the single-threaded DeflateStream,
- /// particularly for larger streams. How large? Anything over 10mb is
- /// a good candidate for parallel compression.
- /// </para>
- ///
- /// </remarks>
- ///
- /// <example>
- ///
- /// This example shows how to use a ParallelDeflateOutputStream to compress
- /// data. It reads a file, compresses it, and writes the compressed data to
- /// a second, output file.
- ///
- /// <code>
- /// byte[] buffer = new byte[WORKING_BUFFER_SIZE];
- /// int n= -1;
- /// String outputFile = fileToCompress + ".compressed";
- /// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress))
- /// {
- /// using (var raw = System.IO.File.Create(outputFile))
- /// {
- /// using (Stream compressor = new ParallelDeflateOutputStream(raw))
- /// {
- /// while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
- /// {
- /// compressor.Write(buffer, 0, n);
- /// }
- /// }
- /// }
- /// }
- /// </code>
- /// <code lang="VB">
- /// Dim buffer As Byte() = New Byte(4096) {}
- /// Dim n As Integer = -1
- /// Dim outputFile As String = (fileToCompress & ".compressed")
- /// Using input As Stream = File.OpenRead(fileToCompress)
- /// Using raw As FileStream = File.Create(outputFile)
- /// Using compressor As Stream = New ParallelDeflateOutputStream(raw)
- /// Do While (n <> 0)
- /// If (n > 0) Then
- /// compressor.Write(buffer, 0, n)
- /// End If
- /// n = input.Read(buffer, 0, buffer.Length)
- /// Loop
- /// End Using
- /// End Using
- /// End Using
- /// </code>
- /// </example>
- /// <param name="stream">The stream to which compressed data will be written.</param>
- public ParallelDeflateOutputStream(System.IO.Stream stream)
- : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false)
- {
- }
- /// <summary>
- /// Create a ParallelDeflateOutputStream using the specified CompressionLevel.
- /// </summary>
- /// <remarks>
- /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
- /// constructor for example code.
- /// </remarks>
- /// <param name="stream">The stream to which compressed data will be written.</param>
- /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
- public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level)
- : this(stream, level, CompressionStrategy.Default, false)
- {
- }
- /// <summary>
- /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
- /// when the ParallelDeflateOutputStream is closed.
- /// </summary>
- /// <remarks>
- /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
- /// constructor for example code.
- /// </remarks>
- /// <param name="stream">The stream to which compressed data will be written.</param>
- /// <param name="leaveOpen">
- /// true if the application would like the stream to remain open after inflation/deflation.
- /// </param>
- public ParallelDeflateOutputStream(System.IO.Stream stream, bool leaveOpen)
- : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
- {
- }
- /// <summary>
- /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open
- /// when the ParallelDeflateOutputStream is closed.
- /// </summary>
- /// <remarks>
- /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
- /// constructor for example code.
- /// </remarks>
- /// <param name="stream">The stream to which compressed data will be written.</param>
- /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
- /// <param name="leaveOpen">
- /// true if the application would like the stream to remain open after inflation/deflation.
- /// </param>
- public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen)
- : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen)
- {
- }
- /// <summary>
- /// Create a ParallelDeflateOutputStream using the specified
- /// CompressionLevel and CompressionStrategy, and specifying whether to
- /// leave the captive stream open when the ParallelDeflateOutputStream is
- /// closed.
- /// </summary>
- /// <remarks>
- /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
- /// constructor for example code.
- /// </remarks>
- /// <param name="stream">The stream to which compressed data will be written.</param>
- /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
- /// <param name="strategy">
- /// By tweaking this parameter, you may be able to optimize the compression for
- /// data with particular characteristics.
- /// </param>
- /// <param name="leaveOpen">
- /// true if the application would like the stream to remain open after inflation/deflation.
- /// </param>
- public ParallelDeflateOutputStream(System.IO.Stream stream,
- CompressionLevel level,
- CompressionStrategy strategy,
- bool leaveOpen)
- {
- TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "-------------------------------------------------------");
- TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "Create {0:X8}", this.GetHashCode());
- _outStream = stream;
- _compressLevel= level;
- Strategy = strategy;
- _leaveOpen = leaveOpen;
- this.MaxBufferPairs = 16; // default
- }
- /// <summary>
- /// The ZLIB strategy to be used during compression.
- /// </summary>
- ///
- public CompressionStrategy Strategy
- {
- get;
- private set;
- }
- /// <summary>
- /// The maximum number of buffer pairs to use.
- /// </summary>
- ///
- /// <remarks>
- /// <para>
- /// This property sets an upper limit on the number of memory buffer
- /// pairs to create. The implementation of this stream allocates
- /// multiple buffers to facilitate parallel compression. As each buffer
- /// fills up, this stream uses <see
- /// cref="System.Threading.ThreadPool.QueueUserWorkItem(WaitCallback)">
- /// ThreadPool.QueueUserWorkItem()</see>
- /// to compress those buffers in a background threadpool thread. After a
- /// buffer is compressed, it is re-ordered and written to the output
- /// stream.
- /// </para>
- ///
- /// <para>
- /// A higher number of buffer pairs enables a higher degree of
- /// parallelism, which tends to increase the speed of compression on
- /// multi-cpu computers. On the other hand, a higher number of buffer
- /// pairs also implies a larger memory consumption, more active worker
- /// threads, and a higher cpu utilization for any compression. This
- /// property enables the application to limit its memory consumption and
- /// CPU utilization behavior depending on requirements.
- /// </para>
- ///
- /// <para>
- /// For each compression "task" that occurs in parallel, there are 2
- /// buffers allocated: one for input and one for output. This property
- /// sets a limit for the number of pairs. The total amount of storage
- /// space allocated for buffering will then be (N*S*2), where N is the
- /// number of buffer pairs, S is the size of each buffer (<see
- /// cref="BufferSize"/>). By default, DotNetZip allocates 4 buffer
- /// pairs per CPU core, so if your machine has 4 cores, and you retain
- /// the default buffer size of 128k, then the
- /// ParallelDeflateOutputStream will use 4 * 4 * 2 * 128kb of buffer
- /// memory in total, or 4mb, in blocks of 128kb. If you then set this
- /// property to 8, then the number will be 8 * 2 * 128kb of buffer
- /// memory, or 2mb.
- /// </para>
- ///
- /// <para>
- /// CPU utilization will also go up with additional buffers, because a
- /// larger number of buffer pairs allows a larger number of background
- /// threads to compress in parallel. If you find that parallel
- /// compression is consuming too much memory or CPU, you can adjust this
- /// value downward.
- /// </para>
- ///
- /// <para>
- /// The default value is 16. Different values may deliver better or
- /// worse results, depending on your priorities and the dynamic
- /// performance characteristics of your storage and compute resources.
- /// </para>
- ///
- /// <para>
- /// This property is not the number of buffer pairs to use; it is an
- /// upper limit. An illustration: Suppose you have an application that
- /// uses the default value of this property (which is 16), and it runs
- /// on a machine with 2 CPU cores. In that case, DotNetZip will allocate
- /// 4 buffer pairs per CPU core, for a total of 8 pairs. The upper
- /// limit specified by this property has no effect.
- /// </para>
- ///
- /// <para>
- /// The application can set this value at any time, but it is effective
- /// only before the first call to Write(), which is when the buffers are
- /// allocated.
- /// </para>
- /// </remarks>
- public int MaxBufferPairs
- {
- get
- {
- return _maxBufferPairs;
- }
- set
- {
- if (value < 4)
- throw new ArgumentException("MaxBufferPairs",
- "Value must be 4 or greater.");
- _maxBufferPairs = value;
- }
- }
- /// <summary>
- /// The size of the buffers used by the compressor threads.
- /// </summary>
- /// <remarks>
- ///
- /// <para>
- /// The default buffer size is 128k. The application can set this value
- /// at any time, but it is effective only before the first Write().
- /// </para>
- ///
- /// <para>
- /// Larger buffer sizes implies larger memory consumption but allows
- /// more efficient compression. Using smaller buffer sizes consumes less
- /// memory but may result in less effective compression. For example,
- /// using the default buffer size of 128k, the compression delivered is
- /// within 1% of the compression delivered by the single-threaded <see
- /// cref="Ionic.Zlib.DeflateStream"/>. On the other hand, using a
- /// BufferSize of 8k can result in a compressed data stream that is 5%
- /// larger than that delivered by the single-threaded
- /// <c>DeflateStream</c>. Excessively small buffer sizes can also cause
- /// the speed of the ParallelDeflateOutputStream to drop, because of
- /// larger thread scheduling overhead dealing with many many small
- /// buffers.
- /// </para>
- ///
- /// <para>
- /// The total amount of storage space allocated for buffering will be
- /// (N*S*2), where N is the number of buffer pairs, and S is the size of
- /// each buffer (this property). There are 2 buffers used by the
- /// compressor, one for input and one for output. By default, DotNetZip
- /// allocates 4 buffer pairs per CPU core, so if your machine has 4
- /// cores, then the number of buffer pairs used will be 16. If you
- /// accept the default value of this property, 128k, then the
- /// ParallelDeflateOutputStream will use 16 * 2 * 128kb of buffer memory
- /// in total, or 4mb, in blocks of 128kb. If you set this property to
- /// 64kb, then the number will be 16 * 2 * 64kb of buffer memory, or
- /// 2mb.
- /// </para>
- ///
- /// </remarks>
- public int BufferSize
- {
- get { return _bufferSize;}
- set
- {
- if (value < 1024)
- throw new ArgumentOutOfRangeException("BufferSize",
- "BufferSize must be greater than 1024 bytes");
- _bufferSize = value;
- }
- }
- /// <summary>
- /// The CRC32 for the data that was written out, prior to compression.
- /// </summary>
- /// <remarks>
- /// This value is meaningful only after a call to Close().
- /// </remarks>
- public int Crc32 { get { return _Crc32; } }
- /// <summary>
- /// The total number of uncompressed bytes processed by the ParallelDeflateOutputStream.
- /// </summary>
- /// <remarks>
- /// This value is meaningful only after a call to Close().
- /// </remarks>
- public Int64 BytesProcessed { get { return _totalBytesProcessed; } }
- private void _InitializePoolOfWorkItems()
- {
- _toWrite = new Queue<int>();
- _toFill = new Queue<int>();
- _pool = new System.Collections.Generic.List<WorkItem>();
- int nTasks = BufferPairsPerCore * Environment.ProcessorCount;
- nTasks = Math.Min(nTasks, _maxBufferPairs);
- for(int i=0; i < nTasks; i++)
- {
- _pool.Add(new WorkItem(_bufferSize, _compressLevel, Strategy, i));
- _toFill.Enqueue(i);
- }
- _newlyCompressedBlob = new AutoResetEvent(false);
- _runningCrc = new Ionic.Crc.CRC32();
- _currentlyFilling = -1;
- _lastFilled = -1;
- _lastWritten = -1;
- _latestCompressed = -1;
- }
- /// <summary>
- /// Write data to the stream.
- /// </summary>
- ///
- /// <remarks>
- ///
- /// <para>
- /// To use the ParallelDeflateOutputStream to compress data, create a
- /// ParallelDeflateOutputStream with CompressionMode.Compress, passing a
- /// writable output stream. Then call Write() on that
- /// ParallelDeflateOutputStream, providing uncompressed data as input. The
- /// data sent to the output stream will be the compressed form of the data
- /// written.
- /// </para>
- ///
- /// <para>
- /// To decompress data, use the <see cref="Ionic.Zlib.DeflateStream"/> class.
- /// </para>
- ///
- /// </remarks>
- /// <param name="buffer">The buffer holding data to write to the stream.</param>
- /// <param name="offset">the offset within that data array to find the first byte to write.</param>
- /// <param name="count">the number of bytes to write.</param>
- public override void Write(byte[] buffer, int offset, int count)
- {
- bool mustWait = false;
- // This method does this:
- // 0. handles any pending exceptions
- // 1. write any buffers that are ready to be written,
- // 2. fills a work buffer; when full, flip state to 'Filled',
- // 3. if more data to be written, goto step 1
- if (_isClosed)
- throw new InvalidOperationException();
- // dispense any exceptions that occurred on the BG threads
- if (_pendingException != null)
- {
- _handlingException = true;
- var pe = _pendingException;
- _pendingException = null;
- throw pe;
- }
- if (count == 0) return;
- if (!_firstWriteDone)
- {
- // Want to do this on first Write, first session, and not in the
- // constructor. We want to allow MaxBufferPairs to
- // change after construction, but before first Write.
- _InitializePoolOfWorkItems();
- _firstWriteDone = true;
- }
- do
- {
- // may need to make buffers available
- EmitPendingBuffers(false, mustWait);
- mustWait = false;
- // use current buffer, or get a new buffer to fill
- int ix = -1;
- if (_currentlyFilling >= 0)
- {
- ix = _currentlyFilling;
- TraceOutput(TraceBits.WriteTake,
- "Write notake wi({0}) lf({1})",
- ix,
- _lastFilled);
- }
- else
- {
- TraceOutput(TraceBits.WriteTake, "Write take?");
- if (_toFill.Count == 0)
- {
- // no available buffers, so... need to emit
- // compressed buffers.
- mustWait = true;
- continue;
- }
- ix = _toFill.Dequeue();
- TraceOutput(TraceBits.WriteTake,
- "Write take wi({0}) lf({1})",
- ix,
- _lastFilled);
- ++_lastFilled; // TODO: consider rollover?
- }
- WorkItem workitem = _pool[ix];
- int limit = ((workitem.buffer.Length - workitem.inputBytesAvailable) > count)
- ? count
- : (workitem.buffer.Length - workitem.inputBytesAvailable);
- workitem.ordinal = _lastFilled;
- TraceOutput(TraceBits.Write,
- "Write lock wi({0}) ord({1}) iba({2})",
- workitem.index,
- workitem.ordinal,
- workitem.inputBytesAvailable
- );
- // copy from the provided buffer to our workitem, starting at
- // the tail end of whatever data we might have in there currently.
- Buffer.BlockCopy(buffer,
- offset,
- workitem.buffer,
- workitem.inputBytesAvailable,
- limit);
- count -= limit;
- offset += limit;
- workitem.inputBytesAvailable += limit;
- if (workitem.inputBytesAvailable == workitem.buffer.Length)
- {
- // No need for interlocked.increment: the Write()
- // method is documented as not multi-thread safe, so
- // we can assume Write() calls come in from only one
- // thread.
- TraceOutput(TraceBits.Write,
- "Write QUWI wi({0}) ord({1}) iba({2}) nf({3})",
- workitem.index,
- workitem.ordinal,
- workitem.inputBytesAvailable );
- #if !PCL
- if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
- throw new Exception("Cannot enqueue workitem");
- #else
- System.Threading.Tasks.Task.Run(() => _DeflateOne(workitem));
- #endif
- _currentlyFilling = -1; // will get a new buffer next time
- }
- else
- _currentlyFilling = ix;
- if (count > 0)
- TraceOutput(TraceBits.WriteEnter, "Write more");
- }
- while (count > 0); // until no more to write
- TraceOutput(TraceBits.WriteEnter, "Write exit");
- return;
- }
- private void _FlushFinish()
- {
- // After writing a series of compressed buffers, each one closed
- // with Flush.Sync, we now write the final one as Flush.Finish,
- // and then stop.
- byte[] buffer = new byte[128];
- var compressor = new ZlibCodec();
- int rc = compressor.InitializeDeflate(_compressLevel, false);
- compressor.InputBuffer = null;
- compressor.NextIn = 0;
- compressor.AvailableBytesIn = 0;
- compressor.OutputBuffer = buffer;
- compressor.NextOut = 0;
- compressor.AvailableBytesOut = buffer.Length;
- rc = compressor.Deflate(FlushType.Finish);
- if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
- throw new Exception("deflating: " + compressor.Message);
- if (buffer.Length - compressor.AvailableBytesOut > 0)
- {
- TraceOutput(TraceBits.EmitBegin,
- "Emit begin flush bytes({0})",
- buffer.Length - compressor.AvailableBytesOut);
- _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut);
- TraceOutput(TraceBits.EmitDone,
- "Emit done flush");
- }
- compressor.EndDeflate();
- _Crc32 = _runningCrc.Crc32Result;
- }
- private void _Flush(bool lastInput)
- {
- if (_isClosed)
- throw new InvalidOperationException();
- if (emitting) return;
- // compress any partial buffer
- if (_currentlyFilling >= 0)
- {
- WorkItem workitem = _pool[_currentlyFilling];
- _DeflateOne(workitem);
- _currentlyFilling = -1; // get a new buffer next Write()
- }
- if (lastInput)
- {
- EmitPendingBuffers(true, false);
- _FlushFinish();
- }
- else
- {
- EmitPendingBuffers(false, false);
- }
- }
- /// <summary>
- /// Flush the stream.
- /// </summary>
- public override void Flush()
- {
- if (_pendingException != null)
- {
- _handlingException = true;
- var pe = _pendingException;
- _pendingException = null;
- throw pe;
- }
- if (_handlingException)
- return;
- _Flush(false);
- }
- #if !PCL
- /// <summary>
- /// Close the stream.
- /// </summary>
- /// <remarks>
- /// You must call Close on the stream to guarantee that all of the data written in has
- /// been compressed, and the compressed data has been written out.
- /// </remarks>
- public override void Close()
- {
- InnerClose();
- }
- #endif
- private void InnerClose()
- {
- TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode());
- if (_pendingException != null)
- {
- _handlingException = true;
- var pe = _pendingException;
- _pendingException = null;
- throw pe;
- }
- if (_handlingException)
- return;
- if (_isClosed) return;
- _Flush(true);
- if (!_leaveOpen)
- _outStream.Dispose();
- _isClosed= true;
- }
- // workitem 10030 - implement a new Dispose method
- /// <summary>Dispose the object</summary>
- /// <remarks>
- /// <para>
- /// Because ParallelDeflateOutputStream is IDisposable, the
- /// application must call this method when finished using the instance.
- /// </para>
- /// <para>
- /// This method is generally called implicitly upon exit from
- /// a <c>using</c> scope in C# (<c>Using</c> in VB).
- /// </para>
- /// </remarks>
- new public void Dispose()
- {
- TraceOutput(TraceBits.Lifecycle, "Dispose {0:X8}", this.GetHashCode());
- _pool = null;
- Dispose(true);
- }
- /// <summary>The Dispose method</summary>
- /// <param name="disposing">
- /// indicates whether the Dispose method was invoked by user code.
- /// </param>
- protected override void Dispose(bool disposing)
- {
- base.Dispose(disposing);
- InnerClose();
- }
- /// <summary>
- /// Resets the stream for use with another stream.
- /// </summary>
- /// <remarks>
- /// Because the ParallelDeflateOutputStream is expensive to create, it
- /// has been designed so that it can be recycled and re-used. You have
- /// to call Close() on the stream first, then you can call Reset() on
- /// it, to use it again on another stream.
- /// </remarks>
- ///
- /// <param name="stream">
- /// The new output stream for this era.
- /// </param>
- ///
- /// <example>
- /// <code>
- /// ParallelDeflateOutputStream deflater = null;
- /// foreach (var inputFile in listOfFiles)
- /// {
- /// string outputFile = inputFile + ".compressed";
- /// using (System.IO.Stream input = System.IO.File.OpenRead(inputFile))
- /// {
- /// using (var outStream = System.IO.File.Create(outputFile))
- /// {
- /// if (deflater == null)
- /// deflater = new ParallelDeflateOutputStream(outStream,
- /// CompressionLevel.Best,
- /// CompressionStrategy.Default,
- /// true);
- /// deflater.Reset(outStream);
- ///
- /// while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
- /// {
- /// deflater.Write(buffer, 0, n);
- /// }
- /// }
- /// }
- /// }
- /// </code>
- /// </example>
- public void Reset(Stream stream)
- {
- TraceOutput(TraceBits.Session, "-------------------------------------------------------");
- TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone);
- if (!_firstWriteDone) return;
- // reset all status
- _toWrite.Clear();
- _toFill.Clear();
- foreach (var workitem in _pool)
- {
- _toFill.Enqueue(workitem.index);
- workitem.ordinal = -1;
- }
- _firstWriteDone = false;
- _totalBytesProcessed = 0L;
- _runningCrc = new Ionic.Crc.CRC32();
- _isClosed= false;
- _currentlyFilling = -1;
- _lastFilled = -1;
- _lastWritten = -1;
- _latestCompressed = -1;
- _outStream = stream;
- }
- private void EmitPendingBuffers(bool doAll, bool mustWait)
- {
- // When combining parallel deflation with a ZipSegmentedStream, it's
- // possible for the ZSS to throw from within this method. In that
- // case, Close/Dispose will be called on this stream, if this stream
- // is employed within a using or try/finally pair as required. But
- // this stream is unaware of the pending exception, so the Close()
- // method invokes this method AGAIN. This can lead to a deadlock.
- // Therefore, failfast if re-entering.
- if (emitting) return;
- emitting = true;
- if ((doAll && (_latestCompressed != _lastFilled)) || mustWait) {
- _newlyCompressedBlob.WaitOne();
- }
- do
- {
- int firstSkip = -1;
- int millisecondsToWait = doAll ? 200 : (mustWait ? -1 : 0);
- int nextToWrite = -1;
- do
- {
- if (Monitor.TryEnter(_toWrite, millisecondsToWait))
- {
- nextToWrite = -1;
- try
- {
- if (_toWrite.Count > 0)
- nextToWrite = _toWrite.Dequeue();
- }
- finally
- {
- Monitor.Exit(_toWrite);
- }
- if (nextToWrite >= 0)
- {
- WorkItem workitem = _pool[nextToWrite];
- if (workitem.ordinal != _lastWritten + 1)
- {
- // out of order. requeue and try again.
- TraceOutput(TraceBits.EmitSkip,
- "Emit skip wi({0}) ord({1}) lw({2}) fs({3})",
- workitem.index,
- workitem.ordinal,
- _lastWritten,
- firstSkip);
- lock(_toWrite)
- {
- _toWrite.Enqueue(nextToWrite);
- }
- if (firstSkip == nextToWrite)
- {
- // We went around the list once.
- // None of the items in the list is the one we want.
- // Now wait for a compressor to signal again.
- _newlyCompressedBlob.WaitOne();
- firstSkip = -1;
- }
- else if (firstSkip == -1)
- firstSkip = nextToWrite;
- continue;
- }
- firstSkip = -1;
- TraceOutput(TraceBits.EmitBegin,
- "Emit begin wi({0}) ord({1}) cba({2})",
- workitem.index,
- workitem.ordinal,
- workitem.compressedBytesAvailable);
- _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable);
- _runningCrc.Combine(workitem.crc, workitem.inputBytesAvailable);
- _totalBytesProcessed += workitem.inputBytesAvailable;
- workitem.inputBytesAvailable = 0;
- TraceOutput(TraceBits.EmitDone,
- "Emit done wi({0}) ord({1}) cba({2}) mtw({3})",
- workitem.index,
- workitem.ordinal,
- workitem.compressedBytesAvailable,
- millisecondsToWait);
- _lastWritten = workitem.ordinal;
- _toFill.Enqueue(workitem.index);
- // don't wait next time through
- if (millisecondsToWait == -1) millisecondsToWait = 0;
- }
- }
- else
- nextToWrite = -1;
- } while (nextToWrite >= 0);
- } while (doAll && (_lastWritten != _latestCompressed || _lastWritten != _lastFilled));
- emitting = false;
- }
- #if OLD
- private void _PerpetualWriterMethod(object state)
- {
- TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START");
- try
- {
- do
- {
- // wait for the next session
- TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(begin) PWM");
- _sessionReset.WaitOne();
- TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(done) PWM");
- if (_isDisposed) break;
- TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.Reset() PWM");
- _sessionReset.Reset();
- // repeatedly write buffers as they become ready
- WorkItem workitem = null;
- Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32();
- do
- {
- workitem = _pool[_nextToWrite % _pc];
- lock(workitem)
- {
- if (_noMoreInputForThisSegment)
- TraceOutput(TraceBits.Write,
- "Write drain wi({0}) stat({1}) canuse({2}) cba({3})",
- workitem.index,
- workitem.status,
- (workitem.status == (int)WorkItem.Status.Compressed),
- workitem.compressedBytesAvailable);
- do
- {
- if (workitem.status == (int)WorkItem.Status.Compressed)
- {
- TraceOutput(TraceBits.WriteBegin,
- "Write begin wi({0}) stat({1}) cba({2})",
- workitem.index,
- workitem.status,
- workitem.compressedBytesAvailable);
- workitem.status = (int)WorkItem.Status.Writing;
- _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable);
- c.Combine(workitem.crc, workitem.inputBytesAvailable);
- _totalBytesProcessed += workitem.inputBytesAvailable;
- _nextToWrite++;
- workitem.inputBytesAvailable= 0;
- workitem.status = (int)WorkItem.Status.Done;
- TraceOutput(TraceBits.WriteDone,
- "Write done wi({0}) stat({1}) cba({2})",
- workitem.index,
- workitem.status,
- workitem.compressedBytesAvailable);
- Monitor.Pulse(workitem);
- break;
- }
- else
- {
- int wcycles = 0;
- // I've locked a workitem I cannot use.
- // Therefore, wake someone else up, and then release the lock.
- while (workitem.status != (int)WorkItem.Status.Compressed)
- {
- TraceOutput(TraceBits.WriteWait,
- "Write waiting wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})",
- workitem.index,
- workitem.status,
- _nextToWrite, _nextToFill,
- _noMoreInputForThisSegment );
- if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
- break;
- wcycles++;
- // wake up someone else
- Monitor.Pulse(workitem);
- // release and wait
- Monitor.Wait(workitem);
- if (workitem.status == (int)WorkItem.Status.Compressed)
- TraceOutput(TraceBits.WriteWait,
- "Write A-OK wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})",
- workitem.index,
- workitem.status,
- workitem.inputBytesAvailable,
- workitem.compressedBytesAvailable,
- wcycles);
- }
- if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
- break;
- }
- }
- while (true);
- }
- if (_noMoreInputForThisSegment)
- TraceOutput(TraceBits.Write,
- "Write nomore nw({0}) nf({1}) break({2})",
- _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill));
- if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
- break;
- } while (true);
- // Finish:
- // After writing a series of buffers, closing each one with
- // Flush.Sync, we now write the final one as Flush.Finish, and
- // then stop.
- byte[] buffer = new byte[128];
- ZlibCodec compressor = new ZlibCodec();
- int rc = compressor.InitializeDeflate(_compressLevel, false);
- compressor.InputBuffer = null;
- compressor.NextIn = 0;
- compressor.AvailableBytesIn = 0;
- compressor.OutputBuffer = buffer;
- compressor.NextOut = 0;
- compressor.AvailableBytesOut = buffer.Length;
- rc = compressor.Deflate(FlushType.Finish);
- if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
- throw new Exception("deflating: " + compressor.Message);
- if (buffer.Length - compressor.AvailableBytesOut > 0)
- {
- TraceOutput(TraceBits.WriteBegin,
- "Write begin flush bytes({0})",
- buffer.Length - compressor.AvailableBytesOut);
- _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut);
- TraceOutput(TraceBits.WriteBegin,
- "Write done flush");
- }
- compressor.EndDeflate();
- _Crc32 = c.Crc32Result;
- // signal that writing is complete:
- TraceOutput(TraceBits.Synch, "Synch _writingDone.Set() PWM");
- _writingDone.Set();
- }
- while (true);
- }
- catch (System.Exception exc1)
- {
- lock(_eLock)
- {
- // expose the exception to the main thread
- if (_pendingException!=null)
- _pendingException = exc1;
- }
- }
- TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS");
- }
- #endif
- private void _DeflateOne(Object wi)
- {
- // compress one buffer
- WorkItem workitem = (WorkItem) wi;
- try
- {
- int myItem = workitem.index;
- Ionic.Crc.CRC32 crc = new Ionic.Crc.CRC32();
- // calc CRC on the buffer
- crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable);
- // deflate it
- DeflateOneSegment(workitem);
- // update status
- workitem.crc = crc.Crc32Result;
- TraceOutput(TraceBits.Compress,
- "Compress wi({0}) ord({1}) len({2})",
- workitem.index,
- workitem.ordinal,
- workitem.compressedBytesAvailable
- );
- lock(_latestLock)
- {
- if (workitem.ordinal > _latestCompressed)
- _latestCompressed = workitem.ordinal;
- }
- lock (_toWrite)
- {
- _toWrite.Enqueue(workitem.index);
- }
- _newlyCompressedBlob.Set();
- }
- catch (System.Exception exc1)
- {
- lock(_eLock)
- {
- // expose the exception to the main thread
- if (_pendingException!=null)
- _pendingException = exc1;
- }
- }
- }
- private bool DeflateOneSegment(WorkItem workitem)
- {
- ZlibCodec compressor = workitem.compressor;
- int rc= 0;
- compressor.ResetDeflate();
- compressor.NextIn = 0;
- compressor.AvailableBytesIn = workitem.inputBytesAvailable;
- // step 1: deflate the buffer
- compressor.NextOut = 0;
- compressor.AvailableBytesOut = workitem.compressed.Length;
- do
- {
- compressor.Deflate(FlushType.None);
- }
- while (compressor.AvailableBytesIn > 0 || compressor.AvailableBytesOut == 0);
- // step 2: flush (sync)
- rc = compressor.Deflate(FlushType.Sync);
- workitem.compressedBytesAvailable= (int) compressor.TotalBytesOut;
- return true;
- }
- [System.Diagnostics.ConditionalAttribute("Trace")]
- private void TraceOutput(TraceBits bits, string format, params object[] varParams)
- {
- if ((bits & _DesiredTrace) != 0)
- {
- lock(_outputLock)
- {
- #if !PCL
- int tid = Thread.CurrentThread.GetHashCode();
- #if !SILVERLIGHT
- Console.ForegroundColor = (ConsoleColor) (tid % 8 + 8);
- #endif
- Console.Write("{0:000} PDOS ", tid);
- Console.WriteLine(format, varParams);
- #if !SILVERLIGHT
- Console.ResetColor();
- #endif
- #endif
- }
- }
- }
- // used only when Trace is defined
- [Flags]
- enum TraceBits : uint
- {
- None = 0,
- NotUsed1 = 1,
- EmitLock = 2,
- EmitEnter = 4, // enter _EmitPending
- EmitBegin = 8, // begin to write out
- EmitDone = 16, // done writing out
- EmitSkip = 32, // writer skipping a workitem
- EmitAll = 58, // All Emit flags
- Flush = 64,
- Lifecycle = 128, // constructor/disposer
- Session = 256, // Close/Reset
- Synch = 512, // thread synchronization
- Instance = 1024, // instance settings
- Compress = 2048, // compress task
- Write = 4096, // filling buffers, when caller invokes Write()
- WriteEnter = 8192, // upon entry to Write()
- WriteTake = 16384, // on _toFill.Take()
- All = 0xffffffff,
- }
- /// <summary>
- /// Indicates whether the stream supports Seek operations.
- /// </summary>
- /// <remarks>
- /// Always returns false.
- /// </remarks>
- public override bool CanSeek
- {
- get { return false; }
- }
- /// <summary>
- /// Indicates whether the stream supports Read operations.
- /// </summary>
- /// <remarks>
- /// Always returns false.
- /// </remarks>
- public override bool CanRead
- {
- get {return false;}
- }
- /// <summary>
- /// Indicates whether the stream supports Write operations.
- /// </summary>
- /// <remarks>
- /// Returns true if the provided stream is writable.
- /// </remarks>
- public override bool CanWrite
- {
- get { return _outStream.CanWrite; }
- }
- /// <summary>
- /// Reading this property always throws a NotSupportedException.
- /// </summary>
- public override long Length
- {
- get { throw new NotSupportedException(); }
- }
- /// <summary>
- /// Returns the current position of the output stream.
- /// </summary>
- /// <remarks>
- /// <para>
- /// Because the output gets written by a background thread,
- /// the value may change asynchronously. Setting this
- /// property always throws a NotSupportedException.
- /// </para>
- /// </remarks>
- public override long Position
- {
- get { return _outStream.Position; }
- set { throw new NotSupportedException(); }
- }
- /// <summary>
- /// This method always throws a NotSupportedException.
- /// </summary>
- /// <param name="buffer">
- /// The buffer into which data would be read, IF THIS METHOD
- /// ACTUALLY DID ANYTHING.
- /// </param>
- /// <param name="offset">
- /// The offset within that data array at which to insert the
- /// data that is read, IF THIS METHOD ACTUALLY DID
- /// ANYTHING.
- /// </param>
- /// <param name="count">
- /// The number of bytes to write, IF THIS METHOD ACTUALLY DID
- /// ANYTHING.
- /// </param>
- /// <returns>nothing.</returns>
- public override int Read(byte[] buffer, int offset, int count)
- {
- throw new NotSupportedException();
- }
- /// <summary>
- /// This method always throws a NotSupportedException.
- /// </summary>
- /// <param name="offset">
- /// The offset to seek to....
- /// IF THIS METHOD ACTUALLY DID ANYTHING.
- /// </param>
- /// <param name="origin">
- /// The reference specifying how to apply the offset.... IF
- /// THIS METHOD ACTUALLY DID ANYTHING.
- /// </param>
- /// <returns>nothing. It always throws.</returns>
- public override long Seek(long offset, System.IO.SeekOrigin origin)
- {
- throw new NotSupportedException();
- }
- /// <summary>
- /// This method always throws a NotSupportedException.
- /// </summary>
- /// <param name="value">
- /// The new value for the stream length.... IF
- /// THIS METHOD ACTUALLY DID ANYTHING.
- /// </param>
- public override void SetLength(long value)
- {
- throw new NotSupportedException();
- }
- }
- }
|