Analog von Queue.Peek () für BlockingCollection beim Anhören von IEnumerableT

8

Ich verwende Pipelines-Muster Implementierung, um Nachrichtenkonsumenten von einem Produzenten zu entkoppeln, um Verzögerungen zu vermeiden -konsum Problem.

Im Falle einer Ausnahme in einer Nachrichtenverarbeitungsstufe [1] wird sie verloren gehen und nicht an einen anderen Service / Layer [2] gesendet. Wie kann ich mit einem solchen Problem in [3] umgehen, damit die Nachricht nicht verloren geht und was wichtig ist! Die Reihenfolge der Nachrichten wird nicht durcheinander gebracht, so dass der obere Dienst / Schicht Nachrichten in der Reihenfolge erhält, in der sie eingegangen sind. Ich habe eine Idee, die ein anderes intermediate Queue beinhaltet, aber es scheint komplex zu sein? Leider stellt BlockingCollection<T> kein Analogon von Queue.Peek() zur Verfügung, so dass ich nur die nächste verfügbare Nachricht lesen kann und im Falle einer erfolgreichen Verarbeitung Dequeue()

%Vor%

BEARBEITEN : Vergessen Sie zu sagen, dass dies der von IIS gehostete WCF-Dienst ist, der Nachrichten über den Client-Callback-Vertrag an den Silverlight-Client-WCF-Proxy zurücksendet.

BEARBEITEN2: Unten ist, wie ich das mit Peek() machen würde, fehle ich etwas?

%Vor%

EDIT3: Sicher kann ich den alten Stil mit while loop, bool flag, Queue und AutoResetEvent verwenden, aber ich frage mich, ob das gleiche mit BlockingCollection und% co_de möglich ist % Ich denke Anlage wie GetConsumingEnumerable() wäre sehr hilfreich bei der Verwendung zusammen mit dem Aufzählen, da sonst alle Pipeline-Pattern-Implementierungsbeispiele wie zB Peek und BlockingCollection nicht dauerhaft aussehen und ich zurück zum alten Ansatz gehen muss.

    
sll 27.11.2012, 10:16
quelle

4 Antworten

7

Sie sollten die Zwischenwarteschlange berücksichtigen.

BlockingCollection<T> kann aufgrund seiner Art keine Artikel "sehen" - es kann mehr als einen Verbraucher geben. Einer von ihnen kann einen Gegenstand sehen und ein anderer kann ihn nehmen - daher wird der erste versuchen, einen Gegenstand aufzunehmen, der bereits genommen wurde.

    
Dennis 27.11.2012, 11:03
quelle
4
Dennis sagt in seinem Kommentar BlockingCollection<T> stellt jedem Implementierer von IProducerConsumerCollection<T> Schnittstelle.

Wie Sie sehen können, definiert IProducerConsumerCollection<T> per Entwurf keine Peek<T> oder andere Methoden, um eine zu implementieren. Dies bedeutet, dass BlockingCollection<T> , so wie es aussieht, nicht zu Peek anbieten kann.

Wenn Sie das in Betracht ziehen, reduziert dies in hohem Maße die Konkurrenzprobleme, die durch den Utility-Trade einer Peek -Implementierung verursacht werden. Wie können Sie konsumieren, ohne zu konsumieren? Bis Peek gleichzeitig müssten Sie den Kopf der Sammlung sperren, bis die Operation Peek abgeschlossen ist, die ich und die Designer von BlockingCollection<T> als nicht optimal betrachten. Ich denke, es wäre auch chaotisch und schwierig zu implementieren, erfordert eine Art von Einweg-Peek-Kontext.

Wenn Sie eine Nachricht konsumieren und ihr Verbrauch fehlschlägt, müssen Sie damit umgehen. Sie könnten es zu einer anderen Fehlerwarteschlange hinzufügen, es zur normalen Verarbeitungswarteschlange für eine weitere Wiederholung hinzufügen oder es für die Nachwelt oder eine andere Aktion, die Ihrem Kontext entspricht, einfach protokollieren.

Wenn Sie die Nachrichten nicht gleichzeitig konsumieren möchten, müssen Sie BlockingCollection<T> nicht verwenden, da Sie keine gleichzeitige Verwendung benötigen. Du könntest ConcurrentQueue<T> direkt verwenden, du erhältst immer noch Synchronizität und du kannst Verwenden Sie TryPeek<T> sicher, da Sie einen einzelnen Verbraucher kontrollieren. Wenn der Verbrauch versagt, könntest du den Konsum mit einer unendlichen Wiederholungsschleife in deinem Wunsch stoppen, obwohl, ich schlage vor, dass dies einige Designgedanken erfordert.

    
Jodrell 27.11.2012 11:31
quelle
2

BlockingCollection<T> ist ein Wrapper um IProducerConsumerCollection<T> , der allgemeiner als z.B. ConcurrentQueue und gibt dem Implementierer die Freiheit, keine (Try)Peek -Methode implementieren zu müssen.

Sie können jedoch TryPeek immer direkt für die zugrunde liegende Warteschlange aufrufen:

%Vor%

Beachten Sie jedoch, dass Sie Ihre Warteschlange nicht über useOnlyForPeeking ändern müssen, da sonst blockingCollection verwechselt wird und möglicherweise throw InvalidOperationException s auf Sie, aber ich wäre überrascht, wenn ein Aufruf der nicht-ändernden TryPeek auf dieser gleichzeitigen Datenstruktur ein Problem wäre.

    
Eugene Beresovsky 06.06.2014 03:15
quelle
0

Sie können ConcurrentQueue<T> verwenden stattdessen hat es TryDequeue() Methode .

  

ConcurrentQueue<T>.TryDequeue(out T result) versucht das Objekt am Anfang der Warteschlange zu entfernen und zurückzugeben, es gibt true zurück, wenn ein Element entfernt und vom Anfang der Warteschlange zurückgegeben wurde ConcurrentQueue erfolgreich.

Also, keine Notwendigkeit, einen Peek zuerst zu überprüfen.

TryDequeue() ist threadsicher:

  

ConcurrentQueue<T> behandelt die gesamte Synchronisation intern . Wenn zwei Threads genau zum selben Zeitpunkt TryDequeue (T) aufrufen, wird keine Operation blockiert.

Soweit ich das verstehe gibt es nur dann false zurück, wenn die Warteschlange leer ist :

  

Wenn die Warteschlange mit Code wie q.Enqueue ("a") gefüllt wurde; q.Enqueue ("b"); q.Enqueue ("c"); und zwei Threads versuchen gleichzeitig, ein Element aus der Warteschlange zu entfernen, ein Thread entnimmt a und der andere Thread entnimmt b. Beide Aufrufe von TryDequeue (T) geben true zurück, da beide ein Element aus der Warteschlange entfernen konnten. Wenn jeder Thread ein zusätzliches Element aus der Warteschlange entfernt, entfernt einer der Threads c und gibt true zurück, während der andere Thread die Warteschlange leer findet und false zurückgibt.

Ссылка

AKTUALISIEREN

Vielleicht wäre die einfachste Option die Verwendung von TaskScheduler Klasse. Damit können Sie alle Ihre Verarbeitungsaufgaben in die Elemente der Warteschlange einbinden und die Implementierung der Synchronisierung vereinfachen.

    
maximpa 27.11.2012 10:56
quelle