Skip to content

Commit

Permalink
Merge pull request #14 from Autodesk/persistent_obj_references
Browse files Browse the repository at this point in the history
Add mechanism for persisting references across RPC roundtrips
  • Loading branch information
avirshup authored Jun 15, 2017
2 parents 256d529 + c73630f commit 73b014d
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 14 deletions.
1 change: 1 addition & 0 deletions pyccc/engines/dockerengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self, client=None, workingdir='/default_wdir'):
self.default_wdir = workingdir
self.hostname = self.client.base_url


def connect_to_docker(self, client=None):
if isinstance(client, basestring):
client = du.get_docker_apiclient(client)
Expand Down
76 changes: 76 additions & 0 deletions pyccc/picklers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import print_function, absolute_import, division
from future.builtins import *
from future import standard_library

standard_library.install_aliases()

# Copyright 2017 Autodesk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import pickle
import weakref

_weakmemos = weakref.WeakValueDictionary()


class DepartingPickler(pickle.Pickler):
""" Pickler for objects on the DEPARTURE leg of the roundtrip
Note: this will _probably_ only handle a single layer round trip for now, and will
fail in weird ways otherwise.
"""

def persistent_id(self, obj):
""" Tags objects with a persistent ID, but do NOT emit it
"""
if getattr(obj, '_PERSIST_REFERENCES', None):
objid = id(obj)
obj._persistent_ref = objid
_weakmemos[objid] = obj
return None


class ReturningPickler(pickle.Pickler):
""" Pickler for objects on the RETURN leg of the roundtrip
"""
def persistent_id(self, obj):
""" Replaces object reference
"""
if getattr(obj, '_persistent_ref', None) is not None:
return obj._persistent_ref


class ReturningUnpickler(pickle.Unpickler):
""" Pickler for RETURNING objects that will retain references on roundtrip
"""
def persistent_load(self, pid):
return _weakmemos[pid]


def departure_dumps(obj, **kwargs):
buff = io.BytesIO()
pickler = DepartingPickler(buff, **kwargs)
pickler.dump(obj)
result = buff.getvalue()
buff.close()
return result


def return_loads(b, **kwargs):
buff = io.BytesIO(b)
unpickler = ReturningUnpickler(buff, **kwargs)
obj = unpickler.load()
buff.close()
return obj
37 changes: 26 additions & 11 deletions pyccc/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import inspect

import pyccc
from pyccc import job
from pyccc import source_inspections as src
from pyccc.exceptions import ProgramFailure
from pyccc.files import StringContainer, LocalFile
from . import job
from . import source_inspections as src
from .exceptions import ProgramFailure
from .files import StringContainer, LocalFile
from . import picklers

from ._native import native

Expand All @@ -50,7 +51,6 @@ def exports(o):
PYVERSION = 3
import builtins as BUILTINS


@exports
class PythonCall(object):
def __init__(self, function, *args, **kwargs):
Expand All @@ -75,6 +75,7 @@ class PythonJob(job.Job):
# @utils.doc_inherit
def __init__(self, engine, image, command,
interpreter=DEFAULT_INTERPRETER,
persist_references=False,
sendsource=True, **kwargs):
self._raised = False
self._updated_object = None
Expand All @@ -83,6 +84,7 @@ def __init__(self, engine, image, command,
self.sendsource = sendsource
self._function_result = None
self.interpreter = self._clean_interpreter_string(interpreter)
self.persist_references = persist_references

self.function_call = command

Expand All @@ -108,7 +110,6 @@ def _clean_interpreter_string(istr):
else:
return 'python' + str(istr)


def _get_python_files(self):
"""
Construct the files to send the remote host
Expand All @@ -118,9 +119,14 @@ def _get_python_files(self):
"""
python_files = {'run_job.py': PYTHON_JOB_FILE}

remote_function = PackagedFunction(self.function_call)
remote_function = PackagedFunction(self.function_call, self.persist_references)
if self.persist_references:
dumps = picklers.departure_dumps
else:
dumps = pickle.dumps

python_files['function.pkl'] = pyccc.BytesContainer(
pickle.dumps(remote_function, protocol=PICKLE_PROTOCOL),
dumps(remote_function, protocol=PICKLE_PROTOCOL),
name='function.pkl')
self._remote_function = remote_function

Expand Down Expand Up @@ -186,7 +192,12 @@ def function_result(self):
returnval = self.get_output('_function_return.pkl')
except KeyError:
raise ProgramFailure(self)
self._callback_result = pickle.loads(returnval.read('rb'))
if self.persist_references:
loads = picklers.return_loads
else:
loads = pickle.loads

self._callback_result = loads(returnval.read('rb'))
return self._callback_result

@property
Expand Down Expand Up @@ -248,7 +259,7 @@ class PackagedFunction(native.object):
- Function side effects are not tracked at all.
- Closure variables and module references will be sent along with the function
"""
def __init__(self, function_call):
def __init__(self, function_call, persist_references):
func = function_call.function
self.is_imethod = function_call.is_instancemethod
if self.is_imethod:
Expand All @@ -258,6 +269,7 @@ def __init__(self, function_call):
self.func_name = func.__name__
self.args = function_call.args
self.kwargs = function_call.kwargs
self.persist_references = persist_references

globalvars = src.get_global_vars(func)
self.global_closure = globalvars['vars']
Expand Down Expand Up @@ -299,4 +311,7 @@ def prepare_namespace(self, func):
to_run.__globals__.update(self.global_functions)
return to_run

PACKAGEDFUNCTIONSOURCE = '\n' + src.getsource(PackagedFunction)
PACKAGEDFUNCTIONSOURCE = '\n'.join(['',
src.getsource(PackagedFunction),
'\nimport pickle',
src.getsource(picklers.ReturningPickler)])
10 changes: 7 additions & 3 deletions pyccc/static/run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def main():
try:
funcpkg, func = load_job()
result = funcpkg.run(func)
serialize_output(result)
serialize_output(result, persistrefs=funcpkg.persist_references)

if funcpkg.is_imethod:
with open('_object_state.pkl', 'wb') as ofp:
Expand All @@ -74,9 +74,13 @@ def load_job():
return funcpkg, func


def serialize_output(result):
def serialize_output(result, persistrefs=False):
with open('_function_return.pkl', 'wb') as rp:
pickle.dump(result, rp, PICKLE_PROTOCOL)
if persistrefs:
pickler = source.ReturningPickler(rp, PICKLE_PROTOCOL)
pickler.dump(result)
else:
pickle.dump(result, rp, PICKLE_PROTOCOL)


def capture_exceptions(exc):
Expand Down
67 changes: 67 additions & 0 deletions pyccc/tests/test_job_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,73 @@ def test_python_exitcode(fixture, request):
assert job.exitcode == 38


class MyRefObj(object):
_PERSIST_REFERENCES = True

def identity(self):
return self


@pytest.mark.parametrize('fixture', fixture_types['engine'])
def test_persistence_assumptions(fixture, request):
# Object references are not persisted across function calls by default.
# This is the control experiment prior to the following tests
testobj = MyRefObj()
testobj.o = MyRefObj()
testobj.o.o = testobj

engine = request.getfuncargvalue(fixture)
pycall = pyccc.PythonCall(testobj.identity)

# First the control experiment - references are NOT persisted
job = engine.launch(PYIMAGE, pycall, interpreter=PYVERSION)
job.wait()
result = job.result
assert result is not testobj
assert result.o is not testobj.o
assert result.o.o is result


@pytest.mark.parametrize('fixture', fixture_types['engine'])
def test_persist_references_flag(fixture, request):
testobj = MyRefObj()
testobj.o = MyRefObj()
testobj.o.o = testobj

engine = request.getfuncargvalue(fixture)
pycall = pyccc.PythonCall(testobj.identity)

# With the right flag, references ARE now persisted
job = engine.launch(PYIMAGE, pycall, interpreter=PYVERSION, persist_references=True)
job.wait()
result = job.result
assert result is testobj
assert result.o is testobj.o
assert result.o.o is result


@pytest.mark.parametrize('fixture', fixture_types['engine'])
def test_persistent_and_nonpersistent_mixture(fixture, request):
# References only persisted in objects that request it
testobj = MyRefObj()
testobj.o = MyRefObj()
testobj.o.o = testobj
testobj.should_persist = MyRefObj()
testobj._PERSIST_REFERENCES = False
testobj.o._PERSIST_REFERENCES = False

engine = request.getfuncargvalue(fixture)
pycall = pyccc.PythonCall(testobj.identity)

job = engine.launch(PYIMAGE, pycall, interpreter=PYVERSION, persist_references=True)
job.wait()
result = job.result
assert result is not testobj
assert result.o is not testobj.o
assert result.o.o is result
assert result.should_persist is testobj.should_persist


def _runcall(fixture, request, function, *args, **kwargs):
engine = request.getfuncargvalue(fixture)
fn = pyccc.PythonCall(function, *args, **kwargs)
Expand Down

0 comments on commit 73b014d

Please sign in to comment.