Verwendung von Reactive Streams für die NIO-Binärverarbeitung?

9

Gibt es einige Codebeispiele für die Verwendung von Bibliotheken org.reactivestreams zur Verarbeitung großer Datenströme mit Java NIO (für hohe Leistung)? ? Ich strebe eine verteilte Verarbeitung an, also wären Beispiele, die Akka verwenden, am besten, aber ich kann das herausfinden.

Es scheint immer noch der Fall zu sein, dass die meisten (ich hoffe nicht alle) Beispiele für das Lesen von Dateien in Scala auf Source (nicht-binär) oder direktes Java NIO (und sogar auf Dinge wie Files.readAllBytes !)

Vielleicht gibt es eine Aktivatorvorlage, die ich verpasst habe? ( Akka Streams mit Scala! ist in der Nähe und adressiert alles, was ich brauche außer der binären / NIO-Seite)

    
Stephen 05.01.2015, 02:08
quelle

2 Antworten

8

Verwenden Sie nicht scala.collection.immutable.Stream , um solche Dateien zu konsumieren. Der Grund dafür ist, dass es Memoization ausführt - das heißt, während es ja faul ist, wird der gesamte Stream im Speicher zwischengespeichert (Memo)!

Dies ist definitiv nicht was Sie wollen, wenn Sie über "stream Verarbeitung einer Datei" nachdenken. Der Grund dafür, dass Scala's Stream so funktioniert, liegt darin, dass es in einer funktionalen Umgebung sinnvoll ist - Sie können es vermeiden, die Fibbonachi-Zahlen immer wieder einfach zu berechnen, zum Beispiel für weitere Details siehe ScalaDoc .

Akka Streams stellt Reactive Streams-Implementierungen bereit und stellt eine FileIO -Klasse zur Verfügung, die Sie hier verwenden können (es wird die Daten aus der Datei nur dann zurückdrucken, wenn sie benötigt werden, und der Rest des Streams ist bereit, sie zu verbrauchen ):

%Vor%

Hier finden Sie weitere Dokumente zum Arbeiten mit IO mit Akka Streams Beachten Sie, dass dies für die aktuell geschriebene Version von Akka gilt, also die 2.5.x-Serie.

Hoffe, das hilft!

    
Konrad 'ktoso' Malawski 07.01.2015 12:10
quelle
4

Wir verwenden Akka-Streams, um Binärdateien zu verarbeiten. Es war ein wenig schwierig, die Dinge in Gang zu bringen, da es keine Dokumentation gab, aber das haben wir uns ausgedacht:

%Vor%

Sobald Sie binSource haben, was ein akka Source[Byte] ist, können Sie fortfahren und die gewünschten Stream-Transformationen ( map , flatMap , transform , etc ...) anwenden. Diese Funktionalität nutzt das Source Companion-Objekt apply , das ein Iterable übernimmt, und übergibt eine scala Stream , die die Daten langsam einlesen und für Ihre Transformationen verfügbar machen soll.

BEARBEITEN

Wie Konrad in den Kommentaren darauf hingewiesen hat, kann ein Stream bei großen Dateien ein Problem darstellen, da er Memoization der Elemente vornimmt, auf die er stößt, wenn er den Stream langsam ausbaut. Dies kann zu Situationen mit zu wenig Arbeitsspeicher führen, wenn Sie nicht vorsichtig sind. Wenn Sie jedoch die Dokumentation für Stream ansehen, gibt es ein Tipp, um zu vermeiden, dass sich Memoization im Gedächtnis aufbaut:

  

Man muss auf die Memoisierung achten; Sie können sehr schnell groß essen   Mengen an Speicher, wenn Sie nicht vorsichtig sind. Der Grund dafür ist, dass   Die Memoisierung des Streams erzeugt eine ähnliche Struktur   scala.collection.immutable.List. Solange sich etwas festhält   der Kopf, der Kopf hält sich am Schwanz fest, und so geht es weiter   rekursiv. Wenn sich dagegen nichts an der   Kopf (z. B. haben wir def verwendet, um den Stream zu definieren), dann ist es nicht mehr   Wird es direkt verwendet, verschwindet es.

Wenn Sie das berücksichtigen, können Sie mein ursprüngliches Beispiel wie folgt ändern:

%Vor%

Die Idee hier ist also, das Stream über ein def aufzubauen und nicht einem val zuzuordnen und dann sofort das iterator daraus zu holen und damit den Akka Source zu initialisieren. Dinge so einzurichten, sollte die Probleme mit Momoization vermeiden. Ich habe den alten Code gegen eine große Datei ausgeführt und konnte eine OutOfMemory -Situation erzeugen, indem ich eine foreach auf der Source ausführte. Als ich es auf den neuen Code umstellte, konnte ich dieses Problem vermeiden.

    
cmbaxter 05.01.2015 13:25
quelle

Tags und Links