sonicorchagent线程调度的最小单位是Consumer。Consumer是基于epoll事件Selectable的进一步封装。每次epoll事件发生时,都会触发orchagent进行调度。Orch是资源的集合。一个orch可以包含多个消费者。例如,aclorch将监控多个redistables。classExecutor//设计假设//1.一个Orch可以有一个或多个Executor//2.一个Executor必须属于一个且仅属于一个Orch//3.Executor将持有一个指向new-edselectable的指针,并将其删除duringdtor//设计假设://1.一个orch可以有一个或多个Executor//2.一个Executor必须属于一个orch并且只能属于一个orch//3.Executor有一个指向新的Selectable结构的指针,必须是在析构函数中删除,否则会泄漏}//装饰SelectableintgetFd()override{returnm_selectable->getFd();}voidreadData()override{m_selectable->readData();}boolhasCachedData()override{returnm_selectable->hasCachedData();}boolinitializedWithData()override{returnm_selectable->initializedWithData();}voidupdateAfterRead()override{m_selectable->updateAfterRead();}Orch*getorch(){返回m_orch;}//禁用复制Executor(constExecutor&)=delete;Executor&operator=(constExecutor&)=delete;//Executeoneventhappen//执行执行事件,drain是一个辅助函数pointtoanorch//获取底层selectable以获得SelectableSelectable*getSelectable()const{returnm_selectable;}};classExecutor只是一个中间派生类,orch直接使用classConsumer和classExecutableTimerclassConsumer消费类一般用来处理app_db订阅事件,asic_db一般用来处理syncd响应事件。typedefstd::pairFieldValueTuple;#definefvFieldstd::get<0>#definefvValuestd::get<1>typedefstd::tuple>KeyOpFieldsValuesTuple;#definekfvKeystd::get<0>#definekfvOpstd::get<1>#definekfvFieldsValuesstd::get<2>typedefmapSyncMap;classConsumer:publicExecutor{public:Consumer(TableConsumable*select,Orch*orch):Executor(select,orch){}TableConsumable*getConsumerTable()const{returnstatic_cast(getSelectable())复制代码;}stringgetTableName()const{returngetConsumerTable()->getTableName();}//事物执行voidexecute();空漏();/*存储最新的“黄金”状态*///TODO:隐藏?SyncMapm_toSync;};voidConsumer::execute()epoll事件触发后,需要调用该函数从数据库中读取指定key的内容,存入m_toSync,用于后续处理voidConsumer::execute(){SWSS_LOG_ENTER();std::deque条目;//调用pops函数,从redis数据库读取数据,返回KeyOpFieldsValuesTuple结构getConsumerTable()->pops(entries);/*没有弹出*/if(entries.empty()){return;}//遍历每个事件for(auto&entry:entries){stringkey=kfvKey(entry);stringop=kfvOp(entry);/*记录传入任务记录事件*/if(gSwssRecord){Orch::recordTuple(*this,entry);}/*如果有新任务来了,或者有DEL任务来了,我们直接放到getConsumerTable().m_toSyncmap中*///这里进行合并,直接重写if(m_toSync.find(key)==m_toSync.end()||op==DEL_COMMAND){m_toSync[key]=entry;}/*如果旧任务仍然存在,我们将旧任务与新任务组合*//**/else{KeyOpFieldsValuesTupleexisting_data=m_toSync[钥匙];autonew_values=kfvFieldsValues(entry);autoexisting_values=kfvFieldsValues(existing_data);//遍历每个新值for(autoit:new_values){stringfield=fvField(it);字符串值=fvValue(it);autoiu=existing_values.begin();while(iu!=existing_values.end())//遍历每一个旧值{stringofield=fvField(*iu);if(field==ofield)//同一个字段,覆盖旧值,这里应该跳出while,代码效率差iu=existing_values.erase(iu);否则我++;}/*添加新值*/existing_values.push_back(FieldValueTuple(field,value));}m_toSync[key]=KeyOpFieldsValuesTuple(key,op,existing_values);}}//执行所有排序好的任务drain();}假设有一个任务键值对如下:key=test;op=set;value={A:a,B:b,C:c,}第一个触发任务写在APP_DB中添加:key=test;op=set;value={A:a,B:b}加入orchagent只是将任务读入m_toSync,但是由于某种原因任务没有执行,仍然驻留在m_toSync中。第二次写:key=test;op=set;value={B:b1,C:c}然后execute函数后,m_toSync会是:key=test;op=set;value={A:a,B:b1,C:c}voidConsumer::drain()执行m_toSync中的任务。voidConsumer::drain(){if(!m_toSync.empty())m_orch->doTask(*this);}classOrchclassOrch{public://每个orch都会连接到数据库和它需要的表名订阅,以及订阅表产生的事件的优先级//默认优先级订阅多表Orch(DBConnector*db,constvector&tableNames);//订阅多个表,说明每个表的优先级Orch(DBConnector*db,constvector&tableNameWithPri);//连接多个数据库Orch(constvector&tables);虚拟~Orch();//获取orch的所有epoll事件vectorgetSelectables();/*迭代m_consumerMap中的所有消费者并运行doTask(Consumer)*///在所有消费者中执行m_sync中的orch任务voiddoTask();/*针对特定执行器运行doTask*///任务的来源可以是消费者、NotificationConsumer、SelectableTimervirtualvoiddoTask(Consumer&consumer)=0;虚拟无效doTask(NotificationConsumer&consumer){}virtualvoiddoTask(SelectableTimer&timer){}/*TODO:重构记录*/staticvoidrecordTuple(Consumer&consumer,KeyOpFieldsValuesTuple&tuple);protected://消费者map,一个orch可以订购多个table,key为tableName,value为ExecutorConsumerMapm_consumerMap;//与调试相关staticvoidlogfileReopen();串转储元组(消费者&消费者,KeyOpFieldsValuesTuple&元组);ref_resolve_statusresolveFieldRefValue(type_map&,conststring&,KeyOpFieldsValuesTuple&,sai_object_id_t&);boolparseIndexRange(conststring&input,sai_uint32_t&range_low,sai_uint32_t&range_high);boolparseReference(type_map&type_maps,string&ref,string&table_name,string&object_name);ref_resolve_statusresolveFieldRefArray(type_map&,conststring&,KeyOpFieldsValuesTuple&,vector&);/*注意:消费者将拥有thisclass*///内部函数添加一个Executor,给addConsumer使用voidaddExecutor(stringexecutorName,Executor*executor);Executor*getExecutor(stringexecutorName);private://添加一个消费者voidaddConsumer(DBConnector*db,stringtableName,intpri=default_orch_pri);};voidOrch::addConsumer(......)voidOrch::addExecutor(stringexecutorName,Executor*executor){m_consumerMap.emplace(std::piecewise_construct,std::forward_as_tuple(executorName),std::forward_as_tuple(executor));}//添加一个消费者voidOrch::addConsumer(DBConnector*db,stringtableName,intpri){if(db->getDbId()==CONFIG_DB||db->getDbId()==STATE_DB){addExecutor(tableName,newConsumer(newSubscriberStateTable(db,tableName,TableConsumable)::DEFAULT_POP_BATCH_SIZE,pri),this));}else{addExecutor(tableName,newConsumer(newConsumerStateTable(db,tableName,gBatchSize,pri),this));}}voidOrch::doTask(......)执行本orch中每个consumer的m_toSync中的task,不管这次是从redis读取task还是之前没有处理过voidOrch::doTask(){for(auto&it:m_consumerMap){it.second->drain();}}classOrch2orch2是基于orch的封装,增强了代码的可读性。Orch2类:publicOrch{public:Orch2(DBConnector*db,conststd::string&tableName,Request&request,intpri=default_orch_pri):Orch(db,tableName,pri),request_(request){}protected:virtualvoiddoTask(消费者&消费者);virtualbooladdOperation(constRequest&request)=0;virtualbooldelOperation(constRequest&request)=0;private:Request&request_;};voidOrch2::doTaskvoidOrch2::doTask(Consumer&consumer){SWSS_LOG_ENTER();自动它=consumer.m_toSync.begin();while(it!=consumer.m_toSync.end()){boolerase_from_queue=true;尝试{request_.parse(it->second);autoop=request_.getOperation();如果(op==SET_COMMAND){erase_from_queue=addOperation(request_);}elseif(op==DEL_COMMAND){erase_from_queue=delOperation(request_);}else{SWSS_LOG_ERROR("错误操作。检查RequestParser:%s",op.c_str());}}catch(conststd::invalid_argument&e){SWSS_LOG_ERROR("解析错误:%s",e.what());}catch(conststd::logic_error&e){SWSS_LOG_ERROR("逻辑错误:%s",e.what());}catch(conststd::exception&e){SWSS_LOG_ERROR("在请求解析器中捕获异常:%s",e.what());}catch(...){SWSS_LOG_ERROR("在请求解析器中捕获到未知异常");}request_.clear();//执行成功,那么从m_tosync中删除,否则执行下一个任务if(erase_from_queue){it=consumer.m_toSync.erase(it);}else{++it;}}}