Jak czytać z hbase za pomocą spark
Poniższy kod zostanie odczytany z bazy hbase, a następnie przekonwertowany na strukturę json i konwertowany na schemaRDD, ale problem polega na tym, że jestem using List
aby zapisać ciąg json, a następnie przekazać do javaRDD, dla danych o 100 GB master zostanie załadowany z danymi w pamięci. Jaki jest właściwy sposób załadować dane z hbase następnie wykonać manipulację, a następnie przekonwertować do JavaRDD.
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
List<String> jsonList = new ArrayList<String>();
String json = null;
for(Result rowResult:scanner) {
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet()) {
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += "\""+s2_str+"\":"+s3_str+",";
}
}
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
}
json = json.substring(0,json.length()-1);
json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
jsonList.add(json);
}
JavaRDD<String> jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
} finally {
table.close();
}
}
}
4 answers
Podstawowy przykład odczytu danych HBase za pomocą Spark( Scala), Możesz również napisać to w Javie:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
object HBaseRead {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
val tableName = "table1"
System.setProperty("user.name", "hdfs")
System.setProperty("HADOOP_USER_NAME", "hdfs")
conf.set("hbase.master", "localhost:60000")
conf.setInt("timeout", 120000)
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(tableName)
admin.createTable(tableDesc)
}
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("Number of Records found : " + hBaseRDD.count())
sc.stop()
}
}
Aktualizacja -2016
[[4]}od Spark 1.0.x+, teraz Można również użyć złącza Spark-HBase:Zależność Mavena do włączenia:
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>
I znajdź poniżej przykładowy kod dla tego samego:
import org.apache.spark._
import it.nerdammer.spark.hbase._
object HBaseRead extends App {
val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
val sc = new SparkContext(sparkConf)
// For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:
val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
.select("DocID", "Title").inColumnFamily("SMPL")
println("Number of Records found : " + docRdd .count())
}
Aktualizacja-2017
Od Spark 1.6.x+, teraz Można również używać Złącza SHC (użytkownicy Hortonworks lub HDP):
Zależność Mavena do włączenia :
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc</artifactId>
<version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
</dependency>
Główną zaletą korzystania z tego złącza jest to, że ma elastyczność w definicji schematu i nie wymaga kodowanych na twardo paramów, tak jak w nerdammer/spark-HBase-connector. Pamiętaj również, że obsługuje Spark 2.x więc to złącze jest bardzo elastyczne i zapewnia kompleksowe wsparcie w kwestiach i PRs.
Znajdź poniższą ścieżkę repozytorium dla najnowszych readme i sampli:
Hortonworks Spark HBase Connector
Możesz także przekonwertować to RDD do DataFrames i uruchomić SQL nad nim lub można mapować te Dataset lub DataFrames do zdefiniowanych przez użytkownika Pojo lub klas przypadków Java. Działa genialnie.
Proszę skomentować poniżej, jeśli potrzebujesz czegoś więcej.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-03-08 07:00:53
[[1]] wolę czytać z hbase i robić manipulacje json w spark.
Spark dostarcza JavaSparkContext.newAPIHadoopRDD funkcja do odczytu danych z pamięci hadoop, w tym HBase. Będziesz musiał podać konfigurację HBase, nazwę tabeli i skanowanie w parametrze konfiguracyjnym i formacie wprowadzania tabeli oraz jej wartość klucza
Możesz użyć klasytable input format i parametru job, aby podać nazwę tabeli i skanować konfiguracja
Przykład:
conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data =
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
Następnie możesz zrobić manipulację json w spark. Ponieważ spark może ponownie obliczyć, gdy pamięć jest pełna, załaduje tylko dane potrzebne do części przeliczeniowej (cmiiw), więc nie musisz się martwić o rozmiar danych
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-09-23 09:53:12
Aby dodać komentarz jak dodać skanowanie:
TableInputFormat ma następujące atrybuty:
- SCAN_ROW_START
- SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-08 06:36:27
Ponieważ pytanie nie jest nowe, na razie jest kilka innych alternatyw:
- HBase-spark, Moduł dostępny bezpośrednio w HBase repo
- Spark-on-HBase by Hortonworks
Nie wiem zbyt wiele o pierwszym projekcie, ale wygląda na to, że nie obsługuje Spark 2.x. jednak ma bogate wsparcie na poziomie RDD Dla Spark 1.6.x.
Spark-on-HBase, z drugiej strony, ma oddziały Dla Spark 2.0 i nadchodzącego Spark 2.1. Ten projekt jest bardzo obiecujący, ponieważ koncentruje się na Dataset/DataFrame API. Pod maską implementuje standardowe API Spark Datasource i wykorzystuje silnik Spark Catalyst do optymalizacji zapytań. Programiści twierdzą tutaj, że jest w stanie podzielić przycinanie, przycinanie kolumn, predykat pushdown i osiągnięcie lokalizacji danych.
Prosty przykład, który wykorzystuje artefakt com.hortonworks:shc:1.0.0-2.0-s_2.11
z tego repo i Spark 2.0.2, jest przedstawiony poniżej:
case class Record(col0: Int, col1: Int, col2: Boolean)
val spark = SparkSession
.builder()
.appName("Spark HBase Example")
.master("local[4]")
.getOrCreate()
def catalog =
s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"int"},
|"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
|}
|}""".stripMargin
val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))
// write
spark
.createDataFrame(artificialData)
.write
.option(HBaseTableCatalog.tableCatalog, catalog)
.option(HBaseTableCatalog.newTable, "5")
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
// read
val df = spark
.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
df.count()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-12-14 17:11:35