Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

creation of nodes with snakemake functionality #188

Open
JNmpi opened this issue Apr 11, 2023 · 4 comments
Open

creation of nodes with snakemake functionality #188

JNmpi opened this issue Apr 11, 2023 · 4 comments
Labels
enhancement New feature; can be answer partner of "feature request"

Comments

@JNmpi
Copy link
Contributor

JNmpi commented Apr 11, 2023

Summary

Provide a new node type that allows execution of snakemake rules. pyiron/ironflow would provide a thin interface to make snakemake rules appearing as native pyiron objects.

Detailed Description

@liamhuber, this is a brief summary of the discussion we had at our last Zoom-meeting.

Looking at several workflow solutions in the Plattform MaterialDigital (PMD) a recurring request is that workflows with codes running in various conda environments have to be supported. A simple approach, which should be rather easy to implement would be to use snakemake. The idea would be to have a special run command that provides all the parameters that are needed to setup the Snakemake input file to run snakemake and to run it within pyiron. The corresponding jobs would be regular pyiron jobs, but could use the functionality of snakemake to run install, load and run codes living in different conda environments. Rather than using a full snakemake workflow the idea would be to run each snakemake rule as a separate job. This should be straightforward to implement since the rule defines input and output parameters (similarly to pyiron) plus some extra information regarding the conda version etc.

The (pseudo-) code could look like this:

@snakemake(conda=‘my_repository’,
                             resources=‘100MB’,
                             output=‘out_file1’)
# job object is created by decorator, similar to the value argument in a setter decorator
def my_node(input= [‘my_file1',  ‘my_file2’], job): 
       return job.output   

Further Information, Files, and Links

Example workflows (snakemake example):

@JNmpi JNmpi added the enhancement New feature; can be answer partner of "feature request" label Apr 11, 2023
@liamhuber
Copy link
Member

I also made notes from this meeting. I only got to the pseudocode parts this morning, but was actually not super happy and wrote some more pseudocode right after.

Notes:

On 6 April, Joerg and I just had a zoom call to discuss recent thoughts on the next iteration of pyiron_base/all this graph stuff.
I often make post-processing notes after such meetings for my own benefit, and reproduce them here.

First on the docket was to quickly run through my (barely)pseudo-code example for a cyclic while loop over here.
I'm relieved that he also finds it pretty readable.
We can add extensions like a limit on the number of iterations quite easily, e.g. by including max_steps as input to the while-loop kill switch, then having steps as both input and output and linking these (to strictly maintain functional behaviour).
I also floated the idea of adding "meta node" constructors -- i.e. a node class call returns a node instance, a meta node call returns a node class.
I think things like while and for loops will often require some customization, but this would hopefully make the whole thing less verbose.
One of the things Joerg was keen on with this example was the way that all the information existed right in the graph, even the Lammps intepreter instance (the engine node).

Next up was Joerg's node/task sketch here.
I was initially very concerned about the fact that a single node might have multiple associated tasks -- I thought only a macro node should behave in such a way.
Joerg walked me through the task/node distinction a little bit more and I've really come around to this point of view.

First, some nomenclature: we broke the idea of a "node" down into "atomic node" (in the greek sense of the word, not the kind of science we're doing) and a "macro node".
"Atomic nodes" should, as far as possible, be functional -- meaning they do not hold internal state, and the same input always produces the same output.
Now, there are a few bends to that rule:

  • First, we might consider something like a vasp job. On the initial call, the node might start such a job on remote resources and return some manner of "wait" signal. On the next call it may see that such a job is already running and return the "wait" signal again. Next, it may see that the job has finished, parse the output and return the "real" output. Finally, it may have discovered that the job crashed and return a signal accordingly. In a strict sense, this is not functional because we get different outcomes through time for the same input.
  • Second, there may be unavoidable internal randomness, e.g. some executed code may use RNG without exposing a seed value to us, or there may be hardware dependent impact (up to and including external forces crashing a calculation, e.g. if there is memory shortage on shared resources!) We simply live with this.
  • Finally, some data is too big to return in-memory, so we will want to return a pointer to some sort of serialized data. E.g. a vasp wavecar. This introduces "state" to the graph that can be tampered with from outside the graph execution. Again, we just live with it. As a bonus, if we accept that we live with this, we can also allow file pointers as other IO to nodes, e.g. to quickly write nodes for running different softwares without writing parsers. This abandons "purity" but is super practical.
    In contrast to this is "macro nodes", which is any node can be broken down into a graph of atomic and/or sub-macro nodes.
    For maintainability and readability, it would be nice to keep "atomic" nodes as simple as possible, and for "macro" nodes to only run a sub-graph and node mix in extra non-nodal functionality.

Now we come back to Joerg's sketch and the idea of tasks.
In the simplest case: a node may issue a "task" to interpret a python snippet, and the "executor" is just the current python interpreter, and the "management" is just a first-come-first-served queue of python commands.
Under more complex circumstances, the "executor" might be a lammps interpreter, and the node is submitting to it a lammps-library-readable command for interpretation.
We could go a step further in complexity and think about a Vasp interpreter, then the executor may want to receive extra input to itself letting it know whether subsequent tasks are going to be quantitatively similar or not -- i.e., can and should we mix together the charge density from the last task with the initial density from the new task's configuration, or should we start from scratch entirely with the initial density?
Then there is the entire concept of the executor intelligently allocating resources to these tasks and adjusting their order for optimal computational efficiency -- of course this is a brilliant idea, but I have no insights on how to implement it at the moment.

Pseudocode modifications to the node class run routine to accommodate executors might look something like this:

class Node:
    ...

    def run(self):
        if self.executor is None:
            function_output = self.node_function(**self.inputs.to_value_dict())
        else:
            if self.executor.is_queued(self):
              function_output = Executing()
              # Executing is some custom class the node will know how to process
              # s.t. outputs don't get updated when we process the function_output
            elif self.executor.is_finished(self):
              function_output = self.executor.collect(self)  # Executor has state! :(
              # This might return the expected tuple of output, or might return some
              # Failed() class instance or otherwise similar to Executing()
            else:
              self.executor.push(self) 
              # The executor is then responsible for executing 
              # node.node_function(**node.inputs.to_value_dict) 
              function_output = Executing()
                    
        self.process_output(function_output)
        # Catch failure errors, update output channels with successful data, etc.

Where the if/else-with and process_errors method extend existing functionality.
Under the hood, the executor methods don't just store the node (which may get updated as part of some for-loop), but are checking to see if a hash of some combination of the node+input is already in its system.
This way the node (an object on our graph) and the task (the function call with specific input) is truly decoupled, at the cost of hashing the input.
Thus, there is overhead to using an executor, but this is in-line with what Joerg and I discussed and not dissimilar from how dask delayed introduces cost (although we'll have to think/work to make sure the cost is comparable in magnitude to dask...)
For something like Lammps, we can probably also manage to send batches of tasks to the executor?

In this way, the node processes its input, produces one or more tasks, talks directly with an executor to submit them, wait for them to be executed, and retrieve the result, then processes this into node output, which then propagates through the node graph to trigger new node work cycles.
So tasks and nodes are extremely closely related, but not the same thing.
Thus, a single node really may submit multiple tasks -- perhaps even to different executors!! -- although most of the time such a scenario is probably best handled by breaking such a node into smaller "atomic" nodes and re-assembling them into a "macro" node.

Previously, we had talked about such executors/task queues belonging to the workflow itself, and all nodes submitting their directly to their workflow, or tasks of their workflow.
Today we brainstormed a little bit how the executors may themselves be nodes placed on the graph, which serve only as input to nodes, talking directly with them to get work done.
In such a paradigm this would, again, bend the rules on a node (the executor node) having "state", but would be very practical, and I'm ok with it insofar as the state is intended only to exist while the workflow is live and work is being done.

Pseudocode using executors might look something like this:

from pyiron_contrib import Workflow
from time import sleep

wf = Workflow("murn_pseudocode")

# First, let's define nodes that will exist outside all loops

wf.lattice_constants = wf.create.node.linspace(
  start=3.8, 
  end=4.2, 
  n=10
)

wf.lammps_executor = wf.create.node.lammps_executor(
  n_cores=10,
  n_instances=5
)
# This node, unfortunately, has state!
# On first run(on instantiation?), it creates an executor and returns 
# it as output. Subsequent runs do nothing (unless cores or instances change?)
# The executor object in turn holds n_instances copies of the Lammps interpreter
# and distributes tasks to them under the assumption n_cores are available.

wf.potential = wf.create.node.atomic_potential(
  type="EAM",
  species=("Al",),
  choice=0,
)

wf.lammps_engine = wf.create.node.lammps_engine(
  potential=wf.potential.outputs.potential,
  executor=wf.lammps_executor.outputs.executor,
  # If the executor is None, the engine should create 
  # its own single-use instance of the lammps interpreter 
  # on each run that just uses the main python process 
  # for computation -- i.e. it behaves the same as a pure-
  # python node that hogs the python process when executed
)

# Next, we want an outer while loop that runs until
# all the jobs in our for-loop produce real data

# Decorator is class method for single import
@Workflow.node(
  "required_or_none", "data",
  input_that_must_update=("step",)  # Won't re-execute until this has been updated
)  
def while_executing(required, step, data, sleep_time=0):
    """
    `step` is not used, but the node won't execute the function unless this channel
    has received an update since the last execution.
    """
    if any(isinstance(out, Workflow.Executing) for out in data):
        # Executing is class attribute for single import
        # Loop again
        sleep(sleep_time)
        return required, None
    else:
        # Stop and return the completed data
        return None, data
    
wf.while_executing = while_executing(sleep_time=5)

# And an inner loop that will iterate over our lattice constants
wf.for_loop = wf.create.node.for_loop()
# inputs: iterable, i, step (must update), reset
# outputs: item, i (gets incremented and connects to input), done
# A connection is made between the input i and output i
# so that we have topologically-defined pseudo-state!
# The step channel must have received an update before
# a new execution will trigger

# Next, let's define the nodes needed inside the inner loop
wf.structure = wf.create.node.bulk_structure(element="Al")
wf.calc_static = wf.create.node.calc_static(
  engine=wf.lammps_engine.outputs.engine,
)
wf.energies = wf.create.node.accumulator()
# inputs: item, items, reset
# outputs: items
# Starts as an empty list, gets pseudo-state
# by looping the output items back to the input items
# Appends item to items at each call, resetting to an
# empty list if reset==True

# Now let's wire up our loops!

# The flow of data inside the for-loop is very easy
wf.calc_static.inputs.structure = wf.structure.outputs.structure
wf.energies.inputs.item = wf.calc_static.outputs.energy_pot

# The for-loop should pass in lattice constants and iterate 
# each time we append to the accumulator
wf.structure.inputs.a = wf.for_controller.outputs.item
wf.for_loop.inputs.step = wf.energies.outputs.items

# The while loop will kill the for-loop by destroying its
# iterable input
wf.while_executing.inputs.required = wf.lattice_constants.outputs.array
wf.for_loop.inputs.iterable = wf.while_executing.outputs.required
# Or reset the loop
wf.for_loop.inputs.reset = wf.while_executing.outputs.required

# The while loop should step with each completion of the for-loop,
wf.while_executing.inputs.step = wf.for_loop.outputs.done
# and once when we start the graph
wf.while_executing.inputs.step = wf.lattice_constants.outputs.array

# And the while loop needs to see the accumulated energies,
# to check whether they're all finished data or not
wf.while_executing.inputs.data = wf.energies.outputs.items


# Finally, we'll use the collected energies and lattice constants
# to calculate something
@Workflow.node("bulk_modulus")
def murnaghan(lattice_constants, energies: list | np.ndarray):
  # Do the math
  return 42

wf.bulk_modulus = murnaghan(
  lattice_constants=wf.lattice_constants.outputs.array,
  energies=wf.for_loop.outputs.data,
)

wf.run()
print(wf.bulk_modulus.outputs.bulk_modulus)
>>> 42

The idea is that we have an inner for-loop which generates Lammps static calculations and ships them off to the Lammps executor queue.
These calc_static nodes get different output depending on wheth
Outside this we have a while-loop that keeps running the for-loop until all the accumulated energies are actually energies and not unfinished-calculation signals.
In the process of writing this up, I really see the benefit of having control channels ala Unreal blueprints, but I wanted to keep this example as close as possible to the existing code -- I'll go play around with this idea afterwards.
Nope, wait, this whole example gets broken because there is only one node calc_static -- somewhere the calc status needs to be preserved, and while that could be on the executor it is better on the node, but then we really need multiple node instances...
In this way the node should really submit itself to the executor, and I really need multiple node instances, and probably a special node class that handles the async execution....
And then some way of handling for loops and while loops that somehow doesn't involve spamming a million nodes?
I need to think on it a bit again.

Finally, Joerg shared an idea he had from listening to some of the engineers' presentations about providing a pyiron wrapper for snakemake.
Snakemake is brilliant in that it readily allows for work to use different conda environments, distributes resources, etc., which are wheels we really don't want to re-invent.
But, it is also really ugly to write (basically old-school make files), and strictly acyclic.
Since we are already on-board with having node IO be pointers to files, Joerg's insight was to have nodes that allow us to write snakemake rules in python, and then convert these to snakemake for execution.
There would be overhead for this, so you would only want to do it for more expensive calls or for situations where you really needed a different env, but it immediately gives us a lot of flexibility for incorporating new codes.
Further, if you write any sort of IO parser for the file-based snakemake execution you want to do, this would allow you to embed the (strictly acyclic) snakemake call inside a (potentially) cyclic pyiron graph!
There is no concrete plan, but we figured some sort of @snakemake_node decorator should be fairly straightforward.

Technical note: English notes were completed immediately following the meeting on 6 April, but I didn't have time to sit down and write the Murnaghan pseudo-code and accompanying paraphraphs (i.e. the header stating what the pseudocode is for and the paragraph immediately after the code) until 11 April.

@liamhuber
Copy link
Member

Better (but still imperfect) pseudocode for loops, relying on nodes having flow control events:

from pyiron_contrib import Workflow

wf = Workflow("murn_pseudocode")

wf.lattice_constants = wf.create.node.linspace(
  start=3.8, 
  end=4.2, 
  n=10
)

wf.lammps_executor = wf.create.node.lammps_executor(
  n_cores=10,
  n_instances=5
)
# This node, unfortunately, has state!
# On first run(on instantiation?), it creates an executor and returns 
# it as output. Subsequent runs do nothing (unless cores or instances change?)
# The executor object in turn holds n_instances copies of the Lammps interpreter
# and distributes tasks to them under the assumption n_cores are available.

wf.potential = wf.create.node.atomic_potential(
  type="EAM",
  species=("Al",),
  choice=0,
)

wf.lammps_engine = wf.create.node.lammps_engine(
  potential=wf.potential.outputs.potential,
  executor=wf.lammps_executor.outputs.executor,
  # If the executor is None, the engine should create 
  # its own single-use instance of the lammps interpreter 
  # on each run that just uses the main python process 
  # for computation -- i.e. it behaves the same as a pure-
  # python node that hogs the python process when executed
)

inner = Workflow("inner_loop")
inner.structure = inner.create.node.bulk_structure(
  element="Al"
)
inner.calc_static = inner.create.node.calc_static(
  structure=inner.structure.outputs.structure
)

lattice_energy = Workflow.meta.for_loop(
  body=inner,
  step=inner.calc_static.outputs.control.ran,
  iterable_inputs={
    "lattice_constant": inner.structure.inputs.lattice_constant
  },
  iterable_outputs={
    "energy_pot": inner.calc_static.outputs.energy_pot,
  }
)
# a meta node takes a workflow instance and returns a node class

wf.for_loop = lattice_energy()
# This is a macro node. When called it will make sure that its children
# match with its input, creating or destroying nodes as needed
# In this case, it loops over its iterable input to create a body node for each
# element in the iterable inputs (can there be more than one??) and updates the 
# corresponding input channel in each to trigger a run of the subgraph
# then collects all the subgraph outputs into iterable_outputs
# What happens if the number of iterables changes??
#  Death? Error? Maybe it's OK on the first call and the first call only, since it's a macro
# Can we loop over multiple iterable inputs as long as they're the same length?
#  Yes, zip and execute
# What about when they're different lengths?
#  No, nest multiple meta nodes together
# Can we collect an arbitrary number of iterable outputs?
#  Yes
# How to change/access non-iterable IO?
#  Some sort of broadcasting magic? Then do we even need iterable_outputs?
wf.for_loop.iterable_inputs.lattice_constants = wf.lattice_constants.outputs.array

wf.all_finished = wf.create.node.none_running(
  data=wf.for_loop.iterable_outputs.energy_pot
)

wf.while_loop = wf.create.node.while_loop(
  condition=wf.all_finished.outputs.truth,
  step=wf.for_loop.outputs.control.done,
)
wf.for_loop.inputs.control.reset = wf.while_loop.outputs.control.if_false

@Workflow.node("bulk_modulus")
def murnaghan(lattice_constants, energies: list | np.ndarray):
  # Do the math
  return 42

wf.bulk_modulus = murnaghan(
  lattice_constants=wf.lattice_constants.outputs.array,
  energies=wf.for_loop.iterable_outputs.energy_pot,
  update_automatically=False
)
wf.bulk_modulus.inputs.control.run = wf.while_loop.outputs.control.if_true

@JNmpi
Copy link
Contributor Author

JNmpi commented Apr 14, 2023

@liamhuber, thanks for the great summary and all the suggestions and pseudocode. I like most of your pseudocode. I am however afraid that the loop part (with the inner workflow) will be hard to understand - it is rather far away from any standard python notation. For acceptance, this is however a super important criterion. Below are some (very preliminary) thoughts to combine a python-like syntax with a workflow notation:

  @node()
  def energy_convergence(job_old, job_new, eps):
      while not job_new.is_finished:   # wait until job is finished (or aborted)
          wait(1)
      return np.abs(np.mean(job_new.output.energy) - np.mean(job_old.output.energy)) < eps
  
  job = wf.create.jobs.lammps('my_name')
  job.run()
  for it in wf.arange(10):  # it is the inner workflow object (not just an integer index)
      # the next line calls job.next(i_structure=-1, iter_step=it), which is job/node specific
      # for an atomistic job n_iter determines the structure that should be taken (-1 is the last step) 
      # 'it' also appends the inner workflow to the job object
      job_new = it.next(job, i_structure=-1)  
      job_new.run() # append command (here run) to inner workflow
      it.break_if(wf.energy_convergence(job, job_new, 1e-5) # call and append to inner workflow
      job = job_new.copy()   

The main idea is to create an iterator object that replaces the normal integer index and provides all the functionality to run and store the inner workflow in the loop. This object not only allows to call break or continue but can be appended also to node objects to log and store all commands performed for this node. This logging ability is also needed e.g. for the structure object, e.g., to store when vacancies, substitutions etc. are done.

Again, very preliminary ideas but it would be good to discuss them.

@liamhuber
Copy link
Member

@JNmpi so the good news is that the sort of thing you propose is actually already completely doable, the bad news is that I have concerns with the paradigm (but concerns, not objections! And I strongly agree that similarity with python is critical for adoption).

So, first, the good news: if you're just doing things live in the notebook, we can actually really easily dynamically create new nodes, and even dynamically create node connections! This comes very naturally from our syntactic sugar that wf.some_node is equivalent to wf["some_node"]. The example below is a little bit simpler than yours, but the extension to something like energy convergence is quite trivial, and this example works out-of-the-box with the current HEAD of pyiron_contrib:

from pyiron_contrib.workflow.workflow import Workflow
from pyiron_contrib.workflow.node import node

@node("y")
def add_one(x):
    return x + 1

wf = Workflow("my_loop")

wf.n0 = add_one(x=0)

i = 0
while wf[f"n{i}"].outputs.y.value < 5:
    wf.add(add_one(x=wf[f"n{i}"].outputs.y, label=f"n{i+1}"))
    i += 1

print(wf.nodes.keys())
>>> dict_keys(['n0', 'n1', 'n2', 'n3', 'n4'])
print(wf.n4.outputs.y.value)
>>> 5

Now, what worries me about these examples is that mine (exclusively) and yours (naively) work only when live in a notebook, i.e. they once again promote the jupyter notebook rather than the Workflow to the position of supremacy; they are not serializable because they are adapting themselves on the fly (i.e. they are internally calling run -- mine implicitly when updating input connections, yours explicitly) and the rule for how they adapt themselves lives in the notebook cell (the for and while commands, respectively) and is not defined inside the graph scope. Further, mine (exclusively) and yours (naively) don't even have a graphical representation!

I keep saying "naively" for yours, because I really like your idea of allowing the workflow to construct a custom iterator. In this paradigm, I can envision that your entire example is strictly syntactic sugar on top of building something like my node-based loop example a couple comments above -- I suspect this may just be what you're driving at already!! In this way, we could satisfy code-based users at the same time that we maintain a consistent, node-based paradigm -- Which in turn keeps a consistent universe for graphical and text users, and makes sure we are working with serializable (and thus shareable) objects.

How do we achieve this technically? Honestly, I'm not sure. I took a peek at some of the generator/iterator docs today to refresh my memory, and they are relatively powerful and flexible objects, but this is asking an awful lot of them. We'll need to either delay run commands or provide some other alternative/wrap them in-context somehow so that actual computation is not required at definition time. I'm cautiously optimistic that this is a direction worth pursuing.

For me, falling back to live notebook objects is a show stopper and must be avoided. But, if we agree that this sort of loop syntax is actually just sugar on top of constructing some sort of computation-free, serializable graph objects under the hood, then I would propose to simply continue development using the more verbose but rigorous paradigm of explicit flow-control nodes, and then figure out how we can get an iterator object to map onto those.

More discussions in real-time also sounds good. This coming Monday (the 17th) I need to leave the pyiron meeting a little early, and won't be free again until ~10:00, and on Friday I am unavailable starting ~11:00. Otherwise I should be more or less free to schedule something 06:00-16:00 PST, so it depends on the workshop schedule for you and @jan-janssen. I guess @pmrv and @samwaseda might also be interested, in which case a morning time slot is important.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature; can be answer partner of "feature request"
Projects
None yet
Development

No branches or pull requests

2 participants