Czym jest zadanie w Spark? Jak Spark worker wykonuje plik jar?

Po przeczytaniu jakiegoś dokumentu na http://spark.apache.org/docs/0.8.0/cluster-overview.html , Mam pytanie, które chcę wyjaśnić.

Weźmy ten przykład z Spark:

JavaSparkContext spark = new JavaSparkContext(
  new SparkConf().setJars("...").setSparkHome....);
JavaRDD<String> file = spark.textFile("hdfs://...");

// step1
JavaRDD<String> words =
  file.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  });

// step2
JavaPairRDD<String, Integer> pairs =
  words.map(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });

// step3
JavaPairRDD<String, Integer> counts =
  pairs.reduceByKey(new Function2<Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
      return a + b;
    }
  });

counts.saveAsTextFile("hdfs://...");

Załóżmy więc, że mam 3 węzły klastra, a węzeł 1 działa jako master, a powyższy program sterownika został poprawnie skonfigurowany (powiedzmy application-test.jar). Więc teraz uruchamiam ten kod na głównym węźle i wierzę, że zaraz po utworzeniu SparkContext, Test aplikacji.plik jar będzie skopiowane do węzłów roboczych (a każdy worker utworzy katalog dla tej aplikacji).

Więc teraz moje pytanie: Czy w przykładowych zadaniach Krok 1, Krok 2 i krok 3 są wysyłane do pracowników? Jeśli tak, to w jaki sposób pracownik to wykonuje? Jak java -cp "application-test.jar" step1 i tak dalej?
Author: gsamaras, 2014-08-13

2 answers

Kiedy tworzysz SparkContext, każdy worker uruchamia executor. Jest to osobny proces (JVM) i ładuje również jar. Wykonawcy podłączyć z powrotem do programu sterownika. Teraz sterownik może wysyłać im polecenia, takie jak flatMap, map i reduceByKey w twoim przykładzie. Gdy sterownik kończy pracę, wykonawcy wyłączają się.

RDD są czymś w rodzaju dużych tablic, które są podzielone na partycje i każdy executor może przechowywać niektóre z tych partycji.

A zadanie to polecenie wysłane od sterownika do executora poprzez serializację obiektu Function. Executor deserializuje polecenie (jest to możliwe, ponieważ załadował Twój jar) i wykonuje je na partycji.

(jest to przegląd pojęciowy. Przeglądam niektóre szczegóły, ale mam nadzieję, że będą pomocne.)


Aby odpowiedzieć na twoje konkretne pytanie: nie, nowy proces nie jest rozpoczynany dla każdego kroku. Po zbudowaniu SparkContext dla każdego workera uruchamiany jest nowy proces.

 75
Author: Daniel Darabos,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-21 05:38:32

Aby uzyskać jasny wgląd w sposób tworzenia i planowania zadań, musimy zrozumieć, jak działa Model wykonania w Spark. Krótko mówiąc, aplikacja w spark jest wykonywana w trzech krokach:

  1. Tworzenie wykresu RDD
  2. Utwórz plan wykonania zgodnie z wykresem RDD. Etapy są tworzone w tym kroku
  3. generowanie zadań w oparciu o plan i zaplanowanie ich pomiędzy pracownikami

W twoim przykładzie zliczania słów Wykres RDD jest raczej prosty, to coś jak follows:

File -> lines - > words- > per-word count -> global word count - > output

Na podstawie tego wykresu tworzone są dwa etapy. Reguła tworzenia sceny opiera się na idei rurociągu tak wielu wąskich przekształceń, jak to możliwe. W twoim przykładzie wąska transformacja kończy się na liczbie słów. Dlatego otrzymujesz dwa etapy

  1. file - > lines - > words - > per-word count
  2. global word count - > output

Gdy etapy są zorientowane, spark będzie generować zadania z etapów. Pierwszy etap utworzy zadania ShuffleMapTasks, a ostatni etap utworzy zadania ResultTasks, ponieważ w ostatnim etapie, jedna operacja działania jest uwzględniona w celu uzyskania wyników.

Liczba zadań do wygenerowania zależy od sposobu dystrybucji plików. Załóżmy, że masz 3 trzy różne pliki w trzech różnych węzłach, pierwszy etap wygeneruje 3 zadania : jedno zadanie na partycję.

Dlatego nie należy mapować kroków do zadań bezpośrednio. Zadanie należy do etapu i jest związane z partycją.

Zazwyczaj liczba zadań uruchomionych dla etapu jest dokładnie liczbą partycji końcowego RDD, ale ponieważ RDD mogą być współdzielone (a więc ShuffleMapStages), ich liczba różni się w zależności od współdzielenia RDD/stage. Proszę odnieść się do Jak DAG działa pod pokrywami w RDD?

 27
Author: Hui Wang,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:34:29