Optimieren der Netzwerkbandbreite über verteilte Datenbankaggregationsjobs

8

Ich habe eine verteilte / föderierte Datenbank, die wie folgt strukturiert ist:

  1. Die Datenbanken sind auf drei geografische Standorte verteilt ("Knoten")
  2. Mehrere Datenbanken sind an jedem Knoten gruppiert
  3. Die relationalen Datenbanken sind eine Mischung aus PostgreSQL, MySQL, Oracle und MS SQL Server; Die nicht relationalen Datenbanken sind entweder MongoDB oder Cassandra
  4. Lose Kopplung innerhalb jedes Knotens und über die Knotenföderation hinweg wird über RabbitMQ erreicht, wobei jeder Knoten einen RabbitMQ-Broker ausführt

Ich implementiere ein readonly-inter-node-aggregations-Jobsystem für Jobs, die den Knotenverbund umfassen (d. h. für Jobs, die nicht lokal für einen Knoten sind). Diese Jobs führen nur "get" -Abfragen durch - sie ändern die Datenbanken nicht. (Wenn die Ergebnisse der Jobs in eine oder mehrere der Datenbanken gehen sollen, wird dies durch einen separaten Job ausgeführt, der nicht Teil des Inter-Node-Jobsystems ist, das ich zu optimieren versuche.) Mein Ziel ist es, die Netzwerkbandbreite, die von diesen Jobs benötigt wird (zuerst, um die Bandbreite zwischen Knoten und WAN zu minimieren und dann die Bandbreite innerhalb des Knotens / LAN zu minimieren); Ich nehme an, dass für jede WAN-Verbindung einheitliche Kosten und für jede LAN-Verbindung weitere einheitliche Kosten anfallen. Die Jobs sind nicht besonders zeitabhängig. Ich führe einen gewissen CPU-Lastausgleich innerhalb eines Knotens durch, aber nicht zwischen Knoten.

Die Datenmenge, die für die Aggregationsjobs über das WAN / LAN transportiert wird, ist relativ zur Menge der lokalen Datenbankschreibvorgänge in einem Cluster oder einer bestimmten Datenbank klein. Daher wäre es nicht sinnvoll, die Datenbanken vollständig zu verteilen der Verband.

Der grundlegende Algorithmus, den ich zur Minimierung der Netzwerkbandbreite verwende, ist:

  1. Wenn ein Job mit einer Gruppe von Daten ausgeführt wird, die über die Föderation verteilt sind, sendet der Verwaltungsknoten eine Nachricht an alle anderen Knoten, die die relevanten Datenbankabfragen enthalten.
  2. Jeder Knoten führt seinen Satz von Abfragen aus, komprimiert sie mit gzip, speichert sie zwischen und sendet ihre komprimierten Größen an den Manager-Knoten.
  3. Der Manager bewegt sich zu dem Knoten, der die Vielzahl der Daten enthält (insbesondere zu der Maschine innerhalb des Clusters, die die meisten Daten aufweist und die freie Kerne hat); Er fordert den Rest der Daten von den anderen zwei Knoten und von den anderen Maschinen innerhalb des Clusters an und führt dann den Job aus.

Wenn möglich, verwenden die Jobs einen Divide-and-Conquer-Ansatz, um den Umfang der erforderlichen Daten-Co-Location zu minimieren. Wenn der Job beispielsweise die Summen aller Verkaufszahlen im Verbund berechnen muss, berechnet jeder Knoten lokal seine Verkaufssummen, die dann auf dem Managerknoten aggregiert werden (anstatt alle nicht verarbeiteten Verkaufsdaten zum Managerknoten zu kopieren). . Manchmal (z. B. wenn Sie eine Verknüpfung zwischen zwei Tabellen durchführen, die sich an verschiedenen Knoten befinden) ist jedoch eine Datenkoordination erforderlich.

Das erste, was ich getan habe, um das zu optimieren, war, die Jobs zu aggregieren und die aggregierten Jobs in Zehn-Minuten-Zeiträumen auszuführen (die Maschinen laufen alle NTP, also kann ich ziemlich sicher sein, dass "alle zehn Minuten" dasselbe bedeutet) an jedem Knoten). Das Ziel ist, dass zwei Jobs die gleichen Daten teilen können, was die Gesamtkosten für den Transport der Daten reduziert.

  1. Bei zwei Jobs, die dieselbe Tabelle abfragen, erzeuge ich die Ergebnismenge jedes Jobs und nehme dann die Schnittmenge der beiden Resultsets.
  2. Wenn beide Jobs auf demselben Knoten ausgeführt werden sollen, werden die Netzwerkübertragungskosten als Summe der beiden Resultsets minus der Schnittmenge der beiden Resultsets berechnet.
  3. Die beiden Resultsets werden in PostgreSQL-Temporärtabellen (im Fall von relationalen Daten) oder auch in temporären Cassandra-Spaltenfamilien / MongoDB-Sammlungen (im Fall von Nosql-Daten) in dem für die Ausführung der Jobs ausgewählten Knoten gespeichert; Die ursprünglichen Abfragen werden dann für die kombinierten Ergebnismengen ausgeführt, und die gelieferten Daten werden an die einzelnen Aufträge gesendet. (Dieser Schritt wird nur bei kombinierten Resultsets durchgeführt. Einzelne Resultsetdaten werden einfach an den Job übergeben, ohne zuerst in temporären Tabellen / Spaltenfamilien / Sammlungen gespeichert zu werden.)

Dies führt zu einer Verbesserung der Netzwerkbandbreite, aber ich frage mich, ob es ein Framework / eine Bibliothek / einen Algorithmus gibt, die dies verbessern könnten. Eine Option, die ich in Betracht gezogen habe, ist das Zwischenspeichern der Resultsets auf einem Knoten und die Berücksichtigung dieser zwischengespeicherten Ergebnismengen bei der Bestimmung der Netzwerkbandbreite (dh Versuch, Resultsets über Jobs hinweg zusätzlich zu dem aktuellen Satz von vorher festgelegten co-lokalisierten Jobs wiederzuverwenden, so dass z Job-Lauf in einer 10-Minuten-Epoche kann ein zwischengespeichertes Resultset aus einer vorherigen 10-Minuten-Ergebnismenge verwenden, aber es sei denn, die Jobs verwenden genau die gleichen Resultsets (d. hes sei denn, sie verwenden identische where-Klauseln), dann kenne ich keinen Allzweckalgorithmus, der die Lücken in der Ergebnismenge füllen würde (z. B. wenn die Ergebnismenge die Klausel "where N & gt; 3" und einen anderen Job benötigt das Resultset mit der Klausel "where N & gt; 0", welchen Algorithmus könnte ich dann verwenden, um zu bestimmen, dass ich die Vereinigung der ursprünglichen Ergebnismenge und der Ergebnismenge mit der Klausel "where N & gt; 0 UND N & lt; = 3 ") - Ich könnte versuchen, meinen eigenen Algorithmus dafür zu schreiben, aber das Ergebnis wäre ein fehlerhaftes nutzloses Durcheinander. Ich müsste auch feststellen, wenn die zwischengespeicherten Daten veraltet sind - der einfachste Weg ist, den Zeitstempel der zwischengespeicherten Daten mit dem letzten modifizierten Zeitstempel in der Quelltabelle zu vergleichen und alle Daten zu ersetzen, wenn sich der Zeitstempel geändert hat, aber idealerweise Ich möchte in der Lage sein, nur die Werte zu aktualisieren, die sich mit Zeitmarken pro Zeile oder pro Stück geändert haben.

    
Zim-Zam O'Pootertoot 10.06.2013, 16:34
quelle

1 Antwort

4

Ich habe angefangen, meine Lösung für die Frage zu implementieren.

Um den Intra-Node-Cache zu vereinfachen und den CPU-Lastausgleich zu vereinfachen, verwende ich eine Cassandra-Datenbank in jedem Datenbank-Cluster ("Cassandra-Node"), um die Aggregationsjobs auszuführen (zuvor habe ich die lokale Datenbank zusammengefasst) Resultsets per Hand) - Ich benutze die einzelne Cassandra-Datenbank für die relationalen, Cassandra- und MongoDB-Daten (der Nachteil ist, dass einige relationale Abfragen auf Cassandra langsamer laufen, aber dies wird durch die Tatsache ausgeglichen, dass die einzelne vereinheitlichte Aggregationsdatenbank ist einfacher zu pflegen als die separaten relationalen und nichtrelationalen Aggregationsdatenbanken). Ich bin auch nicht mehr aggregieren Jobs in zehn Minuten Epochen, da der Cache diesen Algorithmus unnötig macht.

Jede Maschine in einem Knoten bezieht sich auf eine Cassandra-Spaltenfamilie namens Cassandra_Cache_ [MachineID], die zum Speichern der key_ids und column_ids verwendet wird, die sie an den Cassandra-Knoten gesendet hat. Die Cassandra_Cache-Spaltenfamilie besteht aus einer Tabellenspalte, einer Primary_Key-Spalte, einer Column_ID-Spalte, einer Last_Modified_Timestamp-Spalte, einer Last_Used_Timestamp-Spalte und einem zusammengesetzten Schlüssel, der aus der Tabelle | Primary_Key | Column_ID besteht. Die Spalte "Last_Modified_Timestamp" bezeichnet den letzten Änderungswert des Datums aus der Quellendatenbank und die Spalte "Last_Used_Timestamp" bezeichnet den Zeitstempel, zu dem das Datum zuletzt von einem Aggregationsauftrag verwendet / gelesen wurde. Wenn der Cassandra-Knoten Daten von einer Maschine anfordert, berechnet die Maschine die Ergebnismenge und nimmt dann die festgelegte Differenz der Ergebnismenge und der Tabelle | key | columns, die sich in Cassandra_Cache befinden und denselben Last_Modified_Timestamp haben wie die Zeilen in Cassandra_Cache (if Die Zeitstempel stimmen nicht überein. Die zwischengespeicherten Daten sind veraltet und werden zusammen mit dem neuen Last_Modified_Timestamp aktualisiert. Die lokale Maschine sendet dann die eingestellte Differenz an den Cassandra-Knoten und aktualisiert ihren Cassandra_Cache mit der eingestellten Differenz und aktualisiert den Last_Used_Timestamp für jedes zwischengespeicherte Datum, das zum Erstellen der Ergebnismenge verwendet wurde. (Eine einfachere Alternative zum Beibehalten eines separaten Zeitstempels für jede Tabelle | Schlüssel | Spalte besteht darin, einen Zeitstempel für jede Tabelle | Schlüssel beizubehalten, aber dies ist weniger präzise und der Zeitstempel der Tabelle | Schlüssel | Spalte ist nicht zu komplex.) Die Last_Used_Timestamps beibehalten Die Synchronisierung zwischen Cassandra_Caches erfordert nur, dass die lokalen Computer und Remote-Knoten den Last_Used_Timestamp senden, der jedem Job zugeordnet ist, da alle Daten in einem Job denselben Last_Used_Timestamp verwenden.

Der Cassandra-Knoten aktualisiert seine Ergebnismenge mit den neuen Daten, die er vom Knoten erhält, und auch mit den Daten, die er von den anderen Knoten erhält. Der Cassandra-Knoten verwaltet auch eine Spaltenfamilie, die dieselben Daten wie Cassandra_Cache in jedem Computer speichert (mit Ausnahme des LastModified_Timestamps, der nur auf dem lokalen Computer benötigt wird, um festzustellen, wann Daten veraltet sind), zusammen mit einer Quellen-ID, die angibt, ob die Daten gekommen sind innerhalb des Knotens oder von einem anderen Knoten aus - die ID unterscheidet zwischen den verschiedenen Knoten, unterscheidet jedoch nicht zwischen den verschiedenen Maschinen innerhalb des lokalen Knotens. (Eine andere Option besteht darin, einen einheitlichen Cassandra_Cache zu verwenden, anstatt einen Cassandra_Cache pro Maschine plus einen anderen Cassandra_Cache für den Knoten zu verwenden, aber ich entschied, dass die zusätzliche Komplexität die Speicherplatzeinsparungen nicht wert war.)

Jeder Cassandra-Knoten verwaltet auch einen Federated_Cassandra_Cache, der aus den Tupeln {Database, Table, Primary_Key, Column_ID, Last_Used_Timestamp} besteht, die vom lokalen Knoten an einen der beiden anderen Knoten gesendet wurden.

Wenn ein Job die Pipeline durchläuft, aktualisiert jeder Cassandra-Knoten seinen knoteninternen Cache mit den lokalen Resultsets und schließt auch die Unterjobs ab, die lokal ausgeführt werden können (z. B. in einem Job, um Daten zwischen mehreren Knoten zu summieren) Knoten summiert seine Intra-Node-Daten, um die Menge an Daten zu minimieren, die in der Föderation zwischen den Knoten gemeinsam lokalisiert werden muss) - ein Unterauftrag kann lokal ausgeführt werden, wenn er nur Knotendaten verwendet. Der Verwaltungsknoten bestimmt dann, auf welchem ​​Knoten der Rest des Auftrags ausgeführt werden soll: Jeder Cassandra-Knoten kann die Kosten des Sendens seines Ergebnissatzes an einen anderen Knoten lokal berechnen, indem er die festgelegte Differenz seines Ergebnissatzes und der Teilmenge des Resultsets, das zwischengespeichert wurde, nimmt zu seinem Federated_Cassandra_Cache, und der Manager-Knoten minimiert die Kostengleichung ["Kosten zum Transportieren der Ergebnismenge von NodeX" + "Kosten zum Transportieren der Ergebnismenge von NodeY"]. Zum Beispiel kostet es Node1 {3, 5}, sein Resultset zu {Node2, Node3} zu transportieren, es kostet Node2 {2, 2}, sein Resultset zu {Node1, Node3} zu transportieren, und es kostet Node3 {4, 3} Um den Ergebnissatz zu {Node1, Node2} zu transportieren, wird der Job auf Node1 mit Kosten von "6" ausgeführt.

Ich verwende für jeden Cassandra-Knoten eine LRU-Räumungsrichtlinie; Ich habe ursprünglich eine Richtlinie für die älteste Evakuierung verwendet, da sie einfacher zu implementieren ist und weniger Schreibvorgänge in der Last_Used_Timestamp-Spalte erfordert (einmal pro Datumswiederholung anstatt einmal pro gelesenen Datum), aber die Implementierung einer LRU-Richtlinie stellte sich als nicht übermäßig heraus komplex und die Last_Used_Timestamp-Schreibvorgänge haben keinen Engpass verursacht. Wenn ein Cassandra-Knoten 20% freien Speicherplatz erreicht, werden die Daten gelöscht, bis sie 30% freien Speicherplatz erreicht haben. Daher entspricht jede Räumung ungefähr der Größe von 10% des insgesamt verfügbaren Speicherplatzes. Der Knoten unterhält zwei Zeitstempel: den Zeitstempel der zuletzt geräumten Daten innerhalb des Knotens und den Zeitstempel der zuletzt entfernten Knoten / Verbunddaten; Aufgrund der erhöhten Latenz der Kommunikation zwischen Knoten relativ zu der Kommunikation innerhalb des Knotens besteht das Ziel der Räumungsrichtlinie darin, dass 75% der zwischengespeicherten Daten Knotendaten und 25% der zwischengespeicherten Daten Knotendaten sind , die schnell approximiert werden kann, indem 25% jeder Räumung Daten zwischen Knoten sind und 75% jeder Räumung Daten innerhalb eines Knotens sind. Die Vertreibung funktioniert wie folgt:

%Vor%

Ausgelöste Daten werden nicht dauerhaft gelöscht, bis Räumungsbestätigungen von den Maschinen innerhalb des Knotens und von den anderen Knoten empfangen wurden.

Der Cassandra-Knoten sendet dann eine Benachrichtigung an die Maschinen in seinem Knoten, die angibt, was der neue last_evicted_local_timestamp ist. Die lokalen Computer aktualisieren ihre Cassandra_Caches, um den neuen Zeitstempel wiederzugeben, und senden eine Benachrichtigung an den Cassandra-Knoten, wenn dies abgeschlossen ist. Wenn der Cassandra-Knoten Benachrichtigungen von allen lokalen Computern erhalten hat, löscht er permanent die entfernten lokalen Daten. Der Cassandra-Knoten sendet auch eine Benachrichtigung an die fernen Knoten mit dem neuen last_evicted_federated_timestamp; die anderen Knoten aktualisieren ihre Federated_Cassandra_Caches, um den neuen Zeitstempel wiederzugeben, und der Cassandra-Knoten löscht die entfernten föderierten Daten endgültig, wenn er Benachrichtigungen von jedem Knoten empfängt (der Cassandra-Knoten verfolgt, aus welchem ​​Knoten ein Datenelement stammt, also nach einer Räumung) Quittierung von NodeX kann der Knoten die ausgelagerten NodeX-Daten vor dem Empfang einer Räumungsbestätigung von NodeY endgültig löschen. Bis alle Maschinen / Knoten ihre Benachrichtigungen gesendet haben, verwendet der Cassandra-Knoten die zwischengespeicherten geräumten Daten in seinen Abfragen, wenn er eine Ergebnismenge von einer Maschine / einem Knoten empfängt, der seine alten Daten nicht entfernt hat. Zum Beispiel hat der Cassandra-Knoten ein lokales Tabellen | Primary_Key | Column_ID-Datum, das er entfernt hat, und währenddessen hat ein lokaler Rechner (der die Räumungsanfrage nicht verarbeitet hat) das Table | Primary_Key | Column_ID-Datum nicht in sein Resultset aufgenommen, weil es denkt dass der Cassandra-Knoten das Datum bereits in seinem Cache hat; Der Cassandra-Knoten empfängt die Ergebnismenge von der lokalen Maschine, und da die lokale Maschine die Räumungsanfrage nicht bestätigt hat, enthält der Cassandra-Knoten das zwischengespeicherte geräumte Datum in seiner eigenen Ergebnismenge.

    
Zim-Zam O'Pootertoot 12.06.2013, 14:31
quelle