info
Because I've now received this question a few times, here is a brief summary of morphine messaging and how to use it:
The first parameter in the constructor is always the morphium instance. This determines which Mongo is used and also some settings of morphine affect the messaging subsystem:
threadPoolMessagingCoreSize
: determines the core size of the thread pool in messaging.threadPoolMessagingMaxSize
: the maximum size of the thread pool in messagingthreadPoolMessagingKeepAliveTime
: Time that a thread in the pool can survive unused in ms.
This thread pool is actually only used to process messages in own threads. If the maximum size of the ThreadPool (or the window size) is reached, it is paused until enough capacity is available in the pool again.
Then we may have other parameters in the constructors:
- Always morphine instance
queueName
- the name of the queue (!) To be used. the collection name is then usuallymmsg_
+ the queue name to avoid collisions with the" normal "names. If no name is set, "msg" is assumed as the collection and queue name. The collection name can be found withgetCollectionName ()
on the messaging instance.pause
in ms: this time determines how often to search for new messages (polling) or, if you have connected to a replica set and thusChangeStreams
can use how much time between two checks older messages should pass. These messages can occur when MessageProcessing is paused in between. All messages reported to ChangeStream will be ignored during this time and will not be processed. So that these are not "lost", these messages are searched for everypause
ms. In general, if you have connected to a Replica set, the pause should be increased to minimize the load on the Mongo. If polling has to be used, the time should be set relatively short to guarantee fast response times.processMultiple
: if true, several messages are always marked for processing by this messaging (locking)multithreaded
: determines whether processing (calling the MessageListener) should take place in a separate thread. If set, a thread pool is created according to the settings inMorphiumConfig
.windowSize
: how many messages should be processed at once. Determines the maximum number of messages that can be marked at the same time, or how many threads can be active in the thread pool at the same time.useChangeStream
: this can be used to force the use of a changeStream. If set, you will be informed about new messages from Mongo via push. If not set, polling takes place. The default ismorphium.isReplicaSet ()
.
Again, I would like to point out that using a replica set for messaging is a much better solution!
What is there to take into account in the values
- if you set the pause too small (regardless of whether with or without ChangeStream), the load on the Mongodb is increased. Something around 100ms turned out to be a good value. If the value is too large, this can lead to long delays between sending and receiving the message. If this latency exceeds the TTL of the message, the message is no longer processed.
processMultiple
andmultithreaded
are a bit related. BecauseprocessMultiple
only makes sense - especially for listeners who really do some work - if you also setmultithreaded
totrue
. Otherwise, several messages are simply marked for processing by this messaging system, but are being processed one after the other. Setting both tofalse
reduces the load in the system (single threaded, one by one), but can in turn to high latencies.- the
windowSize
is strongly related to the ThreadPool. Actually the values ​​should be about the same. Because the maximum of how threads are created at the same time is specified by windowSize. it would be best if the thread pool also allowed this, otherwise it won't work. Therefore, thewindowSize
should be about the same asthreadPoolMessagingMaxSize
(to be on the safe side, you could also give the thread pool a few more threads). IfprocessMultiple
is set tofalse
, thewindowSize
is 1.
all in all you can't do much wrong. Just have a look at the tests to get some sample codes.