Gibt es einige Codebeispiele für die Verwendung von Bibliotheken org.reactivestreams zur Verarbeitung großer Datenströme mit Java NIO (für hohe Leistung)? ? Ich strebe eine verteilte Verarbeitung an, also wären Beispiele, die Akka verwenden, am besten, aber ich kann das herausfinden.
Es scheint immer noch der Fall zu sein, dass die meisten (ich hoffe nicht alle) Beispiele für das Lesen von Dateien in Scala auf Source
(nicht-binär) oder direktes Java NIO (und sogar auf Dinge wie Files.readAllBytes
!)
Vielleicht gibt es eine Aktivatorvorlage, die ich verpasst habe? ( Akka Streams mit Scala! ist in der Nähe und adressiert alles, was ich brauche außer der binären / NIO-Seite)
Verwenden Sie nicht scala.collection.immutable.Stream
, um solche Dateien zu konsumieren. Der Grund dafür ist, dass es Memoization ausführt - das heißt, während es ja faul ist, wird der gesamte Stream im Speicher zwischengespeichert (Memo)!
Dies ist definitiv nicht was Sie wollen, wenn Sie über "stream Verarbeitung einer Datei" nachdenken. Der Grund dafür, dass Scala's Stream so funktioniert, liegt darin, dass es in einer funktionalen Umgebung sinnvoll ist - Sie können es vermeiden, die Fibbonachi-Zahlen immer wieder einfach zu berechnen, zum Beispiel für weitere Details siehe ScalaDoc .
Akka Streams stellt Reactive Streams-Implementierungen bereit und stellt eine FileIO
-Klasse zur Verfügung, die Sie hier verwenden können (es wird die Daten aus der Datei nur dann zurückdrucken, wenn sie benötigt werden, und der Rest des Streams ist bereit, sie zu verbrauchen ):
Hier finden Sie weitere Dokumente zum Arbeiten mit IO mit Akka Streams Beachten Sie, dass dies für die aktuell geschriebene Version von Akka gilt, also die 2.5.x-Serie.
Hoffe, das hilft!
Wir verwenden Akka-Streams, um Binärdateien zu verarbeiten. Es war ein wenig schwierig, die Dinge in Gang zu bringen, da es keine Dokumentation gab, aber das haben wir uns ausgedacht:
%Vor% Sobald Sie binSource
haben, was ein akka Source[Byte]
ist, können Sie fortfahren und die gewünschten Stream-Transformationen ( map
, flatMap
, transform
, etc ...) anwenden. Diese Funktionalität nutzt das Source
Companion-Objekt apply
, das ein Iterable
übernimmt, und übergibt eine scala Stream
, die die Daten langsam einlesen und für Ihre Transformationen verfügbar machen soll.
BEARBEITEN
Wie Konrad in den Kommentaren darauf hingewiesen hat, kann ein Stream bei großen Dateien ein Problem darstellen, da er Memoization der Elemente vornimmt, auf die er stößt, wenn er den Stream langsam ausbaut. Dies kann zu Situationen mit zu wenig Arbeitsspeicher führen, wenn Sie nicht vorsichtig sind. Wenn Sie jedoch die Dokumentation für Stream ansehen, gibt es ein Tipp, um zu vermeiden, dass sich Memoization im Gedächtnis aufbaut:
Man muss auf die Memoisierung achten; Sie können sehr schnell groß essen Mengen an Speicher, wenn Sie nicht vorsichtig sind. Der Grund dafür ist, dass Die Memoisierung des Streams erzeugt eine ähnliche Struktur scala.collection.immutable.List. Solange sich etwas festhält der Kopf, der Kopf hält sich am Schwanz fest, und so geht es weiter rekursiv. Wenn sich dagegen nichts an der Kopf (z. B. haben wir def verwendet, um den Stream zu definieren), dann ist es nicht mehr Wird es direkt verwendet, verschwindet es.
Wenn Sie das berücksichtigen, können Sie mein ursprüngliches Beispiel wie folgt ändern:
%Vor% Die Idee hier ist also, das Stream
über ein def
aufzubauen und nicht einem val
zuzuordnen und dann sofort das iterator
daraus zu holen und damit den Akka Source
zu initialisieren. Dinge so einzurichten, sollte die Probleme mit Momoization vermeiden. Ich habe den alten Code gegen eine große Datei ausgeführt und konnte eine OutOfMemory
-Situation erzeugen, indem ich eine foreach
auf der Source
ausführte. Als ich es auf den neuen Code umstellte, konnte ich dieses Problem vermeiden.
Tags und Links scala akka nio reactive-streams