Asyncio Persistent Client Protocol Class Using Queue
Solution 1:
Your server is fine as-is for what you're trying to do. Your code as written actually keeps the TCP connection alive; you just don't have the plumbing in place to continuously feed it new messages. To do that, you need to tweak the client code so that you can feed new messages into it whenever you want, rather than only when the connection_made callback fires.
This is easy enough; we'll add an internal asyncio.Queue to the ClientProtocol which can receive messages, and then run a coroutine in an infinite loop that consumes the messages from that Queue and sends them on to the server. The final piece is to actually store the ClientProtocol instance you get back from the create_connection call, and then pass it to a coroutine that actually sends messages.
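The queue-plus-consumer idea described above can be tried in isolation before wiring it into a protocol. The following is a minimal sketch, not the answer's actual client: `consume`, `demo`, and the `sink` list (a stand-in for `transport.write`) are hypothetical names chosen for illustration.

```python
import asyncio


async def consume(queue, ready, sink):
    """Wait until `ready` is set, then forward queued items to `sink` forever."""
    await ready.wait()
    while True:
        item = await queue.get()
        sink.append(item.encode('utf-8'))  # stand-in for transport.write()
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    ready = asyncio.Event()
    sink = []  # hypothetical replacement for the TCP transport

    consumer = asyncio.create_task(consume(queue, ready, sink))

    # Items queued before the "connection" is ready are simply buffered.
    await queue.put('first')
    await queue.put('second')
    ready.set()

    await queue.join()  # wait until every queued item has been processed
    consumer.cancel()   # stop the infinite loop
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return sink
```

Calling `asyncio.run(demo())` drains the two buffered items in order. The event plays the role of `connection_made`: nothing is written until it is set, so callers can queue messages at any time.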
import asyncio
import json


# Requires Python 3.5+ (async/await syntax).
class SubscriberClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.transport = None
        self.loop = loop
        self.queue = asyncio.Queue()
        self._ready = asyncio.Event()
        # Start the sender task; it idles until connection_made() sets _ready.
        # Keep a reference so the task is not garbage collected.
        self._sender = self.loop.create_task(self._send_messages())

    async def _send_messages(self):
        """ Send messages to the server as they become available. """
        await self._ready.wait()
        print("Ready!")
        while True:
            data = await self.queue.get()
            self.transport.write(data.encode('utf-8'))
            print('Message sent: {!r}'.format(data))

    def connection_made(self, transport):
        """ Upon connection, store the transport and let the
        sender coroutine start writing to the server

        A message has to have the following items:
            type:       subscribe/unsubscribe
            channel:    the name of the channel
        """
        self.transport = transport
        print("Connection made.")
        self._ready.set()

    async def send_message(self, data):
        """ Feed a message to the sender coroutine. """
        await self.queue.put(data)

    def data_received(self, data):
        """ After sending a message we expect a reply
        back from the server

        The return message consists of three fields:
            type:           subscribe/unsubscribe
            channel:        the name of the channel
            channel_count:  the number of channels subscribed to
        """
        print('Message received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()


async def feed_messages(protocol):
    """ An example function that sends the same message repeatedly. """
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))
    while True:
        await protocol.send_message(message)
        await asyncio.sleep(1)


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    coro = loop.create_connection(lambda: SubscriberClientProtocol(loop),
                                  '127.0.0.1', 10666)
    # Keep the protocol instance so other coroutines can feed it messages.
    _, proto = loop.run_until_complete(coro)
    feeder = loop.create_task(feed_messages(proto))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Closing connection')
    loop.close()
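The data_received docstring says the reply carries three fields: type, channel, and channel_count. As a rough sketch of how such a reply might be decoded, the helper below assumes the server answers with one complete UTF-8 JSON object per data_received call. That framing is an assumption (TCP may split or merge messages), and `parse_reply` is a hypothetical name, not part of the original code.

```python
import json


def parse_reply(data):
    """Decode one server reply and return its three documented fields.

    Assumes `data` (bytes) holds exactly one complete JSON object;
    a real stream may need buffering and a message delimiter.
    """
    reply = json.loads(data.decode('utf-8'))
    return reply['type'], reply['channel'], reply['channel_count']
```

Inside data_received, the result could replace the plain print, for example `msg_type, channel, count = parse_reply(data)`.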