MultipleOutputFormat w hadoop

Jestem nowicjuszem w Hadoop. Wypróbuję program Wordcount.

Teraz, aby wypróbować wiele plików wyjściowych, używam MultipleOutputFormat. ten link pomógł mi w tym. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

W klasie kierowcy miałem

    MultipleOutputs.addNamedOutput(conf, "even",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
            IntWritable.class);

    MultipleOutputs.addNamedOutput(conf, "odd",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
            IntWritable.class);`

I moja klasa stała się tym

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    MultipleOutputs mos = null;

    public void configure(JobConf job) {
        mos = new MultipleOutputs(job);
    }

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        if (sum % 2 == 0) {
            mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
        }else {
            mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
        }
        //output.collect(key, new IntWritable(sum));
    }
    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    mos.close();
    }
}

Wszystko działało, ale dostaję dużo plików, (jeden nieparzysty i jeden parzysty dla każdej mapy-reduce)

Pytanie brzmi: jak Czy Mogę mieć tylko 2 pliki wyjściowe (Nieparzyste i Parzyste ) tak, że każde nieparzyste wyjście każdego Map-reduce zostanie zapisane w tym nieparzystym pliku, i to samo dla parzystego.

Author: raj, 2010-08-16

3 answers

Każdy reduktor używa OutputFormat do zapisu rekordów. Dlatego otrzymujesz zestaw nieparzystych i parzystych plików na reduktor. Jest to projekt tak, że każdy reduktor może wykonywać zapisy równolegle.

Jeśli chcesz tylko jeden plik nieparzysty i pojedynczy parzysty, musisz ustawić mapred.zmniejsz.zadania do 1. Ale wydajność ucierpi, ponieważ wszyscy maperzy będą zasilać jeden reduktor.

Inną opcją jest zmiana procesu odczytywania tych plików na akceptację wielu pliki wejściowe lub napisać oddzielny proces, który łączy te pliki razem.

 3
Author: bajafresh4life,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-08-16 13:01:38

Napisałam za to zajęcia. Po prostu użyj tego swojej pracy:

job.setOutputFormatClass(m_customOutputFormatClass);

To moja klasa:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br>
 * <p>
 * <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an
 * {@link RecordWriter} instance per folder name.
 * </p>
 * <p>
 * In this class the folder name is defined by the written entry's key.<br>
 * To change this behavior simply extend this class and override the
 * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
 * {@link FolderNameExtractor} implementation.
 * </p>
 * 
 * 
 * @author ykesten
 * 
 * @param <K> - Keys type
 * @param <V> - Values type
 */
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {

    private String folderName;

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> {

        private Map<String, RecordWriter<K, V>> fileNameToWriter;
        private FolderNameExtractor<K, V> fileNameExtractor;
        private TaskAttemptContext job;

        public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
            fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
            this.fileNameExtractor = fileNameExtractor;
            this.job = job;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String fileName = fileNameExtractor.extractFolderName(key, value);
            RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
            if (writer == null) {
                writer = createNewWriter(fileName, fileNameToWriter, job);
                if (writer == null) {
                    throw new IOException("Unable to create writer for path: " + fileName);
                }
            }
            writer.write(key, value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                entry.getValue().close(context);
            }
        }

    }

    private synchronized RecordWriter<K, V> createNewWriter(String folderName,
            Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
        try {
            this.folderName = folderName;
            RecordWriter<K, V> writer = super.getRecordWriter(job);
            this.folderName = null;
            fileNameToWriter.put(folderName, writer);
            return writer;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        Path path = super.getDefaultWorkFile(context, extension);
        if (folderName != null) {
            String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
            path = new Path(newPath);
        }
        return path;
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
    }

    public FolderNameExtractor<K, V> getFolderNameExtractor() {
        return new KeyFolderNameExtractor<K, V>();
    }

    public interface FolderNameExtractor<K, V> {
        public String extractFolderName(K key, V value);
    }

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
        public String extractFolderName(K key, V value) {
            return key.toString();
        }
    }

}
 3
Author: Yuval Kesten,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-03-07 14:54:50

Wiele plików wyjściowych zostanie wygenerowanych na podstawie liczby reduktorów.

Możesz użyć hadoop dfs-getmerge do scalonych wyjść

 1
Author: Harsha Hulageri,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-08-17 07:17:58