Ich verwende eine Futures-Rs-Version der Rusoto AWS Kinesis-Bibliothek . Ich muss eine tiefe Pipeline von AWS-Kinesis-Anfragen erzeugen, um einen hohen Durchsatz zu erreichen, da Kinesis eine Grenze von 500 Datensätzen pro HTTP-Anfrage hat. In Kombination mit der 50ms Latenzzeit beim Senden einer Anfrage muss ich viele gleichzeitige Anfragen generieren. Ich möchte irgendwo in der Größenordnung von 100 In-Flight-Anfragen erstellen.
Die Funktionssignatur Rusoto put_records
sieht folgendermaßen aus:
Der RusotoFuture
ist ein Wrapper, der wie folgt definiert ist:
Die innere Future
ist umhüllt, aber die RusutoFuture
implementiert immer noch Future::poll()
, also glaube ich, dass sie mit dem futures-rs
Ökosystem kompatibel ist. Das RusotoFuture
stellt einen Synchronisierungsaufruf bereit:
Ich kann eine Anfrage stellen und sync()
it, um das Ergebnis von AWS zu erhalten. Ich möchte viele Anfragen erstellen, sie in eine Art Warteschlange / Liste stellen und fertige Anfragen sammeln. Wenn die Anfrage fehlerhaft ist, muss ich die Anfrage erneut ausgeben (das ist in Kinesis etwas normal, besonders wenn man den Shard-Durchsatz limitiert). Wenn die Anfrage erfolgreich abgeschlossen ist, sollte ich eine Anfrage mit neuen Daten senden. Ich könnte einen Thread für jede Anfrage erzeugen und synchronisieren, aber das scheint ineffizient zu sein, wenn ich den async IO-Thread laufen lasse.
Ich habe versucht, futures::sync::mpsc::channel
aus meinem Anwendungsthread zu verwenden (läuft nicht aus dem Tokio-Reaktor), aber wenn ich tx
klone, erzeugt es einen eigenen Puffer und eliminiert jeden Gegendruck auf send
:
Ohne den Klon habe ich den Fehler:
%Vor% Ich habe mir auch futures::mpsc::sync::spawn
aufgrund von Empfehlungen angesehen aber es dauert Eigentümerschaft des rx
(als Stream
) und löst nicht mein Problem mit dem Copy
von tx
, das ungebundenes Verhalten verursacht.
Ich hoffe, wenn ich die channel
/ spawn
Nutzung nutzen kann, werde ich ein System haben, das RusotoFuture
s benötigt, darauf wartet, dass sie fertig sind und mir dann eine einfache Möglichkeit gibt, die Fertigstellung zu erreichen Ergebnisse aus meinem Anwendungsthread.
Soweit ich Ihr Problem mit channel
ist das nicht, dass ein einzelner Klon der Sender
das erhöht Kapazität um eins, klonen Sie Sender
für jedes Objekt, das Sie senden möchten.
Der Fehler, den Sie ohne clone
sehen, stammt von Ihrer falschen Verwendung der Sink::send
-Schnittstelle. Mit clone
sollten Sie die Warnung tatsächlich sehen:
Das heißt: Ihr aktueller Code sendet eigentlich gar nichts!
Um Gegendruck anzuwenden, müssen Sie diese send
-Aufrufe verketten; jeder sollte warten bis der vorherige fertig ist (und Sie müssen auch auf den letzten warten!); Bei Erfolg bekommst du Sender
zurück. Der beste Weg, dies zu tun, ist ein Stream
von Ihrem Iterator zu generieren, indem Sie % co_de verwenden % und an iter_ok
.
Jetzt haben Sie eine Zukunft send_all
, die Sie "fahren" müssen. Wenn Sie das Ergebnis und die Panik bei Fehler ( SendAll
) ignorieren, können Sie es als separate Aufgabe erzeugen, aber vielleicht möchten Sie es in Ihre Hauptanwendung integrieren (d. H. Es in .then(|r| { r.unwrap(); Ok::<(), ()>(()) })
zurückgeben).
Box
und RusotoFuture::sync
Verwenden Sie nicht Future::wait
: es ist in einer Branche bereits veraltet und wird normalerweise nicht das tun, wonach Sie suchen. Ich bezweifle, dass Future::wait
sich der Probleme bewusst ist, daher empfehle ich, RusotoFuture
zu vermeiden.
RusotoFuture::sync
erhöht die Kanalkapazität Wie Sie richtig festgestellt haben, erhöht das Klonen von Sender
die Kapazität um eins.
Dies scheint getan zu sein, um die Leistung zu verbessern: A Sender
beginnt im unblockierten ("nicht geparkten") Zustand; Wenn ein Sender
nicht blockiert ist, kann er einen Gegenstand ohne Blockierung senden. Wenn jedoch die Anzahl der Elemente in der Warteschlange das konfigurierte Limit erreicht, wenn ein Sender
ein Element sendet, wird Sender
blockiert ("geparkt"). (Durch das Entfernen von Elementen aus der Warteschlange wird die Sender
zu einem bestimmten Zeitpunkt entsperrt.)
Dies bedeutet, dass% ce_de%, nachdem die innere Warteschlange das Limit erreicht hat, immer noch ein Element senden kann, was zu dem dokumentierten Effekt erhöhter Kapazität führt, aber nur wenn tatsächlich alle Sender
s Elemente senden - unbenutztes Sender
s erhöhen nicht die beobachtete Kapazität.
Der Leistungsschub kommt von der Tatsache, dass, solange Sie nicht das Limit erreichen, es keine Aufgaben parken und benachrichtigen muss (was ziemlich schwer ist).
Die private Dokumentation oben in der Sender
Modul beschreibt mehr Details.
Tags und Links asynchronous rust amazon-kinesis future