Wie schließe ich am besten eine clojure core.async-Pipeline von Prozessen ab?

8

Ich habe eine Clojure-App, die eine Pipeline von Kanälen darstellt. Jeder Verarbeitungsschritt führt seine Berechnungen asynchron durch (dh er macht eine HTTP-Anfrage unter Verwendung von http-kit oder etwas) und bringt sein Ergebnis auf den Ausgabekanal. Auf diese Weise kann der nächste Schritt aus diesem Kanal lesen und seine Berechnung durchführen.

Meine Hauptfunktion sieht so aus

%Vor%

Momentan steuert der Scheduler-Schritt die Pipeline (er hat keinen Eingangskanal) und stellt der Kette Workload zur Verfügung.

Wenn ich dies in der REPL ausführen:

%Vor%

Es läuft grundsätzlich für immer wegen der Unendlichkeit des Schedulers. Was ist der beste Weg, um diese Architektur so zu ändern, dass ich das ganze System von der REPL herunterfahren kann? Bedeutet das Schließen jedes Kanals, dass das System beendet wird?

Würde ein Broadcast-Kanal helfen?

    
Marten Sytema 10.10.2015, 07:55
quelle

3 Antworten

6

Sie könnten Ihren Scheduler alts! / alts!! auf einem Kill-Kanal und dem Eingabekanal Ihrer Pipeline haben:

%Vor%

Wenn Sie einen Wert auf kill-channel setzen, wird die Schleife beendet.

Technisch könnte man auch output-ch verwenden, um den Prozess zu steuern (legt auf geschlossene Kanäle zurück, false ), aber normalerweise finde ich explizite Kill-Kanäle sauberer, zumindest für Pipelines der obersten Ebene.

Um die Dinge gleichzeitig eleganter und bequemer zu machen (sowohl bei der REPL als auch in der Produktion), könnten Sie die Komponente von Stuart Sierra , starten Sie die Scheduler-Schleife (in einem separaten Thread) und assoc den Kill-Kanal in der start -Methode der Komponente und dann close! den Kill-Kanal (und beenden Sie damit die Schleife) in der Komponente stop Methode.

    
Michał Marczyk 10.10.2015, 12:34
quelle
4

Ich würde vorschlagen, etwas wie Ссылка zu verwenden, um das System-Setup zu handhaben. Es stellt sicher, dass Sie Ihr System in der REPL einfach starten und stoppen können. Wenn Sie diese Bibliothek verwenden, richten Sie sie so ein, dass jeder Verarbeitungsschritt eine Komponente ist und jede Komponente die Einrichtung und den Abbau von Kanälen in ihren Protokollen start und stop übernimmt. Außerdem könnten Sie wahrscheinlich ein IStream -Protokoll für jede zu implementierende Komponente erstellen und jede Komponente davon abhängig machen, dass Komponenten dieses Protokoll implementieren. Es kauft Ihnen einige sehr einfache Modularität.

Sie würden mit einem System enden, das wie folgt aussieht:

%Vor%

Eine nette Sache mit dieser Art von Ansatz ist, dass Sie leicht Komponenten hinzufügen können, wenn sie auch auf einen Kanal angewiesen sind. Wenn beispielsweise jede Komponente ihren Ausgabekanal mit einem tap in einem internen mult erstellt, können Sie einen Logger für den Prozessor nur durch eine Protokollierungskomponente hinzufügen, die den Prozessor als Abhängigkeit verwendet.

%Vor%

Ich würde empfehlen, auch seine Diskussion zu verfolgen, um sich ein Bild davon zu machen, wie es funktioniert.

    
Alvin Francis Dumalus 10.10.2015 12:57
quelle
1

Sie sollten den neu geladenen Workflow von Stuart Sierra in Erwägung ziehen, der von der Modellierung Ihres 'abhängt. Pipeline-Elemente als Komponenten , so können Sie Ihre logischen Singletons als "Klassen" modellieren, was bedeutet, dass Sie die Konstruktion und Zerstörung steuern können ( Start / Stop) Logik für jede von ihnen.

    
Krisztián Szabó 10.10.2015 13:29
quelle

Tags und Links