Ich versuche, DynamoDB-Tabellenänderungen mithilfe von DynamoDB-Streams und des von AWS bereitgestellten Java DynamoDB-Stream-Kinesis-Adapters zu erfassen. Ich arbeite mit den AWS Java SDKs in einer Scala App.
Ich habe angefangen, dem AWS-Leitfaden zu folgen und durchzugehen Das AWS veröffentlichte Codebeispiel . Ich habe jedoch Probleme, Amazons eigenen Code in meiner Umgebung zu verwenden. Mein Problem liegt beim Objekt KinesisClientLibConfiguration
.
Im Beispielcode ist KinesisClientLibConfiguration
mit dem von DynamoDB bereitgestellten Stream-ARN konfiguriert.
Ich habe ein ähnliches Muster in meiner Scala-App verfolgt, indem ich zuerst den aktuellen ARN aus meiner Dynamo-Tabelle gefunden habe:
%Vor% Und dann die KinesisClientLibConfiguration
mit dem bereitgestellten ARN erstellen:
Ich habe den bereitgestellten ARN-Stream überprüft und alles stimmt mit dem überein, was ich in der AWS-Konsole sehe.
Zur Laufzeit bekomme ich eine Ausnahme, die besagt, dass der angegebene ARN kein gültiger Stream-Name ist:
%Vor% Wenn man sich die Dokumentation von KinesisClientLibConfiguration
anschaut, macht dies Sinn, da der zweite Parameter als streamName aufgeführt ist, ohne dass ein ARN erwähnt wird.
Ich kann anscheinend nichts in KinesisClientLibConfiguration
finden, das mit einem ARN zusammenhängt. Da ich mit einem DynamoDB-Stream und nicht mit einem Kinesis-Stream arbeite, weiß ich auch nicht, wie ich meinen Stream-Namen finden kann.
An diesem Punkt bin ich mir nicht sicher, was ich im veröffentlichten AWS-Beispiel vermisse, es scheint, als ob sie eine viel ältere Version der KCL verwenden. Ich benutze Version 1.7.0 von amazon-kinesis-client.
Das Problem endete tatsächlich außerhalb meiner KinesisClientLibConfiguration
.
Ich konnte dieses Problem umgehen, indem ich dieselbe Konfiguration verwendete und sowohl den Stream-Adapter, der in der DynamoDB-Stream-Adapter-Bibliothek enthalten ist, als auch die Clients für DynamoDB und CloudWatch bereitstellte.
Meine Arbeitslösung sieht jetzt so aus.
Definieren der Kinesis-Client-Konfiguration.
%Vor%Definieren Sie einen Stream-Adapter und einen CloudWatch-Client
%Vor% Erstellen Sie eine Instanz von RecordProcessorFactory
, es liegt an Ihnen, eine Klasse zu definieren, die die von KCL bereitgestellte IRecordProcessorFactory
und die zurückgegebene IRecordProcessor
implementiert.
Und der Teil, den ich vermisste, all das muss Ihrem Arbeiter zur Verfügung gestellt werden.
%Vor%Nur um zu antworten, was das Problem war - Sie haben das ARN bereitgestellt, als es nur den Stream-Namen wollte.
Ich habe kürzlich eine PR zu diesem Projekt gfc-aws-kinesis gemacht und Sie können es jetzt einfach verwenden Übergeben des Adapters und Schreiben einer KinesisRecordAdapter-Implementierung.
Im Beispiel verwende ich Scanamo , um die hashmap zu analysieren
Erstellen Sie den Client
%Vor%Übergeben Sie es in der Konfiguration:
%Vor%Erstellen Sie einen impliziten Datensatzleser, der zum Lesen dynamoDB-Ereignisse geeignet ist:
%Vor%Tags und Links scala java amazon-web-services amazon-dynamodb amazon-kinesis