diff --git a/metaflow/mflog/__init__.py b/metaflow/mflog/__init__.py
index 5a9076320b..f6fd468878 100644
--- a/metaflow/mflog/__init__.py
+++ b/metaflow/mflog/__init__.py
@@ -1,4 +1,9 @@
 import math
+import time
+
+from .mflog import refine, set_should_persist
+
+from metaflow.util import to_unicode
 
 # Log source indicates the system that *minted the timestamp*
 # for the logline. This means that for a single task we can
@@ -39,17 +44,17 @@ BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS)
 
 
 # this function returns a bash expression that redirects stdout
-# and stderr of the given bash expression to mflog.tee
-def bash_capture_logs(bash_expr, var_transform=None):
+# and stderr of the given command to mflog
+def capture_output_to_mflog(command_and_args, var_transform=None):
     if var_transform is None:
         var_transform = lambda s: "$%s" % s
-    cmd = "python -m metaflow.mflog.tee %s %s"
-    parts = (
-        bash_expr,
-        cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDOUT")),
-        cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDERR")),
+
+    return "python -m metaflow.mflog.redirect_streams %s %s %s %s" % (
+        TASK_LOG_SOURCE,
+        var_transform("MFLOG_STDOUT"),
+        var_transform("MFLOG_STDERR"),
+        command_and_args,
     )
-    return "(%s) 1>> >(%s) 2>> >(%s >&2)" % parts
 
 
 # update_delay determines how often logs should be uploaded to S3
@@ -71,7 +76,8 @@ def update_delay(secs_since_start):
 
 
 # this function is used to generate a Bash 'export' expression that
-# sets environment variables that are used by 'tee' and 'save_logs'.
+# sets environment variables that are used by 'redirect_streams' and
+# 'save_logs'.
 # Note that we can't set the env vars statically, as some of them
 # may need to be evaluated during runtime
 def export_mflog_env_vars(
@@ -99,3 +105,41 @@ def export_mflog_env_vars(
         env_vars["MF_DATASTORE_ROOT"] = datastore_root
 
     return "export " + " ".join("%s=%s" % kv for kv in env_vars.items())
+
+
+def tail_logs(prefix, stdout_tail, stderr_tail, echo, has_log_updates):
+    def _available_logs(tail, stream, echo, should_persist=False):
+        # print the latest batch of lines
+        try:
+            for line in tail:
+                if should_persist:
+                    line = set_should_persist(line)
+                else:
+                    line = refine(line, prefix=prefix)
+                echo(line.strip().decode("utf-8", errors="replace"), stream)
+        except Exception as ex:
+            echo(
+                "%s[ temporary error in fetching logs: %s ]"
+                % (to_unicode(prefix), ex),
+                "stderr",
+            )
+
+    start_time = time.time()
+    next_log_update = start_time
+    log_update_delay = 1
+    while has_log_updates():
+        if time.time() > next_log_update:
+            _available_logs(stdout_tail, "stdout", echo)
+            _available_logs(stderr_tail, "stderr", echo)
+            now = time.time()
+            log_update_delay = update_delay(now - start_time)
+            next_log_update = now + log_update_delay
+
+        # This sleep should never delay log updates. On the other hand,
+        # we should exit this loop when the task has finished without
+        # a long delay, regardless of the log tailing schedule
+        time.sleep(min(log_update_delay, 5.0))
+    # It is possible that we exit the loop above before all logs have been
+    # tailed.
+    _available_logs(stdout_tail, "stdout", echo)
+    _available_logs(stderr_tail, "stderr", echo)
diff --git a/metaflow/mflog/redirect_streams.py b/metaflow/mflog/redirect_streams.py
new file mode 100644
index 0000000000..36aac03342
--- /dev/null
+++ b/metaflow/mflog/redirect_streams.py
@@ -0,0 +1,54 @@
+import os
+import sys
+import subprocess
+import threading
+
+from .mflog import decorate
+
+# This script runs another process and captures stderr and stdout to a file, decorating
+# lines with mflog metadata.
+#
+# Usage: redirect_streams SOURCE STDOUT_FILE STDERR_FILE PROGRAM ARG1 ARG2 ...
+
+
+def reader_thread(SOURCE, dest_file, dest_stream, src):
+    with open(dest_file, mode="ab", buffering=0) as f:
+        if sys.version_info < (3, 0):
+            # Python 2
+            for line in iter(src.readline, ""):
+                # https://bugs.python.org/issue3907
+                decorated = decorate(SOURCE, line)
+                f.write(decorated)
+                dest_stream.write(line)
+        else:
+            # Python 3
+            for line in src:
+                decorated = decorate(SOURCE, line)
+                f.write(decorated)
+                dest_stream.buffer.write(line)
+
+
+if __name__ == "__main__":
+    SOURCE = sys.argv[1].encode("utf-8")
+    stdout_dest = sys.argv[2]
+    stderr_dest = sys.argv[3]
+
+    p = subprocess.Popen(
+        sys.argv[4:],
+        env=os.environ,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+    )
+
+    stdout_reader = threading.Thread(
+        target=reader_thread, args=(SOURCE, stdout_dest, sys.stdout, p.stdout)
+    )
+    stdout_reader.start()
+    stderr_reader = threading.Thread(
+        target=reader_thread, args=(SOURCE, stderr_dest, sys.stderr, p.stderr)
+    )
+    stderr_reader.start()
+    rc = p.wait()
+    stdout_reader.join()
+    stderr_reader.join()
+    sys.exit(rc)
diff --git a/metaflow/plugins/aws/batch/batch.py b/metaflow/plugins/aws/batch/batch.py
index 7e19c422cf..13c0d9fd9d 100644
--- a/metaflow/plugins/aws/batch/batch.py
+++ b/metaflow/plugins/aws/batch/batch.py
@@ -20,8 +20,8 @@ from metaflow.mflog.mflog import refine, set_should_persist
 from metaflow.mflog import (
     export_mflog_env_vars,
-    bash_capture_logs,
-    update_delay,
+    capture_output_to_mflog,
+    tail_logs,
     BASH_SAVE_LOGS,
 )
 
 
@@ -59,8 +59,11 @@ def _command(self, environment, code_package_url, step_name, step_cmds, task_spe
         )
         init_cmds = environment.get_package_commands(code_package_url)
         init_expr = " && ".join(init_cmds)
-        step_expr = bash_capture_logs(
-            " && ".join(environment.bootstrap_commands(step_name) + step_cmds)
+        step_expr = " && ".join(
+            [
+                capture_output_to_mflog(a)
+                for a in (environment.bootstrap_commands(step_name) + step_cmds)
+            ]
         )
 
         # construct an entry point that
@@ -329,63 +332,24 @@ def wait_for_launch(job):
                 select.poll().poll(200)
 
         prefix = b"[%s] " % util.to_bytes(self.job.id)
-
-        def _print_available(tail, stream, should_persist=False):
-            # print the latest batch of lines from S3Tail
-            try:
-                for line in tail:
-                    if should_persist:
-                        line = set_should_persist(line)
-                    else:
-                        line = refine(line, prefix=prefix)
-                    echo(line.strip().decode("utf-8", errors="replace"), stream)
-            except Exception as ex:
-                echo(
-                    "[ temporary error in fetching logs: %s ]" % ex,
-                    "stderr",
-                    batch_id=self.job.id,
-                )
-
         stdout_tail = S3Tail(stdout_location)
         stderr_tail = S3Tail(stderr_location)
 
         # 1) Loop until the job has started
         wait_for_launch(self.job)
 
-        # 2) Loop until the job has finished
-        start_time = time.time()
-        is_running = True
-        next_log_update = start_time
-        log_update_delay = 1
-
-        while is_running:
-            if time.time() > next_log_update:
-                _print_available(stdout_tail, "stdout")
-                _print_available(stderr_tail, "stderr")
-                now = time.time()
-                log_update_delay = update_delay(now - start_time)
-                next_log_update = now + log_update_delay
-                is_running = self.job.is_running
-
-            # This sleep should never delay log updates. On the other hand,
-            # we should exit this loop when the task has finished without
-            # a long delay, regardless of the log tailing schedule
-            d = min(log_update_delay, 5.0)
-            select.poll().poll(d * 1000)
+        # 2) Tail logs until the job has finished
+        tail_logs(
+            prefix=prefix,
+            stdout_tail=stdout_tail,
+            stderr_tail=stderr_tail,
+            echo=echo,
+            has_log_updates=lambda: self.job.is_running,
+        )
 
-        # 3) Fetch remaining logs
-        #
-        # It is possible that we exit the loop above before all logs have been
-        # shown.
-        #
-        # TODO if we notice AWS Batch failing to upload logs to S3, we can add a
-        #      HEAD request here to ensure that the file exists prior to calling
-        #      S3Tail and note the user about truncated logs if it doesn't
-        _print_available(stdout_tail, "stdout")
-        _print_available(stderr_tail, "stderr")
 
         # In case of hard crashes (OOM), the final save_logs won't happen.
-        # We fetch the remaining logs from AWS CloudWatch and persist them to
-        # Amazon S3.
+        # We can fetch the remaining logs from AWS CloudWatch and persist them
+        # to Amazon S3.
         if self.job.is_crashed:
             msg = next(
diff --git a/metaflow/plugins/aws/eks/kubernetes.py b/metaflow/plugins/aws/eks/kubernetes.py
index c908020e6a..652f5dcc7f 100644
--- a/metaflow/plugins/aws/eks/kubernetes.py
+++ b/metaflow/plugins/aws/eks/kubernetes.py
@@ -20,8 +20,8 @@
 )
 from metaflow.mflog import (
     export_mflog_env_vars,
-    bash_capture_logs,
-    update_delay,
+    capture_output_to_mflog,
+    tail_logs,
     BASH_SAVE_LOGS,
 )
 from metaflow.mflog.mflog import refine, set_should_persist
@@ -134,10 +134,13 @@ def _command(
         )
         init_cmds = self._environment.get_package_commands(code_package_url)
         init_expr = " && ".join(init_cmds)
-        step_expr = bash_capture_logs(
-            " && ".join(
-                self._environment.bootstrap_commands(self._step_name) + step_cmds
-            )
+        step_expr = " && ".join(
+            [
+                capture_output_to_mflog(a)
+                for a in (
+                    self._environment.bootstrap_commands(self._step_name) + step_cmds
+                )
+            ]
         )
 
         # Construct an entry point that
@@ -302,48 +305,21 @@ def wait_for_launch(job):
                    break
                time.sleep(1)
 
-        def _print_available(tail, stream, should_persist=False):
-            # print the latest batch of lines from S3Tail
-            prefix = b"[%s] " % util.to_bytes(self._job.id)
-            try:
-                for line in tail:
-                    if should_persist:
-                        line = set_should_persist(line)
-                    else:
-                        line = refine(line, prefix=prefix)
-                    echo(line.strip().decode("utf-8", errors="replace"), stream)
-            except Exception as ex:
-                echo(
-                    "[ temporary error in fetching logs: %s ]" % ex,
-                    "stderr",
-                    job_id=self._job.id,
-                )
-
+        prefix = b"[%s] " % util.to_bytes(self._job.id)
         stdout_tail = S3Tail(stdout_location)
         stderr_tail = S3Tail(stderr_location)
 
         # 1) Loop until the job has started
         wait_for_launch(self._job)
 
-        # 2) Loop until the job has finished
-        start_time = time.time()
-        is_running = True
-        next_log_update = start_time
-        log_update_delay = 1
-
-        while is_running:
-            if time.time() > next_log_update:
-                _print_available(stdout_tail, "stdout")
-                _print_available(stderr_tail, "stderr")
-                now = time.time()
-                log_update_delay = update_delay(now - start_time)
-                next_log_update = now + log_update_delay
-                is_running = self._job.is_running
-
-            # This sleep should never delay log updates. On the other hand,
-            # we should exit this loop when the task has finished without
-            # a long delay, regardless of the log tailing schedule
-            time.sleep(min(log_update_delay, 5.0))
+        # 2) Tail logs until the job has finished
+        tail_logs(
+            prefix=prefix,
+            stdout_tail=stdout_tail,
+            stderr_tail=stderr_tail,
+            echo=echo,
+            has_log_updates=lambda: self._job.is_running,
+        )
 
         # 3) Fetch remaining logs
         #
@@ -355,8 +331,6 @@ def _print_available(tail, stream, should_persist=False):
         #    exists prior to calling S3Tail and note the user about
         #    truncated logs if it doesn't.
         # TODO (savin): For hard crashes, we can fetch logs from the pod.
-        _print_available(stdout_tail, "stdout")
-        _print_available(stderr_tail, "stderr")
 
         if self._job.has_failed:
             exit_code, reason = self._job.reason
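
To make the shape of the new entrypoint concrete, here is a minimal sketch (not part of the patch) of what capture_output_to_mflog emits for a single step command, assuming a checkout with this patch applied, the default var_transform, and that TASK_LOG_SOURCE resolves to "task"; the flow command below is purely illustrative.

    from metaflow.mflog import capture_output_to_mflog

    # One wrapped command per step command; batch.py and kubernetes.py join
    # these with " && " instead of wrapping the whole expression as before.
    cmd = capture_output_to_mflog("python flow.py step start")
    print(cmd)
    # Prints a single shell command of the form (one line):
    #   python -m metaflow.mflog.redirect_streams task $MFLOG_STDOUT $MFLOG_STDERR python flow.py step start
    # redirect_streams runs the trailing command as a subprocess, decorates each
    # stdout/stderr line with mflog metadata, appends it to $MFLOG_STDOUT /
    # $MFLOG_STDERR, and mirrors the raw line to its own stdout/stderr, replacing
    # the bash process substitution previously emitted by bash_capture_logs.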
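Both schedulers now drive the shared tail_logs loop the same way. Below is a rough sketch of that contract with S3Tail replaced by a hypothetical stand-in and an illustrative echo callback; none of the names introduced here are part of the patch, and it again assumes a patched metaflow is importable.

    from metaflow.mflog import tail_logs

    class StubTail(object):
        # Stand-in for S3Tail: each iteration yields the log lines appended since
        # the previous poll. Empty here; the real S3Tail follows the MFLOG_STDOUT /
        # MFLOG_STDERR files that redirect_streams keeps appending to.
        def __iter__(self):
            return iter([])

    def echo(text, stream):
        # tail_logs hands over already-refined, decoded lines plus the stream name.
        print("%s: %s" % (stream, text))

    tail_logs(
        prefix=b"[job-12345] ",        # passed to refine() to tag each line
        stdout_tail=StubTail(),
        stderr_tail=StubTail(),
        echo=echo,
        has_log_updates=lambda: False,  # the plugins pass lambda: job.is_running
    )
    # The loop polls both tails on the update_delay() schedule while
    # has_log_updates() is truthy, then drains both tails once more on exit.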