Ich versuche eine Klasse zu implementieren, die zwei Threads verwendet: einen für den Produzenten und einen für den Konsumenten. Die aktuelle Implementierung verwendet keine Sperren:
%Vor%Die Anwendung muss Arbeitselemente für einen bestimmten Zeitraum in die Warteschlange stellen und dann auf ein Ereignis warten. Dies ist ein minimaler main, der das Verhalten simuliert:
%Vor% Ich bin mir ziemlich sicher, dass meine Implementierung fehlerhaft ist: Was passiert, wenn der Arbeitsthread beendet wird und bevor working_ = false
ausgeführt wird, ein weiterer enqueue
kommt? Ist es möglich, meinen Code-Thread sicher zu machen, ohne Sperren zu verwenden?
Die Lösung erfordert:
Ich habe eine andere Implementierung der Klasse Worker
durchgeführt, basierend auf Ihren Vorschlägen. Hier ist mein zweiter Versuch:
Ich habe das worker_.join()
in der enqueue
-Methode eingeführt. Dies kann sich auf die Leistung auswirken, aber in sehr seltenen Fällen (wenn die Warteschlange leer ist und bevor der Thread beendet wird, kommt ein weiteres enqueue
hinzu). Die Variable working_
ist jetzt ein atomic_flag
, das in enqueue
festgelegt und in work
gelöscht wird. Der zusätzliche while
nach working_.clear()
wird benötigt, da wenn ein anderer Wert, vor dem clear
, aber nach dem while
, der Wert nicht verarbeitet wird.
Ist diese Implementierung korrekt?
Ich habe einige Tests durchgeführt und die Implementierung scheint zu funktionieren.
OT: Ist es besser, dies als eine Bearbeitung oder eine Antwort zu setzen?
Das ist meine Lösung der Frage. Ich mag es nicht sehr, mich selbst zu beantworten, aber ich denke, das Zeigen von tatsächlichem Code kann anderen helfen.
%Vor% Ich habe den Vorschlag von @Cameron ausprobiert, den Thread nicht herunterzufahren und einen Semaphor hinzuzufügen. Dies wird nur in der ersten enqueue
und in der letzten work
verwendet. Dies ist nicht blockierungsfrei, sondern nur in diesen beiden Fällen.
Ich habe einen Leistungsvergleich zwischen meiner vorherigen Version (siehe meine bearbeitete Frage) und dieser gemacht. Es gibt keine signifikanten Unterschiede, wenn es nicht viele Start und Stopp gibt. Das enqueue
ist jedoch 10-mal schneller, wenn es signal
des Worker-Threads sein muss, anstatt einen neuen Thread zu starten. Dies ist ein seltener Fall, also ist es nicht sehr wichtig, aber es ist trotzdem eine Verbesserung.
Diese Implementierung erfüllt:
enqueue
und work
beschäftigt sind); enqueue
gibt
Was passiert, wenn der Worker-Thread beendet wird und bevor er working_ = false ausführt, kommt eine weitere Enqueue?
Dann wird der Wert in die Warteschlange geschoben, aber nicht verarbeitet, bis ein anderer Wert in die Warteschlange eingereiht wird, nachdem das Flag gesetzt wurde. Sie (oder Ihre Benutzer) können entscheiden, ob dies akzeptabel ist. Dies kann mit Sperren vermieden werden, aber sie sind gegen Ihre Anforderungen.
Der Code kann fehlschlagen, wenn der laufende Thread gerade beendet wird, und setzt working_ = false;
, aber er wird nicht beendet, bevor der nächste Wert in die Warteschlange gestellt wird. In diesem Fall ruft Ihr Code operator = für den laufenden Thread auf, der zu einem Anruf führt zu std::terminate
gemäß der verknüpften Dokumentation.
Das Hinzufügen von worker_.join()
vor dem Zuweisen des Arbeiters zu einem neuen Thread sollte dies verhindern.
Ein weiteres Problem ist, dass queue_.push
fehlschlagen kann, wenn die Warteschlange voll ist, weil sie eine feste Größe hat. Derzeit ignorierst du einfach den Fall und der Wert wird nicht zur vollen Warteschlange hinzugefügt. Wenn Sie darauf warten, dass in der Warteschlange Platz ist, erhalten Sie keine schnelle Warteschlange (im Edge-Fall). Sie könnten das Bool von push
zurückgeben (was sagt, ob es erfolgreich war) und es von enqueue
zurückgeben. Auf diese Weise kann der Anrufer entscheiden, ob er warten oder den Wert verwerfen möchte.
Oder verwenden Sie eine Warteschlange ohne feste Größe. Boost sagt Folgendes zu dieser Entscheidung:
Kann verwendet werden, um dynamische Speicherzuweisungen während des Push-Vorgangs vollständig zu deaktivieren, um ein blockierfreies Verhalten zu gewährleisten. Wenn die Datenstruktur als feste Größe konfiguriert ist, werden die internen Knoten in einem Array gespeichert und adressiert durch Array-Indexierung. Dies begrenzt die mögliche Größe der Warteschlange auf die Anzahl der Elemente, die vom Index adressiert werden können Typ (in der Regel 2 ** 16-2), aber auf Plattformen, die Doppel-Breite Vergleichs- und Austausch-Anweisungen fehlt, ist dies der beste Weg Lock-Freiheit erreichen.
Ihr Worker-Thread benötigt mehr als 2 Status.
Wenn Sie das Herunterfahren erzwingen, wird die Leerlaufabschaltung übersprungen. Wenn Sie keine Aufgaben mehr ausführen, wird das System in den Leerlaufmodus versetzt. Im Leerlauf wird die Aufgabenwarteschlange geleert und anschließend heruntergefahren.
Herunterfahren ist eingestellt, dann gehen Sie vom Ende Ihrer Worker-Aufgabe weg.
Der Produzent bringt die Dinge zuerst in die Warteschlange. Dann überprüft es den Arbeiterstatus. Wenn Shutdown oder Idle shutdown, zuerst join
it (und Übergang zu nicht ausgeführt) dann starten Sie einen neuen Worker. Wenn nicht, starte einfach einen neuen Arbeiter.
Wenn der Hersteller einen neuen Arbeiter starten möchte, stellt er zuerst sicher, dass wir uns im nicht laufenden Zustand befinden (andernfalls logischer Fehler). Wir wechseln dann in den Status "Aufgaben ausführen" und starten den Worker-Thread.
Wenn der Hersteller die Hilfstask herunterfahren will, setzt er das Fertig-Flag. Es überprüft dann den Arbeitsstatus. Wenn es nichts außer nicht läuft, verbindet es es.
Dies kann zu einem Worker-Thread führen, der ohne wichtigen Grund gestartet wird.
Es gibt ein paar Fälle, in denen das oben genannte blockieren kann, aber da waren auch einige vorher.
Dann schreiben wir einen formellen oder semi-formalen Beweis, dass das Obige keine Nachrichten verlieren kann, weil Sie beim Schreiben von freiem Code nicht fertig sind, bevor Sie einen Beweis haben.
Tags und Links c++ multithreading c++11 lock-free