MongoDB Messaging via Morphium

published : 2018-05-06 changed: 2018-05-06

One of the many advantages of Morphium is the integrated messaging system. It is used, for example, for synchronizing the caches in a clustered environment. It has been part of Morphium for quite some time; it was introduced with one of the first releases.

Messaging uses a sophisticated locking mechanism to ensure that messages for one recipient reach only that recipient. Unfortunately, this is usually implemented using polling, which means querying the DB every now and then. Since Morphium V3.2.0 we can use the OplogMonitor for messaging. This creates a kind of "push" for new messages, which means that the DB informs clients about incoming messages.

This reduces load and increases speed. Let's have a look at how that works...

Messaging in Morphium - how it works

As mentioned above, since V3.2.0 we need to distinguish two cases: either we are connected to a replica set (only then is there an oplog the listener can listen to), or we are not.

no Replicaset => Polling

The "no replica set" case also applies if you are connected to a sharded cluster via mongos. Here, too, messaging uses polling to get the data. Of course, this can be configured: for example, how long the system pauses between polls, and whether messages are processed one by one or in bulk.

It all comes down to the locking. The algorithm looks like this (you can have a closer look at Messaging.java for more details):

  1. Send a command to MongoDB that locks all messages that are either sent directly to me (= this messaging instance) or addressed to everyone and exclusive (to be processed only once). Every system is identified by a unique UUID, and this ID is used for locking, too. The command locks either one or all matching messages, depending on whether you want to process one message or all of them.
  2. Read all locked messages.
  3. Process them.
  4. Mark each message as processed by me (UUID -> processed_by).
  5. Pause (for the configured time) and go to 1.
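
To make the five steps more tangible, here is a minimal sketch of one polling round. It is not Morphium's implementation: the class PollingSketch, its StoredMsg type and all method names are made up for illustration, the "collection" is a plain in-memory list, and `synchronized` merely stands in for the atomic update MongoDB would perform. Non-exclusive broadcast messages are left out to keep it short.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for the message collection; not Morphium code.
class PollingSketch {
    static class StoredMsg {
        final String name;
        final boolean exclusive;
        final String recipient;                  // null = addressed to everyone
        String lockedBy;                         // null = not locked yet
        final Set<String> processedBy = new HashSet<>();

        StoredMsg(String name, boolean exclusive, String recipient) {
            this.name = name;
            this.exclusive = exclusive;
            this.recipient = recipient;
        }
    }

    private final List<StoredMsg> collection = new ArrayList<>();

    synchronized void store(StoredMsg m) {
        collection.add(m);
    }

    // Step 1: lock one or all matching messages.
    // 'synchronized' stands in for the atomic update MongoDB would perform.
    synchronized int lock(String myId, boolean all) {
        int locked = 0;
        for (StoredMsg m : collection) {
            boolean direct = myId.equals(m.recipient);
            boolean exclusiveForAll = m.recipient == null && m.exclusive;
            if (m.lockedBy == null && (direct || exclusiveForAll)) {
                m.lockedBy = myId;
                locked++;
                if (!all) break;
            }
        }
        return locked;
    }

    // Steps 1 to 4 of one polling round; returns the names of the processed messages.
    synchronized List<String> pollOnce(String myId, boolean all) {
        lock(myId, all);
        List<String> processed = new ArrayList<>();
        for (StoredMsg m : collection) {             // step 2: read the messages I locked
            if (myId.equals(m.lockedBy) && !m.processedBy.contains(myId)) {
                processed.add(m.name);               // step 3: "process" the message
                m.processedBy.add(myId);             // step 4: mark as processed by me
            }
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        PollingSketch db = new PollingSketch();
        db.store(new StoredMsg("direct to A", false, "A"));
        db.store(new StoredMsg("exclusive for all", true, null));
        for (int round = 0; round < 2; round++) {
            System.out.println("A processed " + db.pollOnce("A", true));
            System.out.println("B processed " + db.pollOnce("B", true));
            Thread.sleep(100);                       // step 5: pause, then start over
        }
    }
}
```

The important bit is that the lock is taken first and the read happens afterwards, so two pollers can never both end up processing the same exclusive message.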

Replicaset => OpLogMonitor

The OplogMonitor has been part of Morphium for quite a while now. It uses a tailable cursor on the oplog to get informed about changes. A tailable cursor stays open even if there are no more matching documents, and it sends all incoming documents to the client. So the client gets informed about all changes in the system.

So why not use a tailable cursor directly on the Msg collection then? For several reasons:

  1. It only works with capped collections, which is not a showstopper in our case, but unpleasant.
  2. It only informs about new entries in the collection. But the locking algorithm depends on an atomic update of existing messages, so this does not work. We could try to lock messages by deleting old ones and creating new ones, but this is not atomic and would lead to misreads.

Messaging based on the OplogMonitor looks quite similar to the algorithm above, but not having to poll simplifies things a bit. When a new message arrives, this happens:

  1. If the incoming message is an exclusive one, just try the locking described above. But as we now have the ID, it is a lot simpler and more efficient.
  2. If it is non-exclusive (and not sent by myself), just process it.
  3. If it is an exclusive message and sent directly to me, process it.

Usually, when an update on a message comes in, nothing interesting happens. But to be able to reject messages (see below), we just start the locking mechanism to be sure.
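
The decision made for each incoming message can be sketched roughly like this. Again, this is only an illustration and not Morphium's code: PushDispatchSketch and shouldProcess are hypothetical names, and a ConcurrentMap with putIfAbsent plays the role of the atomic lock-by-ID update in MongoDB. The checks are ordered slightly differently from the list above (direct messages are handled before the exclusive lock), which gives the same result.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the per-message decision; not Morphium code.
class PushDispatchSketch {
    // message id -> id of the system holding the lock;
    // putIfAbsent stands in for the atomic lock update in MongoDB
    static final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    static boolean shouldProcess(String myId, String msgId, String sender,
                                 String recipient, boolean exclusive) {
        if (myId.equals(sender)) {
            return false;                          // the sender does not read its own messages
        }
        if (recipient != null) {
            return myId.equals(recipient);         // sent directly: only the addressee processes it
        }
        if (!exclusive) {
            return true;                           // plain broadcast: every listener processes it
        }
        // exclusive broadcast: whoever locks the known ID first wins
        return locks.putIfAbsent(msgId, myId) == null;
    }

    public static void main(String[] args) {
        System.out.println("B gets exclusive demo-1: "
                + shouldProcess("B", "demo-1", "A", null, true));
        System.out.println("C gets exclusive demo-1: "
                + shouldProcess("C", "demo-1", "A", null, true));
    }
}
```

Because the message ID is already known from the oplog entry, the lock is a single targeted operation instead of a query over all matching messages.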

how to use Messaging

Well, that is quite simple. Just create an instance of Messaging and call start():

   Messaging messaging=new Messaging(morphium, 500, true);
   messaging.start();

Of course, you could instantiate it using Spring or something similar.

Message sending

To send a message, just do:

    messaging.storeMessage(new Msg("Testmessage", "A message", "the value - for now", 5000));

This message has a TTL (time to live) of 5 seconds. The default TTL is 30 seconds. Expired messages are deleted automatically by MongoDB.

Messages are broadcast messages by default, meaning that all listeners may process them. If you set the message to be exclusive, only one of the listeners gets permission to process it (see locking above).

        Msg m = new Msg();
        m.setExclusive(true);
        m.setName("A message");

This message will only be processed by one recipient!

And the sender does not read its own messages!

Of course, you can send a message directly to a specific recipient. This happens automatically when sending answers, for example. To send a message to a specific recipient you need to know its UUID. You can get that from messages you have received (the sender, for example), or you implement some kind of discovery...

        Msg m = new Msg("testmsg1", "The message from M1", "Value");
        m.addRecipient(recipientId);   // recipientId: UUID of the receiving messaging system
        messaging.storeMessage(m);

storeMessage vs queueMessage

In the integration tests of Morphium both methods are used. The difference is quite simple: storeMessage stores the message directly in MongoDB, whereas queueMessage works asynchronously, which might be the better choice when it comes to performance.
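
The difference between the two styles can be illustrated with a tiny stand-alone sketch. This says nothing about how Morphium implements queueMessage internally; SendModesSketch, storeNow, queue and close are hypothetical names, a list stands in for MongoDB, and a single-threaded executor is just one possible way to write in the background.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of synchronous vs. asynchronous sending; not Morphium code.
class SendModesSketch {
    final List<String> db = new CopyOnWriteArrayList<>();   // stands in for MongoDB
    private final ExecutorService writer = Executors.newSingleThreadExecutor();

    // synchronous: the message is written before the call returns
    void storeNow(String msg) {
        db.add(msg);
    }

    // asynchronous: the call returns at once, the write happens on the writer thread
    void queue(String msg) {
        writer.execute(() -> db.add(msg));
    }

    // waits for queued writes to finish; returns false on timeout or interruption
    boolean close() {
        writer.shutdown();
        try {
            return writer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        SendModesSketch sender = new SendModesSketch();
        sender.storeNow("stored directly");
        sender.queue("queued");
        sender.close();
        System.out.println(sender.db);
    }
}
```

The price for the asynchronous variant is that the caller does not know when (or whether) the write has happened once the call returns.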

receiving messages

Just register a message listener with the messaging system:

        // the lambda parameter must not reuse the name of a local variable such as messaging
        messaging.addMessageListener((msgSystem, message) -> {
            log.info("Got Message: " + message.toString());
            gotMessage = true;   // a field of the enclosing class
            return null;
        });

Here, msgSystem is the messaging system and message is the message that was sent. This listener returns null, but it could also return a message, which is then sent back to the sender as an answer.

Using the messaging object it receives, the listener can also publish messages of its own that are not answers.

In addition to that, the listener may "reject" a message by throwing a MessageRejectedException. The message is then unlocked so that other clients can pick it up again (if it was not sent directly to me).
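
The three possible outcomes of a listener call (no answer, an answer, a rejection) can be sketched as follows. This is a simplified model and not the Morphium API: ListenerSketch, its Listener interface and RejectedException are hypothetical stand-ins for the real listener interface and MessageRejectedException, and messages are plain strings.

```java
// Hypothetical model of the listener contract; not the Morphium API.
class ListenerSketch {
    static class RejectedException extends RuntimeException {
        RejectedException(String reason) {
            super(reason);
        }
    }

    interface Listener {
        // returns an answer, or null if there is nothing to send back
        String onMessage(String message);
    }

    // describes what the messaging system would do with the listener's result
    static String deliver(Listener listener, String message) {
        try {
            String answer = listener.onMessage(message);
            return answer == null ? "processed" : "answer sent: " + answer;
        } catch (RejectedException e) {
            return "unlocked for others";          // rejected: release the lock again
        }
    }

    public static void main(String[] args) {
        Listener picky = msg -> {
            if (msg.startsWith("big")) {
                throw new RejectedException("too busy");
            }
            return msg.equals("ping") ? "pong" : null;
        };
        System.out.println(deliver(picky, "ping"));
        System.out.println(deliver(picky, "hello"));
        System.out.println(deliver(picky, "big job"));
    }
}
```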

usage of messaging - cache synchronization

Within Morphium the CacheSynchronizer uses Messaging. It needs a messaging system in the constructor.

Its implementation is not that complicated. The CacheSynchronizer just registers as a MorphiumStorageListener, so that it gets informed about all write accesses (only then do caches need to be synchronized).

public class CacheSynchronizer implements MessageListener, MorphiumStorageListener {

}

On write access, it checks whether a cached entity is affected, and if so, a ClearCache message is sent using messaging. This message also contains the strategy to use (like clear the whole cache, update the element and so on).

Of course, incoming messages also have to be processed by the CacheSynchronizer. But that is quite simple: if a message comes in, clear the corresponding cache mentioned in the message according to the strategy.
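
The receiving side could be sketched roughly like this. It is not the CacheSynchronizer's actual code: CacheSyncSketch, the two Strategy values and the method names are assumptions made for this example, and the cache is a simple map per entity type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of reacting to a clear-cache message; not Morphium code.
class CacheSyncSketch {
    enum Strategy { CLEAR_TYPE_CACHE, REMOVE_ENTRY }

    // entity type -> (entity id -> cached entity)
    private final Map<String, Map<String, Object>> caches = new HashMap<>();

    void put(String type, String id, Object entity) {
        caches.computeIfAbsent(type, t -> new HashMap<>()).put(id, entity);
    }

    int size(String type) {
        Map<String, Object> cache = caches.get(type);
        return cache == null ? 0 : cache.size();
    }

    // called for every incoming clear-cache message
    void onClearMessage(String type, String id, Strategy strategy) {
        Map<String, Object> cache = caches.get(type);
        if (cache == null) {
            return;                                // nothing cached for this type
        }
        if (strategy == Strategy.CLEAR_TYPE_CACHE) {
            cache.clear();                         // drop everything cached for the type
        } else {
            cache.remove(id);                      // drop only the affected entity
        }
    }

    public static void main(String[] args) {
        CacheSyncSketch sync = new CacheSyncSketch();
        sync.put("User", "1", "alice");
        sync.put("User", "2", "bob");
        sync.onClearMessage("User", "1", Strategy.REMOVE_ENTRY);
        System.out.println("cached users: " + sync.size("User"));
    }
}
```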

You can also send those clear messages manually by accessing the CacheSynchronizer directly.

And we should mention that you can be informed about all cache sync activities using a specific listener interface.

conclusion

The messaging feature of Morphium is not well known yet. But it can be used as a simple replacement for full-blown messaging systems, and with the new OplogMonitor feature it is better than ever.

created Stephan Bösebeck (stephan)