c# - Threadsafe buffer wrapping Stream -


i'm using sslstream on top of tcpclient. unfortunately `sslstream`` not support writing or reading multiple threads @ same time. that's why i've wrote own wrapper around it:

private concurrentqueue<byte> sendqueue;  private volatile bool osending;  private readonly object writelock;   public async void write(byte[] buffer, int offset, int count) {   if (osending)   {     lock (writelock)     {       foreach (var b in buffer)       {         sendqueue.enqueue(b);       }     }   }   else   {     osending = true;     await stream.writeasync(buffer, offset, count);     osending = false;      lock (writelock)     {       if (sendqueue.count > 0)       {         write(sendqueue.toarray(), 0, sendqueue.count);         sendqueue = new concurrentqueue<byte>();       }     }   } } 

the intention behind following:

  1. if stream free, write stream.
  2. if stream busy, write buffer.
  3. if stream returns sending, check if there data in queue , send recursively.

i've tried several solutions far, seems everytime data being sent.

p.s.: know filling queue bytewise not nice, made quick , dirty.

update: i've added deletion of queue according dirk's comment.

update

using tpl dataflow:

using system.threading.tasks.dataflow;  public class dataflowstreamwriter {     private readonly memorystream _stream = new memorystream();     private readonly actionblock<byte[]> _block;      public dataflowstreamwriter()     {         _block = new actionblock<byte[]>(                         bytes => _stream.write(bytes, 0, bytes.length));     }      public void write(byte[] data)     {         _block.post(data);     } } 

here's better producer-consumer approach.

whenever writes data concurrentstreamwriter instance, data added buffer. method thread-safe , several threads may writing data @ once. these producers.

then, have 1 single consumer - consuming data off buffer , writing stream.

a blockingcollection<t> used communicate between producers , consumer. way, consumer sits idle if no 1 producing. whenever producer kicks in , writes buffer, consumer wake up.

the consumer lazily initialized - created when , when data first available.

public class concurrentstreamwriter : idisposable {     private readonly memorystream _stream = new memorystream();     private readonly blockingcollection<byte> _buffer = new blockingcollection<byte>(new concurrentqueue<byte>());      private readonly object _writebufferlock = new object();     private task _flusher;     private volatile bool _disposed;      private void flushbuffer()     {         //keep writing stream, , block when buffer empty         while (!_disposed)             _stream.writebyte(_buffer.take());          //when instance has been disposed, flush residue left in concurrentstreamwriter , exit         byte b;         while (_buffer.trytake(out b))             _stream.writebyte(b);     }      public void write(byte[] data)     {         if (_disposed)             throw new objectdisposedexception("concurrentstreamwriter");          lock (_writebufferlock)             foreach (var b in data)                 _buffer.add(b);          initflusher();     }      public void initflusher()     {         //safely create new flusher task if 1 hasn't been created yet         if (_flusher == null)         {             task newflusher = new task(flushbuffer);             if (interlocked.compareexchange(ref _flusher, newflusher, null) == null)                 newflusher.start();         }     }      public void dispose()     {         _disposed = true;         if (_flusher != null)             _flusher.wait();          _buffer.dispose();     } } 

Comments

Popular posts from this blog

c# - How to get the current UAC mode -

postgresql - Lazarus + Postgres: incomplete startup packet -

javascript - Ajax jqXHR.status==0 fix error -