Ich schreibe einen kleinen Server, der Daten von mehreren Quellen erhält und diese Daten verarbeitet. Die Quellen und empfangenen Daten sind signifikant, aber nicht mehr als epoll sollte in der Lage sein, gut zu handhaben. Alle empfangenen Daten müssen jedoch geparst werden und eine große Anzahl von Tests durchlaufen, was zeitaufwendig ist und einen einzigen Thread trotz Epoll-Multiplexing blockiert. Grundsätzlich sollte das Muster etwa so aussehen: IO-Loop empfängt Daten und bündelt sie zu einem Job, sendet sie an den ersten im Pool verfügbaren Thread, das Bundle wird vom Job verarbeitet und das Ergebnis wird an die IO-Schleife übergeben Schreiben in Datei.
Ich habe beschlossen, einen einzelnen IO-Thread und N Worker-Threads zu verwenden. Der IO-Thread zum Akzeptieren von TCP-Verbindungen und zum Lesen von Daten ist einfach zu implementieren. Ссылка
Thread ist normalerweise auch einfach genug, aber ich kämpfe darum, den epoll IO Loop mit einem Threadpool elegant zu kombinieren. Ich finde auch keine "Best Practice" für die Verwendung von epoll mit einem Worker Pool online, aber einige Fragen zum selben Thema.
Ich habe daher eine Frage, ich hoffe jemand kann mir helfen zu antworten:
EDIT: Kann eine mögliche Lösung darin bestehen, einen Ringpuffer von der IO-Schleife zu aktualisieren, nach Aktualisierung den Ringpufferindex an die Worker über eine gemeinsame Pipe für alle Worker zu senden (wodurch die Kontrolle über diesen Index an den ersten Worker weitergegeben wird) das liest den Index aus der Pipe), soll der Worker diesen Index bis zum Ende der Verarbeitung besitzen und dann die Indexnummer durch eine Pipe wieder in den IO-Thread zurücksenden und damit die Kontrolle zurückgeben?
Meine Anwendung ist nur Linux, daher kann ich die Linux-only-Funktionalität verwenden, um dies auf die eleganteste Weise zu erreichen. Cross-Plattform-Unterstützung ist nicht erforderlich, aber Leistung und Thread-Sicherheit ist.
Wenn wir dieses Modell ausführen, weil wir die Paketgröße erst kennen, nachdem wir das Paket vollständig erhalten haben, können wir den Empfang selbst leider nicht auf einen Arbeitsthread übertragen. Stattdessen ist das beste, was wir noch tun können, ein Thread, der die Daten empfängt, die Zeiger auf vollständig empfangene Pakete ausgeben müssen.
Die Daten selbst werden wahrscheinlich am besten in einem Ringpuffer gehalten, aber wir wollen einen separaten Puffer für jede Eingangsquelle (wenn wir ein Teilpaket erhalten, können wir weiterhin von anderen Quellen empfangen, ohne die Daten zu teilen. Die verbleibende Frage ist wie man die Arbeiter darüber informiert, wenn ein neues Paket bereit ist, und ihnen einen Zeiger auf die Daten in dem Paket zu geben Da nur wenige Daten vorhanden sind, wäre dies der eleganteste Weg, dies mit Posix-Nachrichtenwarteschlangen zu tun. Diese bieten mehreren Absendern und mehreren Empfängern die Möglichkeit, Nachrichten zu schreiben und zu lesen, wobei immer sichergestellt wird, dass jede Nachricht empfangen wird und genau 1 Thread.
Sie wollen für jede Datenquelle eine Struktur, die der folgenden ähnelt, ich werde jetzt die Felder für die Zwecke durchgehen.
%Vor%Das SourceFD ist offensichtlich der Dateideskriptor für den fraglichen Datenstrom, der DataBuffer ist, wo der Paketinhalt während der Verarbeitung gehalten wird, es ist ein Ringpuffer. Der LatestPacket-Zeiger wird verwendet, um vorübergehend einen Zeiger auf das am meisten wiedergesendete Paket zu halten, falls wir ein Teilpaket empfangen und sich auf eine andere Quelle bewegen, bevor das Paket übergeben wird. Der CurrentLocation speichert, wo das letzte Paket endet, so dass wir wissen, wo wir den nächsten platzieren oder wo wir im Falle eines teilweisen Empfangs weitermachen können. Die verbleibende Größe ist der Raum, der im Puffer übrig ist, dies wird verwendet, um zu sagen, ob wir das Paket passen können oder um den Anfang zurückkreieren müssen.
Die Empfangsfunktion wird somit effektiv
Der Worker-Thread führt seine Verarbeitung mit den empfangenen Zeigern durch und erhöht dann das SizeLeft, so dass der Empfänger-Thread weiß, dass er den Puffer füllen kann. Die atomaren Funktionen werden benötigt, um den Größenwert in der Struktur zu bearbeiten, so dass wir keine Rassenbedingungen mit der size -Eigenschaft bekommen (wie es möglich ist, wird es von einem Arbeiter und dem IO-Thread gleichzeitig geschrieben, was verlorene Schreibvorgänge verursacht) Kommentar unten), sie sind hier aufgeführt und sind einfach und äußerst nützlich .
Nun habe ich einen allgemeinen Hintergrund gegeben, werde aber auf die speziell genannten Punkte eingehen:
Schließlich ist Ihre Bearbeitung ziemlich vernünftig, abgesehen von der Tatsache, wie ich vorgeschlagen habe, Nachrichtenwarteschlangen sind hier weit besser als Pipes, da sie Ereignisse sehr effizient signalisieren, eine vollständige Nachrichtenlesung garantieren und automatisches Framing ermöglichen.
Ich hoffe, das hilft, aber es ist spät, wenn ich etwas verpasse oder Fragen habe, kann ich mich gerne zur Klärung oder Erklärung äußern.
Bei meinen Tests übertraf eine Epoll-Instanz pro Thread komplizierte Threading-Modelle bei weitem. Wenn Listener-Sockets zu allen epoll-Instanzen hinzugefügt werden, würden die Worker einfach accept(2)
und der Gewinner erhält die Verbindung und verarbeitet sie für die gesamte Lebensdauer.
Ihre Arbeiter könnten in etwa so aussehen:
%Vor% Jedem zu einer epoll-Instanz hinzugefügten Dateideskriptor könnte ein struct socket_context
zugeordnet sein:
Ich mag diese Strategie, weil:
read(2)
im falschen Arbeiter; accept(2)
kümmern); accept(2)
kümmert. Und ein paar Anmerkungen zu epoll:
EAGAIN
; dup(2)
-Reihe von Aufrufen, um sich vor einigen Überraschungen zu schützen (epoll registriert die Datei descriptors , beobachtet aber tatsächlich Beschreibungen ); epoll_ctl(2)
andere Threads 'epoll-Instanzen sicher verwenden; struct epoll_event
Puffer für epoll_wait(2)
, um das Verhungern zu vermeiden. Einige andere Anmerkungen:
accept4(2)
, um einen Systemaufruf zu speichern; poll(2)
/ select(2)
ist wahrscheinlich schneller, wenn die Anzahl der Verbindungen niedrig ist. Ich hoffe, das hilft.
Ich poste die gleiche Antwort in einem anderen Post: Ich möchte auf einen Dateideskriptor und einen Mutex warten, was ist die empfohlene Vorgehensweise?
================================================== ===========
Dies ist ein sehr häufig auftretendes Problem, besonders wenn Sie ein serverseitiges Netzwerkprogramm entwickeln. Die meisten Lookouts des Servers auf der Serverseite werden wie folgt aussehen:
%Vor%Es ist das epollbasierte Serverframework mit einem einzigen Thread (der Hauptthread). Das Problem ist, es ist Single-Threaded, nicht Multi-Threaded. Es erfordert, dass proc () sollte nie blockiert oder läuft für eine signifikante Zeit (etwa 10 ms für häufige Fälle).
Wenn proc () jemals für eine lange Zeit ausgeführt wird, BENÖTIGEN WIR MULTI THREADS und führen proc () in einem getrennten Thread (dem Worker-Thread) aus.
Wir können eine Aufgabe an den Worker-Thread senden, ohne den Hauptthread zu blockieren, indem wir eine Mutex-basierte Nachrichtenwarteschlange verwenden, die schnell genug ist.
Dann brauchen wir einen Weg, um das Task-Ergebnis von einem Worker-Thread zu erhalten. Wie? Wenn wir die Nachrichtenwarteschlange direkt vor oder nach epoll_wait () überprüfen, wird die Prüfaktion jedoch nach dem Ende von epoll_wait () ausgeführt, und epoll_wait () blockiert normalerweise für 10 Mikrosekunden (häufige Fälle), wenn alle Dateideskriptoren warten sind nicht aktiv.
Für einen Server sind 10 ms eine ziemlich lange Zeit! Können wir signalisieren, dass epoll_wait () sofort endet, wenn das Ergebnis der Aufgabe generiert wird?
Ja! Ich werde beschreiben, wie es in einem meiner Open-Source-Projekte gemacht wird.
Erstellen Sie eine Pipe für alle Worker-Threads, und epoll wartet ebenfalls auf diese Pipe. Sobald ein Task-Ergebnis generiert wird, schreibt der Worker-Thread ein Byte in die Pipe, und epoll_wait () endet fast zur gleichen Zeit! - Linux-Pipe hat 5 us bis 20 us Latenz.
In meinem Projekt SSDB (eine Redis-Protokoll-kompatible In-Disk-NoSQL-Datenbank) erstelle ich eine SelectableQueue für die Weitergabe von Nachrichten zwischen den Hauptthread- und Worker-Threads. Genau wie sein Name hat SelectableQueue einen Dateideskriptor, der von epoll gewartet werden kann.
AuswählbareQueue: Ссылка
Verwendung im Hauptthread:
%Vor%Verwendung im Worker-Thread:
%Vor%Tags und Links c multithreading linux posix epoll