Jak czytać z hbase za pomocą spark

Poniższy kod zostanie odczytany z bazy hbase, a następnie przekonwertowany na strukturę json i konwertowany na schemaRDD, ale problem polega na tym, że jestem using List aby zapisać ciąg json, a następnie przekazać do javaRDD, dla danych o 100 GB master zostanie załadowany z danymi w pamięci. Jaki jest właściwy sposób załadować dane z hbase następnie wykonać manipulację, a następnie przekonwertować do JavaRDD.

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
Author: smola, 2014-07-30

4 answers

Podstawowy przykład odczytu danych HBase za pomocą Spark( Scala), Możesz również napisać to w Javie:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}

Aktualizacja -2016

[[4]}od Spark 1.0.x+, teraz Można również użyć złącza Spark-HBase:

Zależność Mavena do włączenia:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>

I znajdź poniżej przykładowy kod dla tego samego:

import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
    val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
    sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
    val sc = new SparkContext(sparkConf)

    // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:

    val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
    .select("DocID", "Title").inColumnFamily("SMPL")

    println("Number of Records found : " + docRdd .count())
}

Aktualizacja-2017

Od Spark 1.6.x+, teraz Można również używać Złącza SHC (użytkownicy Hortonworks lub HDP):

Zależność Mavena do włączenia :

    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc</artifactId>
        <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
    </dependency>

Główną zaletą korzystania z tego złącza jest to, że ma elastyczność w definicji schematu i nie wymaga kodowanych na twardo paramów, tak jak w nerdammer/spark-HBase-connector. Pamiętaj również, że obsługuje Spark 2.x więc to złącze jest bardzo elastyczne i zapewnia kompleksowe wsparcie w kwestiach i PRs.

Znajdź poniższą ścieżkę repozytorium dla najnowszych readme i sampli:

Hortonworks Spark HBase Connector

Możesz także przekonwertować to RDD do DataFrames i uruchomić SQL nad nim lub można mapować te Dataset lub DataFrames do zdefiniowanych przez użytkownika Pojo lub klas przypadków Java. Działa genialnie.

Proszę skomentować poniżej, jeśli potrzebujesz czegoś więcej.

 40
Author: Murtaza Kanchwala,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-03-08 07:00:53

[[1]] wolę czytać z hbase i robić manipulacje json w spark.
Spark dostarcza JavaSparkContext.newAPIHadoopRDD funkcja do odczytu danych z pamięci hadoop, w tym HBase. Będziesz musiał podać konfigurację HBase, nazwę tabeli i skanowanie w parametrze konfiguracyjnym i formacie wprowadzania tabeli oraz jej wartość klucza

Możesz użyć klasytable input format i parametru job, aby podać nazwę tabeli i skanować konfiguracja

Przykład:

conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data = 
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

Następnie możesz zrobić manipulację json w spark. Ponieważ spark może ponownie obliczyć, gdy pamięć jest pełna, załaduje tylko dane potrzebne do części przeliczeniowej (cmiiw), więc nie musisz się martwić o rozmiar danych

 10
Author: Averman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-09-23 09:53:12

Aby dodać komentarz jak dodać skanowanie:

TableInputFormat ma następujące atrybuty:

  1. SCAN_ROW_START
  2. SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
 6
Author: Zhang Kan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-08 06:36:27

Ponieważ pytanie nie jest nowe, na razie jest kilka innych alternatyw:

Nie wiem zbyt wiele o pierwszym projekcie, ale wygląda na to, że nie obsługuje Spark 2.x. jednak ma bogate wsparcie na poziomie RDD Dla Spark 1.6.x.

Spark-on-HBase, z drugiej strony, ma oddziały Dla Spark 2.0 i nadchodzącego Spark 2.1. Ten projekt jest bardzo obiecujący, ponieważ koncentruje się na Dataset/DataFrame API. Pod maską implementuje standardowe API Spark Datasource i wykorzystuje silnik Spark Catalyst do optymalizacji zapytań. Programiści twierdzą tutaj, że jest w stanie podzielić przycinanie, przycinanie kolumn, predykat pushdown i osiągnięcie lokalizacji danych.

Prosty przykład, który wykorzystuje artefakt com.hortonworks:shc:1.0.0-2.0-s_2.11 z tego repo i Spark 2.0.2, jest przedstawiony poniżej:

case class Record(col0: Int, col1: Int, col2: Boolean)

val spark = SparkSession
  .builder()
  .appName("Spark HBase Example")
  .master("local[4]")
  .getOrCreate()

def catalog =
  s"""{
      |"table":{"namespace":"default", "name":"table1"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
      |"col1":{"cf":"cf1", "col":"col1", "type":"int"},
      |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
      |}
      |}""".stripMargin

val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))

// write
spark
  .createDataFrame(artificialData)
  .write
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .option(HBaseTableCatalog.newTable, "5")
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

// read
val df = spark
  .read
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

df.count()
 6
Author: Anton Okolnychyi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-14 17:11:35