Wyświetla listę z zadania redukcji Mapy Hadoop za pomocą niestandardowego zapisu

Próbuję stworzyć proste zadanie zmniejszania map, zmieniając przykład wordcount podany przez hadoop.

Próbuję umieścić listę zamiast liczenia słów. Przykład wordcount daje następujący ouput

hello 2
world 2

Staram się, aby to wyszło jako lista, która będzie podstawą przyszłej pracy

hello 1 1
world 1 1

Myślę, że jestem na dobrej drodze, ale mam problem z napisaniem listy. Zamiast powyższego otrzymuję

Hello   foo.MyArrayWritable@61250ff2
World   foo.MyArrayWritable@483a0ab1

Oto moje moje życie. I umieścić sys w write(DataOuptut arg0), ale nigdy niczego nie wypisuje, więc myślę, że ta metoda może nie być wywołana i nie wiem dlaczego.

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
    super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
    super(valueClass);
}

@Override
public IntWritable[] get() {
    return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
    for(IntWritable i : get()){
        i.write(arg0);
    }
}
}

Edycja-dodawanie większej ilości kodu źródłowego

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(val);
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
}

public static void main(String[] args) throws Exception {
    if(args == null || args.length == 0)
        args = new String[]{"./wordcount/input","./wordcount/output"};
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);

    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

}

Author: triggs, 2013-04-04

1 answers

Masz 'błąd' w reduktorze - iterator wartości ponownie używa tego samego IntWritable w całej pętli, więc powinieneś zawinąć wartość dodaną do listy w następujący sposób:

public void reduce(Text key, Iterable<IntWritable> values, Context context)
                                      throws IOException, InterruptedException {
    ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
    for (IntWritable val : values) {
        list.add(new IntWritable(val));
    }
    context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
}

W rzeczywistości nie jest to problem, ponieważ używasz listy tablic, a Twój maper wysyła tylko jedną wartość (jedną), ale jest czymś, co może cię potknąć, jeśli kiedykolwiek rozszerzysz ten kod.

Musisz również zdefiniować w swoim zadaniu, że typy wyjściowe map i reduktorów są różne:

// map output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// reducer output types

job.setOutputValueClass(Text.class);
job.setOutputValueClass(MyArrayWritable.class);

Ty możesz chcieć jawnie zdefiniować liczbę reduktorów (co może być powodem, dla którego nigdy nie widzisz, że Twoje sysouty są zapisywane w dziennikach zadań, zwłaszcza jeśli administrator klastra zdefiniował domyślną liczbę na 0):

job.setNumReduceTasks(1);

Używasz domyślnego formatu wyjściowego tekstu, który wywołuje ToString () na parze klucza wyjściowego i wartości - MyArrayWritable nie ma nadpisanej metody toString, więc powinieneś umieścić ją w MyArrayWritable:

@Override
public String toString() {
  return Arrays.toString(get());
}

Na koniec usuń metodę overridden write z MyArrayWritable - nie jest to poprawna implementacja zgodna z darmową metodą readFields. nie musisz nadpisywać tej metody, ale jeśli to zrobisz (powiedzmy, że chcesz zobaczyć sysout, aby zweryfikować, że jest wywoływany), zrób coś takiego zamiast tego:

@Override
public void write(DataOutput arg0) throws IOException {
  System.out.println("write method called");
  super.write(arg0);
}
 22
Author: Chris White,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-04-05 12:27:33