RxJava. Sequenzielle Ausführung

9

In meiner Android App habe ich einen Moderator, der Benutzerinteraktionen abwickelt, einen Anforderungsmanager enthält und bei Bedarf Benutzereingaben über den Anforderungsmanager an den Anforderungsmanager sendet.

Der Anforderungsmanager selbst enthält die Server-API und verarbeitet Serveranforderungen mit diesem RxJava.

Ich habe einen Code, der immer dann eine Anfrage an den Server sendet, wenn ein Benutzer eine Nachricht eingibt und die Antwort vom Server anzeigt:

%Vor%

Aber jetzt muss ich irgendwie Schlange stehen. Der Benutzer kann eine neue Nachricht senden, bevor der Server geantwortet hat. Jede Nachricht aus der Warteschlange sollte sequenziell verarbeitet werden. I.e. Die zweite Nachricht wird gesendet, nachdem wir eine Antwort auf die erste Nachricht erhalten haben und so weiter.

Falls ein Fehler auftritt, sollten keine weiteren Anfragen bearbeitet werden.

Ich muss auch die Antworten in einem RecyclerView anzeigen.

Ich habe keine Ahnung, wie ich den obigen Code ändern kann, um die oben beschriebene Handhabung zu erreichen

Ich sehe Art von Problem. Auf der einen Seite kann diese Warteschlange jederzeit vom Benutzer aktualisiert werden, auf der anderen Seite, wenn der Server eine Antwort gesendet hat, sollte die Nachricht aus der Warteschlange entfernt werden.

Vielleicht gibt es einen Rxjava-Operator oder eine spezielle Art, die ich gerade verpasst habe.

Ich sah eine ähnliche Antwort hier, aber die "Warteschlange" dort ist konstant. N sequenzielle api-Aufrufe mit RxJava und Retrofit machen

Ich bin sehr dankbar für jede Lösung oder jeden Link

    
Tima 13.11.2017, 12:59
quelle

4 Antworten

2

Ich finde keine elegante native-RxJava-Lösung. Also werde ich eine Subscriber anpassen, um deine Arbeit zu erledigen.

Für Ihre 3 Punkte:

  1. Für die sequentielle Ausführung erstellen wir einen einzelnen Thread-Scheduler

    Scheduler sequential = Schedulers.from(Executors.newFixedThreadPool(1));

  2. Um alle Anfragen zu stoppen, wenn ein Fehler auftritt, sollten wir alle Anfragen zusammen abbonieren, anstatt jedes Mal ein Flowable zu erstellen. Also definieren wir folgende Funktionen (hier ist die Anfrage Integer und Antwort String ):

    void sendRequest(Integer request)

    Flowable<String> reciveResponse()

    und definieren Sie ein Feld, um die Zuordnung von Anfrage- und Antwortfluss zu ermöglichen:

    FlowableProcessor<Integer> requestQueue = UnicastProcessor.create();

  3. Um die nicht gesendete Anfrage erneut auszuführen, definieren wir die Rerun-Funktion:

    void rerun()

Dann können wir es verwenden:

%Vor%

Lasst uns sie jetzt implementieren.

Wenn wir eine Anfrage senden, schieben wir sie einfach in requestQueue

%Vor%

Erstens sollten wir, um die Anfrage nacheinander auszuführen, die Arbeit auf sequential :

einplanen %Vor%

Zweitens, um die Anfrage zu stoppen, wenn ein Fehler auftritt. Es ist ein Standardverhalten. Wenn wir nichts tun, wird ein Fehler das Abonnement und alle weiteren Elemente werden nicht ausgegeben.

Drittens, um die nicht gesendeten Anfragen erneut auszuführen. Erstens weil der native Operator den Stream abbrechen wird, wie MapSubscriber do (RxJava-2.1.0-FlowableMap # 63):

%Vor%

Wir sollten den Fehler umbrechen. Hier verwende ich meine % co_de Um die mögliche Ausnahme zu umbrechen, können Sie jede andere Implementierung verwenden, die die Ausnahme umbrechen kann, statt sie zu werfen:

%Vor%

Und dann ist es die benutzerdefinierte Try .

Er fordert und sendet normalerweise Gegenstände. Wenn ein Fehler auftritt (tatsächlich ist ein fehlgeschlagener OnErrorStopSubscriber implements Subscriber<Try<T>>, Subscription -Empfang), hat er dort angehalten und wird keine Downstream-Anfrage anfordern oder ausgeben. Nach dem Aufruf von Try method kehrt es zum laufenden Statu zurück und wird normal ausgegeben. Die Klasse umfasst etwa 80 Zeilen. Sie können den Code auf meinem GitHub sehen .

Jetzt können wir unseren Code testen:

%Vor%

und Ausgabe:

%Vor%

Sie können sehen, dass es nacheinander läuft. Und gestoppt, wenn ein Fehler auftritt. Nach dem Aufruf rerun method behandelt es weiter die linke nicht gesendete Anfrage.

Den vollständigen Code finden Sie unter my github.

    
Dean Xu 20.11.2017 07:34
quelle
1

Ich schlage vor, asynchrone beobachtbare Methoden zu erstellen, hier ein Beispiel:

%Vor%

das erste Observable sendet Anfrage, wenn er Antwort erhalten wird, die zweite und Sie können Kette, können Sie jede Methode zur Behandlung von Fehlern oder Erfolg, diese Probe wie Warteschlange.

hier das Ergebnis für die Ausführung:

%Vor%     
Elyes 22.11.2017 13:36
quelle
0

Für diese Art von Verhalten verwende ich die Flowable-Backpressure-Implementierung. Erstellen Sie einen äußeren Stream, der für Ihren API-Anforderungsstream übergeordnet ist, flatMap die API-Anforderung mit maxConcurrency = 1 und implementieren Sie eine Art von Pufferstrategie, sodass Ihr Flowable keine Ausnahme auslöst.

%Vor%

Es puffert Benutzereingaben bis zu einem bestimmten Schwellenwert und löscht sie dann (wenn Sie dies nicht tun, wird eine Ausnahme ausgelöst, aber es ist sehr unwahrscheinlich, dass der Benutzer einen solchen Puffer überschreitet), wird er nacheinander 1 zu 1 ausgeführt wie eine Schlange. Versuchen Sie nicht, dieses Verhalten selbst zu implementieren, wenn es Operatoren für ein Verhalten in der Bibliothek selbst gibt.

Oh, ich habe vergessen zu erwähnen, dass deine sendRequest() -Methode Flowable zurückgeben muss, oder du kannst sie in Flowable konvertieren.

Hoffe, das hilft!

    
Tuby 13.11.2017 13:49
quelle
0

Meine Lösungen wären wie folgt (ich habe vorher in Swift etwas ähnliches gemacht):

  1. Sie benötigen eine Wrapper-Schnittstelle (nennen wir es "Event") für Anfragen und Antworten.
  2. Sie benötigen ein Zustandsobjekt (machen wir es zur Klasse "State"), das die Anforderungswarteschlange und die letzte Serverantwort enthält, und eine Methode, die "Event" als Parameter akzeptiert und "this" zurückgibt.
  3. Ihre Hauptverarbeitungskette wird aussehen wie Observable state = Observable.merge (serverResponsesMappedToEventObservable, requestsMappedToEventObservable) .scan (new State (), (state, event) - & gt; {state.apply (event)})
  4. Beide Parameter der Methode .merge () sind wahrscheinlich Subjects.
  5. Die Verarbeitung von Warteschlangen erfolgt in der einzigen Methode des "State" -Objekts (Auswahl und Senden einer Anfrage aus der Warteschlange bei einem beliebigen Ereignis, Hinzufügen zu einer Warteschlange auf Anfrage, Aktualisierung der letzten Antwort auf ein Antwort-Ereignis).
Maxim Volgin 22.11.2017 04:59
quelle

Tags und Links