Skip to content

Commit

Permalink
Add device daemon (pytorch#131814)
Browse files Browse the repository at this point in the history
Base implementation aiming towards pytorch/rfcs#64

Details of the implementation and next steps in https://github.com/pytorch/pytorch/blob/gh/albanD/3/head/test/cpp_extensions/open_registration_extension/README.md

Pull Request resolved: pytorch#131814
Approved by: https://github.com/ezyang
  • Loading branch information
albanD authored and pytorchmergebot committed Aug 27, 2024
1 parent d6091c8 commit 3b33f26
Show file tree
Hide file tree
Showing 10 changed files with 614 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ per-file-ignores =
torch/distributed/_tensor/_collective_utils.py: TOR901
# This is a full package that happen to live within the test
# folder, so ok to skip
test/cpp_extensions/open_registration_extension/pytorch_openreg/__init__.py: TOR901
test/cpp_extensions/open_registration_extension/pytorch_openreg/_aten_impl.py: TOR901
optional-ascii-coding = True
exclude =
./.git,
Expand Down
23 changes: 20 additions & 3 deletions test/cpp_extensions/open_registration_extension/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
This folder contains a self-contained example of a PyTorch out-of-tree backend leveraging the "PrivateUse1" backend in core.
This folder contains a self-contained example of a PyTorch out-of-tree backend leveraging the "PrivateUse1" backend from core.

## How to use
Install as standalone with `python setup.py develop` (or install) from this folder.
Expand All @@ -8,6 +8,23 @@ You can run test via `python test/test_openreg.py`.
For simplicity anything that can be implemented from python is done so.
A real implementation will most likely want to call these different APIs from c++ directly.

The current version send everything back to python and is missing most implementations in python. The only one available is the one used by the autograd engine to check how many workers to spawn.
The current version sends everything back to python and contains enough implementation to run basic model, transfer host/device and printing.

Next step is to create the device daemon so we can actually provide and allocator and create memory, then start using features and re-route all missing methods to daemon as appropriate.
The codebase is split as follows:
- `pytorch_openreg/__init__.py` imports torch to get core state initialized, imports `._aten_impl` to register our aten op implementations to torch, imports `.C` to load our c++ extension that registers more ops, allocator and hooks and finally renames the PrivateUse1 backend and register our python-side module.
- `pytorch_openreg/_aten_impl.py` does two main things. Use the `_register_same_name()` function to register hooks from c++ (like getDevice, getStream, etc) and send them to our device daemon. Define a new `torch.Library` that registers a fallback that will be called whenever a backend kernel for PrivateUse1 is called. It contains the logic to handle all kind of native functions, computing the output metadata, allocating it and only calling into the device daemon to perform computation
- `pytorch_openreg/_device_daemon.py` contains the Allocator (responsible for allocating memory on the device side, as int8 buffers, and recreating nice looking Tensors on the device side to be able to use aten ops to run code there), `run_op` that is the logic running on the device side to perform compute (for simplicity of coverage, we are re-building full blown Tensors here and calling aten ops on them). It also contains the Daemon responsible for the device worker process and sending data back and forth.
- `pytorch_openreg/_meta_parser.py` mainly contain utilities to send objects over the wire from the user process to the device process. The main class there is `OpenRegTensorMeta` that contains all the metadata sent to the device which should be enough for it to populate the output Tensor.

## Next steps

Currently, the autograd test is disabled because it's missing the getStream implementation.
The main next step would be to:
- Split the daemon into a proper user-process driver vs device-process executor. The main goal would be to better mimick which information is held on the user-process side and when we're actually communicating with the device. In particular current device or stream should be user-process informations.
- Add Stream/Event system. Most likely by having multiple requests queue that go to the device from the driver.
- Add RNG Generator.
- Add Pinned memory and HostAllocator.

Longer term:
- Replace the current `open_registration_extension.cpp` test in PyTorch CI with this.
- Build this module in the CI environment and enable Device-generic tests on this device.
Original file line number Diff line number Diff line change
@@ -1,26 +1,13 @@
import torch


# Global properties of our device
NUM_DEVICES = 7

# Create our python implementation dict so that the C++ module
# can access it during its initialization
_IMPL_REGISTRY = {}

# Load the C++ Module
import pytorch_openreg._C # noqa: F401


# Define all the implementations in the registry
def register(fn):
_IMPL_REGISTRY[fn.__name__[1:]] = fn
return fn
# Also register aten impls
from ._aten_impl import _IMPL_REGISTRY as _IMPL_REGISTRY # noqa: F401


@register
def _deviceCount():
return NUM_DEVICES
# Load the C++ Module
import pytorch_openreg._C # noqa: F401 # usort: skip


# Module used for our backend
Expand All @@ -31,15 +18,3 @@ class _OpenRegMod:
# Set all the appropriate state on PyTorch
torch.utils.rename_privateuse1_backend("openreg")
torch._register_device_module("openreg", _OpenRegMod())

_openreg_lib = torch.library.Library("_", "IMPL") # ignore TOR901


def _openreg_kernel_fallback(op, *args, **kwargs):
print("Calling ", op)
assert op is torch.ops.aten.empty.memory_format
# FIXME: this returns a cpu Tensor which is NOT ok.
return torch.empty(args[0])


_openreg_lib.fallback(_openreg_kernel_fallback, dispatch_key="PrivateUse1")
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import logging

import torch
from torch.utils._pytree import tree_any


log = logging.getLogger(__name__)

from ._device_daemon import daemon
from ._meta_parser import prepare_for_sending, to_device_no_copy


_IMPL_REGISTRY = {}


# Define all the implementations in the registry
def _register_same_name(name, with_log=False):
def _(*args, **kwargs):
if with_log:
log.info("Calling hook %s", name)
return daemon.exec(name, *args, **kwargs)

_IMPL_REGISTRY[name] = _


_register_same_name("deviceCount")
_register_same_name("getDevice")
_register_same_name("uncheckedSetDevice")
_register_same_name("exchangeDevice")
_register_same_name("malloc", True)
_register_same_name("free", True)

_openreg_lib = torch.library.Library("_", "IMPL")


def _openreg_kernel_fallback(op, *args, **kwargs):
log.info("Calling kernel %s", op)

# Special ops needed to avoid infinite recursion
if op is torch.ops.aten._copy_from.default:
from_, to_ = args
if from_.device.type == to_.device.type:
assert from_.device.type == "openreg"
op = torch.ops.aten.copy_.default
# handled below as a regular copy
elif from_.device.type == "openreg":
args, _ = prepare_for_sending((from_,), {})
host_mem = daemon.exec("send_data", *args)
return to_.copy_(host_mem)
elif to_.device.type == "openreg":
args, _ = prepare_for_sending((to_,), {})
daemon.exec("recv_data", from_, *args)
return to_
else:
raise RuntimeError("Should not happen")
elif op is torch.ops.aten.set_.source_Tensor:
return torch.ops.aten.set_.source_Storage_storage_offset(
args[0],
args[1].untyped_storage(),
args[1].storage_offset(),
args[1].size(),
args[1].stride(),
)
elif op is torch.ops.aten._local_scalar_dense.default:
args, _ = prepare_for_sending(args, {})
host_mem = daemon.exec("send_data", *args)
return host_mem.item()

op_name = None
post_process = None
if "out" in op._overloadname:
# Note that all structured native op will call here
if isinstance(kwargs["out"], tuple):
raise RuntimeError(f"out= variant {op} with tuple out= not supported")
if kwargs["out"].nelement() == 0:
# Out variant that needs a resize, convert to an out of place
# and handle generically below
orig_out = kwargs["out"]
del kwargs["out"]
if op._overloadname != "out":
raise RuntimeError(
"Cannot retranslate non-default out= variant form 0 size"
)
op = op.overloadpacket.default

def _post_process():
nonlocal real_res
orig_out.set_(real_res)
real_res = orig_out

post_process = _post_process

else:
# No metadata update to do, just run the op on the device
op_name = op.overloadpacket._qualified_op_name
real_res = kwargs["out"]
elif not tree_any(lambda obj: isinstance(obj, torch.Tensor), (args, kwargs)):
# No Tensor argument means factory function
# They should decompose and be handled in our c++ side directly
raise RuntimeError(f"{op} not handled yet.")
elif op._schema.is_mutable or op is torch.ops.aten._copy_from.default:
# Only handle inplace ops returning their first arg
assert len(args) >= 1, f"Inplace {op} needs at least one arg"
assert (
len(op._schema.returns) == 1
), f"NYI Inplace {op} with more than one return"
op_name = op.overloadpacket._qualified_op_name
real_res = args[0]
elif any(r.alias_info is not None for r in op._schema.returns):
# View ops
if op is torch.ops.aten.view.default:
return torch.ops.aten._unsafe_view(*args, **kwargs)
raise RuntimeError(f"{op} view op is not handled yet")

if op_name is None:
# 1. Compute updated metadata
if torch.Tag.dynamic_output_shape not in op.tags:
# Usual case: run the meta op to see the output metadata
meta_args, meta_kwargs = to_device_no_copy("meta", args, kwargs)
meta_res = op(*meta_args, **meta_kwargs)

# 2. Allocate the output
real_res, _ = to_device_no_copy("openreg", meta_res, {})
else:
# Slow version for data-dependent functions:
# Run the op on the device just to get the output shape
args_, kwargs_ = prepare_for_sending(args, kwargs)
shape = daemon.exec(
"get_op_output_shape",
op.overloadpacket._qualified_op_name,
args_,
kwargs_,
)

# 2. Allocate the output
real_res = args[0].new(shape)

# 3. Move to out variant
kwargs["out"] = real_res
# Let overload resolution find the out= overload
op_name = op.overloadpacket._qualified_op_name

# 4. Run the compute and populate the output on the device
args, kwargs = prepare_for_sending(args, kwargs)
daemon.exec("run_op", op_name, args, kwargs)

if post_process is not None:
post_process()

return real_res


_openreg_lib.fallback(_openreg_kernel_fallback, dispatch_key="PrivateUse1")
Loading

0 comments on commit 3b33f26

Please sign in to comment.