From 844679d09d8760bdc9bdbefa23c4a63ce00f9cdf Mon Sep 17 00:00:00 2001 From: Alex Kanitz Date: Thu, 20 Feb 2020 13:40:49 +0100 Subject: [PATCH] pushing changes from biohackathon --- .netrc | 3 + requirements.txt | 12 +- wes_elixir/api/controllers.py | 47 ++ wes_elixir/api/register_openapi.py | 11 +- .../api/schema.stdout_stderr.openapi.yaml | 163 ++++++ wes_elixir/config/app_config.yaml | 11 +- wes_elixir/ga4gh/wes/endpoints/get_run_log.py | 2 + ... (oldirty's conflicted copy 2020-01-06).py | 515 ++++++++++++++++++ .../ga4gh/wes/endpoints/run_workflow.py | 12 +- wes_elixir/tasks/celery_task_monitor.py | 37 +- wes_elixir/tasks/register_celery.py | 12 + 11 files changed, 805 insertions(+), 20 deletions(-) create mode 100644 wes_elixir/api/controllers.py create mode 100755 wes_elixir/api/schema.stdout_stderr.openapi.yaml create mode 100644 wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py diff --git a/.netrc b/.netrc index e69de29..e5ab918 100644 --- a/.netrc +++ b/.netrc @@ -0,0 +1,3 @@ +machine ftp-private.ebi.ac.uk +login tesk-1 +password Z6fsH6MG diff --git a/requirements.txt b/requirements.txt index d608e78..701a55c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,12 +13,12 @@ cffi==1.11.5 chardet==3.0.4 click==6.7 clickclick==1.2.2 -connexion==1.5.2 +connexion==2.4.0 cryptography==2.3.1 --e git+https://github.com/ohsu-comp-bio/cwl-tes.git@62840435c5b22ac7b3ad1724047d811f72dd372d#egg=cwl-tes +-e git+https://github.com/uniqueg/cwl-tes-temp.git@e37090e00f2573e84becec30f0ade9c5003820b3#egg=cwl_tes cwltool==1.0.20181217162649 decorator==4.3.0 -Flask==1.0.2 +Flask==1.1.1 Flask-Cors==3.0.6 Flask-PyMongo==2.1.0 future==0.16.0 @@ -40,6 +40,7 @@ mccabe==0.6.1 mistune==0.8.4 mypy-extensions==0.4.1 networkx==2.2 +openapi-spec-validator==0.2.8 prov==1.5.1 psutil==5.4.7 py-tes==0.3.0 @@ -50,7 +51,7 @@ pymongo==3.7.1 pyparsing==2.2.1 python-dateutil==2.6.1 pytz==2018.5 -PyYAML==4.2b1 +PyYAML==5.1.2 rdflib==4.2.2 
rdflib-jsonld==0.4.0 requests==2.20.0 @@ -61,9 +62,10 @@ shellescape==3.4.1 six==1.11.0 subprocess32==3.5.2 swagger-spec-validator==2.3.1 +swagger-ui-bundle==0.0.6 typed-ast==1.1.0 typing==3.6.6 -typing-extensions==3.6.5 +typing-extensions==3.7.4 urllib3==1.24.2 vine==1.1.4 Werkzeug==0.15.3 diff --git a/wes_elixir/api/controllers.py b/wes_elixir/api/controllers.py new file mode 100644 index 0000000..40d47b5 --- /dev/null +++ b/wes_elixir/api/controllers.py @@ -0,0 +1,47 @@ +"""Controller for auxiliary WES-ELIXIR API endpoints.""" + +import logging + +from celery import current_app as celery_app +from connexion import request +from flask import current_app + +from wes_elixir.security.decorators import auth_token_optional + +# Get logger instance +logger = logging.getLogger(__name__) + + +# GET /stdout/ +@auth_token_optional +def get_stdout(run_id, *args, **kwargs): + """Returns run STDOUT as plain text.""" + response = "" + log_request(request, response) + return response + + +# POST /stderr/ +@auth_token_optional +def get_stderr(run_id, *args, **kwargs): + """Returns run STDERR as plain text.""" + response = "" + log_request(request, response) + return response + + +def log_request(request, response): + """Writes request and response to log.""" + # TODO: write decorator for request logging + logger.debug( + ( + "Response to request \"{method} {path} {protocol}\" from " + "{remote_addr}: {response}" + ).format( + method=request.environ['REQUEST_METHOD'], + path=request.environ['PATH_INFO'], + protocol=request.environ['SERVER_PROTOCOL'], + remote_addr=request.environ['REMOTE_ADDR'], + response=response, + ) + ) diff --git a/wes_elixir/api/register_openapi.py b/wes_elixir/api/register_openapi.py index bec216c..f71bf82 100644 --- a/wes_elixir/api/register_openapi.py +++ b/wes_elixir/api/register_openapi.py @@ -39,13 +39,20 @@ def register_openapi( path = __add_security_definitions(in_file=path) # Generate API endpoints from OpenAPI spec + options = { + "swagger_ui": 
get_conf(spec, 'swagger_ui'), + "serve_spec": get_conf(spec, 'swagger_json'), + } + base_path = get_conf(spec, 'base_path') + if not base_path: + base_path = None try: app.add_api( path, strict_validation=get_conf(spec, 'strict_validation'), validate_responses=get_conf(spec, 'validate_responses'), - swagger_ui=get_conf(spec, 'swagger_ui'), - swagger_json=get_conf(spec, 'swagger_json'), + options=options, + base_path=base_path, ) logger.info("API endpoints specified in '{path}' added.".format( diff --git a/wes_elixir/api/schema.stdout_stderr.openapi.yaml b/wes_elixir/api/schema.stdout_stderr.openapi.yaml new file mode 100755 index 0000000..42a2c80 --- /dev/null +++ b/wes_elixir/api/schema.stdout_stderr.openapi.yaml @@ -0,0 +1,163 @@ +openapi: 3.0.0 +info: + title: WES-ELIXIR STDOUT & STDERR OpenAPI specification + contact: + name: ELIXIR Cloud & AAI group + email: alexander.kanitz@alumni.ethz.ch + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + version: 0.14.0 +servers: +- url: /wes-elixir/v1 +paths: + /stdout/{run_id}: + get: + summary: |- + Retrieves the content of the indicated run's STDOUT stream and returns + it as plain text. + parameters: + - in: path + name: run_id + schema: + type: string + required: true + description: Run identifier. + operationId: get_stdout + responses: + 200: + description: |- + STDOUT stream of indicated run as plain text. + content: + text/plain: + schema: + type: string + example: "This is STDOUT." + 400: + description: The request is malformed. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 401: + description: The request is unauthorized. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 403: + description: The requester is not authorized to perform this action. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 404: + description: The requested resource was not found. 
+ content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 500: + description: An unexpected error occurred. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + x-openapi-router-controller: api.controllers + /stderr/{run_id}: + get: + summary: |- + Retrieves the content of the indicated run's STDERR stream and returns + it as plain text. + operationId: get_stderr + parameters: + - in: path + name: run_id + schema: + type: string + required: true + description: Run identifier. + responses: + 200: + description: |- + STDERR stream of indicated run as plain text. + content: + text/plain: + schema: + type: string + example: "This is STDERR." + 400: + description: The request is malformed. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 401: + description: The request is unauthorized. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 403: + description: The requester is not authorized to perform this action. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 404: + description: The requested resource was not found. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 500: + description: An unexpected error occurred. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + x-openapi-router-controller: api.controllers +components: + schemas: + Error: + required: + - message + - reason + type: object + properties: + message: + type: string + description: |- + A human readable message providing more details about the error. + example: + Required parameter 'xyz' is missing. + reason: + type: string + description: |- + Unique identifier for this error, but *not* the HTTP response code + (e.g., name of exception). + example: ValueError + description: An individual error message. 
+ ErrorResponse: + required: + - code + - errors + - message + type: object + properties: + code: + type: integer + description: HTTP status code (e.g., 400, 404). + format: int64 + example: 400 + errors: + type: array + description: List of associated errors and warnings. + items: + $ref: '#/components/schemas/Error' + message: + type: string + description: |- + A human readable message providing more details about the error. + example: The request could not be interpreted. + description: A response object for detailed error messages. \ No newline at end of file diff --git a/wes_elixir/config/app_config.yaml b/wes_elixir/config/app_config.yaml index 9c0f5b5..244fe80 100644 --- a/wes_elixir/config/app_config.yaml +++ b/wes_elixir/config/app_config.yaml @@ -53,11 +53,20 @@ celery: # OpenAPI specs api: specs: - - path: '20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml' + - name: 'WES' + path: '20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml' strict_validation: True validate_responses: True swagger_ui: True swagger_json: True + base_path: False + - name: 'stdout_stderr' + path: 'schema.stdout_stderr.openapi.yaml' + strict_validation: True + validate_responses: True + swagger_ui: True + swagger_json: True + base_path: '/wes-elixir/v1' general_params: time_format: "%Y-%m-%dT%H:%M:%SZ" endpoint_params: diff --git a/wes_elixir/ga4gh/wes/endpoints/get_run_log.py b/wes_elixir/ga4gh/wes/endpoints/get_run_log.py index 3b63acc..66b2363 100644 --- a/wes_elixir/ga4gh/wes/endpoints/get_run_log.py +++ b/wes_elixir/ga4gh/wes/endpoints/get_run_log.py @@ -52,4 +52,6 @@ def get_run_log( ) raise Forbidden + # Remove + return run_log diff --git a/wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py b/wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py new file mode 100644 index 0000000..793f82c --- /dev/null +++ b/wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 
2020-01-06).py @@ -0,0 +1,515 @@ +"""Utility functions for POST /runs endpoint.""" + +import logging +import os +import re +import shutil +import string # noqa: F401 +import subprocess + +from celery import uuid +from json import (decoder, loads) +from pymongo.errors import DuplicateKeyError +from random import choice +from typing import Dict +from yaml import dump +from werkzeug.datastructures import ImmutableMultiDict + +from wes_elixir.config.config_parser import get_conf +from wes_elixir.errors.errors import BadRequest +from wes_elixir.tasks.tasks.run_workflow import task__run_workflow + + +# Get logger instance +logger = logging.getLogger(__name__) + + +# Utility function for endpoint POST /runs +def run_workflow( + config: Dict, + form_data: ImmutableMultiDict, + *args, + **kwargs +) -> Dict: + """Executes workflow and save info to database; returns unique run id.""" + # Validate data and prepare run environment + form_data_dict = __immutable_multi_dict_to_nested_dict( + multi_dict=form_data + ) + __validate_run_workflow_request(data=form_data_dict) + __check_service_info_compatibility(data=form_data_dict) + document = __init_run_document(data=form_data_dict) + document = __create_run_environment( + config=config, + document=document, + **kwargs + ) + + # Start workflow run in background + __run_workflow( + config=config, + document=document, + **kwargs + ) + + response = {'run_id': document['run_id']} + return response + + +def __immutable_multi_dict_to_nested_dict( + multi_dict: ImmutableMultiDict +) -> Dict: + """Converts ImmutableMultiDict to nested dictionary.""" + # Convert to flat dictionary + nested_dict = multi_dict.to_dict(flat=True) + for key in nested_dict: + # Try to decode JSON string; ignore JSONDecodeErrors + try: + nested_dict[key] = loads(nested_dict[key]) + except decoder.JSONDecodeError: + pass + return nested_dict + + +def __validate_run_workflow_request(data: Dict) -> None: + """Validates presence and types of workflow run request form 
data; sets + defaults for optional fields.""" + # The form data is not validated properly because all types except + # 'workflow_attachment' are string and none are labeled as required + # Considering the 'RunRequest' model in the current specs (0.3.0), the + # following assumptions are made and verified for the indicated parameters: + # workflow_params: + # type = dict + # required = True + # workflow_type: + # type = str + # required = True + # workflow_type_version: + # type = str + # required = True + # tags: + # type = dict + # required = False + # workflow_engine_parameters: + # type = dict + # required = False + # workflow_url: + # type = str + # required = True + # workflow_attachment: + # type = [str] + # required = False + + # Set required parameters + required = { + 'workflow_params', + 'workflow_type', + 'workflow_type_version', + 'workflow_url', + } + params_str = [ + 'workflow_type', + 'workflow_type_version', + 'workflow_url', + ] + params_dict = [ + 'workflow_params', + 'workflow_engine_parameters', + 'tags', + ] + type_str = dict((key, data[key]) for key in params_str if key in data) + type_dict = dict((key, data[key]) for key in params_dict if key in data) + # TODO: implement type casting/checking for workflow attachment + + # Raise error if any required params are missing + if not required <= set(data): + logger.error('POST request does not conform to schema.') + raise BadRequest + + # Raise error if any string params are not of type string + if not all(isinstance(value, str) for value in type_str.values()): + logger.error('POST request does not conform to schema.') + raise BadRequest + + # Raise error if any dict params are not of type dict + if not all(isinstance(value, dict) for value in type_dict.values()): + logger.error('POST request does not conform to schema.') + raise BadRequest + + return None + + +def __check_service_info_compatibility(data: Dict) -> None: + """Checks compatibility with service info; raises BadRequest.""" + # TODO: 
implement me + return None + + +def __init_run_document(data: Dict) -> Dict: + """Initializes workflow run document.""" + document: Dict = dict() + document['api'] = dict() + document['internal'] = dict() + document['api']['request'] = data + document['api']['state'] = 'UNKNOWN' + document['api']['run_log'] = dict() + document['api']['task_logs'] = list() + document['api']['outputs'] = dict() + return document + + +def __create_run_environment( + config: Dict, + document: Dict, + **kwargs +) -> Dict: + """Creates unique run identifier and permanent and temporary storage + directories for current run.""" + collection_runs = get_conf(config, 'database', 'collections', 'runs') + out_dir = get_conf(config, 'storage', 'permanent_dir') + tmp_dir = get_conf(config, 'storage', 'tmp_dir') + run_id_charset = eval(get_conf(config, 'database', 'run_id', 'charset')) + run_id_length = get_conf(config, 'database', 'run_id', 'length') + + # Keep on trying until a unique run id was found and inserted + # TODO: If no more possible IDs => inf loop; fix (raise custom error; 500 + # to user) + while True: + + # Create unique run and task ids + run_id = __create_run_id( + charset=run_id_charset, + length=run_id_length, + ) + task_id = uuid() + + # Set temporary and output directories + current_tmp_dir = os.path.abspath(os.path.join(tmp_dir, run_id)) + current_out_dir = os.path.abspath(os.path.join(out_dir, run_id)) + + # Try to create workflow run directory (temporary) + try: + # TODO: Think about permissions + # TODO: Add working dir (currently one has to run the app from + # outermost dir) + os.makedirs(current_tmp_dir) + os.makedirs(current_out_dir) + + # Try new run id if directory already exists + except FileExistsError: + continue + + # Add run/task/user identifier, temp/output directories to document + document['run_id'] = run_id + document['task_id'] = task_id + if 'user_id' in kwargs: + document['user_id'] = kwargs['user_id'] + else: + document['user_id'] = None + 
document['internal']['tmp_dir'] = current_tmp_dir
+        document['internal']['out_dir'] = current_out_dir
+
+        # Process workflow attachments
+        document = __process_workflow_attachments(document)
+
+        # Try to insert document into database
+        try:
+            collection_runs.insert(document)
+
+        # Try new run id if document already exists
+        except DuplicateKeyError:
+
+            # And remove run directories created previously
+            shutil.rmtree(current_tmp_dir, ignore_errors=True)
+            shutil.rmtree(current_out_dir, ignore_errors=True)
+
+            continue
+
+        # Catch other database errors
+        # TODO: implement properly
+        except Exception as e:
+            print('Database error')
+            print(e)
+            break
+
+        # Exit loop
+        break
+
+    return document
+
+
+def __create_run_id(
+    charset: str = '0123456789',
+    length: int = 6
+) -> str:
+    """Creates random run ID."""
+    return ''.join(choice(charset) for __ in range(length))
+
+
+def __process_workflow_attachments(data: Dict) -> Dict:
+    """Processes workflow attachments."""
+    # TODO: implement properly
+    # Current workaround until processing of workflow attachments is
+    # implemented
+    # Use 'workflow_url' for path to (main) CWL workflow file on local file
+    # system or in Git repo
+    # Use 'workflow_params' or file in Git repo to generate YAML file
+
+    # Set regular expression for finding workflow files on git repositories
+    # Assumptions:
+    # - A URL needs to consist of a root, a "separator" keyword, a
+    #   branch/commit, and a "file path", separated by slashes
+    # - The root is the part of the URL up to the separator and is assumed to
+    #   represent the "git clone URL" when '.git' is appended
+    # - Accepted separator keywords are 'blob', 'src' and 'tree'
+    # - The value branch/commit is used to checkout the repo to that state
+    #   before obtaining the file
+    # - The "file path" segment represents the relative path to the CWL
+    #   workflow file when inside the repo
+    #
+    # All of the above assumptions should be met when copying the links of
+    # files in most repos on GitHub, GitLab
or Bitbucket + # + # Note that the "file path" portion (see above) of a CWL *parameter file* + # can be *optionally* appended to the URL + # + # The following additional rules apply for workflow and/or parameter files: + # - CWL workflow files *must* end in .cwl, .yml, .yaml or .json + # - Parameter files *must* end in '.yml', '.yaml' or '.json' + # - Accepted delimiters for separating workflow and parameter file, if + # specified, are: ',', ';', ':', '|' + re_git_file = re.compile( + ( + r'^(https?:.*)\/(blob|src|tree)\/(.*?)\/(.*?\.(cwl|yml|yaml|json))' + r'[,:;|]?(.*\.(yml|yaml|json))?' + ) + ) + + # Create directory for storing workflow files + workflow_dir = os.path.abspath( + os.path.join( + data['internal']['out_dir'], 'workflow_files' + ) + ) + try: + os.mkdir(workflow_dir) + + except OSError: + # TODO: Do something more reasonable here + pass + + # Get main workflow file + user_string = data['api']['request']['workflow_url'] + m = re_git_file.match(user_string) + + # Get workflow from Git repo if regex matches + if m: + + repo_url = '.'.join([m.group(1), 'git']) + branch_commit = m.group(3) + cwl_path = m.group(4) + + # Try to clone repo + if not subprocess.run( + [ + 'git', + 'clone', + repo_url, + os.path.join(workflow_dir, 'repo') + ], + check=True + ): + logger.error( + ( + 'Could not clone Git repository. Check value of ' + "'workflow_url' in run request." + ) + ) + raise BadRequest + + # Try to checkout branch/commit + if not subprocess.run( + [ + 'git', + '--git-dir', + os.path.join(workflow_dir, 'repo', '.git'), + '--work-tree', + os.path.join(workflow_dir, 'repo'), + 'checkout', + branch_commit + ], + check=True + ): + logger.error( + ( + 'Could not checkout repository commit/branch. Check value ' + "of 'workflow_url' in run request." 
+ ) + ) + raise BadRequest + + # Set CWL path + data['internal']['cwl_path'] = os.path.join( + workflow_dir, + 'repo', + cwl_path + ) + + # Else assume value of 'workflow_url' represents file on local file system + else: + + # Set main CWL workflow file path + data['internal']['cwl_path'] = os.path.abspath( + data['api']['request']['workflow_url'] + ) + + # Extract name and extensions of workflow + workflow_name_ext = os.path.splitext( + os.path.basename( + data['internal']['cwl_path'] + ) + ) + + # Try to get parameters from 'workflow_params' field + if data['api']['request']['workflow_params']: + data['internal']['param_file_path'] = os.path.join( + workflow_dir, + '.'.join([ + str(workflow_name_ext[0]), + 'yml', + ]), + ) + with open(data['internal']['param_file_path'], 'w') as yaml_file: + dump( + data['api']['request']['workflow_params'], + yaml_file, + allow_unicode=True, + default_flow_style=False + ) + + # Or from provided relative file path in repo + elif m and m.group(6): + param_path = m.group(6) + data['internal']['param_file_path'] = os.path.join( + workflow_dir, + 'repo', + param_path, + ) + + # Else try to see if there is a 'yml', 'yaml' or 'json' file with exactly + # the same basename as CWL in same dir + else: + param_file_extensions = ['yml', 'yaml', 'json'] + for ext in param_file_extensions: + possible_param_file = os.path.join( + workflow_dir, + 'repo', + '.'.join([ + str(workflow_name_ext[0]), + ext, + ]), + ) + if os.path.isfile(possible_param_file): + data['internal']['param_file_path'] = possible_param_file + break + + # Raise BadRequest if not parameter file was found + if 'param_file_path' not in data['internal']: + raise BadRequest + + # Extract workflow attachments from form data dictionary + if 'workflow_attachment' in data['api']['request']: + + # TODO: do something with data['workflow_attachment'] + + # Strip workflow attachments from data + del data['api']['request']['workflow_attachment'] + + # Add workflow base name (without 
extension) to document + data['api']['run_log']['name'] = str(workflow_name_ext[0]) + + # Return form data stripped of workflow attachments + return data + + +def __run_workflow( + config: Dict, + document: Dict, + **kwargs +) -> None: + """Helper function `run_workflow()`.""" + tes_url = get_conf(config, 'tes', 'url') + remote_storage_url = get_conf(config, 'storage', 'remote_storage_url') + run_id = document['run_id'] + task_id = document['task_id'] + tmp_dir = document['internal']['tmp_dir'] + cwl_path = document['internal']['cwl_path'] + param_file_path = document['internal']['param_file_path'] + + # Build command + command_list = [ + 'cwl-tes', + '--debug', + '--leave-outputs', + '--remote-storage-url', remote_storage_url, + '--tes', tes_url, + cwl_path, + param_file_path + ] + + # Add authorization parameters + if 'token' in kwargs: + auth_params = [ + '--token-public-key', get_conf( + config, + 'security', + 'jwt', + 'public_key' + ).encode('unicode_escape').decode('utf-8'), + '--token', kwargs['token'], + ] + command_list[2:2] = auth_params + + # TEST CASE FOR SYSTEM ERROR + # command_list = [ + # '/path/to/non_existing/script', + # ] + # TEST CASE FOR EXECUTOR ERROR + # command_list = [ + # '/bin/false', + # ] + # TEST CASE FOR SLOW COMPLETION WITH ARGUMENT (NO STDOUT/STDERR) + # command_list = [ + # 'sleep', + # '30', + # ] + + # Get timeout duration + timeout_duration = get_conf( + config, + 'api', + 'endpoint_params', + 'timeout_run_workflow', + ) + + # Execute command as background task + logger.info( + ( + "Starting execution of run '{run_id}' as task '{task_id}' in " + "'{tmp_dir}'..." 
+ ).format( + run_id=run_id, + task_id=task_id, + tmp_dir=tmp_dir, + ) + ) + task__run_workflow.apply_async( + None, + { + 'command_list': command_list, + 'tmp_dir': tmp_dir, + }, + task_id=task_id, + soft_time_limit=timeout_duration, + ) + return None diff --git a/wes_elixir/ga4gh/wes/endpoints/run_workflow.py b/wes_elixir/ga4gh/wes/endpoints/run_workflow.py index 793f82c..545dc1c 100644 --- a/wes_elixir/ga4gh/wes/endpoints/run_workflow.py +++ b/wes_elixir/ga4gh/wes/endpoints/run_workflow.py @@ -363,7 +363,14 @@ def __process_workflow_attachments(data: Dict) -> Dict: data['api']['request']['workflow_url'] ) - # Extract name and extensions of workflow + # Extract name and extensions of workflow + workflow_name_ext = os.path.splitext( + os.path.basename( + data['internal']['cwl_path'] + ) + ) + + # Get parameter file workflow_name_ext = os.path.splitext( os.path.basename( data['internal']['cwl_path'] @@ -424,9 +431,6 @@ def __process_workflow_attachments(data: Dict) -> Dict: # Strip workflow attachments from data del data['api']['request']['workflow_attachment'] - - # Add workflow base name (without extension) to document - data['api']['run_log']['name'] = str(workflow_name_ext[0]) # Return form data stripped of workflow attachments return data diff --git a/wes_elixir/tasks/celery_task_monitor.py b/wes_elixir/tasks/celery_task_monitor.py index 60c4d59..839ab29 100644 --- a/wes_elixir/tasks/celery_task_monitor.py +++ b/wes_elixir/tasks/celery_task_monitor.py @@ -32,6 +32,8 @@ def __init__( celery_app: Celery, collection: Collection, tes_config: Dict[str, str], + stdout_endpoint: Optional[str] = None, + stderr_endpoint: Optional[str] = None, timeout: float = 0, authorization: bool = True, time_format: str = "%Y-%m-%dT%H:%M:%SZ", @@ -39,6 +41,8 @@ def __init__( """Starts Celery task monitor daemon process.""" self.celery_app = celery_app self.collection = collection + self.stdout_endpoint = stdout_endpoint + self.stderr_endpoint = stderr_endpoint self.timeout = timeout 
self.authorization = authorization
         self.time_format = time_format
@@ -233,8 +237,16 @@ def on_task_succeeded(
     ) -> None:
         """Event handler for successful, failed and canceled Celery tasks."""
-        if not self.collection.find_one({'task_id': event['uuid']}):
+        document = self.collection.find_one({'task_id': event['uuid']})
+        if not document:
             return None
+
+        # Create dictionary for internal parameters
+        internal = dict()
+        internal['task_finished'] = datetime.utcfromtimestamp(
+            event['timestamp']
+        )
+
         # Parse subprocess results
         try:
             (returncode, log, tes_ids) = literal_eval(event['result'])
@@ -251,11 +263,20 @@
             )
             pass

-        # Create dictionary for internal parameters
-        internal = dict()
-        internal['task_finished'] = datetime.utcfromtimestamp(
-            event['timestamp']
-        )
+        # Save STDOUT & STDERR
+        internal['stdout'] = log
+        internal['stderr'] = ''
+
+        # Compile API URLs to retrieve STDOUT & STDERR
+        if 'run_id' in document:
+            if self.stdout_endpoint:
+                stdout_url = '/'.join([self.stdout_endpoint, document['run_id']])
+            else:
+                stdout_url = 'unavailable'
+            if self.stderr_endpoint:
+                stderr_url = '/'.join([self.stderr_endpoint, document['run_id']])
+            else:
+                stderr_url = 'unavailable'

         # Set final state to be set
         document = self.collection.find_one(
@@ -297,8 +318,8 @@
                 task_logs=task_logs,
                 end_time=internal['task_finished'].strftime(self.time_format),
                 exit_code=returncode,
-                stdout=log,
-                stderr='',
+                stdout=stdout_url,
+                stderr=stderr_url,
             )
         except Exception as e:
             logger.exception(
diff --git a/wes_elixir/tasks/register_celery.py b/wes_elixir/tasks/register_celery.py
index 1f45f1e..ca9b29c 100644
--- a/wes_elixir/tasks/register_celery.py
+++ b/wes_elixir/tasks/register_celery.py
@@ -20,6 +20,16 @@ def register_task_service(app: Flask) -> None:
     # Instantiate Celery app instance
     celery_app = create_celery_app(app)

+    # Find STDOUT/STDERR endpoints
+    stdout_endpoint = None
+    stderr_endpoint = None
+    for spec in
app.config['api']['specs']: + if spec['name'] == 'stdout_stderr': + base_path = spec['base_path'] + stdout_endpoint = '/'.join([base_path, 'stdout']) + stderr_endpoint = '/'.join([base_path, 'stderr']) + break + # Start task monitor daemon TaskMonitor( celery_app=celery_app, @@ -32,6 +42,8 @@ def register_task_service(app: Flask) -> None: 'logs_endpoint_query_params': app.config['tes']['get_logs']['query_params'], }, + stdout_endpoint=stdout_endpoint, + stderr_endpoint=stderr_endpoint, timeout=app.config['celery']['monitor']['timeout'], authorization=app.config['security']['authorization_required'], time_format=app.config['api']['general_params']['time_format'],