Hadoop DistributedCache jest przestarzały - jakie jest preferowane API?
Moje zadania z mapą wymagają pewnych danych konfiguracyjnych, które chciałbym rozpowszechnić poprzez rozproszoną pamięć podręczną.
Hadoop MapReduce Tutorial pokazuje użycie klasy DistributedCache, mniej więcej w następujący sposób:
// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf);
// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...
Jednak DistributedCache
jest oznaczony jako przestarzały w Hadoop 2.2.0.
Jaki jest nowy preferowany sposób, aby to osiągnąć? Czy istnieje aktualny przykład lub samouczek dotyczący tego API?
6 answers
API dla rozproszonej pamięci podręcznej można znaleźć w samej klasie pracy. Sprawdź dokumentację tutaj: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html Kod powinien być podobny do
Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());
W kodzie Mapy:
Path[] localPaths = context.getLocalCacheFiles();
...
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-01-20 17:53:27
Aby rozwinąć @jtravaglini, preferowany sposób użycia DistributedCache
dla przędzy/MapReduce 2 jest następujący:
W swoim kierowcy użyj Job.addCacheFile()
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf, "MyJob");
job.setMapperClass(MyMapper.class);
// ...
// Mind the # sign after the absolute file location.
// You will be using the name after the # sign as your
// file name in your Mapper/Reducer
job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));
return job.waitForCompletion(true) ? 0 : 1;
}
I w Twoim Maperze/reduktorze zastąp metodę setup(Context context)
:
@Override
protected void setup(
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
if (context.getCacheFiles() != null
&& context.getCacheFiles().length > 0) {
File some_file = new File("./some");
File other_file = new File("./other");
// Do things to these two files, like read them
// or parse as JSON or whatever.
}
super.setup(context);
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-10-17 08:36:15
Nowy DistributedCache API dla YARN / MR2 znajduje się w klasie org.apache.hadoop.mapreduce.Job
.
Job.addCacheFile()
Niestety, nie ma jeszcze wielu kompleksowych przykładów w stylu samouczka.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-01-20 17:58:04
Nie użyłem Hioba.addCacheFile (). Zamiast tego użyłem opcji-files, takiej jak " - files/path/to / myfile.txt#myfile " jak poprzednio. Następnie w kodzie mapera lub reduktora stosuję metodę poniżej:
/**
* This method can be used with local execution or HDFS execution.
*
* @param context
* @param symLink
* @param throwExceptionIfNotFound
* @return
* @throws IOException
*/
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
{
URI[] uris = context.getCacheFiles();
if(uris==null||uris.length==0)
{
if(throwExceptionIfNotFound)
throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
return null;
}
URI symlinkUri = null;
for(URI uri: uris)
{
if(symLink.equals(uri.getFragment()))
{
symlinkUri = uri;
break;
}
}
if(symlinkUri==null)
{
if(throwExceptionIfNotFound)
throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
return null;
}
//if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);
}
Następnie w maperze / reduktorze:
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
super.setup(context);
File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
... do work ...
}
Zauważ, że jeśli użyłem " - files / path / to / myfile.txt "bezpośrednio potem muszę użyć" myfile.txt " aby uzyskać dostęp do pliku, ponieważ jest to domyślna nazwa dowiązania symbolicznego.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-10-15 01:10:03
Żadne z wymienionych rozwiązań nie działało dla mnie w kompletności . Może dlatego, że wersja Hadoop ciągle się zmienia używam hadoop 2.6.4. Zasadniczo DistributedCache jest przestarzały, więc nie chciałem tego używać. Jak niektórzy z postów sugerują nam użycie addCacheFile (), jednak trochę się to zmieniło. Oto jak to dla mnie działało
job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));
Tutaj X. X. X. X może być głównym adresem IP lub localhost. / Align = "Left" / txt był przechowywany w HDFS w / location.
hadoop fs -ls /
Wyjście jest
-rw-r--r-- 3 centos supergroup 1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x - centos supergroup 0 2016-03-12 19:46 /test
[[4]] śmieszne, ale wygodne, # Anglistop.txt oznacza, że teraz możemy uzyskać do niego dostęp jako " EnglishStop.txt " w maperze. Oto kod dla tego samego
public void setup(Context context) throws IOException, InterruptedException
{
File stopwordFile = new File("EnglishStop.txt");
FileInputStream fis = new FileInputStream(stopwordFile);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
while ((stopWord = reader.readLine()) != null) {
// stopWord is a word read from Cache
}
}
To mi po prostu zadziałało. Możesz odczytać linię z pliku zapisanego w HDFSWarning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-03-13 10:30:12
Miałem ten sam problem. I nie tylko jest DistributedCach przestarzałe, ale getLocalCacheFiles i "new Job" zbyt. Więc to, co działało dla mnie, jest następujące:
Kierowca:
Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());
W ustawieniu Mapera / reduktora:
@Override
protected void setup(Context context) throws IOException, InterruptedException
{
super.setup(context);
URI[] files = context.getCacheFiles(); // getCacheFiles returns null
Path file1path = new Path(files[0])
...
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-01 12:33:01