Warum gibt mein RxJava Observable nicht aus oder vervollständigt es nur, wenn es blockiert?

8

Hintergrund

Ich habe eine Reihe von RxJava Observables (entweder von Jersey Kunden oder Stubs mit Observable.just(someObject) ) generiert. Alle sollten genau einen Wert ausgeben. Ich habe einen Komponententest, der alle Jersey-Kunden ausspioniert und Observable.just(someObject) verwendet, und ich sehe dort dasselbe Verhalten wie beim Ausführen des Produktionscodes.

Ich habe mehrere Klassen, die auf diese Observablen einwirken, einige Berechnungen durchführen (und einige Nebenwirkungen - ich könnte sie später direkt zu Rückgabewerten bringen) und leere leere Observablen zurückgeben.

An einer Stelle, in einer solchen Klasse, versuche ich mehrere meiner Quellobservablen zu komprimieren und sie dann zuzuordnen - etwa so:

%Vor%

Die Rechenklassen werden dann alle kombiniert und gewartet:

%Vor%

Das Problem

Das Problem ist, dass processToNewObservable() niemals ausgeführt wird. Durch den Eliminierungsprozess kann ich sehen, dass es getObservable1() ist, das ist das Problem - wenn ich es durch Observable.just(null) ersetze, wird alles so ausgeführt, wie ich es mir vorstelle (aber mit einem Nullwert, wo ich einen echten Wert haben möchte).

Noch einmal: getObservable1() gibt eine Observable von einem Jersey-Client im Produktionscode zurück, aber dieser Client ist ein Mockito-Mock, der in meinem Test Observable.just(someValue) zurückgibt.

Untersuchung

Wenn ich getObservable1() in blocking umwandle, dann wickle den ersten Wert in just() , wieder führt alles so aus, wie ich es mir vorgestellt habe (aber ich möchte den Blockierungsschritt nicht einführen):

%Vor%

Mein erster Gedanke war, dass vielleicht etwas anderes den von meinem Observablen ausgestrahlten Wert verbrauchte, und zip sah, dass es bereits vollständig war, und somit feststellte, dass das Ergebnis der Zipper-Operation eine leere Beobachtungsstelle sein sollte. Ich habe versucht, .cache() auf jede beobachtbare Quelle hinzuzufügen, von der ich denke, dass sie verwandt ist, aber das hat das Verhalten nicht verändert.

Ich habe auch versucht, next / error / complete / finally-Handler auf getObservable1 (ohne es zu sperren) kurz vor der Zip-Datei hinzuzufügen, aber keiner von ihnen hat folgendes ausgeführt:

%Vor%

Die Frage

Ich bin sehr neu bei RxJava, also bin ich mir ziemlich sicher, dass ich etwas Grundlegendes vermisse. Die Frage ist: Was für eine dumme Sache könnte ich tun? Wenn das aus dem, was ich bisher gesagt habe, nicht offensichtlich ist, was kann ich tun, um das Problem zu diagnostizieren?

    
Rowan 13.12.2015, 23:21
quelle

3 Antworten

0

Hinweis : Ich habe meine Antwort hier nicht sehr zufriedenstellend gefunden, also habe ich mich ein bisschen weiter gequält und einen viel kleineren Reproduktionsfall gefunden, also habe ich hier eine neue Frage gestellt: Warum sendet mein RxJava Observable nur an den ersten Verbraucher?

Ich habe zumindest einen Teil meiner Probleme herausgefunden (und, Entschuldigung an alle, die versucht haben zu antworten, ich glaube nicht, dass Sie bei meiner Erklärung eine große Chance hatten).

Die verschiedenen Klassen, die diese Berechnungen durchführen, haben alle Observable.empty() zurückgegeben (wie pro processToNewObservable() in meinem ursprünglichen Beispiel). Soweit ich das beurteilen kann, akzeptiert Observable.zip() nicht die N-Observable, die es zieht, bis die N-1-Observable einen Wert ausgegeben hat.

Mein ursprüngliches Beispiel behauptete, es sei getObservable1() , das sich schlecht benahm - das war tatsächlich geringfügig ungenau, es war später in der Parameterliste zu beobachten. Wie ich es verstehe, ist der Grund dafür, dass es blockiert, und dann diesen Wert in ein Observable zu verwandeln, wieder funktioniert, weil das Blockieren und Aufrufen zuerst die Ausführung erzwungen hat und ich die Nebenwirkungen bekommen habe, die ich wollte.

Wenn ich meine gesamte berechnende Klassifizierung so ändere, dass stattdessen Observable.just(null) zurückgegeben wird, funktioniert alles: Der letzte zip() aller Observablen der Berechnungsklassen funktioniert durch sie alle, so dass alle erwarteten Nebenwirkungen auftreten.

Wenn ich eine Null-Leere zurückgebe, scheint es, als würde ich aus Design-Sicht definitiv etwas falsch machen, aber zumindest wird diese spezielle Frage beantwortet.

    
Rowan 14.12.2015, 20:01
quelle
1

Das Observable muss emittieren, um die Kette zu starten. Sie müssen sich Ihre Pipeline als eine Erklärung dessen vorstellen, was passieren wird, wenn das Observable ausstößt.

Sie haben nicht geteilt, was tatsächlich beobachtet wurde, aber Observable.just () bewirkt, dass das Observable das umschlossene Objekt sofort ausgibt.

    
John Scattergood 13.12.2015 23:59
quelle
0

Basierend auf der Antwort im Kommentar gibt einer der getObservable keinen Wert zurück, sondern wird nur vervollständigt oder der Mockito Spott macht etwas falsch. Das folgende eigenständige Beispiel funktioniert für mich. Könntest du es überprüfen und anfangen, es langsam zu mutieren, um zu sehen, wo die Dinge brechen?

%Vor%     
akarnokd 14.12.2015 15:19
quelle

Tags und Links