rxjava und clojure asynchrony Mysterium: Futures Versprechen und Agenten, oh mein

8

Ich entschuldige mich im Voraus für die Länge dieser Notiz. Ich verbrachte viel Zeit damit, es kürzer zu machen, und das war so klein, wie ich es bekommen konnte.

Ich habe ein Rätsel und wäre dankbar für Ihre Hilfe. Dieses Mysterium kommt von dem Verhalten eines rxjava observer Ich schrieb in Clojure über ein paar einfache observable s aus Online-Stichproben.

Eine beobachtbare Nachricht sendet synchron Nachrichten an die onNext Handler ihrer Beobachter, und mein angeblich prinzipieller Beobachter verhält sich wie erwartet.

Das andere Observable macht asynchron dasselbe in einem anderen Thread über eine Clojure future . Derselbe Beobachter erfasst nicht alle Ereignisse, die in seinem onNext gepostet werden. Es scheint nur eine zufällige Anzahl von Nachrichten am Ende zu verlieren.

Zwischen dem Ablauf einer Wartezeit auf promise d onCompleted und dem Ablauf einer Wartezeit für alle Ereignisse, die an einen agent -Auffang gesendet werden, tritt im folgenden ein absichtliches Rennen auf. Wenn promise gewinnt, erwarte ich false für onCompleted und eine möglicherweise kurze Warteschlange in agent . Wenn agent gewinnt, erwarte ich true für onCompleted und alle Nachrichten aus der Warteschlange agent . Das einzige Ergebnis, das ich NICHT erwarte, ist true für onCompleted UND eine kurze Warteschlange von agent . Aber Murphy schläft nicht, und genau das sehe ich. Ich weiß nicht, ob die Müllsammlung ein Fehler ist oder ob ich eine interne Warteschlange für Clojures STM oder meine Dummheit oder etwas ganz anderes habe.

Ich präsentiere hier die Quelle in der Reihenfolge ihrer in sich geschlossenen Form, so dass sie direkt über lein repl ausgeführt werden kann. Es gibt drei Cermonials, die aus dem Weg geräumt werden: Erstens, die Leiningen-Projektdatei, project.clj , die die Abhängigkeit von der 0.9.0 -Version von Netflix's rxjava:

erklärt %Vor%

Nun, der Namespace und eine Clojure-Anforderung und die Java-Importe:

%Vor%

Schließlich ein Makro für die Ausgabe an die Konsole:

%Vor%

Endlich zu meinem Beobachter. Ich benutze agent , um die Nachrichten zu sammeln, die vom onNext einer Observablen gesendet werden. Ich benutze atom , um ein potenzielles onError zu sammeln. Ich benutze promise für onCompleted , so dass externe Benutzer die darauf warten können.

%Vor%

Nun, hier ist ein synchrones Observable. Er pumpt 25 Nachrichten in die onNext Kehlen seiner Beobachter und ruft dann ihre onCompleted s auf.

%Vor%

Wir abonnieren unseren Beobachter für dieses Observable:

%Vor%

Es funktioniert wie erwartet, und wir sehen die folgenden Ergebnisse auf der Konsole

%Vor%

Hier ist ein asynchrones Observable, das genau dasselbe tut, nur für einen future -Thread:

%Vor%

Aber, Überraschung, hier ist, was wir auf der Konsole sehen: true für onCompleted , was impliziert, dass promise DID NOT TIME-OUT; aber nur einige der asynch Nachrichten. Die tatsächliche Anzahl von Nachrichten, die wir sehen, variiert von Lauf zu Lauf, was impliziert, dass es ein Nebenläufigkeitsphänomen gibt. Hinweise geschätzt.

%Vor%     
Reb.Cabin 01.06.2013, 02:20
quelle

1 Antwort

7

Der await-for Agent bedeutet Blockiert den aktuellen Thread, bis alle Aktionen abgeschlossen sind far (von diesem Thread oder Agenten) zu den Agenten aufgetreten sind , was bedeutet, dass es passieren kann, dass nach dem Warten noch ein anderer Thread Nachrichten an den Agenten senden kann was passiert in deinem Fall. Nachdem Sie auf den Agenten gewartet haben und den Wert in der :onNext -Schlüssel in der Karte gelöscht haben, warten Sie auf das abgeschlossene Versprechen, das sich nach dem Warten als wahr herausstellt, aber in der Zwischenzeit wurden einige andere Nachrichten gesendet um den Agenten in den Vektor gesammelt werden.

Sie können dies lösen, indem Sie den Schlüssel :onCompleted als ersten Schlüssel in der Karte verwenden, was im Grunde bedeutet, auf die Fertigstellung zu warten und dann auf die Agenten zu warten, da zu diesem Zeitpunkt keine send -Aufrufe mehr möglich sind geschehen nach wie bereits onCompleted erhalten haben.

%Vor%     
Ankur 01.06.2013, 08:45
quelle