Ich verwende Pipelines-Muster Implementierung, um Nachrichtenkonsumenten von einem Produzenten zu entkoppeln, um Verzögerungen zu vermeiden -konsum Problem.
Im Falle einer Ausnahme in einer Nachrichtenverarbeitungsstufe [1]
wird sie verloren gehen und nicht an einen anderen Service / Layer [2]
gesendet. Wie kann ich mit einem solchen Problem in [3]
umgehen, damit die Nachricht nicht verloren geht und was wichtig ist! Die Reihenfolge der Nachrichten wird nicht durcheinander gebracht, so dass der obere Dienst / Schicht Nachrichten in der Reihenfolge erhält, in der sie eingegangen sind. Ich habe eine Idee, die ein anderes intermediate Queue
beinhaltet, aber es scheint komplex zu sein? Leider stellt BlockingCollection<T>
kein Analogon von Queue.Peek()
zur Verfügung, so dass ich nur die nächste verfügbare Nachricht lesen kann und im Falle einer erfolgreichen Verarbeitung Dequeue()
BEARBEITEN : Vergessen Sie zu sagen, dass dies der von IIS gehostete WCF-Dienst ist, der Nachrichten über den Client-Callback-Vertrag an den Silverlight-Client-WCF-Proxy zurücksendet.
BEARBEITEN2: Unten ist, wie ich das mit Peek()
machen würde, fehle ich etwas?
EDIT3: Sicher kann ich den alten Stil mit while loop, bool flag, Queue
und AutoResetEvent
verwenden, aber ich frage mich, ob das gleiche mit BlockingCollection
und% co_de möglich ist % Ich denke Anlage wie GetConsumingEnumerable()
wäre
sehr hilfreich bei der Verwendung zusammen mit dem Aufzählen, da sonst alle Pipeline-Pattern-Implementierungsbeispiele wie zB Peek
und BlockingCollection
nicht dauerhaft aussehen und ich zurück zum alten Ansatz gehen muss.
Sie sollten die Zwischenwarteschlange berücksichtigen.
BlockingCollection<T>
kann aufgrund seiner Art keine Artikel "sehen" - es kann mehr als einen Verbraucher geben. Einer von ihnen kann einen Gegenstand sehen und ein anderer kann ihn nehmen - daher wird der erste versuchen, einen Gegenstand aufzunehmen, der bereits genommen wurde.
BlockingCollection<T>
stellt jedem Implementierer von IProducerConsumerCollection<T>
Schnittstelle.
Wie Sie sehen können, definiert IProducerConsumerCollection<T>
per Entwurf keine Peek<T>
oder andere Methoden, um eine zu implementieren. Dies bedeutet, dass BlockingCollection<T>
, so wie es aussieht, nicht zu Peek
anbieten kann.
Wenn Sie das in Betracht ziehen, reduziert dies in hohem Maße die Konkurrenzprobleme, die durch den Utility-Trade einer Peek
-Implementierung verursacht werden. Wie können Sie konsumieren, ohne zu konsumieren? Bis Peek
gleichzeitig müssten Sie den Kopf der Sammlung sperren, bis die Operation Peek
abgeschlossen ist, die ich und die Designer von BlockingCollection<T>
als nicht optimal betrachten. Ich denke, es wäre auch chaotisch und schwierig zu implementieren, erfordert eine Art von Einweg-Peek-Kontext.
Wenn Sie eine Nachricht konsumieren und ihr Verbrauch fehlschlägt, müssen Sie damit umgehen. Sie könnten es zu einer anderen Fehlerwarteschlange hinzufügen, es zur normalen Verarbeitungswarteschlange für eine weitere Wiederholung hinzufügen oder es für die Nachwelt oder eine andere Aktion, die Ihrem Kontext entspricht, einfach protokollieren.
Wenn Sie die Nachrichten nicht gleichzeitig konsumieren möchten, müssen Sie BlockingCollection<T>
nicht verwenden, da Sie keine gleichzeitige Verwendung benötigen. Du könntest ConcurrentQueue<T>
direkt verwenden, du erhältst immer noch Synchronizität und du kannst Verwenden Sie TryPeek<T>
sicher, da Sie einen einzelnen Verbraucher kontrollieren. Wenn der Verbrauch versagt, könntest du den Konsum mit einer unendlichen Wiederholungsschleife in deinem Wunsch stoppen, obwohl, ich schlage vor, dass dies einige Designgedanken erfordert.
BlockingCollection<T>
ist ein Wrapper um IProducerConsumerCollection<T>
, der allgemeiner als z.B. ConcurrentQueue
und gibt dem Implementierer die Freiheit, keine (Try)Peek
-Methode implementieren zu müssen.
Sie können jedoch TryPeek
immer direkt für die zugrunde liegende Warteschlange aufrufen:
Beachten Sie jedoch, dass Sie Ihre Warteschlange nicht über useOnlyForPeeking
ändern müssen, da sonst blockingCollection
verwechselt wird und möglicherweise throw InvalidOperationException
s auf Sie, aber ich wäre überrascht, wenn ein Aufruf der nicht-ändernden TryPeek
auf dieser gleichzeitigen Datenstruktur ein Problem wäre.
Sie können ConcurrentQueue<T>
verwenden stattdessen hat es TryDequeue()
Methode .
ConcurrentQueue<T>.TryDequeue(out T result)
versucht das Objekt am Anfang der Warteschlange zu entfernen und zurückzugeben, es gibt true zurück, wenn ein Element entfernt und vom Anfang der Warteschlange zurückgegeben wurde ConcurrentQueue erfolgreich.
Also, keine Notwendigkeit, einen Peek zuerst zu überprüfen.
TryDequeue()
ist threadsicher:
ConcurrentQueue<T>
behandelt die gesamte Synchronisation intern . Wenn zwei Threads genau zum selben Zeitpunkt TryDequeue (T) aufrufen, wird keine Operation blockiert.
Soweit ich das verstehe gibt es nur dann false zurück, wenn die Warteschlange leer ist :
Wenn die Warteschlange mit Code wie q.Enqueue ("a") gefüllt wurde; q.Enqueue ("b"); q.Enqueue ("c"); und zwei Threads versuchen gleichzeitig, ein Element aus der Warteschlange zu entfernen, ein Thread entnimmt a und der andere Thread entnimmt b. Beide Aufrufe von TryDequeue (T) geben true zurück, da beide ein Element aus der Warteschlange entfernen konnten. Wenn jeder Thread ein zusätzliches Element aus der Warteschlange entfernt, entfernt einer der Threads c und gibt true zurück, während der andere Thread die Warteschlange leer findet und false zurückgibt.
AKTUALISIEREN
Vielleicht wäre die einfachste Option die Verwendung von TaskScheduler
Klasse. Damit können Sie alle Ihre Verarbeitungsaufgaben in die Elemente der Warteschlange einbinden und die Implementierung der Synchronisierung vereinfachen.
Tags und Links wcf c# multithreading task-parallel-library .net-4.0