From 72c33ae414a50caeb4590e1c49486e9747240e7b Mon Sep 17 00:00:00 2001
From: Eric Anderson
Date: Wed, 28 Aug 2024 16:57:13 -0700
Subject: [PATCH] Fix materialize + S3 not working. (#734)

* Fix materialize + S3 not working.

Existing code was relying on Python pathlib to do most operations rather
than running them through the pyarrow filesystem. Fix that, and add a test
with a fake pyarrow filesystem to make sure we exercise a path that does
not exist in the local filesystem.

* Also make AutoMaterialize work with pyarrow non-local filesystems.
---
 lib/sycamore/sycamore/executor.py             |   4 +
 lib/sycamore/sycamore/materialize.py          | 126 ++++++++++++------
 .../sycamore/tests/unit/inmempyarrowfs.py     |  96 +++++++++++++
 .../sycamore/tests/unit/test_materialize.py   |  84 ++++++++++--
 4 files changed, 255 insertions(+), 55 deletions(-)
 create mode 100644 lib/sycamore/sycamore/tests/unit/inmempyarrowfs.py

diff --git a/lib/sycamore/sycamore/executor.py b/lib/sycamore/sycamore/executor.py
index 1c5a3d795..384c0ed41 100644
--- a/lib/sycamore/sycamore/executor.py
+++ b/lib/sycamore/sycamore/executor.py
@@ -88,9 +88,13 @@ def execute_iter(self, plan: Node, **kwargs) -> Iterable[Document]:
         plan.traverse(visit=lambda n: n.finalize())
 
     def recursive_execute(self, n: Node) -> list[Document]:
+        from sycamore.materialize import Materialize
+
         if len(n.children) == 0:
             assert hasattr(n, "local_source"), f"Source {n} needs a local_source method"
             return n.local_source()
+        if isinstance(n, Materialize) and n._will_be_source():
+            return n.local_source()
         if len(n.children) == 1:
             assert hasattr(n, "local_execute"), f"Transform {n.__class__.__name__} needs a local_execute method"
             assert n.children[0] is not None
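
For context on the fix itself: pathlib only ever consults the local filesystem,
so path checks silently fail for s3:// roots, while pyarrow resolves the URI to
the matching filesystem implementation. A minimal sketch of the difference (the
bucket name is hypothetical):

    from pathlib import Path
    from pyarrow import fs

    uri = "s3://example-bucket/materialize/materialize.success"  # hypothetical bucket

    # pathlib treats the URI as a literal local path, so this returns False
    # even when the S3 object exists; this is how the old code went wrong.
    print(Path(uri).exists())

    # pyarrow resolves the scheme and returns the filesystem plus the path
    # within it, which is what the pyarrow-based code below relies on.
    s3, p = fs.FileSystem.from_uri(uri)
    print(s3.get_file_info(p).type == fs.FileType.File)
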
diff --git a/lib/sycamore/sycamore/materialize.py b/lib/sycamore/sycamore/materialize.py
index 154c16cea..dea940680 100644
--- a/lib/sycamore/sycamore/materialize.py
+++ b/lib/sycamore/sycamore/materialize.py
@@ -17,6 +17,33 @@
 logger = logging.getLogger("__name__")
 
 
+class _PyArrowFsHelper:
+    def __init__(self, fs: "pyarrow.FileSystem"):
+        self._fs = fs
+
+    def list_files(self, path):
+        from pyarrow.fs import FileSelector
+
+        return self._fs.get_file_info(FileSelector(str(path), allow_not_found=True, recursive=True))
+
+    def file_exists(self, path: Path) -> bool:
+        from pyarrow.fs import FileType
+
+        info = self._fs.get_file_info(str(path))
+        return info.type == FileType.File
+
+    def safe_cleanup(self, path) -> None:
+        # materialize dirs should be non-hierarchical; minimize the chance that
+        # misuse will delete unexpected files.
+        plen = len(str(path)) + 1
+        for fi in self.list_files(path):
+            assert "/" not in fi.path[plen:], f"Refusing to clean {path}. Found unexpected hierarchical file {fi.path}"
+
+        logging.info(f"Cleaning up any materialized files in {path}")
+        self._fs.delete_dir_contents(str(path), missing_dir_ok=True)
+        self._fs.create_dir(str(path))
+
+
 class Materialize(UnaryNode):
     def __init__(
         self,
@@ -33,6 +60,7 @@ def __init__(
             pass
         elif isinstance(path, str) or isinstance(path, Path):
             (self._fs, self._root) = self.infer_fs(str(path))
+            self._fshelper = _PyArrowFsHelper(self._fs)
             self._doc_to_name = self.doc_to_name
             self._doc_to_binary = Document.serialize
             self._clean_root = True
@@ -43,6 +71,7 @@ def __init__(
                 self._fs = path["fs"]
             else:
                 (self._fs, self._root) = self.infer_fs(str(self._root))
+            self._fshelper = _PyArrowFsHelper(self._fs)
             self._doc_to_name = path.get("name", self.doc_to_name)
             self._doc_to_binary = path.get("tobin", Document.serialize)
             assert callable(self._doc_to_name)
@@ -58,17 +87,19 @@ def __init__(
             assert self._clean_root, "Using materialize in source mode requires cleaning the root"
 
         self._source_mode = source_mode
+        self._executed_child = False
         super().__init__(child, **kwargs)
 
     def execute(self, **kwargs) -> "Dataset":
         logger.debug("Materialize execute")
         if self._source_mode == MaterializeSourceMode.IF_PRESENT:
-            success = self._success_path().exists()
+            success = self._fshelper.file_exists(self._success_path())
             if success or len(self.children) == 0:
                 logger.info(f"Using {self._root} as cached source of data")
-                self._verify_has_files()
+                self._executed_child = False
                 if not success:
+                    self._verify_has_files()
                     logging.warning(f"materialize.success not found in {self._root}. Returning partial data")
 
                 from ray.data import read_binary_files
 
                 return files.map(self._ray_to_document)
 
+        self._executed_child = True
         # right now, no validation happens, so save data in parallel. Once we support validation
         # to support retries we won't be able to run the validation in parallel. non-shared
         # filesystems will also eventually be a problem but we can put it off for now.
@@ -97,12 +129,13 @@ def ray_callable(ray_input: dict[str, numpy.ndarray]) -> dict[str, numpy.ndarray
         return input_dataset
 
     def _verify_has_files(self) -> None:
+        assert self._root is not None
-        if not self._root.is_dir():
-            raise ValueError(f"Materialize root {self._root} is not a directory")
+        assert self._fs is not None
 
-        for n in self._root.iterdir():
-            if str(n).endswith(".pickle"):
+        files = self._fshelper.list_files(self._root)
+        for n in files:
+            if n.path.endswith(".pickle"):
                 return
         raise ValueError(f"Materialize root {self._root} has no .pickle files")
 
@@ -110,9 +143,15 @@ def _verify_has_files(self) -> None:
     def _ray_to_document(self, dict: dict[str, Any]) -> dict[str, bytes]:
         return {"doc": dict["bytes"]}
 
+    def _will_be_source(self) -> bool:
+        return self._source_mode == MaterializeSourceMode.IF_PRESENT and self._fshelper.file_exists(
+            self._success_path()
+        )
+
     def local_execute(self, docs: list[Document]) -> list[Document]:
         if self._source_mode == MaterializeSourceMode.IF_PRESENT:
-            if self._success_path().exists():
+            if self._fshelper.file_exists(self._success_path()):
+                self._executed_child = False
                 logger.info(f"Using {self._root} as cached source of data")
                 return self.local_source()
 
@@ -121,6 +160,7 @@
         self.cleanup()
         for d in docs:
             self.save(d)
+        self._executed_child = True
 
         return docs
 
@@ -128,13 +168,15 @@
     def local_source(self) -> list[Document]:
         assert self._root is not None
         self._verify_has_files()
         logger.info(f"Using {self._root} as cached source of data")
-        if not self._success_path().exists():
+        if not self._fshelper.file_exists(self._success_path()):
             logging.warning(f"materialize.success not found in {self._root}. Returning partial data")
         ret = []
-        for n in self._root.iterdir():
+        for fi in self._fshelper.list_files(self._root):
+            n = Path(fi.path)
             if n.suffix == ".pickle":
-                with open(n, "rb") as f:
-                    ret.append(Document.deserialize(f.read()))
+                f = self._fs.open_input_stream(str(n))
+                ret.append(Document.deserialize(f.read()))
+                f.close()
         return ret
 
@@ -142,9 +184,11 @@
     def _success_path(self):
         return self._root / "materialize.success"
 
     def finalize(self):
+        if not self._executed_child:
+            return
         if self._root is not None:
-            self._success_path().touch()
-            assert self._success_path().exists()
+            self._fs.open_output_stream(str(self._success_path())).close()
+            assert self._fshelper.file_exists(self._success_path())
 
     @staticmethod
     def infer_fs(path: str) -> "pyarrow.FileSystem":
@@ -172,21 +216,20 @@ def save(self, doc: Document) -> None:
         assert self._root is not None
         name = self._doc_to_name(doc)
         path = self._root / name
-        if self._clean_root:
-            assert not path.exists(), f"Duplicate name {path} generated for clean root"
+        if self._clean_root and self._fshelper.file_exists(path):
+            raise ValueError(f"Duplicate name {path} generated for clean root")
         with self._fs.open_output_stream(str(path)) as out:
             out.write(bin)
 
     def cleanup(self) -> None:
-        if not self._clean_root:
+        if self._root is None:
             return
-        import shutil
-
-        if self._root is None:
+        if not self._clean_root:
+            self._fs.create_dir(str(self._root))
             return
-        shutil.rmtree(self._root, ignore_errors=True)
-        self._root.mkdir(parents=True, exist_ok=True)
+
+        self._fshelper.safe_cleanup(self._root)
 
     @staticmethod
     def doc_to_name(doc: Document) -> str:
@@ -224,19 +267,17 @@ def __init__(self, path: Union[str, Path, dict] = {}):
         super().__init__()
         if isinstance(path, str) or isinstance(path, Path):
             path = {"root": path}
+        else:
+            path = path.copy()
         if "clean" not in path:
             path["clean"] = True
-        self._path = path
-        self.directory = path.pop("root", None)
+        self._choose_directory(path)
         self._basename_to_count: dict[str, int] = {}
 
     def once(self, context, node):
         self._name_unique = set()
         node = node.traverse(after=self._naming_pass())
-
-        self._setup_directory()
-
         node = node.traverse(visit=self._cleanup_pass())
         node = node.traverse(before=self._wrap_pass(context))
@@ -273,20 +314,12 @@ def visit(node):
             materialize = node.properties["materialize"]
             materialize.pop("mark", None)
 
-            path = self.directory / materialize["name"]
+            path = self._directory / materialize["name"]
 
-            if path.exists():
-                if not self._path["clean"]:
-                    return
-                # auto-materialize dirs should be non-hierarchical, minimize the chance that
-                # mis-use will delete unexpected files.
-                for p in path.iterdir():
-                    assert not p.is_dir()
+            if not self._path["clean"]:
+                return
 
-                for p in path.iterdir():
-                    p.unlink()
-            else:
-                path.mkdir(parents=True)
+            self._fshelper.safe_cleanup(path)
 
         return visit
 
@@ -306,24 +339,29 @@ def before(node):
                 return node
 
             path = self._path.copy()
-            path["root"] = self.directory / materialize["name"]
+            path["root"] = self._directory / materialize["name"]
             materialize["mark"] = True
             materialize["count"] = materialize.get("count", 0) + 1
             return Materialize(node, context, path=path)
 
         return before
 
-    def _setup_directory(self):
+    def _choose_directory(self, path):
         from pathlib import Path
 
-        if self.directory is None:
+        directory = path.pop("root", None)
+        self._path = path
+
+        if directory is None:
             from datetime import datetime
             import tempfile
 
             now = datetime.now().replace(microsecond=0)
             dir = Path(tempfile.gettempdir()) / f"materialize.{now.isoformat()}"
-            self.directory = dir
+            directory = str(dir)
             logger.info(f"Materialize directory was not specified. Used {dir}")
 
-        if not isinstance(self.directory, Path):
-            self.directory = Path(self.directory)
+        (self._fs, self._directory) = Materialize.infer_fs(directory)
+        if "fs" in self._path:
+            self._fs = self._path["fs"]
+        self._fshelper = _PyArrowFsHelper(self._fs)
diff --git a/lib/sycamore/sycamore/tests/unit/inmempyarrowfs.py b/lib/sycamore/sycamore/tests/unit/inmempyarrowfs.py
new file mode 100644
index 000000000..720f2ec66
--- /dev/null
+++ b/lib/sycamore/sycamore/tests/unit/inmempyarrowfs.py
@@ -0,0 +1,96 @@
+import io
+
+from pyarrow.fs import FileSystem, FileSelector, FileInfo, FileType
+
+
+class InMemPyArrowFileSystem(FileSystem):
+    def __init__(self):
+        self._fs = {}
+
+    def copy_file(self, src, dest):
+        raise NotImplementedError()
+
+    def create_dir(self, path, *, recursive=True):
+        assert isinstance(path, str)
+        # We're blob-like, so create dir is a no-op.
+        pass
+
+    def delete_dir(self, path):
+        raise NotImplementedError()
+
+    def delete_dir_contents(self, path, missing_dir_ok=False):
+        assert isinstance(path, str)
+        assert missing_dir_ok, "unimplemented"
+        path = path + "/"
+        todelete = []
+        for k, v in self._fs.items():
+            if k.startswith(path):
+                todelete.append(k)
+
+        for k in todelete:
+            del self._fs[k]
+
+    def delete_file(self, path):
+        assert isinstance(path, str)
+        assert path in self._fs
+        del self._fs[path]
+
+    def equals(self, other):
+        raise NotImplementedError()
+
+    def get_file_info(self, p):
+        if isinstance(p, str):
+            if p not in self._fs:
+                return FileInfo(str(p))
+
+            # API docs claim we can leave mtime & size as None
+            return FileInfo(str(p), type=FileType.File)
+
+        assert isinstance(p, FileSelector)
+        assert p.allow_not_found, "unimplemented"
+        assert p.recursive, "unimplemented"
+        dir = p.base_dir + "/"
+        len(dir)
+        ret = []
+        for k, v in self._fs.items():
+            if not k.startswith(dir):
+                continue
+            ret.append(FileInfo(str(k), type=FileType.File))
+
+        return ret
+
+    def move(self, src, dest):
+        raise NotImplementedError()
+
+    def normalize_path(self, path):
+        raise NotImplementedError()
+
+    def open_append_stream(self, path):
+        raise NotImplementedError()
+
+    def open_input_file(self, path):
+        raise NotImplementedError()
+
+    def open_input_stream(self, path):
+        assert isinstance(path, str)
+        assert path in self._fs
+        f = self._fs[path]
+        assert isinstance(f, bytes)
+        return io.BytesIO(f)
+
+    def open_output_stream(self, path):
+        class OpenFile(io.BytesIO):
+            def __init__(self, fs, name):
+                self._fs = fs
+                self._name = name
+                super().__init__()
+
+            def close(self):
+                assert isinstance(self._fs[self._name], OpenFile)
+                self._fs[self._name] = self.getvalue()
+                super().close()
+
+        assert isinstance(path, str)
+        assert path not in self._fs, "overwrite unimplemented"
+        self._fs[path] = OpenFile(self._fs, path)
+        return self._fs[path]
diff --git a/lib/sycamore/sycamore/tests/unit/test_materialize.py b/lib/sycamore/sycamore/tests/unit/test_materialize.py
index 1f5432c20..a9b19c209 100644
--- a/lib/sycamore/sycamore/tests/unit/test_materialize.py
+++ b/lib/sycamore/sycamore/tests/unit/test_materialize.py
@@ -14,6 +14,7 @@
 from sycamore.context import ExecMode
 from sycamore.data import Document, MetadataDocument
 from sycamore.materialize import AutoMaterialize, Materialize, MaterializeSourceMode
+from sycamore.tests.unit.inmempyarrowfs import InMemPyArrowFileSystem
 
 
 def tobin(d):
@@ -29,6 +30,8 @@ def __init__(self, ext):
         self.extension = ext
 
     def open_output_stream(self, path):
+        if path.endswith("/materialize.success"):
+            return super().open_output_stream(path)
         return super().open_output_stream(path + self.extension)
 
 
@@ -183,16 +186,16 @@ def test_autodirname(self):
             ctx = sycamore.init(exec_mode=ExecMode.LOCAL, rewrite_rules=[a])
             ctx.read.document(docs).map(noop_fn).execute()
 
-            files = [f for f in Path(a.directory).rglob("*")]
+            files = [f for f in Path(a._directory).rglob("*")]
             logging.info(f"Found {files}")
             assert len([f for f in files if "DocScan.0/doc" in str(f)]) == 3
             assert len([f for f in files if "DocScan.0/md-" in str(f)]) == 1
             assert len([f for f in files if "Map.0/doc" in str(f)]) == 3
             assert len([f for f in files if "Map.0/md-" in str(f)]) == 2
-            assert re.match(".*materialize\\.[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}", str(a.directory))
+            assert re.match(".*materialize\\.[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}", str(a._directory))
         finally:
-            if a.directory is not None:
-                shutil.rmtree(a.directory)
+            if a._directory is not None:
+                shutil.rmtree(a._directory)
 
 
     def test_dupnodename(self):
         docs = make_docs(3)
@@ -241,7 +244,6 @@ def doc_to_name4(doc):
         ds.execute()
 
         files = [f for f in Path(tmpdir).rglob("*")]
-        logging.info(f"TO Found-1 {files}")
         assert len([f for f in files if ".test4" in str(f)]) == 3 + 1 + 3 + 2
 
         for d in docs:
@@ -250,24 +252,26 @@ def doc_to_name4(doc):
         ds.execute()
 
         files = [f for f in Path(tmpdir).rglob("*")]
-        logging.info(f"TO Found-2 {files}")
         assert len([f for f in files if "-dup" in str(f)]) == 3 + 3
         assert len([f for f in files if ".test4" in str(f)]) == 2 * (3 + 1 + 3 + 2)
 
         a._path["clean"] = True
         ds.execute()
 
         files = [f for f in Path(tmpdir).rglob("*")]
-        logging.info(f"TO Found-3 {files}")
         assert len([f for f in files if ".test4" in str(f)]) == 3 + 1 + 3 + 2
 
 
+def any_id(d):
+    if isinstance(d, MetadataDocument):
+        return str(d.metadata)
+    else:
+        return d.doc_id
+
+
 def ids(docs):
     ret = []
     for d in docs:
-        if isinstance(d, MetadataDocument):
-            ret.append(str(d.metadata))
-        else:
-            ret.append(d.doc_id)
+        ret.append(any_id(d))
     ret.sort()
     return ret
 
@@ -331,3 +335,61 @@ def test_materialize_read(self):
             Path(tmpdir).rmdir()
             with pytest.raises(ValueError):
                 ds.take_all()
+
+
+class TestAllViaPyarrowFS(unittest.TestCase):
+    def test_simple(self):
+        fs = InMemPyArrowFileSystem()
+        ctx = sycamore.init(exec_mode=ExecMode.LOCAL)
+        docs = make_docs(3)
+        path = {"root": "/no/such/path", "fs": fs}
+        ctx.read.document(docs).materialize(path=path).execute()
+        docs_out = ctx.read.materialize(path).take_all()
+        assert sorted(docs, key=any_id) == sorted(docs_out, key=any_id)
+
+    def test_clean(self):
+        fs = InMemPyArrowFileSystem()
+        ctx = sycamore.init(exec_mode=ExecMode.LOCAL)
+        docs = make_docs(3)
+        path = {"root": "/fake/inmem/no/such/path", "fs": fs}
+        fs.open_output_stream(path["root"] + "/fake.pickle").close()
+        ctx.read.document(docs).materialize(path=path).execute()
+        docs_out = ctx.read.materialize(path).take_all()
+        assert sorted(docs, key=any_id) == sorted(docs_out, key=any_id)
+
+    def test_automaterialize(self):
+        fs = InMemPyArrowFileSystem()
+        path = {"root": "/fake/inmem/no/such/path", "fs": fs}
+        ctx = sycamore.init(exec_mode=ExecMode.LOCAL, rewrite_rules=[AutoMaterialize(path)])
+        docs = make_docs(3)
+        docs_out = ctx.read.document(docs).materialize(path=path).take_all()
+        assert sorted(docs, key=any_id) == sorted(docs_out, key=any_id)
+
+    def test_fail_if_hierarchy(self):
+        fs = InMemPyArrowFileSystem()
+        ctx = sycamore.init(exec_mode=ExecMode.LOCAL)
+        docs = make_docs(3)
+        path = {"root": "/fake/inmem/no/such/path", "fs": fs}
+        fs.open_output_stream(path["root"] + "/subdir/fake.pickle").close()
+        from sycamore.materialize import _PyArrowFsHelper
+
+        fsh = _PyArrowFsHelper(fs)
+
+        # Fail with explicit materialize
+        with pytest.raises(AssertionError):
+            ctx.read.document(docs).materialize(path=path).execute()
+
+        ctx = sycamore.init(exec_mode=ExecMode.LOCAL, rewrite_rules=[AutoMaterialize(path)])
+        pipeline = ctx.read.document(docs).materialize(path=path)
+        # Fail with auto-materialize
+        with pytest.raises(AssertionError):
+            pipeline.take_all()
+
+        assert fsh.file_exists(path["root"] + "/subdir/fake.pickle")
+
+        fs.open_output_stream(path["root"] + "/DocScan.0/subdir/fake.pickle").close()
+        # Fail with auto-materialize and file in one of the real subdirs
+        with pytest.raises(AssertionError):
+            pipeline.take_all()
+
+        assert fsh.file_exists(path["root"] + "/DocScan.0/subdir/fake.pickle")
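
Taken together, the patch means a materialize root can be any URI that pyarrow
can resolve, not just a local directory. A minimal usage sketch mirroring the
unit tests above (the S3 bucket name is hypothetical, and docs stands in for
any list of sycamore Documents):

    import sycamore
    from sycamore.context import ExecMode

    ctx = sycamore.init(exec_mode=ExecMode.LOCAL)
    path = "s3://example-bucket/materialize-cache"  # hypothetical bucket
    # infer_fs() maps the s3:// scheme to a pyarrow S3FileSystem.
    ctx.read.document(docs).materialize(path=path).execute()
    # Re-read the cached pickles directly from S3.
    cached = ctx.read.materialize(path).take_all()
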