Skip to content

Commit

Permalink
Add --sync-no-step to neptune sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kgodlewski committed Jan 10, 2025
1 parent 7b06e23 commit 3ae111b
Showing 1 changed file with 41 additions and 7 deletions.
48 changes: 41 additions & 7 deletions src/neptune_scale/cli/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
)
from neptune_scale.exceptions import (
NeptuneRunDuplicate,
NeptuneRunForkParentNotFound,
NeptuneSeriesStepNonIncreasing,
)
from neptune_scale.net.runs import run_exists
from neptune_scale.storage.operations import (
LocalRun,
OperationReader,
OperationWriter,
list_runs,
Expand Down Expand Up @@ -145,12 +148,9 @@ def _error_callback(state: SyncState, exc: BaseException, ts: Optional[float]) -


def _warning_callback(state: SyncState, exc: BaseException, ts: Optional[float]) -> None:
if isinstance(exc, NeptuneRunDuplicate):
if isinstance(exc, (NeptuneRunDuplicate, NeptuneRunForkParentNotFound)):
# Silence the warning
return
# elif isinstance(exc, NeptuneRunForkParentNotFound):
# state.set_error(exc)
# return

logger.warning(f"{exc}")

Expand Down Expand Up @@ -192,7 +192,28 @@ def _do_sync(reader: OperationReader, state: SyncState) -> None:
state.cond.notify_all()


def sync_file(path: Path, api_token: Optional[str], allow_non_increasing_step: bool) -> None:
def _verify_fork_parent(local_run: LocalRun, parent_must_exist: bool) -> None:
if not local_run.fork_run_id:
return

for i in range(3):
if run_exists(local_run.project, local_run.fork_run_id):
return
time.sleep(2**i)

msg = f"Parent Run `{local_run.fork_run_id}` does not exist."

if parent_must_exist:
logger.error(msg)
raise NeptuneRunForkParentNotFound()

msg += " Proceeding because --sync-no-parent was passed."
logger.warning(msg)


def sync_file(
path: Path, api_token: Optional[str], *, allow_non_increasing_step: bool, parent_must_exist: bool
) -> None:
logger.info(f"Processing file {path}")
reader = OperationReader(path)
local_run = reader.run
Expand All @@ -215,6 +236,8 @@ def sync_file(path: Path, api_token: Optional[str], allow_non_increasing_step: b
creation_time=local_run.creation_time,
)

_verify_fork_parent(local_run, parent_must_exist)

state = SyncState(allow_non_increasing_step)
run = Run(
run_id=local_run.run_id,
Expand Down Expand Up @@ -257,9 +280,15 @@ def sync_file(path: Path, api_token: Optional[str], allow_non_increasing_step: b
help="Do not abort on non-increasing metric steps being sent. This is useful for resuming interrupted syncs that "
"are stuck on a metric being sent multiple times. ",
)
@click.option("--sync-no-parent", is_flag=True, help="Do not require the parent run to exist when syncing forked runs")
@click.pass_context
def sync(
ctx: click.Context, filename: Optional[str], api_token: Optional[str], keep: bool, allow_non_increasing_step: bool
ctx: click.Context,
filename: Optional[str],
api_token: Optional[str],
keep: bool,
allow_non_increasing_step: bool,
sync_no_parent: bool,
) -> None:
neptune_dir = ctx.obj["neptune_dir"]
if not is_neptune_dir(neptune_dir):
Expand All @@ -282,7 +311,12 @@ def sync(

for path in files:
try:
sync_file(path, api_token=api_token, allow_non_increasing_step=allow_non_increasing_step)
sync_file(
path,
api_token=api_token,
allow_non_increasing_step=allow_non_increasing_step,
parent_must_exist=not sync_no_parent,
)
if not keep:
logger.info(f"Removing file {path}")
path.unlink()
Expand Down

0 comments on commit 3ae111b

Please sign in to comment.