Mapreduce Combiner

Mam prosty kod mapreduce z mapperem, reduktorem i kombinatorem. Wyjście z mapera jest przekazywane do combinera. Ale do reduktora,zamiast wyjścia z kombinera, wyjście z mapera jest przekazywane.

Uprzejma pomoc

Kod:

package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AverageSalary
{
public static class Map extends  Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {    
        String[] empDetails= value.toString().split(",");
        Text unit_key = new Text(empDetails[1]);      
        DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
        context.write(unit_key,salary_value);    

    }  
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text> 
{
    public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
    {
        String val;
        double sum=0;
        int len=0;
        while (values.iterator().hasNext())
        {
            sum+=values.iterator().next().get();
            len++;
        }
        val=String.valueOf(sum)+":"+String.valueOf(len);
        try {
            context.write(key,new Text(val));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static class Reduce extends Reducer<Text,Text, Text,Text> 
{
    public void reduce (final Text key, final Text values, final Context context)
    {
        //String[] sumDetails=values.toString().split(":");
        //double average;
        //average=Double.parseDouble(sumDetails[0]);
        try {
            context.write(key,values);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static void main(String args[])
{
    Configuration conf = new Configuration();
    try
    {
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    
     if (otherArgs.length != 2) {      
         System.err.println("Usage: Main <in> <out>");      
         System.exit(-1);    }    
     Job job = new Job(conf, "Average salary");    
     //job.setInputFormatClass(KeyValueTextInputFormat.class);    
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    
     job.setJarByClass(AverageSalary.class);    
     job.setMapperClass(Map.class);    
     job.setCombinerClass(Combiner.class);
     job.setReducerClass(Reduce.class);    
     job.setOutputKeyClass(Text.class);    
     job.setOutputValueClass(Text.class);    

        System.exit(job.waitForCompletion(true) ? 0 : -1);
    } catch (ClassNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Author: user2401464, 2013-11-26

4 answers

Wygląda na to, że zapomniałeś o ważnej własności kombinatora:

Typy wejściowe dla klucza / wartości i typy wyjściowe klucz / wartość muszą być takie same.

Nie możesz przyjąć Text/DoubleWritable i zwrócić Text/Text. Sugeruję użycie Text zamiast DoubleWritable i poprawne parsowanie wewnątrz Combiner.

 8
Author: user987339,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-03-26 22:51:40

Zasada #1 kombinatorów brzmi: nie zakładaj, że kombinator będzie działał . Traktuj kombinator tylko jako optymalizację .

Kombiner nie ma gwarancji, aby uruchomić na wszystkich danych. W niektórych przypadkach, gdy dane nie muszą być rozlane na dysk, MapReduce pominie używając Combinera całkowicie. Zauważ również, że Kombiner może być uruchamiany wielokrotnie przez podzbiory danych! Będzie działać raz na jeden wyciek.

W Twoim przypadku, robisz to złe założenie. Ty powinien robić sumę w Kombinatorze i reduktorze.

Powinieneś również śledzić odpowiedź @user987339. Wejście i wyjście kombinatora musi być identyczne (Text,Double -> Text,Double) I musi być zgodne z wyjściem Mapera i wejściem reduktora.

 15
Author: Donald Miner,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-11-26 15:01:23

Jeśli używana jest funkcja combine, to ma taką samą postać jak funkcja reduce (i jest implementacji reduktora), z tym, że jego typy wyjściowe są kluczem pośrednim i typy wartości (K2 i V2), dzięki czemu mogą one zasilać funkcję reduce: Mapa: (K1, V1) → lista (K2, V2) combine: (K2, list(V2)) → list (K2, V2) reduce: (K2, list(V2)) → list (K3, V3) Często funkcje combine i reduce są takie same, w takim przypadku K3 jest taki sam jak K2, A V3 jest taki sam jak V2.

 1
Author: Narsireddy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-04-23 13:07:30

Combiner nie będzie działać zawsze, gdy uruchomisz mapreduce.

Jeśli istnieją co najmniej trzy pliki rozlewu (wyjście mappera zapisane na dysku lokalnym), combiner uruchomi się tak, że rozmiar pliku może być zmniejszony tak, że można go łatwo przenieść do węzła reduce.

Liczba wycieków, dla których kombiner musi działać, może być ustawiona przez min.num.spills.for.combine właściwość

 0
Author: jintocvg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-09-24 14:24:45