Verify .map can run parallel classes (#802)
* Add integration test confirming it works.
* Add documentation explaining how.
* Add support for viewing the readthedocs documentation before committing.
eric-anderson authored Sep 16, 2024
1 parent baedef8 commit 241bba5
Showing 6 changed files with 86 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
@@ -39,3 +39,4 @@ luna_traces/
traces/
apps/jupyter/bind_dir/poetry_cache
apps/query-ui/cache_dir
docs/build
8 changes: 8 additions & 0 deletions Makefile
@@ -1,3 +1,8 @@
help:
	@echo "make all -- make all in ${DIRS}"
	@echo "make clean -- make clean in ${DIRS}"
	@echo "make serve-docs -- serve the sycamore docs at http://localhost:8000/"

DIRS := apps
.PHONY: $(DIRS)

@@ -10,3 +15,6 @@ subdir-all-%:

subdir-clean-%:
	$(MAKE) -C $* clean

serve-docs:
	(cd docs && make serve-docs)
8 changes: 7 additions & 1 deletion docs/Makefile
@@ -10,9 +10,15 @@ BUILDDIR = build

# Put it first so that "make" without argument is like "make help".
help:
	@echo "make serve-docs -- make and serve the docs on http://localhost:8000/"
	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

.PHONY: help Makefile
serve-docs:
	make html
	(cd build/html && poetry run python -m http.server)


.PHONY: help serve-docs Makefile

# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
5 changes: 5 additions & 0 deletions lib/sycamore/sycamore/docset.py
@@ -860,6 +860,7 @@ def map(self, f: Callable[[Document], Document], **resource_args) -> "DocSet":
Args:
f: The function to apply to each document.
See the :class:`~sycamore.transforms.map.Map` documentation for advanced features.
"""
from sycamore.transforms import Map

@@ -873,6 +874,8 @@ def flat_map(self, f: Callable[[Document], list[Document]], **resource_args) ->
Args:
f: The function to apply to each document.
See the :class:`~sycamore.transforms.map.FlatMap` documentation for advanced features.
Example:
.. code-block:: python
@@ -990,6 +993,8 @@ def map_batch(
The map_batch transform is similar to map, except that it processes a list of documents and returns a list of
documents. map_batch is ideal for transformations that get performance benefits from batching.
See the :class:`~sycamore.transforms.map.MapBatch` documentation for advanced features.
Example:
.. code-block:: python
52 changes: 52 additions & 0 deletions lib/sycamore/sycamore/tests/integration/transforms/test_map.py
@@ -0,0 +1,52 @@
import sycamore
import logging
import math
import time
import uuid
from sycamore.data import Document
from ray.data import ActorPoolStrategy


def make_docs(num):
    docs = []
    for i in range(num):
        doc = Document({"doc_id": f"doc_{i}"})
        docs.append(doc)

    return docs


ctx = sycamore.init()


def test_map_class_parallelism():
    class AgentMark:
        def __init__(self):
            self.id = uuid.uuid4()
            logging.error(f"Start AgentMark {self.id}")

        def __call__(self, d):
            logging.error(f"Call AgentMark {self.id} on {d.doc_id}")
            time.sleep(1)
            d.properties["agent"] = self.id
            return d

    num_actors = 4
    num_docs = 20
    docs = ctx.read.document(make_docs(num_docs)).map(AgentMark, compute=ActorPoolStrategy(size=num_actors)).take()

    count = {}
    for d in docs:
        a = d.properties["agent"]
        count[a] = count.get(a, 0) + 1

    assert len(count) == num_actors
    # Allow +-1 slop around a perfectly even distribution.
    # Given the sleep, the distribution will probably be perfectly even.
    min_count = math.floor(num_docs / num_actors - 1)
    max_count = math.ceil(num_docs / num_actors + 1)
    print(f"Expecting count to be between {min_count} and {max_count}")
    for a in count:
        print(f"Actor {a} got {count[a]} items")
        assert count[a] >= min_count
        assert count[a] <= max_count
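
The balance check at the end of the test above can be read on its own. The following is a minimal standalone sketch of that arithmetic, assuming the same +-1 slop; the helper names `balanced_bounds` and `is_balanced` are hypothetical and are not part of sycamore or of this commit.

```python
import math


def balanced_bounds(num_docs, num_actors, slop=1):
    """Return (min_count, max_count) allowed per actor, mirroring the test's +-1 slop."""
    even_share = num_docs / num_actors
    return math.floor(even_share - slop), math.ceil(even_share + slop)


def is_balanced(counts, num_docs, num_actors, slop=1):
    """True if exactly num_actors actors appear and each count lies within the bounds."""
    low, high = balanced_bounds(num_docs, num_actors, slop)
    return len(counts) == num_actors and all(low <= c <= high for c in counts.values())


# With the test's values (20 documents, 4 actors) the even share is 5,
# so each actor may handle between 4 and 6 documents.
print(balanced_bounds(20, 4))  # prints (4, 6)
```

The slop keeps the test from failing when one actor picks up a document slightly ahead of another, while still catching the case where a single actor does nearly all of the work.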
13 changes: 13 additions & 0 deletions lib/sycamore/sycamore/transforms/map.py
@@ -21,6 +21,15 @@ class Map(BaseMapTransform):
# option 2:
docset.map(f, args=my_args, kwargs=my_kwargs)
If f is a class type, then under Ray execution the class is mapped to an actor that
is instantiated a fixed number of times. By default it is instantiated once, but you can
change that with:
.. code-block:: python
from ray.data import ActorPoolStrategy
docset.map(ExampleClass, compute=ActorPoolStrategy(size=num_actors))
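
The idea of a class being instantiated a fixed number of times and then called once per document can be pictured without Ray. The sketch below is only a single-process stand-in under stated assumptions: `map_with_pool` is a hypothetical helper, documents are plain dicts rather than sycamore `Document` objects, and the round-robin assignment is an illustration, not a description of how Ray actually schedules work across actors.

```python
import itertools
import uuid


class AgentMark:
    """Stateful callable: __init__ runs once per instance, __call__ once per document."""

    def __init__(self):
        self.id = uuid.uuid4()

    def __call__(self, doc):
        marked = dict(doc)  # copy so the input document is left untouched
        marked["agent"] = self.id
        return marked


def map_with_pool(cls, docs, size=1):
    """Hypothetical stand-in for a fixed-size pool: build `size` instances, reuse them."""
    pool = [cls() for _ in range(size)]
    workers = itertools.cycle(pool)  # round-robin is an assumption made for illustration
    return [next(workers)(doc) for doc in docs]


docs = [{"doc_id": f"doc_{i}"} for i in range(20)]
out = map_with_pool(AgentMark, docs, size=4)
agents = {d["agent"] for d in out}
print(len(out), len(agents))
```

Passing a class rather than a function is useful when construction is expensive (for example, loading a model), because that cost is paid once per instance instead of once per document.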
Example:
.. code-block:: python
@@ -72,6 +81,8 @@ class FlatMap(BaseMapTransform):
FlatMap is a transformation class for applying a callable function to each document in a dataset and flattening
the resulting list of documents.
See :class:`Map` for the additional arguments that can be specified and for the allowed types of f.
Example:
.. code-block:: python
@@ -127,6 +138,8 @@ class MapBatch(BaseMapTransform):
The MapBatch transform is similar to Map, except that it processes a list of documents and returns a list of
documents. MapBatch is ideal for transformations that get performance benefits from batching.
See :class:`Map` for the additional arguments that can be specified and for the allowed types of f.
Example:
.. code-block:: python