当前位置: 首页 > 编程语言 > C#

如何使用ConcurrentQueue 进行线程处理分享

时间:2023-04-11 02:44:10 C#

C#学习教程:如何使用ConcurrentQueue进行线程化我有一个返回数据表的进程。反过来,每个DataTable都与前一个DataTable合并。在最终BulkCopy(OutOfMemory)之前,存在记录过多无法保留的问题。所以,我决定我应该立即处理每个传入的DataTable。考虑ConcurrentQueue...但我看不到WriteQueuedData()方法如何知道WriteQueuedData()列并将其写入数据库。例如:publicclassTableTransporter{privateConcurrentQueuetableQueue=newConcurrentQueue();publicTableTransporter(){tableQueue.OnItemQueued+=newEventHandler(WriteQueuedData);//没有可用事件}publicvoidExtractData(){DataTable表;//执行数据提取tableQueue.Enqueue(table);}privatevoidWriteQueuedData(objectsender,EventArgse){BulkCopy(e.Table);我的第一个问题是,除了我实际上没有任何要订阅的事件这一事实之外,如果我异步调用ExtractData(),这就是我所需要的吗?其次,我是否遗漏了ConcurrentQueue函数的方式,需要某种形式的触发器才能与排队的对象异步工作?更新我刚刚从具有OnItemQueued事件处理程序的ConcurrentQueue派生了一个类。然后:newpublicvoidEnqueue(DataTableTable){base.Enqueue(Table);OnTableQueued(新TableQueuedEventArgs(表));}publicvoidOnTableQueued(TableQueuedEventArgstable){EventHandlerhandler=TableQueued;if(handler!=null){handler(this,table);}}对这个实现有什么顾虑吗?根据我对问题的理解,你遗漏了一些东西。并发队列是一种数据结构,旨在接受多个线程读取和写入队列而不显式锁定数据结构。(所有爵士乐都是在幕后处理的,或者集合是以不需要锁定的方式实现的。)考虑到这一点,您尝试使用的模式看起来像是“生产者/消费者”。首先,您有产生工作的任务(并将项目添加到队列中)。第二个你有第二个任务消耗队列中的东西(和出队项目)。所以你真的需要两个线程:一个用于添加项目,一个用于删除项目。因为您使用的是并发集合,所以可以有多个线程添加项目和多个线程删除项目。但是很明显,你越是和并发队列争论,它就会越快成为瓶颈。我认为ConcurrentQueue仅在极少数情况下有用。它的主要优点是它是无锁的。然而,通常生产者线程必须以某种方式通知消费者线程有数据可供处理。线程之间的这种信号传递需要锁定并抵消了使用ConcurrentQueue的好处。同步线程最快的方法是使用Monitor.Pulse(),它只在锁中运行。所有其他同步工具甚至更慢。当然,消费者可以继续检查队列中是否有没有锁的东西,但这是对处理器资源的巨大浪费。如果消费者在检查之间等待,情况会好一些。在写入队列时抛出一个线程是一个非常糟糕的主意。使用ConcurrentQueue节省1微秒可能会被执行事件处理程序完全浪费,这可能需要1000倍的时间。如果所有处理都在事件处理程序或异步调用中完成,那么问题仍然是为什么需要队列?最好将数据直接传递给处理程序,根本不使用队列。请注意,ConcurrentQueue的实现相当复杂以允许并发。在大多数情况下,最好使用普通队列并锁定对队列的每次访问。由于队列访问只需要几微秒,因此2个线程在同一微秒内访问队列的可能性极小,并且几乎不会因锁定而造成任何延迟。使用带有锁定的普通队列通常会比ConcurrentQueue更快地执行代码。这是我提出的完整解决方案:publicclassTableTransporter{privatestaticint_indexer;privateCustomQueuetableQueue=newCustomQueue();私有函数RunPostProcess;私有字符串文件名;publicTableTransporter(){RunPostProcess=newFunc(SerializeTable);tableQueue.TableQueued+=newEventHandler(tableQueue_TableQueued);}voidtableQueue_TableQueued(objectsender,TableQueuedEventArgse){//用表做点什么//我不知道如何在第三个参数中传递自定义对象RunPostProcess.BeginInvoke(e.Table,newAsyncCallback(PostComplete),filename);}publicvoidExtractData(){//执行数据提取tableQueue.Enqueue(MakeTable());Console.WriteLine("表数[{0}]",tableQueue.Count);}privateDataTableMakeTable(){returnnewDataTable(String.Format("Table{0}",_indexer++));}privatestringSerializeTable(DataTableTable){stringfile=Table.TableName+".xml";DataSetdataSet=newDataSet(Table.TableName);数据集.Tables.Add(表);Console.WriteLine([{0}]写入{1}",Thread.CurrentThread.ManagedThreadId,文件);字符串xmlstream=String.Empty;使用(MemoryStreammemstream=newMemoryStream()){XmlSerializerxmlSerializer=newXmlSerializer(typeof(DataSet));XmlTextWriterxmlWriter=newXmlTextWriter(memstream,Encoding.UTF8);xmlSerializer.Serialize(xmlWriter,数据集);xmlstream=UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());使用(varfileStream=newFileStream(file,FileMode.Create))fileStream.Write(StringToUTF8ByteArray(xmlstream),0,xmlstream.Length+2);}文件名=文件;返回文件;}privatevoidPostComplete(IAsyncResultiasResult){stringfile=(string)iasResult.AsyncState;Console.WriteLine([{0}]已完成:{1}",Thread.CurrentThread.ManagedThreadId,文件);RunPostProcess.EndInvoke(iasResult);}publicstaticStringUTF8ByteArrayToString(Byte[]ArrBytes){returnnewUTF8Encoding().GetString(ArrBytes);}浦blicstaticByte[]StringToUTF8ByteArray(StringXmlString){返回新的UTF8Encoding().GetBytes(XmlString);}}publicsealedclassCustomQueue:ConcurrentQueue{publiceventEventHandlerTableQueued;publicCustomQueue(){}publicCustomQueue(IEnumerableTableCollection):base(TableCollection){}newpublicvoidEnqueue(DataTableTable){base.Enqueue(Table);OnTableQueued(新TableQueuedEventArgs(表));}publicvoidOnTableQueued(TableQueuedEventArgstable){EventHandlerhandler=TableQueued;如果(处理程序!=null){处理程序(这个,表);}}}公共类TableQueuedEventArgs:EventArgs{#regionFields#endregion#regionInitpublicTableQueuedEventArgs(DataTableTable){this.Table=Table;}#endregion#regionFunctions#endregion#regionPropertiespublicDataTableTable{get;set;}#endregion}作概念的证书,它似乎运行良好我最多看4个工人线程序。以上就是C#学习教程:如何使用ConcurrentQueue进行线程处理分享的全部内容。如果对你有用,需要进一步了解C#学习教程,希望大家多多关注。本文收集自网络,不代表立场。如涉及侵权请点击右侧联系管理员删除。如需转载请注明出处: