C# - Thread-safe buffer wrapping a Stream
I'm using `SslStream` on top of `TcpClient`. Unfortunately, `SslStream` does not support writing or reading from multiple threads at the same time. That's why I've written my own wrapper around it:
    private ConcurrentQueue<byte> sendQueue;
    private volatile bool oSending;
    private readonly object writeLock;

    public async void Write(byte[] buffer, int offset, int count)
    {
        if (oSending)
        {
            lock (writeLock)
            {
                foreach (var b in buffer)
                {
                    sendQueue.Enqueue(b);
                }
            }
        }
        else
        {
            oSending = true;
            await stream.WriteAsync(buffer, offset, count);
            oSending = false;

            lock (writeLock)
            {
                if (sendQueue.Count > 0)
                {
                    Write(sendQueue.ToArray(), 0, sendQueue.Count);
                    sendQueue = new ConcurrentQueue<byte>();
                }
            }
        }
    }
The intention behind it is the following:
- If the stream is free, write to the stream.
- If the stream is busy, write to the buffer.
- When the stream returns from sending, check whether there is data in the queue and, if so, send it recursively.
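
The intent listed above boils down to never having two writes in flight at once. As a rough sketch of that idea, and not the wrapper from this question, the hand-rolled queue can be swapped for an async lock. `SerializedStreamWriter` is a hypothetical name, and the sketch assumes any `Stream` (such as an `SslStream`) is passed in:

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: serializes writes to a Stream with an async lock.
public sealed class SerializedStreamWriter
{
    private readonly Stream _stream;

    // One permit, so only one WriteAsync can be in flight at a time.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public SerializedStreamWriter(Stream stream)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
    }

    public async Task WriteAsync(byte[] buffer, int offset, int count)
    {
        // Callers that arrive while a write is in flight wait here.
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            await _stream.WriteAsync(buffer, offset, count).ConfigureAwait(false);
        }
        finally
        {
            // Always release, even if the write throws.
            _gate.Release();
        }
    }
}
```

Each buffer is written without interleaving, but `SemaphoreSlim` does not promise any particular order among callers that are waiting at the same time, so this only fits when ordering between racing callers does not matter.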
I've tried several solutions so far, but every time there seems to be a problem with the data being sent.
P.S.: I know filling the queue byte by byte is not nice, but I just made it quick and dirty.
Update: I've added the deletion of the queue according to Dirk's comment.
Update
Using TPL Dataflow:
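    using System.IO;
    using System.Threading.Tasks.Dataflow;

    public class DataflowStreamWriter
    {
        private readonly MemoryStream _stream = new MemoryStream();
        private readonly ActionBlock<byte[]> _block;

        public DataflowStreamWriter()
        {
            // By default the block handles one message at a time, so writes are serialized.
            _block = new ActionBlock<byte[]>(
                bytes => _stream.Write(bytes, 0, bytes.Length));
        }

        // Safe to call from several threads: Post only queues the data for the block.
        public void Write(byte[] data)
        {
            _block.Post(data);
        }
    }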
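
To know when everything posted has actually reached the stream, the block can be completed and awaited. The following is a small sketch of that, with `DataflowWriteDemo` and `RunAsync` as hypothetical names; depending on the target framework, `System.Threading.Tasks.Dataflow` may need to be referenced as a NuGet package:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowWriteDemo
{
    // Posts chunks from several threads, then waits for the block to drain.
    public static async Task<byte[]> RunAsync()
    {
        var stream = new MemoryStream();

        // With default options an ActionBlock processes one message at a time,
        // so the delegate never runs concurrently with itself.
        var block = new ActionBlock<byte[]>(
            bytes => stream.Write(bytes, 0, bytes.Length));

        // Four producers, each posting one 3-byte chunk.
        Parallel.For(0, 4, i =>
        {
            var chunk = new[] { (byte)i, (byte)i, (byte)i };
            block.Post(chunk);
        });

        block.Complete();        // no more input will be accepted
        await block.Completion;  // finishes once every posted chunk is written

        return stream.ToArray(); // 12 bytes; chunk order depends on scheduling
    }
}
```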
Here's a better producer-consumer approach.
Whenever someone writes data to a `ConcurrentStreamWriter` instance, the data is added to a buffer. This method is thread-safe, and several threads may be writing data at once. These are the producers.
Then, you have one single consumer, consuming data off the buffer and writing it to the stream.
A `BlockingCollection<T>` is used to communicate between the producers and the consumer. This way, the consumer sits idle if no one is producing. Whenever a producer kicks in and writes to the buffer, the consumer will wake up.
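
The blocking hand-off described above can be seen in isolation in a small sketch. `BlockingCollectionDemo` and `Run` are hypothetical names used only for illustration, and integers stand in for the bytes:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();

        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            // Single consumer: blocks while the queue is empty and stops
            // once CompleteAdding has been called and the queue is drained.
            var consumer = Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                    received.Add(item);
            });

            // Several producers adding concurrently; Add is thread-safe.
            Parallel.For(0, 100, i => queue.Add(i));

            queue.CompleteAdding(); // signal that nothing more is coming
            consumer.Wait();        // wait for the consumer to drain the queue
        }

        return received; // 100 items; order depends on producer scheduling
    }
}
```

Only the consumer touches `received` while it runs, so the list itself needs no locking.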
The consumer is lazily initialized: it is created when, and only when, data is first available.
    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;

    public class ConcurrentStreamWriter : IDisposable
    {
        private readonly MemoryStream _stream = new MemoryStream();
        private readonly BlockingCollection<byte> _buffer =
            new BlockingCollection<byte>(new ConcurrentQueue<byte>());
        private readonly object _writeBufferLock = new object();
        private Task _flusher;
        private volatile bool _disposed;

        private void FlushBuffer()
        {
            // Keep writing to the stream, blocking while the buffer is empty.
            // The loop ends once Dispose has called CompleteAdding and any
            // residue left in the buffer has been flushed.
            foreach (var b in _buffer.GetConsumingEnumerable())
                _stream.WriteByte(b);
        }

        public void Write(byte[] data)
        {
            if (_disposed)
                throw new ObjectDisposedException("ConcurrentStreamWriter");

            // The lock keeps the bytes of one call contiguous in the buffer.
            lock (_writeBufferLock)
                foreach (var b in data)
                    _buffer.Add(b);

            InitFlusher();
        }

        public void InitFlusher()
        {
            // Safely create a new flusher task if one hasn't been created yet.
            if (_flusher == null)
            {
                Task newFlusher = new Task(FlushBuffer);
                if (Interlocked.CompareExchange(ref _flusher, newFlusher, null) == null)
                    newFlusher.Start();
            }
        }

        public void Dispose()
        {
            _disposed = true;
            // Signal that no more data is coming so the flusher can drain and exit.
            _buffer.CompleteAdding();
            if (_flusher != null)
                _flusher.Wait();
            _buffer.Dispose();
        }
    }