From 241bba50e6a0f9046a6fb0b658e0f6307b20adc1 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 16 Sep 2024 10:46:58 -0700 Subject: [PATCH] Verify .map can run parallel classes (#802) * Add integration test confirming it works. * Add documentation explaining how. * Add support for viewing the readthedocs documentation before committing. --- .gitignore | 1 + Makefile | 8 +++ docs/Makefile | 8 ++- lib/sycamore/sycamore/docset.py | 5 ++ .../tests/integration/transforms/test_map.py | 52 +++++++++++++++++++ lib/sycamore/sycamore/transforms/map.py | 13 +++++ 6 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 lib/sycamore/sycamore/tests/integration/transforms/test_map.py diff --git a/.gitignore b/.gitignore index 8761c0701..ee830136a 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ luna_traces/ traces/ apps/jupyter/bind_dir/poetry_cache apps/query-ui/cache_dir +docs/build diff --git a/Makefile b/Makefile index eb4d13603..711e2f913 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,8 @@ +help: + @echo "make all -- make all in ${DIRS}" + @echo "make clean -- make clean in ${DIRS}" + @echo "make serve-docs -- serve the sycamore docs at http://localhost:8000/" + DIRS := apps .PHONY: $(DIRS) @@ -10,3 +15,6 @@ subdir-all-%: subdir-clean-%: $(MAKE) -C $* clean + +serve-docs: + (cd docs && make serve-docs) diff --git a/docs/Makefile b/docs/Makefile index 59f4366ed..5e493983d 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -10,9 +10,15 @@ BUILDDIR = build # Put it first so that "make" without argument is like "make help". help: + @echo "make serve-docs -- make and serve the docs on http://localhost:8000/" @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) -.PHONY: help Makefile +serve-docs: + make html + (cd build/html && poetry run python -m http.server) + + +.PHONY: help serve-docs Makefile # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). diff --git a/lib/sycamore/sycamore/docset.py b/lib/sycamore/sycamore/docset.py index 6ee3d8c05..fa87ffdb8 100644 --- a/lib/sycamore/sycamore/docset.py +++ b/lib/sycamore/sycamore/docset.py @@ -860,6 +860,7 @@ def map(self, f: Callable[[Document], Document], **resource_args) -> "DocSet": Args: f: The function to apply to each document. + See the :class:`~sycamore.transforms.map.Map` documentation for advanced features. """ from sycamore.transforms import Map @@ -873,6 +874,8 @@ def flat_map(self, f: Callable[[Document], list[Document]], **resource_args) -> Args: f: The function to apply to each document. + See the :class:`~sycamore.transforms.map.FlatMap` documentation for advanced features. + Example: .. code-block:: python @@ -990,6 +993,8 @@ def map_batch( The map_batch transform is similar to map, except that it processes a list of documents and returns a list of documents. map_batch is ideal for transformations that get performance benefits from batching. + See the :class:`~sycamore.transforms.map.MapBatch` documentation for advanced features. + Example: .. code-block:: python diff --git a/lib/sycamore/sycamore/tests/integration/transforms/test_map.py b/lib/sycamore/sycamore/tests/integration/transforms/test_map.py new file mode 100644 index 000000000..41bbf2efc --- /dev/null +++ b/lib/sycamore/sycamore/tests/integration/transforms/test_map.py @@ -0,0 +1,52 @@ +import sycamore +import logging +import math +import time +import uuid +from sycamore.data import Document +from ray.data import ActorPoolStrategy + + +def make_docs(num): + docs = [] + for i in range(num): + doc = Document({"doc_id": f"doc_{i}"}) + docs.append(doc) + + return docs + + +ctx = sycamore.init() + + +def test_map_class_parallelism(): + class AgentMark: + def __init__(self): + self.id = uuid.uuid4() + logging.error("Start AgentMark {self.id}") + + def __call__(self, d): + logging.error(f"Call AgentMark {self.id} on {d.doc_id}") + time.sleep(1) + d.properties["agent"] = self.id + return d + + num_actors = 4 + num_docs = 20 + docs = ctx.read.document(make_docs(num_docs)).map(AgentMark, compute=ActorPoolStrategy(size=num_actors)).take() + + count = {} + for d in docs: + a = d.properties["agent"] + count[a] = count.get(a, 0) + 1 + + assert len(count) == num_actors + # Provide +-1 slop on perfectly even distribution. + # given the sleep we probably will get perfect distribution + min_count = math.floor(num_docs / num_actors - 1) + max_count = math.ceil(num_docs / num_actors + 1) + print("Expecting count to be between {min_count} and {max_count}") + for a in count: + print(f"Actor {a} got {count[a]} items") + assert count[a] >= min_count + assert count[a] <= max_count diff --git a/lib/sycamore/sycamore/transforms/map.py b/lib/sycamore/sycamore/transforms/map.py index 087568cc3..b2776ecad 100644 --- a/lib/sycamore/sycamore/transforms/map.py +++ b/lib/sycamore/sycamore/transforms/map.py @@ -21,6 +21,15 @@ class Map(BaseMapTransform): # option 2: docset.map(f, args=my_args, kwargs=my_kwargs) + If f is a class type, when using ray execution, the class will be mapped to an agent that + will be instantiated a fixed number of times. By default that will be once, but you can + change that with: + .. code-block:: python + + from ray.data import ActorPoolStrategy + + ctx.map(ExampleClass, compute=ActorPoolStrategy(size=num_actors)) + Example: .. code-block:: python @@ -72,6 +81,8 @@ class FlatMap(BaseMapTransform): FlatMap is a transformation class for applying a callable function to each document in a dataset and flattening the resulting list of documents. + See :class:`Map` for additional arguments that can be specified and the option for the type of f. + Example: .. code-block:: python @@ -127,6 +138,8 @@ class MapBatch(BaseMapTransform): The MapBatch transform is similar to Map, except that it processes a list of documents and returns a list of documents. MapBatches is ideal for transformations that get performance benefits from batching. + See :class:`Map` for additional arguments that can be specified and the option for the type of f. + Example: .. code-block:: python