Jak połączyć HBase i Spark za pomocą Pythona?
Mam żenująco równoległe zadanie, do którego używam Sparka do dystrybucji obliczeń. Te obliczenia są w Pythonie, a ja używam PySpark do odczytu i wstępnego przetwarzania danych. Dane wejściowe do mojego zadania są przechowywane w bazie HBase. Niestety, nie znalazłem jeszcze zadowalającego (tj. łatwego w użyciu i skalowalnego) sposobu odczytu / zapisu danych HBase Z / do Sparka za pomocą Pythona.
To, co odkryłem wcześniej:
Łączenie z poziomu moich procesów Pythona za pomocą
happybase
. To pakiet umożliwia łączenie się z HBase z Pythona za pomocą Thrift API HBase. W ten sposób zasadniczo pomijam Spark do odczytu/zapisu danych i brakuje mi potencjalnych optymalizacji HBase-Spark. Prędkości odczytu wydają się dość szybkie, ale prędkości zapisu są powolne. Jest to obecnie moje najlepsze rozwiązanie.Używanie sparkcontext
newAPIHadoopRDD
isaveAsNewAPIHadoopDataset
, które wykorzystują interfejs MapReduce HBase. Przykłady tego były kiedyś zawarte w bazie kodu Spark (zobacz tutaj ). Są to jednak obecnie uważany za przestarzały na korzyść wiązań iskrowych HBase (patrz tutaj ). Okazało się również, że ta metoda jest powolna i uciążliwa( czytanie, pisanie działało dobrze), na przykład, ponieważ ciągi znaków zwracane znewAPIHadoopRDD
musiały być przetwarzane i przekształcane na różne sposoby, aby skończyć z obiektami Pythona, które chciałem. Obsługiwał również tylko jedną kolumnę na raz.
Alternatywy, o których wiem:
Obecnie używam Cloudera CDH i wersji 5.7.0 oferty
hbase-spark
(CDH release notes i szczegółowy post na blogu). Moduł ten (wcześniej znany jakoSparkOnHBase
) będzie oficjalnie częścią HBase 2.0. Niestety, to wspaniałe rozwiązanie wydaje się działać tylko z Scala / Java.Spark-SQL-on-HBase / Astro (nie widzę różnicy między tymi dwoma...). Nie wygląda tak solidny i dobrze obsługiwany, jak chciałbym, aby moje rozwiązanie było.
1 answers
Znalazłem ten komentarz autorstwa jednego z twórców hbase-spark
, który wydaje się sugerować, że istnieje sposób na użycie PySpark do odpytywania HBase za pomocą Spark SQL.
I rzeczywiście, opisany tutaj wzorzec może być zastosowany do zapytania HBase za pomocą Spark SQL przy użyciu PySpark, jak pokazuje poniższy przykład:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlc = SQLContext(sc)
data_source_format = 'org.apache.hadoop.hbase.spark'
df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1'])
# ''.join(string.split()) in order to write a multi-line JSON string here.
catalog = ''.join("""{
"table":{"namespace":"default", "name":"testtable"},
"rowkey":"key",
"columns":{
"col0":{"cf":"rowkey", "col":"key", "type":"string"},
"col1":{"cf":"cf", "col":"col1", "type":"string"}
}
}""".split())
# Writing
df.write\
.options(catalog=catalog)\ # alternatively: .option('catalog', catalog)
.format(data_source_format)\
.save()
# Reading
df = sqlc.read\
.options(catalog=catalog)\
.format(data_source_format)\
.load()
Próbowałem hbase-spark-1.2.0-cdh5.7.0.jar
(dystrybuowany przez Cloudera), ale wpadłem w kłopoty (org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select
podczas pisania, java.util.NoSuchElementException: None.get
podczas czytania). Jak się okazuje, obecna wersja CDH nie zawiera zmian hbase-spark
, które umożliwiają integrację Spark SQL-HBase.
To, co robi Działa dla mnie, to shc
Pakiet Spark, znaleziony tutaj . Jedyną zmianą jaką musiałem dokonać w powyższym skrypcie jest zmiana:
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
Oto jak przesyłam powyższy skrypt na moim klastrze CDH, na przykładzie z shc
README:
spark-submit --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml example.py
Większość prac nad shc
wydaje się być już scalona w hbase-spark
moduł HBase, dla wydania w wersji 2.0. Z tym, Spark SQL zapytanie HBase jest możliwe przy użyciu wyżej wymienionego wzorca (patrz: https://hbase.apache.org/book.html#_sparksql_dataframes po szczegóły). Mój przykład powyżej pokazuje, jak to wygląda dla użytkowników PySpark.
Wreszcie, zastrzeżenie: mój przykład dane powyżej ma tylko ciągi. Konwersja danych w Pythonie nie jest obsługiwana przez shc
, więc miałem problemy z liczbami całkowitymi i pływakami, które nie pojawiają się w HBase lub z dziwnymi wartościami.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-22 00:31:35