Łączenie wielu zadań MapReduce w Hadoop

W wielu rzeczywistych sytuacjach, w których stosujesz MapReduce, ostateczne algorytmy kończą się kilkoma krokami MapReduce.

Tj. Map1, Reduce1, Map2, Reduce2 i tak dalej.

Więc masz wyjście z ostatniego zmniejszenia, które jest potrzebne jako Wejście dla następnej mapy.

Dane pośrednie są czymś, czego (ogólnie) nie chcesz zachować po pomyślnym zakończeniu potoku. Również dlatego, że te dane pośrednie są ogólnie pewną strukturą danych (jak 'map 'lub ' set') nie chcesz wkładać zbyt wiele wysiłku w pisanie i czytanie tych par klucz-wartość.

Jaki jest zalecany sposób robienia tego w Hadoop?

Czy istnieje (prosty) przykład, który pokazuje, jak prawidłowo obsługiwać te pośrednie dane, w tym oczyszczanie po?

Author: Ani Menon, 2010-03-23

13 answers

Myślę, że ten tutorial na Yahoo developer network pomoże Ci w tym: Chaining Jobs

Używasz JobClient.runJob(). Ścieżka wyjściowa danych z pierwszego zadania staje się ścieżką wejściową do drugiego zadania. Muszą one zostać przekazane jako argumenty do zadań z odpowiednim kodem, aby je przeanalizować i skonfigurować parametry dla zadania.

Myślę, że powyższa metoda może być jednak sposobem, w jaki starsze API mapred to zrobiło, ale nadal powinno działać. Będzie podobna metoda w nowym API mapreduce, ale nie jestem pewien, co to jest.

Jeśli chodzi o usuwanie danych pośrednich po zakończeniu zadania, możesz to zrobić w swoim kodzie. Jak już to robiłem to używałem czegoś takiego:

FileSystem.delete(Path f, boolean recursive);

Gdzie ścieżka jest lokalizacją na HDFS danych. Musisz upewnić się, że usuwasz te dane tylko wtedy, gdy żadne inne zadanie ich nie wymaga.

 53
Author: Binary Nerd,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-07 04:54:30

Można to zrobić na wiele sposobów.

(1) zadania kaskadowe

Utwórz obiekt JobConf "job1" dla pierwszego zadania i ustaw wszystkie parametry z "input" jako katalog wejściowy i "temp" jako katalog wyjściowy. Wykonaj to zadanie:

JobClient.run(job1).

Bezpośrednio pod nim Utwórz obiekt JobConf " job2 "dla drugiego zadania i ustaw wszystkie parametry z" temp "jako katalog wejściowy i" output " jako katalog wyjściowy. Wykonaj to zadanie:

JobClient.run(job2).

(2) utwórz dwa Obiektów JobConf i ustawia w nich wszystkie parametry tak jak (1) poza tym, że nie używasz JobClient.uciekaj.

Następnie utwórz dwa obiekty zadania z jobconfs jako parametrami:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

Używając obiektu jobControl, określasz zależności zadania, a następnie uruchamiasz zadania:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Jeśli potrzebujesz struktury podobnej do Map + | Reduce / Map*, możesz użyć klas ChainMapper i ChainReducer, które są dostarczane z Hadoop w wersji 0.19 i naprzód.

Cheers

 17
Author: user381928,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-10 08:31:21

Istnieje wiele sposobów, aby to zrobić. Skupię się na dwóch.

Jeden jest przez Riffle ( http://github.com/cwensel/riffle ) biblioteka adnotacji do identyfikowania rzeczy zależnych i 'wykonywania' ich w kolejności zależności (topologicznej).

LUB możesz użyć kaskady (i MapReduceFlow) w kaskadzie ( http://www.cascading.org / ). Przyszła wersja będzie obsługiwać adnotacje Riffle, ale działa świetnie teraz z raw jobconf jobs.

A variant on this jest nie zarządzać MR jobs ręcznie w ogóle, ale rozwijać swoją aplikację za pomocą kaskadowego API. Następnie jobconf i job chaining są obsługiwane wewnętrznie za pomocą klas cascading planner i Flow.

W ten sposób spędzasz czas skupiając się na swoim problemie, a nie na mechanice zarządzania zadaniami Hadoop itp. Możesz nawet warstwować różne języki na wierzchu (jak clojure lub jruby), aby jeszcze bardziej uprościć swój rozwój i aplikacje. http://www.cascading.org/modules.html

 7
Author: cwensel,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-03-26 19:25:42

Wykonałem łączenie zadań używając obiektów JobConf jeden po drugim. Wziąłem przykład WordCount do łączenia zadań. Jedno zadanie określa, ile razy słowo powtórzyło się w danym wyjściu. Drugie zadanie pobiera pierwsze wyjście zadania jako wejście i oblicza całkowitą liczbę słów w danym wejściu. Poniżej znajduje się kod, który należy umieścić w klasie kierowcy.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

Polecenie uruchamiające te zadania to:

Bin / hadoop Jar TotalWords.

Musimy podać ostateczną nazwę zadania dla dowództwo. W powyższym przypadku jest to TotalWords.

 6
Author: psrklr,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-10-28 06:39:55

Możesz użyć oozie do przetwarzania zadań MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303

 4
Author: user300313,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-03-23 21:05:43

Możesz uruchomić MR chain w sposób podany w kodzie.

Uwaga : podano tylko kod kierowcy

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

CIĄG JEST

( JOB1 ) MAP->REDUCE-> ( JOB2) MAP
Zostało to zrobione, aby uporządkować klucze, ale istnieje więcej sposobów, takich jak użycie mapy drzewa
Ale chcę skupić waszą uwagę na sposobie, w jaki praca została przykuta!!
Dziękuję

 4
Author: Aniruddha Sinha,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-10-27 13:12:19

Istnieją przykłady w projekcie Apache Mahout, które łączą ze sobą wiele zadań MapReduce. Jeden z przykładów można znaleźć na stronie:

RecommenderJob.java

Http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

 3
Author: Christie English,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-05-25 23:38:52

Możemy użyć metody waitForCompletion(true) zadania do zdefiniowania zależności pomiędzy zadaniem.

W moim scenariuszu miałem 3 prace, które były zależne od siebie. W klasie kierowcy użyłem poniższego kodu i działa zgodnie z oczekiwaniami.

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }
 3
Author: Shivaprasad,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-01-28 07:48:48

Nowa klasa org.Apacz.hadoop.mapreduce.lib.łańcuch.ChainMapper help this scenario

 2
Author: Xavi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-01 16:29:30

Chociaż istnieją złożone serwerowe silniki przepływu pracy Hadoop, np. oozie, mam prostą bibliotekę java, która umożliwia wykonywanie wielu zadań Hadoop jako przepływu pracy. Konfiguracja zadania i przepływ pracy definiujące zależności między zadaniami są skonfigurowane w pliku JSON. Wszystko jest konfigurowalne zewnętrznie i nie wymaga żadnych zmian w istniejącej implementacji map reduce, aby być częścią przepływu pracy.

Szczegóły można znaleźć tutaj. Kod źródłowy i jar jest dostępny w github.

Http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

Pranab

 1
Author: Pranab,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-05-26 18:58:56

Myślę, że oozie pomaga kolejnym zadaniom odbierać dane wejściowe bezpośrednio z poprzedniego zadania. Pozwala to uniknąć operacji wejścia / wyjścia wykonywanej za pomocą jobcontrol.

 1
Author: stholy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-11-13 22:28:59

Jeśli chcesz programowo łączyć swoje zadania, skorzystaj z JobControl. Użycie jest dość proste:

    JobControl jobControl = new JobControl(name);

Po tym dodajesz instancje ControlledJob. ControlledJob definiuje zadanie z jego zależnościami, automatycznie podłączając wejścia i wyjścia, aby pasowały do "łańcucha" zadań.

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));

    jobControl.run();

Rozpoczyna łańcuch. Będziesz chciał umieścić to w wątku speerate. Pozwala to sprawdzić stan Twojego łańcucha podczas jego działania:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }
 1
Author: Erik Schmiegelow,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-02-28 16:52:46

Jak już wspomniałeś w swoim wymaganiu, że chcesz, aby o / p z MRJob1 było i / p z MRJob2 i tak dalej, możesz rozważyć użycie Oozie workflow dla tej aplikacji. Możesz również rozważyć zapisanie danych pośrednich do HDFS, ponieważ będą one używane przez następnego MRJob. A po zakończeniu zadania możesz oczyścić swoje pośrednie dane.

<start to="mr-action1"/>
<action name="mr-action1">
   <!-- action for MRJob1-->
   <!-- set output path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="mr-action2">
   <!-- action for MRJob2-->
   <!-- set input path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="success">
        <!-- action for success-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="fail">
        <!-- action for fail-->
    <ok to="end"/>
    <error to="end"/>
</action>

<end name="end"/>

 0
Author: Neha Kumari,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-03-02 10:43:26