澶у濂斤紝鎴戞槸鑰佽儭銆傛渶杩戯紝鎴戜竴鐩村湪鍜屾垜鐨勬湅鍙嬩竴璧峰仛浜嬨€傛垜鏄粺璁″鍑鸿韩锛岀紪绋嬭兘鍔涘叾瀹炲緢宸紝鏈夌偣纰嶄簨銆傚洜姝わ紝鏈夊繀瑕佽ˉ鍏呭熀纭€鐭ヨ瘑銆傛湰绯诲垪浼氭洿鏂板嚑绡囨枃绔狅紝鏈夊叴瓒g殑鍚屽鍙互涓€璧峰涔犱氦娴併€俍eroMQ姒傝堪ZeroMQ锛堝張鍚嵜楳Q銆丮Q鎴杬mq锛夊氨鍍忎竴涓彲宓屽叆鐨勭綉缁滃簱锛屼絾鍙堝儚涓€涓苟鍙戞鏋躲€俍eroMQ绫讳技浜庢爣鍑嗙殑Berkeleysockets锛屽畠鎻愪緵浜嗗悇绉嶄紶杈撳伐鍏凤紝渚嬪鐢ㄤ簬杩涚▼鍐呫€佽繘绋嬮棿銆乀CP鍜屽鎾殑鍘熷瓙娑堟伅浼犺緭鐨剆ockets銆侼瀵筃濂楁帴瀛楄繛鎺ュ彲浠ヤ娇鐢ㄥ悇绉嶆ā寮忓疄鐜帮紝鍖呮嫭锛氬彂甯?璁㈤槄銆佷换鍔″垎閰嶃€佽姹?鍥炲銆俍eroMQ瓒冲蹇紝鍙互鍏呭綋闆嗙兢浜у搧鐨勭粨鏋勩€俍eroMQ鐨勫紓姝/O妯″瀷鎻愪緵鍙墿灞曠殑澶氭牳搴旂敤绋嬪簭锛屼娇鐢ㄥ紓姝ユ秷鎭鐞嗕换鍔°€俍eroMQ鏍稿績閲囩敤C璇█缂栧啓锛屾敮鎸丆銆丆++銆乯ava銆乸ython绛夊绉嶇紪绋嬭瑷€鐨凙PI锛屽彲浠ヨ繍琛屽湪澶у瀷鐨勫ぇ澶氭暟鎿嶄綔绯荤粺涓婃€荤粨濡備笅锛毭楳Q锛圸eroMQ锛夋槸涓€涓绾跨▼缃戠粶搴撳熀浜庢秷鎭槦鍒椼€傚畠灏佽浜嗙綉缁滈€氫俊銆佹秷鎭槦鍒椼€佺嚎绋嬭皟搴︾瓑鍔熻兘锛屽悜涓婂眰鎻愪緵浜嗙畝娲佺殑API銆傞€氳繃鍔犺浇搴撴枃浠讹紝搴旂敤绋嬪簭鍙互璋冪敤API鍑芥暟鏉ュ疄鐜伴珮鎬ц兘鐨勭綉缁滈€氫俊銆傚ソ鍍忔湁鐐规娊璞★紝涓嬮潰缁撳悎ZeroMQ鐨凱ython鍖?--pyzmp鏉ョ湅涓€涓媄eroMQ鏈€鍩烘湰鐨勪笁绉嶅伐浣滄ā寮忋€傚畨瑁呮柟娉昿ipinstallpyzmq鏌ョ湅鏄惁瀹夎鎴愬姛>>>importzmq>>>print(zmq.__version__)22.0.3Request-Reply锛堣姹傚搷搴旀ā寮忥級Request-Reply妯″紡姒傝堪锛氭秷鎭槸鍙屽悜鐨勶紝鏈変紶鍏ュ拰澶栧彂娑堟伅銆侰lient璇锋眰鐨勬秷鎭紝Server蹇呴』鍥炲缁機lient銆傚鎴风璇锋眰鍚庯紝鏈嶅姟鍣ㄥ繀椤诲搷搴斻€傛敞鎰忥細濡傛灉鏈嶅姟鍣ㄦ病鏈夎繑鍥炲搷搴旓紝鍒欎細鎶ラ敊銆係erver鍜孋lient閮藉彲浠ユ槸1:N鐨勬ā鍨嬨€傞€氬父锛?琚涓烘槸Server锛孨琚涓烘槸Client銆備笅灞傜鐐瑰湴鍧€瀵逛笂灞傛槸闅愯棌鐨勶紝姣忎竴涓姹傞兘闅愬惈鐫€涓€涓搷搴斿湴鍧€锛屽簲鐢ㄧ▼搴忎笉鍏冲績瀹冦€俍MQ鍙互寰堝ソ鐨勬敮鎸佽矾鐢卞姛鑳斤紙瀹炵幇璺敱鍔熻兘鐨勭粍浠剁О涓篋evice锛夛紝灏?:N鎵╁睍涓篘:M锛堝彧闇€瑕佸鍔犲嚑涓矾鐢辫妭鐐癸級銆傚鎴风python瀹炵幇#client.pyimportzmqcontext=zmq.Context()#Sockettotalktoserverprint("Connectingtohelloworldserver鈥?)socket=context.socket(zmq.REQ)socket.connect("tcp://localhost:5555")socket.send(b"Hello")#鑾峰彇reply.message=socket.recv()print(f"Receivedreply[{message}]")鏈嶅姟鍣ㄧpython瀹炵幇#server.pyimporttimeimportzmqcontext=zmq.Context()socket=context.socket(zmq.REP)socket.bind("tcp://*:5555")whileTrue:#绛夊緟鏉ヨ嚜瀹㈡埛绔殑涓嬩竴涓姹俶essage=socket.recv()print("Receivedrequest:%s"%message)#Dosome'work'time.sleep(1)#Sendreplybacktoclientsocket.send(b"World")startclient.py灏嗛鍏堟墦鍗癈onnectingtohelloworldserver銆?.浣嗕笉浼氭敹鍒颁换浣曟秷鎭€傜劧鍚庡惎鍔╯erver.py锛屽鎴风鏀跺埌瀹㈡埛绔殑璇锋眰锛歜'Hello'姝ゆ椂瀹㈡埛绔敹鍒版湇鍔″櫒鐨勫洖澶嶏細[b'World']pythonclient.pyConnectingtohelloworldserver...Receivedreply[b'World']pythonserver.pyReceivedrequest:b'Hello'浣犲彲浠ヨ瘯涓€涓嬶紝澶氳窇鍑犱釜client.py鐪嬬湅鏄粈涔堟儏鍐点€侾ublish/Subscribe锛堣闃?鍙戝竷妯″紡锛塒ub-Subs妯″紡姒傝堪锛氬崟鍚戞秷鎭紝涓€涓彂甯冭€咃紝澶氫釜璁㈤槄鑰咃紱鍙戝竷鑰呭彧浜х敓鏁版嵁锛屽彂甯冭€呭彂甯冧竴鏉℃秷鎭紝澶氫釜璁㈤槄鑰呭悓鏃舵帴鏀跺埌鐨勬秷鎭彲浠ヤ娇鐢ㄣ€傚彂甯冭€呬笉闇€瑕佸叧蹇冭闃呰€呯殑鍔犲叆鍜岀寮€锛屾秷鎭細浠?:N鐨勬柟寮忎紶鎾粰姣忎釜璁㈤槄鑰呫€傚悜鎵€鏈夊鎴风骞挎挱锛屾病鏈夐槦鍒楃紦瀛橈紝鏂紑杩炴帴鐨勬暟鎹皢姘歌繙涓㈠け銆傚鏋淧ublish绔紑濮嬪彂甯冧俊鎭椂Subscribe绔繕娌℃湁杩炴帴涓婏紝鍒欒淇℃伅灏嗚鐩存帴涓㈠純銆侾UB鍜孲UB涔嬮棿璋佺粦瀹氳皝杩炴帴娌℃湁涓ユ牸鐨勮姹傦紙铏界劧鏈川涓婃病鏈夊尯鍒級锛屼絾鏄繕鏄帹鑽怭UB浣跨敤bind锛孲UB浣跨敤connect銆備娇鐢⊿UB璁剧疆璁㈤槄鏃讹紝蹇呴』浣跨敤zmq_setsockopt()鏉ヨ繃婊ゆ秷鎭€傝繖閲岀洿鎺ュ弬鑰冨畼鏂规枃妗d緥瀛愶細Publisher锛氱被浼间簬涓€涓ぉ姘旀洿鏂版湇鍔″櫒锛屽畠鍚戣闃呰€呭彂閫佸ぉ姘旀洿鏂帮紝鍖呮嫭閭斂缂栫爜銆佹俯搴︺€佹箍搴︾瓑淇℃伅#Publisher.pyimportzmqfromrandomimportrandrangecontext=zmq.Context()socket=context.socket(zmq.PUB)socket.bind("tcp://*:5556")whileTrue:zipcode=randrange(1,100000)temperature=randrange(-80,135)relhumidity=randrange(10,60)socket.send_string("%i%i%i"%(zipcode,temperature,relhumidity))Subscriber:瀹冪洃鍚彂甯冭€呮洿鏂扮殑鏁版嵁娴侊紝杩囨护骞朵笖鍙帴鏀朵笌鐗瑰畾閭斂缂栫爜鐩稿叧鐨勫ぉ姘斾俊鎭紝骞堕粯璁ゆ帴鏀?0鏉℃暟鎹?Subscribe銆俻yimportsysimportzmq#Sockettotalktoservercontext=zmq.Context()socket=context.socket(zmq.SUB)print("Collectingupdatesfromweatherserver...")socket.connect("tcp://localhost:5556")#璁㈤槄閭斂缂栫爜锛岄粯璁や负NYC锛?0001zip_filter=sys.argv[1]iflen(sys.argv)>1else"10001"#Python2-ascii瀛楄妭鍒皍nicodestrifisinstance(zip_filter,bytes):zip_filter=zip_filter.decode('ascii')琚滃瓙et.setsockopt_string(zmq.SUBSCRIBE,zip_filter)#Process5updatestotal_temp=0forupdate_nbrinrange(5):string=socket.recv_string()閭斂缂栫爜锛屾俯搴︼紝relhumidity=string.split()total_temp+=int(temperature)print("Averagetemperatureforzipcode'%s'was%dF"%(zip_filter,total_temp/(update_nbr+1)))Push/Pull(pipelinemode)绠¢亾妯″紡姒傝堪锛氫富瑕佺敤浜庡崟鍚戝浠诲姟骞惰娑堟伅锛屼笌骞朵笖娌℃湁鍚庤儗銆傚浜庝换浣昉ush娑堟伅锛屾€绘槸鍙湁涓€涓狿ull绔細鏀跺埌璇ユ秷鎭€侾ush绔垨鑰匬ull绔兘鍙互鏄湇鍔″櫒锛岀粦瀹氬埌鏌愪釜鍦板潃锛岀瓑寰呭鏂硅闂€傚鏋滄湁澶氫釜PULL缁堢鍚屾椂杩炴帴鍒癙USH缁堢锛孭USH缁堢浼氬湪鍐呴儴杩涜璐熻浇鍧囪 锛屼娇鐢ㄥ潎鍖€鍒嗛厤绠楁硶灏嗘墍鏈夋秷鎭潎琛″垎閰嶇粰PULL缁堢銆傚畠鐢变笁閮ㄥ垎缁勬垚锛孭ush鐢ㄤ簬鏁版嵁鎺ㄩ€侊紝work鐢ㄤ簬鏁版嵁缂撳瓨锛孭ull鐢ㄤ簬鏁版嵁绔炰簤鑾峰彇澶勭悊銆傚瓨鍦ㄦ暟鎹紦瀛樺拰澶勭悊璐熻浇銆傚綋杩炴帴鏂紑鏃讹紝鏁版嵁涓嶄細涓㈠け锛岄噸杩炲悗鏁版嵁浼氱户缁彂閫佺粰瀵圭銆俈entilator浣跨敤SOCKET_PUSH灏嗕换鍔″垎鍙戠粰Worker鑺傜偣銆傚湪Worker鑺傜偣涓婏紝浣跨敤SOCKET_PULL鎺ュ彈涓婃父鐨勪换鍔★紝浣跨敤SOCKET_PUSH灏嗙粨鏋滆仛鍚堝埌Sink銆傚€煎緱娉ㄦ剰鐨勬槸锛岃繕鏈変竴涓敤浜庝换鍔″垎鍙戠殑璐熻浇鍧囪 璺敱鍔熻兘銆傚伐浜哄彲浠ラ殢鏃惰嚜鐢卞姞鍏ワ紝鍛煎惛鏈哄彲浠ュ潎琛″垎閰嶄换鍔°€侾ush/Pull妯″紡闈炲父甯哥敤銆傝繖閲屾垜浠富瑕佹祴璇曞畠鐨勮礋杞藉潎琛°€俈entilator#ventilator.pyimportzmqimporttimecontext=zmq.Context()socket=context.socket(zmq.PUSH)socket.bind("tcp://*:5557")whileTrue:socket.send(b"test")鎵撳嵃("Sent")time.sleep(1)worker#worker.pyimportzmqcontext=zmq.Context()recive=context.socket(zmq.PULL)recive.connect('tcp://127.0.0.1:5557')鍙戜欢浜?context.socket(zmq.PUSH)sender.connect('tcp://127.0.0.1:5558')whileTrue:data=recive.recv()print("work1isforwarding...")sender.send(data)sink#sink.pyimportzmqimportsyscontext=zmq.Context()socket=context.socket(zmq.PULL)socket.bind("tcp://*:5558")whileTrue:response=socket.recv()鎵撳嵃("response:%s"%response)鎵撳紑4涓猅erminal锛岃繍琛宲ythonsink.pythonworker.pythonworker.pythonventilator.py鎬荤粨娑堟伅妯″瀷鍙互鏍规嵁闇€瑕佺粍鍚堜娇鐢紝鍚庨潰鐨勪唬鐞嗘ā寮忓拰璺敱妯″紡閮芥槸涓夌鍩烘湰妯″紡鐨勬墿灞曟垨鍙樹綋銆傜户缁帰绱紝璇风Щ姝ュ畼鏂规枃妗p煍金煍紾ithub锛歨ttps://github.com/zeromq/pyzmq鏂囨。锛歨ttps://zeromq.github.io/pyzmq/鎸囧崡锛歨ttp://zguide.zeromq.org/py:鎵€鏈塸ypi:https://pypi.org/project/pyzmq/
