Threadbehandlung im Java HornetQ-Client

9

Ich versuche zu verstehen, wie man mit Threads innerhalb eines Java-Clients umgeht, der sich mit HornetQ verbindet. Ich erhalte keinen spezifischen Fehler, verstehe aber nicht, wie ich mit Threads umgehen soll (in Bezug auf den HornetQ-Client und speziell MessageHandler.onMessage() - Threads im Allgemeinen sind für mich kein Problem).

Falls dies relevant ist: Ich verwende 'org.hornetq:hornetq-server:2.4.7.Final' , um den in meine Anwendung eingebetteten Server auszuführen. Ich habe nicht vor, dass dies einen Unterschied macht. In meiner Situation ist das aus Sicht eines Operators einfacher, als einen eigenständigen Serverprozess auszuführen.

Was ich bis jetzt gemacht habe:

  1. Erstellen Sie einen eingebetteten Server: new EmbeddedHornetQ(), .setConfiguration()

  2. Erstellen Sie einen Server-Locator: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

  3. Erstellen Sie eine Sitzungs-Factory: serverLocator.createSessionFactory()

Jetzt ist es für mich offensichtlich, dass ich eine Sitzung mit hornetqClientSessionFactory.createSession() erstellen, einen Producer und einen Consumer für diese Session erstellen und Nachrichten innerhalb eines einzelnen Threads mit .send() und .receive() bearbeiten kann.

Aber ich habe auch consumer.setMessageHandler() entdeckt, und das sagt mir, dass ich das Threading im Client überhaupt nicht verstanden habe. Ich habe versucht, es zu verwenden, aber dann ruft der Consumer messageHandler.onMessage() in zwei Threads auf, die sich von denen unterscheiden, die die Sitzung erstellt haben. Dies scheint meiner Meinung nach dem Code zu entsprechen - der HornetQ-Client verwendet einen Thread-Pool, um Nachrichten zu versenden.

Das lässt mich verwirrt sein. Die Javadocs sagen, dass die Sitzung ein "single-thread object" ist, und der Code stimmt überein - keine offensichtliche Synchronisation läuft dort ab. Aber da onMessage() in mehreren Threads aufgerufen wird, wird auch message.acknowledge() in mehreren Threads aufgerufen, und dieser delegiert nur an die Sitzung. Wie soll das funktionieren? Wie würde ein Szenario aussehen, in dem MessageHandler aus mehreren Threads NICHT auf die Sitzung zugreift?

Gehen Sie weiter, Wie würde ich Follow-up-Nachrichten von innerhalb von onMessage () senden? Ich verwende HornetQ für eine persistente "to-do" -Arbeitswarteschlange, so dass das Senden von Follow-up-Nachrichten ist ein typischer Anwendungsfall für mich. Aber wieder, in onMessage() , bin ich in den falschen Thread für den Zugriff auf die Sitzung.

Beachten Sie, dass es okay wäre, wenn Sie den MessageHandler nicht verwenden und send() / receive() nur so verwenden, dass ich Threading steuern kann. Aber ich bin überzeugt, dass ich die ganze Situation überhaupt nicht verstehe, und dass kombiniert mit Multi-Threading nur nach Ärger fragt.

    
Martin Geisse 06.06.2016, 07:38
quelle

1 Antwort

1

Ich kann einen Teil Ihrer Frage beantworten, obwohl ich hoffe, dass Sie das Problem bereits behoben haben.

Bilden Sie die HornetQ-Dokumentation auf ClientConsumer (Hervorhebung von mir):

  

Ein ClientConsumer empfängt Nachrichten von HornetQ-Warteschlangen.
  Nachrichten können synchron konsumiert werden, indem die receive () -Methoden verwendet werden, die blockieren, bis eine Nachricht empfangen wird (oder ein Zeitlimit abläuft), oder asynchron durch Festlegen eines MessageHandlers.
Diese beiden Verbrauchstypen sind exklusiv : Ein ClientConsumer mit einem MessageHandler-Set wird HornetQException auslösen, wenn seine receive () -Methoden aufgerufen werden.

Sie haben also zwei Möglichkeiten, den Nachrichtenempfang zu behandeln:

  1. Synchronisieren Sie die Rezeption selbst
    • Stellen Sie nicht einen MessageListener für HornetQ bereit
    • Rufen Sie in Ihrem eigenen cunsumer Thread .receive() oder .receive(long itmeout) in Ihrer Freizeit auf
    • Abrufen des (optionalen) ClientMessage -Objekts, das vom Aufruf zurückgegeben wurde
      • Pro: Mit dem Session , den Sie hoffentlich im Consumer tragen, können Sie die Nachricht so weiterleiten, wie Sie es für richtig halten
      • Con: All diese Nachrichtenbehandlung wird sequenziell sein
  2. Thread-Synchronisierung an HornetQ deligieren
    • Führt nicht .receive() auf einem Consumer
    • aus
    • Bereitstellung einer MessageListener-Implementierung von onMessage(ClientMessage)
      • Pro: Die gesamte Nachrichtenverarbeitung erfolgt gleichzeitig und schnell, ohne Probleme.
      • Con: Ich glaube nicht, dass es möglich ist, das Session von diesem Objekt abzurufen, da es nicht von der Schnittstelle verfügbar gemacht wird.
    • Ungeprüfte Problemumgehung: In meiner Anwendung (die in-vm wie Ihre ist) habe ich die zugrunde liegende, threadsichere QueueConnection als statische Variable, die anwendungsweit verfügbar ist. Von Ihrem MessageListener können Sie QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); darauf aufrufen, um eine neue Session zu erhalten und Ihre Nachrichten davon zu senden ... Dies ist wahrscheinlich in Ordnung soweit ich kann siehe , da das Session-Objekt nicht wirklich neu erstellt wird. Ich tat dies auch, weil Sessions dazu neigten, abgestanden zu werden.

Ich denke nicht, dass Sie so viel Kontrolle über Ihre Nachrichtenausführungs-Threads haben sollten, insbesondere transiente Threads, die lediglich Nachrichten weiterleiten. HornetQ hat eingebaute Thread-Pools, wie Sie es vermutet haben, und verwendet diese Objekte effizient.

Wie Sie auch wissen, müssen Sie nicht in einem einzelnen Thread sein, um auf ein Objekt (wie eine Warteschlange) zuzugreifen, so dass es keinen Unterschied macht, ob auf die Warteschlange über mehrere Threads oder sogar über mehrere Sitzungen zugegriffen wird. Sie müssen nur sicherstellen, dass eine Sitzung nur von einem Thread unterstützt wird, und dies ist per Entwurf mit MessageListener.

    
Sylvain Boisse 06.10.2016 13:18
quelle