Question Itérer sur des lignes dans un fichier en parallèle (Scala)?


Je connais les collections parallèles à Scala. Ils sont pratiques! Cependant, je voudrais parcourir les lignes d'un fichier trop volumineux pour la mémoire en parallèle. Je pourrais créer des threads et configurer un verrou sur un scanner, par exemple, mais ce serait bien si je pouvais exécuter du code tel que:

Source.fromFile(path).getLines.par foreach { line =>

Malheureusement, cependant

error: value par is not a member of Iterator[String]

Quelle est la manière la plus simple de réaliser un parallélisme ici? Pour l'instant, je vais lire quelques lignes et les traiter en parallèle.


34
2017-07-19 17:34


origine


Réponses:


Vous pouvez utiliser le regroupement pour découper facilement l'itérateur en morceaux que vous pouvez charger en mémoire, puis traiter en parallèle.

val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines => 
    lines.par.foreach { line => process(line) }
}

À mon avis, quelque chose comme ça est la façon la plus simple de le faire.


31
2017-07-19 20:18



Je vais mettre cela comme une réponse séparée car il est fondamentalement différent de mon dernier (et cela fonctionne réellement)

Voici un aperçu d'une solution utilisant des acteurs, qui est essentiellement ce que décrit le commentaire de Kim Stebel. Il existe deux classes d'acteurs, un seul acteur FileReader qui lit les lignes individuelles du fichier à la demande et plusieurs acteurs Worker. Les travailleurs envoient tous des demandes de lignes au lecteur et traitent les lignes en parallèle lorsqu'elles sont lues à partir du fichier.

J'utilise les acteurs d'Akka ici, mais utiliser une autre implémentation est fondamentalement la même idée.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  //reads a single line from the file or returns None if EOF
  def getLine:Option[String] = ...

  def receive = {
    case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
  }
}

class Worker(reader: ActorRef) extends Actor {

  def process(line:String) ...

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start    
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...

De cette façon, pas plus de 4 lignes (ou le nombre de travailleurs que vous avez) non traitées sont en mémoire à la fois.


10
2017-07-20 14:19



Les commentaires sur la réponse de Dan Simon m'ont fait réfléchir. Pourquoi ne pas essayer d'envelopper la source dans un flux:

def src(source: Source) = Stream[String] = {
  if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString)
  else Stream.empty
}

Ensuite, vous pourriez le consommer en parallèle comme ceci:

src(Source.fromFile(path)).par foreach process

J'ai essayé ceci, et il compile et fonctionne à tout moment. Je ne suis pas vraiment sûr si le fichier est chargé en mémoire ou non, mais je ne le fais pas pense c'est.


0
2017-07-20 04:29



Je me rends compte que c'est une vieille question, mais vous pouvez trouver le ParIterator mise en œuvre dans le bibliothèque d'itérations être une implémentation utile sans assemblage de ceci:

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

0
2018-06-12 00:01



Ci-dessous m'a aidé à atteindre

source.getLines.toStream.par.foreach( line => println(line))

0
2018-06-09 10:36



Nous avons fini par écrire une solution personnalisée dans notre entreprise afin que nous comprenions exactement le parallélisme.


-2
2018-03-13 20:28