A discussion on how to create a fast, non-blocking messaging bus in Python.


The goal here is to try to create a message bus that is:
1. fast
2. simple
3. scalable
4. language agnostic
5. does not change order of received packets

We will work here with Python. Python is a dynamically typed language, easy to learn, great to code in, and perfect for demonstrating new ideas.

Day 1: Let's use something that is already available. Pyee (https://github.com/jfhbrook/pyee) is an open-source library built for event emission. It transfers messages based on "skills" (event names): it associates pre-defined handlers with a given skill. When a packet with a skill is "emitted", all the handlers associated with that skill are invoked one after another, inside the call to emit. This gives the feeling of transferring the message from the current context to the context of the handlers, effectively passing the message.
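To make the mechanism concrete, here is a minimal stand-in for an event emitter, written only with the standard library. It is a sketch of the general register-and-dispatch pattern, not pyee's actual implementation; the class name TinyEmitter and the handler names are hypothetical. It shows that handlers are called in registration order and in the thread that called emit.

```python
import threading
from collections import defaultdict


class TinyEmitter:
    """Hypothetical, simplified stand-in for an event emitter (not pyee itself)."""

    def __init__(self):
        # Maps an event name (a "skill") to its handlers, in registration order.
        self._handlers = defaultdict(list)

    def on(self, event):
        """Decorator that registers a handler for the given event name."""
        def register(func):
            self._handlers[event].append(func)
            return func
        return register

    def emit(self, event, *args):
        """Call every handler for this event, one after another, in the caller's thread."""
        for handler in list(self._handlers[event]):
            handler(*args)


ee = TinyEmitter()
calls = []


@ee.on('data')
def first(data):
    calls.append(('first', data, threading.get_ident()))


@ee.on('data')
def second(data):
    calls.append(('second', data, threading.get_ident()))


ee.emit('data', 42)
print(calls)
```

Every entry in calls carries the thread id of the caller of emit, which is why a handler registered by one thread object still runs on the emitter's time.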

In the short program here, I will try to demonstrate what happens in the whole cycle. Assume thread1 is an emitter thread that keeps generating events with the name 'data', and thread2 is a receiver thread that receives each event and then retransmits the same data packet back with the event name 'reply'. We will implement the threads here as objects subclassed from threading.Thread.

The emitter 'thread1' will generate events; here let's try to time the interval needed to "emit" the message.
s=datetime.now()
self.ee.emit('data',i)
e=datetime.now()
print ("THREAD1 : Thread blocked during last emit=",(e-s).total_seconds()*1000,"msec")


The event handler in the receiver 'thread2' looks like:
@self.ee.on('data')
def evthandler(data):
        print ("THREAD2 : Received event on msging bus")
        self.q.put_nowait(data)

NOTE : You will notice here that the receiver thread just stores the data in a local 'queue' object. Nothing more is done.

The run() function in receiver 'thread2' will loop through all items in the local queue and emit them back into the messaging bus with event-name 'reply'. The code looks like:

def run(self):
        while not self.stopflag:
                while not self.q.empty():
                        print ("THREAD2: Emitting reply into msging bus")
                        s=datetime.now()
                        self.ee.emit('reply',self.q.get_nowait())
                        e=datetime.now()
                        print ("THREAD2: Emission of reply done")
                        print ("THREAD2: Thread blocked during last emit=",(e-s).total_seconds()*1000,"msec")
                time.sleep(0.01)

The event handler for event name 'reply' in emitter thread 'thread1' looks like:
@self.ee.on('reply')
def evthandler(data):
        t=datetime.now()
        print ("THREAD1 : Reply event handler triggered")
        dataint=int(data)
        if dataint in self.senttime.keys():
                self.recvtime[dataint]=t



Running the 2 threads, we get events in the following order/format:

THREAD1: 2017-09-06 22:37:00.336504  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.336621  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.336756  Emission of data done
THREAD1: Thread blocked during last emit= 0.15100000000000002 msec
THREAD2: 2017-09-06 22:37:00.345754 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.345949  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.346050  Emission of reply done
THREAD2: Thread blocked during last emit= 0.184 msec

And ...

Round trip latency for 2 is= 9.349 msec


This shows that the emitter 'thread1' was blocked for 151 usec while the emit function finished. The receiver 'thread2', which re-emits the received data, was blocked for 184 usec during its own 'emit' operation. However, the complete round trip of the event, from the 'emit' in thread1 to being received, processed and re-emitted by thread2, and then received back by thread1, took about 9.3 msec.

Most of this high 'round trip time' is attributable to the temporary storage in thread2 and the loop that pulls the data back out of it: the handler only stores the packet in the queue, and run() picks it up on its next pass, which can be up to 10 msec later because of the time.sleep(0.01) between passes. In the log above, about 9.1 msec elapse between "Received event" (22:37:00.336621) and "Emitting reply" (22:37:00.345754). This is the cost of making the 2 threads (emitter and receiver) non-blocking, and therefore capable of handling demanding functions.
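One way to cut the time a packet spends waiting in the receiver's local queue is to block on the queue rather than poll it between sleeps. The following is a standalone sketch using only the standard library; it does not use pyee, and the names consumer, received and latencies_ms are illustrative. The timeout on get() exists only so that the stop flag is re-checked periodically.

```python
import queue
import threading
import time

q = queue.Queue()
stop = threading.Event()
received = []       # items in the order the consumer saw them
latencies_ms = []   # time each item spent waiting in the queue


def consumer():
    # Block on the queue instead of polling it and sleeping between passes.
    while not stop.is_set():
        try:
            sent_at, item = q.get(timeout=0.1)   # wakes as soon as an item arrives
        except queue.Empty:
            continue                             # nothing arrived; re-check the stop flag
        latencies_ms.append((time.perf_counter() - sent_at) * 1000)
        received.append(item)
        q.task_done()


worker = threading.Thread(target=consumer)
worker.start()

for i in range(10):
    q.put_nowait((time.perf_counter(), i))
    time.sleep(0.01)

q.join()        # wait until every item has been handled
stop.set()
worker.join()
print("max wait in queue: %.3f msec" % max(latencies_ms))
```

The printed figure depends on the machine and its load, so no expected value is given here; the point is only that the wait is no longer tied to a fixed sleep interval.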

From this we can see that:
1. To keep both the emitter and the receiver threads as non-blocking as possible, only the bare necessary instructions should run inside the event handler functions of any given event.
2. There is no 'easy' way to concurrently process and emit multiple events from the emitter thread without getting into the complexities of multithreading and of queues for sharing data with a new 'emitter-only' thread.
- An added downside of this is that control will not come back to the emitter thread immediately after an emission. Though multithreading will speed up the return of control to the emitter, it will nevertheless be slow.
3. If, for a given event, an event handler in even one out of many receiver threads is running a slow operation, the emitter thread is blocked nonetheless.
4. Storing events locally in the emitter is never a good idea, and therefore running a non-blocking continuous loop of emitting events on all emitter threads is very important.
- especially if you are planning to use a 'queue' object, as 'queue' insertion is a slow process and can sometimes take more than 1 millisecond to complete.
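The third point can be checked in isolation. The sketch below assumes a plain synchronous dispatch function in place of pyee (the handlers registry and all function names are hypothetical) and measures how long the caller is blocked with and without one slow handler attached.

```python
import time

handlers = {'data': []}   # hypothetical registry: event name -> handlers


def emit(event, payload):
    # Synchronous dispatch: handlers run in the caller's thread, one by one.
    for handler in handlers[event]:
        handler(payload)


def fast_handler(payload):
    pass   # e.g. only hand the packet over to another thread


def slow_handler(payload):
    time.sleep(0.05)   # stands in for a slow operation inside a handler


def timed_emit(event, payload):
    start = time.perf_counter()
    emit(event, payload)
    return (time.perf_counter() - start) * 1000   # msec the emitter was blocked


handlers['data'].append(fast_handler)
fast_only_ms = timed_emit('data', 1)

handlers['data'].append(slow_handler)
with_slow_ms = timed_emit('data', 2)

print("blocked with fast handler only: %.3f msec" % fast_only_ms)
print("blocked with one slow handler:  %.3f msec" % with_slow_ms)
```

With the slow handler attached, the emitter pays for the full sleep on every emit, no matter how quick the other handlers are.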


--------------------------------------------------------------------------------------------------------------------------

The complete code of the example emitter and receiver threads used here:

Emitter thread:
import threading
import time

from datetime import datetime
from pyee import EventEmitter

class thread1(threading.Thread):

        def __init__(self,ee):
                self.ee = ee
                self.sentdata='a'*1024
                self.senttime=dict()
                self.recvdata=None
                self.recvtime=dict()

                @self.ee.on('reply')
                def evthandler(data):
                        t=datetime.now()
                        print ("THREAD1:",str(datetime.now())," Reply event handler triggered")
                        dataint=int(data)
                        if dataint in self.senttime.keys():
                                self.recvtime[dataint]=t

                threading.Thread.__init__(self)


        def run(self):

                for i in range(10):
                        print ("\n\nTHREAD1:",str(datetime.now())," Emitting data into msging bus")
                        s=datetime.now()
                        self.ee.emit('data',i)
                        e=datetime.now()
                        self.senttime[i] = s
                        print ("THREAD1:",str(datetime.now())," Emission of data done")
                        print ("THREAD1: Thread blocked during last emit=",(e-s).total_seconds()*1000,"msec")
                        time.sleep(0.01)

                # Calculate latency for all the events now
                for i in range(10):
                        if i in self.recvtime.keys() and i in self.senttime.keys():
                                d=(self.recvtime[i] - self.senttime[i]).total_seconds()*1000
                                print ("Round trip latency for",i,"is=",d,"msec")


--------------------------------------------------------------------------------------------------------------------------

Receiver thread:

import threading
import time
import queue

from datetime import datetime
from pyee import EventEmitter

class thread2(threading.Thread):

        def __init__(self,ee):
                self.ee=ee
                self.stopflag=False
                self.q=queue.Queue()

                @self.ee.on('data')
                def evthandler(data):
                        print ("THREAD2:",str(datetime.now())," Received event on msging bus")
                        self.q.put_nowait(data)

                threading.Thread.__init__(self)

        def run(self):
                while not self.stopflag:
                        while not self.q.empty():
                                print ("THREAD2:",str(datetime.now()),"Emitting reply into msging bus")
                                s=datetime.now()
                                self.ee.emit('reply',self.q.get_nowait())
                                e=datetime.now()
                                print ("THREAD2:",str(datetime.now())," Emission of reply done")
                                print ("THREAD2: Thread blocked during last emit=",(e-s).total_seconds()*1000,"msec")
                        time.sleep(0.01)

        def stop(self):
                self.stopflag=True

-------------------------------------------------------------------------------------------------------------------

Should you want the complete running output:

Starting thread2
Starting thread1


THREAD1: 2017-09-06 22:37:00.315206  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.315595  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.315750  Emission of data done
THREAD1: Thread blocked during last emit= 0.19 msec
THREAD2: 2017-09-06 22:37:00.324726 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.324925  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.325023  Emission of reply done
THREAD2: Thread blocked during last emit= 0.19 msec


THREAD1: 2017-09-06 22:37:00.326059  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.326177  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.326314  Emission of data done
THREAD1: Thread blocked during last emit= 0.152 msec
THREAD2: 2017-09-06 22:37:00.335248 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.335434  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.335533  Emission of reply done
THREAD2: Thread blocked during last emit= 0.179 msec


THREAD1: 2017-09-06 22:37:00.336504  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.336621  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.336756  Emission of data done
THREAD1: Thread blocked during last emit= 0.15100000000000002 msec
THREAD2: 2017-09-06 22:37:00.345754 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.345949  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.346050  Emission of reply done
THREAD2: Thread blocked during last emit= 0.184 msec


THREAD1: 2017-09-06 22:37:00.346949  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.347062  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.347189  Emission of data done
THREAD1: Thread blocked during last emit= 0.14200000000000002 msec
THREAD2: 2017-09-06 22:37:00.356267 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.356455  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.356556  Emission of reply done
THREAD2: Thread blocked during last emit= 0.182 msec


THREAD1: 2017-09-06 22:37:00.357377  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.357491  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.357625  Emission of data done
THREAD1: Thread blocked during last emit= 0.148 msec
THREAD2: 2017-09-06 22:37:00.366783 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.366953  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.367050  Emission of reply done
THREAD2: Thread blocked during last emit= 0.174 msec


THREAD1: 2017-09-06 22:37:00.367809  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.367922  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.368062  Emission of data done
THREAD1: Thread blocked during last emit= 0.15 msec
THREAD2: 2017-09-06 22:37:00.377270 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.377435  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.377532  Emission of reply done
THREAD2: Thread blocked during last emit= 0.175 msec


THREAD1: 2017-09-06 22:37:00.378245  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.378357  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.378487  Emission of data done
THREAD1: Thread blocked during last emit= 0.14300000000000002 msec
THREAD2: 2017-09-06 22:37:00.387746 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.387920  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.388017  Emission of reply done
THREAD2: Thread blocked during last emit= 0.176 msec


THREAD1: 2017-09-06 22:37:00.388679  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.388794  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.388928  Emission of data done
THREAD1: Thread blocked during last emit= 0.148 msec
THREAD2: 2017-09-06 22:37:00.398230 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.398397  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.398492  Emission of reply done
THREAD2: Thread blocked during last emit= 0.171 msec


THREAD1: 2017-09-06 22:37:00.399110  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.399220  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.399351  Emission of data done
THREAD1: Thread blocked during last emit= 0.14400000000000002 msec
THREAD2: 2017-09-06 22:37:00.408710 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.408875  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.408968  Emission of reply done
THREAD2: Thread blocked during last emit= 0.16699999999999998 msec


THREAD1: 2017-09-06 22:37:00.409541  Emitting data into msging bus
THREAD2: 2017-09-06 22:37:00.409656  Received event on msging bus
THREAD1: 2017-09-06 22:37:00.409786  Emission of data done
THREAD1: Thread blocked during last emit= 0.146 msec
THREAD2: 2017-09-06 22:37:00.419188 Emitting reply into msging bus
THREAD1: 2017-09-06 22:37:00.419358  Reply event handler triggered
THREAD2: 2017-09-06 22:37:00.419457  Emission of reply done
THREAD2: Thread blocked during last emit= 0.177 msec
Round trip latency for 0 is= 9.369 msec
Round trip latency for 1 is= 9.276 msec
Round trip latency for 2 is= 9.349 msec
Round trip latency for 3 is= 9.412 msec
Round trip latency for 4 is= 9.481 msec
Round trip latency for 5 is= 9.532 msec
Round trip latency for 6 is= 9.581 msec
Round trip latency for 7 is= 9.622 msec
Round trip latency for 8 is= 9.673 msec
Round trip latency for 9 is= 9.721 msec
Program finished




Part 2: Scaling it up.

We can already see here that message passing with this mechanism cannot scale. In Python (CPython), threads are operating-system threads, so every switch from one thread to another goes through the kernel, which makes this even slower. Should you be using goroutines in Go, things might be easier, with the scheduling happening inside the process.
Secondly, the emitter of the event is blocked until the receiver of the event returns control. Should there be multiple emitters and multiple receivers, you can already see the delays piling up. The Global Interpreter Lock in Python brings in even more delays.

A more scalable option would therefore be to stop the pyee object from being the point where control transfers from emitter to receiver, which is what blocks the threads. A programmer's easy solution would be to declare this object as a global variable, rather than passing it around to be shared by multiple threads and implicitly falling into the delay traps of threading.Lock() and the global interpreter lock. If you wanted to create a single EventEmitter object for the whole message-passing system, and also keep this object as a global variable in the program, the only solution would be for each thread to:
1. send the data and forget about it => no blocking on the ingress side
2. write asynchronously to the receiver => data is pushed to the receiver.

Both of these criteria are met by WebSockets! Let's try to imagine how this can be done.
Let's say we still have the 2 threads: an ingress thread (emitter) and an egress thread (receiver).
If the ingress thread is to be non-blocking, it must send and forget. WebSockets help to achieve this: the thread sends to a server socket, and the call returns immediately.
If data is to be pushed to the egress thread whenever there is data for it, WebSockets can do this by design. The Python 'websocket-client' implementation invokes a callback whenever a message is received (no polling is done to receive the data).
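The two properties, send-and-forget on the ingress side and push on the egress side, can be seen in miniature inside a single process before any sockets are involved. The sketch below is an in-process analogy using only the standard library, not WebSocket code; the class AsyncBus and its methods are hypothetical names. emit() only enqueues the packet, and a single worker thread delivers it, so the FIFO queue also keeps packets in order.

```python
import queue
import threading
import time


class AsyncBus:
    """Hypothetical in-process bus: emit() only enqueues, a worker thread delivers."""

    def __init__(self):
        self._handlers = {}            # event name -> list of handlers
        self._pending = queue.Queue()  # FIFO, so delivery order matches emit order
        self._worker = threading.Thread(target=self._deliver, daemon=True)
        self._worker.start()

    def on(self, event, handler):
        self._handlers.setdefault(event, []).append(handler)

    def emit(self, event, payload):
        # Send and forget: the caller never waits for any handler to run.
        self._pending.put_nowait((event, payload))

    def _deliver(self):
        while True:
            event, payload = self._pending.get()   # blocks until there is work
            for handler in self._handlers.get(event, []):
                handler(payload)                   # push to the receiver
            self._pending.task_done()

    def drain(self):
        self._pending.join()   # wait until everything emitted so far is delivered


bus = AsyncBus()
received = []


def slow_receiver(payload):
    time.sleep(0.02)           # a deliberately slow handler
    received.append(payload)


bus.on('data', slow_receiver)

start = time.perf_counter()
for i in range(5):
    bus.emit('data', i)
emit_ms = (time.perf_counter() - start) * 1000

bus.drain()
print("time spent in 5 emits: %.3f msec" % emit_ms)
print("received in order:", received)
```

The trade-off is that the emitter no longer knows when, or whether, a handler has finished; drain() is included only so the demo can wait for delivery before printing.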

But then we need a way to stitch the ingress and egress threads together, such that the ingress thread can send some data, which lands in an EventEmitter object, after which the ingress thread is immediately unblocked. The EventEmitter would then be allowed to do a quick context switch and hand the data on. This can be done by calling 'write_message' inside the handler for the specified skill. This stitching therefore needs a service program that lets clients wanting to exchange data connect to it. The service should know the skills wanted by each of its clients and, on receiving data from any client, should be able to push that data down to every other client listening for the particular skill.
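The routing part of such a service can be sketched without any networking. In the sketch below, an asyncio.Queue per client stands in for a WebSocket connection; that substitution is an assumption made to keep the example self-contained, and SkillRouter, subscribe, publish and egress_client are hypothetical names rather than part of any library.

```python
import asyncio
from collections import defaultdict


class SkillRouter:
    """Hypothetical routing core of the service; the transport is left out."""

    def __init__(self):
        # skill name -> outboxes of the clients that asked for that skill
        self._subscribers = defaultdict(list)

    def subscribe(self, skill):
        """Register a client for a skill and return its private outbox."""
        outbox = asyncio.Queue()
        self._subscribers[skill].append(outbox)
        return outbox

    def publish(self, skill, payload):
        """Hand the payload to every subscriber without waiting for any of them."""
        for outbox in self._subscribers.get(skill, []):
            outbox.put_nowait(payload)   # unbounded queue, so this never blocks


async def egress_client(name, outbox, count, log):
    # Stands in for a client whose callback fires whenever a message is pushed.
    for _ in range(count):
        payload = await outbox.get()
        log.append((name, payload))


async def main():
    router = SkillRouter()
    log = []
    clients = [
        asyncio.create_task(egress_client('A', router.subscribe('data'), 3, log)),
        asyncio.create_task(egress_client('B', router.subscribe('data'), 3, log)),
    ]
    for i in range(3):
        router.publish('data', i)   # the ingress side returns immediately
    await asyncio.gather(*clients)
    return log


log = asyncio.run(main())
print(log)
```

In a real service each outbox would be drained by a task that writes to that client's socket, so a slow client delays only its own queue and not the sender.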

... Example code coming up soon ...
