Jak używać niestandardowych klas z Apache Spark (pyspark)?

Napisałem klasę implementującą klasyfikator w Pythonie. Chciałbym użyć Apache Spark do równoległej klasyfikacji ogromnej liczby punktów danych za pomocą tego klasyfikatora.

  1. jestem skonfigurowany przy użyciu Amazon EC2 na klastrze z 10 niewolnikami, opartym na ami, który pochodzi z dystrybucji Anaconda Pythona na nim. Ami pozwala mi zdalnie korzystać z notebooka IPython.
  2. zdefiniowałem klasę BoTree w wywołaniu pliku BoTree.py na master w folderze /root / anaconda / lib / python2. 7/ czyli gdzie są wszystkie moje Moduły Pythona
  3. sprawdziłem czy mogę importować i używać BoTree.py po uruchomieniu linii poleceń spark od master (muszę tylko zacząć od zapisu import BoTree i moja klasa BoTree staje się dostępna
  4. użyłem spark ' s /root/spark-ec2/copy-dir.sh skrypt do skopiowania katalogu / python2. 7 / po moim klastrze.
  5. ssh-ed do jednego z niewolników i próbował uruchomić ipython tam, i był w stanie zaimportować BoTree, więc myślę, że moduł został wysłany przez klaster pomyślnie (widzę również BoTree.py plik w .../ python2. 7 / folder)
  6. na master, który sprawdziłem, mogę marynować i rozpakowywać instancję BoTree za pomocą cPickle, co jak rozumiem jest serializerem pyspark.

Jednak , kiedy wykonuję następujące czynności:

import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()

Spark zawodzi z błędem (tylko odpowiedni bit myślę):

  File "/root/spark/python/pyspark/worker.py", line 90, in main
    command = pickleSer.loads(command.value)
  File "/root/spark/python/pyspark/serializers.py", line 405, in loads
    return cPickle.loads(obj)
ImportError: No module named BoroughTree
Czy ktoś może mi pomóc? Trochę zdesperowany...

Dzięki

Author: user3279453, 2015-06-27

2 answers

Prawdopodobnie najprostszym rozwiązaniem jest użycie argumentu pyFiles Podczas tworzenia SparkContext

from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])

Każdy plik tam umieszczony zostanie wysłany do pracowników i dodany do PYTHONPATH.

Jeśli pracujesz w trybie interaktywnym, musisz zatrzymać istniejący kontekst za pomocą sc.stop(), zanim utworzysz nowy.

Upewnij się również, że Spark worker faktycznie używa dystrybucji Anaconda, a nie domyślnego interpretera Pythona. Na podstawie Twojego opisu jest to najprawdopodobniej problem. Aby ustawić PYSPARK_PYTHON możesz użyć conf/spark-env.sh plików.

Na marginesie kopiowanie pliku do lib jest dość niechlujnym rozwiązaniem. Jeśli chcesz uniknąć wypychania plików za pomocą pyFiles, polecam utworzenie zwykłego pakietu Pythona lub pakietu Conda i odpowiednią instalację. W ten sposób możesz łatwo śledzić co jest zainstalowane, usuwać niepotrzebne pakiety i unikać trudnych do debugowania problemów.

 14
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-28 15:04:16

Po pozyskaniu SparkContext można również użyć addPyFile, aby następnie wysłać moduł do każdego pracownika.

sc.addPyFile('/path/to/BoTree.py')

Pyspark.SparkContext.addPyFile ( path ) documentation

 11
Author: dmbaker,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-03-19 10:55:41