工作过程中参考python官方手册实现了一个socket长连接客户端,使用了asyncio库中的streamreader。在实施过程中花费了很多精力。虽然工作有障碍,但感觉并不好。我还是想上网看看。有学习视频吗?这个是在B站找到的,我觉得蟒蛇大神A.JesseJiryuDavis的视频讲解非常通俗易懂。服务器端在学习阶段不知道视频中是如何实现的,所以懒得用了。flask实现了一个超级简单的服务器端fromflaskimportFlaskapp=Flask(__name__)@app.route("/")defhello():return"hellohaha??"@app.route("/foo")deffoo():return"=foo=="*100@app.route("/bar")defbar():return"=bar=="*100if__name__=="__main__":app.run()执行方式:python3flask_server.py执行效果:ubuntu@VM-0-13-ubuntu:~$python3flask_server.py*ServingFlaskapp"flask_server"(lazyloading)*环境:productionWARNING:Thisisadevelopmentserver.不要在生产部署中使用它。请改用生产WSGI服务器。*调试模式:关闭*在http://127.0.0.1:5000/上运行(按CTRL+C退出)127.0.0.1--[28/Nov/202020:08:25]"GET/fooHTTP/1.0》200-socket客户端(v1.0)知识点:选择器,非阻塞socketfromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READimportsocketimporttimeselector=DefaultSelector()defget(path):s=socket.socket()s.setblocking(False)try:s.connect(('127.0.0.1',5000))除了BlockingIOError:passrequest='GET%sHTTP/1.0\r\n\r\n'%path#可写selector.register(s.fileno(),EVENT_WRITE)selector.select()selector.unregister(s.fileno())s.send(request.encode())chunks=[]whileTrue:#可读selector.register(s.fileno(),EVENT_READ)selector.select()selector.unregister(s.fileno())chunk=s.recv(10)ifchunk:print("recivechunk:%s"%chunk)chunks.append(chunk)else:body=(b''.join(chunks)).decode()print(body.split('\n')[0])returnstart=time.time()foriinrange(10):get('/foo')get('/bar')print('took%.2fsec'%(time.time()-start))执行结果...recivechunk:b'ar===bar=='HTTP/1.0200OKtook0.11秒socket客户端(v1.1)知识点:回调函数callbackfromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READimportsocketimporttimeselector=DefaultSelector()n_tasks=0defget(path):globaln_taskss=socket.socket()s.setblocking(False)try:s.connect(('127.0.0.1',5000))除了BlockingIOError:passrequest='GET%sHTTP/1.0\r\n\r\n'%pathcallback=lambda:connected(s,request)selector.register(s.fileno(),EVENT_WRITE,data=callback)n_tasks+=1defconnected(s,request):~~~~selector.unregister(s.fileno())#s可写s.send(request.encode())chunks=[]callback=lambda:readable(s,chunks)selector.register(s.fileno(),EVENT_READ,data=callback)defreadable(s,chunks):globaln_tasksselector.unregister(s.fileno())chunk=s.recv(100)ifchunk:print("recivechunk:%s"%chunk)chunks.append(chunk)callback=lambda:可读(s,chunks)选择器.register(s.fileno(),EVENT_READ,data=callback)n_tasks-=1else:body=(b''.join(chunks)).decode()print(body.split('\n')[0])returnstart=time.time()foriinrange(10)get('/foo')get('/bar')whilen_tasks:events=selector.select()forevent,maskinevents:cb=event.datacb()print('took%.2fsec'%(time.time()-start))socket客户端(v1.2)知识点:未来对象fromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READimportsocketimporttimeselector=DefaultSelector()n_tasks=0classFuture:def__init__(self):self.callbacks=[]defresolve(self):forcinself.callbacks:c()defget(path):globaln_taskss=socket.socket()s.setblocking(False)try:s.connect(('127.0.0.1',5000))除了BlockingIOError:passrequest='GET%sHTTP/1.0\r\n\r\n'%pathcallback=lambda:connected(s,request)fut=Future()fut.callbacks.append(callback)selector.register(s.fileno(),EVENT_WRITE,data=fut)n_tasks+=1defconnected(s,request):selector.unregister(s.fileno())#s可写s.send(request.encode())chunks=[]callback=lambda:readable(s,chunks)fut=Future()fut.callbacks.append(callback)selector.register(s.fileno(),EVENT_READ,data=fut)defreadable(s,chunks):globaln_tasksselector.unregister(s.fileno())chunk=s.recv(100)ifchunk:print("recivechunk:%s"%chunks.append(chunk)回调=lambda:readable(s,chunks)fut=Future()fut.callbacks.append(callback)selector.register(s.fileno(),EVENT_READ,data=fut)n_tasks-=1else:body=(b''.join(chunks)).decode()print(body.split('\n')[0])returnstart=time.time()foriinrange(10)get('/foo')get('/bar')whilen_tasks:events=selector.select()forevent,maskinevents:fut=event.datafut.resolve()print('took%.2fsec'%(time.time()-start))socket客户端(v1.3)知识点:任务,协程序fromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READimportsocketimporttimeselector=DefaultSelector()n_tasks=0classFuture:def__init__(self):self.callbacks=[]defresolve(self):forcinself.callbacks:c()classTask:def__init__(self,gen):self.gen=genself.step()defstep(self):try:f=next(self.gen)exceptStopIteration:returnf.callbacks.append(self.step)defget(path):globaln_tasksn_tasks+=1s=socket.socket()s.setblocking(False)try:s.connect(('127.0.0.1',5000))除了BlockingIOError:passrequest='GET%sHTTP/1.0\r\n\r\n'%路径fut=Future()selector.register(s.fileno(),EVENT_WRITE,data=fut)yieldfutselector.unregister(s.fileno())s.send(request.encode())chunks=[]而True:fut=Future()selector.register(s.fileno(),EVENT_READ,data=fut)yieldfutselector.unregister(s.fileno())chunk=s.recv(100)ifchunk:print("recivechunk:%s"%chunk)chunks.append(chunk)else:body=(b''.join(chunks)).decode()print(body.split('\n')[0])n_tasks-=1start=time.time()foriinrange(10)Task(get('/foo'))Task(get('/bar'))whilen_tasks:events=selector.select()forevent,maskinevents:fut=event.datafut.resolve()print('took%.2fsec'%(time.time()-start))ECHO小例子,server,clientserver端:importsocketimportasynciofromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READclassServer(object):MAX_BUF=1024def__init__(self,port):self._client=[]self._loop=asyncio.new_event_loop()self._selector=DefaultSelector()self._sock=self.socket_init()defsocket_init(self):sock=socket.socket()sock.bind(('127.0.0.1',5000))sock.listen(Server.MAX_BUF)self._selector.register(sock,EVENT_READ,self.accept_client)returnsockdefaccept_client(self,sock,mask):conn,addr=sock.accept()print(f"recvaconn:{conn}来自{addr}")self._client.append(conn)print(f"nowhasclient:{self._client}")conn.setblocking(False)self._selector.register(conn,EVENT_READ,self.read_from_client)defread_from_client(self,conn,mask):data=conn.recv(Server.MAX_BUF)ifdata:print(f"recvdatafromclient:{data}")conn.send(data)#echofirsttodoimporttimetime.sleep(10)else:self._selector.unregister(conn)conn.close()defrun_forever(self):whileTrue:events=self._selector.select()forevent,maskinevents:cb=event.datacb(event.fileobj,mask)if__name__=='__main__':ser=Server(5000)ser.run_forever()客户端:importasynciofromconcurrent.futuresimportTimeoutErrorasyncdeftcp_echo_client(message):reader,writer=awaitasyncio.open_connection('127.0.0.1',5000)print(f'Send:{message!r}')writer.write(message.encode())而不是reader.at_eof():print(type(reader))print(reader)print(type(writer))print(writer)try:writer.write(message.encode())data=等待asyncio.wait_for(reader.read(1),2)print(f'Received:{data.decode()!r}')除了TimeoutError:importtracebackprint("%s"%traceback.format_exc())asyncio.get_event_loop().run_until_complete(tcp_echo_client('地狱oWorld!'))参考-[1][一个使用asyncio协程的网络爬虫](https://linux.cn/article-8265-1.html)-[2][Python协程工作原理](https://www.bilibili.com/video/BV1N4411S7BP?t=32&p=2)
