Morphium Messaging Options

Info

Datum: 25. 02. 2020 um 22:26:13

Schlagworte: MongoDB Java Morphium

Kategorie: morphium

erstellt von Stephan Bösebeck

logged in

ADMIN


Morphium Messaging Options

Weil ich jetzt diese Frage auch schon ein paar mal bekommen habe, hier noch mal eine kurze Zusammenfassung über Morphium Messaging und wie man es nutzen kann:

Der erste Parameter im Konstruktor ist immer die morphium Instanz. Diese bestimmt, welche Mongo genutzt wird und auch einige Settings von Morphium wirken sich auf das Messaging subsystem aus:

  • threadPoolMessagingCoreSize: bestimmt die Core Size des Threadpools in Messaging.
  • threadPoolMessagingMaxSize: die maximale Größe des Threadpools in Messaging
  • threadPoolMessagingKeepAliveTime: Zeit, die ein Thread im Pool ungenutzt überleben kann in ms.

Dieser Threadpool wird eigentlich ausschließlich dazu genutzt, messages in eigenen Threads abzuarbeiten. Wird die maximale größe des ThreadPool (oder die Window-Size) erreicht, wird so lange pausiert, bis wieder genug Kapazität im Pool verfügbar ist.

Dann haben wir in den Konstruktoren noch evtl. weitere Parameter:

  1. immer Morphium Instanz
  2. queueName - der Name der Queue(!), die genutzt werden soll. der Collection-Name ist dann normalerweise mmsg_+der Queuename um Kollisionen mit den "normalen" Namen zu vermeiden. Wird kein Name gesetzt wird "msg" als Collection und queuename angenommen. Der Collection-Name kann mit getCollectionName() auf der Messaging Instanz in Erfahrung gebracht werden.
  3. pause in ms: diese Zeitangabe bestimmt, wie oft nach neuen Nachrichten gesucht werden soll (polling) oder im Falle, dass man sich mit einem Replicaset verbunden hat und somit ChangeStreams verwenden kann, wie viel Zeit zwischen zwei checks nach älteren Nachrichten vergehen soll. Diese Nachrichten können dann auftreten, wenn man das MessageProcessing zwischendurch pausiert. Alle nachrichten, die vie ChangeStream gemeldet werden, werden in dieser Zeit ignoriert und nicht bearbeitet. Damit diese nicht "verloren" gehen, wird alle pause ms nach diesen Nachrichten gesucht. Im Allgemeinen sollte man die Pause im Falle, dass man sich mit einem Replicaset verbunden hat, größer eingestellt werden um die Last auf der Mongo zu minimieren. Im Falle, dass Polling verwendet werden muss, sollte die Zeit relativ klein eingestellt werden um schnelle reaktionszeiten zu garantieren.
  4. processMultiple: wenn true, werden bei jedem run immer gleich mehrere Nachrichten für das processing von diesem Messaging markiert (locking)
  5. multithreaded: bestimmt, ob die Abarbeitung (Aufruf der MessageListener) in einem eigenen Thread stattfinden soll. Wenn gesetzt, wird ein Threadpool gemäß den Settings in MorphiumConfig erzeugt.
  6. windowSize: wie viele Nachrichten sollen auf ein Mal bearbeitet werden. Bestimmt, wie viele Nachrichten Maximal gleichzeitig markiert werden, bzw. wie viele Threads im Threadpool gleichzeitig aktiv sein dürfen.
  7. useChangeStream: damit kann man die Verwendung eines changeStreams erzwingen. Wenn gesetzt, wird man quasi via Push über neue Nachrichten von der Mongo informiert. Wenn nicht gesetzt, findet ein Polling statt. Als Default wird morphium.isReplicaSet() genommen.

Ich möchte auch hier noch mal darauf hinweisen, dass die Verwendung eines Replicaset für Messaging die deutlich bessere Lösung darstellt!

Was gilt es denn bei den Werten zu berücksichtigen

  • stellt man pause zu klein ein (egal ob mit oder ohne ChangeStream), wird die Last auf der Mongodb erhöht. Als guter Wert hat sich etwas um die 100ms herausgestellt. Ist der wert zu groß, kann das zu zu großen Latzenzeiten zwischen denden und Empfangen der Nachricht führen. Wenn diese Latenz die TTL der Nachricht übersteigt, wird die Nachricht nicht mehr bearbeitet.
  • processMultiple und multithreaded sind ein wenig im Zusammenhang zu betrachten. Denn processMultiple macht - insbesondere bei Listenern, die wirklich was tun - nur sinn, wenn man auch multithreaded auf true stellt. Andernfalls werden einfach mehrere Nachrichten für die Abarbeitung dieses Messagingsystems markiert, aber eine nach der anderen abgearbeitet. Beide auf false zu setzen, reduziert die Last im System (single Threaded, eins nach dem anderen), aber kann halt wiederum zu latenzen führen.
  • die windowSize kann man auch nur in Kombination mit dem ThreadPool sehen. Denn eigentlich sollten die Werte in etwa gleich sein. Denn es werden nur maximal so viele Threads gleichzeitig erzeugt, wie in windowSize angegeben ist. Dummerweise muss der Threadpool diese auch zulassen, sonst wird es damit auch nix. Deswegen sollte die windowSize == dem threadPoolMessagingMaxSize sein (um auf nummer Sicher zu gehen, könnte man dem Threadpool auch gerne noch ein paar Threads mehr Luft gönnen). Ist processMultiple nicht gesetzt, dann ist die windowSize quasi auch 1...

alles in allem kann man aber nicht sonderlich viel Falsch machen. Schaut euch einfach die Tests an um ein paar Beispielcodes zu bekommen.