当前位置: 首页 > 后端技术 > Python

基于Python实现循环队列高效定时器

时间:2023-03-26 19:14:14 Python

基于python实现环形队列高效定时器timerpython实现代码importtimeimportredisimportmultiprocessingclassBase:"""redisconfiguration"""redis_conf={}"""环形队列使用redis存储"""_ri=None"""timingWheelsize"""slot_num=15"""存储环队列使用的redis缓存key"""cache_key='wheel:slot_'def__init__(self,**kwargs):forkinkwargs:ifhasattr(self,k):setattr(self,k,kwargs[k])self._ri=redis.Redis(**self.redis_conf)classTimer(Base):"""当前槽的下标"""_current=0"""Eventprocessing"""event_handler=Nonedefworker(self):"""#TODO用1W事件ID独立进程测试每个卡槽的处理效率,将事件ID分发给事件处理器:return:"""key=self.cache_key+str(self._current)#获取当前卡槽中需要触发的事件IDevent_ids=self._ri.zrangebyscore(key,0,0)#删除需要触发的事件ID触发dinthecurrentcardslotself._ri.zremrangebyscore(key,0,0)#遍历当前卡槽的所有剩余事件ID,将剩余循环数减一倍surplus_event_ids=self._ri.zrange(key,0,-1)formidinsurplus_event_ids:self._ri.zincrby(key,mid,-1)#将事件ID传递给handlerformidinevent_ids:self.event_handler(eid=mid)exit(0)defrun(self):"""启动进程:return:"""whileTrue:p=multiprocessing.Process(target=self.worker)p.start()time.sleep(1)self._current=int(time.time())%self.slot_numclassTimerEvent(Base):defadd(self,event_id,emit_time):"""添加事件ID到定时器:paramevent_id:事件ID:paramemit_time:触发时间:return:"""current_time=int(time.time())diff=emit_time-current_timeifdiff>0:#计算周期数cycle=int(diff/self.slot_num)#计算slot的索引到被存储索引=(diff%self.slot_num+current_time%self.slot_num)%self.slot_numres=self._ri.zadd(self.cache_key+str(index),str(event_id),cycle)returnTrueifreselseFalsereturnFalse#TODO批量添加同一时间不同事件ID#TODO批量添加不同时间不同事件ID通过环形队列实现高效的任务触发redis集合[slot]的设计说明由一个环形数组key_1key_2....key_n有序集合命令ZADDkeyscorememberorderedset包含两部分,一个是score,一个是memberscore作为剩余循环次数和meber作为事件IDpython多进程计算当前时间应该处理的卡槽当前槽索引=(当前时间%卡槽总数+当前时间戳%卡槽总数)%总数cardslots取余操作创建一个独立的子进程处理当前子进程,需要快速读取剩余循环次数为0。将事件ID传递给事件处理程序进行处理。应用说明启动定时器importTimerimporttimedefevent_handler(eid):print(time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(time.time())),eid)t=Timer(redis_conf={'host':'127.0.0.1','port':6379,'password':'123456','db':0},event_handler=event_handler)times=int(time.time())print('当前时间是'+time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(times)))t.run()添加延时触发事件IDimportTimerEventimporttimete=TimerEvent(redis_conf={'host':'127.0.0.1','port':6379,'密码':'123456','db':0})times=int(time.time())print(time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(times)))after_seconds_alert=20forxinrange(100):te.add(x,times+after_seconds_alert+x)print('FirsEmitwillhappenat'+time.strftime('Start:%Y-%m-%d%H:%M:%S',time.localtime(times+after_seconds_alert)))参考文章10w定时任务,如何高效触发超时