Steuern der Anzahl der erzeugten Futures, um Gegendruck zu erzeugen

9

Ich verwende eine Futures-Rs-Version der Rusoto AWS Kinesis-Bibliothek . Ich muss eine tiefe Pipeline von AWS-Kinesis-Anfragen erzeugen, um einen hohen Durchsatz zu erreichen, da Kinesis eine Grenze von 500 Datensätzen pro HTTP-Anfrage hat. In Kombination mit der 50ms Latenzzeit beim Senden einer Anfrage muss ich viele gleichzeitige Anfragen generieren. Ich möchte irgendwo in der Größenordnung von 100 In-Flight-Anfragen erstellen.

Die Funktionssignatur Rusoto put_records sieht folgendermaßen aus:

%Vor%

Der RusotoFuture ist ein Wrapper, der wie folgt definiert ist:

%Vor%

Die innere Future ist umhüllt, aber die RusutoFuture implementiert immer noch Future::poll() , also glaube ich, dass sie mit dem futures-rs Ökosystem kompatibel ist. Das RusotoFuture stellt einen Synchronisierungsaufruf bereit:

%Vor%

Ich kann eine Anfrage stellen und sync() it, um das Ergebnis von AWS zu erhalten. Ich möchte viele Anfragen erstellen, sie in eine Art Warteschlange / Liste stellen und fertige Anfragen sammeln. Wenn die Anfrage fehlerhaft ist, muss ich die Anfrage erneut ausgeben (das ist in Kinesis etwas normal, besonders wenn man den Shard-Durchsatz limitiert). Wenn die Anfrage erfolgreich abgeschlossen ist, sollte ich eine Anfrage mit neuen Daten senden. Ich könnte einen Thread für jede Anfrage erzeugen und synchronisieren, aber das scheint ineffizient zu sein, wenn ich den async IO-Thread laufen lasse.

Ich habe versucht, futures::sync::mpsc::channel aus meinem Anwendungsthread zu verwenden (läuft nicht aus dem Tokio-Reaktor), aber wenn ich tx klone, erzeugt es einen eigenen Puffer und eliminiert jeden Gegendruck auf send :

%Vor%

Ohne den Klon habe ich den Fehler:

%Vor%

Ich habe mir auch futures::mpsc::sync::spawn aufgrund von Empfehlungen angesehen aber es dauert Eigentümerschaft des rx (als Stream ) und löst nicht mein Problem mit dem Copy von tx , das ungebundenes Verhalten verursacht.

Ich hoffe, wenn ich die channel / spawn Nutzung nutzen kann, werde ich ein System haben, das RusotoFuture s benötigt, darauf wartet, dass sie fertig sind und mir dann eine einfache Möglichkeit gibt, die Fertigstellung zu erreichen Ergebnisse aus meinem Anwendungsthread.

    
xrl 15.01.2018, 16:54
quelle

1 Antwort

0

Soweit ich Ihr Problem mit channel ist das nicht, dass ein einzelner Klon der Sender das erhöht Kapazität um eins, klonen Sie Sender für jedes Objekt, das Sie senden möchten.

Der Fehler, den Sie ohne clone sehen, stammt von Ihrer falschen Verwendung der Sink::send -Schnittstelle. Mit clone sollten Sie die Warnung tatsächlich sehen:

%Vor%

Das heißt: Ihr aktueller Code sendet eigentlich gar nichts!

Um Gegendruck anzuwenden, müssen Sie diese send -Aufrufe verketten; jeder sollte warten bis der vorherige fertig ist (und Sie müssen auch auf den letzten warten!); Bei Erfolg bekommst du Sender zurück. Der beste Weg, dies zu tun, ist ein Stream von Ihrem Iterator zu generieren, indem Sie % co_de verwenden % und an iter_ok .

Jetzt haben Sie eine Zukunft send_all , die Sie "fahren" müssen. Wenn Sie das Ergebnis und die Panik bei Fehler ( SendAll ) ignorieren, können Sie es als separate Aufgabe erzeugen, aber vielleicht möchten Sie es in Ihre Hauptanwendung integrieren (d. H. Es in .then(|r| { r.unwrap(); Ok::<(), ()>(()) }) zurückgeben).

%Vor%

Box und RusotoFuture::sync

Verwenden Sie nicht Future::wait : es ist in einer Branche bereits veraltet und wird normalerweise nicht das tun, wonach Sie suchen. Ich bezweifle, dass Future::wait sich der Probleme bewusst ist, daher empfehle ich, RusotoFuture zu vermeiden.

Klonen RusotoFuture::sync erhöht die Kanalkapazität

Wie Sie richtig festgestellt haben, erhöht das Klonen von Sender die Kapazität um eins.

Dies scheint getan zu sein, um die Leistung zu verbessern: A Sender beginnt im unblockierten ("nicht geparkten") Zustand; Wenn ein Sender nicht blockiert ist, kann er einen Gegenstand ohne Blockierung senden. Wenn jedoch die Anzahl der Elemente in der Warteschlange das konfigurierte Limit erreicht, wenn ein Sender ein Element sendet, wird Sender blockiert ("geparkt"). (Durch das Entfernen von Elementen aus der Warteschlange wird die Sender zu einem bestimmten Zeitpunkt entsperrt.)

Dies bedeutet, dass% ce_de%, nachdem die innere Warteschlange das Limit erreicht hat, immer noch ein Element senden kann, was zu dem dokumentierten Effekt erhöhter Kapazität führt, aber nur wenn tatsächlich alle Sender s Elemente senden - unbenutztes Sender s erhöhen nicht die beobachtete Kapazität.

Der Leistungsschub kommt von der Tatsache, dass, solange Sie nicht das Limit erreichen, es keine Aufgaben parken und benachrichtigen muss (was ziemlich schwer ist).

Die private Dokumentation oben in der Sender Modul beschreibt mehr Details.

    
Stefan 30.01.2018, 10:14
quelle