Menu

Package to generate/request a filesystem reference.

Source code for blizzard.elasticfs

# -*- coding: utf-8 -*-
###############################################################################
# NSAp - Copyright (C) CEA, 2019 - 2020
# Distributed under the terms of the CeCILL-B license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
# for details.
###############################################################################


"""
This module provides tools to generate/request an ElasticSearch filesystem
reference.
"""

# Imports
from collections import OrderedDict
from pprint import pprint
import time
import ssl
import progressbar
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk
from elasticsearch.connection import create_ssl_context


[docs]class Blizzard(object): """ This class enables us to construct a filesystem resource with ElasticSearch. """ def __init__(self, url, verify_certs=True): """ Initialize the Blizzard class. Use the bulk API to perform many index operations in a single API call. This can greatly increase the indexing speed. Parameters ---------- url: str the server url. verify_certs: bool, default True verify server certificate. """ kwargs = {} if not verify_certs: ssl_context = create_ssl_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE kwargs["verify_certs"] = verify_certs kwargs["ssl_context"] = ssl_context self.session = Elasticsearch([url], **kwargs) @classmethod def _format_result(cls, res): """ Format the result of a query. Parameters ---------- res: dict the returned query result. Returns ------- format_res: dict the formated returned query result. """ format_res = dict((item["_id"], item["_source"]) for item in res["hits"]["hits"]) return format_res def _scroll(self, data): """ Use the scroll API to retrieve the result of a query. Parameters ---------- data: dict the returned query data. Returns ------- format_res: dict the formated returned query result. """ scroll_id = data["_scroll_id"] scroll_size = len(data["hits"]["hits"]) scroll_result = {} while scroll_size > 0: scroll_result.update(Blizzard._format_result(data)) data = self.session.scroll(scroll_id=scroll_id, scroll="1m") scroll_id = data["_scroll_id"] scroll_size = len(data["hits"]["hits"]) return scroll_result ########################################################################### # Public Methods ###########################################################################
[docs] def add_mapping(self, index, mapping): """ Create an index and add an index. Parameters ---------- index: str the index name. mapping: dict the index mapping. """ self.session.indices.create(index=index, ignore=400, body=mapping)
[docs] def status(self, full=False, display=True): """ Display the content status. Parameters ---------- full: bool, default False if True display the ElasticSearch cluster sttatus. display: bool, default True if True display the status. Returns ------- result: dict the ElasticSearch current status. """ query = {"query": {"match_all": {}}} result = OrderedDict() if full: for key, val in self.session.info().items(): result[key] = val for index in self.session.indices.get_alias().keys(): status = self.session.count(index=index, body=query) result[index] = status["count"] if index == "timestamp": query["size"] = 10000 status = Blizzard._format_result( self.session.search(index=index, body=query)) for key, val in status.items(): result[val["project"]] = val["timestamp"] if display: print("-" * 20) for key, val in result.items(): if not isinstance(val, int): print(" {0:15} {1}".format(key, val)) else: print("- {0:20}: {1}".format(key, val)) print("-" * 20) return result
[docs] def list(self, path): """ List the content of the input path. Parameters ---------- path: str the path to be listed. Returns ------- data: list the path content. """ query = { "size": 10000, "query": { "term": { "path": path } } } data = self.session.search( index="content", body=query, request_timeout=500, scroll="1m") result = self._scroll(data) if path not in result: print("'{0}' is not defined.".format(path)) return [] return result[path]["ls"]
[docs] def search(self, keys=None): """ Retrieve the files that matche the filtering keys. Parameters ---------- keys: list of str, default None the filtering keys. Returns ------- data: list the requested files. """ if keys is None: query = { "query": { "size": 10000, "match_all": {} } } else: query = { "size": 10000, "query": { "match": { "keys": { "operator": "and", "query": " ".join(keys) } } } } data = self.session.search( index="files", body=query, request_timeout=500, scroll="1m") result = self._scroll(data) return list(result.keys())
[docs] def import_data(self, actions, thread_count=1): """ Method that import one chromsome data in the database. Parameters ---------- actions: dict some bulk data to be inserted. thread_count: int, default 1 size of the threadpool to use for the bulk requests. """ chunk_size = 50000 if thread_count == 1: result = bulk(self.session, actions, chunk_size=chunk_size, stats_only=True, request_timeout=500) print(result) else: parallel_bulk(self.session, actions, thread_count=thread_count, chunk_size=chunk_size)
[docs] def delete_index(self, name): """ Delete a specific index. Parameters ---------- name: str the name of the index to be deleted. """ self.session.indices.delete(index=name, ignore=[400, 404])

Follow us

© 2020, blizzard developers .
Inspired by AZMIND template.