
Asyncio Persistent Client Protocol Class Using Queue

I'm trying to get my head around the Python 3 asyncio module, in particular using the transport/protocol API. I want to create a publish/subscribe pattern, and use the asyncio.Prot

Solution 1:

Your server is fine as-is for what you're trying to do. Your code as written already keeps the TCP connection alive; you just don't have the plumbing in place to continuously feed it new messages. To do that, you need to tweak the client code so that you can feed new messages into it whenever you want, rather than only when the connection_made callback fires.

This is easy enough; we'll add an internal asyncio.Queue to the client protocol class to receive messages, and then run a coroutine in an infinite loop that consumes messages from that Queue and sends them on to the server. The final piece is to store the protocol instance you get back from the create_connection call (it returns a (transport, protocol) pair), and then pass it to a coroutine that does the actual sending.
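Stripped of the networking, the queue-and-consumer idea can be sketched roughly as follows. This is only an illustration, not part of the original answer: the `sent` list is a hypothetical stand-in for `transport.write()`, and the names `consumer` and `demo` are made up for the example. It assumes Python 3.7+ for `asyncio.run`.

```python
import asyncio


async def consumer(queue, sent):
    """Forward each queued message; `sent` stands in for transport.write()."""
    while True:
        data = await queue.get()
        sent.append(data.encode('utf-8'))
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    sent = []  # hypothetical stand-in for bytes written to the socket
    task = asyncio.ensure_future(consumer(queue, sent))
    for channel in ('sensor', 'alerts'):
        await queue.put(channel)
    await queue.join()  # wait until the consumer has handled everything
    task.cancel()       # the consumer loops forever, so stop it explicitly
    return sent


result = asyncio.run(demo())
print(result)  # [b'sensor', b'alerts']
```

The producer never touches the transport directly; it only puts items on the queue, which is what lets you send messages at any time after the connection is up.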

import asyncio
import json

# Written with async def / await (Python 3.5+). On Python 3.4, use
# @asyncio.coroutine and "yield from" in place of async def and await.

class SubscriberClientProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.transport = None
        self.loop = loop
        self.queue = asyncio.Queue()
        self._ready = asyncio.Event()
        # Start the sender right away; it blocks until connection_made fires.
        # (Older code used asyncio.async(), which no longer parses on 3.7+.)
        self.loop.create_task(self._send_messages())

    async def _send_messages(self):
        """ Send messages to the server as they become available. """
        await self._ready.wait()
        print("Ready!")
        while True:
            data = await self.queue.get()
            self.transport.write(data.encode('utf-8'))
            print('Message sent: {!r}'.format(data))

    def connection_made(self, transport):
        """ Upon connection, mark the protocol as ready to send
        messages to the server

        A message has to have the following items:
            type:       subscribe/unsubscribe
            channel:    the name of the channel
        """
        self.transport = transport
        print("Connection made.")
        self._ready.set()

    async def send_message(self, data):
        """ Feed a message to the sender coroutine. """
        await self.queue.put(data)

    def data_received(self, data):
        """ After sending a message we expect a reply
        back from the server

        The return message consists of three fields:
            type:           subscribe/unsubscribe
            channel:        the name of the channel
            channel_count:  the number of channels subscribed to
        """
        print('Message received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()


async def feed_messages(protocol):
    """ An example function that sends the same message repeatedly. """
    message = json.dumps({'type': 'subscribe', 'channel': 'sensor'},
                         separators=(',', ':'))
    while True:
        await protocol.send_message(message)
        await asyncio.sleep(1)


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    coro = loop.create_connection(lambda: SubscriberClientProtocol(loop),
                                  '127.0.0.1', 10666)
    # create_connection returns (transport, protocol); keep the protocol.
    _, proto = loop.run_until_complete(coro)
    loop.create_task(feed_messages(proto))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Closing connection')
    loop.close()
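To check that several messages really travel over one connection, something like the following could be used. It is a sketch under stated assumptions, not the original poster's server: `echo` is a hypothetical stand-in that simply echoes each chunk back, `QueueClient` is a condensed copy of the queue plumbing above (with a `replies` queue added for the demo and without the `loop.stop()` call), and port 0 asks the OS for any free port. It assumes Python 3.7+.

```python
import asyncio


class QueueClient(asyncio.Protocol):
    """Condensed copy of the client's queue plumbing (no loop.stop())."""

    def __init__(self):
        self.transport = None
        self.queue = asyncio.Queue()
        self.replies = asyncio.Queue()  # added for the demo only
        self._ready = asyncio.Event()
        self._sender = asyncio.ensure_future(self._send_messages())

    async def _send_messages(self):
        await self._ready.wait()
        while True:
            data = await self.queue.get()
            self.transport.write(data.encode('utf-8'))

    def connection_made(self, transport):
        self.transport = transport
        self._ready.set()

    def data_received(self, data):
        self.replies.put_nowait(data.decode())

    def connection_lost(self, exc):
        self._sender.cancel()


async def echo(reader, writer):
    """Hypothetical stand-in server: echo each chunk back to the sender."""
    while True:
        data = await reader.read(1024)
        if not data:
            break
        writer.write(data)
        await writer.drain()
    writer.close()


async def demo():
    server = await asyncio.start_server(echo, '127.0.0.1', 0)  # 0: any free port
    port = server.sockets[0].getsockname()[1]
    loop = asyncio.get_running_loop()
    transport, proto = await loop.create_connection(QueueClient, '127.0.0.1', port)
    received = []
    for text in ('first', 'second', 'third'):
        await proto.queue.put(text)                  # feed the sender task
        received.append(await proto.replies.get())   # wait for the echo
    transport.close()
    server.close()
    return received


received = asyncio.run(demo())
print(received)  # ['first', 'second', 'third']
```

All three messages go through the same transport, which is the behaviour the queue-based client is meant to provide. Waiting for each reply before sending the next keeps the echoes from being merged into one TCP read; a real protocol would need explicit message framing.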
