Skip to content

Commit

Permalink
Get Local Mode working 2/n (#861)
Browse files Browse the repository at this point in the history
* Switch examples/markdown.py and examples/s3_ingest.py over to local mode.
* Fix s3_ingest to use the correct sys.path adjustment. Only set up the filesystem explicitly
  if the AWS env vars are not present (this made it possible to verify that auto-fs detection works).
  Switch it from taking a manifest to taking a path. I was unable to find an example of using
  the manifest in files, and given the Python path bug, no one can have been using the script.
* Fix file scan to be able to work when the filesystem isn't specified.
* Add logging for when we run steps in local mode -- it was hard to tell whether things were
  failing or just being slow.
* docset.py, regex_replace.py: Fix invalid escape sequences in docstrings that trigger a SyntaxWarning on Python 3.12.
  • Loading branch information
eric-anderson authored Oct 2, 2024
1 parent 1d348c4 commit 44137bf
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 21 deletions.
2 changes: 1 addition & 1 deletion examples/markdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sycamore.transforms.partition import ArynPartitioner

docs = (
sycamore.init()
sycamore.init(exec_mode=sycamore.EXEC_LOCAL)
.read.binary(sys.argv[1:], binary_format="pdf")
.partition(partitioner=ArynPartitioner(extract_table_structure=True, use_partitioning_service=False))
.markdown()
Expand Down
35 changes: 21 additions & 14 deletions examples/s3_ingest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import sys
import boto3
import pyarrow.fs
import os


# ruff: noqa: E402
sys.path.append("../sycamore")
sys.path.append("../lib/sycamore")

import sycamore
from sycamore.functions.tokenizer import HuggingFaceTokenizer
from sycamore.llms import OpenAIModels, OpenAI
from sycamore.connectors.file.file_scan import JsonManifestMetadataProvider
from sycamore.transforms import COALESCE_WHITESPACE
from sycamore.transforms.merge_elements import MarkedMerger
from sycamore.transforms.partition import UnstructuredPdfPartitioner
Expand All @@ -17,27 +18,33 @@

from simple_config import idx_settings, osrch_args, title_template

manifest = sys.argv[1]
if len(sys.argv) <= 1 or sys.argv[1] == "-h" or sys.argv[1] == "--help":
print("Usage: poetry run python s3_ingest.py s3://<something> [s3://another-thing ...]")
exit(1)

index = "demoindex0"

sess = boto3.session.Session()
cred = sess.get_credentials()
assert cred is not None
fsys = pyarrow.fs.S3FileSystem(
access_key=cred.access_key,
secret_key=cred.secret_key,
region=sess.region_name,
session_token=cred.token,
)
if "AWS_SECRET_ACCESS_KEY" in os.environ:
fsys = None
else:
print("Attempting to get S3 Credentials")
sess = boto3.session.Session()
cred = sess.get_credentials()
assert cred is not None
fsys = pyarrow.fs.S3FileSystem(
access_key=cred.access_key,
secret_key=cred.secret_key,
region=sess.region_name,
session_token=cred.token,
)

davinci_llm = OpenAI(OpenAIModels.GPT_3_5_TURBO_INSTRUCT.value)
tokenizer = HuggingFaceTokenizer("thenlper/gte-small")

ctx = sycamore.init()
ctx = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)

ds = (
ctx.read.manifest(metadata_provider=JsonManifestMetadataProvider(manifest), binary_format="pdf", filesystem=fsys)
ctx.read.binary(sys.argv[1:], binary_format="pdf", filesystem=fsys)
.partition(partitioner=UnstructuredPdfPartitioner())
.regex_replace(COALESCE_WHITESPACE)
.extract_entity(entity_extractor=OpenAIEntityExtractor("title", llm=davinci_llm, prompt_template=title_template))
Expand Down
7 changes: 4 additions & 3 deletions lib/sycamore/sycamore/connectors/file/file_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import uuid
import logging

from pyarrow.fs import FileSystem, LocalFileSystem, FileSelector
from pyarrow.fs import FileSystem, FileSelector
from sycamore.data import Document
from sycamore.plan_nodes import Scan
from sycamore.utils.time_trace import timetrace
Expand Down Expand Up @@ -169,8 +169,7 @@ def local_source(self, **kwargs) -> list[Document]:
paths = [self._paths]
else:
paths = self._paths
if not self._filesystem:
self._filesystem = LocalFileSystem()

documents = []

def process_file(info):
Expand Down Expand Up @@ -200,6 +199,8 @@ def process_file(info):
from sycamore.utils.pyarrow import cross_check_infer_fs

(filesystem, path) = cross_check_infer_fs(self._filesystem, orig_path)
if self._filesystem is None:
self._filesystem = filesystem

path_info = filesystem.get_file_info(path)
if path_info.is_file:
Expand Down
2 changes: 1 addition & 1 deletion lib/sycamore/sycamore/docset.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ def regex_replace(self, spec: list[tuple[str, str]], **kwargs) -> "DocSet":
ds = context.read.binary(paths, binary_format="pdf")
.partition(partitioner=ArynPartitioner())
.regex_replace(COALESCE_WHITESPACE)
.regex_replace([(r"\d+", "1313"), (r"old", "new")])
.regex_replace([(r"\\d+", "1313"), (r"old", "new")])
.explode()
"""
from sycamore.transforms import RegexReplace
Expand Down
18 changes: 17 additions & 1 deletion lib/sycamore/sycamore/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from sycamore.plan_nodes import Node


logger = logging.getLogger(__name__)


def _ray_logging_setup():
# The commented out lines allow for easier testing that logging is working correctly since
# they will emit information at the start.
Expand Down Expand Up @@ -135,15 +138,28 @@ def visit(n):
def recursive_execute(self, n: Node) -> list[Document]:
    """Execute the plan rooted at ``n`` in local mode and return its documents.

    The plan is walked depth-first:

    * a node with no children is treated as a source and read with
      ``local_source()``;
    * a ``Materialize`` node whose ``_will_be_source()`` is true is also read
      with ``local_source()`` instead of executing its child;
    * a node with exactly one child is run by passing the child's documents
      to ``local_execute()``.

    A log line is emitted before each step runs so that a slow step can be
    told apart from a failing one.

    Args:
        n: Root of the (sub)plan to execute.

    Returns:
        The documents produced by ``n``.

    Raises:
        AssertionError: If a node lacks the method its role requires, if the
            single child is ``None``, or if the node has more than one child.
    """
    from sycamore.materialize import Materialize

    def get_name(f):
        """Return a short human-readable name for ``f`` to use in log lines."""
        if hasattr(f, "_name"):
            return f._name  # handle the case of basemap transforms

        if hasattr(f, "__name__"):
            return f.__name__

        return f.__class__.__name__

    if len(n.children) == 0:
        assert hasattr(n, "local_source"), f"Source {n} needs a local_source method"
        logger.info("Executing source %s", get_name(n))
        return n.local_source()

    if isinstance(n, Materialize) and n._will_be_source():
        logger.info("Reading from materialized source %s", get_name(n))
        return n.local_source()

    if len(n.children) == 1:
        assert hasattr(n, "local_execute"), f"Transform {n.__class__.__name__} needs a local_execute method"
        assert n.children[0] is not None
        # Run the child first so the log line appears right before this
        # node's own work starts.
        d = self.recursive_execute(n.children[0])
        logger.info("Executing node %s", get_name(n))
        return n.local_execute(d)

    # The original `assert f"..."` asserted a non-empty string, which is
    # always true, so unsupported nodes silently produced an empty result.
    raise AssertionError(f"Unable to handle node {n} with multiple children")
2 changes: 1 addition & 1 deletion lib/sycamore/sycamore/transforms/regex_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RegexReplace(SingleThreadUser, NonGPUUser, Map):
Example:
.. code-block:: python
rr = RegexReplace(child=node, spec=[(r"\s+", " "), (r"^ ", "")])
rr = RegexReplace(child=node, spec=[(r"\\s+", " "), (r"^ ", "")])
dataset = rr.execute()
"""

Expand Down

0 comments on commit 44137bf

Please sign in to comment.