Skip to content

Commit

Permalink
Merge branch 'main' into ms/check_batch-backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
michalsosn authored and Michał Sośnicki committed Jan 14, 2025
2 parents aeed5e1 + 12faace commit f441a2f
Show file tree
Hide file tree
Showing 21 changed files with 399 additions and 268 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ repos:
args: [ --config-file, pyproject.toml ]
pass_filenames: false
additional_dependencies:
- neptune-api==0.7.0b
- neptune-api
- more-itertools
- backoff
default_language_version:
Expand Down
34 changes: 20 additions & 14 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.8.0] - 2024-11-26
## 0.9.0 - 2025-01-07

### Changes
* Removed support for Python 3.8 (https://github.com/neptune-ai/neptune-client-scale/pull/105)

### Added
* Added `projects.list_projects()` method to list projects accessible to the current user (https://github.com/neptune-ai/neptune-client-scale/pull/97)

### Fixed
* Fixed retry behavior on encountering a `NeptuneRetryableError` (https://github.com/neptune-ai/neptune-client-scale/pull/99)
* Fixed batching of metrics when logged with steps out of order (https://github.com/neptune-ai/neptune-client-scale/pull/91)

### Chores
* Not invoking `on_error_callback` on encountering 408 and 429 HTTP statuses (https://github.com/neptune-ai/neptune-client-scale/pull/110)

## 0.8.0 - 2024-11-26

### Added
- Added function `neptune_scale.projects.create_project()` to programmatically create Neptune projects ([#92](https://github.com/neptune-ai/neptune-client-scale/pull/92))
Expand All @@ -24,71 +39,62 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed batching of steps ([#82](https://github.com/neptune-ai/neptune-client-scale/pull/82))

## [0.7.2] - 2024-11-07
## 0.7.2 - 2024-11-07

### Added

- List, set, and tuple support for `log_configs()` ([#67](https://github.com/neptune-ai/neptune-client-scale/pull/67))
- Tuple support for tags ([#67](https://github.com/neptune-ai/neptune-client-scale/pull/67))

### Changed

- Performance improvements
- Change the logger's configuration to be more resilient ([#66](https://github.com/neptune-ai/neptune-client-scale/pull/66))
- Update docs: info about timestamp and timezones ([#69](https://github.com/neptune-ai/neptune-client-scale/pull/69))
- Strip quotes from the `NEPTUNE_PROJECT` env variable ([#51](https://github.com/neptune-ai/neptune-client-scale/pull/51))


## [0.7.1] - 2024-10-28
## 0.7.1 - 2024-10-28

### Changed
- Removed `family` from run initialization parameters ([#62](https://github.com/neptune-ai/neptune-client-scale/pull/62))
- Made `timestamp` keyword-only in `log_metrics()` ([#58](https://github.com/neptune-ai/neptune-client-scale/pull/58))

## [0.6.3] - 2024-10-23
## 0.6.3 - 2024-10-23

### Changed

- Changed the signature of `Run.log_metrics`:
- `date` is now the first parameter in line with other logging methods ([#58](https://github.com/neptune-ai/neptune-client-scale/pull/58))
- `step` and `data` are now mandatory ([#55](https://github.com/neptune-ai/neptune-client-scale/pull/55))

- Removed iterables from `log_config` value type hints ([#53](https://github.com/neptune-ai/neptune-client-scale/pull/53))

## [0.6.0] - 2024-09-09
## 0.6.0 - 2024-09-09

### Added

- Dedicated exceptions for missing project or API token ([#44](https://github.com/neptune-ai/neptune-client-scale/pull/44))

### Changed

- Removed `timestamp` parameter from `add_tags()`, `remove_tags()` and `log_configs()` methods ([#37](https://github.com/neptune-ai/neptune-client-scale/pull/37))
- Performance improvements of metadata logging ([#42](https://github.com/neptune-ai/neptune-client-scale/pull/42))

## [0.5.0] - 2024-09-05

### Added

- Added docstrings to logging methods ([#40](https://github.com/neptune-ai/neptune-client-scale/pull/40))

## [0.4.0] - 2024-09-03

### Added

- Added support for integer values when logging metric values ([#33](https://github.com/neptune-ai/neptune-client-scale/pull/33))
- Added support for async lag threshold ([#22](https://github.com/neptune-ai/neptune-client-scale/pull/22))

## [0.3.0] - 2024-09-03

### Added

- Package renamed to `neptune-scale` ([#31](https://github.com/neptune-ai/neptune-client-scale/pull/31))

## [0.2.0] - 2024-09-02

### Added

- Added minimal Run classes ([#6](https://github.com/neptune-ai/neptune-client-scale/pull/6))
- Added support for `max_queue_size` and `max_queue_size_exceeded_callback` parameters in `Run` ([#7](https://github.com/neptune-ai/neptune-client-scale/pull/7))
- Added support for logging metadata ([#8](https://github.com/neptune-ai/neptune-client-scale/pull/8))
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ pattern = "default-unprefixed"
[tool.poetry.dependencies]
python = "^3.9"

neptune-api = "^0.9.0"
neptune-api = "^0.10.0"
more-itertools = "^10.0.0"
psutil = "^5.0.0"
backoff = "^2.0.0"

[tool.poetry]
name = "neptune-scale"
version = "0.1.0"
description = "A minimal client library"
description = "Python logging API for Neptune Scale"
authors = ["neptune.ai <[email protected]>"]
repository = "https://github.com/neptune-ai/neptune-client-scale"
readme = "README.md"
Expand Down
54 changes: 42 additions & 12 deletions src/neptune_scale/api/attribute.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import itertools
import threading
import warnings
from collections.abc import (
Collection,
Expand All @@ -14,6 +15,7 @@
cast,
)

from neptune_scale.exceptions import NeptuneSeriesStepNonIncreasing
from neptune_scale.sync.metadata_splitter import MetadataSplitter
from neptune_scale.sync.operations_queue import OperationsQueue

Expand Down Expand Up @@ -59,6 +61,11 @@ def __init__(self, project: str, run_id: str, operations_queue: OperationsQueue)
self._run_id = run_id
self._operations_queue = operations_queue
self._attributes: dict[str, Attribute] = {}
# Keep a list of path -> (last step, last value) mappings to detect non-increasing steps
# at call site. The backend will detect this error as well, but it's more convenient for the user
# to get the error as soon as possible.
self._metric_state: dict[str, tuple[float, float]] = {}
self._lock = threading.RLock()

def __getitem__(self, path: str) -> "Attribute":
path = cleanup_path(path)
Expand All @@ -85,22 +92,45 @@ def log(
) -> None:
if timestamp is None:
timestamp = datetime.now()
elif isinstance(timestamp, float):
elif isinstance(timestamp, (float, int)):
timestamp = datetime.fromtimestamp(timestamp)

splitter: MetadataSplitter = MetadataSplitter(
project=self._project,
run_id=self._run_id,
step=step,
timestamp=timestamp,
configs=configs,
metrics=metrics,
add_tags=tags_add,
remove_tags=tags_remove,
# MetadataSplitter is an iterator, so gather everything into a list instead of iterating over
# it in the critical section, to avoid holding the lock for too long.
# TODO: Move splitting into the worker process. Here we should just send messages as they are.
chunks = list(
MetadataSplitter(
project=self._project,
run_id=self._run_id,
step=step,
timestamp=timestamp,
configs=configs,
metrics=metrics,
add_tags=tags_add,
remove_tags=tags_remove,
)
)

for operation, metadata_size in splitter:
self._operations_queue.enqueue(operation=operation, size=metadata_size, key=step)
with self._lock:
self._verify_and_update_metrics_state(step, metrics)

for operation, metadata_size in chunks:
self._operations_queue.enqueue(operation=operation, size=metadata_size)

def _verify_and_update_metrics_state(self, step: Optional[float], metrics: Optional[dict[str, float]]) -> None:
"""Check if step in provided metrics is increasing, raise `NeptuneSeriesStepNonIncreasing` if not."""

if step is None or metrics is None:
return

for metric, value in metrics.items():
if (state := self._metric_state.get(metric)) is not None:
last_step, last_value = state
# Repeating a step is fine as long as the value does not change
if step < last_step or (step == last_step and value != last_value):
raise NeptuneSeriesStepNonIncreasing()

self._metric_state[metric] = (step, value)


class Attribute:
Expand Down
10 changes: 10 additions & 0 deletions src/neptune_scale/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"NeptuneAsyncLagThresholdExceeded",
"NeptuneProjectNotProvided",
"NeptuneApiTokenNotProvided",
"NeptuneTooManyRequestsResponseError",
)

from typing import Any
Expand Down Expand Up @@ -191,6 +192,15 @@ class NeptuneUnexpectedResponseError(NeptuneRetryableError):
"""


class NeptuneTooManyRequestsResponseError(NeptuneRetryableError):
message = """
{h1}
NeptuneTooManyRequestsResponseError: The Neptune server reported receiving too many requests.
{end}
This is a temporary problem. If the problem persists, please contact us at [email protected].
"""


class NeptuneInternalServerError(NeptuneRetryableError):
message = """
{h1}
Expand Down
29 changes: 29 additions & 0 deletions src/neptune_scale/net/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
from typing import (
Any,
Literal,
cast,
)

import httpx
import neptune_retrieval_api.client
from httpx import Timeout
from neptune_api import (
AuthenticatedClient,
Expand Down Expand Up @@ -64,6 +66,11 @@
from neptune_api.proto.neptune_pb.ingest.v1.pub.ingest_pb2 import RunOperation
from neptune_api.proto.neptune_pb.ingest.v1.pub.request_status_pb2 import RequestStatus
from neptune_api.types import Response
from neptune_retrieval_api.api.default import search_leaderboard_entries_proto
from neptune_retrieval_api.models import SearchLeaderboardEntriesParamsDTO
from neptune_retrieval_api.proto.neptune_pb.api.v1.model.leaderboard_entries_pb2 import (
ProtoLeaderboardEntriesSearchResultDTO,
)

from neptune_scale.exceptions import (
NeptuneConnectionLostError,
Expand Down Expand Up @@ -129,6 +136,11 @@ def submit(self, operation: RunOperation, family: str) -> Response[SubmitRespons
@abc.abstractmethod
def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequestStatus]: ...

@abc.abstractmethod
def search_entries(
self, project_id: str, body: SearchLeaderboardEntriesParamsDTO
) -> ProtoLeaderboardEntriesSearchResultDTO: ...


class HostedApiClient(ApiClient):
def __init__(self, api_token: str) -> None:
Expand All @@ -141,6 +153,9 @@ def __init__(self, api_token: str) -> None:
self.backend = create_auth_api_client(
credentials=credentials, config=config, token_refreshing_urls=token_urls, verify_ssl=verify_ssl
)
# This is required only to silence mypy. The two client objects are compatible, because they're
# generated by swagger codegen.
self.retrieval_backend = cast(neptune_retrieval_api.client.AuthenticatedClient, self.backend)
logger.debug("Connected to Neptune API")

def submit(self, operation: RunOperation, family: str) -> Response[SubmitResponse]:
Expand All @@ -153,6 +168,15 @@ def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequ
body=RequestIdList(ids=[RequestId(value=request_id) for request_id in request_ids]),
)

def search_entries(
self, project_id: str, body: SearchLeaderboardEntriesParamsDTO
) -> ProtoLeaderboardEntriesSearchResultDTO:
resp = search_leaderboard_entries_proto.sync_detailed(
client=self.retrieval_backend, project_identifier=project_id, type=["run"], body=body
)
result = ProtoLeaderboardEntriesSearchResultDTO.FromString(resp.content)
return result

def close(self) -> None:
logger.debug("Closing API client")
self.backend.__exit__()
Expand Down Expand Up @@ -181,6 +205,11 @@ def check_batch(self, request_ids: list[str], project: str) -> Response[BulkRequ
)
return Response(content=b"", parsed=response_body, status_code=HTTPStatus.OK, headers={})

def search_entries(
self, project_id: str, body: SearchLeaderboardEntriesParamsDTO
) -> ProtoLeaderboardEntriesSearchResultDTO:
return ProtoLeaderboardEntriesSearchResultDTO()


def backend_factory(api_token: str, mode: Literal["async", "disabled"]) -> ApiClient:
if mode == "disabled":
Expand Down
18 changes: 3 additions & 15 deletions src/neptune_scale/net/projects.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import re
from enum import Enum
from json import JSONDecodeError
Expand All @@ -11,15 +10,14 @@
import httpx

from neptune_scale.exceptions import (
NeptuneApiTokenNotProvided,
NeptuneBadRequestError,
NeptuneProjectAlreadyExists,
)
from neptune_scale.net.api_client import (
HostedApiClient,
with_api_errors_handling,
)
from neptune_scale.util.envs import API_TOKEN_ENV_NAME
from neptune_scale.sync.util import ensure_api_token

PROJECTS_PATH_BASE = "/api/backend/v1/projects"

Expand All @@ -33,14 +31,6 @@ class ProjectVisibility(Enum):
ORGANIZATION_NOT_FOUND_RE = re.compile(r"Organization .* not found")


def _get_api_token(api_token: Optional[str]) -> str:
api_token = api_token or os.environ.get(API_TOKEN_ENV_NAME)
if api_token is None:
raise NeptuneApiTokenNotProvided()

return api_token


@with_api_errors_handling
def create_project(
workspace: str,
Expand All @@ -52,9 +42,7 @@ def create_project(
fail_if_exists: bool = False,
api_token: Optional[str] = None,
) -> None:
api_token = _get_api_token(api_token)

client = HostedApiClient(api_token=api_token)
client = HostedApiClient(api_token=ensure_api_token(api_token))
visibility = ProjectVisibility(visibility)

body = {
Expand Down Expand Up @@ -92,7 +80,7 @@ def _safe_json(response: httpx.Response) -> Any:


def get_project_list(*, api_token: Optional[str] = None) -> list[dict]:
client = HostedApiClient(api_token=_get_api_token(api_token))
client = HostedApiClient(api_token=ensure_api_token(api_token))

params = {
"userRelation": "viewerOrHigher",
Expand Down
29 changes: 29 additions & 0 deletions src/neptune_scale/net/runs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Optional

from neptune_retrieval_api.models import SearchLeaderboardEntriesParamsDTO

from neptune_scale.exceptions import NeptuneScaleError
from neptune_scale.net.api_client import HostedApiClient
from neptune_scale.net.util import escape_nql_criterion
from neptune_scale.sync.util import ensure_api_token


def run_exists(project: str, run_id: str, api_token: Optional[str] = None) -> bool:
"""Query the backend for the existence of a Run with the given ID.
Returns True if the Run exists, False otherwise.
"""

client = HostedApiClient(api_token=ensure_api_token(api_token))
body = SearchLeaderboardEntriesParamsDTO.from_dict(
{
"query": {"query": f'`sys/custom_run_id`:string = "{escape_nql_criterion(run_id)}"'},
}
)

try:
result = client.search_entries(project, body)
except Exception as e:
raise NeptuneScaleError(reason=e)

return bool(result.entries)
Loading

0 comments on commit f441a2f

Please sign in to comment.