Fix materialize + S3 not working. (#734)
* Fix materialize + S3 not working.

Existing code was relying on Python pathlib for most operations rather than
running them through the pyarrow filesystem. Fix that, and add a test with
a fake pyarrow filesystem to make sure we exercise a path that does not
exist in the local filesystem.

* Also make AutoMaterialize work with pyarrow non-local filesystems.
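
The crux of the bug, as a minimal sketch (the bucket and paths below are made up; only the pyarrow calls are real API): pathlib's Path.exists() and iterdir() consult the local disk only, so against an S3 root they always report nothing, while the same checks routed through the pyarrow filesystem inferred from the URI see the bucket.

from pathlib import Path

from pyarrow import fs

# Broken pattern: pathlib only looks at the local filesystem, so this is
# always False for an S3 object, even when it exists in the bucket.
Path("example-bucket/materialize/stage/materialize.success").exists()

# Fixed pattern: route the check through the pyarrow filesystem inferred
# from the URI (bucket and credentials are assumed to exist).
s3, root = fs.FileSystem.from_uri("s3://example-bucket/materialize/stage")
info = s3.get_file_info(root + "/materialize.success")
found = info.type == fs.FileType.File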
eric-anderson authored Aug 28, 2024
1 parent c989a63 commit 72c33ae
Showing 4 changed files with 255 additions and 55 deletions.
4 changes: 4 additions & 0 deletions lib/sycamore/sycamore/executor.py
@@ -88,9 +88,13 @@ def execute_iter(self, plan: Node, **kwargs) -> Iterable[Document]:
         plan.traverse(visit=lambda n: n.finalize())

     def recursive_execute(self, n: Node) -> list[Document]:
+        from sycamore.materialize import Materialize
+
         if len(n.children) == 0:
             assert hasattr(n, "local_source"), f"Source {n} needs a local_source method"
             return n.local_source()
+        if isinstance(n, Materialize) and n._will_be_source():
+            return n.local_source()
         if len(n.children) == 1:
             assert hasattr(n, "local_execute"), f"Transform {n.__class__.__name__} needs a local_execute method"
             assert n.children[0] is not None
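
Why the executor change is needed: recursive_execute used to treat only leaf nodes as sources, so a Materialize node in source mode would still execute its whole child subtree. A toy model of the new short-circuit (these classes are stand-ins for illustration, not the sycamore ones):

# Stand-in classes to illustrate the control flow; not the sycamore API.
class Node:
    def __init__(self, children=()):
        self.children = list(children)


class Source(Node):
    def local_source(self):
        return ["doc-from-scan"]


class Materialize(Node):
    def __init__(self, child, cached):
        super().__init__([child])
        self._cached = cached

    def _will_be_source(self):
        # Real code: source mode is IF_PRESENT and materialize.success
        # exists on the (possibly remote) filesystem.
        return self._cached

    def local_source(self):
        return ["doc-from-cache"]


def recursive_execute(n):
    if len(n.children) == 0:
        return n.local_source()
    if isinstance(n, Materialize) and n._will_be_source():
        return n.local_source()  # skip executing the child subtree
    return recursive_execute(n.children[0])


assert recursive_execute(Materialize(Source(), cached=True)) == ["doc-from-cache"]
assert recursive_execute(Materialize(Source(), cached=False)) == ["doc-from-scan"]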
126 changes: 82 additions & 44 deletions lib/sycamore/sycamore/materialize.py
@@ -17,6 +17,33 @@
 logger = logging.getLogger("__name__")


+class _PyArrowFsHelper:
+    def __init__(self, fs: "pyarrow.FileSystem"):
+        self._fs = fs
+
+    def list_files(self, path):
+        from pyarrow.fs import FileSelector
+
+        return self._fs.get_file_info(FileSelector(str(path), allow_not_found=True, recursive=True))
+
+    def file_exists(self, path: Path) -> bool:
+        from pyarrow.fs import FileType
+
+        info = self._fs.get_file_info(str(path))
+        return info.type == FileType.File
+
+    def safe_cleanup(self, path) -> None:
+        # materialize dirs should be non-hierarchical, minimize the chance that
+        # mis-use will delete unexpected files.
+        plen = len(str(path)) + 1
+        for fi in self.list_files(path):
+            assert "/" not in fi.path[plen:], f"Refusing to clean {path}. Found unexpected hierarchical file {fi.path}"
+
+        logging.info(f"Cleaning up any materialized files in {path}")
+        self._fs.delete_dir_contents(str(path), missing_dir_ok=True)
+        self._fs.create_dir(str(path))
+
+
 class Materialize(UnaryNode):
     def __init__(
         self,
@@ -33,6 +60,7 @@ def __init__(
             pass
         elif isinstance(path, str) or isinstance(path, Path):
             (self._fs, self._root) = self.infer_fs(str(path))
+            self._fshelper = _PyArrowFsHelper(self._fs)
             self._doc_to_name = self.doc_to_name
             self._doc_to_binary = Document.serialize
             self._clean_root = True
@@ -43,6 +71,7 @@ def __init__(
                 self._fs = path["fs"]
             else:
                 (self._fs, self._root) = self.infer_fs(str(self._root))
+            self._fshelper = _PyArrowFsHelper(self._fs)
             self._doc_to_name = path.get("name", self.doc_to_name)
             self._doc_to_binary = path.get("tobin", Document.serialize)
             assert callable(self._doc_to_name)
@@ -58,17 +87,19 @@ def __init__(
             assert self._clean_root, "Using materialize in source mode requires cleaning the root"

         self._source_mode = source_mode
+        self._executed_child = False

         super().__init__(child, **kwargs)

     def execute(self, **kwargs) -> "Dataset":
         logger.debug("Materialize execute")
         if self._source_mode == MaterializeSourceMode.IF_PRESENT:
-            success = self._success_path().exists()
+            success = self._fshelper.file_exists(self._success_path())
             if success or len(self.children) == 0:
                 logger.info(f"Using {self._root} as cached source of data")
-                self._verify_has_files()
+                self._executed_child = False
                 if not success:
+                    self._verify_has_files()
                     logging.warning(f"materialize.success not found in {self._root}. Returning partial data")

                 from ray.data import read_binary_files
@@ -77,6 +108,7 @@

                 return files.map(self._ray_to_document)

+        self._executed_child = True
         # right now, no validation happens, so save data in parallel. Once we support validation
         # to support retries we won't be able to run the validation in parallel. non-shared
         # filesystems will also eventually be a problem but we can put it off for now.
@@ -97,22 +129,29 @@ def ray_callable(ray_input: dict[str, numpy.ndarray]) -> dict[str, numpy.ndarray]:
         return input_dataset

     def _verify_has_files(self) -> None:
+
         assert self._root is not None
-        if not self._root.is_dir():
-            raise ValueError(f"Materialize root {self._root} is not a directory")
+        assert self._fs is not None

-        for n in self._root.iterdir():
-            if str(n).endswith(".pickle"):
+        files = self._fshelper.list_files(self._root)
+        for n in files:
+            if n.path.endswith(".pickle"):
                 return

         raise ValueError(f"Materialize root {self._root} has no .pickle files")

     def _ray_to_document(self, dict: dict[str, Any]) -> dict[str, bytes]:
         return {"doc": dict["bytes"]}

+    def _will_be_source(self) -> bool:
+        return self._source_mode == MaterializeSourceMode.IF_PRESENT and self._fshelper.file_exists(
+            self._success_path()
+        )
+
     def local_execute(self, docs: list[Document]) -> list[Document]:
         if self._source_mode == MaterializeSourceMode.IF_PRESENT:
-            if self._success_path().exists():
+            if self._fshelper.file_exists(self._success_path()):
+                self._executed_child = False
                 logger.info(f"Using {self._root} as cached source of data")

                 return self.local_source()
Expand All @@ -121,30 +160,35 @@ def local_execute(self, docs: list[Document]) -> list[Document]:
         self.cleanup()
         for d in docs:
             self.save(d)
+        self._executed_child = True

         return docs

     def local_source(self) -> list[Document]:
         assert self._root is not None
         self._verify_has_files()
         logger.info(f"Using {self._root} as cached source of data")
-        if not self._success_path().exists():
+        if not self._fshelper.file_exists(self._success_path()):
             logging.warning(f"materialize.success not found in {self._root}. Returning partial data")
         ret = []
-        for n in self._root.iterdir():
+        for fi in self._fshelper.list_files(self._root):
+            n = Path(fi.path)
             if n.suffix == ".pickle":
-                with open(n, "rb") as f:
-                    ret.append(Document.deserialize(f.read()))
+                f = self._fs.open_input_stream(str(n))
+                ret.append(Document.deserialize(f.read()))
+                f.close()

         return ret

     def _success_path(self):
         return self._root / "materialize.success"

     def finalize(self):
+        if not self._executed_child:
+            return
         if self._root is not None:
-            self._success_path().touch()
-            assert self._success_path().exists()
+            self._fs.open_output_stream(str(self._success_path())).close()
+            assert self._fshelper.file_exists(self._success_path())

     @staticmethod
     def infer_fs(path: str) -> "pyarrow.FileSystem":
@@ -172,21 +216,20 @@ def save(self, doc: Document) -> None:
         assert self._root is not None
         name = self._doc_to_name(doc)
         path = self._root / name
-        if self._clean_root:
-            assert not path.exists(), f"Duplicate name {path} generated for clean root"
+        if self._clean_root and self._fshelper.file_exists(path):
+            raise ValueError(f"Duplicate name {path} generated for clean root")
         with self._fs.open_output_stream(str(path)) as out:
             out.write(bin)

     def cleanup(self) -> None:
-        if not self._clean_root:
+        if self._root is None:
             return

-        import shutil
-
-        if self._root is None:
+        if not self._clean_root:
+            self._fs.create_dir(str(self._root))
             return
-        shutil.rmtree(self._root, ignore_errors=True)
-        self._root.mkdir(parents=True, exist_ok=True)
+
+        self._fshelper.safe_cleanup(self._root)

     @staticmethod
     def doc_to_name(doc: Document) -> str:
@@ -224,19 +267,17 @@ def __init__(self, path: Union[str, Path, dict] = {}):
         super().__init__()
         if isinstance(path, str) or isinstance(path, Path):
             path = {"root": path}
+        else:
+            path = path.copy()
         if "clean" not in path:
             path["clean"] = True
-
-        self._path = path
-        self.directory = path.pop("root", None)
+        self._choose_directory(path)
         self._basename_to_count: dict[str, int] = {}

     def once(self, context, node):
         self._name_unique = set()
         node = node.traverse(after=self._naming_pass())

-        self._setup_directory()
-
         node = node.traverse(visit=self._cleanup_pass())
         node = node.traverse(before=self._wrap_pass(context))

@@ -273,20 +314,12 @@ def visit(node):
             materialize = node.properties["materialize"]
             materialize.pop("mark", None)

-            path = self.directory / materialize["name"]
+            path = self._directory / materialize["name"]

-            if path.exists():
-                if not self._path["clean"]:
-                    return
-                # auto-materialize dirs should be non-hierarchical, minimize the chance that
-                # mis-use will delete unexpected files.
-                for p in path.iterdir():
-                    assert not p.is_dir()
+            if not self._path["clean"]:
+                return

-                for p in path.iterdir():
-                    p.unlink()
-            else:
-                path.mkdir(parents=True)
+            self._fshelper.safe_cleanup(path)

         return visit

@@ -306,24 +339,29 @@ def before(node):
                 return node

             path = self._path.copy()
-            path["root"] = self.directory / materialize["name"]
+            path["root"] = self._directory / materialize["name"]
             materialize["mark"] = True
             materialize["count"] = materialize.get("count", 0) + 1
             return Materialize(node, context, path=path)

         return before

-    def _setup_directory(self):
+    def _choose_directory(self, path):
         from pathlib import Path

-        if self.directory is None:
+        directory = path.pop("root", None)
+        self._path = path
+
+        if directory is None:
             from datetime import datetime
             import tempfile

             now = datetime.now().replace(microsecond=0)
             dir = Path(tempfile.gettempdir()) / f"materialize.{now.isoformat()}"
-            self.directory = dir
+            directory = str(dir)
             logger.info(f"Materialize directory was not specified. Used {dir}")

-        if not isinstance(self.directory, Path):
-            self.directory = Path(self.directory)
+        (self._fs, self._directory) = Materialize.infer_fs(directory)
+        if "fs" in self._path:
+            self._fs = self._path["fs"]
+        self._fshelper = _PyArrowFsHelper(self._fs)
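
For reference, a hedged sketch of how a caller points materialization at a non-local filesystem after this change. The bucket is a placeholder; only the dict keys ("root", "fs", "clean", "name", "tobin") come from the code above.

from pyarrow import fs

# Placeholder bucket; assumes S3 credentials are configured.
s3, root = fs.FileSystem.from_uri("s3://example-bucket/materialize/stage")

# Dict form of `path` accepted by Materialize and AutoMaterialize; "fs"
# overrides the filesystem that would otherwise be inferred from "root".
path = {"root": root, "fs": s3, "clean": True}

# e.g. Materialize(child, context, path=path) or AutoMaterialize(path),
# both of which now route exists/list/cleanup through _PyArrowFsHelper.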
96 changes: 96 additions & 0 deletions lib/sycamore/sycamore/tests/unit/inmempyarrowfs.py
@@ -0,0 +1,96 @@
+import io
+
+from pyarrow.fs import FileSystem, FileSelector, FileInfo, FileType
+
+
+class InMemPyArrowFileSystem(FileSystem):
+    def __init__(self):
+        self._fs = {}
+
+    def copy_file(self, src, dest):
+        raise NotImplementedError()
+
+    def create_dir(self, path, *, recursive=True):
+        assert isinstance(path, str)
+        # We're blob-like, so create dir is noop.
+        pass
+
+    def delete_dir(self, path):
+        raise NotImplementedError()
+
+    def delete_dir_contents(self, path, missing_dir_ok=False):
+        assert isinstance(path, str)
+        assert missing_dir_ok, "unimplemented"
+        path = path + "/"
+        todelete = []
+        for k, v in self._fs.items():
+            if k.startswith(path):
+                todelete.append(k)
+
+        for k in todelete:
+            del self._fs[k]
+
+    def delete_file(self, path):
+        assert isinstance(path, str)
+        assert path in self._fs
+        del self._fs[path]
+
+    def equals(self, other):
+        raise NotImplementedError()
+
+    def get_file_info(self, p):
+        if isinstance(p, str):
+            if p not in self._fs:
+                return FileInfo(str(p))
+
+            # API docs claim we can leave mtime & size as None
+            return FileInfo(str(p), type=FileType.File)
+
+        assert isinstance(p, FileSelector)
+        assert p.allow_not_found, "unimplemented"
+        assert p.recursive, "unimplemented"
+        dir = p.base_dir + "/"
+        len(dir)
+        ret = []
+        for k, v in self._fs.items():
+            if not k.startswith(dir):
+                continue
+            ret.append(FileInfo(str(k), type=FileType.File))
+
+        return ret
+
+    def move(self, src, dest):
+        raise NotImplementedError()
+
+    def normalize_path(self, path):
+        raise NotImplementedError()
+
+    def open_append_stream(self, path):
+        raise NotImplementedError()
+
+    def open_input_file(self, path):
+        raise NotImplementedError()
+
+    def open_input_stream(self, path):
+        assert isinstance(path, str)
+        assert path in self._fs
+        f = self._fs[path]
+        assert isinstance(f, bytes)
+        return io.BytesIO(f)
+
+    def open_output_stream(self, path):
+        class OpenFile(io.BytesIO):
+            def __init__(self, fs, name):
+                self._fs = fs
+                self._name = name
+                super().__init__()
+
+            def close(self):
+                assert isinstance(self._fs[self._name], OpenFile)
+                self._fs[self._name] = self.getvalue()
+                super().close()
+
+        assert isinstance(path, str)
+        assert path not in self._fs, "overwrite unimplemented"
+        self._fs[path] = OpenFile(self._fs, path)
+        return self._fs[path]
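
A sketch of the round-trip the fake filesystem supports (illustrative; the commit's actual test is not shown here):

from pyarrow.fs import FileSelector

mem = InMemPyArrowFileSystem()

# Write through the pyarrow API; nothing touches the local disk, so the
# root deliberately does not exist as a local path.
out = mem.open_output_stream("fake-root/doc0.pickle")
out.write(b"serialized-document")
out.close()

# The two operations materialize relies on: read back and list.
assert mem.open_input_stream("fake-root/doc0.pickle").read() == b"serialized-document"
infos = mem.get_file_info(FileSelector("fake-root", allow_not_found=True, recursive=True))
assert [fi.path for fi in infos] == ["fake-root/doc0.pickle"]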