Question Réindexer Elastic search via Bulk API, numériser et faire défiler


J'essaie de réindexer ma configuration de recherche Elastic, en train de regarder la documentation de recherche Elastic et un exemple d'utilisation de l'API Python

Je suis un peu confus quant à la façon dont tout cela fonctionne si. J'ai pu obtenir l'ID de défilement de l'API Python:

es = Elasticsearch("myhost")

index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")

scroll_id = response["_scroll_id"]

Maintenant, ma question est: à quoi cela sert-il pour moi? Qu'est-ce que la connaissance de l'id du défilement me donne même? La documentation dit d'utiliser "l'API en bloc" mais je n'ai aucune idée de la façon dont le scoll_id prend en compte cela, c'était un peu déroutant.

Quelqu'un pourrait-il donner un bref exemple montrant comment réindexer à partir de ce point, étant donné que j'ai reçu le scroll_id correctement?


11
2017-10-14 22:08


origine


Réponses:


Bonjour, vous pouvez utiliser l’API de défilement pour parcourir tous les documents de la manière la plus efficace. En utilisant scroll_id, vous pouvez trouver une session stockée sur le serveur pour votre requête de défilement spécifique. Vous devez donc fournir le scroll_id à chaque demande pour obtenir plus d'éléments.

L'API en vrac est pour des documents d'indexation plus efficaces. Lorsque vous copiez et indexez, vous avez besoin des deux, mais ils ne sont pas vraiment liés.

J'ai du code Java qui pourrait vous aider à mieux comprendre comment ça marche.

    public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; //Break condition: No hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}

7
2017-10-14 22:17



Voici un exemple de réindexation sur un autre nœud elasticsearch en utilisant elasticsearch-py:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)

Vous pouvez également réindexer le résultat d'une requête sur un index différent. Voici comment procéder:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)

7
2018-01-14 13:30



Pour toute personne qui rencontre ce problème, vous pouvez utiliser l'API suivante du client Python pour réindexer:

https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

Cela vous évitera de devoir faire défiler et rechercher pour obtenir toutes les données et d'utiliser l'API en bloc pour placer les données dans le nouvel index.


5
2018-05-14 23:57