Morphium Messaging Options

date: 2020-02-25 22:26:13

tags: MongoDB Java Morphium

category: morphium

Created by: Stephan Bösebeck


Because I've now received this question a few times, here is a brief summary of Morphium messaging and how to use it:

The first parameter in the constructor is always the Morphium instance. It determines which MongoDB is used, and some Morphium settings also affect the messaging subsystem:

  • threadPoolMessagingCoreSize: the core size of the messaging thread pool.
  • threadPoolMessagingMaxSize: the maximum size of the messaging thread pool.
  • threadPoolMessagingKeepAliveTime: how long, in ms, an unused thread is kept alive in the pool.

This thread pool is only used to process messages in their own threads. If the maximum size of the thread pool (or the window size) is reached, message processing is paused until enough capacity is available in the pool again.

The constructors may also take further parameters:
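To make the three settings more tangible, here is a minimal sketch of what they mean for a plain JDK ThreadPoolExecutor. This is not Morphium's actual code: the class MessagingPoolSketch and the helper buildPool are made up for illustration, and the choice of a SynchronousQueue is an assumption so that the pool really grows up to its maximum size.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MessagingPoolSketch {

    // Hypothetical helper, not part of Morphium: maps the three settings
    // onto a standard JDK thread pool.
    static ThreadPoolExecutor buildPool(int coreSize, int maxSize, long keepAliveMs) {
        // SynchronousQueue hands each task directly to a thread, so the pool
        // grows from coreSize up to maxSize. Once all maxSize threads are busy,
        // a further submit is rejected by default, which is why the submitting
        // side has to wait until capacity is available again.
        return new ThreadPoolExecutor(
                coreSize, maxSize,
                keepAliveMs, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }

    public static void main(String[] args) {
        // Example values only; pick them to match your own load.
        ThreadPoolExecutor pool = buildPool(2, 8, 1000L);
        System.out.println("core size:  " + pool.getCorePoolSize());
        System.out.println("max size:   " + pool.getMaximumPoolSize());
        System.out.println("keep alive: " + pool.getKeepAliveTime(TimeUnit.MILLISECONDS) + " ms");
        pool.shutdown();
    }
}
```

Threads beyond the core size are the ones that are discarded once they have been idle for longer than the keep-alive time.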

  1. morphium: always the Morphium instance.
  2. queueName: the name of the queue to be used. The collection name is then usually mmsg_ + the queue name, to avoid collisions with "normal" collection names. If no name is set, "msg" is assumed as the collection and queue name. The collection name can be retrieved with getCollectionName() on the messaging instance.
  3. pause (in ms): with polling, this determines how often new messages are searched for. If you are connected to a replica set and ChangeStreams can therefore be used, it determines how much time passes between two checks for older messages. Such messages can occur when message processing is paused in between: all messages reported via the ChangeStream during that time are ignored and not processed. So that they are not "lost", these messages are searched for every pause ms. In general, if you are connected to a replica set, the pause should be increased to minimize the load on MongoDB. If polling has to be used, the pause should be relatively short to guarantee fast response times.
  4. processMultiple: if true, several messages at a time are marked for processing by this messaging instance (locking).
  5. multithreaded: determines whether processing (calling the MessageListener) takes place in a separate thread. If set, a thread pool is created according to the settings in MorphiumConfig.
  6. windowSize: how many messages are processed at once. It determines the maximum number of messages that can be marked at the same time, and thus how many threads can be active in the thread pool at the same time.
  7. useChangeStream: can be used to force the use of a ChangeStream. If set, you are informed about new messages by MongoDB via push. If not set, polling takes place. The default is morphium.isReplicaSet().
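The naming rule for the collection described in point 2 can be sketched roughly like this. The class QueueNames and the method collectionNameFor are hypothetical and only restate the rule from the text; on a real messaging instance you would simply call getCollectionName(). Treating only null as "no name set" is an assumption of this sketch.

```java
public class QueueNames {

    // Prefix and default name as described in the text above.
    static final String PREFIX = "mmsg_";
    static final String DEFAULT_NAME = "msg";

    // Hypothetical helper, not part of Morphium: derives the collection
    // name from the queue name.
    static String collectionNameFor(String queueName) {
        if (queueName == null) {
            // No queue name set: "msg" is used for both queue and collection.
            return DEFAULT_NAME;
        }
        return PREFIX + queueName;
    }

    public static void main(String[] args) {
        System.out.println(collectionNameFor("orders")); // prints "mmsg_orders"
        System.out.println(collectionNameFor(null));     // prints "msg"
    }
}
```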

Again, I would like to point out that using a replica set for messaging is a much better solution!

What to keep in mind when choosing the values

  • if you set the pause too small (regardless of whether with or without ChangeStream), the load on MongoDB increases. Something around 100ms turned out to be a good value. If the value is too large, this can lead to long delays between sending and receiving a message. If this latency exceeds the TTL of the message, the message is no longer processed.
  • processMultiple and multithreaded are somewhat related: processMultiple only makes sense, especially for listeners that really do some work, if you also set multithreaded to true. Otherwise, several messages are simply marked for processing by this messaging instance, but are still processed one after the other. Setting both to false reduces the load on the system (single-threaded, one by one), but can in turn lead to high latencies.
  • the windowSize is strongly related to the thread pool, and the values should be about the same. The maximum number of threads created at the same time is specified by windowSize, so the thread pool has to allow that many threads, otherwise it won't work. Therefore, the windowSize should be about the same as threadPoolMessagingMaxSize (to be on the safe side, you could also give the thread pool a few more threads). If processMultiple is set to false, the windowSize is 1.

All in all, you can't do much wrong. Just have a look at the tests for some sample code.
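The rules of thumb above can be turned into a small sanity check. This is only a sketch: the class MessagingSettingsCheck, its methods and the warning texts are invented for illustration and are not part of Morphium. The TTL is passed in as a plain number, and comparing it directly with the pause is a simplification of the latency argument above.

```java
import java.util.ArrayList;
import java.util.List;

public class MessagingSettingsCheck {

    // With processMultiple == false only one message is marked at a time,
    // so the effective window size is 1.
    static int effectiveWindowSize(boolean processMultiple, int windowSize) {
        return processMultiple ? windowSize : 1;
    }

    // Hypothetical check: returns one warning per rule of thumb that is violated.
    static List<String> check(int pauseMs, long ttlMs, boolean processMultiple,
                              boolean multithreaded, int windowSize, int poolMaxSize) {
        List<String> warnings = new ArrayList<>();
        if (pauseMs >= ttlMs) {
            warnings.add("pause is not shorter than the message TTL; messages may expire unprocessed");
        }
        if (processMultiple && !multithreaded) {
            warnings.add("processMultiple without multithreaded; marked messages are still processed one by one");
        }
        if (multithreaded && effectiveWindowSize(processMultiple, windowSize) > poolMaxSize) {
            warnings.add("windowSize exceeds threadPoolMessagingMaxSize; the pool cannot serve the whole window");
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Example values only: 100 ms pause, 30 s TTL, window of 10, pool max of 5.
        for (String warning : check(100, 30_000L, true, true, 10, 5)) {
            System.out.println("WARNING: " + warning);
        }
    }
}
```

With the example values in main, only the window size rule is violated, so exactly one warning is printed.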