Question Existe-t-il un flux FIFO à Scala?


Je recherche un flux FIFO à Scala, c’est-à-dire quelque chose qui offre les fonctionnalités de

  • immuable.Stream (un flux pouvant être fini et mémorisant les éléments déjà lus)
  • mutable.Queue (qui permet d'ajouter des éléments au FIFO)

Le flux doit pouvoir être fermé et bloquer l’accès à l’élément suivant jusqu’à ce que l’élément ait été ajouté ou que le flux ait été fermé.

En fait, je suis un peu surpris que la bibliothèque de la collection n'inclue pas (semble-t-il) une telle structure de données, car celle-ci est plutôt classique.

Mes questions:

  • 1) Ai-je oublié quelque chose? Y a-t-il déjà une classe fournissant cette fonctionnalité?

  • 2) OK, s’il n’est pas inclus dans la bibliothèque de collections, il se peut qu’il n’y ait qu’une combinaison triviale de classes de collections existantes. Cependant, j'ai essayé de trouver ce code trivial mais mon implémentation semble encore assez complexe pour un problème aussi simple. Existe-t-il une solution plus simple pour un tel FifoStream?

    class FifoStream[T] extends Closeable {
    
    val queue = new Queue[Option[T]]
    
    lazy val stream = nextStreamElem
    
    private def nextStreamElem: Stream[T] = next() match {
        case Some(elem) => Stream.cons(elem, nextStreamElem)
        case None       => Stream.empty
    }
    
    /** Returns next element in the queue (may wait for it to be inserted). */
    private def next() = {
        queue.synchronized {
            if (queue.isEmpty) queue.wait()
            queue.dequeue()
        }
    }
    
    /** Adds new elements to this stream. */
    def enqueue(elems: T*) {
        queue.synchronized {
            queue.enqueue(elems.map{Some(_)}: _*)
            queue.notify()
        }
    }
    
    /** Closes this stream. */
    def close() {
        queue.synchronized {
            queue.enqueue(None)
            queue.notify()
        }
    }
    }
    

La solution de Paradigmatic (modifiée visuellement)

Merci pour vos suggestions. J'ai légèrement modifié la solution de paradigmatic pour que toStream retourne un flux immuable (permet des lectures répétables) pour qu'il réponde à mes besoins. Juste pour être complet, voici le code:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

14
2017-09-26 09:42


origine


Réponses:


Dans Scala, les flux sont des "itérateurs fonctionnels". Les gens s'attendent à ce qu'ils soient purs (sans effets secondaires) et immuables. Dans votre cas, chaque fois que vous effectuez une itération sur le flux, vous modifiez la file d'attente (elle n'est donc pas pure). Cela peut créer beaucoup de malentendus, car itérer deux fois le même flux aura deux résultats différents.

Cela étant dit, vous devriez plutôt utiliser Java BlockingQueues, plutôt que de déployer votre propre implémentation. Ils sont considérés comme bien mis en œuvre en termes de sécurité et de performances. Voici le code le plus propre auquel je puisse penser (en utilisant votre approche):

import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  def apply[A]() = new LinkedBlockingQueue
}

15
2017-09-27 09:04



Je suppose que vous cherchez quelque chose comme java.util.concurrent.BlockingQueue?

Akka a un BoundedBlockingQueue implémentation de cette interface. Il y a bien entendu les implémentations disponibles dans java.util.concurrent.

Vous pourriez aussi envisager d'utiliser les Akka acteurs pour ce que tu fais. Utilisez les acteurs pour être averti ou poussé un nouvel événement ou un message au lieu de tirer.


2
2017-09-26 11:08



1) Il semble que vous cherchiez un flux de données vu dans des langues comme Oz, qui supporte le modèle producteur-consommateur. Une telle collection n'est pas disponible dans l'API des collections, mais vous pouvez toujours en créer une vous-même.

2) Le flux de données repose sur le concept de variables d'affectation unique (de sorte qu'ils ne doivent pas être initialisés au point de déclaration et que leur lecture avant l'initialisation entraîne un blocage):

val x: Int
startThread {
  println(x)
}
println("The other thread waits for the x to be assigned")
x = 1

Il serait simple d'implémenter un tel flux si des variables d'affectation unique (ou de flux de données) étaient prises en charge dans le langage (voir le lien). Comme ils ne font pas partie de Scala, vous devez utiliser le wait-synchronized-notify modèle comme vous l'avez fait.

Files d'attente concurrentes de Java peut être utilisé pour y parvenir, comme l’a suggéré l’autre utilisateur.


0
2017-09-26 10:50