published : 2018-05-06 changed: 2018-07-30
One of the many advantages of Morphium is the integrate messaging system. This is used for synchronizing the caches in a clustered environment for example. It is part of Morphium for quite some time, it was introduced with one of the first releases.
Messaging uses a sophisticated locking mechanism in order to ensure that messages for one recipient will only get there. Unfortunately this is usually being solved using polling, which means querying the db every now and then. Since Morphium V3.2.0 we can use the OplogMonitor for Messaging. This creates kind of a "Push" for new messages, which means that the DB informs clients about incoming messages.
This reduces load and increases speed. Lets have a look how that works...
As mentioned above with V3.2.0 we need to destinguish 2 cases: are we connected to a replicaset (only then there is an oplog the listener could listen to) or not.
No replicaset is also true, if you are connected to a sharded cluster via MongoS. Here also messaging uses polling to get the data. Of course, this can be configured. Like, how long should the system pause between polls, should messages be processed one by one or in a bulk...
All comes down to the locking. The algorithm looks like this (you can have a closer look at
Messaging.java for mor details):
The OplogMonitor is part of Morphium for quite a while now. It uses a
TailableCursor on the oplog to get informed about changes. A tailable cursor will stay open, even if thera are no more matching documents. But will send all incoming documents to the client. So the client gets informed about all changes in the system.
With morphium 4.0 we use the change stream instead the oplog to get polling of messages done. This is working as efficient, but does not need admin access.
So why not use a TailableCursor directly on the Msg-Collection then? for several reasons:
Messaging based on the OplogMonitor looks quite similar to the algorithm above, but the polling simplifies things a bit. on new messages, this happens:
usually, when an update on messages comes in, nothing interesting happens. But for being able to reject messages (see below) we just start the locking mechanism to be sure.
Well, that is quite simple. Kust create an instance of
Messaging and hit
of course, you could instanciate it using spring or something.
to send a message, just do:
this message here does have a ttl (time to live) of 5 secs. The default ttl is 30secs. Older messages will automatically be deleted by mongo.
Messages are broadcast messages by default, meaning, all listeners may process it. if you set the message to be exclusive, only one of the listeners gets the permission to process ist (see locking above).
this message will only be processed by one recipient!
And the sender does not read his own messages!
Of course, you can send a message directly to a specifiy recipient. This happens automatically when sending answers for example. To send a message to a specific recipient you need to know his UUID. You can get that by messages being sent (sender for example) or you implement some kind of discovery...
in the integration tests of Morphium both methods are being used. The difference is quite simple:
storeMessage stores the message directly do mongodb whereas
queueMessage works asynchronously - which might be the better choice when it comes to performance.
just register a Message listener to the messaging system:
messaging is the messaging system and
message the message that was sent. This listener returns
null, but it could also return a Message, that should be send back as an answer to the sender.
messaging-object, the listener can also publish own messages, which should not be answers or something.
in addition to that, the listener may "reject" a Message by sending a
MessageRejectedException - then the message is unlocked so that all clients might use it again (if it was not sent directly to me).
Within Morphium the
CacheSynchronizer uses Messaging. It needs a messaging system in the constructor.
The implementation of it is not that complicated. The CacheSynchronizer just registers as
MorphiumStorageListener, so that it gets informed about all writing accesses (and only then caches need to be syncrhonized).
on write access, it checks if a cached entity is affected and if so, a
ClearCachemessage is send using messaging. This message also contains the strategy to use (like, clear whole cache, update the element and so on).
Of course, incoming messages also have to be processed by the CacheSynchronizer. But that is quite simple: if a message comes in, erase the coresponding cache mentioned in the message according to the strategy.
And you might send those clear messages manually by accessing the CacheSynchronizer directly.
And we should mention, that there you could be informed about all cache sync activities using a specific listener interface.
the messaging feature of morphium is not well known yet. But it might be used as a simple replacement for full-blown messaging systems and with the new
OplogMonitor-Feature it is even better than it ever was.
created Stephan Bösebeck (stephan)