Ich versuche eine Scala-Funktion zu schreiben, die zu Spark DataTypes basierend auf einer bereitgestellten Eingabezeichenfolge:
%Vor% Mein Ziel ist es, eine große Teilmenge, wenn nicht alle, der verfügbaren DataTypes
zu unterstützen. Als ich mit der Implementierung dieser Funktion begann, dachte ich: "Spark / Scala hat wahrscheinlich schon eine helper / util-Methode, die das für mich erledigt." Ich weiß, dass ich etwas tun kann:
Und entweder skalieren Scla und / oder Spark implizit mein "string"
Argument in StringType
usw. Also frage ich: Welche Magie kann ich mit Spark oder Scala machen, um mir bei der Implementierung meiner Konvertermethode zu helfen?
Spark / Scala hat wahrscheinlich schon eine helper / util-Methode, die das für mich erledigt.
Sie haben Recht. Spark hat bereits einen eigenen Schema- und Datentyp-Inferenzcode, mit dem das Schema aus den zugrundeliegenden Datenquellen (csv, json etc.) abgeleitet wird. Sie können sich das also ansehen, um Ihr eigenes zu implementieren (die tatsächliche Implementierung ist als privat markiert) RDD und interne Klassen gebunden, so dass es nicht direkt von Code außerhalb von Spark verwendet werden kann, aber sollte Ihnen eine gute Idee geben, wie man es macht.)
Da csv flacher Typ ist (und json eine verschachtelte Struktur haben kann), ist csv schema inference relativ einfacher und sollte Ihnen bei der Aufgabe helfen, die Sie oben erreichen wollen. Also werde ich erklären, wie die csv-Inferenz funktioniert (json-Inferenz muss nur die möglicherweise verschachtelte Struktur berücksichtigen, aber die Datentyp-Inferenz ist ziemlich analog).
Mit diesem Prolog ist die Sache, die Sie sich ansehen wollen, CSVInferSchema Objekt. Sehen Sie sich insbesondere die Methode infer
an, die ein RDD[Array[String]]
verwendet und den Datentyp für jedes Element des Arrays über die gesamte RDD ableitet. So wie es ist - es markiert jedes Feld als NullType
und beginnt dann, wenn es über die nächste Reihe von Werten ( Array[String]
) in der RDD
iteriert, es aktualisiert die bereits abgeleitete DataType
zu einem neuen DataType
wenn die neue DataType
spezifischer ist. Dies geschieht hier :
Jetzt inferRowType
ruft inferField
für jedes Feld in der Zeile auf. inferField
implementation ist, was Sie wahrscheinlich suchen - es dauert Typ abgeleitete bisher für ein bestimmtes Feld und den String-Wert des Feldes für die aktuelle Zeile als Parameter. Es gibt dann entweder den vorhandenen abgeleiteten Typ zurück oder wenn der abgeleitete neue Typ spezifischer als der neue Typ ist.
Relevanter Abschnitt des Codes ist wie folgt:
%Vor% Bitte beachten Sie, dass, wenn typeSoFar
NullType ist, es zuerst versucht, es als Integer
zu analysieren, aber tryParseInteger
call ist eine Aufrufkette, um das Parsen von niedrigeren Typen durchzuführen. Wenn es also nicht möglich ist, den Wert als Integer zu analysieren, wird% ce_de% aufgerufen. Bei einem Fehler wird tryParseLong
aufgerufen. Bei einem Fehler wird tryParseDecimal
w.o.fw.w.i aufgerufen. tryParseDouble
ww.o.fw.i tryParseTimestamp
w.o.fw.w.i. Endlich tryParseBoolean
.
Sie können also ziemlich genau die ähnliche Logik verwenden, um Ihren Anwendungsfall zu implementieren. (Wenn Sie nicht über mehrere Zeilen hinweg zusammenführen müssen, implementieren Sie einfach alle stringType
-Methoden wortwörtlich und rufen einfach tryParse*
auf. Sie müssen keine eigene Regex schreiben.)
Hoffe, das hilft.
Ja, natürlich hat Spark Magie, die du brauchst.
In Spark 2.x ist es CatalystSqlParser
-Objekt, definiert hier .
Zum Beispiel:
%Vor%Und so weiter.
Aber ich verstehe, dass es kein Teil der öffentlichen API ist und sich daher in den nächsten Versionen ohne Warnungen ändern kann.
Sie können Ihre Methode einfach wie folgt implementieren:
%Vor%Aus > scala scheint es nicht, dass Sie das tun können, was Sie tun wünsche magisch, überprüfe zum Beispiel dieses Beispiel:
%Vor%Ich habe nach dem Lesen ich möchte komponiert um den Typ einer Variablen zur Laufzeit zu erhalten .
Jetzt von > funken , da ich keine Installation installiert habe Momentan konnte ich kein Beispiel erstellen, aber es gibt nichts Offensichtliches zu verwenden, also würde ich vorschlagen, dass Sie toSparkType()
weiterschreiben, wie Sie begonnen haben, aber werfen Sie einen Blick auf die Quellcode für pyspark.sql.types zuerst.
Das Problem ist, dass Sie immer eine Zeichenfolge übergeben.
Tags und Links scala types apache-spark introspection spark-dataframe