Skip to content

Commit

Permalink
Refactor PopenHandler API for better subclassing for replica popen ha…
Browse files Browse the repository at this point in the history
…ndling (#883)

Summary:

## No functional changes
- refactor Local Scheduler process popen local into a PopenHandler class as a field for easier subclass.

Differential Revision: D56162173
  • Loading branch information
cniii authored and facebook-github-bot committed Apr 16, 2024
1 parent a38a1f8 commit 75916cc
Showing 1 changed file with 119 additions and 95 deletions.
214 changes: 119 additions & 95 deletions torchx/schedulers/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,111 @@ def failed(self) -> bool:
return self.proc.returncode != 0


class _PopenHandler:
"""
Handles the creation ``subprocess.Popen`` for each replica process with
configuration extracted from ``ReplicaParam``.
"""

def __init__(self) -> None:
pass

def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]:
"""
Given a file name, opens the file for write and returns the IO.
If no file name is given, then returns ``None``
Raises a ``FileExistsError`` if the file is already present.
"""

if not file:
return None

if os.path.isfile(file):
raise FileExistsError(
f"log file: {file} already exists,"
f" specify a different log_dir, app_name, or remove the file and retry"
)

os.makedirs(os.path.dirname(file), exist_ok=True)
return io.open(file, mode="wb", buffering=0)

def _popen(
self,
role_name: RoleName,
replica_id: int,
replica_params: ReplicaParam,
) -> _LocalReplica:
"""
Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr``
as file name ``str`` rather than a file-like obj.
"""

stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params)

args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80)
log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}")
env = self._get_replica_env(replica_params)

proc = subprocess.Popen(
args=replica_params.args,
env=env,
stdout=stdout_,
stderr=stderr_,
start_new_session=True,
cwd=replica_params.cwd,
)
return _LocalReplica(
role_name,
replica_id,
proc,
stdout=stdout_,
stderr=stderr_,
combined=combined_,
error_file=env.get("TORCHELASTIC_ERROR_FILE", "<N/A>"),
)

def _get_replica_env(
self,
replica_params: ReplicaParam,
) -> Dict[str, str]:
"""
Returns environment variables for the ``_LocalReplica``
"""

# inherit parent's env vars since 99.9% of the time we want this behavior
# just make sure we override the parent's env vars with the user_defined ones
env = os.environ.copy()
env.update(replica_params.env)
# PATH is a special one, instead of overriding, append
env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH"))

# default to unbuffered python for faster responsiveness locally
env.setdefault("PYTHONUNBUFFERED", "x")

return env

def _get_replica_output_handles(
self,
replica_params: ReplicaParam,
) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]:
"""
Returns the stdout, stderr, and combined outputs of the replica.
If the combined output file is not specified, then the combined output is ``None``.
"""

stdout_ = self._get_file_io(replica_params.stdout)
stderr_ = self._get_file_io(replica_params.stderr)
combined_: Optional[Tee] = None
combined_file = self._get_file_io(replica_params.combined)
if combined_file:
combined_ = Tee(
combined_file,
none_throws(replica_params.stdout),
none_throws(replica_params.stderr),
)
return stdout_, stderr_, combined_


class _LocalAppDef:
"""
Container object used by ``LocalhostScheduler`` to group the pids that
Expand Down Expand Up @@ -600,6 +705,8 @@ def __init__(
self._cache_size = cache_size
_register_termination_signals()

self._popen_handler = _PopenHandler()

self._extra_paths: List[str] = extra_paths or []

# sets lazily on submit or dryrun based on log_dir cfg
Expand Down Expand Up @@ -660,101 +767,6 @@ def _evict_lru(self) -> bool:
log.debug(f"no apps evicted, all {len(self._apps)} apps are running")
return False

def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]:
"""
Given a file name, opens the file for write and returns the IO.
If no file name is given, then returns ``None``
Raises a ``FileExistsError`` if the file is already present.
"""

if not file:
return None

if os.path.isfile(file):
raise FileExistsError(
f"log file: {file} already exists,"
f" specify a different log_dir, app_name, or remove the file and retry"
)

os.makedirs(os.path.dirname(file), exist_ok=True)
return io.open(file, mode="wb", buffering=0)

def _popen(
self,
role_name: RoleName,
replica_id: int,
replica_params: ReplicaParam,
) -> _LocalReplica:
"""
Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr``
as file name ``str`` rather than a file-like obj.
"""

stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params)

args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80)
log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}")
env = self._get_replica_env(replica_params)

proc = subprocess.Popen(
args=replica_params.args,
env=env,
stdout=stdout_,
stderr=stderr_,
start_new_session=True,
cwd=replica_params.cwd,
)
return _LocalReplica(
role_name,
replica_id,
proc,
stdout=stdout_,
stderr=stderr_,
combined=combined_,
error_file=env.get("TORCHELASTIC_ERROR_FILE", "<N/A>"),
)

def _get_replica_output_handles(
self,
replica_params: ReplicaParam,
) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]:
"""
Returns the stdout, stderr, and combined outputs of the replica.
If the combined output file is not specified, then the combined output is ``None``.
"""

stdout_ = self._get_file_io(replica_params.stdout)
stderr_ = self._get_file_io(replica_params.stderr)
combined_: Optional[Tee] = None
combined_file = self._get_file_io(replica_params.combined)
if combined_file:
combined_ = Tee(
combined_file,
none_throws(replica_params.stdout),
none_throws(replica_params.stderr),
)
return stdout_, stderr_, combined_

def _get_replica_env(
self,
replica_params: ReplicaParam,
) -> Dict[str, str]:
"""
Returns environment variables for the ``_LocalReplica``
"""

# inherit parent's env vars since 99.9% of the time we want this behavior
# just make sure we override the parent's env vars with the user_defined ones
env = os.environ.copy()
env.update(replica_params.env)
# PATH is a special one, instead of overriding, append
env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH"))

# default to unbuffered python for faster responsiveness locally
env.setdefault("PYTHONUNBUFFERED", "x")

return env

def _get_app_log_dir(self, app_id: str, cfg: LocalOpts) -> str:
"""
Returns the log dir. We redirect stdout/err
Expand Down Expand Up @@ -811,6 +823,18 @@ def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str:
self._apps[app_id] = local_app
return app_id

def _popen(
self,
role_name: RoleName,
replica_id: int,
replica_params: ReplicaParam,
) -> _LocalReplica:
return self._popen_handler._popen(
role_name,
replica_id,
replica_params,
)

def _submit_dryrun(
self, app: AppDef, cfg: LocalOpts
) -> AppDryRunInfo[PopenRequest]:
Expand Down

0 comments on commit 75916cc

Please sign in to comment.