Skip to content

Commit

Permalink
Merge pull request #187 from IdentityPython/feature-macros-and-scopes
Browse files Browse the repository at this point in the history
support for macros and scopes
  • Loading branch information
leifj authored Dec 10, 2019
2 parents d701006 + 9a03146 commit a17e543
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
12 changes: 5 additions & 7 deletions examples/batch-mdq-loop.fd
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
- when sign:
- drop_xsi_type:
- finalize:
cacheDuration: PT5H
validUntil: P10D
Expand All @@ -12,16 +13,13 @@
- http://fed.openathens.net/oafed/metadata
- http://edugain.cdn.samlbits.net
- select:
- fork:
- then sign:
- break
- map:
- log_entity:
- fork:
- drop_xsi_type:
- finalize:
cacheDuration: PT5H
validUntil: P10D
- sign:
key: sign.key
cert: sign.crt
- then sign:
- publish:
output: /tmp/mdq/entities
hash_link: true
Expand Down
16 changes: 14 additions & 2 deletions src/pyff/builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def _p(e):
ip = Plumbing(pipeline=req.args, pid="{}.each[{}]".format(req.plumbing.pid, entity_id))
ireq = Plumbing.Request(ip, req.md, t=e, scheduler=req.scheduler)
ireq.set_id(entity_id)
ireq.set_parent(req)
return ip.iprocess(ireq)

from multiprocessing.pool import ThreadPool
Expand All @@ -89,6 +90,16 @@ def _p(e):
log.info("processed {} entities".format(len(result)))


@pipe(name="then")
def _then(req, *opts):
"""
Call a named 'when' clause and return - akin to macro invocations for pyFF
"""
for cb in [PipelineCallback(p, req, store=req.md.store) for p in opts]:
req.t = cb(req.t)
return req.t


@pipe(name="log_entity")
def _log_entity(req, *opts):
"""
Expand Down Expand Up @@ -221,6 +232,7 @@ def fork(req, *opts):
ip = Plumbing(pipeline=req.args, pid="%s.fork" % req.plumbing.pid)
ireq = Plumbing.Request(ip, req.md, t=nt, scheduler=req.scheduler)
ireq.set_id(req.id)
ireq.set_parent(req)
ip.iprocess(ireq)

if req.t is not None and ireq.t is not None and len(root(ireq.t)) > 0:
Expand Down Expand Up @@ -608,10 +620,10 @@ def load(req, *opts):
params['verify'] = elt

if params['via'] is not None:
params['via'] = [PipelineCallback(pipe, req, store=req.md.store) for pipe in params['via']]
params['via'] = [PipelineCallback(p, req, store=req.md.store) for p in params['via']]

if params['cleanup'] is not None:
params['cleanup'] = [PipelineCallback(pipe, req, store=req.md.store) for pipe in params['cleanup']]
params['cleanup'] = [PipelineCallback(p, req, store=req.md.store) for p in params['cleanup']]

params.update(opts)

Expand Down
14 changes: 13 additions & 1 deletion src/pyff/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class PipelineCallback(object):

def __init__(self, entry_point, req, store=None):
self.entry_point = entry_point
self.plumbing = Plumbing(req.plumbing.pipeline, "%s-via-%s" % (req.plumbing.id, entry_point))
self.plumbing = Plumbing(req.scope_of(entry_point).plumbing.pipeline, "%s-via-%s" % (req.plumbing.id, entry_point))
self.req = req
self.store = store

Expand Down Expand Up @@ -213,6 +213,15 @@ def __init__(self, pl, md, t=None, name=None, args=None, state=None, store=None,
self.scheduler = scheduler
self.raise_exceptions = raise_exceptions
self.exception = None
self.parent = None

def scope_of(self, entry_point):
if 'with {}'.format(entry_point) in self.plumbing.pipeline:
return self
elif self.parent is None:
return self
else:
return self.parent.scope_of(entry_point)

@property
def id(self):
Expand All @@ -227,6 +236,9 @@ def id(self):
def set_id(self, _id):
self._id = _id

def set_parent(self, _parent):
self.parent = _parent

@property
def store(self):
if self._store:
Expand Down

0 comments on commit a17e543

Please sign in to comment.