So implementieren Sie Consumer-Producer mit mehreren Konsumenten und mehreren Warteschlangen

8

Angenommen, es gibt 1 Erzeuger P und 2 Verbraucher C1 und C2. Und es gibt zwei Warteschlangen Q1 und Q2, beide mit einer bestimmten Kapazität.

P erzeugt Gegenstände und wechselt abwechselnd in Q1 und Q2. Artikel wird für bestimmte Verbraucher produziert und kann nicht von anderen Verbrauchern konsumiert werden. Wie kann ich Folgendes in Java implementieren: Nachdem ich die 3 Threads gestartet habe, wenn Q1 leer ist, wird Thread C1 blockiert, bis es benachrichtigt wird, wenn sich etwas in Q1 befindet. So ist Q2. Und P wird blockiert, wenn sowohl Q1 als auch Q2 voll sind, bis es benachrichtigt wird, wenn entweder Q1 oder Q2 nicht voll ist.

Ich dachte daran, BlockingQueue zu verwenden, das einen Verbraucher blockiert, wenn seine Warteschlange leer ist. Aber das Problem ist, wenn eine der Warteschlangen voll ist, wird der Produzent blockiert. Gibt es eine Datenstruktur in Java, mit der wir dieses Problem lösen können?

Aktualisieren

Ich habe selbst eine Lösung, aber ich bin mir nicht sicher, ob es effizient ist. Wir können immer noch 2 Blockingqueues haben. Und wenn ein Benutzer ein Objekt aus seiner Warteschlange nimmt, verwendet es BlockingQueue.take() , so dass es blockiert wird, wenn sich kein Element in der Warteschlange befindet. Wenn der Produzent ein Element zu einer der beiden Warteschlangen hinzufügt, verwendet es BlockingQueue.offer() . Damit wird es nie durch diese Operation blockiert und wird "falsch", wenn die Warteschlange voll ist. Darüber hinaus behalten wir einen AtomicInteger, um die Anzahl der Warteschlangen anzugeben, die nicht voll ist. Jedes Mal, wenn Produzent P ein Element in eine Warteschlange stellen möchte, wird bei falscher Rückgabe der AtomicInteger um 1 verringert. Wenn er 0 erreicht, ruft der Producer AtomicInteger.wait() auf. Jedes Mal, wenn ein Verbraucher ein Element aus seiner Warteschlange nimmt, untersucht es auch den AtomicInteger. Wenn es 0 ist, erhöht der Konsument es um 1 und ruft AtomicInteger.notify() auf.

Bitte lassen Sie mich wissen, ob diese Lösung sinnvoll ist.

Vielen Dank!

    
user2440712 31.05.2013, 14:13
quelle

3 Antworten

0

Sie können Themen aus dem Framework verwenden.

In activemq Ссылка

in hornetq genaues Beispiel für JMS Topic in HornetQ

    
Enrique San Martín 31.05.2013 14:23
quelle
0

Haben Sie einen Striped Executor Service in Erwägung gezogen? Auf diese Weise können Sie Ihr Problem lösen und Ihre Kunden in einen Pool stellen, der viel effizienter ist.

    
OldCurmudgeon 31.05.2013 14:29
quelle
0

Egal, für welche Datenstruktur / Nachrichtenserver Sie sich entscheiden, Sie können mit diesen Ressourcen keine Ressourcen mehr erhalten. Der Speicher- oder Festplattenspeicher ist immer begrenzt.

Es ist also nicht schlecht, dass der Produzent gestoppt wird.

Wenn Ihre Warteschlangen voll sind, sollten Sie versuchen, die -Salance wiederherzustellen: Sie könnten mehr Kunden hinzufügen. Sie könnten die Leistung des Verbrauchers verbessern. Wenn das nicht möglich ist, sollte etwas den Produzenten wirklich drosseln. Dies ist eine Möglichkeit, Fehler wegen zu wenig Arbeitsspeicher oder zu vermeiden, da kein Platz mehr auf dem Gerät vorhanden ist .

Schließlich obliegt es dem Rechenzentrum, die Warteschlangen trotzdem zu überwachen. Sie sollten Sie informieren, wenn der Füllgrad Ihrer Warteschlangen ein Limit erreicht, zum Beispiel & gt; 80%.

Aktualisieren

Wenn der Produzent nicht alle Warteschlangen senden kann, weil eine seiner Warteschlangen voll ist, liegt es an ihm zu puffern, aber das Puffern ist etwas, was die Warteschlangen tun sollten.

    
Beryllium 31.05.2013 14:42
quelle