Jak połączyć HBase i Spark za pomocą Pythona?

Mam żenująco równoległe zadanie, do którego używam Sparka do dystrybucji obliczeń. Te obliczenia są w Pythonie, a ja używam PySpark do odczytu i wstępnego przetwarzania danych. Dane wejściowe do mojego zadania są przechowywane w bazie HBase. Niestety, nie znalazłem jeszcze zadowalającego (tj. łatwego w użyciu i skalowalnego) sposobu odczytu / zapisu danych HBase Z / do Sparka za pomocą Pythona.

To, co odkryłem wcześniej:

  • Łączenie z poziomu moich procesów Pythona za pomocą happybase. To pakiet umożliwia łączenie się z HBase z Pythona za pomocą Thrift API HBase. W ten sposób zasadniczo pomijam Spark do odczytu/zapisu danych i brakuje mi potencjalnych optymalizacji HBase-Spark. Prędkości odczytu wydają się dość szybkie, ale prędkości zapisu są powolne. Jest to obecnie moje najlepsze rozwiązanie.

  • Używanie sparkcontext newAPIHadoopRDD i saveAsNewAPIHadoopDataset, które wykorzystują interfejs MapReduce HBase. Przykłady tego były kiedyś zawarte w bazie kodu Spark (zobacz tutaj ). Są to jednak obecnie uważany za przestarzały na korzyść wiązań iskrowych HBase (patrz tutaj ). Okazało się również, że ta metoda jest powolna i uciążliwa( czytanie, pisanie działało dobrze), na przykład, ponieważ ciągi znaków zwracane z newAPIHadoopRDD musiały być przetwarzane i przekształcane na różne sposoby, aby skończyć z obiektami Pythona, które chciałem. Obsługiwał również tylko jedną kolumnę na raz.

Alternatywy, o których wiem:

  • Obecnie używam Cloudera CDH i wersji 5.7.0 oferty hbase-spark (CDH release notes i szczegółowy post na blogu). Moduł ten (wcześniej znany jako SparkOnHBase) będzie oficjalnie częścią HBase 2.0. Niestety, to wspaniałe rozwiązanie wydaje się działać tylko z Scala / Java.

  • Spark-SQL-on-HBase / Astro (nie widzę różnicy między tymi dwoma...). Nie wygląda tak solidny i dobrze obsługiwany, jak chciałbym, aby moje rozwiązanie było.

Author: Def_Os, 2016-07-20

1 answers

Znalazłem ten komentarz autorstwa jednego z twórców hbase-spark, który wydaje się sugerować, że istnieje sposób na użycie PySpark do odpytywania HBase za pomocą Spark SQL.

I rzeczywiście, opisany tutaj wzorzec może być zastosowany do zapytania HBase za pomocą Spark SQL przy użyciu PySpark, jak pokazuje poniższy przykład:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlc = SQLContext(sc)

data_source_format = 'org.apache.hadoop.hbase.spark'

df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1'])

# ''.join(string.split()) in order to write a multi-line JSON string here.
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"testtable"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf", "col":"col1", "type":"string"}
    }
}""".split())


# Writing
df.write\
.options(catalog=catalog)\  # alternatively: .option('catalog', catalog)
.format(data_source_format)\
.save()

# Reading
df = sqlc.read\
.options(catalog=catalog)\
.format(data_source_format)\
.load()

Próbowałem hbase-spark-1.2.0-cdh5.7.0.jar (dystrybuowany przez Cloudera), ale wpadłem w kłopoty (org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select podczas pisania, java.util.NoSuchElementException: None.get podczas czytania). Jak się okazuje, obecna wersja CDH nie zawiera zmian hbase-spark, które umożliwiają integrację Spark SQL-HBase.

To, co robi Działa dla mnie, to shc Pakiet Spark, znaleziony tutaj . Jedyną zmianą jaką musiałem dokonać w powyższym skrypcie jest zmiana:

data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

Oto jak przesyłam powyższy skrypt na moim klastrze CDH, na przykładzie z shc README:

spark-submit --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml example.py

Większość prac nad shc wydaje się być już scalona w hbase-spark moduł HBase, dla wydania w wersji 2.0. Z tym, Spark SQL zapytanie HBase jest możliwe przy użyciu wyżej wymienionego wzorca (patrz: https://hbase.apache.org/book.html#_sparksql_dataframes po szczegóły). Mój przykład powyżej pokazuje, jak to wygląda dla użytkowników PySpark.

Wreszcie, zastrzeżenie: mój przykład dane powyżej ma tylko ciągi. Konwersja danych w Pythonie nie jest obsługiwana przez shc, więc miałem problemy z liczbami całkowitymi i pływakami, które nie pojawiają się w HBase lub z dziwnymi wartościami.

 17
Author: Def_Os,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-22 00:31:35