当前位置: 首页 > 后端技术 > Python

使用Python开发EMQXMQTT服务器插件

时间:2023-03-25 21:14:14 Python

从v4.1版本开始,EMQXMQTT服务器提供了专门的多语言支持插件emqx_extension_hook,现在支持使用其他编程语言来处理EMQX中的hook事件,开发者可以使用Python或Java快速开发自己的插件,并在官方功能的基础上进行扩展,满足自己的业务场景。例如:验证某个客户端的登录权限:在客户端连接时触发相应的函数,通过参数获取客户端信息,然后通过读取数据库、比对等操作判断是否有登录权限,记录在线状态以及客户端的登出历史:客户端状态变化时触发相应的函数,通过参数获取客户端信息,在数据库中重写客户端的在线状态,验证客户端的PUB/SUB操作权限:触发相应的函数发布/订阅时,通过参数获取客户端信息和当前主题,判断客户端是否有相应的操作权限来处理会话(Session)和消息(Message)事件,实现订阅关系和消息处理/存储:触发相应函数当消息发布,状态改变时,获取当前客户端信息、消息状态和消息内容,转发给Kafka或数据库存储。注意:消息挂钩仅在企业版中受支持。Python和Java驱动都是基于Erlang/OTP-Port进程间通信实现的,具有非常高的吞吐性能。本文以Python扩展为例,介绍EMQX跨语言扩展的使用方法。Python扩展示例要求EMQX所在服务器必须安装Python3.6以上版本。使用步骤为通过pip安装PythonSDK并调整EMQX配置,确保相关配置项正确指向Python项目。导入SDK并编写代码。Python插件安装通过pip命令在本地安装SDK。确保使用pip3安装:pip3installemqx-extension-sdk修改配置修改emqx-extension-hook插件配置,正确使用extension:##设置支持的驱动####Value:python2|蟒蛇3|javaexhook.drivers=python3##脚本的搜索路径/libraryexhook.drivers.python3.path=data/extension/hooks.py##调用超时####Value:Duration##exhook.drivers.python3.call_timeout=5s##初始模块名##你的文件名或模块名exhook.drivers.python3.init_module=hooks编写代码在emqx/data/extension目录新建hooks.py文件,导入SDK编写业务逻辑,示例程序如下如下:##data/extension/hooks.py来自emqx_extension。hooksimportEmqxHookSdk,hooks_handlerfromemqx_extension.typesimportEMQX_CLIENTINFO_PARSE_T,EMQX_MESSAGE_PARSE_T#继承SDKHookSdk类classCustomHook(EmqxHookSdk):#使用装饰器注册hooks@hooks_handler()defon_client_connect:EMQX_CLIENTINFO_PARSE_T=None,props:dict=None,state:list=None:print(f'[PythonSDK][on_client_connect]{conninfo.clientid}connecte')@hooks_handler()defon_client_connected(self,clientinfo:EMQX_CLIENTINFO_PARSE_T,state:list=None):print(f'[PythonSDK][on_client_connected]{clientinfo.clientid}已连接')@hooks_handler()defon_client_check_acl(self,clientinfo:EMQX_CLIENTINFO_PARSE_T,pubsub:str,topic:str,result:bool,state:tuple)->bool:print(f'[PythonSDK][on_client_check_acl]{clientinfo.username}checkACL:{pubsub}{topic}')#用户名为空时,ACL验证不通ifclientinfo.用户名=='':返回False返回True@hooks_handler()defon_client_authenticate(self,clientinfo:EMQX_CLIENTINFO_PARSE_T,authresult,state)->bool:print(f'[PythonSDK][on_client_authenticate]{clientinfo.clientid}authenticate')#当clientid不为空时,认证通过clientinfo.clientid!='':returnTruereturnFalse#on_message_*只支持企业版@hooks_handler()defon_message_publish(self,message:EMQX_MESSAGE_PARSE_T,state):print(f'[PythonSDK][on_message_publish]{message.topic}{message.payload}')emqx_hook=CustomHook(hook_module=f'{__name__}.emqx_hook')definit():returnemqx_hook.start()defdeinit():returnstartemqx_extension_hookplugin,如果配置错误或代码写错错误将无法正常启动。启动后尝试建立MQTT连接,观察业务运行情况。./bin/emqx_ctlpluginsloademqx_extension_hook高级开发目前EMQXPython扩展SDK已经开源。如果您对可控性和性能有更高的要求,或者需要使用Python2.7运行环境,欢迎贡献代码或者基于原例开发开发:代码仓库:emqx-extension-python-sdkPython原例,您可以使用本例自行打包:[emqx-extension-hookmain.py版权声明:本文为EMQ原创,转载请注明出处。