当前位置: 首页 > 后端技术 > Python

PythonReactive类库RxPy简介

时间:2023-03-25 20:05:15 Python

RxPy是ReactiveX的Python版本,这是一个非常流行的响应式框架。其实这些版本都是一样的,只是各个语言的实现不同而已。因此,如果你学会其中一个,使用其他响应式版本也是轻而易举的事。我以前听说过这个框架,最近决定仔细看看。基本概念ReactiveX中有几个核心概念,我们先简单介绍一下。Observable和Observer(可观察对象和观察者)首先,Observable和Observer,分别是可观察对象和观察者。Observable可以理解为发送一系列值的异步数据源。Observer类似于consumer,需要先订阅Observable才能接收到它发出的值。可以说,这套概念是设计模式中观察者模式和生产者消费者模式的结合。运算符(operator)另一个很重要的概念是运算符。操作符作用于Observable的数据流,并可以对其应用各种操作。更重要的是,运营商也可以链接在一起。这种链式函数调用不仅将数据和操作分开,而且使代码更加清晰和可读。一旦掌握了它,您就会爱上它。Single(单例)在RxJava及其变体中,还有一个特殊的概念叫做Single,它是一个只发出相同值的Observable。说白了就是单例。当然,如果熟悉Java等语言,对单例肯定不陌生。主题(subject)主题的概念很特殊,它既是一个Observable又是一个Observer。正是因为这个特性,Subject可以订阅其他Observable,也可以向其他Observer发射对象。在某些场景下,Subject会发挥很大的作用。调度器(scheduler)默认情况下,ReactiveX只在当前线程下运行,但如果需要,你也可以使用调度器让ReactiveX运行在多线程环境下。有很多调度器和相应的操作符可以处理多线程场景下的各种需求。Observer和Observable先来看一个最简单的例子,运行的结果会依次打印这些数字。这里的of是一个运算符,它根据给定的参数创建一个新的Observable。创建完成后可以订阅Observable,三个回调方法会在相应的时间执行。Observer一旦订阅了Observable,就会收到后续Observable发出的值。从rximportofob=of(1,2,34,5,6,7,7)ob.subscribe(on_next=lambdai:print(f'Received:{i}'),on_error=lambdae:print(f'Error:{e}'),on_completed=lambda:print('Completed'))这个例子看似微不足道,似乎不起作用。但是当你理解了Rx的一些核心概念后,你就会明白它是一个多么强大的工具。更重要的是,Observable产生数据和订阅的过程是异步的。如果你熟悉它,你可以使用这个特性做很多事情。OperatorRxPy中另一个非常重要的概念就是operator,甚至可以说operator是最重要的概念。几乎所有的功能都可以通过组合单个运算符来实现。熟练掌握运算符是学习RxPy的关键。Operators也可以和pipe函数连接起来,形成一个复杂的操作链。fromrximportof,operatorsasopimportrxob=of(1,2,34,5,6,7,7)ob.pipe(op.map(lambdai:i**2),op.filter(lambdai:i>=10)).subscribe(lambdai:print(f'Received:{i}'))RxPy中有大量的运算符,可以执行各种功能。让我们简单看一下一些常用的运算符。如果你熟悉Java8的流库或者其他函数式编程库,这些操作符应该感觉很熟悉。Creationaloperators首先是创建Observables的operators,下面列出一些比较常用的creationaloperators。operator的作用just(n)Observablerepeated_value(v,n)containingonly1valueRepeatedntimesObservableof(a,b,c,d)Observableempty()containingallparameters一个空的Observablefrom_iterable(iter)使用iterable创建一个Observablegenerate(0,lambdax:x<10,lambdax:x+1)使用初值和循环条件生成Observableinterval(n)Observable过滤器类型算子,每隔n秒定时发送整数序列Filtertype的主要功能operator是对Observable进行过滤和筛选。debounce运算符的作用按时间间隔过滤,范围内的值会被忽略distinct忽略重复值elementAt只发出第n位的值filter按条件过滤值first/last发出第一个/lastvalueskipskipthefirstnvaluestakeonlytakesthefirstnvaluesconversionoperatoroperatorfunctionflatMap将多个Observable的值转换并合并成一个ObservablegroupBy对值进行分组并返回多个Observable的map映射Observable对另一个Observablescan应用函数到Observable的每个值,然后返回下面的值Arithmeticoperatoroperatorfunctionaveragecountnumbermax最大值min最小值reduce将函数应用到每个值,然后返回最终计算结果sumsumSubjectSubjectis一个既是观察者又是可观察对象的特殊对象。不过这个对象一般不常用,但如果用于某些目的还是有用的。所以还是要介绍一下。下面的代码,因为第一个值在订阅时已经发出,所以只会打印订阅后发出的值。fromrx.subjectimportSubject,AsyncSubject,BehaviorSubject,ReplaySubject#SubjectisbothObserverandObservableprint('--------Subject--------')subject=Subject()subject.on_next(1)subject.subscribe(lambdai:print(i))subject.on_next(2)subject.on_next(3)subject.on_next(4)subject.on_completed()#234还有几个特殊的Subject,我们介绍一下以下。ReplaySubjectReplaySubject是一个特殊的Subject,它记录所有发出的值,无论它们何时被订阅。所以它可以用作缓存。ReplaySubject还可以接受一个bufferSize参数,指定最近可以缓存的数据条数,默认是all。下面的代码和上面几乎一模一样,但是因为使用了ReplaySubject,所以打印了所有的值。当然,你也可以尝试将订阅语句放在其他地方,看看输出是否有变化。#ReplaySubject会缓存所有的值,如果指定了参数,只会缓存最后几个值print('--------ReplaySubject--------')subject=ReplaySubject()subject.on_next(1)subject.subscribe(lambdai:print(i))subject.on_next(2)subject.on_next(3)subject.on_next(4)subject.on_completed()#1234BehaviorSubjectBehaviorSubject是一个特殊的Subject,它只会记录最近发出的值。并且在创建它的时候,必须指定一个初始值,所有订阅它的对象都可以收到这个初始值。当然,如果订阅晚了,这个初始值也会被后面发出的值覆盖,这一点要注意。#BehaviorSubject会缓存上次发出的值,除非Observable已经关闭print('--------BehaviorSubject--------')subject=BehaviorSubject(0)subject.on_next(1)subject.on_next(2)subject.subscribe(lambdai:print(i))subject.on_next(3)subject.on_next(4)subject.on_completed()#234AsyncSubjectAsyncSubject是一个特殊的Subject,顾名思义是一个异步的Subject,它只会在Observer完成时才发出数据,并且只发出最后的数据。所以下面的代码只会输出4。如果co_completedcall的最后一行被注释掉,则什么也不会输出。#AsyncSubject会缓存上次发射的值,并且只会在Observable关闭后才开始发射print('--------AsyncSubject--------')subject=AsyncSubject()subject.on_next(1)subject.on_next(2)subject.subscribe(lambdai:print(i))subject.on_next(3)subject.on_next(4)subject.on_completed()#4Scheduler虽然RxPy是一个异步框架,但它实际上默认情况下它仍然在单个线程上运行,因此如果您使用阻止线程运行的东西,程序将冻结。当然,对于这些情况,我们可以使用其他的Scheduler来调度任务,以保证程序能够高效运行。下面的示例创建了一个ThreadPoolScheduler,它是一个基于线程池的调度程序。两个Observables使用subscribe_on方法指定调度器,所以它们会使用不同的线程来工作。importrxfromrx.schedulerimportThreadPoolSchedulerfromrximportoperatorsasopimportmultiprocessingimporttimeimportthreadingimportrandomdeflong_work(value):时间。sleep(random.randint(5,20)/10)返回值pool_schedular.(5).pipe(op.map(lambdai:long_work(i+1)),op.subscribe_on(pool_schedular)).subscribe(lambdai:print(f'Work1:{threading.current_thread().name},{i}'))rx.of(1,2,3,4,5).pipe(op.map(lambdai:i*2),op.subscribe_on(pool_schedular)).subscribe(lambdai:打印(f'Work2:{threading.current_thread().name},{i}'))如果你观察过各个算子的API,你会发现大多数算子都支持可选的Scheduler参数,这些参数是operator指定一个调度器。如果operator上指定了调度器,则优先使用该调度器;其次,将使用订阅方法中指定的调度程序;如果以上均未指定,则将使用默认调度程序。应用场景不错。介绍完ReactiveX的一些知识,我们再来看看如何使用ReactiveX。在很多应用场景中,ReactiveX可以用来抽象数据处理,简化概念。防止重复发送在很多情况下,我们需要控制事件的发生间隔。比如某个按钮不小心被按下了几次,我们只希望第一个按钮生效。这种情况下可以使用debounce算子,它会对Observable进行过滤,小于指定时间间隔的数据会被过滤掉。debounce运算符将等待一段时间,直到间隔结束,然后才会发出最后一个数据。如果要过滤后面的数据,发送第一个数据,需要用到throttle_first算子。下面的代码可以更好的演示这个算子,快速按下回车键发送数据,注意观察按键和数据显示的关系,也可以把throttle_first算子换成debounce算子,然后看看会发生什么输出有什么变化,也可以把管道中的操作符完全注释掉,然后看看输出会有什么变化。importrxfromrximportoperatorsasopfromrx.subjectimportSubjectimportdatetime#debounceoperator,只有在时间间隔之外才能发出ob=Subject()ob.pipe(op.throttle_first(3)#op.debounce(3)).subscribe(on_next=lambdai:print(i),on_completed=lambda:print('Completed'))print('按回车键打印,按其他键退出')whileTrue:s=input()ifs=='':ob.on_next(datetime.datetime.now().time())else:ob.on_completed()break操作数据流如果需要对一些数据进行操作,也有很多操作符可以满足需求。当然,这部分功能并不是ReactiveX独有的,如果你对Java8流库有所了解,你会发现两者的功能几乎是一模一样的。下面是一个简单的例子,结合两个数据源,找出其中所有的偶数。importrxfromrximportoperatorsasopfromrx.subjectimportSubjectimportdatetime#操作数据流some_data=rx.of(1,2,3,4,5,6,7,8)some_data2=rx.from_iterable(range(10,20))some_data.pipe(op.merge(some_data2),op.filter(lambdai:i%2==0),#op.map(lambdai:i*2)).subscribe(lambdai:print(i))或者一个使用reduce求1-100整数之和的简单示例。importrxfromrximportoperatorsasopfromrx.subjectimportSubjectimportdatetimerx.range(1,101).pipe(op.reduce(lambdaacc,i:acc+i,0)).subscribe(lambdai:print(i))