epoll IO mit Worker-Threads in C

8

Ich schreibe einen kleinen Server, der Daten von mehreren Quellen erhält und diese Daten verarbeitet. Die Quellen und empfangenen Daten sind signifikant, aber nicht mehr als epoll sollte in der Lage sein, gut zu handhaben. Alle empfangenen Daten müssen jedoch geparst werden und eine große Anzahl von Tests durchlaufen, was zeitaufwendig ist und einen einzigen Thread trotz Epoll-Multiplexing blockiert. Grundsätzlich sollte das Muster etwa so aussehen: IO-Loop empfängt Daten und bündelt sie zu einem Job, sendet sie an den ersten im Pool verfügbaren Thread, das Bundle wird vom Job verarbeitet und das Ergebnis wird an die IO-Schleife übergeben Schreiben in Datei.

Ich habe beschlossen, einen einzelnen IO-Thread und N Worker-Threads zu verwenden. Der IO-Thread zum Akzeptieren von TCP-Verbindungen und zum Lesen von Daten ist einfach zu implementieren. Ссылка

Thread ist normalerweise auch einfach genug, aber ich kämpfe darum, den epoll IO Loop mit einem Threadpool elegant zu kombinieren. Ich finde auch keine "Best Practice" für die Verwendung von epoll mit einem Worker Pool online, aber einige Fragen zum selben Thema.

Ich habe daher eine Frage, ich hoffe jemand kann mir helfen zu antworten:

  1. Könnte (und sollte) eventfd als Mechanismus für die 2-Wege-Synchronisation zwischen dem IO-Thread und allen Arbeitern verwendet werden? Zum Beispiel, ist es eine gute Idee für jeden Worker-Thread, seine eigene epoll-Routine auf einem shared-ereignisfd (mit einem struct-Pointer, der Daten / Informationen über den Job enthält) zu haben, d.h. die eventfd irgendwie als Job-Queue zu benutzen? Haben Sie vielleicht noch ein Eventfd, um Ergebnisse aus mehreren Worker-Threads in den IO-Thread zurückzugeben?
  2. Nachdem der IO - Thread über mehr Daten auf einem Socket informiert wurde, sollte der eigentliche recv auf dem IO - Thread stattfinden, oder sollte der Arbeiter die Daten selbst einholen, um den IO - Thread nicht beim Parsing von Datenframes etc .? Wie kann ich in diesem Fall die Sicherheit gewährleisten, z.B. für den Fall, dass recv 1,5 Frames von Daten in einem Worker Thread liest und ein anderer Worker Thread den letzten 0,5 Frame von Daten von derselben Verbindung empfängt?
  3. Wenn der Worker-Thread-Pool über Mutexe und dergleichen implementiert wird, blockiert das Warten auf Sperren den IO-Thread, wenn N + 1-Threads versuchen, dieselbe Sperre zu verwenden?
  4. Gibt es irgendwelche guten Übungsmuster für die Erstellung eines Worker-Thread-Pools um epoll mit Zwei-Wege-Kommunikation (d. h. sowohl von IO zu Arbeitern und zurück)?

EDIT: Kann eine mögliche Lösung darin bestehen, einen Ringpuffer von der IO-Schleife zu aktualisieren, nach Aktualisierung den Ringpufferindex an die Worker über eine gemeinsame Pipe für alle Worker zu senden (wodurch die Kontrolle über diesen Index an den ersten Worker weitergegeben wird) das liest den Index aus der Pipe), soll der Worker diesen Index bis zum Ende der Verarbeitung besitzen und dann die Indexnummer durch eine Pipe wieder in den IO-Thread zurücksenden und damit die Kontrolle zurückgeben?

Meine Anwendung ist nur Linux, daher kann ich die Linux-only-Funktionalität verwenden, um dies auf die eleganteste Weise zu erreichen. Cross-Plattform-Unterstützung ist nicht erforderlich, aber Leistung und Thread-Sicherheit ist.

    
agnsaft 19.02.2014, 21:21
quelle

3 Antworten

3

Wenn wir dieses Modell ausführen, weil wir die Paketgröße erst kennen, nachdem wir das Paket vollständig erhalten haben, können wir den Empfang selbst leider nicht auf einen Arbeitsthread übertragen. Stattdessen ist das beste, was wir noch tun können, ein Thread, der die Daten empfängt, die Zeiger auf vollständig empfangene Pakete ausgeben müssen.

Die Daten selbst werden wahrscheinlich am besten in einem Ringpuffer gehalten, aber wir wollen einen separaten Puffer für jede Eingangsquelle (wenn wir ein Teilpaket erhalten, können wir weiterhin von anderen Quellen empfangen, ohne die Daten zu teilen. Die verbleibende Frage ist wie man die Arbeiter darüber informiert, wenn ein neues Paket bereit ist, und ihnen einen Zeiger auf die Daten in dem Paket zu geben Da nur wenige Daten vorhanden sind, wäre dies der eleganteste Weg, dies mit Posix-Nachrichtenwarteschlangen zu tun. Diese bieten mehreren Absendern und mehreren Empfängern die Möglichkeit, Nachrichten zu schreiben und zu lesen, wobei immer sichergestellt wird, dass jede Nachricht empfangen wird und genau 1 Thread.

Sie wollen für jede Datenquelle eine Struktur, die der folgenden ähnelt, ich werde jetzt die Felder für die Zwecke durchgehen.

%Vor%

Das SourceFD ist offensichtlich der Dateideskriptor für den fraglichen Datenstrom, der DataBuffer ist, wo der Paketinhalt während der Verarbeitung gehalten wird, es ist ein Ringpuffer. Der LatestPacket-Zeiger wird verwendet, um vorübergehend einen Zeiger auf das am meisten wiedergesendete Paket zu halten, falls wir ein Teilpaket empfangen und sich auf eine andere Quelle bewegen, bevor das Paket übergeben wird. Der CurrentLocation speichert, wo das letzte Paket endet, so dass wir wissen, wo wir den nächsten platzieren oder wo wir im Falle eines teilweisen Empfangs weitermachen können. Die verbleibende Größe ist der Raum, der im Puffer übrig ist, dies wird verwendet, um zu sagen, ob wir das Paket passen können oder um den Anfang zurückkreieren müssen.

Die Empfangsfunktion wird somit effektiv

  • Kopieren Sie den Inhalt des Pakets in den Puffer
  • Move CurrentLocation, um auf das Ende des Pakets zu zeigen
  • Aktualisieren Sie SizeLeft, um den jetzt reduzierten Puffer zu berücksichtigen
  • Wenn wir das Paket nicht am Ende des Puffers anpassen können, gehen wir um
  • Wenn dort auch kein Platz ist, versuchen wir es später noch einmal, gehen aber zu einer anderen Quelle
  • Wenn wir einen partiellen Empfangsspeicher hatten, zeigt der LatestPacket-Zeiger auf den Anfang des Pakets und geht zu einem anderen Stream, bis wir den Rest erhalten haben
  • Senden Sie eine Nachricht mithilfe einer Posix-Nachrichtenwarteschlange an einen Arbeitsthread, damit er die Daten, die Nachricht, verarbeiten kann enthält einen Zeiger auf die DataSource-Struktur, so dass es daran arbeiten kann, es benötigt auch einen Zeiger auf das Paket, an dem es arbeitet, und es ist Größe, diese können berechnet werden, wenn wir das Paket
  • erhalten

Der Worker-Thread führt seine Verarbeitung mit den empfangenen Zeigern durch und erhöht dann das SizeLeft, so dass der Empfänger-Thread weiß, dass er den Puffer füllen kann. Die atomaren Funktionen werden benötigt, um den Größenwert in der Struktur zu bearbeiten, so dass wir keine Rassenbedingungen mit der size -Eigenschaft bekommen (wie es möglich ist, wird es von einem Arbeiter und dem IO-Thread gleichzeitig geschrieben, was verlorene Schreibvorgänge verursacht) Kommentar unten), sie sind hier aufgeführt und sind einfach und äußerst nützlich .

Nun habe ich einen allgemeinen Hintergrund gegeben, werde aber auf die speziell genannten Punkte eingehen:

  1. Die Verwendung des EventFD als Synchronisationsmechanismus ist größtenteils eine schlechte Idee. Sie werden feststellen, dass Sie eine nicht unerhebliche Menge an CPU-Zeit verbrauchen und es sehr schwierig ist, eine Synchronisation durchzuführen. Besonders wenn Sie mehrere Threads haben, die denselben Dateideskriptor verwenden, könnten Sie größere Probleme haben. Dies ist in der Tat ein hässlicher Hack, der manchmal funktioniert, aber kein wirklicher Ersatz für die richtige Synchronisation ist.
  2. Es ist auch eine schlechte Idee zu versuchen und den Empfang zu entladen, wie oben erklärt, Sie können das Problem mit komplexen IPC umgehen, aber ehrlich gesagt ist es unwahrscheinlich, IO empfangen wird genügend Zeit dauern, um Ihre Anwendung zu stallen, Ihre IO ist wahrscheinlich auch viel langsamer als CPU, so dass das Empfangen mit mehreren Threads wenig gewinnt. (Dies setzt voraus, dass Sie nicht sagen, haben Sie mehrere 10 Gigabit-Netzwerkkarten).
  3. Die Verwendung von Mutexen oder Locks ist hier eine dumme Idee, sie passt viel besser in die Lockless-Kodierung angesichts der geringen Menge an (gleichzeitig) geteilten Daten, Sie geben wirklich nur Arbeit und Daten weiter. Dies erhöht auch die Leistung des Empfangs-Threads und macht Ihre App wesentlich skalierbarer. Mit den hier genannten Funktionen Ссылка können Sie das ganz einfach und leicht machen. Wenn Sie es auf diese Weise tun würden, wäre ein Semaphor erforderlich. Dies kann jedes Mal entsperrt werden, wenn ein Paket von jedem Thread empfangen und gesperrt wird, der einen Job startet, um dynamisch mehr Threads zuzulassen, wenn mehr Pakete bereit sind weit weniger Aufwand als eine Homebrew-Lösung mit Mutexen.
  4. Es gibt hier keinen wirklich großen Unterschied zu irgendeinem Thread-Pool, Sie erzeugen viele Threads und dann blockieren sie alle in mq_receive in der Datennachrichten-Warteschlange, um auf Nachrichten zu warten. Wenn sie fertig sind, senden sie ihr Ergebnis zurück an den Haupt-Thread, der die Ergebnis-Nachrichten-Warteschlange zu seiner epoll-Liste hinzufügt. Es kann dann auf diese Weise Ergebnisse erhalten, es ist einfach und sehr effizient für kleine Datennutzlasten wie Zeiger. Dies wird auch wenig CPU verbrauchen und den Hauptthread nicht zwingen, Zeit zu verschwenden, um Arbeiter zu verwalten.

Schließlich ist Ihre Bearbeitung ziemlich vernünftig, abgesehen von der Tatsache, wie ich vorgeschlagen habe, Nachrichtenwarteschlangen sind hier weit besser als Pipes, da sie Ereignisse sehr effizient signalisieren, eine vollständige Nachrichtenlesung garantieren und automatisches Framing ermöglichen.

Ich hoffe, das hilft, aber es ist spät, wenn ich etwas verpasse oder Fragen habe, kann ich mich gerne zur Klärung oder Erklärung äußern.

    
Vality 25.02.2014, 23:14
quelle
4

Bei meinen Tests übertraf eine Epoll-Instanz pro Thread komplizierte Threading-Modelle bei weitem. Wenn Listener-Sockets zu allen epoll-Instanzen hinzugefügt werden, würden die Worker einfach accept(2) und der Gewinner erhält die Verbindung und verarbeitet sie für die gesamte Lebensdauer.

Ihre Arbeiter könnten in etwa so aussehen:

%Vor%

Jedem zu einer epoll-Instanz hinzugefügten Dateideskriptor könnte ein struct socket_context zugeordnet sein:

%Vor%

Ich mag diese Strategie, weil:

  • sehr einfaches Design;
  • alle Threads sind identisch;
  • Arbeiter und Verbindungen sind isoliert - kein Tritt auf die Zehen oder Aufruf von read(2) im falschen Arbeiter;
  • es sind keine Sperren erforderlich (der Kernel muss sich um die Synchronisation in accept(2) kümmern);
  • ist etwas natürlich ausgeglichen, da kein beschäftigter Arbeiter sich aktiv um accept(2) kümmert.

Und ein paar Anmerkungen zu epoll:

  • Verwenden Sie den flankengetriggerten Modus, nicht blockierende Sockets und lesen Sie immer bis EAGAIN ;
  • Vermeiden Sie die dup(2) -Reihe von Aufrufen, um sich vor einigen Überraschungen zu schützen (epoll registriert die Datei descriptors , beobachtet aber tatsächlich Beschreibungen );
  • Sie können epoll_ctl(2) andere Threads 'epoll-Instanzen sicher verwenden;
  • Verwenden Sie einen großen struct epoll_event Puffer für epoll_wait(2) , um das Verhungern zu vermeiden.

Einige andere Anmerkungen:

  • Verwenden Sie accept4(2) , um einen Systemaufruf zu speichern;
  • Verwenden Sie einen Thread pro Kern (1 für jedes physische, wenn CPU-gebunden, oder 1 für jedes logische wenn I / O-gebunden);
  • poll(2) / select(2) ist wahrscheinlich schneller, wenn die Anzahl der Verbindungen niedrig ist.

Ich hoffe, das hilft.

    
haste 20.02.2014 00:21
quelle
0

Ich poste die gleiche Antwort in einem anderen Post: Ich möchte auf einen Dateideskriptor und einen Mutex warten, was ist die empfohlene Vorgehensweise?

================================================== ===========

Dies ist ein sehr häufig auftretendes Problem, besonders wenn Sie ein serverseitiges Netzwerkprogramm entwickeln. Die meisten Lookouts des Servers auf der Serverseite werden wie folgt aussehen:

%Vor%

Es ist das epollbasierte Serverframework mit einem einzigen Thread (der Hauptthread). Das Problem ist, es ist Single-Threaded, nicht Multi-Threaded. Es erfordert, dass proc () sollte nie blockiert oder läuft für eine signifikante Zeit (etwa 10 ms für häufige Fälle).

Wenn proc () jemals für eine lange Zeit ausgeführt wird, BENÖTIGEN WIR MULTI THREADS und führen proc () in einem getrennten Thread (dem Worker-Thread) aus.

Wir können eine Aufgabe an den Worker-Thread senden, ohne den Hauptthread zu blockieren, indem wir eine Mutex-basierte Nachrichtenwarteschlange verwenden, die schnell genug ist.

Dann brauchen wir einen Weg, um das Task-Ergebnis von einem Worker-Thread zu erhalten. Wie? Wenn wir die Nachrichtenwarteschlange direkt vor oder nach epoll_wait () überprüfen, wird die Prüfaktion jedoch nach dem Ende von epoll_wait () ausgeführt, und epoll_wait () blockiert normalerweise für 10 Mikrosekunden (häufige Fälle), wenn alle Dateideskriptoren warten sind nicht aktiv.

Für einen Server sind 10 ms eine ziemlich lange Zeit! Können wir signalisieren, dass epoll_wait () sofort endet, wenn das Ergebnis der Aufgabe generiert wird?

Ja! Ich werde beschreiben, wie es in einem meiner Open-Source-Projekte gemacht wird.

Erstellen Sie eine Pipe für alle Worker-Threads, und epoll wartet ebenfalls auf diese Pipe. Sobald ein Task-Ergebnis generiert wird, schreibt der Worker-Thread ein Byte in die Pipe, und epoll_wait () endet fast zur gleichen Zeit! - Linux-Pipe hat 5 us bis 20 us Latenz.

In meinem Projekt SSDB (eine Redis-Protokoll-kompatible In-Disk-NoSQL-Datenbank) erstelle ich eine SelectableQueue für die Weitergabe von Nachrichten zwischen den Hauptthread- und Worker-Threads. Genau wie sein Name hat SelectableQueue einen Dateideskriptor, der von epoll gewartet werden kann.

AuswählbareQueue: Ссылка

Verwendung im Hauptthread:

%Vor%

Verwendung im Worker-Thread:

%Vor%     
ideawu 14.09.2017 10:40
quelle

Tags und Links