"""
The pipeline structure is used to define a sequence of processes
to be applied one after another onto a spim, to automate a
detection task consisting of multiple process steps.
"""
import dill
[docs]class Pipeline(object):
"""A pipeline is a sequence of processes, that can be applied one after
another.
The processes are stored in a Dictionary, along with the SpimStage at which
they can be applied. The pipeline can be applied completely using the
`apply_all_steps` method or partially using the `apply_from_stage_to_stage`
method.
"""
def __init__(self, processes):
self.process_stage_dict = dict([(p.input_stage, p) for p in processes])
@property
def processes(self):
return self.process_stage_dict.values()
[docs] def replaced_with(self, new_process):
"""This will give a new pipeline, where one process is replaced with
the given one
Parameters
----------
new_process : SpotlobProcessStep
the new process to be inserted
Returns
----------
Pipeline
the pipeline, that includes `new_process`
"""
new_dict = self.process_stage_dict.copy()
new_dict[new_process.input_stage] = new_process
return Pipeline(new_dict.values())
[docs] def apply_from_stage_to_stage(self, spim, from_stage, to_stage):
"""Recursively applies the pipeline-processes from a given stage up to
another given stage.
Parameters
----------
spim : Spim
The image item to apply parts of the pipeline to
from_stage : int
SpimStage at which stage the first process should be applied
to_stage : int
SpimStage at which stage the last process should be applied
Returns
----------
Spim
The processed Spim at stage `to_stage`
Raises
------
Exception:
If to_stage is before from_stage
"""
if from_stage == to_stage:
return spim
elif from_stage > to_stage:
# return self.apply_from_stage_to_stage(spim, to_stage, from_stage)
raise Exception("invalid apply request")
else:
# recursive
intermediate_spim = self.apply_from_stage_to_stage(
spim, from_stage, to_stage-1)
last_process = self.process_stage_dict[to_stage-1]
outspim = intermediate_spim.do_process_at_stage(last_process)
return outspim
def _maxstage(self):
return max(self.process_stage_dict.keys())+1
[docs] def apply_all_steps(self, spim):
"""Apply the complete pipeline on a given spim
Parameters
----------
spim : Spim
the spotlob image item to apply the complete pipeline to
Returns
-------
Spim
a spotlob image item a the stage after the last process
"""
minstage = min(self.process_stage_dict.keys())
maxstage = self._maxstage()
return self.apply_from_stage_to_stage(spim, minstage, maxstage)
[docs] def apply_at_stage(self, spim):
"""Applies all steps following the stage of the spim
Parameters
----------
spim : Spim
the spotlob image item to apply the pipeline to
Returns
----------
Spim
the processed Spim at stage `to_stage`
"""
startstage = spim.stage
maxstage = self._maxstage()
return self.apply_from_stage_to_stage(spim, startstage, maxstage)
[docs] def apply_outdated_up_to_stage(self, spim, up_to_stage):
"""Applies all processes since the first outdated one on spim up to
a given stage if no process is outdated or if the outdated stage
is past up_to_stage, spim is processed up to up_to_stage,
or a predecessor is returned at up_to_stage
Parameters
----------
spim : Spim
Spim to apply the pipeline to
up_to_stage : int
SpimStage up to which the pipeline should be applied
Returns
-------
Spim
the processed Spim at stage `up_to_stage`
"""
first_outdated_stage = -1
# find first stage that is outdated
for st, process in sorted(self.process_stage_dict.items()):
if process.outdated:
first_outdated_stage = st
break
if first_outdated_stage >= 0:
if first_outdated_stage < up_to_stage:
return self.apply_from_stage_to_stage(
spim, first_outdated_stage, up_to_stage)
# nothing is outdated or outdated stage is past up_to_stage
if up_to_stage > spim.stage:
return self.apply_from_stage_to_stage(spim,
spim.stage,
up_to_stage)
else:
return spim.get_at_stage(up_to_stage)
[docs] def save(self, target_path):
"""Store the pipeline including process paramaters for
later use.
Parameters
----------
target_path : str
path of the file to store the pipeline to
Notes
-----
This creates a file that is also suitable for batch process
and parallelization.
See also
--------
Pipeline.from_file
Use `Pipeline.from_file` to restore the pipeline object
from storage
"""
with open(target_path, "wb") as dill_file:
dill.dump(self, dill_file)
[docs] @classmethod
def from_file(cls, filepath):
"""Restore a pipeline from a file
Parameters
----------
filepath : str
filepath of the pipeline file
Returns
-------
Pipepline
the restored pipeline object
"""
with open(filepath, "rb") as dill_file:
restored_pipe = dill.load(dill_file)
return restored_pipe
def __str__(self):
out = []
for stage in sorted(self.process_stage_dict.keys()):
process = self.process_stage_dict[stage]
out += [str(process)]
return "".join(out)