当前位置: 首页 > 编程语言 > C#

WaitingTaskBasedQueueSharing

时间:2023-04-10 16:23:12 C#

WaitingTaskBasedQueue想知道有没有ConcurrentQueue的实现/包装器,类似于BlockingCollection,从集合中获取不会阻塞,而是异步的,会导致异步等到一个项目被放置在队列中。我想出了我自己的实现,但它似乎没有按预期执行。我想知道我是否正在重新发明已经存在的东西。这是我的实现:publicclassMessageQueue{ConcurrentQueuequeue=newConcurrentQueue();ConcurrentQueuewaitingQueue=newConcurrentQueue();对象队列同步锁=新对象();publicvoidEnqueue(Titem){Enqueue(item);处理队列();}publicasyncTaskDequeue(){TaskCompletionSourcetcs=newTaskCompletionSource();waitingQueue.Enqueue(tcs);处理队列();返回tcs.Task.IsCompleted?tcs.Task.Result:等待tcs.Task;}privatevoidProcessQueues(){TaskCompletionSourcetcs=null;TfirstItem=default(T);while(true){布尔确定;锁(queueSyncLock){ok=waitingQueue.TryPeek(outtcs)&&queue.TryPeek(outfirstItem);if(ok){waitingQueue.TryDequeue(outtcs);queue.TryDequeue(outfirstItem);}}如果(!ok)中断;tcs.SetResult(firstItem);我不知道无锁解决方案,但您可以查看新的Dataflow库,它是AsyncCTP的一部分。一个简单的BufferBlock就足够了,例如:BufferBlockbuffer=newBufferBlock();通过数据流块类型的扩展方法可以轻松完成生产和消费。生产就像:buffer.Post(13);消费是异步的prepare:intitem=awaitbuffer.ReceiveAsync();如果可能,我建议您使用数据流;使这样的缓冲区既高效又正确比它最初看起来要难。我的尝试(创建“承诺”时会引发一个事件,外部生产者可以使用该事件来了解何时生产更多项目):私人并发队列>_promisesQueue;私有对象_syncRoot=新对象();publicAsyncQueue(){_bufferQueue=newConcurrentQueue();_promisesQueue=newConcurrentQueue>();}//////排队指定的项目。//////项目。publicvoidEnqueue(Titem){TaskCompletionSource承诺;做{if(_promisesQueue.TryDequeue(outpromise)&&!promise.Task.IsCanceled&&promise.TrySetResult(item)){返回;}}while(promise!=null);锁定(_syncRoot){如果(_promisesQueue.TryDequeue(出承诺)&&!promise.Task.IsCanceled&&promise.TrySetResult(项目)){返回;}_bufferQueue.Enqueue(item);}}//////出队异步。/////取消标记。///publicTaskDequeueAsync(CancellationTokencancellationToken){Titem;如果(!_bufferQueue.TryDequeue(outitem)){锁(_syncRoot){如果(!_bufferQueue.TryDequeue(outitem)){varpromise=newTaskCompletionSource();cancellationToken.Register(()=>promise.TrySetCanceled());_promisesQueue.Enqueue(承诺);this.PromiseAdded.RaiseEvent(this,EventArgs.Empty);返回promise.Task;}}}返回Task.FromResult(item);}//////获取一个值,指示此实例是否有承诺。/////////如果此实例为真,否则为假。///publicboolHasPromises{get{return_promisesQueue.Where(p=>!p.Task.IsCanceled).Count()>0;}}//////当队列生成新的promise///时发生///publiceventEventHandlerPromiseAdded;这对你的用例来说可能有点矫枉过正(考虑到学习曲线),但是ReactiveExtentions提供了所有你可能想要的异步组合代理的胶水,你基本上订阅了这些变化,并在它们可用时推送给你,你可以让系统在单独的线程上推送更改。这是我目前正在使用的实现。公共类MessageQueue{ConcurrentQueuequeue=newConcurrentQueue();ConcurrentQueue>waitingQueue=newConcurrentQueue>();对象队列同步锁=新对象();publicvoidEnqueue(Titem){queue.Enqueue(item);处理队列();publicasyncTaskDequeueAsync(CancellationTokenct){TaskCompletionSourcetcs=newTaskCompletionSource();ct.R??egister(()=>{lock(queueSyncLock){tcs.TrySetCanceled();}});waitingQueue.Enqueue(tcs);ProcessQueues();返回tcs.Task.IsCompleted?tcs.Task.Result:等待tcs.Task;}privatevoidProcessQueues(){TaskCompletionSourcetcs=null;TfirstItem=default(T);锁(queueSyncLock){while(true){如果(waitingQueue.TryPeek(outtcs)&&queue.TryPeek(outfirstItem)){waitingQueue.TryDequeue(outtcs);如果(tcs.Task.IsCanceled){继续;}queue.TryDequeue(outfirstItem);}else{休息;}tcs.SetResult(firstItem);它工作正常,但在queueSyncLock上有很多争用,因为我经常使用CancellationToken来取消一些等待任务。当然,这导致我在BlockingCollections中看到的BlockingCollections少得多,但是......我想知道是否有更平滑、无锁的方法来实现同样的事情你可以用BlockingCollection(使用默认的ConcurrentQueue)和callWrapitintoTakeinaTask这样就可以await了:以上就是C#学习教程:等待任务队列的全部内容,分享给大家。如果对大家有用,需要进一步了解C#学习教程,希望大家多多关注——varbc=newBlockingCollection();Telement=awaitTask.Run(()=>bc.Take());本文收集自网络,不代表立场。如涉及侵权,请点击右侧联系管理员删除。如需转载请注明出处: