hadoop map reduce job with HDFS input and HBase output
Jestem nowy na hadoop. Mam zadanie MapReduce, które ma pobierać wejście z Hdfs i zapisywać wyjście reduktora do Hbase. Nie znalazłem dobrego przykładu.
Oto kod, błąd wywołany w tym przykładzie to type mismatch in map, expected ImmutableBytesWritable received IntWritable.
Klasa Mapper
public static class AddValueMapper extends Mapper < LongWritable,
Text, ImmutableBytesWritable, IntWritable > {
/* input <key, line number : value, full line>
* output <key, log key : value >*/
public void map(LongWritable key, Text value,
Context context)throws IOException,
InterruptedException {
byte[] key;
int value, pos = 0;
String line = value.toString();
String p1 , p2 = null;
pos = line.indexOf("=");
//Key part
p1 = line.substring(0, pos);
p1 = p1.trim();
key = Bytes.toBytes(p1);
//Value part
p2 = line.substring(pos +1);
p2 = p2.trim();
value = Integer.parseInt(p2);
context.write(new ImmutableBytesWritable(key),new IntWritable(value));
}
}
Klasa Reduktora
public static class AddValuesReducer extends TableReducer<
ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
long total =0;
// Loop values
while(values.iterator().hasNext()){
total += values.iterator().next().get();
}
// Put to HBase
Put put = new Put(key.get());
put.add(Bytes.toBytes("data"), Bytes.toBytes("total"),
Bytes.toBytes(total));
Bytes.toInt(key.get()), total));
context.write(key, put);
}
}
Miałem podobną pracę tylko z HDFS i działa dobrze.
Edytowano 18-06-2013 . Projekt uczelni zakończył się pomyślnie dwa lata temu. W przypadku konfiguracji zadania (Część sterownika) sprawdź poprawną odpowiedź.
4 answers
Oto kod, który rozwiąże twój problem
Kierowca
HBaseConfiguration conf = HBaseConfiguration.create();
Job job = new Job(conf,"JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Intwritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
TableMapReduceUtil.initTableReducerJob(TABLE,
yourReducer.class, job);
job.setReducerClass(yourReducer.class);
job.waitForCompletion(true);
Mapper & Reducer
class yourMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
//@overide map()
}
class yourReducer
extends
TableReducer<Text, IntWritable,
ImmutableBytesWritable>
{
//@override reduce()
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-06-18 12:16:44
Nie wiem, dlaczego wersja HDFS działa: zwykle musisz ustawić format wejściowy dla zadania, a FileInputFormat jest klasą abstrakcyjną. Może pominąłeś jakieś kwestie? takie jak
job.setInputFormatClass(TextInputFormat.class);
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-01-12 00:58:50
Najlepszym i najszybszym sposobem na zbiorcze ładowanie danych w HBase jest użycie HFileOutputFormat
i CompliteBulkLoad
utility.
Przykładowy kod znajdziesz tutaj :
Mam nadzieję, że się przyda:)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-08 10:33:34
public void map(LongWritable key, Text value,
Context context)throws IOException,
InterruptedException {
Zmień to na immutableBytesWritable
, intwritable
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-04-17 12:09:05