Iteracja linii w pliku równolegle (Scala)?

Wiem o kolekcjach równoległych w Scali. Są poręczne! Jednak chciałbym iterację nad liniami pliku, który jest zbyt duży dla pamięci równolegle. Mógłbym na przykład utworzyć wątki i ustawić blokadę nad skanerem, ale byłoby świetnie, gdybym mógł uruchomić kod taki jak:

Source.fromFile(path).getLines.par foreach { line =>

Niestety, jednak

error: value par is not a member of Iterator[String]

Jaki jest najprostszy sposób na osiągnięcie podobieństwa? Na razie przeczytam w kilku linijkach i zajmę się nimi równolegle.

Author: schmmd, 2011-07-19

6 answers

Możesz użyć grupowania, aby łatwo pokroić iterator na kawałki, które możesz załadować do pamięci, a następnie przetworzyć równolegle.

val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines => 
    lines.par.foreach { line => process(line) }
}

Moim zdaniem, coś takiego jest najprostszym sposobem, aby to zrobić.

 31
Author: Joshua Hartman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-19 20:18:16

Ujmę to jako osobną odpowiedź, ponieważ zasadniczo różni się od mojej poprzedniej (i faktycznie działa)

Oto zarys rozwiązania wykorzystującego aktorów, który jest w zasadzie tym, co opisuje komentarz Kim Stebel. Istnieją dwie klasy aktorów, jeden aktor FileReader, który odczytuje poszczególne linie z pliku na żądanie, i kilku aktorów pracujących. Wszyscy pracownicy wysyłają wnioski o linie do czytnika, a linie przetwarzania równolegle, gdy są odczytywane z plik.

Używam tutaj Akka actors, ale użycie innej implementacji to w zasadzie ten sam pomysł.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  //reads a single line from the file or returns None if EOF
  def getLine:Option[String] = ...

  def receive = {
    case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
  }
}

class Worker(reader: ActorRef) extends Actor {

  def process(line:String) ...

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start    
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...

W ten sposób nie więcej niż 4 (lub jak wielu pracowników masz) nieprzetworzonych linii są w pamięci na raz.

 10
Author: Dan Simon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-20 14:19:37

Komentarze do odpowiedzi dana Simona dały mi do myślenia. Może spróbujemy zawinąć źródło W Strumień:

def src(source: Source) = Stream[String] = {
  if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString)
  else Stream.empty
}

Wtedy można by je spożywać równolegle w ten sposób:

src(Source.fromFile(path)).par foreach process
[2]} wypróbowałem to, i to kompiluje i działa w każdym razie. Nie jestem pewien, czy ładuje cały plik do pamięci, czy nie, ale nie wydaje mi się, że jest.
 0
Author: Ian McLaird,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-20 04:48:09

Zdaję sobie sprawę, że to stare pytanie, ale może się okazać, że implementacja ParIterator W iterata library jest użyteczną implementacją nie wymaganą przez assembly:

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads
 0
Author: ms-tg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-12 00:01:04

Poniżej pomógł mi osiągnąć

source.getLines.toStream.par.foreach( line => println(line))
 0
Author: Mahesh Pujari,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-06-09 10:36:48

W końcu napisaliśmy niestandardowe rozwiązanie w naszej firmie, abyśmy dokładnie zrozumieli równoległość.

 -2
Author: schmmd,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-03-13 20:28:05