Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor PopenHandler API for better subclassing for replica popen handling #883

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 119 additions & 95 deletions torchx/schedulers/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,111 @@ def failed(self) -> bool:
return self.proc.returncode != 0


class _PopenHandler:
"""
Handles the creation ``subprocess.Popen`` for each replica process with
configuration extracted from ``ReplicaParam``.
"""

def __init__(self) -> None:
pass

def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]:
"""
Given a file name, opens the file for write and returns the IO.
If no file name is given, then returns ``None``
Raises a ``FileExistsError`` if the file is already present.
"""

if not file:
return None

if os.path.isfile(file):
raise FileExistsError(
f"log file: {file} already exists,"
f" specify a different log_dir, app_name, or remove the file and retry"
)

os.makedirs(os.path.dirname(file), exist_ok=True)
return io.open(file, mode="wb", buffering=0)

def _popen(
self,
role_name: RoleName,
replica_id: int,
replica_params: ReplicaParam,
) -> _LocalReplica:
"""
Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr``
as file name ``str`` rather than a file-like obj.
"""

stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params)

args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80)
log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}")
env = self._get_replica_env(replica_params)

proc = subprocess.Popen(
args=replica_params.args,
env=env,
stdout=stdout_,
stderr=stderr_,
start_new_session=True,
cwd=replica_params.cwd,
)
return _LocalReplica(
role_name,
replica_id,
proc,
stdout=stdout_,
stderr=stderr_,
combined=combined_,
error_file=env.get("TORCHELASTIC_ERROR_FILE", "<N/A>"),
)

def _get_replica_env(
self,
replica_params: ReplicaParam,
) -> Dict[str, str]:
"""
Returns environment variables for the ``_LocalReplica``
"""

# inherit parent's env vars since 99.9% of the time we want this behavior
# just make sure we override the parent's env vars with the user_defined ones
env = os.environ.copy()
env.update(replica_params.env)
# PATH is a special one, instead of overriding, append
env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH"))

# default to unbuffered python for faster responsiveness locally
env.setdefault("PYTHONUNBUFFERED", "x")

return env

def _get_replica_output_handles(
self,
replica_params: ReplicaParam,
) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]:
"""
Returns the stdout, stderr, and combined outputs of the replica.
If the combined output file is not specified, then the combined output is ``None``.
"""

stdout_ = self._get_file_io(replica_params.stdout)
stderr_ = self._get_file_io(replica_params.stderr)
combined_: Optional[Tee] = None
combined_file = self._get_file_io(replica_params.combined)
if combined_file:
combined_ = Tee(
combined_file,
none_throws(replica_params.stdout),
none_throws(replica_params.stderr),
)
return stdout_, stderr_, combined_


class _LocalAppDef:
"""
Container object used by ``LocalhostScheduler`` to group the pids that
Expand Down Expand Up @@ -600,6 +705,8 @@ def __init__(
self._cache_size = cache_size
_register_termination_signals()

self._popen_handler = _PopenHandler()

self._extra_paths: List[str] = extra_paths or []

# sets lazily on submit or dryrun based on log_dir cfg
Expand Down Expand Up @@ -660,101 +767,6 @@ def _evict_lru(self) -> bool:
log.debug(f"no apps evicted, all {len(self._apps)} apps are running")
return False

def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]:
"""
Given a file name, opens the file for write and returns the IO.
If no file name is given, then returns ``None``
Raises a ``FileExistsError`` if the file is already present.
"""

if not file:
return None

if os.path.isfile(file):
raise FileExistsError(
f"log file: {file} already exists,"
f" specify a different log_dir, app_name, or remove the file and retry"
)

os.makedirs(os.path.dirname(file), exist_ok=True)
return io.open(file, mode="wb", buffering=0)

def _popen(
self,
role_name: RoleName,
replica_id: int,
replica_params: ReplicaParam,
) -> _LocalReplica:
"""
Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr``
as file name ``str`` rather than a file-like obj.
"""

stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params)

args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80)
log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}")
env = self._get_replica_env(replica_params)

proc = subprocess.Popen(
args=replica_params.args,
env=env,
stdout=stdout_,
stderr=stderr_,
start_new_session=True,
cwd=replica_params.cwd,
)
return _LocalReplica(
role_name,
replica_id,
proc,
stdout=stdout_,
stderr=stderr_,
combined=combined_,
error_file=env.get("TORCHELASTIC_ERROR_FILE", "<N/A>"),
)

def _get_replica_output_handles(
self,
replica_params: ReplicaParam,
) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]:
"""
Returns the stdout, stderr, and combined outputs of the replica.
If the combined output file is not specified, then the combined output is ``None``.
"""

stdout_ = self._get_file_io(replica_params.stdout)
stderr_ = self._get_file_io(replica_params.stderr)
combined_: Optional[Tee] = None
combined_file = self._get_file_io(replica_params.combined)
if combined_file:
combined_ = Tee(
combined_file,
none_throws(replica_params.stdout),
none_throws(replica_params.stderr),
)
return stdout_, stderr_, combined_

def _get_replica_env(
self,
replica_params: ReplicaParam,
) -> Dict[str, str]:
"""
Returns environment variables for the ``_LocalReplica``
"""

# inherit parent's env vars since 99.9% of the time we want this behavior
# just make sure we override the parent's env vars with the user_defined ones
env = os.environ.copy()
env.update(replica_params.env)
# PATH is a special one, instead of overriding, append
env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH"))

# default to unbuffered python for faster responsiveness locally
env.setdefault("PYTHONUNBUFFERED", "x")

return env

def _get_app_log_dir(self, app_id: str, cfg: LocalOpts) -> str:
"""
Returns the log dir. We redirect stdout/err
Expand Down Expand Up @@ -811,6 +823,18 @@ def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str:
self._apps[app_id] = local_app
return app_id

def _popen(
self,
role_name: RoleName,
replica_id: int,
replica_params: ReplicaParam,
) -> _LocalReplica:
return self._popen_handler._popen(
role_name,
replica_id,
replica_params,
)

def _submit_dryrun(
self, app: AppDef, cfg: LocalOpts
) -> AppDryRunInfo[PopenRequest]:
Expand Down