Ich verwende C # TPL, und ich habe ein Problem mit einem Erzeuger / Verbraucher-Code ... Aus irgendeinem Grund verwendet TPL keine Threads und erstellt neue, ohne anzuhalten,
Ich machte ein einfaches Beispiel, um dieses Verhalten zu demonstrieren:
%Vor%Dieser Code erstellt zwei Aufgaben, Produzent und Konsument. Produces fügt jede Sekunde 1 Arbeitselement hinzu, und Consumer gibt nur eine Zeichenfolge mit Informationen aus. Ich würde annehmen, dass 1 Consumer-Thread in dieser Situation genug ist, da Tasks viel schneller verarbeitet werden, als sie der Warteschlange hinzugefügt werden, aber was tatsächlich passiert ist, dass jede zweite Anzahl von Threads im Prozess um 1 wächst ... als ob TPL erstellt für jedes Element einen neuen Thread
Nachdem ich versucht habe zu verstehen, was passiert, habe ich noch eine andere Sache bemerkt: Obwohl die BlockingCollection-Größe 1 ist, wird Consumer nach einer Weile zum Beispiel in Bursts aufgerufen, so fängt es an:
Eingehenden Job
Verarbeiteter Job-Thread: 4 Prozess-Thread-Anzahl: 9
Eingehenden Job
Verarbeiteter Job-Thread: 6 Prozess-Thread-Anzahl: 9
Eingehenden Job
Job verarbeiteter Thread: 5 Prozess-Threadanzahl: 10
Eingehenden Job
Job verarbeiteter Thread: 4 Prozess-Threadanzahl: 10
Eingehenden Job
Verarbeiteter Job-Thread: 6 Prozess-Thread-Anzahl: 11
und so bearbeitet es weniger als eine Minute später:
Eingehenden Job
Verarbeiteter Job-Thread: 25 Prozess-Thread-Anzahl: 52
Eingehenden Job
Eingehenden Job
Job verarbeiteter Thread: 5 Prozess-Threadanzahl: 54
Job verarbeiteter Thread: 5 Prozess-Threadanzahl: 54
und weil Threads nach dem Beenden von Parallel.ForEach loop entsorgt werden (ich zeige es nicht in diesem Beispiel, aber es war im realen Projekt) Ich nahm an, dass es etwas speziell mit ForEach zu tun hat ... Ich habe das gefunden artice Ссылка , und ich dachte, dass mein Problem wurde von diesem Standard-Partitionierer verursacht, so nahm ich benutzerdefinierte Partitionierung von TPL Beispiele, die Consumer-Themen-Element nacheinander zugeführt wird, und obwohl es die Reihenfolge der Ausführung (loszuwerden, Verzögerung) ...
Eingehenden Job
Job verarbeiteter Thread: 71 Process Thread Count: 140
Eingehenden Job
Job verarbeiteter Thread: 12 Process Thread Count: 141
Eingehenden Job
Job verarbeiteter Thread: 72 Process Thread Count: 142
Eingehenden Job
Job verarbeiteter Thread: 38 Process Thread Count: 143
Eingehenden Job
Job verarbeiteter Thread: 73 Process Thread Count: 143
Eingehenden Job
Job verarbeiteter Thread: 21 Process Thread Count: 144
Eingehenden Job
Job verarbeiteter Thread: 74 Process Thread Count: 145
... es hat die Entwicklung von Threads nicht aufgehalten
Ich weiß über ParallelOptions.MaxDegreeOfParallelism, aber ich möchte immer noch verstehen, was mit TPL passiert und warum es Hunderte von Threads ohne Grund erstellt
in meinem Projekt Ich ein Code, der stundenlang laufen und neue Daten aus der Datenbank lesen, in eine BlockingCollections einfügen und Daten von anderem Code verarbeitet haben, gibt es 1 neues Element etwa alle 5 Sekunden und es dauert einige Millisekunden zu fast einer Minute, um es zu verarbeiten, und nach dem Laufen für ungefähr 10 Minuten, Fadenzahl erreicht über 1000 Fäden
Es gibt zwei Dinge, die zusammen dieses Verhalten verursachen:
ThreadPool
versucht, die optimale Anzahl von Threads für Ihre Situation zu verwenden. Wenn jedoch einer der Threads im Pool blockiert, sieht der Pool dies so, als ob dieser Thread keine sinnvolle Arbeit verrichten würde, und neigt dazu, bald darauf einen anderen Thread zu erstellen. Das bedeutet, dass ThreadPool
beim Optimieren der optimalen Anzahl von Threads wirklich schlecht ist, wenn Sie viel blockieren, und neue Threads erstellen, bis das Limit erreicht ist.
Parallel.ForEach()
vertraut der ThreadPool
auf die richtige Anzahl von Threads, es sei denn, Sie haben die maximale Anzahl von Threads explizit festgelegt. Parallel.ForEach()
war auch in erster Linie für beschränkte Sammlungen gedacht, nicht für Datenströme.
Wenn Sie diese beiden Dinge mit GetConsumingEnumerable()
kombinieren, erhalten Sie, dass Parallel.ForEach()
Threads erstellt, die fast immer blockiert sind. Das ThreadPool
sieht dies und erstellt, um die CPU zu benutzen, mehr und mehr Threads.
Die richtige Lösung ist hier, MaxDegreeOfParallelism
zu setzen. Wenn Ihre Berechnungen CPU-gebunden sind, ist der beste Wert wahrscheinlich Environment.ProcessorCount
. Wenn sie IO-gebunden sind, müssen Sie den besten Wert experimentell herausfinden.
Eine weitere Option, wenn Sie .Net 4.5 verwenden können, ist die Verwendung von TPL Dataflow. Diese Bibliothek wurde speziell für die Verarbeitung von Datenströmen erstellt, so wie Sie sie haben. Sie hat also nicht die Probleme, die Ihr Code hat. Es ist eigentlich sogar besser als das und verwendet überhaupt keine Threads, wenn es gerade nichts verarbeitet.
Hinweis: Es gibt auch einen guten Grund, warum für jedes neue Element ein neuer Thread erstellt wird, aber wenn ich dies erkläre, müsste ich erklären, wie Parallel.ForEach()
genauer arbeitet, und ich denke, das ist hier nicht notwendig.
Tags und Links c# multithreading task-parallel-library parallel-processing producer-consumer