Czym jest zadanie w Spark? Jak Spark worker wykonuje plik jar?
Po przeczytaniu jakiegoś dokumentu na http://spark.apache.org/docs/0.8.0/cluster-overview.html , Mam pytanie, które chcę wyjaśnić.
Weźmy ten przykład z Spark:
JavaSparkContext spark = new JavaSparkContext(
new SparkConf().setJars("...").setSparkHome....);
JavaRDD<String> file = spark.textFile("hdfs://...");
// step1
JavaRDD<String> words =
file.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
});
// step2
JavaPairRDD<String, Integer> pairs =
words.map(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
// step3
JavaPairRDD<String, Integer> counts =
pairs.reduceByKey(new Function2<Integer, Integer>() {
public Integer call(Integer a, Integer b) {
return a + b;
}
});
counts.saveAsTextFile("hdfs://...");
Załóżmy więc, że mam 3 węzły klastra, a węzeł 1 działa jako master, a powyższy program sterownika został poprawnie skonfigurowany (powiedzmy application-test.jar). Więc teraz uruchamiam ten kod na głównym węźle i wierzę, że zaraz po utworzeniu SparkContext
, Test aplikacji.plik jar będzie skopiowane do węzłów roboczych (a każdy worker utworzy katalog dla tej aplikacji).
java -cp "application-test.jar" step1
i tak dalej? 2 answers
Kiedy tworzysz SparkContext
, każdy worker uruchamia executor. Jest to osobny proces (JVM) i ładuje również jar. Wykonawcy podłączyć z powrotem do programu sterownika. Teraz sterownik może wysyłać im polecenia, takie jak flatMap
, map
i reduceByKey
w twoim przykładzie. Gdy sterownik kończy pracę, wykonawcy wyłączają się.
RDD są czymś w rodzaju dużych tablic, które są podzielone na partycje i każdy executor może przechowywać niektóre z tych partycji.
A zadanie to polecenie wysłane od sterownika do executora poprzez serializację obiektu Function
. Executor deserializuje polecenie (jest to możliwe, ponieważ załadował Twój jar) i wykonuje je na partycji.
(jest to przegląd pojęciowy. Przeglądam niektóre szczegóły, ale mam nadzieję, że będą pomocne.)
Aby odpowiedzieć na twoje konkretne pytanie: nie, nowy proces nie jest rozpoczynany dla każdego kroku. Po zbudowaniu SparkContext
dla każdego workera uruchamiany jest nowy proces.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-21 05:38:32
Aby uzyskać jasny wgląd w sposób tworzenia i planowania zadań, musimy zrozumieć, jak działa Model wykonania w Spark. Krótko mówiąc, aplikacja w spark jest wykonywana w trzech krokach:
- Tworzenie wykresu RDD
- Utwórz plan wykonania zgodnie z wykresem RDD. Etapy są tworzone w tym kroku
- generowanie zadań w oparciu o plan i zaplanowanie ich pomiędzy pracownikami
W twoim przykładzie zliczania słów Wykres RDD jest raczej prosty, to coś jak follows:
File -> lines - > words- > per-word count -> global word count - > output
Na podstawie tego wykresu tworzone są dwa etapy. Reguła tworzenia sceny opiera się na idei rurociągu tak wielu wąskich przekształceń, jak to możliwe. W twoim przykładzie wąska transformacja kończy się na liczbie słów. Dlatego otrzymujesz dwa etapy
- file - > lines - > words - > per-word count
- global word count - > output
Gdy etapy są zorientowane, spark będzie generować zadania z etapów. Pierwszy etap utworzy zadania ShuffleMapTasks, a ostatni etap utworzy zadania ResultTasks, ponieważ w ostatnim etapie, jedna operacja działania jest uwzględniona w celu uzyskania wyników.
Liczba zadań do wygenerowania zależy od sposobu dystrybucji plików. Załóżmy, że masz 3 trzy różne pliki w trzech różnych węzłach, pierwszy etap wygeneruje 3 zadania : jedno zadanie na partycję.
Dlatego nie należy mapować kroków do zadań bezpośrednio. Zadanie należy do etapu i jest związane z partycją.
Zazwyczaj liczba zadań uruchomionych dla etapu jest dokładnie liczbą partycji końcowego RDD, ale ponieważ RDD mogą być współdzielone (a więc ShuffleMapStages
), ich liczba różni się w zależności od współdzielenia RDD/stage. Proszę odnieść się do Jak DAG działa pod pokrywami w RDD?
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:34:29