ParallelDeflateOutputStream.cs 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401
  1. //#define Trace
  2. // ParallelDeflateOutputStream.cs
  3. // ------------------------------------------------------------------
  4. //
  5. // A DeflateStream that does compression only, it uses a
  6. // divide-and-conquer approach with multiple threads to exploit multiple
  7. // CPUs for the DEFLATE computation.
  8. //
  9. // last saved: <2011-July-31 14:49:40>
  10. //
  11. // ------------------------------------------------------------------
  12. //
  13. // Copyright (c) 2009-2011 by Dino Chiesa
  14. // All rights reserved!
  15. //
  16. // This code module is part of DotNetZip, a zipfile class library.
  17. //
  18. // ------------------------------------------------------------------
  19. //
  20. // This code is licensed under the Microsoft Public License.
  21. // See the file License.txt for the license details.
  22. // More info on: http://dotnetzip.codeplex.com
  23. //
  24. // ------------------------------------------------------------------
  25. using System;
  26. using System.Collections.Generic;
  27. using System.Threading;
  28. using Ionic.Zlib;
  29. using System.IO;
  30. namespace Ionic.Zlib
  31. {
  /// <summary>
  ///   One unit of work for the parallel compressor: an input buffer, the
  ///   output buffer that receives the compressed form of that input, and
  ///   the codec instance that is wired to both buffers.
  /// </summary>
  internal class WorkItem
  {
      /// <summary>Holds the uncompressed input bytes.</summary>
      public byte[] buffer;

      /// <summary>Receives the compressed output bytes.</summary>
      public byte[] compressed;

      // NOTE(review): presumably the CRC of the input held in `buffer`;
      // it is not assigned inside this class - confirm against the code
      // that compresses the work item.
      public int crc;

      /// <summary>Position of this work item in the pool.</summary>
      public int index;

      /// <summary>Sequence number assigned by the stream when the item is filled.</summary>
      public int ordinal;

      /// <summary>Number of valid bytes currently stored in <c>buffer</c>.</summary>
      public int inputBytesAvailable;

      // NOTE(review): presumably the number of valid bytes in `compressed`;
      // it is not assigned inside this class - confirm.
      public int compressedBytesAvailable;

      /// <summary>The codec used to deflate <c>buffer</c> into <c>compressed</c>.</summary>
      public ZlibCodec compressor;

      /// <summary>
      ///   Allocate the buffer pair and initialize the codec for deflation.
      /// </summary>
      /// <param name="size">Size, in bytes, of the input buffer.</param>
      /// <param name="compressLevel">Compression level for the codec.</param>
      /// <param name="strategy">Compression strategy for the codec.</param>
      /// <param name="ix">Position of this work item in the pool.</param>
      public WorkItem(int size,
                      Ionic.Zlib.CompressionLevel compressLevel,
                      CompressionStrategy strategy,
                      int ix)
      {
          this.buffer = new byte[size];

          // alloc 5 bytes overhead for every block (margin of safety= 2)
          int n = size + ((size / 32768) + 1) * 5 * 2;
          this.compressed = new byte[n];

          this.compressor = new ZlibCodec();

          // Previously the strategy argument was accepted and silently
          // ignored. NOTE(review): this assumes ZlibCodec exposes a public
          // Strategy member that InitializeDeflate reads - confirm against
          // ZlibCodec before merging.
          this.compressor.Strategy = strategy;

          this.compressor.InitializeDeflate(compressLevel, false);
          this.compressor.OutputBuffer = this.compressed;
          this.compressor.InputBuffer = this.buffer;
          this.index = ix;
      }
  }
  58. /// <summary>
  59. /// A class for compressing streams using the
  60. /// Deflate algorithm with multiple threads.
  61. /// </summary>
  62. ///
  63. /// <remarks>
  64. /// <para>
  65. /// This class performs DEFLATE compression through writing. For
  66. /// more information on the Deflate algorithm, see IETF RFC 1951,
  67. /// "DEFLATE Compressed Data Format Specification version 1.3."
  68. /// </para>
  69. ///
  70. /// <para>
  71. /// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, except
  72. /// that this class is for compression only, and this implementation uses an
  73. /// approach that employs multiple worker threads to perform the DEFLATE. On
  74. /// a multi-cpu or multi-core computer, the performance of this class can be
  75. /// significantly higher than the single-threaded DeflateStream, particularly
  76. /// for larger streams. How large? Anything over 10mb is a good candidate
  77. /// for parallel compression.
  78. /// </para>
  79. ///
  80. /// <para>
  81. /// The tradeoff is that this class uses more memory and more CPU than the
  82. /// vanilla DeflateStream, and also is less efficient as a compressor. For
  83. /// large files the size of the compressed data stream can be less than 1%
  84. /// larger than the size of a compressed data stream from the vanilla
  85. /// DeflateStream. For smaller files the difference can be larger. The
  86. /// difference will also be larger if you set the BufferSize to be lower than
  87. /// the default value. Your mileage may vary. Finally, for small files, the
  88. /// ParallelDeflateOutputStream can be much slower than the vanilla
  89. /// DeflateStream, because of the overhead associated to using the thread
  90. /// pool.
  91. /// </para>
  92. ///
  93. /// </remarks>
  94. /// <seealso cref="Ionic.Zlib.DeflateStream" />
  95. public class ParallelDeflateOutputStream : System.IO.Stream
  96. {
  // Default size, in bytes, of each input buffer: 64 * 1024 = 64k.
  private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024;

  // Number of buffer pairs allocated per processor, before the
  // MaxBufferPairs cap is applied.
  private static readonly int BufferPairsPerCore = 4;

  // The pool of work items (buffer pairs); allocated on the first Write().
  private System.Collections.Generic.List<WorkItem> _pool;

  // When true, the output stream is not disposed when this stream closes.
  private bool _leaveOpen;
  private bool emitting;

  // The stream that receives the compressed bytes.
  private System.IO.Stream _outStream;
  private int _maxBufferPairs;
  private int _bufferSize = IO_BUFFER_SIZE_DEFAULT;
  private AutoResetEvent _newlyCompressedBlob;
  //private ManualResetEvent _writingDone;
  //private ManualResetEvent _sessionReset;

  // Lock objects are never meant to be replaced, so they are readonly.
  private readonly object _outputLock = new object();
  private bool _isClosed;
  private bool _firstWriteDone;

  // Pool index of the work item currently being filled, or -1 for none.
  private int _currentlyFilling;
  private int _lastFilled;
  private int _lastWritten;
  private int _latestCompressed;
  private int _Crc32;
  private Ionic.Crc.CRC32 _runningCrc;
  private readonly object _latestLock = new object();

  // Queues of pool indexes.
  private System.Collections.Generic.Queue<int> _toWrite;
  private System.Collections.Generic.Queue<int> _toFill;
  private Int64 _totalBytesProcessed;
  private Ionic.Zlib.CompressionLevel _compressLevel;

  // An exception raised on a background thread, held until the next call
  // from the application can rethrow it.
  private volatile Exception _pendingException;

  // Set once a pending exception has been handed to the caller.
  private bool _handlingException;
  private readonly object _eLock = new Object();  // protects _pendingException

  // This bitfield is used only when Trace is defined.
  //private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin |
  //TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush |
  //TraceBits.Session;
  //private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle | TraceBits.Session ;
  private TraceBits _DesiredTrace =
      TraceBits.Session |
      TraceBits.Compress |
      TraceBits.WriteTake |
      TraceBits.WriteEnter |
      TraceBits.EmitEnter |
      TraceBits.EmitDone |
      TraceBits.EmitLock |
      TraceBits.EmitSkip |
      TraceBits.EmitBegin;
  /// <summary>
  ///   Create a ParallelDeflateOutputStream that writes to the given stream.
  /// </summary>
  /// <remarks>
  ///
  /// <para>
  ///   Data written into this stream is compressed with the DEFLATE
  ///   algorithm (see RFC 1951), and the compressed bytes are written to
  ///   the supplied stream.
  /// </para>
  ///
  /// <para>
  ///   This overload selects the default compression level and the default
  ///   compression strategy, and the supplied stream is not left open when
  ///   this stream is closed.
  /// </para>
  ///
  /// <para>
  ///   This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>,
  ///   except that this implementation employs multiple worker threads to
  ///   perform the DEFLATE. On a multi-cpu or multi-core computer, the
  ///   performance of this class can be significantly higher than the
  ///   single-threaded DeflateStream, particularly for larger streams.
  ///   How large? Anything over 10mb is a good candidate for parallel
  ///   compression.
  /// </para>
  ///
  /// </remarks>
  ///
  /// <example>
  ///
  /// This example reads a file, compresses it with a
  /// ParallelDeflateOutputStream, and writes the compressed data to a
  /// second, output file.
  ///
  /// <code>
  /// byte[] buffer = new byte[WORKING_BUFFER_SIZE];
  /// int n= -1;
  /// String outputFile = fileToCompress + ".compressed";
  /// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress))
  /// {
  ///     using (var raw = System.IO.File.Create(outputFile))
  ///     {
  ///         using (Stream compressor = new ParallelDeflateOutputStream(raw))
  ///         {
  ///             while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
  ///             {
  ///                 compressor.Write(buffer, 0, n);
  ///             }
  ///         }
  ///     }
  /// }
  /// </code>
  /// <code lang="VB">
  /// Dim buffer As Byte() = New Byte(4096) {}
  /// Dim n As Integer = -1
  /// Dim outputFile As String = (fileToCompress &amp; ".compressed")
  /// Using input As Stream = File.OpenRead(fileToCompress)
  ///     Using raw As FileStream = File.Create(outputFile)
  ///         Using compressor As Stream = New ParallelDeflateOutputStream(raw)
  ///             Do While (n &lt;&gt; 0)
  ///                 If (n &gt; 0) Then
  ///                     compressor.Write(buffer, 0, n)
  ///                 End If
  ///                 n = input.Read(buffer, 0, buffer.Length)
  ///             Loop
  ///         End Using
  ///     End Using
  /// End Using
  /// </code>
  /// </example>
  /// <param name="stream">The stream to which compressed data will be written.</param>
  public ParallelDeflateOutputStream(System.IO.Stream stream)
      : this(stream,
             level: CompressionLevel.Default,
             strategy: CompressionStrategy.Default,
             leaveOpen: false)
  {
  }
  /// <summary>
  ///   Create a ParallelDeflateOutputStream that compresses at the given
  ///   CompressionLevel.
  /// </summary>
  /// <remarks>
  ///   The default compression strategy is used, and the supplied stream is
  ///   not left open when this stream is closed. See the
  ///   <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
  ///   constructor for example code.
  /// </remarks>
  /// <param name="stream">The stream to which compressed data will be written.</param>
  /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
  public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level)
      : this(stream,
             level,
             strategy: CompressionStrategy.Default,
             leaveOpen: false)
  {
  }
  /// <summary>
  ///   Create a ParallelDeflateOutputStream, choosing whether the captive
  ///   stream stays open after the ParallelDeflateOutputStream is closed.
  /// </summary>
  /// <remarks>
  ///   The default compression level and the default compression strategy
  ///   are used. See the
  ///   <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
  ///   constructor for example code.
  /// </remarks>
  /// <param name="stream">The stream to which compressed data will be written.</param>
  /// <param name="leaveOpen">
  ///   true if the application would like the stream to remain open after
  ///   this stream is closed.
  /// </param>
  public ParallelDeflateOutputStream(System.IO.Stream stream, bool leaveOpen)
      : this(stream,
             level: CompressionLevel.Default,
             strategy: CompressionStrategy.Default,
             leaveOpen: leaveOpen)
  {
  }
  /// <summary>
  ///   Create a ParallelDeflateOutputStream using the specified
  ///   CompressionLevel, and specify whether to leave the captive stream
  ///   open when the ParallelDeflateOutputStream is closed.
  /// </summary>
  /// <remarks>
  ///   The default compression strategy is used. See the
  ///   <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
  ///   constructor for example code.
  /// </remarks>
  /// <param name="stream">The stream to which compressed data will be written.</param>
  /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
  /// <param name="leaveOpen">
  ///   true if the application would like the stream to remain open after
  ///   this stream is closed.
  /// </param>
  public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen)
      // Forward the caller's level. This overload previously passed
      // CompressionLevel.Default, silently discarding the argument.
      : this(stream, level, CompressionStrategy.Default, leaveOpen)
  {
  }
  /// <summary>
  ///   Create a ParallelDeflateOutputStream with an explicit
  ///   CompressionLevel and CompressionStrategy, choosing whether the
  ///   captive stream stays open after the ParallelDeflateOutputStream is
  ///   closed.
  /// </summary>
  /// <remarks>
  ///   All the other constructors delegate to this one. It also sets
  ///   <see cref="MaxBufferPairs"/> to 16. See the
  ///   <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/>
  ///   constructor for example code.
  /// </remarks>
  /// <param name="stream">The stream to which compressed data will be written.</param>
  /// <param name="level">A tuning knob to trade speed for effectiveness.</param>
  /// <param name="strategy">
  ///   By tweaking this parameter, you may be able to optimize the compression for
  ///   data with particular characteristics.
  /// </param>
  /// <param name="leaveOpen">
  ///   true if the application would like the stream to remain open after
  ///   this stream is closed.
  /// </param>
  public ParallelDeflateOutputStream(System.IO.Stream stream,
                                     CompressionLevel level,
                                     CompressionStrategy strategy,
                                     bool leaveOpen)
  {
      TraceOutput(TraceBits.Lifecycle | TraceBits.Session,
                  "-------------------------------------------------------");
      TraceOutput(TraceBits.Lifecycle | TraceBits.Session,
                  "Create {0:X8}",
                  this.GetHashCode());

      // where the compressed bytes go, and what happens to that stream on close
      this._outStream = stream;
      this._leaveOpen = leaveOpen;

      // how to compress
      this._compressLevel = level;
      this.Strategy = strategy;

      // default cap on the number of buffer pairs
      this.MaxBufferPairs = 16;
  }
  /// <summary>
  ///   The ZLIB compression strategy selected for this stream. It is
  ///   assigned in the constructor and cannot be changed from outside the
  ///   class.
  /// </summary>
  public CompressionStrategy Strategy { get; private set; }
  /// <summary>
  ///   The maximum number of buffer pairs to use.
  /// </summary>
  ///
  /// <remarks>
  /// <para>
  ///   This property sets an upper limit on the number of memory buffer
  ///   pairs to create. The implementation of this stream allocates
  ///   multiple buffers to facilitate parallel compression. As each buffer
  ///   fills up, this stream uses <see
  ///   cref="System.Threading.ThreadPool.QueueUserWorkItem(WaitCallback)">
  ///   ThreadPool.QueueUserWorkItem()</see>
  ///   to compress those buffers in a background threadpool thread. After a
  ///   buffer is compressed, it is re-ordered and written to the output
  ///   stream.
  /// </para>
  ///
  /// <para>
  ///   A higher number of buffer pairs enables a higher degree of
  ///   parallelism, which tends to increase the speed of compression on
  ///   multi-cpu computers. On the other hand, a higher number of buffer
  ///   pairs also implies a larger memory consumption, more active worker
  ///   threads, and a higher cpu utilization for any compression. This
  ///   property enables the application to limit its memory consumption and
  ///   CPU utilization behavior depending on requirements.
  /// </para>
  ///
  /// <para>
  ///   For each compression "task" that occurs in parallel, there are 2
  ///   buffers allocated: one for input and one for output. This property
  ///   sets a limit for the number of pairs. The total amount of storage
  ///   space allocated for buffering will then be approximately (N*S*2),
  ///   where N is the number of buffer pairs, S is the size of each buffer
  ///   (<see cref="BufferSize"/>). By default, DotNetZip allocates 4 buffer
  ///   pairs per CPU core, so if your machine has 4 cores, and you retain
  ///   the default buffer size of 64k, then the
  ///   ParallelDeflateOutputStream will use 4 * 4 * 2 * 64kb of buffer
  ///   memory in total, or 2mb, in blocks of 64kb. If you then set this
  ///   property to 8, then the number will be 8 * 2 * 64kb of buffer
  ///   memory, or 1mb.
  /// </para>
  ///
  /// <para>
  ///   CPU utilization will also go up with additional buffers, because a
  ///   larger number of buffer pairs allows a larger number of background
  ///   threads to compress in parallel. If you find that parallel
  ///   compression is consuming too much memory or CPU, you can adjust this
  ///   value downward.
  /// </para>
  ///
  /// <para>
  ///   The default value is 16. Different values may deliver better or
  ///   worse results, depending on your priorities and the dynamic
  ///   performance characteristics of your storage and compute resources.
  /// </para>
  ///
  /// <para>
  ///   This property is not the number of buffer pairs to use; it is an
  ///   upper limit. An illustration: Suppose you have an application that
  ///   uses the default value of this property (which is 16), and it runs
  ///   on a machine with 2 CPU cores. In that case, DotNetZip will allocate
  ///   4 buffer pairs per CPU core, for a total of 8 pairs. The upper
  ///   limit specified by this property has no effect.
  /// </para>
  ///
  /// <para>
  ///   The application can set this value at any time, but it is effective
  ///   only before the first call to Write(), which is when the buffers are
  ///   allocated.
  /// </para>
  /// </remarks>
  /// <exception cref="System.ArgumentException">
  ///   The value being set is less than 4.
  /// </exception>
  public int MaxBufferPairs
  {
      get
      {
          return _maxBufferPairs;
      }
      set
      {
          // ArgumentException takes (message, paramName). The two
          // arguments were previously passed in the opposite order, so the
          // exception reported "MaxBufferPairs" as its message.
          if (value < 4)
              throw new ArgumentException("Value must be 4 or greater.",
                                          "MaxBufferPairs");
          _maxBufferPairs = value;
      }
  }
  /// <summary>
  ///   The size of the buffers used by the compressor threads.
  /// </summary>
  /// <remarks>
  ///
  /// <para>
  ///   The default buffer size is 64k. The application can set this value
  ///   at any time, but it is effective only before the first Write().
  /// </para>
  ///
  /// <para>
  ///   Larger buffer sizes implies larger memory consumption but allows
  ///   more efficient compression. Using smaller buffer sizes consumes less
  ///   memory but may result in less effective compression. For example,
  ///   using a buffer size of 128k, the compression delivered is
  ///   within 1% of the compression delivered by the single-threaded <see
  ///   cref="Ionic.Zlib.DeflateStream"/>. On the other hand, using a
  ///   BufferSize of 8k can result in a compressed data stream that is 5%
  ///   larger than that delivered by the single-threaded
  ///   <c>DeflateStream</c>. Excessively small buffer sizes can also cause
  ///   the speed of the ParallelDeflateOutputStream to drop, because of
  ///   larger thread scheduling overhead dealing with many many small
  ///   buffers.
  /// </para>
  ///
  /// <para>
  ///   The total amount of storage space allocated for buffering will be
  ///   approximately (N*S*2), where N is the number of buffer pairs, and S
  ///   is the size of each buffer (this property). There are 2 buffers used
  ///   by the compressor, one for input and one for output. By default,
  ///   DotNetZip allocates 4 buffer pairs per CPU core, so if your machine
  ///   has 4 cores, then the number of buffer pairs used will be 16. If you
  ///   accept the default value of this property, 64k, then the
  ///   ParallelDeflateOutputStream will use 16 * 2 * 64kb of buffer memory
  ///   in total, or 2mb, in blocks of 64kb. If you set this property to
  ///   128kb, then the number will be 16 * 2 * 128kb of buffer memory, or
  ///   4mb.
  /// </para>
  ///
  /// </remarks>
  /// <exception cref="System.ArgumentOutOfRangeException">
  ///   The value being set is less than 1024.
  /// </exception>
  public int BufferSize
  {
      get { return _bufferSize; }
      set
      {
          // 1024 itself is accepted, so the message says "at least"
          // rather than "greater than".
          if (value < 1024)
              throw new ArgumentOutOfRangeException("BufferSize",
                                                    "BufferSize must be at least 1024 bytes");
          _bufferSize = value;
      }
  }
  /// <summary>
  ///   The CRC32 computed over the uncompressed data that was written
  ///   into this stream.
  /// </summary>
  /// <remarks>
  ///   The value is meaningful only after Close() has been called.
  /// </remarks>
  public int Crc32
  {
      get
      {
          return this._Crc32;
      }
  }
  /// <summary>
  ///   The total count of uncompressed bytes that the
  ///   ParallelDeflateOutputStream has processed.
  /// </summary>
  /// <remarks>
  ///   The value is meaningful only after Close() has been called.
  /// </remarks>
  public Int64 BytesProcessed
  {
      get
      {
          return this._totalBytesProcessed;
      }
  }
  // Allocates the work-item pool and the bookkeeping that goes with it.
  // The pool holds BufferPairsPerCore items per processor, capped at
  // _maxBufferPairs; every item starts out in the "to fill" queue.
  private void _InitializePoolOfWorkItems()
  {
      int pairCount = Math.Min(BufferPairsPerCore * Environment.ProcessorCount,
                               _maxBufferPairs);

      _toWrite = new Queue<int>();
      _toFill = new Queue<int>();
      _pool = new List<WorkItem>();

      int slot = 0;
      while (slot < pairCount)
      {
          var item = new WorkItem(_bufferSize, _compressLevel, Strategy, slot);
          _pool.Add(item);
          _toFill.Enqueue(slot);
          slot++;
      }

      _newlyCompressedBlob = new AutoResetEvent(false);
      _runningCrc = new Ionic.Crc.CRC32();

      // -1 means "none yet" for each of these markers
      _latestCompressed = -1;
      _lastWritten = -1;
      _lastFilled = -1;
      _currentlyFilling = -1;
  }
  /// <summary>
  ///   Write data to the stream.
  /// </summary>
  ///
  /// <remarks>
  ///
  /// <para>
  ///   To use the ParallelDeflateOutputStream to compress data, create a
  ///   ParallelDeflateOutputStream, passing a writable output stream. Then
  ///   call Write() on that ParallelDeflateOutputStream, providing
  ///   uncompressed data as input. The data sent to the output stream will
  ///   be the compressed form of the data written.
  /// </para>
  ///
  /// <para>
  ///   To decompress data, use the <see cref="Ionic.Zlib.DeflateStream"/> class.
  /// </para>
  ///
  /// <para>
  ///   A call with a <c>count</c> of zero returns without examining the
  ///   other arguments.
  /// </para>
  ///
  /// </remarks>
  /// <param name="buffer">The buffer holding data to write to the stream.</param>
  /// <param name="offset">the offset within that data array to find the first byte to write.</param>
  /// <param name="count">the number of bytes to write.</param>
  /// <exception cref="System.InvalidOperationException">
  ///   The stream has been closed.
  /// </exception>
  /// <exception cref="System.ArgumentNullException">
  ///   <c>buffer</c> is null and <c>count</c> is not zero.
  /// </exception>
  /// <exception cref="System.ArgumentOutOfRangeException">
  ///   <c>offset</c> or <c>count</c> is negative.
  /// </exception>
  /// <exception cref="System.ArgumentException">
  ///   <c>offset</c> and <c>count</c> describe a range that extends past
  ///   the end of <c>buffer</c>.
  /// </exception>
  public override void Write(byte[] buffer, int offset, int count)
  {
      bool mustWait = false;

      // This method does this:
      //   0. handles any pending exceptions
      //   1. write any buffers that are ready to be written,
      //   2. fills a work buffer; when full, queues it for compression,
      //   3. if more data to be written, goto step 1

      if (_isClosed)
          throw new InvalidOperationException();

      // dispense any exceptions that occurred on the BG threads
      if (_pendingException != null)
      {
          _handlingException = true;
          var pe = _pendingException;
          _pendingException = null;
          throw pe;
      }

      if (count == 0) return;

      // Validate the arguments before touching any stream state.
      // Previously a bad argument was detected only by Buffer.BlockCopy,
      // after a work item had already been taken from the fill queue and
      // the fill counter advanced, and possibly after part of the data
      // had been accepted. The exception types match those that
      // Buffer.BlockCopy raised.
      if (buffer == null)
          throw new ArgumentNullException("buffer");
      if (offset < 0)
          throw new ArgumentOutOfRangeException("offset", "offset must not be negative.");
      if (count < 0)
          throw new ArgumentOutOfRangeException("count", "count must not be negative.");
      if (buffer.Length - offset < count)   // written this way to avoid int overflow
          throw new ArgumentException("offset and count describe a range beyond the end of buffer.");

      if (!_firstWriteDone)
      {
          // Want to do this on first Write, first session, and not in the
          // constructor.  We want to allow MaxBufferPairs to
          // change after construction, but before first Write.
          _InitializePoolOfWorkItems();
          _firstWriteDone = true;
      }

      do
      {
          // may need to make buffers available
          EmitPendingBuffers(false, mustWait);

          mustWait = false;

          // use current buffer, or get a new buffer to fill
          int ix = -1;
          if (_currentlyFilling >= 0)
          {
              ix = _currentlyFilling;
              TraceOutput(TraceBits.WriteTake,
                          "Write notake wi({0}) lf({1})",
                          ix,
                          _lastFilled);
          }
          else
          {
              TraceOutput(TraceBits.WriteTake, "Write take?");
              if (_toFill.Count == 0)
              {
                  // no available buffers, so... need to emit
                  // compressed buffers.
                  mustWait = true;
                  continue;
              }

              ix = _toFill.Dequeue();
              TraceOutput(TraceBits.WriteTake,
                          "Write take wi({0}) lf({1})",
                          ix,
                          _lastFilled);
              ++_lastFilled; // TODO: consider rollover?
          }

          WorkItem workitem = _pool[ix];

          // copy as much as fits in the remaining space of the work buffer
          int limit = ((workitem.buffer.Length - workitem.inputBytesAvailable) > count)
              ? count
              : (workitem.buffer.Length - workitem.inputBytesAvailable);

          workitem.ordinal = _lastFilled;

          TraceOutput(TraceBits.Write,
                      "Write lock wi({0}) ord({1}) iba({2})",
                      workitem.index,
                      workitem.ordinal,
                      workitem.inputBytesAvailable
                      );

          // copy from the provided buffer to our workitem, starting at
          // the tail end of whatever data we might have in there currently.
          Buffer.BlockCopy(buffer,
                           offset,
                           workitem.buffer,
                           workitem.inputBytesAvailable,
                           limit);

          count -= limit;
          offset += limit;
          workitem.inputBytesAvailable += limit;
          if (workitem.inputBytesAvailable == workitem.buffer.Length)
          {
              // No need for interlocked.increment: the Write()
              // method is documented as not multi-thread safe, so
              // we can assume Write() calls come in from only one
              // thread.

              // The format string previously also contained "nf({3})"
              // with no fourth argument, which is a format error whenever
              // this trace message is actually formatted.
              TraceOutput(TraceBits.Write,
                          "Write QUWI wi({0}) ord({1}) iba({2})",
                          workitem.index,
                          workitem.ordinal,
                          workitem.inputBytesAvailable );

  #if !PCL
              if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem ))
                  throw new Exception("Cannot enqueue workitem");
  #else
              System.Threading.Tasks.Task.Run(() => _DeflateOne(workitem));
  #endif

              _currentlyFilling = -1; // will get a new buffer next time
          }
          else
              _currentlyFilling = ix;

          if (count > 0)
              TraceOutput(TraceBits.WriteEnter, "Write more");
      }
      while (count > 0);  // until no more to write

      TraceOutput(TraceBits.WriteEnter, "Write exit");
      return;
  }
  // Terminates the compressed stream. The preceding compressed buffers
  // were each closed with Flush.Sync; here a fresh codec is run over
  // empty input with Flush.Finish, and whatever it produces is written
  // out. The running CRC is then captured into _Crc32.
  private void _FlushFinish()
  {
      byte[] tail = new byte[128];

      var codec = new ZlibCodec();
      codec.InitializeDeflate(_compressLevel, false);

      // no input at all
      codec.InputBuffer = null;
      codec.NextIn = 0;
      codec.AvailableBytesIn = 0;

      // all of the scratch buffer is available for output
      codec.OutputBuffer = tail;
      codec.NextOut = 0;
      codec.AvailableBytesOut = tail.Length;

      int status = codec.Deflate(FlushType.Finish);
      bool succeeded = status == ZlibConstants.Z_STREAM_END
                       || status == ZlibConstants.Z_OK;
      if (!succeeded)
          throw new Exception("deflating: " + codec.Message);

      int produced = tail.Length - codec.AvailableBytesOut;
      if (produced > 0)
      {
          TraceOutput(TraceBits.EmitBegin,
                      "Emit begin flush bytes({0})",
                      produced);

          _outStream.Write(tail, 0, produced);

          TraceOutput(TraceBits.EmitDone,
                      "Emit done flush");
      }

      codec.EndDeflate();

      _Crc32 = _runningCrc.Crc32Result;
  }
  // Compresses any partially filled buffer, then emits the pending
  // compressed buffers. When lastInput is true, the emit is asked to
  // handle everything and the stream is then terminated via
  // _FlushFinish(). Does nothing while an emit is already in progress.
  private void _Flush(bool lastInput)
  {
      if (_isClosed)
          throw new InvalidOperationException();

      if (emitting)
          return;

      // compress any partial buffer
      int partial = _currentlyFilling;
      if (partial >= 0)
      {
          _DeflateOne(_pool[partial]);
          _currentlyFilling = -1; // get a new buffer next Write()
      }

      EmitPendingBuffers(lastInput, false);

      if (lastInput)
          _FlushFinish();
  }
  /// <summary>
  ///   Flush the stream.
  /// </summary>
  /// <remarks>
  ///   If an exception is pending from a background thread, it is thrown
  ///   here instead. Once such an exception has been delivered to the
  ///   caller, later calls do nothing.
  /// </remarks>
  public override void Flush()
  {
      Exception deferred = _pendingException;
      if (deferred != null)
      {
          _handlingException = true;
          _pendingException = null;
          throw deferred;
      }

      if (!_handlingException)
      {
          _Flush(false);
      }
  }
  #if !PCL
  /// <summary>
  ///   Close the stream.
  /// </summary>
  /// <remarks>
  ///   The application must call Close on the stream to guarantee that all
  ///   of the data written in has been compressed, and that the compressed
  ///   data has been written out.
  /// </remarks>
  public override void Close()
  {
      this.InnerClose();
  }
  #endif
  /// <summary>
  ///   Shared implementation of Close and Dispose: performs the final
  ///   flush and, unless _leaveOpen is set, disposes the output stream.
  /// </summary>
  /// <remarks>
  ///   A recorded pending exception is thrown from here instead of
  ///   flushing. The method does nothing when an exception is already
  ///   being handled or when the stream is already closed.
  /// </remarks>
  private void InnerClose()
  {
      TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode());

      if (_pendingException != null)
      {
          _handlingException = true;
          var deferred = _pendingException;
          _pendingException = null;
          throw deferred;
      }

      if (_handlingException || _isClosed)
      {
          return;
      }

      // final flush: emit everything and terminate the deflate stream
      _Flush(true);

      if (!_leaveOpen)
      {
          _outStream.Dispose();
      }

      _isClosed = true;
  }
  // workitem 10030 - implement a new Dispose method
  /// <summary>Dispose the object</summary>
  /// <remarks>
  ///   <para>
  ///     Because ParallelDeflateOutputStream is IDisposable, the
  ///     application must call this method when finished using the instance.
  ///   </para>
  ///   <para>
  ///     This method is generally called implicitly upon exit from
  ///     a <c>using</c> scope in C# (<c>Using</c> in VB).
  ///   </para>
  ///   <para>
  ///     The work-item pool is released only after Dispose(true) has run,
  ///     because the final flush performed during disposal still reads
  ///     from the pool.
  ///   </para>
  /// </remarks>
  public new void Dispose()
  {
      TraceOutput(TraceBits.Lifecycle, "Dispose {0:X8}", this.GetHashCode());
      try
      {
          // Dispose(true) runs InnerClose(), whose final flush indexes into
          // _pool; clearing the pool first made that flush throw a
          // NullReferenceException whenever data was still pending.
          Dispose(true);
      }
      finally
      {
          // drop the pool even when the final flush throws
          _pool = null;
      }
  }
  /// <summary>
  ///   Releases the base stream resources and then closes this stream.
  /// </summary>
  /// <param name="disposing">
  ///   indicates whether the Dispose method was invoked by user code. The
  ///   value is forwarded to the base class; InnerClose() is invoked
  ///   regardless of it.
  /// </param>
  protected override void Dispose(bool disposing)
  {
      base.Dispose(disposing);
      this.InnerClose();
  }
  /// <summary>
  ///   Resets the stream for use with another stream.
  /// </summary>
  /// <remarks>
  ///   <para>
  ///     Because the ParallelDeflateOutputStream is expensive to create, it
  ///     has been designed so that it can be recycled and re-used. You have
  ///     to call Close() on the stream first, then you can call Reset() on
  ///     it, to use it again on another stream.
  ///   </para>
  ///   <para>
  ///     The new output stream is always adopted and the closed flag is
  ///     always cleared, even when nothing was written since construction
  ///     or since the previous Reset(). Only the recycling of the work-item
  ///     pool is skipped in that case.
  ///   </para>
  /// </remarks>
  ///
  /// <param name="stream">
  ///   The new output stream for this era.
  /// </param>
  ///
  /// <example>
  /// <code>
  /// ParallelDeflateOutputStream deflater = null;
  /// byte[] buffer = new byte[8192];
  /// int n;
  /// foreach (var inputFile in listOfFiles)
  /// {
  ///     string outputFile = inputFile + ".compressed";
  ///     using (System.IO.Stream input = System.IO.File.OpenRead(inputFile))
  ///     {
  ///         using (var outStream = System.IO.File.Create(outputFile))
  ///         {
  ///             if (deflater == null)
  ///                 deflater = new ParallelDeflateOutputStream(outStream,
  ///                                                            CompressionLevel.Best,
  ///                                                            CompressionStrategy.Default,
  ///                                                            true);
  ///             deflater.Reset(outStream);
  ///
  ///             while ((n= input.Read(buffer, 0, buffer.Length)) != 0)
  ///             {
  ///                 deflater.Write(buffer, 0, n);
  ///             }
  ///             // finish this file before the deflater is reset for the next one
  ///             deflater.Close();
  ///         }
  ///     }
  /// }
  /// </code>
  /// </example>
  public void Reset(Stream stream)
  {
      TraceOutput(TraceBits.Session, "-------------------------------------------------------");
      TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone);

      // Recycle the pool and queues only after a write has happened.
      // NOTE(review): presumably they are created lazily on the first
      // Write() and may not exist before that - confirm against Write().
      if (_firstWriteDone)
      {
          _toWrite.Clear();
          _toFill.Clear();
          foreach (var workitem in _pool)
          {
              _toFill.Enqueue(workitem.index);
              workitem.ordinal = -1;
          }

          _firstWriteDone = false;
      }

      // Everything below must be reset even if nothing was written.
      // Previously an early return skipped it, leaving a closed stream
      // closed and still bound to the previous output stream.
      _totalBytesProcessed = 0L;
      _runningCrc = new Ionic.Crc.CRC32();
      _isClosed = false;
      _currentlyFilling = -1;
      _lastFilled = -1;
      _lastWritten = -1;
      _latestCompressed = -1;
      _outStream = stream;
  }
  /// <summary>
  ///   Writes compressed work items to the output stream in ordinal order,
  ///   folding each item's CRC and input byte count into the running
  ///   totals and returning the item's index to the fill queue.
  /// </summary>
  /// <param name="doAll">
  ///   When true, keeps looping until the last written ordinal equals both
  ///   the latest compressed and the last filled ordinal.
  /// </param>
  /// <param name="mustWait">
  ///   When true, blocks on the "newly compressed" signal before the first
  ///   pass and (unless doAll is set) waits without timeout for the queue
  ///   lock until one item has been written.
  /// </param>
  private void EmitPendingBuffers(bool doAll, bool mustWait)
  {
      // When combining parallel deflation with a ZipSegmentedStream, it's
      // possible for the ZSS to throw from within this method. In that
      // case, Close/Dispose will be called on this stream, if this stream
      // is employed within a using or try/finally pair as required. But
      // this stream is unaware of the pending exception, so the Close()
      // method invokes this method AGAIN. This can lead to a deadlock.
      // Therefore, failfast if re-entering.
      if (emitting)
      {
          return;
      }
      emitting = true;

      bool waitBeforeFirstPass = (doAll && (_latestCompressed != _lastFilled)) || mustWait;
      if (waitBeforeFirstPass)
      {
          _newlyCompressedBlob.WaitOne();
      }

      bool anotherPass;
      do
      {
          int firstRequeued = -1;

          int lockTimeout;
          if (doAll)
          {
              lockTimeout = 200;
          }
          else if (mustWait)
          {
              lockTimeout = -1;
          }
          else
          {
              lockTimeout = 0;
          }

          while (true)
          {
              // could not get the queue lock in time: end this pass
              if (!Monitor.TryEnter(_toWrite, lockTimeout))
              {
                  break;
              }

              int slot = -1;
              try
              {
                  if (_toWrite.Count > 0)
                  {
                      slot = _toWrite.Dequeue();
                  }
              }
              finally
              {
                  Monitor.Exit(_toWrite);
              }

              // nothing queued: end this pass
              if (slot < 0)
              {
                  break;
              }

              WorkItem item = _pool[slot];
              if (item.ordinal == _lastWritten + 1)
              {
                  // this is the next item in sequence: write it out
                  firstRequeued = -1;
                  TraceOutput(TraceBits.EmitBegin,
                              "Emit begin wi({0}) ord({1}) cba({2})",
                              item.index,
                              item.ordinal,
                              item.compressedBytesAvailable);

                  _outStream.Write(item.compressed, 0, item.compressedBytesAvailable);
                  _runningCrc.Combine(item.crc, item.inputBytesAvailable);
                  _totalBytesProcessed += item.inputBytesAvailable;
                  item.inputBytesAvailable = 0;

                  TraceOutput(TraceBits.EmitDone,
                              "Emit done wi({0}) ord({1}) cba({2}) mtw({3})",
                              item.index,
                              item.ordinal,
                              item.compressedBytesAvailable,
                              lockTimeout);

                  _lastWritten = item.ordinal;
                  _toFill.Enqueue(item.index);

                  // don't wait next time through
                  if (lockTimeout == -1)
                  {
                      lockTimeout = 0;
                  }
              }
              else
              {
                  // out of order. requeue and try again.
                  TraceOutput(TraceBits.EmitSkip,
                              "Emit skip wi({0}) ord({1}) lw({2}) fs({3})",
                              item.index,
                              item.ordinal,
                              _lastWritten,
                              firstRequeued);

                  lock (_toWrite)
                  {
                      _toWrite.Enqueue(slot);
                  }

                  if (firstRequeued == slot)
                  {
                      // We went around the list once.
                      // None of the items in the list is the one we want.
                      // Now wait for a compressor to signal again.
                      _newlyCompressedBlob.WaitOne();
                      firstRequeued = -1;
                  }
                  else if (firstRequeued == -1)
                  {
                      firstRequeued = slot;
                  }
              }
          }

          anotherPass = doAll
              && !(_lastWritten == _latestCompressed && _lastWritten == _lastFilled);
      }
      while (anotherPass);

      emitting = false;
  }
  879. #if OLD
  /// <summary>
  ///   Legacy writer-thread loop, compiled only when the OLD symbol is
  ///   defined. For each session it writes compressed work items in
  ///   sequence, emits the terminating deflate block, publishes the CRC
  ///   and signals _writingDone.
  /// </summary>
  /// <param name="state">Not used by the method body.</param>
  private void _PerpetualWriterMethod(object state)
  {
      TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START");

      try
      {
          do
          {
              // wait for the next session
              TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(begin) PWM");
              _sessionReset.WaitOne();
              TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(done) PWM");

              if (_isDisposed) break;

              TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.Reset() PWM");
              _sessionReset.Reset();

              // repeatedly write buffers as they become ready
              WorkItem workitem = null;
              Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32();
              do
              {
                  workitem = _pool[_nextToWrite % _pc];
                  lock(workitem)
                  {
                      if (_noMoreInputForThisSegment)
                          TraceOutput(TraceBits.Write,
                                      "Write drain wi({0}) stat({1}) canuse({2}) cba({3})",
                                      workitem.index,
                                      workitem.status,
                                      (workitem.status == (int)WorkItem.Status.Compressed),
                                      workitem.compressedBytesAvailable);

                      do
                      {
                          if (workitem.status == (int)WorkItem.Status.Compressed)
                          {
                              TraceOutput(TraceBits.WriteBegin,
                                          "Write begin wi({0}) stat({1}) cba({2})",
                                          workitem.index,
                                          workitem.status,
                                          workitem.compressedBytesAvailable);

                              workitem.status = (int)WorkItem.Status.Writing;
                              _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable);
                              c.Combine(workitem.crc, workitem.inputBytesAvailable);
                              _totalBytesProcessed += workitem.inputBytesAvailable;
                              _nextToWrite++;
                              workitem.inputBytesAvailable= 0;
                              workitem.status = (int)WorkItem.Status.Done;

                              TraceOutput(TraceBits.WriteDone,
                                          "Write done wi({0}) stat({1}) cba({2})",
                                          workitem.index,
                                          workitem.status,
                                          workitem.compressedBytesAvailable);

                              Monitor.Pulse(workitem);
                              break;
                          }
                          else
                          {
                              int wcycles = 0;
                              // I've locked a workitem I cannot use.
                              // Therefore, wake someone else up, and then release the lock.
                              while (workitem.status != (int)WorkItem.Status.Compressed)
                              {
                                  TraceOutput(TraceBits.WriteWait,
                                              "Write waiting wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})",
                                              workitem.index,
                                              workitem.status,
                                              _nextToWrite, _nextToFill,
                                              _noMoreInputForThisSegment );

                                  if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                      break;

                                  wcycles++;

                                  // wake up someone else
                                  Monitor.Pulse(workitem);
                                  // release and wait
                                  Monitor.Wait(workitem);

                                  if (workitem.status == (int)WorkItem.Status.Compressed)
                                      TraceOutput(TraceBits.WriteWait,
                                                  "Write A-OK wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})",
                                                  workitem.index,
                                                  workitem.status,
                                                  workitem.inputBytesAvailable,
                                                  workitem.compressedBytesAvailable,
                                                  wcycles);
                              }

                              if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                                  break;
                          }
                      }
                      while (true);
                  }

                  if (_noMoreInputForThisSegment)
                      TraceOutput(TraceBits.Write,
                                  "Write nomore nw({0}) nf({1}) break({2})",
                                  _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill));

                  if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill)
                      break;

              } while (true);

              // Finish:
              // After writing a series of buffers, closing each one with
              // Flush.Sync, we now write the final one as Flush.Finish, and
              // then stop.
              byte[] buffer = new byte[128];
              ZlibCodec compressor = new ZlibCodec();
              int rc = compressor.InitializeDeflate(_compressLevel, false);
              compressor.InputBuffer = null;
              compressor.NextIn = 0;
              compressor.AvailableBytesIn = 0;
              compressor.OutputBuffer = buffer;
              compressor.NextOut = 0;
              compressor.AvailableBytesOut = buffer.Length;
              rc = compressor.Deflate(FlushType.Finish);

              if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK)
                  throw new Exception("deflating: " + compressor.Message);

              if (buffer.Length - compressor.AvailableBytesOut > 0)
              {
                  TraceOutput(TraceBits.WriteBegin,
                              "Write begin flush bytes({0})",
                              buffer.Length - compressor.AvailableBytesOut);

                  _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut);

                  TraceOutput(TraceBits.WriteBegin,
                              "Write done flush");
              }

              compressor.EndDeflate();

              _Crc32 = c.Crc32Result;

              // signal that writing is complete:
              TraceOutput(TraceBits.Synch, "Synch _writingDone.Set() PWM");
              _writingDone.Set();
          }
          while (true);
      }
      catch (System.Exception exc1)
      {
          lock(_eLock)
          {
              // expose the exception to the main thread; keep the first one.
              // (The test used to be "!= null", which never stored anything.)
              if (_pendingException == null)
                  _pendingException = exc1;
          }
      }

      TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS");
  }
  1019. #endif
  /// <summary>
  ///   Compresses one work item: computes the CRC of its input buffer,
  ///   deflates it, then queues its index for emission and signals that a
  ///   newly compressed buffer is available.
  /// </summary>
  /// <param name="wi">
  ///   The work item to compress; it is cast to WorkItem.
  /// </param>
  /// <remarks>
  ///   An exception thrown while compressing is not propagated from this
  ///   method. It is recorded in _pendingException (the first one wins),
  ///   which Flush() and InnerClose() check and rethrow.
  /// </remarks>
  private void _DeflateOne(Object wi)
  {
      // compress one buffer
      WorkItem workitem = (WorkItem) wi;
      try
      {
          Ionic.Crc.CRC32 crc = new Ionic.Crc.CRC32();

          // calc CRC on the buffer
          crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable);

          // deflate it
          DeflateOneSegment(workitem);

          // update status
          workitem.crc = crc.Crc32Result;
          TraceOutput(TraceBits.Compress,
                      "Compress wi({0}) ord({1}) len({2})",
                      workitem.index,
                      workitem.ordinal,
                      workitem.compressedBytesAvailable
                      );

          lock(_latestLock)
          {
              if (workitem.ordinal > _latestCompressed)
                  _latestCompressed = workitem.ordinal;
          }
          lock (_toWrite)
          {
              _toWrite.Enqueue(workitem.index);
          }
          _newlyCompressedBlob.Set();
      }
      catch (System.Exception exc1)
      {
          lock(_eLock)
          {
              // expose the exception to the main thread. The test used to
              // be "!= null", which meant no exception was ever stored and
              // compressor failures were silently swallowed.
              if (_pendingException == null)
                  _pendingException = exc1;
          }
      }
  }
  /// <summary>
  ///   Deflates the input held by a work item using that item's own
  ///   codec, ending with a sync flush, and records the number of
  ///   compressed bytes produced.
  /// </summary>
  /// <param name="workitem">
  ///   The work item supplying the codec, the input byte count and the
  ///   output buffer length; its compressedBytesAvailable is updated.
  /// </param>
  /// <returns>Always true.</returns>
  private bool DeflateOneSegment(WorkItem workitem)
  {
      ZlibCodec codec = workitem.compressor;
      codec.ResetDeflate();

      // input: the bytes already present in the work item
      codec.NextIn = 0;
      codec.AvailableBytesIn = workitem.inputBytesAvailable;

      // output: start of the work item's compressed buffer
      codec.NextOut = 0;
      codec.AvailableBytesOut = workitem.compressed.Length;

      // step 1: deflate until the input is consumed and output space remains
      while (true)
      {
          codec.Deflate(FlushType.None);
          if (codec.AvailableBytesIn <= 0 && codec.AvailableBytesOut != 0)
          {
              break;
          }
      }

      // step 2: flush (sync)
      codec.Deflate(FlushType.Sync);

      workitem.compressedBytesAvailable = (int) codec.TotalBytesOut;
      return true;
  }
  /// <summary>
  ///   Writes a formatted diagnostic line to the console when any of the
  ///   given trace bits is enabled in _DesiredTrace.
  /// </summary>
  /// <remarks>
  ///   Calls to this method are compiled only when the "Trace" symbol is
  ///   defined. Output is serialized with _outputLock. Under PCL nothing
  ///   is written; under SILVERLIGHT the console colour is left unchanged.
  /// </remarks>
  /// <param name="bits">The trace categories this message belongs to.</param>
  /// <param name="format">A composite format string.</param>
  /// <param name="varParams">The arguments for the format string.</param>
  [System.Diagnostics.ConditionalAttribute("Trace")]
  private void TraceOutput(TraceBits bits, string format, params object[] varParams)
  {
      if ((bits & _DesiredTrace) == 0)
      {
          return;
      }

      lock (_outputLock)
      {
#if !PCL
          int threadTag = Thread.CurrentThread.GetHashCode();
#if !SILVERLIGHT
          // pick one of the eight bright console colours per thread
          Console.ForegroundColor = (ConsoleColor) (threadTag % 8 + 8);
#endif
          Console.Write("{0:000} PDOS ", threadTag);
          Console.WriteLine(format, varParams);
#if !SILVERLIGHT
          Console.ResetColor();
#endif
#endif
      }
  }
  // used only when Trace is defined
  /// <summary>
  ///   Bit flags selecting which categories of diagnostic output
  ///   TraceOutput emits.
  /// </summary>
  [Flags]
  enum TraceBits : uint
  {
      None = 0,
      NotUsed1 = 1,
      EmitLock = 2,
      EmitEnter = 4, // enter _EmitPending
      EmitBegin = 8, // begin to write out
      EmitDone = 16, // done writing out
      EmitSkip = 32, // writer skipping a workitem
      // All Emit flags (62). The previous literal 58 left out EmitEnter.
      EmitAll = EmitLock | EmitEnter | EmitBegin | EmitDone | EmitSkip,
      Flush = 64,
      Lifecycle = 128, // constructor/disposer
      Session = 256, // Close/Reset
      Synch = 512, // thread synchronization
      Instance = 1024, // instance settings
      Compress = 2048, // compress task
      Write = 4096, // filling buffers, when caller invokes Write()
      WriteEnter = 8192, // upon entry to Write()
      WriteTake = 16384, // on _toFill.Take()
      All = 0xffffffff,
  }
  /// <summary>
  ///   Gets a value indicating whether this stream supports Seek
  ///   operations.
  /// </summary>
  /// <value>Always false.</value>
  public override bool CanSeek
  {
      get
      {
          return false;
      }
  }
  /// <summary>
  ///   Gets a value indicating whether this stream supports Read
  ///   operations.
  /// </summary>
  /// <value>Always false.</value>
  public override bool CanRead
  {
      get
      {
          return false;
      }
  }
  /// <summary>
  ///   Gets a value indicating whether this stream supports Write
  ///   operations.
  /// </summary>
  /// <value>
  ///   The CanWrite value reported by the output stream this instance
  ///   writes to.
  /// </value>
  public override bool CanWrite
  {
      get
      {
          return _outStream.CanWrite;
      }
  }
  /// <summary>
  ///   The length of the stream is not available.
  /// </summary>
  /// <exception cref="NotSupportedException">
  ///   Always thrown when the property is read.
  /// </exception>
  public override long Length
  {
      get
      {
          throw new NotSupportedException();
      }
  }
  /// <summary>
  ///   Gets the current position of the output stream.
  /// </summary>
  /// <remarks>
  ///   <para>
  ///     The value is read straight from the underlying output stream, so
  ///     it is that stream's position and not the number of uncompressed
  ///     bytes written to this instance.
  ///   </para>
  /// </remarks>
  /// <exception cref="NotSupportedException">
  ///   Always thrown when the property is set.
  /// </exception>
  public override long Position
  {
      get
      {
          return _outStream.Position;
      }

      set
      {
          throw new NotSupportedException();
      }
  }
  /// <summary>
  ///   Reading is not supported; this method always throws.
  /// </summary>
  /// <param name="buffer">
  ///   The buffer that would receive the data. Ignored.
  /// </param>
  /// <param name="offset">
  ///   The offset within the buffer at which data would be stored. Ignored.
  /// </param>
  /// <param name="count">
  ///   The number of bytes that would be read. Ignored.
  /// </param>
  /// <returns>Never returns.</returns>
  /// <exception cref="NotSupportedException">Always thrown.</exception>
  public override int Read(byte[] buffer, int offset, int count)
  {
      throw new NotSupportedException();
  }
  /// <summary>
  ///   Seeking is not supported; this method always throws.
  /// </summary>
  /// <param name="offset">
  ///   The offset that would be sought to. Ignored.
  /// </param>
  /// <param name="origin">
  ///   The reference point the offset would be applied to. Ignored.
  /// </param>
  /// <returns>Never returns.</returns>
  /// <exception cref="NotSupportedException">Always thrown.</exception>
  public override long Seek(long offset, System.IO.SeekOrigin origin)
  {
      throw new NotSupportedException();
  }
  /// <summary>
  ///   Setting the length is not supported; this method always throws.
  /// </summary>
  /// <param name="value">
  ///   The length that would be set. Ignored.
  /// </param>
  /// <exception cref="NotSupportedException">Always thrown.</exception>
  public override void SetLength(long value)
  {
      throw new NotSupportedException();
  }
  1225. }
  1226. }