Ich entschuldige mich im Voraus für die Länge dieser Notiz. Ich verbrachte viel Zeit damit, es kürzer zu machen, und das war so klein, wie ich es bekommen konnte.
Ich habe ein Rätsel und wäre dankbar für Ihre Hilfe. Dieses Mysterium kommt von dem Verhalten eines rxjava observer
Ich schrieb in Clojure über ein paar einfache observable
s aus Online-Stichproben.
Eine beobachtbare Nachricht sendet synchron Nachrichten an die onNext
Handler ihrer Beobachter, und mein angeblich prinzipieller Beobachter verhält sich wie erwartet.
Das andere Observable macht asynchron dasselbe in einem anderen Thread über eine Clojure future
. Derselbe Beobachter erfasst nicht alle Ereignisse, die in seinem onNext
gepostet werden. Es scheint nur eine zufällige Anzahl von Nachrichten am Ende zu verlieren.
Zwischen dem Ablauf einer Wartezeit auf promise
d onCompleted
und dem Ablauf einer Wartezeit für alle Ereignisse, die an einen agent
-Auffang gesendet werden, tritt im folgenden ein absichtliches Rennen auf. Wenn promise
gewinnt, erwarte ich false
für onCompleted
und eine möglicherweise kurze Warteschlange in agent
. Wenn agent
gewinnt, erwarte ich true
für onCompleted
und alle Nachrichten aus der Warteschlange agent
. Das einzige Ergebnis, das ich NICHT erwarte, ist true
für onCompleted
UND eine kurze Warteschlange von agent
. Aber Murphy schläft nicht, und genau das sehe ich. Ich weiß nicht, ob die Müllsammlung ein Fehler ist oder ob ich eine interne Warteschlange für Clojures STM oder meine Dummheit oder etwas ganz anderes habe.
Ich präsentiere hier die Quelle in der Reihenfolge ihrer in sich geschlossenen Form, so dass sie direkt über lein repl
ausgeführt werden kann. Es gibt drei Cermonials, die aus dem Weg geräumt werden: Erstens, die Leiningen-Projektdatei, project.clj
, die die Abhängigkeit von der 0.9.0
-Version von Netflix's rxjava:
Nun, der Namespace und eine Clojure-Anforderung und die Java-Importe:
%Vor%Schließlich ein Makro für die Ausgabe an die Konsole:
%Vor% Endlich zu meinem Beobachter. Ich benutze agent
, um die Nachrichten zu sammeln, die vom onNext
einer Observablen gesendet werden. Ich benutze atom
, um ein potenzielles onError
zu sammeln. Ich benutze promise
für onCompleted
, so dass externe Benutzer die darauf warten können.
Nun, hier ist ein synchrones Observable. Er pumpt 25 Nachrichten in die onNext
Kehlen seiner Beobachter und ruft dann ihre onCompleted
s auf.
Wir abonnieren unseren Beobachter für dieses Observable:
%Vor%Es funktioniert wie erwartet, und wir sehen die folgenden Ergebnisse auf der Konsole
%Vor% Hier ist ein asynchrones Observable, das genau dasselbe tut, nur für einen future
-Thread:
Aber, Überraschung, hier ist, was wir auf der Konsole sehen: true
für onCompleted
, was impliziert, dass promise
DID NOT TIME-OUT; aber nur einige der asynch Nachrichten. Die tatsächliche Anzahl von Nachrichten, die wir sehen, variiert von Lauf zu Lauf, was impliziert, dass es ein Nebenläufigkeitsphänomen gibt. Hinweise geschätzt.
Der await-for
Agent bedeutet Blockiert den aktuellen Thread, bis alle Aktionen abgeschlossen sind
far (von diesem Thread oder Agenten) zu den Agenten aufgetreten sind , was bedeutet, dass es passieren kann, dass nach dem Warten noch ein anderer Thread Nachrichten an den Agenten senden kann was passiert in deinem Fall. Nachdem Sie auf den Agenten gewartet haben und den Wert in der :onNext
-Schlüssel in der Karte gelöscht haben, warten Sie auf das abgeschlossene Versprechen, das sich nach dem Warten als wahr herausstellt, aber in der Zwischenzeit wurden einige andere Nachrichten gesendet um den Agenten in den Vektor gesammelt werden.
Sie können dies lösen, indem Sie den Schlüssel :onCompleted
als ersten Schlüssel in der Karte verwenden, was im Grunde bedeutet, auf die Fertigstellung zu warten und dann auf die Agenten zu warten, da zu diesem Zeitpunkt keine send
-Aufrufe mehr möglich sind geschehen nach wie bereits onCompleted erhalten haben.
Tags und Links clojure reactive-programming rx-java netflix