Jak używać niestandardowych klas z Apache Spark (pyspark)?
Napisałem klasę implementującą klasyfikator w Pythonie. Chciałbym użyć Apache Spark do równoległej klasyfikacji ogromnej liczby punktów danych za pomocą tego klasyfikatora.
- jestem skonfigurowany przy użyciu Amazon EC2 na klastrze z 10 niewolnikami, opartym na ami, który pochodzi z dystrybucji Anaconda Pythona na nim. Ami pozwala mi zdalnie korzystać z notebooka IPython.
- zdefiniowałem klasę BoTree w wywołaniu pliku BoTree.py na master w folderze /root / anaconda / lib / python2. 7/ czyli gdzie są wszystkie moje Moduły Pythona
- sprawdziłem czy mogę importować i używać BoTree.py po uruchomieniu linii poleceń spark od master (muszę tylko zacząć od zapisu import BoTree i moja klasa BoTree staje się dostępna
- użyłem spark ' s /root/spark-ec2/copy-dir.sh skrypt do skopiowania katalogu / python2. 7 / po moim klastrze.
- ssh-ed do jednego z niewolników i próbował uruchomić ipython tam, i był w stanie zaimportować BoTree, więc myślę, że moduł został wysłany przez klaster pomyślnie (widzę również BoTree.py plik w .../ python2. 7 / folder)
- na master, który sprawdziłem, mogę marynować i rozpakowywać instancję BoTree za pomocą cPickle, co jak rozumiem jest serializerem pyspark.
Jednak , kiedy wykonuję następujące czynności:
import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()
Spark zawodzi z błędem (tylko odpowiedni bit myślę):
File "/root/spark/python/pyspark/worker.py", line 90, in main
command = pickleSer.loads(command.value)
File "/root/spark/python/pyspark/serializers.py", line 405, in loads
return cPickle.loads(obj)
ImportError: No module named BoroughTree
Czy ktoś może mi pomóc? Trochę zdesperowany...
Dzięki
2 answers
Prawdopodobnie najprostszym rozwiązaniem jest użycie argumentu pyFiles
Podczas tworzenia SparkContext
from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])
Każdy plik tam umieszczony zostanie wysłany do pracowników i dodany do PYTHONPATH
.
Jeśli pracujesz w trybie interaktywnym, musisz zatrzymać istniejący kontekst za pomocą sc.stop()
, zanim utworzysz nowy.
Upewnij się również, że Spark worker faktycznie używa dystrybucji Anaconda, a nie domyślnego interpretera Pythona. Na podstawie Twojego opisu jest to najprawdopodobniej problem. Aby ustawić PYSPARK_PYTHON
możesz użyć conf/spark-env.sh
plików.
Na marginesie kopiowanie pliku do lib
jest dość niechlujnym rozwiązaniem. Jeśli chcesz uniknąć wypychania plików za pomocą pyFiles
, polecam utworzenie zwykłego pakietu Pythona lub pakietu Conda i odpowiednią instalację. W ten sposób możesz łatwo śledzić co jest zainstalowane, usuwać niepotrzebne pakiety i unikać trudnych do debugowania problemów.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-28 15:04:16
Po pozyskaniu SparkContext można również użyć addPyFile
, aby następnie wysłać moduł do każdego pracownika.
sc.addPyFile('/path/to/BoTree.py')
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-03-19 10:55:41