当前位置: 首页 > 科技观察

Akka用法系列3:层次结构

时间:2023-03-17 22:49:10 科技观察

Akka以层次结构组织Actor。1、Akka的层次结构我们需要实现一个翻译模块,其功能是输入中文,输出多种语言。我们可以让一个MasterActor负责接收外部输入,多个WorkerActor负责将输入翻译成特定的语言。MasterActor和WorkerActor之间存在层级关系。下图显示了这种层次结构。具体代码实现如下。classMasterextendsActorwithActorLogging{valenglish2chinese=context.actorOf(Props[English2Chinese],"English2Chinese")valenglish2cat=context.actorOf(Props[English2Cat],"English2Cat")defreceive={caseeng1:String=>{english2chinese!eng1english2cat}ActorclassActorOfclassActorActorOf}{defreceive={caseeng:String=>{println("我不会翻译!")}}}classEnglish2CattextendsActorwithActorLogging{defreceive={caseeng:String=>{println("喵喵喵!")}}}objectMain{defmain(args:Array[String])={valsys=ActorSystem("system")valmaster=sys.actorOf(Props[Master],"Master")master!"Hello,world!"}}我们在MasterActor中使用上下文。actorOf实例化了English2Chinese和English2Cat,使得它们之间可以形成层次关系。他们的演员地址证实了这一点。上面的Actor层级就是我们程序中Actor的层级。该层次结构是ActorSystem层次结构的一部分。ActorSystem层次结构从根节点开始有两个子节点:UserGuardian和SystemGuardian。用户程序产生的所有Actor都在UserGuardian节点下,SystemGuardian节点包含系统中的一些Actor,比如deadLetterListener。如果一个Actor已经停止,发送到这个Actor的消息将被转发到deadLetterListener。所以完整的Actor层次结构如下所示。2、Akka的容错机制对于分布式系统来说,容错机制是一个非常重要的指标。那么Akka是如何实现容错的呢?Akka的容错机制是基于层次结构的:Akka在Actor上增加了一个监控策略来监控它的子Actor。下面的代码是给Actor添加一个监控策略。监控策略的内容是:如果子Actor在运行过程中抛出Exception,则对子Actor执行停止动作(即停止子Actor)。overridevalsupervisorStrategy=OneForOneStrategy(){case_:Exception=>Stop}Akka的监控策略一共支持四种动作:Stop、Resume、Restart和Escalate。停止:子Actor停止。恢复:子参与者忽略引发异常的消息并继续处理后续消息。重启:子actor停止,重新初始化一个子actor来处理后续的消息。Escalate:错误严重到自己无法处理,将错误信息上报给父actor。Akka的监控策略分为两种。一个是一对一。该策略只对抛出Exception的子Actor执行相应的动作。仍然以上面的翻译模块为例,我们添加一个OneForOneStop监控策略。classMaster1extendsActorwithActorLogging{valenglish2Chinese=context.actorOf(Props[English2Chinese1],"English2Chinese")valenglish2Cat=context.actorOf(Props[English2Cat1],"English2Cat")overridevalsupervisorStrategy=OneForOneStrategy(){case_:Exception=>Stop}overridedefreceive={caseeng:String=>{english2Cat!eng;english2Chinese!eng;}}}classEnglish2Chinese1extendsActorwithActorLogging{overridefreceive={caseeng:String=>{println("翻译不出来")}}}classEnglish2Cat1extendsActorwithActorLogging{overridefreceive={caseeng:String=>{thrownewException("ExceptioninEnglish2Cat1")}}}objecthierarchy1{defmain(args:Array[String])={valsystem=ActorSystem("system")valmaster=system.actorOf(Props[Master1],"Master")master!"你好,世界"Thread.sleep(1000)master!"Hello,world"}}运行这段代码,我们得到下面的结果。从下面的结果可以看出:第一轮English2Cat1抛出Exception,English2Chinese1正常;第二轮,English2Cat1死了,English2Chinese1也死了。这个结果表明监控策略已经停止了MasterActor的所有子Actor。另一个是AllForOne。如果任何子Actor抛出异常,此监视策略将对所有子Actor执行操作。classMaster2extendsActorwithActorLogging{valenglish2Chinese=context.actorOf(Props[English2Chinese2],"English2Chinese")valenglish2Cat=context.actorOf(Props[English2Cat2],"English2Cat")覆盖supervisorStrategy=AllForOneStrategy(){overcase_:Exception=String=>{english2Cat!eng;english2Chinese!eng;}}}运行这段代码,我们得到如下结果。从下面的结果可以看出:第一轮English2Cat1抛出Exception,English2Chinese1正常;第二轮,English2Cat1死了,English2Chinese1也死了。这个结果表明监控策略已经停止了MasterActor的所有子Actor。3.总结当我们使用Akka开发并行程序时,我们可以使用层次结构来组织Actor。层次结构不仅更符合人的直觉,也为容错提供了机制保障。【本文为专栏作家“李莉”原创稿件,转载请联系授权】点此查看该作者更多好文

最新推荐
猜你喜欢