Writing an "interactive" client with Twisted/Autobahn Websockets

Maybe I'm missing something here in the asynchronous design of Twisted, but I can't seem to find a way to call the sendMessage() method "externally". By this I mean sending messages without being confined to the callback methods of Twisted/Autobahn WebSockets (such as onOpen(), or onMessage() when receiving data from the server).

Of course I could launch a thread and call my_protocol_instance.sendMessage("hello"), but that would defeat the whole purpose of the asynchronous design, right?

As a concrete example, I need a top-level wrapper class that opens the connection and manages it, so that I can call my_class.send_my_toplevel_message(msg) whenever I need to. How can I implement this?

I hope my explanation is clear.

Thanks

Collaborative answered 19/9, 2013 at 15:55 Comment(0)

Why do you need a thread to call protocolInstance.sendMessage()? This can be done in the normal reactor loop.

The core of Twisted is the reactor, and things look much simpler once you think of Twisted itself as reactive: it does something as a reaction (response) to something else.

Now I assume that the thread you are talking about would also be created, and would call sendMessage, because of certain events, activity, or status. I can hardly imagine a case where you would need to send a message out of the blue, with nothing to react to.

If, however, there is an event that should trigger sendMessage, there is no need to invoke it from a thread: just use Twisted's mechanisms for catching that event, and then call sendMessage from that event's callback.
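As a rough illustration of "call sendMessage from the event's callback", here is a minimal sketch. The two events chosen (a timer and a Deferred) are only assumptions for the example, and send_after_delay and send_result_of are hypothetical helper names. The reactor, the connected protocol and the Deferred are passed in as arguments, so nothing here depends on how the connection was set up.

```python
def send_after_delay(reactor, protocol, delay, payload):
    """Ask the reactor to call protocol.sendMessage(payload) after `delay` seconds."""
    # callLater(delay, callable, *args) runs the callable inside the reactor
    # loop, so no extra thread is involved.
    return reactor.callLater(delay, protocol.sendMessage, payload)


def send_result_of(deferred, protocol):
    """Send whatever value the Deferred eventually fires with."""
    def _send(result):
        protocol.sendMessage(result)
        return result  # pass the value on to any later callbacks
    return deferred.addCallback(_send)
```

With the real reactor you would pass in `twisted.internet.reactor` and the protocol instance you kept a reference to (for example one stored in onOpen()). Recent Autobahn releases expect the payload to be bytes.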

Now on to your concrete example: can you specify what "whenever I need" means exactly in the context of this question? An input from another connection? An input from the user? Looping activity?

Whist answered 20/9, 2013 at 9:19 Comment(10)
Ok, I'm starting to understand this "reactive" behaviour. So the event would be any user input, such as an interactive prompt in which the user enters commands that are then sent to the websocket server. How can Twisted catch an event that comes from the user? - Collaborative
Or maybe a more straightforward implementation: have a Queue() that holds every message to send (this Queue can be filled by other threads) and make the main Twisted loop keep fetching messages from it to send to the server? - Collaborative
Are you using Twisted on the client side? - Whist
Yes, this is on the client side. - Collaborative
The Twisted way of injecting events into the main reactor thread from a background thread is via callFromThread. You may want to have a look at twistedmatrix.com/documents/current/core/howto/threading.html - Shoestring
And by interactive shell you mean a console / CLI of some sort? Which platform will the client run on: Linux? - Whist
@oberstet: thanks. I also ended up there and accomplished what I wanted with callFromThread. But because I had the idea that I shouldn't use threads, I didn't really look at it at first. These questions also helped (as they're almost the same as mine): this and this - Collaborative
@Whist yes. But the details are not really important. I just needed a way to keep Twisted running without blocking my main program while still allowing me to trigger sending data through it. I will post my solution and ask for your opinion. - Collaborative
@Collaborative - the reason I asked about the shell is that in most scenarios you will be better off finding a Twisted library for the specific task: twistedmatrix.com/documents/10.0.0/words/examples/… shows, for example, how to integrate curses with Twisted. There are a number of other libraries available for specific functions in Twisted, just as there are specific reactors for various things. I usually try to avoid the whole thread part in high-level application code and go the pure Twisted way. - Whist
@jbreicis: that makes sense, and I also have that feeling of "this thread thing doesn't seem 100% right". But for my case (which needs to be as generic as possible) I didn't find any reactor that would solve it easily. I actually started reading about twisted.internet.stdio, which was pretty much the design I was looking for, and tried implementing my own "bridge" so I could keep Twisted with a truly reactive design, but I quickly gave up as it was getting too complicated for such a simple thing (implementing it well would be great, though...) - Collaborative
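For reference, a stdin "bridge" of the kind discussed in these comments might look roughly like the sketch below. It is not the code anyone in this thread wrote: LineForwarder and attach_stdin are hypothetical names, and `send` stands for any callable that delivers a payload, such as the connected protocol's sendMessage. The only Twisted pieces used are twisted.internet.stdio.StandardIO and twisted.protocols.basic.LineReceiver, imported lazily so the forwarding logic can be tried on its own.

```python
class LineForwarder(object):
    """Forwards each non-empty console line to a `send` callable."""

    def __init__(self, send):
        self._send = send

    def lineReceived(self, line):
        # Called by LineReceiver once per complete line (bytes on Python 3).
        line = line.strip()
        if line:
            self._send(line)


def attach_stdin(send):
    """Hook stdin into the running reactor; call this from the reactor thread."""
    from twisted.internet import stdio
    from twisted.protocols.basic import LineReceiver

    class StdinProtocol(LineForwarder, LineReceiver):
        # LineReceiver defaults to b"\r\n"; a POSIX terminal sends b"\n".
        delimiter = b"\n"

    return stdio.StandardIO(StdinProtocol(send))
```

Calling something like attach_stdin(self.sendMessage) from onOpen() would then send every typed line to the server with no extra thread. This assumes a console where stdin can be read asynchronously, which is the normal case on Linux.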

I managed to implement what I needed by running Twisted in another thread, which keeps my program free to run and lets it trigger sends in Twisted with reactor.callFromThread().

What do you think?

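import logging
import threading
import Queue  # Python 2 module name, as used below

from twisted.internet import reactor
# Assumed import path for the Autobahn release current in 2013; newer
# releases expose these names from autobahn.twisted.websocket instead.
from autobahn.websocket import (WebSocketClientFactory,
                                WebSocketClientProtocol, connectWS)

log = logging.getLogger(__name__)  # assumed: a standard logging logger

# ----- twisted ----------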
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------

class BaseWBClient(object):

    def __init__(self, websocket_settings):
        self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):
        log.debug("Connecting to %(host)s:%(port)d" % self.settings)
        self.factory = _WebSocketClientFactory(
                                "ws://%(host)s:%(port)d" % self.settings,
                                debug=True)
        self.factory.base_client = self
        c = connectWS(self.factory)
        self._reactor_thread = threading.Thread(target=reactor.run,
                                               args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.debug("Queueing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.error("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.debug("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)
Collaborative answered 23/9, 2013 at 23:17 Comment(3)
Just wondering: why do you need a queue when you use callFromThread, where you can easily pass the payload along as an argument? - Shoestring
@oberstet: yeah, I could have made it simpler. There isn't any strong reason for it, but with that queue I can implement some kind of buffering/throttling if needed (I posted a reduced version of the class; the original is supposed to work like a library that will be imported/extended by other modules, so I don't have control over the number of calls to send_message). In short: for now there isn't any critical reason, but in the future there might be. - Collaborative
Keep in mind that merely queuing your stuff from your background thread won't wake up the reactor loop on the main thread. It will process the queue only after the reactor loop times out. Using callFromThread does not have this issue (it immediately triggers a reactor loop run). - Shoestring
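The queue-less variant suggested in these comments could look something like this sketch. ThreadSafeSender and get_protocol are hypothetical names; get_protocol is assumed to return the connected protocol instance, or None while disconnected. The only Twisted call relied on is reactor.callFromThread(callable, *args), which hands the call over to the reactor thread.

```python
class ThreadSafeSender(object):
    """Hands payloads to the reactor thread without an intermediate queue."""

    def __init__(self, reactor, get_protocol):
        self._reactor = reactor
        # Hypothetical hook: returns the live protocol, or None if not connected.
        self._get_protocol = get_protocol

    def send_message(self, payload):
        # Safe to call from any thread: callFromThread schedules the call in
        # the reactor thread and wakes the reactor loop up.
        self._reactor.callFromThread(self._send_in_reactor, payload)

    def _send_in_reactor(self, payload):
        # Runs in the reactor thread, where protocol methods may be used.
        protocol = self._get_protocol()
        if protocol is None:
            return  # not connected: this sketch simply drops the payload
        protocol.sendMessage(payload)
```

With the classes from the answer above, this might be constructed as ThreadSafeSender(reactor, lambda: factory.protocol_instance). The payload travels as an argument of callFromThread, so no separate Queue is needed unless buffering or throttling is wanted.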
