经纪人组件完成了GO-Micro中不同步骤的发送和接收。底部有很多方法,例如RabbitMQ,Kafka,Redis等。本文主要介绍使用兔子发送数据的GO-Micro的方法和原理。
经纪人的核心功能是发布和订阅,已发布和订阅。其定义是:
第一个参数是用于识别某种类型的消息的主题(主题)。
已发布的数据是通过消息传递的,该消息包括消息标头和消息主体,定义如下:
消息头是MAP,这是一组KV(钥匙值对)。
该消息是一个字节数组,在发送和接收时需要编码和解码。
订阅的第一个参数也是主题,用于过滤要接收的新闻。
订阅数据由处理程序处理。处理程序是一个函数,定义如下:
参数事件是一个接口,它需要特定的代理才能实现。定义如下:
当开发人员订阅数据时,他们需要实施处理程序的功能,接收事件的示例,提取数据进行处理以及根据不同的经纪人的说法,您也可能需要致电ack()。处理错误时,返回错误。
了解经纪人的定义后,让我们看一下如何使用Go-Micro发送和接收RabbitMQ消息。
如果您已经拥有RabbitMQ服务器,请跳过此步骤。
这是一种使用Docker快速启动RabbitMQ的方法。当然,您必须安装Docker。
执行以下命令来启动RabbitMQ的Docker容器:
然后进入容器以进行某些设置:
启动管理工具并禁用索引收集(将导致一些API500错误):
最后重新启动容器:
最后,在浏览器中输入http://127.0.0.0.0:15672访问它。默认用户名和密码是访客。
为了促进演示,让我们定义发布消息和接收消息的功能。发布功能使用Go-Micro提供的事件类型,其他类型也可以提供公共功能。这里发送的数据格式是JSON字符串。可以随意获取接收消息的功能名称,但是参数和返回值必须符合规格,即下面的代码中的外观。此功能也可以绑定到某种类型。
此处给出了代码,其中提供了一些注释,并且将进行详细的介绍。
主要逻辑是:
1.首先创建一个兔子经纪人,该经纪人实现标准经纪人接口。主要参数是兔子和兔子开关的访问地址,订户(或消费者)使用了预摘要。
2.然后通过NewsRvice创建Go-Micro服务,然后将经纪人设置为IN。这里将有很多在此初始化的内容。核心是创建一个RPCServer并将RPCServer绑定到该经纪人。
3.然后通过RegistersubsCriber注册订阅。该注册具有两个级别的功能:首先,如果在兔子中不存在队列时创建队列,并且指定主题的信息被订阅;接收数据处理方法。
让我们在此处查看订阅参数:
4.然后为了在此处演示,通过新闻通道创建了一个事件,该事件每秒发送1条消息。
5.最后,通过service.run()启动此程序。
在努力写作很长时间之后,请查看该程序的操作效果:
请注意,普通出版商和订阅者处于不同的过程中。仅出于示范的方便,他们才将它们放入程序中。因此,如果您仅发布消息,则不需要订阅代码。如果您仅订阅,则不需要发布消息的代码。当您使用它时,应该根据需要自己切割它。
让我们看一下新闻如何在Go-Micro和RabbitMQ中流通。我画了一个示意图:
这张照片有点复杂,这是一个详细的解释。
首先分为三个部分:兔子,消息发布零件和接收零件的消息。这是要区分的不同颜色。
此处理过程也可以分为业务零件,核心模块零件和插件零件。
从上图,您可以看到该消息需要通过此RabbitMQ插件处理。实际上,您可以使用此插件来实现消息的发送和接收。我已将此演示代码提交给GitHub,感兴趣的学生可以在文本结尾处获得GitHub仓库的地址。
从上面的这些部门中,我们可以理解设计师的总体设计思想,掌握关键节点,使用正确的右侧使用,并在存在问题时迅速找到。
这是因为路由。过程在寻找订阅时使用go-micro的标头信息:
此味精。主题在以下实例中返回主题字段:
除非专门适用于Go-Micro,否则其他框架将不会具有这样的标题。
因为在使用RabbitMQ的场景中,整个开发都是围绕RabbitMQ完成的,而Go-Micro的处理逻辑并未考虑可以使用RabbitMQ订阅来使用配件的情况。发布消息,主题和微主题的价值匹配的值匹配时间匹配时间匹配时间IT的主题是按照等效原则来处理的,因此您可以使用RabbitMQ消息随附的主题来设置消息标题。在接收消息后,rabbitmq.rbroker.subscribe,您可以进行此设置:
通过这种方式,由Go-Micro开发的消费者程序可以接收其他框架发布的消息,而其他框架无需改编。
Go-Micro的RabbitMQ插件底层使用另一个库:github.com/streamway/amqp
对于发布者,AMQP库将在断开RabbitMQ时通过GO频道同步通知Go-Micro,然后Go-Micro可以启动重新连接。该问题出现在此同步通知中,Go-Micro的RabbitMQ插件集接收连接和频道的封闭通知,但仅处理一个通知以重新连接。结果,无法释放锁定,并且在发布时需要此锁,这会导致发布者无限封锁。解决方案是解决方案。添加外层的周期。当收到所有通知时,然后连接到重新连接。
对于订户,当兔断开连接时,它将始终在特定的GO通道上阻止,直到返回值。此值意味着已重新建立连接,订户可以重建消费者渠道。该问题也出现在此阻止GO频道上,因为此GO通道每次收到AMQP关闭通知,并且都会重新分配一个值,并且Go Channel等待订户可能是以前的旧值,并且永远不会返回。订阅订阅人员被无限阻止。解决方案是添加时间。选择后,以便等待GO频道有机会更新新值。
代码不会发布。如果您有兴趣,可以前往GitHub查看:https://github.com/go-micro/plugins/commit/9f64710807221f3cc64fe075b075b07c666666666666c00c
对这两个问题的修改已合并到官方仓库中,每个人都可以获取最新的代码。
这两个坑被填充并基本上满足了我的需求。当然,可能还有其他坑,例如Go-Micro的RabbitMQ插件似乎没有发行商确认的功能。为了实现这一目标,您必须考虑如何更改它。
好吧,以上是本文的主要内容。
旧规则,该代码已上传到GitHub,欢迎访问:https://github.com/bosima/go-demo/main/go-micro-broker-broker-broker-rabbitmq
要收获更多的建筑知识,请注意微信的公共帐户Firefly Architecture.firfly Architecture.firfly content,请指出来源。