Skip to content

Commit

Permalink
[DEV-11456] make request chains wait appropriately for sync/async cal…
Browse files Browse the repository at this point in the history
…ls (#298)

* refactor wait logic

* edits

* formatting

* check for debouncer instance instead of float or int; update typing; add kwarg max_wait_time for all calls using wait

* fix random typo in createexport; add debouncer to rest of queries

* update typing tuple to union; update docstrings

* update timer logic

* address comments

* change max_wait_time to request_interval

* correct job docstring

* address comments

* address comments

---------

Co-authored-by: Nathanael Shim <[email protected]>
  • Loading branch information
nateshim-indico and Nathanael Shim authored Mar 22, 2024
1 parent 74326f4 commit 1d5ea16
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 119 deletions.
2 changes: 1 addition & 1 deletion examples/aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ async def example_1(client):

if __name__ == "__main__":
# How to run a Python script using async
asyncio.run(example_with_client)
asyncio.run(example_with_client())
22 changes: 14 additions & 8 deletions indico/client/client.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
# -*- coding: utf-8 -*-

from typing import Union, Optional
import asyncio
import time
from typing import Optional, Union

import urllib3

from indico.config import IndicoConfig
from indico.errors import IndicoError
from indico.http.client import HTTPClient, AIOHTTPClient
from indico.client.request import (
GraphQLRequest,
HTTPRequest,
RequestChain,
PagedRequest,
GraphQLRequest,
RequestChain,
)
from indico.config import IndicoConfig
from indico.errors import IndicoError
from indico.client.request import Delay
from indico.http.client import AIOHTTPClient, HTTPClient


class IndicoClient:
Expand Down Expand Up @@ -47,7 +51,8 @@ def _handle_request_chain(self, chain: RequestChain):
elif isinstance(request, RequestChain):
response = self._handle_request_chain(request)
chain.previous = response

elif isinstance(request, Delay):
time.sleep(request.seconds)
if chain.result:
return chain.result
return response
Expand Down Expand Up @@ -147,7 +152,8 @@ async def _handle_request_chain(self, chain: RequestChain):
elif isinstance(request, RequestChain):
response = await self._handle_request_chain(request)
chain.previous = response

elif isinstance(request, Delay):
await asyncio.sleep(request.seconds)
if chain.result:
return chain.result
return response
Expand Down
19 changes: 5 additions & 14 deletions indico/client/request.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Dict, Any
from enum import Enum
from typing import Any, Dict, Union

from indico.errors import IndicoRequestError
import time


class HTTPMethod(Enum):
Expand Down Expand Up @@ -89,15 +89,6 @@ def requests(self):
pass


class Debouncer:
    """Linear-backoff helper: each call to backoff() sleeps a little
    longer, up to a fixed ceiling of max_timeout seconds."""

    def __init__(self, max_timeout: int = 5):
        # Start with no delay; grow by one second per backoff() call.
        self.timeout = 0
        # `or 5` guards against callers passing None or 0 as the ceiling.
        self.max_timeout = max_timeout or 5  # prevent None and 0

    def backoff(self):
        """Bump the delay, then block the current thread for that long."""
        self.increment_timeout()
        time.sleep(self.timeout)

    def increment_timeout(self):
        """Grow the delay by one second, clamped at max_timeout."""
        self.timeout = min(self.timeout + 1, self.max_timeout)
class Delay:
    """Marker request yielded inside a RequestChain to ask the client to
    pause (time.sleep / asyncio.sleep) before issuing the next request."""

    def __init__(self, seconds: Union[int, float] = 2) -> None:
        """Record how long the client should sleep, in seconds (default 2)."""
        self.seconds = seconds
54 changes: 30 additions & 24 deletions indico/queries/datasets.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
# -*- coding: utf-8 -*-

import json
import jsons
import tempfile
from pathlib import Path
from typing import List, Union, Dict, Optional
from typing import Dict, List, Optional, Union

import pandas as pd
import deprecation
import jsons
import pandas as pd

from indico.client.request import (
Debouncer,
Delay,
GraphQLRequest,
HTTPMethod,
HTTPRequest,
PagedRequest,
RequestChain,
)
from indico.errors import IndicoNotFound, IndicoInputError
from indico.errors import IndicoInputError, IndicoNotFound
from indico.filters import DatasetFilter
from indico.queries.storage import UploadBatched, UploadImages
from indico.types.dataset import (
Dataset,
OcrEngine,
OcrInputLanguage,
OmnipageOcrOptionsInput,
ReadApiOcrOptionsInput,
OcrInputLanguage,
)
from indico.filters import DatasetFilter


class ListDatasets(PagedRequest):
Expand Down Expand Up @@ -196,12 +196,17 @@ class CreateDataset(RequestChain):
Create a dataset and upload the associated files.
Args:
name (str): Name of the dataset
files (List[str]): List of pathnames to the dataset files
Options:
dataset_type (str): Type of dataset to create [TEXT, DOCUMENT, IMAGE]
wait (bool, default=True): Wait for the dataset to upload and finish
name (str): Name of the dataset.
files (List[str]): List of path names to the dataset files.
wait (bool, optional): Wait for the dataset to upload and finish. Defaults to True.
dataset_type (str, optional): Type of dataset to create [TEXT, DOCUMENT, IMAGE]. Defaults to TEXT.
from_local_images (bool, optional): Flag whether files are local images or not. Defaults to False.
image_filename_col (str, optional): Image filename column. Defaults to 'filename'.
batch_size (int, optional): Size of file batch to upload at a time. Defaults to 20.
ocr_engine (OcrEngine, optional): Specify an OCR engine [OMNIPAGE, READAPI, READAPI_V2, READAPI_TABLES_V1]. Defaults to None.
omnipage_ocr_options (OmnipageOcrOptionsInput, optional): If using Omnipage, specify Omnipage OCR options. Defaults to None.
read_api_ocr_options: (ReadApiOcrOptionsInput, optional): If using ReadAPI, specify ReadAPI OCR options. Defaults to None.
request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds.
Returns:
Dataset object
Expand All @@ -222,6 +227,7 @@ def __init__(
ocr_engine: OcrEngine = None,
omnipage_ocr_options: OmnipageOcrOptionsInput = None,
read_api_ocr_options: ReadApiOcrOptionsInput = None,
request_interval: Union[int, float] = 5,
):
self.files = files
self.name = name
Expand All @@ -233,6 +239,7 @@ def __init__(
self.ocr_engine = ocr_engine
self.omnipage_ocr_options = omnipage_ocr_options
self.read_api_ocr_options = read_api_ocr_options
self.request_interval = request_interval
if omnipage_ocr_options is not None and read_api_ocr_options is not None:
raise IndicoInputError(
"Must supply either omnipage or readapi options but not both."
Expand Down Expand Up @@ -278,13 +285,12 @@ def requests(self):
)
dataset_id = self.previous.id
yield GetDatasetFileStatus(id=dataset_id)
debouncer = Debouncer()
if self.wait is True:
while not all(
[f.status in ["PROCESSED", "FAILED"] for f in self.previous.files]
):
yield GetDatasetFileStatus(id=dataset_id)
debouncer.backoff()
yield Delay(seconds=self.request_interval)
yield GetDataset(id=dataset_id)


Expand Down Expand Up @@ -475,12 +481,11 @@ def requests(self):
)
yield GetDatasetFileStatus(id=self.dataset_id)
if self.wait:
debouncer = Debouncer()
while not all(
f.status in self.expected_statuses for f in self.previous.files
):
yield GetDatasetFileStatus(id=self.previous.id)
debouncer.backoff()
yield Delay()


# Alias for backwards compatibility
Expand Down Expand Up @@ -538,9 +543,10 @@ class ProcessFiles(RequestChain):
Process files associated with a dataset and add corresponding data to the dataset
Args:
dataset_id (int): ID of the dataset
datafile_ids (List[str]): IDs of the datafiles to process
wait (bool): Block while polling for status of files
dataset_id (int): ID of the dataset.
datafile_ids (List[str]): IDs of the datafiles to process.
wait (bool, optional): Block while polling for status of files. Defaults to True.
request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds.
Returns:
Expand All @@ -552,21 +558,22 @@ def __init__(
dataset_id: int,
datafile_ids: List[int],
wait: bool = True,
request_interval: Union[int, float] = 5,
):
self.dataset_id = dataset_id
self.datafile_ids = datafile_ids
self.wait = wait
self.request_interval = request_interval

def requests(self):
yield _ProcessFiles(self.dataset_id, self.datafile_ids)
debouncer = Debouncer()
yield GetDatasetFileStatus(id=self.dataset_id)
if self.wait:
while not all(
f.status in ["PROCESSED", "FAILED"] for f in self.previous.files
):
yield GetDatasetFileStatus(id=self.dataset_id)
debouncer.backoff()
yield Delay(seconds=self.request_interval)


@deprecation.deprecated(
Expand All @@ -593,14 +600,13 @@ def __init__(self, dataset_id: int, datafile_ids: List[int], wait: bool = True):

def requests(self):
yield _ProcessCSV(self.dataset_id, self.datafile_ids)
debouncer = Debouncer()
yield GetDatasetFileStatus(id=self.dataset_id)
if self.wait:
while not all(
f.status in ["PROCESSED", "FAILED"] for f in self.previous.files
):
yield GetDatasetFileStatus(id=self.dataset_id)
debouncer.backoff()
yield Delay()


class GetAvailableOcrEngines(GraphQLRequest):
Expand Down
49 changes: 31 additions & 18 deletions indico/queries/export.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import pandas as pd
import io
import warnings
from typing import List, Union

from indico.client import GraphQLRequest, RequestChain, Debouncer
import pandas as pd

from indico.client import Delay, GraphQLRequest, RequestChain
from indico.errors import IndicoNotFound, IndicoRequestError
from indico.types.export import LabelResolutionStrategy, Export
from indico.queries.storage import RetrieveStorageObject
from indico.types.export import Export, LabelResolutionStrategy


class _CreateExport(GraphQLRequest):
query = """
mutation CreateExport(
$datasetId: Int!,
$labelsetId: Int!,
$columnIds: [Int],
$columnIds: [Int],
$modelIds: [Int],
$frozenLabelsetIds: [Int],
$combineLabels: LabelResolutionStrategy,
Expand Down Expand Up @@ -55,7 +57,16 @@ def __init__(
combine_labels: LabelResolutionStrategy = LabelResolutionStrategy.ALL.name,
file_info: bool = None,
anonymoous: bool = None,
anonymous: bool = None,
):
if anonymoous:
warnings.warn(
"Argument anonymoous is deprecated and will be removed in future versions. Use argument anonymous instead."
)
if anonymous:
raise IndicoRequestError("Cannot use both anonymoous and anonymous.")
else:
anonymous = anonymoous
super().__init__(
self.query,
variables={
Expand All @@ -66,7 +77,7 @@ def __init__(
"frozenLabelsetIds": frozen_labelset_ids,
"combineLabels": combine_labels,
"fileInfo": file_info,
"anonymous": anonymoous,
"anonymous": anonymous,
},
)

Expand All @@ -93,7 +104,7 @@ class GetExport(GraphQLRequest):
exports {
id
datasetId
name
name
status
columnIds
labelsetId
Expand Down Expand Up @@ -165,15 +176,16 @@ class CreateExport(RequestChain):
Create an export job for a dataset.
Args:
dataset_id (int): Dataset to create the export for
labelset_id (int): Labelset column id to export
column_ids (List(int)): Data column ids to export
model_ids (List(int)): Model ids to include predictions from
frozen_labelset_ids: (List(int)): frozen labelset ids to limit examples by
combine_labels (LabelResolutionStrategy): One row per example, combine labels from multiple labels into a single row
file_info (bool): Include datafile information
anonymous (bool): Anonymize user information
wait (bool): Wait for the export to complete. Default is True
dataset_id (int): Dataset to create the export for.
labelset_id (int): Labelset column id to export.
column_ids (List(int), optional): Data column ids to export. Defaults to None.
model_ids (List(int), optional): Model ids to include predictions from. Defaults to None.
frozen_labelset_ids: (List(int), optional): frozen labelset ids to limit examples by. Defaults to None.
combine_labels (LabelResolutionStrategy, optional): One row per example, combine labels from multiple labels into a single row. Defaults to 'all'.
file_info (bool, optional): Include datafile information. Defaults to False.
anonymous (bool, optional): Anonymize user information. Defaults to False.
wait (bool, optional): Wait for the export to complete. Defaults to True.
request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds.
Returns:
Export object
Expand All @@ -193,6 +205,7 @@ def __init__(
file_info: bool = False,
anonymous: bool = False,
wait: bool = True,
request_interval: Union[int, float] = 5,
):
self.dataset_id = dataset_id
self.labelset_id = labelset_id
Expand All @@ -203,6 +216,7 @@ def __init__(
self.file_info = file_info
self.anonymous = anonymous
self.wait = wait
self.request_interval = request_interval
super().__init__()

def requests(self):
Expand All @@ -214,12 +228,11 @@ def requests(self):
frozen_labelset_ids=self.frozen_labelset_ids,
combine_labels=self.combine_labels,
file_info=self.file_info,
anonymoous=self.anonymous,
anonymous=self.anonymous,
)
debouncer = Debouncer()
if self.wait is True:
while self.previous.status not in ["COMPLETE", "FAILED"]:
yield GetExport(self.previous.id)
debouncer.backoff()
yield Delay(seconds=self.request_interval)

yield GetExport(self.previous.id)
Loading

0 comments on commit 1d5ea16

Please sign in to comment.