Wo ist der Speicherverlust, wenn mapcat den Gegendruck in core.async aufgibt?

9

Ich habe einen core.async-Code in Clojure geschrieben, und als ich ihn ausgeführt habe, hat er den gesamten verfügbaren Speicher belegt und ist mit einem Fehler gescheitert. Es scheint, dass die Verwendung von mapcat in einer core.async-Pipeline den Druck abbaut. (Was aus Gründen, die über diese Frage hinausgehen, unglücklich ist.)

Hier ist ein Code, der das Problem veranschaulicht, indem :x s in und aus einem mapcat ing Wandler gezählt wird:

%Vor%

Der Produzent rennt den Konsumenten weit voraus.

Es scheint, dass ich nicht die erste Person bin, die das entdeckt. Aber die Erklärung, die hier gegeben wurde, scheint es nicht ganz zu überdecken. (Obwohl es eine angemessene Problemumgehung bietet.) Konzeptionell würde ich erwarten, dass der Produzent voraus ist, aber nur durch die Länge der wenigen Nachrichten, die in den Kanälen gepuffert werden könnten.

Meine Frage ist, wo sind all die anderen Nachrichten? Nach der vierten Zeile der Ausgabe werden 7000 :x s nicht berücksichtigt.

    
Peter Westmacott 21.06.2016, 19:51
quelle

1 Antwort

2

Es gibt zwei mögliche Interpretationen der Frage "Wo ist das Speicherleck?"

Erstens, wo werden die Daten gespeichert? Die Antwort scheint in dem Kanalpuffer unmittelbar stromabwärts der expandierenden Transformation zu liegen.

Kanäle verwenden standardmäßig FixedBuffer ( clojure.core.async.impl.buffers / FixedBuffer ), das erkennen kann, ob es voll ist, aber nicht dagegen ist, überladen zu sein.

Zweitens, welche Passage des Codes bewirkt, dass der Puffer überfüllt ist? Dies (korrigieren Sie mich, wenn ich falsch liege) scheint in Die Methode take! von ManyToManyChannel ( clojure.core.async.impl.channels / ManyToManyChannel ) wo Der erste Aufruf von add! im Puffer erfolgt vor jedem anrufen zu full? stattgefunden haben.

Es scheint, dass take! davon ausgeht, dass es für jedes Element, das es entfernt, dem Puffer mindestens ein Element hinzufügen kann. Bei lang laufenden Dehnungswandlern wie mapcat ist dies nicht immer eine sichere Annahme.

Indem Sie dies ändern Zeile bis (when (and (.hasNext iter) (not (impl/full? buf))) in einer lokalen Kopie von core.async Ich kann den Code in der Frage wie erwartet verhalten lassen. (N.B. Mein Verständnis von core.async reicht nicht aus, um zu garantieren, dass dies eine robuste Lösung für Ihren Anwendungsfall ist.)

UPDATE 2016-09-17: Es gibt jetzt ein Problem dafür: Ссылка

    
Peter Westmacott 27.06.2016, 15:56
quelle