##########################################################################
# NSAp - Copyright (C) CEA, 2013 - 2018
# Distributed under the terms of the CeCILL-B license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
# for details.
##########################################################################
"""
A module to connect a CubicWeb service and send requests.
"""
# System import
from __future__ import print_function
import os
import sys
import json
import time
import stat
import glob
import csv
# StringIO lives in a different module depending on the interpreter version:
# try the Python 2 location first and fall back on the Python 3 one.
try:
    # for Python 2.x
    from StringIO import StringIO
except ImportError:
    # for Python 3.x
    from io import StringIO
# Third party import
import requests
import numpy
import paramiko
def load_csv(text, delimiter=";"):
    """ Load a csv.

    Parameters
    ----------
    text: string or bytes (mandatory)
        the csv text. Raw bytes (as found in an HTTP response body under
        Python 3) are decoded as UTF-8 before being parsed.
    delimiter: string (optional default ';')
        the character that separates two fields of a csv line.

    Returns
    -------
    csv_lines: list
        a list containing all the csv lines, each line being a list of
        string fields.
    """
    # Under Python 3 'bytes' and 'str' are distinct types and StringIO only
    # accepts text. Under Python 2 'bytes' is 'str', hence the second check
    # which leaves Python 2 inputs untouched.
    if isinstance(text, bytes) and not isinstance(text, str):
        text = text.decode("utf-8")
    csv_stream = StringIO(text)
    reader = csv.reader(csv_stream, delimiter=delimiter)
    return list(reader)
class CWInstanceConnection(object):
    """ Tool to dump the data stored in a cw instance.

    .. code-block:: python

        # Import Connection module
        from cwbrowser.cw_connection import CWInstanceConnection

        # Create dummy rqls
        rql1 = ("Any C, G Where X is Subject, X code_in_study C, "
                "X handedness 'ambidextrous', X gender G")
        rql2 = ("Any S WHERE S is Scan, S has_data A, A field '3T', "
                "S in_assessment B, B timepoint 'V1', S format 'GIS', "
                "S in_assessment C, C concerns D, D code_in_study 'ab100207'")

        # HTTPS test with a custom sftp port (only 'https' urls are
        # accepted by the constructor)
        url = @HTTPSURL; login = @LOGIN; password = @PWD
        connection = CWInstanceConnection(url, login, password, port=9191)
        connection.execute(rql1, export_type="json")
        connection.execute_with_sync(rql2, "/tmp/fuse", timer=1)

        # HTTPS test with a custom server root
        url = @HTTPSURL; login = @LOGIN; password = @PWD
        connection = CWInstanceConnection(url, login, password,
                                          server_root="/home/$login")
        connection.execute(rql1)

    Attributes
    ----------
    url : str
        the url to the cw instance.
    login : str
        the cw login.
    password : str
        the cw user password.
    host : str
        the host name extracted from 'url', used for the sftp connection.
    port : int
        the sftp port.
    server_root : str
        the server root directory where the user mount points are mapped.
    verify : bool
        if unset, the security certificate check is disabled.
    verbosity : int
        the verbosity level: debug messages are printed when > 2.
    """
    # Global variable that specify the supported export cw formats
    _EXPORT_TYPES = ["json", "csv", "cw"]

    # Parser applied on the raw response body for each export format
    importers = {
        "json": json.loads,
        "csv": load_csv,
        "cw": json.loads,
        "cwsearch": json.loads
    }

    # Text types used to detect file paths in a json rset: 'basestring' only
    # exists in Python 2, 'str' is the Python 3 equivalent.
    try:
        _STRING_TYPES = (basestring, )  # noqa: F821
    except NameError:
        _STRING_TYPES = (str, )

    def __init__(self, url, login, password, port=22, server_root=os.path.sep,
                 verify=True, verbosity=0):
        """ Initialize the CWInstanceConnection class.

        Parameters
        ----------
        url: str (mandatory)
            the url to the cw instance: must start with 'https'.
        login: str (mandatory)
            the cw login.
        password: str (mandatory)
            the cw user password.
        port: int (optional default 22)
            the sftp port.
        server_root: str (optional default '/')
            the server root directory where the user mount points (chroot) are
            mapped.
        verify: bool (optional, default True)
            if unset, disable the security certificate check.
        verbosity: int (optional default 0)
            the verbosity level.

        Raises
        ------
        ValueError: if the url does not start with 'https'.
        """
        # The credentials are sent with each request: refuse unsecured urls
        if not url.startswith("https"):
            raise ValueError(
                "Authentication was requested on a non secured URL ({0})."
                "Request has been blocked for security reasons.".format(url))

        # Class parameters
        self.url = url
        self.login = login
        self.password = password
        # 'scheme://host:port/path' -> 'host'
        self.host = self.url.split("/")[2].split(":")[0]
        self.port = port
        self.server_root = server_root
        self.verify = verify
        self.verbosity = verbosity

    ###########################################################################
    # Public Members
    ###########################################################################

    def execute(self, rql, export_type="json", nb_tries=2):
        """ Method that loads the rset from a rql request.

        Parameters
        ----------
        rql: str (mandatory)
            the rql request that will be executed on the cw instance.
        export_type: str (optional default 'json')
            the result set export format: one defined in '_EXPORT_TYPES'.
        nb_tries: int (optional default 2)
            number of times a request will be repeated if it fails.

        Returns
        -------
        rset: list of list of str
            a list that contains the requested entity parameters.

        Raises
        ------
        Exception: if the export type is not supported, or the error of the
            last attempt when all the attempts failed.
        """
        # Debug message
        if self.verbosity > 2:
            print("Executing rql: '%s'" % (rql, ))
            print("Exporting in: '%s'" % (export_type, ))

        # Check export type
        if export_type not in self._EXPORT_TYPES:
            raise Exception("Unknown export type '{0}', expect one in "
                            "'{1}'.".format(export_type, self._EXPORT_TYPES))

        # Create a dictionary with the request meta information
        data = {
            "__login": self.login,
            "__password": self.password,
            "rql": rql,
            "vid": export_type + "export",
            "_binary": 1
        }
        if export_type == "cw":
            del data["_binary"]

        # Get the result set
        rset = self._post_with_retries(data, export_type, nb_tries, rql)

        # Debug message
        if self.verbosity > 2:
            print("RQL result: '%s'" % (rset, ))

        return rset

    def execute_with_sync(self, rql, sync_dir, timer=3, nb_tries=3):
        """ Method that loads the rset from a rql request through sftp protocol
        using the CWSearch mechanism.

        Parameters
        ----------
        rql: str (mandatory)
            the rql request that will be executed on the cw instance.
        sync_dir: str (mandatory)
            the destination folder where the rql data are synchronized.
        timer: int (optional default 3)
            the time in seconds we are waiting for the fuse or twisted
            server update.
        nb_tries: int (optional default 3)
            if the update has not been detected after 'nb_tries' trials
            raise an exception.

        Returns
        -------
        rset: list of list or list of dict
            a list that contains the requested cubicweb database parameters
            when a json rset is generated, a list of dictionaries if a csv
            rset is generated.

        Raises
        ------
        IOError: if the CWSearch is not detected after 'nb_tries' trials, if
            the downloaded folder does not contain exactly one
            'request_result.*' file, or if its extension is not supported.
        """
        # Create the CWSearch
        self._create_cwsearch(rql)

        # Wait for the update: use double quote in rql
        cwsearch_title = None
        rql = rql.replace("'", '"')
        for _ in range(nb_tries):

            # Timer
            if self.verbosity > 2:
                print("Sleeping: '%i sec'" % (timer, ))
            time.sleep(timer)

            # Get all the user CWSearch in the database
            rset = self.execute(
                "Any S, T, P Where S is CWSearch, S title T, S path P")

            # Check if the cubicweb update has been done.
            # If true, get the associated CWSearch title
            for item in rset:
                if item[2].replace("'", '"') == rql:
                    cwsearch_title = item[1]
                    break
            if cwsearch_title is not None:
                break

        # If the search is not created
        if cwsearch_title is None:
            raise IOError("The search has not been created properly.")

        # Get instance parameters
        cw_params = self.execute(rql="", export_type="cw")
        if self.verbosity > 2:
            print("Autodetected sync parameters: '%s'" % (str(cw_params), ))

        # Copy the data with the sftp fuse mount point
        self._get_server_dataset(sync_dir, cwsearch_title, cw_params)

        # Locate the rset
        local_dir = os.path.join(sync_dir, cwsearch_title)
        rset_files = glob.glob(os.path.join(local_dir, "request_result.*"))
        if self.verbosity > 2:
            print("Autodetected json rset file at location '{0}'".format(
                rset_files))
        if len(rset_files) != 1:
            raise IOError("'{0}' rset file not supported, expect a single "
                          "rset file.".format(rset_files))
        rset_file = rset_files[0]
        filext = os.path.splitext(rset_file)[1]

        # > deal with json file
        if filext == ".json":
            with open(rset_file) as json_data:
                rset = json.load(json_data)

            # Tune the rset files in order to point in the local filesystem:
            # both prefixes end with a separator so that only a full leading
            # directory is swapped.
            if not local_dir.endswith(os.path.sep):
                local_dir += os.path.sep
            basedir = cw_params["basedir"]
            if not basedir.endswith(os.path.sep):
                basedir += os.path.sep
            for rset_items in rset:
                for item_index, item in enumerate(rset_items):
                    if (isinstance(item, self._STRING_TYPES) and
                            item.startswith(basedir)):
                        rset_items[item_index] = item.replace(
                            basedir, local_dir, 1)

        # > deal with csv file
        elif filext == ".csv":
            with open(rset_file) as csv_data:
                reader = csv.DictReader(
                    csv_data, delimiter=";", quotechar="|")
                rset = list(reader)

        # > raise an error when the file extension is not supported
        else:
            raise IOError("Unknown '{0}' rset extension.".format(rset_file))

        # Debug message
        if self.verbosity > 2:
            print("RQL result: '%s'" % (rset, ))

        return rset

    def get_genotype_measure(self, gene_name, genomic_measure, nb_tries=3):
        """ Method that loads the genomic measures stored in PLINK format.

        Parameters
        ----------
        gene_name: str (mandatory)
            a gene name used to limit the number of measures that will be
            loaded.
        genomic_measure: str (mandatory)
            the genomic measure name associated to PLINK files.
        nb_tries: int (optional default 3)
            number of times a request will be repeated if it fails.

        Returns
        -------
        rset: dict
            dictionary with 'labels' and 'records' (that contains
            the requested cubicweb database parameters).
        """
        # Debug message
        if self.verbosity > 2:
            print("Genotype extraction: '{0}', '{1}'".format(
                genomic_measure, gene_name))

        # Create a dictionary with the request meta information
        data = {
            "__login": self.login,
            "__password": self.password,
            "vid": "metagen-search-json",
            "measure": genomic_measure,
            "gene": gene_name,
            "export": "data"
        }

        # Get the result set: there is no rql here, so describe the request
        # with its measure and gene in the error message.
        request = "measure '{0}', gene '{1}'".format(
            genomic_measure, gene_name)
        rset = self._post_with_retries(data, "json", nb_tries, request)

        # Debug message
        if self.verbosity > 2:
            print("Genotype result: '%s'" % (rset, ))

        return rset

    ###########################################################################
    # Private Members
    ###########################################################################

    def _post_with_retries(self, data, export_type, nb_tries, request):
        """ Post a request on the cw instance and parse the response, trying
        again when an attempt fails.

        Parameters
        ----------
        data: dict (mandatory)
            the form fields sent to the cw instance.
        export_type: str (mandatory)
            the key of 'importers' used to parse the response body.
        nb_tries: int (mandatory)
            the maximum number of attempts: at least one attempt is always
            made.
        request: str (mandatory)
            a description of the request inserted in the error message.

        Returns
        -------
        rset: object
            the parsed response body.

        Raises
        ------
        Exception: the error of the last attempt, with its message extended
            with the number of attempts and the request description.
        """
        try_count = 0
        while True:
            try_count += 1
            try:
                response = requests.post(
                    self.url, data=data, verify=self.verify,
                    auth=(self.login, self.password))
                if not response.ok:
                    raise ValueError(response.reason)
                return self.importers[export_type](response.content)
            except Exception as error:
                if try_count >= nb_tries:
                    # Keep the original message and add infos. 'args' is
                    # used since exceptions have no 'message' attribute in
                    # Python 3; the bare raise keeps type and traceback.
                    error.args = (
                        "%s\nFailed to get data after %s tries.\n"
                        "Request: %s" % (error, try_count, request), )
                    raise
                # Wait 1 second before retrying
                time.sleep(1)

    def _get_server_dataset(self, sync_dir, cwsearch_title, cw_params):
        """ Download the CWSearch result trough a sftp connection.

        .. note::

            If a folder 'sync_dir' + 'cwsearch_title' is detected on the local
            machine, no download is run. We assume that the CWSearch has
            already been downloaded properly.

        Parameters
        ----------
        sync_dir: str (mandatory)
            the destination folder where the rql data are synchronized.
        cwsearch_title: str (mandatory)
            the title of the CWSearch that will be downloaded.
        cw_params: dict (mandatory)
            a dictionary containing cw/fuse parameters: the 'instance_name'
            item is used to build the server mount point.
        """
        # Build the mount point
        mount_point = os.path.join(
            self.server_root, cw_params["instance_name"])

        # Get the virtual folder to sync
        virtual_dir_to_sync = os.path.join(mount_point, cwsearch_title)
        if self.verbosity > 2:
            print("Autodetected sync directory: '%s'" % (
                virtual_dir_to_sync, ))

        # Get the local folder
        local_dir = os.path.join(sync_dir, cwsearch_title)
        if os.path.isdir(local_dir):
            print("The CWSearch '{0}' has been found at location "
                  "'{1}'. Do not download the data again.".format(
                      cwsearch_title, local_dir))
            return

        # Rsync via paramiko and sftp: the connections are released even if
        # the download fails.
        # NOTE(review): a download interrupted half way leaves a partial
        # 'local_dir' that the check above will consider as complete.
        transport = paramiko.Transport((self.host, self.port))
        try:
            transport.connect(username=self.login, password=self.password)
            sftp = paramiko.SFTPClient.from_transport(transport)
            try:
                if self.verbosity > 2:
                    print("Downloading: '%s' to '%s'" % (
                        virtual_dir_to_sync, local_dir))
                self._sftp_get_recursive(virtual_dir_to_sync, local_dir, sftp)
                if self.verbosity > 2:
                    print("Downloading done")
            finally:
                sftp.close()
        finally:
            transport.close()

    def _sftp_get_recursive(self, path, dest, sftp):
        """ Recursive download of the data through a sftp connection.

        Parameters
        ----------
        path: str (mandatory)
            the sftp path to download.
        dest: str (mandatory)
            the destination folder on the local machine: it is created and
            thus must not exist yet.
        sftp: paramiko sftp connection (mandatory)
        """
        # Go through the current sftp folder content
        dir_items = sftp.listdir(path)
        os.makedirs(dest)
        for item in dir_items:

            # Construct the item absolute path
            item_path = os.path.join(path, item)
            dest_path = os.path.join(dest, item)

            # If a directory is found
            if self._sftp_isdir(item_path, sftp):
                self._sftp_get_recursive(item_path, dest_path, sftp)

            # Otherwise transfer the data
            else:
                sftp.get(item_path, dest_path)

    def _sftp_isdir(self, path, sftp):
        """ Check if a distant path is a directory through a sftp connection.

        Parameters
        ----------
        path: str (mandatory)
            the sftp path to check.
        sftp: paramiko sftp connection (mandatory)

        Returns
        -------
        isdir: bool
            True if the path is a directory, False if it is not a directory
            or if the 'stat' call raises an IOError.
        """
        try:
            return stat.S_ISDIR(sftp.stat(path).st_mode)
        # Path does not exist, so by definition not a directory
        except IOError:
            return False

    def _create_cwsearch(self, rql, export_type="cwsearch"):
        """ Method that creates a CWSearch entity from a rql.

        .. note::

            The CWSearch title has to be unique, build automatically title
            of the form 'auto_generated_title_x' where x is incremented
            each time an element is inserted in the data base.

        Parameters
        ----------
        rql: str (mandatory)
            the rql request that will be executed on the cw instance.
        export_type: str (optional default 'cwsearch')
            the prefix of the requested view identifier, and the key of
            'importers' used to parse the response.

        Raises
        ------
        ValueError: if the request fails or if the returned status has a
            non null 'exitcode'.
        """
        # Debug message
        if self.verbosity > 2:
            print("Executing rql: '%s'" % (rql, ))
            print("Exporting in: '%s'" % (export_type, ))

        # Create a dictionary with the request meta information
        data = {
            "__login": self.login,
            "__password": self.password,
            "path": rql,
            "vid": export_type + "export"
        }

        # Get the result set
        response = requests.post(self.url, data=data, verify=self.verify,
                                 auth=(self.login, self.password))
        if not response.ok:
            raise ValueError(response.reason)
        status = self.importers[export_type](response.content)

        # Check the creation status
        if status["exitcode"] != 0:
            raise ValueError("Can't create 'CWSearch' from RQL '{0}': "
                             "{1}.".format(rql, status["stderr"]))