Verarbeiten von DynamoDB-Streams mithilfe des AWS Java DynamoDB-Stream-Kinesis-Adapters

9

Ich versuche, DynamoDB-Tabellenänderungen mithilfe von DynamoDB-Streams und des von AWS bereitgestellten Java DynamoDB-Stream-Kinesis-Adapters zu erfassen. Ich arbeite mit den AWS Java SDKs in einer Scala App.

Ich habe angefangen, dem AWS-Leitfaden zu folgen und durchzugehen Das AWS veröffentlichte Codebeispiel . Ich habe jedoch Probleme, Amazons eigenen Code in meiner Umgebung zu verwenden. Mein Problem liegt beim Objekt KinesisClientLibConfiguration .

Im Beispielcode ist KinesisClientLibConfiguration mit dem von DynamoDB bereitgestellten Stream-ARN konfiguriert.

%Vor%

Ich habe ein ähnliches Muster in meiner Scala-App verfolgt, indem ich zuerst den aktuellen ARN aus meiner Dynamo-Tabelle gefunden habe:

%Vor%

Und dann die KinesisClientLibConfiguration mit dem bereitgestellten ARN erstellen:

%Vor%

Ich habe den bereitgestellten ARN-Stream überprüft und alles stimmt mit dem überein, was ich in der AWS-Konsole sehe.

Zur Laufzeit bekomme ich eine Ausnahme, die besagt, dass der angegebene ARN kein gültiger Stream-Name ist:

%Vor%

Wenn man sich die Dokumentation von KinesisClientLibConfiguration anschaut, macht dies Sinn, da der zweite Parameter als streamName aufgeführt ist, ohne dass ein ARN erwähnt wird.

Ich kann anscheinend nichts in KinesisClientLibConfiguration finden, das mit einem ARN zusammenhängt. Da ich mit einem DynamoDB-Stream und nicht mit einem Kinesis-Stream arbeite, weiß ich auch nicht, wie ich meinen Stream-Namen finden kann.

An diesem Punkt bin ich mir nicht sicher, was ich im veröffentlichten AWS-Beispiel vermisse, es scheint, als ob sie eine viel ältere Version der KCL verwenden. Ich benutze Version 1.7.0 von amazon-kinesis-client.

    
francis 15.10.2016, 15:23
quelle

4 Antworten

3

Das Problem endete tatsächlich außerhalb meiner KinesisClientLibConfiguration .

Ich konnte dieses Problem umgehen, indem ich dieselbe Konfiguration verwendete und sowohl den Stream-Adapter, der in der DynamoDB-Stream-Adapter-Bibliothek enthalten ist, als auch die Clients für DynamoDB und CloudWatch bereitstellte.

Meine Arbeitslösung sieht jetzt so aus.

Definieren der Kinesis-Client-Konfiguration.

%Vor%

Definieren Sie einen Stream-Adapter und einen CloudWatch-Client

%Vor%

Erstellen Sie eine Instanz von RecordProcessorFactory , es liegt an Ihnen, eine Klasse zu definieren, die die von KCL bereitgestellte IRecordProcessorFactory und die zurückgegebene IRecordProcessor implementiert.

%Vor%

Und der Teil, den ich vermisste, all das muss Ihrem Arbeiter zur Verfügung gestellt werden.

%Vor%     
francis 19.10.2016, 18:55
quelle
0

Alternativ können Sie com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker anstelle von com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker verwenden.  die intern AmazonDynamoDBStreamsAdapterClient verwendet.

d. h.

%Vor%     
anon 09.01.2017 16:39
quelle
0

Nur um zu antworten, was das Problem war - Sie haben das ARN bereitgestellt, als es nur den Stream-Namen wollte.

    
Steve Gula 24.02.2017 16:50
quelle
0

Ich habe kürzlich eine PR zu diesem Projekt gfc-aws-kinesis gemacht und Sie können es jetzt einfach verwenden Übergeben des Adapters und Schreiben einer KinesisRecordAdapter-Implementierung.

Im Beispiel verwende ich Scanamo , um die hashmap zu analysieren

Erstellen Sie den Client

%Vor%

Übergeben Sie es in der Konfiguration:

%Vor%

Erstellen Sie einen impliziten Datensatzleser, der zum Lesen dynamoDB-Ereignisse geeignet ist:

%Vor%     
Vasilis Nicolaou 12.06.2017 18:07
quelle