mflog changes for supporting AWS Lambda (Netflix#801)
* mflog changes for supporting AWS Lambda

* updates to mflog

* updates to mflog

* fix mflog capture for multiple commands (Netflix#805)

Co-authored-by: Oleg Avdeev <[email protected]>
savingoyal and oavdeev authored Nov 5, 2021
1 parent e778f5d commit ea74c46
Showing 4 changed files with 142 additions and 106 deletions.
62 changes: 53 additions & 9 deletions metaflow/mflog/__init__.py
@@ -1,4 +1,9 @@
import math
import time

from .mflog import refine, set_should_persist

from metaflow.util import to_unicode

# Log source indicates the system that *minted the timestamp*
# for the logline. This means that for a single task we can
@@ -39,17 +44,17 @@
BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS)

# this function returns a bash expression that redirects stdout
# and stderr of the given bash expression to mflog.tee
def bash_capture_logs(bash_expr, var_transform=None):
# and stderr of the given command to mflog
def capture_output_to_mflog(command_and_args, var_transform=None):
if var_transform is None:
var_transform = lambda s: "$%s" % s
cmd = "python -m metaflow.mflog.tee %s %s"
parts = (
bash_expr,
cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDOUT")),
cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDERR")),

return "python -m metaflow.mflog.redirect_streams %s %s %s %s" % (
TASK_LOG_SOURCE,
var_transform("MFLOG_STDOUT"),
var_transform("MFLOG_STDERR"),
command_and_args,
)
return "(%s) 1>> >(%s) 2>> >(%s >&2)" % parts


# update_delay determines how often logs should be uploaded to S3
@@ -71,7 +76,8 @@ def update_delay(secs_since_start):


# this function is used to generate a Bash 'export' expression that
# sets environment variables that are used by 'tee' and 'save_logs'.
# sets environment variables that are used by 'redirect_streams' and
# 'save_logs'.
# Note that we can't set the env vars statically, as some of them
# may need to be evaluated during runtime
def export_mflog_env_vars(
@@ -99,3 +105,41 @@ def export_mflog_env_vars(
env_vars["MF_DATASTORE_ROOT"] = datastore_root

return "export " + " ".join("%s=%s" % kv for kv in env_vars.items())


def tail_logs(prefix, stdout_tail, stderr_tail, echo, has_log_updates):
def _available_logs(tail, stream, echo, should_persist=False):
# print the latest batch of lines
try:
for line in tail:
if should_persist:
line = set_should_persist(line)
else:
line = refine(line, prefix=prefix)
echo(line.strip().decode("utf-8", errors="replace"), stream)
except Exception as ex:
echo(
"%s[ temporary error in fetching logs: %s ]" % (to_unicode(prefix), ex),
"stderr",
)

start_time = time.time()
next_log_update = start_time
log_update_delay = 1
while has_log_updates():
if time.time() > next_log_update:
_available_logs(stdout_tail, "stdout", echo)
_available_logs(stderr_tail, "stderr", echo)
now = time.time()
log_update_delay = update_delay(now - start_time)
next_log_update = now + log_update_delay

# This sleep should never delay log updates. On the other hand,
# we should exit this loop when the task has finished without
# a long delay, regardless of the log tailing schedule
time.sleep(min(log_update_delay, 5.0))
# It is possible that we exit the loop above before all logs have been
# tailed.
_available_logs(stdout_tail, "stdout", echo)
_available_logs(stderr_tail, "stderr", echo)
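
As a rough illustration of the new helper, the snippet below prints the bash expression that capture_output_to_mflog builds for a single command, assuming a Metaflow checkout that includes this commit. The wrapped command is made up, and the literal value of TASK_LOG_SOURCE is an assumption; treat this as a sketch rather than output captured from the commit.

from metaflow.mflog import capture_output_to_mflog

# Wrap one hypothetical step command. MFLOG_STDOUT / MFLOG_STDERR stay as "$VAR"
# references so bash expands them at runtime (see export_mflog_env_vars above).
print(capture_output_to_mflog("python flow.py step start"))
# Expected shape, assuming TASK_LOG_SOURCE == "task":
#   python -m metaflow.mflog.redirect_streams task $MFLOG_STDOUT $MFLOG_STDERR python flow.py step start
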
54 changes: 54 additions & 0 deletions metaflow/mflog/redirect_streams.py
@@ -0,0 +1,54 @@
import os
import sys
import subprocess
import threading

from .mflog import decorate

# This script runs another process and captures its stdout and stderr to separate files,
# decorating each line with mflog metadata.
#
# Usage: redirect_streams SOURCE STDOUT_FILE STDERR_FILE PROGRAM ARG1 ARG2 ...


def reader_thread(SOURCE, dest_file, dest_stream, src):
with open(dest_file, mode="ab", buffering=0) as f:
if sys.version_info < (3, 0):
# Python 2
for line in iter(src.readline, ""):
# https://bugs.python.org/issue3907
decorated = decorate(SOURCE, line)
f.write(decorated)
dest_stream.write(line)
else:
# Python 3
for line in src:
decorated = decorate(SOURCE, line)
f.write(decorated)
dest_stream.buffer.write(line)


if __name__ == "__main__":
SOURCE = sys.argv[1].encode("utf-8")
stdout_dest = sys.argv[2]
stderr_dest = sys.argv[3]

p = subprocess.Popen(
sys.argv[4:],
env=os.environ,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

stdout_reader = threading.Thread(
target=reader_thread, args=(SOURCE, stdout_dest, sys.stdout, p.stdout)
)
stdout_reader.start()
stderr_reader = threading.Thread(
target=reader_thread, args=(SOURCE, stderr_dest, sys.stderr, p.stderr)
)
stderr_reader.start()
rc = p.wait()
stdout_reader.join()
stderr_reader.join()
sys.exit(rc)
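
For context, here is a minimal sketch of invoking the new script directly, mirroring the Usage line in its header comment. The log paths and the wrapped command are made-up examples, and "task" is assumed to match TASK_LOG_SOURCE; decorated lines are appended to the two files, the raw output is echoed through to the parent's own streams, and the child's exit code is propagated.

import subprocess
import sys

# Hypothetical invocation: SOURCE, STDOUT_FILE, STDERR_FILE, then the program
# and its arguments. The /tmp paths are examples only.
rc = subprocess.call(
    [
        sys.executable, "-m", "metaflow.mflog.redirect_streams",
        "task",               # SOURCE passed to decorate()
        "/tmp/mflog_stdout",  # decorated stdout lines are appended here
        "/tmp/mflog_stderr",  # decorated stderr lines are appended here
        sys.executable, "-c", "import sys; print('out'); print('err', file=sys.stderr)",
    ]
)
print("wrapped program exited with", rc)
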
70 changes: 17 additions & 53 deletions metaflow/plugins/aws/batch/batch.py
@@ -20,8 +20,8 @@
from metaflow.mflog.mflog import refine, set_should_persist
from metaflow.mflog import (
export_mflog_env_vars,
bash_capture_logs,
update_delay,
capture_output_to_mflog,
tail_logs,
BASH_SAVE_LOGS,
)

@@ -59,8 +59,11 @@ def _command(self, environment, code_package_url, step_name, step_cmds, task_spe
)
init_cmds = environment.get_package_commands(code_package_url)
init_expr = " && ".join(init_cmds)
step_expr = bash_capture_logs(
" && ".join(environment.bootstrap_commands(step_name) + step_cmds)
step_expr = " && ".join(
[
capture_output_to_mflog(a)
for a in (environment.bootstrap_commands(step_name) + step_cmds)
]
)

# construct an entry point that
@@ -329,63 +332,24 @@ def wait_for_launch(job):
select.poll().poll(200)

prefix = b"[%s] " % util.to_bytes(self.job.id)

def _print_available(tail, stream, should_persist=False):
# print the latest batch of lines from S3Tail
try:
for line in tail:
if should_persist:
line = set_should_persist(line)
else:
line = refine(line, prefix=prefix)
echo(line.strip().decode("utf-8", errors="replace"), stream)
except Exception as ex:
echo(
"[ temporary error in fetching logs: %s ]" % ex,
"stderr",
batch_id=self.job.id,
)

stdout_tail = S3Tail(stdout_location)
stderr_tail = S3Tail(stderr_location)

# 1) Loop until the job has started
wait_for_launch(self.job)

# 2) Loop until the job has finished
start_time = time.time()
is_running = True
next_log_update = start_time
log_update_delay = 1

while is_running:
if time.time() > next_log_update:
_print_available(stdout_tail, "stdout")
_print_available(stderr_tail, "stderr")
now = time.time()
log_update_delay = update_delay(now - start_time)
next_log_update = now + log_update_delay
is_running = self.job.is_running

# This sleep should never delay log updates. On the other hand,
# we should exit this loop when the task has finished without
# a long delay, regardless of the log tailing schedule
d = min(log_update_delay, 5.0)
select.poll().poll(d * 1000)
# 2) Tail logs until the job has finished
tail_logs(
prefix=prefix,
stdout_tail=stdout_tail,
stderr_tail=stderr_tail,
echo=echo,
has_log_updates=lambda: self.job.is_running,
)

# 3) Fetch remaining logs
#
# It is possible that we exit the loop above before all logs have been
# shown.
#
# TODO if we notice AWS Batch failing to upload logs to S3, we can add a
# HEAD request here to ensure that the file exists prior to calling
# S3Tail and note the user about truncated logs if it doesn't
_print_available(stdout_tail, "stdout")
_print_available(stderr_tail, "stderr")
# In case of hard crashes (OOM), the final save_logs won't happen.
# We fetch the remaining logs from AWS CloudWatch and persist them to
# Amazon S3.
# We can fetch the remaining logs from AWS CloudWatch and persist them
# to Amazon S3.

if self.job.is_crashed:
msg = next(
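
One way to read the change above (together with the fix for multiple commands in Netflix#805): redirect_streams wraps exactly one program and its arguments, so a compound '&&' expression can no longer be captured in one shot the way bash_capture_logs did with process substitution; each command is wrapped individually and the wrapped pieces are re-joined. A rough sketch with made-up commands:

from metaflow.mflog import capture_output_to_mflog

# Hypothetical bootstrap and step commands; in the real code these come from
# environment.bootstrap_commands(step_name) + step_cmds.
cmds = ["python bootstrap.py", "python flow.py step train"]
step_expr = " && ".join(capture_output_to_mflog(c) for c in cmds)
# Each command gets its own "python -m metaflow.mflog.redirect_streams ..." prefix,
# and the '&&' chain still stops at the first failing command.
print(step_expr)
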
62 changes: 18 additions & 44 deletions metaflow/plugins/aws/eks/kubernetes.py
@@ -20,8 +20,8 @@
)
from metaflow.mflog import (
export_mflog_env_vars,
bash_capture_logs,
update_delay,
capture_output_to_mflog,
tail_logs,
BASH_SAVE_LOGS,
)
from metaflow.mflog.mflog import refine, set_should_persist
@@ -134,10 +134,13 @@ def _command(
)
init_cmds = self._environment.get_package_commands(code_package_url)
init_expr = " && ".join(init_cmds)
step_expr = bash_capture_logs(
" && ".join(
self._environment.bootstrap_commands(self._step_name) + step_cmds
)
step_expr = " && ".join(
[
capture_output_to_mflog(a)
for a in (
self._environment.bootstrap_commands(self._step_name) + step_cmds
)
]
)

# Construct an entry point that
@@ -302,48 +305,21 @@ def wait_for_launch(job):
break
time.sleep(1)

def _print_available(tail, stream, should_persist=False):
# print the latest batch of lines from S3Tail
prefix = b"[%s] " % util.to_bytes(self._job.id)
try:
for line in tail:
if should_persist:
line = set_should_persist(line)
else:
line = refine(line, prefix=prefix)
echo(line.strip().decode("utf-8", errors="replace"), stream)
except Exception as ex:
echo(
"[ temporary error in fetching logs: %s ]" % ex,
"stderr",
job_id=self._job.id,
)

prefix = b"[%s] " % util.to_bytes(self._job.id)
stdout_tail = S3Tail(stdout_location)
stderr_tail = S3Tail(stderr_location)

# 1) Loop until the job has started
wait_for_launch(self._job)

# 2) Loop until the job has finished
start_time = time.time()
is_running = True
next_log_update = start_time
log_update_delay = 1

while is_running:
if time.time() > next_log_update:
_print_available(stdout_tail, "stdout")
_print_available(stderr_tail, "stderr")
now = time.time()
log_update_delay = update_delay(now - start_time)
next_log_update = now + log_update_delay
is_running = self._job.is_running

# This sleep should never delay log updates. On the other hand,
# we should exit this loop when the task has finished without
# a long delay, regardless of the log tailing schedule
time.sleep(min(log_update_delay, 5.0))
# 2) Tail logs until the job has finished
tail_logs(
prefix=prefix,
stdout_tail=stdout_tail,
stderr_tail=stderr_tail,
echo=echo,
has_log_updates=lambda: self._job.is_running,
)

# 3) Fetch remaining logs
#
@@ -355,8 +331,6 @@ def _print_available(tail, stream, should_persist=False):
# exists prior to calling S3Tail and note the user about
# truncated logs if it doesn't.
# TODO (savin): For hard crashes, we can fetch logs from the pod.
_print_available(stdout_tail, "stdout")
_print_available(stderr_tail, "stderr")

if self._job.has_failed:
exit_code, reason = self._job.reason
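
Both plugins now delegate their polling loop to the shared tail_logs helper added in metaflow/mflog/__init__.py above. Below is a minimal sketch of that contract; the empty tails and the one-shot has_log_updates flag are stand-ins, not Metaflow objects (real callers pass S3Tail instances that yield mflog-formatted byte lines and a lambda that polls the job's is_running property).

from metaflow.mflog import tail_logs

# Stand-ins for S3Tail and the job-status check.
empty_stdout_tail = iter([])  # a real S3Tail yields bytes lines
empty_stderr_tail = iter([])
updates = iter([True])        # report "still running" exactly once, then stop

tail_logs(
    prefix=b"[job-123] ",     # prepended to each refined log line
    stdout_tail=empty_stdout_tail,
    stderr_tail=empty_stderr_tail,
    echo=lambda line, stream: print(stream, line),
    has_log_updates=lambda: next(updates, False),
)
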
