Question Créer des observables sans utiliser Observable.create


J'utilise RxJava dans mon application Android et je souhaite charger des données depuis la base de données.

De cette façon, je crée une nouvelle observable en utilisant Observable.create() qui renvoie une liste de EventLog

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

Bien que cela fonctionne correctement, j'ai lu cela en utilisant Observable.create() n'est pas réellement une bonne pratique pour Rx Java (voir ici).

J'ai donc changé cette méthode de cette manière.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

Est-ce une meilleure approche en utilisant Rx Java? Pourquoi? Quelle est la différence entre les deux méthodes?

De plus, puisque la base de données charge une liste d'éléments, il est logique d'émettre la liste complète en une fois? Ou devrais-je émettre un article à la fois?


32
2017-12-10 14:45


origine


Réponses:


Les deux méthodes peuvent se ressembler et se comporter de manière similaire mais fromCallable traite des difficultés de contre-pression pour vous alors que le create version pas. Faire face à la contre-pression dans un OnSubscribe la mise en œuvre va du simple au purement mental; cependant, si vous l'omettez, vous pouvez obtenir MissingBackpressureExceptions le long de limites asynchrones (telles que observeOn) ou même sur des limites de continuation (telles que concat).

RxJava essaie de proposer un support de contre-pression approprié pour autant d'usines et d'opérateurs que possible, cependant, il existe de nombreuses usines et opérateurs qui ne peuvent pas le prendre en charge.

Le deuxième problème avec le manuel OnSubscribe la mise en œuvre est le manque de support d'annulation, surtout si vous générez beaucoup de onNext appels. Beaucoup d'entre eux peuvent être remplacés par des méthodes d'usine standard (telles que from) ou des classes auxiliaires (telles que SyncOnSubscribe) qui traitent de toute la complexité pour vous.

Vous pouvez trouver beaucoup d'introductions et d'exemples que (encore) utilisent create pour deux raisons.

  1. Il est beaucoup plus facile d’introduire des flux de données basés sur les push en montrant comment la poussée des événements fonctionne de manière impérative. A mon avis, de telles sources passent trop de temps avec create proportionnellement au lieu de parler des méthodes d'usine standard et de montrer comment certaines tâches communes (comme la vôtre) peuvent être réalisées en toute sécurité.
  2. Bon nombre de ces exemples ont été créés au moment où RxJava ne nécessitait pas de support de contre-pression ou même de support d'annulation synchrone approprié ou étaient simplement portés à partir des exemples Rx.NET suppose.) Générer des valeurs en appelant onNext était sans souci à l’époque. Cependant, une telle utilisation entraîne un gonflement de la mémoire tampon et une utilisation excessive de la mémoire. Par conséquent, l'équipe Netflix a proposé un moyen de limiter l'utilisation de la mémoire en demandant aux observateurs d'indiquer le nombre d'éléments qu'ils souhaitaient utiliser. Cela est devenu connu sous le nom de contre-pression.

Pour la deuxième question, à savoir si l'on doit créer une liste ou une séquence de valeurs, cela dépend de votre source. Si votre source prend en charge une sorte d'itération ou de diffusion en continu d'éléments de données individuels (tels que JDBC), vous pouvez simplement y accéder et émettre un par un (voir SyncOnSubscribe). S'il ne le prend pas en charge ou si vous en avez besoin sous forme de liste, conservez-le tel quel. Vous pouvez toujours convertir entre les deux formes via toListet flatMapIterable si nécessaire.


34
2017-12-10 16:03



Comme il l'explique dans la réponse vous avez lié, avec Observable.create vous devrez peut-être violer les exigences avancées de RxJava.

Par exemple, vous devez implémenter contre-pression, ou comment vous désabonner.

Dans votre cas, vous souhaitez émettre un article sans avoir à faire face à une contre-pression ou à un abonnement. Alors Observable.fromCallable est un bon appel. RxJava s'occupera du reste.


2
2017-12-10 15:34