Jaki jest najszybszy sposób programowego ładowania danych do bazy HBase?

Mam zwykły plik tekstowy z prawdopodobnie milionami linii, który wymaga niestandardowego parsowania i chcę go jak najszybciej załadować do tabeli HBase (używając Hadoop lub HBase Java client).

Moje obecne rozwiązanie opiera się na pracy MapReduce BEZ części Reduce. Używam FileInputFormat do odczytu pliku tekstowego, aby każda linia była przekazywana do metody map mojej klasy Mapper. W tym momencie linia jest przetwarzana do postaci Put obiektu, który jest zapisywany do context. Następnie, TableOutputFormat bierze Put obiekt i wstawia go do tabeli.

To rozwiązanie daje średnią szybkość wstawiania 1000 wierszy na sekundę, czyli mniej niż się spodziewałem. moja konfiguracja HBase jest w trybie pseudo rozproszonym na jednym serwerze.

Ciekawostką jest to, że podczas wstawiania 1,000,000 wierszy, 25 maperów (zadań) jest wywoływanych, ale działają seryjnie (jeden po drugim); czy to normalne?

Oto kod do mojego obecnego rozwiązania:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}
Author: Cihan Keser, 2012-01-06

2 answers

Przeszedłem przez proces, który jest prawdopodobnie bardzo podobny do twojego, próbując znaleźć skuteczny sposób na załadowanie danych z MR do bazy HBase. To, co znalazłem do pracy, to użycie HFileOutputFormat jako OutputFormatClass of the MR.

Poniżej jest podstawa mojego kodu, którą muszę wygenerować job i funkcję Mapper map, która zapisuje dane. To było szybkie. Nie używamy go już, więc nie mam numerów pod ręką, ale było to około 2,5 miliona płyt w pod minucie.

Oto (rozebrana) funkcja, którą napisałem, aby wygenerować zadanie dla mojego procesu MapReduce, aby umieścić dane w HBase

private Job createCubeJob(...) {
    //Build and Configure Job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    return job;
}

To jest moja funkcja map z klasy HiveToHBaseMapper (lekko edytowana).

public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try{
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        String Value = strs[VALUE_INDEX];
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                        ? Bytes.toBytes(Double.MIN_VALUE)
                        : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);

        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    }
    catch(Exception e){
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
    }

}

Jestem całkiem pewien, że nie będzie to rozwiązanie do kopiowania i wklejania dla Ciebie. Oczywiście dane, z którymi tu pracowałem, nie wymagały żadnego niestandardowego przetwarzania(to było zrobione w robocie MR przed tym). Najważniejsze, co chcę z tego zapewnić, to HFileOutputFormat. Na odpoczynek jest tylko przykładem tego, jak go wykorzystałem. :)
Mam nadzieję, że trafisz na solidną drogę do dobrego rozwiązania. :

 17
Author: QuinnG,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-01-06 17:50:19

Ciekawostką jest to, że podczas wstawiania 1,000,000 wierszy, 25 maperów (zadań) jest wywoływanych, ale działają seryjnie (jeden po drugim); czy to normalne?

mapreduce.tasktracker.map.tasks.maximum parametr, który jest domyślnie ustawiony na 2, określa maksymalną liczbę zadań, które mogą działać równolegle na węźle. Jeśli nie zostanie to zmienione, powinieneś zobaczyć 2 zadania mapowe działające jednocześnie na każdym węźle.

 0
Author: Praveen Sripati,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-01-06 01:22:04