From 75916cc37ef9c14c36c3fbf4d454a4caf76847d2 Mon Sep 17 00:00:00 2001 From: Cheng Ni Date: Mon, 15 Apr 2024 17:07:39 -0700 Subject: [PATCH] Refactor PopenHandler API for better subclassing for replica popen handling (#883) Summary: ## No functional changes - refactor Local Scheduler process popen local into a PopenHandler class as a field for easier subclass. Differential Revision: D56162173 --- torchx/schedulers/local_scheduler.py | 214 +++++++++++++++------------ 1 file changed, 119 insertions(+), 95 deletions(-) diff --git a/torchx/schedulers/local_scheduler.py b/torchx/schedulers/local_scheduler.py index 2fa07d362..899da0255 100644 --- a/torchx/schedulers/local_scheduler.py +++ b/torchx/schedulers/local_scheduler.py @@ -333,6 +333,111 @@ def failed(self) -> bool: return self.proc.returncode != 0 +class _PopenHandler: + """ + Handles the creation ``subprocess.Popen`` for each replica process with + configuration extracted from ``ReplicaParam``. + """ + + def __init__(self) -> None: + pass + + def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]: + """ + Given a file name, opens the file for write and returns the IO. + If no file name is given, then returns ``None`` + Raises a ``FileExistsError`` if the file is already present. + """ + + if not file: + return None + + if os.path.isfile(file): + raise FileExistsError( + f"log file: {file} already exists," + f" specify a different log_dir, app_name, or remove the file and retry" + ) + + os.makedirs(os.path.dirname(file), exist_ok=True) + return io.open(file, mode="wb", buffering=0) + + def _popen( + self, + role_name: RoleName, + replica_id: int, + replica_params: ReplicaParam, + ) -> _LocalReplica: + """ + Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr`` + as file name ``str`` rather than a file-like obj. + """ + + stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params) + + args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80) + log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}") + env = self._get_replica_env(replica_params) + + proc = subprocess.Popen( + args=replica_params.args, + env=env, + stdout=stdout_, + stderr=stderr_, + start_new_session=True, + cwd=replica_params.cwd, + ) + return _LocalReplica( + role_name, + replica_id, + proc, + stdout=stdout_, + stderr=stderr_, + combined=combined_, + error_file=env.get("TORCHELASTIC_ERROR_FILE", ""), + ) + + def _get_replica_env( + self, + replica_params: ReplicaParam, + ) -> Dict[str, str]: + """ + Returns environment variables for the ``_LocalReplica`` + """ + + # inherit parent's env vars since 99.9% of the time we want this behavior + # just make sure we override the parent's env vars with the user_defined ones + env = os.environ.copy() + env.update(replica_params.env) + # PATH is a special one, instead of overriding, append + env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH")) + + # default to unbuffered python for faster responsiveness locally + env.setdefault("PYTHONUNBUFFERED", "x") + + return env + + def _get_replica_output_handles( + self, + replica_params: ReplicaParam, + ) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]: + """ + Returns the stdout, stderr, and combined outputs of the replica. + If the combined output file is not specified, then the combined output is ``None``. + """ + + stdout_ = self._get_file_io(replica_params.stdout) + stderr_ = self._get_file_io(replica_params.stderr) + combined_: Optional[Tee] = None + combined_file = self._get_file_io(replica_params.combined) + if combined_file: + combined_ = Tee( + combined_file, + none_throws(replica_params.stdout), + none_throws(replica_params.stderr), + ) + return stdout_, stderr_, combined_ + + class _LocalAppDef: """ Container object used by ``LocalhostScheduler`` to group the pids that @@ -600,6 +705,8 @@ def __init__( self._cache_size = cache_size _register_termination_signals() + self._popen_handler = _PopenHandler() + self._extra_paths: List[str] = extra_paths or [] # sets lazily on submit or dryrun based on log_dir cfg @@ -660,101 +767,6 @@ def _evict_lru(self) -> bool: log.debug(f"no apps evicted, all {len(self._apps)} apps are running") return False - def _get_file_io(self, file: Optional[str]) -> Optional[io.FileIO]: - """ - Given a file name, opens the file for write and returns the IO. - If no file name is given, then returns ``None`` - Raises a ``FileExistsError`` if the file is already present. - """ - - if not file: - return None - - if os.path.isfile(file): - raise FileExistsError( - f"log file: {file} already exists," - f" specify a different log_dir, app_name, or remove the file and retry" - ) - - os.makedirs(os.path.dirname(file), exist_ok=True) - return io.open(file, mode="wb", buffering=0) - - def _popen( - self, - role_name: RoleName, - replica_id: int, - replica_params: ReplicaParam, - ) -> _LocalReplica: - """ - Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr`` - as file name ``str`` rather than a file-like obj. - """ - - stdout_, stderr_, combined_ = self._get_replica_output_handles(replica_params) - - args_pfmt = pprint.pformat(asdict(replica_params), indent=2, width=80) - log.debug(f"Running {role_name} (replica {replica_id}):\n {args_pfmt}") - env = self._get_replica_env(replica_params) - - proc = subprocess.Popen( - args=replica_params.args, - env=env, - stdout=stdout_, - stderr=stderr_, - start_new_session=True, - cwd=replica_params.cwd, - ) - return _LocalReplica( - role_name, - replica_id, - proc, - stdout=stdout_, - stderr=stderr_, - combined=combined_, - error_file=env.get("TORCHELASTIC_ERROR_FILE", ""), - ) - - def _get_replica_output_handles( - self, - replica_params: ReplicaParam, - ) -> Tuple[Optional[io.FileIO], Optional[io.FileIO], Optional[Tee]]: - """ - Returns the stdout, stderr, and combined outputs of the replica. - If the combined output file is not specified, then the combined output is ``None``. - """ - - stdout_ = self._get_file_io(replica_params.stdout) - stderr_ = self._get_file_io(replica_params.stderr) - combined_: Optional[Tee] = None - combined_file = self._get_file_io(replica_params.combined) - if combined_file: - combined_ = Tee( - combined_file, - none_throws(replica_params.stdout), - none_throws(replica_params.stderr), - ) - return stdout_, stderr_, combined_ - - def _get_replica_env( - self, - replica_params: ReplicaParam, - ) -> Dict[str, str]: - """ - Returns environment variables for the ``_LocalReplica`` - """ - - # inherit parent's env vars since 99.9% of the time we want this behavior - # just make sure we override the parent's env vars with the user_defined ones - env = os.environ.copy() - env.update(replica_params.env) - # PATH is a special one, instead of overriding, append - env["PATH"] = _join_PATH(replica_params.env.get("PATH"), os.getenv("PATH")) - - # default to unbuffered python for faster responsiveness locally - env.setdefault("PYTHONUNBUFFERED", "x") - - return env - def _get_app_log_dir(self, app_id: str, cfg: LocalOpts) -> str: """ Returns the log dir. We redirect stdout/err @@ -811,6 +823,18 @@ def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str: self._apps[app_id] = local_app return app_id + def _popen( + self, + role_name: RoleName, + replica_id: int, + replica_params: ReplicaParam, + ) -> _LocalReplica: + return self._popen_handler._popen( + role_name, + replica_id, + replica_params, + ) + def _submit_dryrun( self, app: AppDef, cfg: LocalOpts ) -> AppDryRunInfo[PopenRequest]: