Note
This technote is a work-in-progress.
1 Abstract¶
This technote proposes a new model for executing pipelines against butler repositories, in which both quantum graphs and output datasets are managed by a butler workspace before being committed back to the central data repository.
Workspaces generalize the “quantum-backed butler” system (DMTN-177 [3]) to all butler-backed execution with a more powerful and use-case-focused interface than the current pipetask tool.
By storing quantum graphs in a butler-managed location, workspaces also pave the way to storing quantum provenance in butler.
2 Conceptual overview¶
2.1 Origin in BPS/QBB¶
The execution model of the Batch Production Service (BPS) with quantum-backed butler (QBB, see DMTN-177 [3]) can be seen as three intuitive steps, analogous to transactional operations in SQL:
- bps prepare starts the process of defining a moderately large unit of processing, via a sequence of related directed acyclic graphs (DAGs), starting with the quantum graph. These graphs and other WMS-specific files are saved to a “submit” directory, whose location is managed by the user even though the contents are managed by BPS. This can be seen as the beginning of a transaction for a run - no modifications are made to the central data repository.
- bps submit executes that processing by delegating to a third-party workflow management system (WMS). All of these batch jobs (aside from the very last) write their output datasets as file artifacts, but do not read from or write to the central data repository’s database at all.
- The last batch job inserts records describing the produced file artifacts into the central database, effectively committing the transaction by putting all produced datasets into a single RUN collection.
These steps are not always visible to users - bps submit can run the prepare step itself, and usually does, and in the absence of errors the division between the regular execution batch jobs and the final “transfer” job is invisible.
What matters is that this model provides a sound foundation for higher-level tools, with the existence of a discrete “commit” step being the aspect most notably lacking in non-BPS execution.
2.2 Workspace basics¶
The middleware team has long wanted to unify the bps and pipetask interfaces, and the proposal here is to do just that, by introducing a new workspace concept that generalizes the BPS/QBB flow to all execution.
A workspace is best thought of as an under-construction, uncommitted RUN collection, known to the central data repository but not included in its databases (except perhaps as a lightweight record of which workspaces exist).
Users interact with a workspace via a WorkspaceButler client, which subclasses LimitedButler.
When a workspace is committed, a regular RUN collection is created and the workspace ceases to exist.
Workspaces can hold quantum graphs, output file artifacts (via a datastore), and provenance/status information.
Workspaces can have multiple implementations and capabilities.
WorkspaceButler itself will not assume anything about tasks or pipelines, as appropriate for a class defined in daf_butler.
Our main focus here will be a PipelineWorkspaceButler subclass that serves as a pipetask replacement, and we imagine it storing its persistent state (mostly quantum graphs) in files (see 5 Implementation notes).
We envision BPS and its plugins delegating to the PipelineWorkspaceButler via a command-line interface, much as they currently do with pipetask, but we hope that eventually the front-end interface to BPS could be a WorkspaceButler subclass as well.
Workspaces may read from the central repository database for certain operations after they are created, but they must not write to it except when committed, and it is expected that many core operations (such as actually running tasks with PipelineWorkspaceButler.run_quanta()) will not access the central database at all.
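To make the intended lifecycle concrete, here is a minimal sketch of how the pieces described above could fit together, using the interfaces prototyped in 6 Prototype code. PipelineWorkspaceFactory and the argument-free run_quanta() call are hypothetical placeholders, not part of the prototype.

from lsst.daf.butler import Butler

# Hypothetical factory for pipeline workspaces; the prototype only defines
# the abstract WorkspaceFactory protocol.
from lsst.pipe.base import PipelineWorkspaceFactory

butler = Butler("/repo/main", writeable=True)

# Begin the "transaction": register the workspace with the central repository,
# but write nothing to its database beyond that lightweight record.
workspace = butler.make_workspace(
    name="u/someone/drp-test-1",
    input_collections=["HSC/defaults"],
    factory=PipelineWorkspaceFactory(pipeline="pipelines/DRP.yaml"),
)

# Execute against the workspace; no central-database writes happen here.
workspace.activate_tasks()
workspace.build_quanta()
workspace.run_quanta()

# Commit: insert the outputs into the central repository as a RUN collection
# named after the workspace, after which the workspace ceases to exist.
workspace.commit()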
2.3 Provenance¶
One clear advantage of the workspace model is that the “commit” step provides an opportunity for provenance to be gathered and possibly rewritten into a form more focused on post-execution reads than at-scale execution writes.
This is a problem largely ignored by DMTN-205 [2], which describes a central database schema for provenance but lacks a complete plan for populating it.
In addition to the workspace interfaces, this technote will describe new methods on the Butler class that provide some provenance query support.
These could be implemented by storing quantum graphs and quantum status in files after workspaces are committed, while leaving room for database storage of provenance graphs as described in DMTN-205 [2] in the future.
2.4 Development mode¶
Workspaces would also be useful in pipeline software development and small-scale testing, especially if we could (sometimes) relax our consistency controls on software versions and pipeline configuration prior to “commit”, a state we will call “development mode”.
Furthermore, many development runs would not be committed at all, because the data products themselves are not the real outcome of that work - working code is.
This suggests that workspaces should also support an analog of SQL’s ROLLBACK as an alternative to commit (though we will use WorkspaceButler.abandon() as the nomenclature here to avoid confusion with the very different relationship between commit and rollback in other contexts, e.g. some version control systems).
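A development-mode session might then look like the following sketch (same hypothetical names as in the earlier sketch); the point is that nothing reaches the central data repository unless and until the user decides it should.

workspace = butler.make_workspace(
    name="u/someone/devel-scratch",
    input_collections=["HSC/defaults"],
    factory=PipelineWorkspaceFactory(pipeline="pipelines/DRP.yaml"),
)
workspace.development_mode = True  # irreversible for this workspace
workspace.build_quanta()
workspace.run_quanta()             # edit code or config, rerun, repeat...
workspace.abandon()                # the ROLLBACK analog: nothing was written to the central repository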
2.5 External workspaces¶
If we provide an option to create a workspace in an external location instead of inside a central data repository, we get an intriguing possibility for execution in the Rubin Science Platform (RSP) or on a science user’s local compute resources.
The quantum graph stored in the workspace knows about all needed input datasets, and these could be transferred to the external location as well, providing a self-contained proto-data-repository that could be manually moved and ultimately executed anywhere; it contains all of the information needed to reconstitute a standalone data repository with its own database.
Alternatively, if the workspace is left in a location where it can read from the central data repository but not write to it, input datasets could be left in the central data repository, and the user could choose to either create a standalone repository that merely references them or commit by transferring output dataset file artifacts to the central repository root.
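A sketch of the external-workspace flow, again using the prototype interfaces plus the hypothetical factory from the earlier sketches (paths and collection names are illustrative):

# Create the workspace outside the central repository root and pull in copies
# of the overall-input file artifacts so it is fully self-contained.
workspace = butler.make_workspace(
    name="u/someone/laptop-run",
    input_collections=["HSC/defaults"],
    factory=PipelineWorkspaceFactory(pipeline="pipelines/DRP.yaml"),
    root="/scratch/laptop-run",
)
workspace.transfer_inputs(transfer="copy")

# Later, possibly on a different machine, reconnect without a full butler.
from lsst.pipe.base import PipelineWorkspaceButler  # prototype class

workspace = PipelineWorkspaceButler.make_external("/scratch/laptop-run")
workspace.build_quanta()
workspace.run_quanta()

# Either reconstitute a standalone data repository in place...
standalone_butler = workspace.export()
# ...or (if the central repository is reachable) commit the outputs back:
# workspace.commit(transfer="copy")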
2.7 Data repository consistency¶
A serious but often ignored problem with QBB (and its predecessor, “execution butler”) is that it is inconsistent with the nominal butler consistency model, in that it allows file artifacts to exist in a datastore root without having any record of those artifacts in the corresponding registry.
This consistency model is only “nominal” because it was never actually sound (deletions in particular have always been a problem), and hence we’ve considered it an acceptable casualty when working to limit database access during execution (i.e. DMTN-177 [3]).
DMTN-249 [1] attempts to address this issue by defining a new consistency model that explicitly permits (and actually requires) file artifacts to exist prior to their datasets’ inclusion in the registry, as long as those datasets are tracked as possibly existing in some other way that allows them to be found by any client of the central data repository (not merely, e.g. one that knows the user-provided location of an external quantum graph file, as is the case today).
The workspace concept described here is a realization of this new consistency model: the central data repository will have a record of all active (internal) workspaces (probably in a registry database table) and its full butler clients will have the ability to construct a WorkspaceButler for these by name.
Workspace implementations are required to support the WorkspaceButler.abandon() operation, which effectively requires that they have a way to find all of their file artifacts.
2.8 Concurrency and workspace consistency¶
Different workspaces belonging to the same central data repository are fully independent, and may be operated on concurrently in different processes with no danger of corruption.
This includes commits and workspace creation: races to create the same workspace should result in at least one of the attempts explicitly failing, and in no case should hardware failures or concurrency yield an incorrectly-created workspace that can masquerade as a correct one.
Operations on workspaces should be idempotent either by being atomic or by being resumable from a partial state after interruption, with exceptions to this rule clearly documented.
Most operations on a workspace are not expected to support or guard against concurrent operations on the same workspace, unless they are operating on disjoint sharding data IDs, in which case concurrency is required of all sharding workspace implementations.
In addition, some workspace-specific interfaces are expected to explicitly support concurrency under certain conditions; this is, after all, much of the reason we require workspaces to avoid central database access whenever possible.
Most prominent of these is the PipelineWorkspaceButler.run_quanta() method, which guarantees correct behavior in the presence of concurrent calls as long as those calls identify disjoint sets of quanta to run, allowing it to be used as an execution primitive by higher-level logic capable of generating those disjoint sets (e.g. BPS).
In fact, we will define PipelineWorkspaceButler.run_quanta() to block when it is given a quantum whose inputs are produced by other quanta that have not been run.
This is possible because this workspace implementation will also record quantum status (e.g. in per-quantum report files) for provenance, and we can modify the single-quantum execution logic to wait for these records (e.g. the report files) to appear upstream.
This actually addresses a possible soundness problem in our current execution model: ResourcePath backends other than POSIX and S3 may not guarantee that an output file artifact’s write will propagate to all servers by the time a read or existence check is performed on a downstream node, let alone by the time the write call returns, and we think we’ve seen this problem in some WebDAV storage in the past.
With our current approach to execution this can result in silently incorrect execution, because the dataset will be assumed not to have been produced and may be ignored downstream.
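The sketch below illustrates how a higher-level tool could exploit this guarantee by partitioning quanta into disjoint batches; the quanta keyword argument of run_quanta() is an assumption, since only the behavior (accepting quantum UUIDs and blocking on unfinished upstream quanta) is specified here.

import uuid
from concurrent.futures import ProcessPoolExecutor

from lsst.daf.butler import Butler


def run_batch(repo: str, workspace_name: str, batch: list[uuid.UUID]) -> None:
    # Each worker process constructs its own client for the same workspace;
    # no central-database access is needed to execute quanta.
    ws = Butler(repo).get_workspace(workspace_name)
    ws.run_quanta(quanta=batch)


quantum_ids = list(workspace.query_quanta().get_quantum_xgraph())  # node keys are quantum UUIDs
batches = [quantum_ids[i::4] for i in range(4)]  # disjoint by construction

with ProcessPoolExecutor(max_workers=4) as pool:
    for future in [pool.submit(run_batch, "/repo/main", workspace.name, b) for b in batches]:
        future.result()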
2.9 Phasing out prediction-mode datastore¶
Allowing file datastore to operate in “prediction mode” (where a datastore client assumes datasets were written with the same formatters and file templates it is configured to use) was a necessary part of getting QBB working. With workspaces, we hope to retire it, by saving datastore records along with the quantum status information the workspace will already be required to store, and which downstream quanta will be required to read. This will allow us to simplify the datastore code and eliminate existence checks both during execution and when committing a quantum-graph-based workflow back to the central data repository.
2.10 Pipeline workspace actions¶
Creating a new pipeline workspace will not require that the quantum graph be produced immediately, though this will be a convenience operation (combining all of the steps described below) we expect to be frequently exercised.
Instead, only the pipeline must be provided up-front, and even that can be overwritten in development mode.
Tasks within that pipeline are activated next, which writes their init output file artifacts to the workspace (note that this now happens before a quantum graph is built).
Quanta can then be built, shard-by-shard; the workspace will remember which shards have quanta already.
Even executed quanta are not immutable until the workspace is committed - PipelineWorkspaceButler will provide methods for quantum state transitions, like resetting a failed quantum so it can be attempted again.
Development mode will provide much more flexibility in the order in which these steps can be performed, generally by resetting downstream steps and allowing output datasets to be clobbered.
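A sketch of that sequence for a workspace sharded on tract, reusing the workspace object from the earlier sketches (the shard data IDs, task expression, and reset_quanta() arguments are illustrative; run_quanta() is not yet prototyped):

workspace.activate_tasks("isr..makeWarp")  # init-outputs are written now, before any quanta exist
workspace.build_quanta(shards=[{"skymap": "hsc_rings_v1", "tract": 9813}])
workspace.build_quanta(shards=[{"skymap": "hsc_rings_v1", "tract": 9697}])  # shard by shard
workspace.run_quanta()

# Even executed quanta can still be revisited before commit.
workspace.reset_quanta(...)  # e.g. reset a failed quantum after a code fix
workspace.run_quanta()
workspace.commit()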
3 Python components and operation details¶
3.1 Workspace construction and completion (daf_butler
)¶
Workspaces are expected to play a central role in the DMTN-249 [1] consistency model, in which file artifacts are written prior to the insertion of their metadata into the database, but only after some record is made in the central repository that those artifacts may exist.
This means workspace construction and removal operations need to be very careful about consistency in the presence of errors and concurrency.
This is complicated by the fact that workspaces are extensible; all concrete implementations will live downstream of daf_butler.
We envision workspace creation being aided by two helper classes that are typically defined along with a new WorkspaceButler implementation:
- A WorkspaceFactory is a callable that creates a new workspace from a full butler and some standard user-provided arguments, returning a WorkspaceButler instance and a WorkspaceConfig instance.
- WorkspaceConfig is a pydantic model that records all state needed to make a new WorkspaceButler client to an existing workspace, with a method to create a WorkspaceButler from that state.
After a factory is used to write the rest of the workspace’s initial state, the configuration is written to a JSON file in a predefined (by a URI template in the central repository butler configuration) location.
This file is read and used to make a new WorkspaceButler instance without requiring access to the full butler.
This is all driven by a make_workspace() method on the full Butler class, for which the prototyping here includes a nominal implementation with detailed comments about how responsibility (especially for error-handling) is shared by different methods.
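As an illustration of how these pieces are meant to fit together, the following heavily simplified sketch shows what a concrete factory/config pair might look like; MyWorkspaceButler, its constructor arguments, and the omitted persistent state are all hypothetical.

class MyWorkspaceConfig(WorkspaceConfig):
    """Everything needed to reconnect to an existing 'my' workspace."""

    root: str | None = None

    def make_butler(self, parent_config=None, parent_read_butler=None, parent_write_butler=None):
        # Reconstruct a client purely from this saved configuration.
        return MyWorkspaceButler(self, parent_read_butler=parent_read_butler,
                                 parent_write_butler=parent_write_butler)


class MyWorkspaceFactory:
    """Callable satisfying the WorkspaceFactory protocol."""

    def __call__(self, name, input_collections, shard_by, chain, path, parent, parent_config):
        config = MyWorkspaceConfig(
            name=name,
            input_collections=list(input_collections),
            chain=chain,
            parent_config=dict(parent_config),
            root=str(path) if path is not None else None,
        )
        # Write implementation-specific initial state here (quantum-graph
        # storage, scratch directories, ...); on failure, clean up and raise
        # HandledWorkspaceFactoryError so Butler.make_workspace() can recover.
        return MyWorkspaceButler(config, parent_write_butler=parent), config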
After an internal workspace has been created, a client WorkspaceButler can be obtained from a full Butler to the central repository by calling Butler.get_workspace(), or without ever making a full butler by calling ButlerConfig.make_workspace_butler().
External workspace construction goes through WorkspaceButler.make_external().
We expect most concrete workspace implementations to define a butler subcommand for their creation, and for most end users to interact only with that command-line interface.
Committing a workspace goes through WorkspaceButler.commit(), and abandoning one goes through WorkspaceButler.abandon().
Creating a new standalone repository goes through WorkspaceButler.export().
All of these have nominal implementations in the prototype, showing that they delegate to many of the same abstract methods to transfer their content to full butlers and remove anything that remains from the workspace.
To make exporting and external workspaces more useful, WorkspaceButler.transfer_inputs() may be used to transfer the file artifacts of input datasets used by the workspace into the workspace itself.
This will allow the external workspace (depending on its implementation) to be used after fully severing its connection to the central data repository (e.g. copying it to a laptop).
These datasets are also included in the standalone data repository created by WorkspaceButler.export().
3.2 Quantum graph and provenance queries (daf_butler
)¶
This technote includes a simplified (relative to [2]) proposal for provenance querying on the full butler, primarily because PipelineWorkspaceButler will require major changes to the QuantumGraph class, and it makes sense to include changes directed at using the same class (or at least a common base class) to report provenance to avoid yet another round of disruption.
The entry point is Butler.query_provenance(), which delegates much of its functionality to a new parsed string-expression language, represented here by the QuantumGraphExpression placeholder class.
I am quite optimistic that this will actually be pretty easy to implement, with one important caveat: we do not promise to efficiently resolve these expressions against large (let alone unbounded) sequences of collections, allowing us to implement expression resolution by something as simple as reading per-collection quantum-graph files into memory and calling networkx methods (which are generally O(N) in the size of the graph we’ve loaded).
A moderately complex expression could look something like this:
isr..(..warp@{tract=9813, patch=22, visit=1228} | ..warp@{tract=9813, patch=22, visit=1230})
which evaluates to “all quanta and datasets downstream of the isr task that are also upstream of either of two particular warp datasets”.
The standard set operators have their usual meanings, and .. is used (just as in the butler data ID query language) to specify ranges.
In the context of a DAG, half-open ranges mean ancestors or descendants, while a..b is a shortcut for a.. & ..b.
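As a usage sketch (the collection name and data ID values are illustrative), the example expression above could be evaluated like this:

graph = butler.query_provenance(
    "HSC/runs/RC2/w_2023_40",
    where=(
        "isr..(..warp@{tract=9813, patch=22, visit=1228}"
        " | ..warp@{tract=9813, patch=22, visit=1230})"
    ),
    instrument="HSC",
    skymap="hsc_rings_v1",
)
for quantum_id in graph.get_quanta_for_task("isr"):
    print(graph.get_data_id(quantum_id), graph.get_quantum_status(quantum_id))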
The return type is the new QuantumGraph, which has very little in common with its lsst.pipe.base.QuantumGraph predecessor in terms of its Python API, though of course they are conceptually very similar.
They may have more in common under the hood, especially with regards to serialization.
Major differences include:
- It does not attempt to hold task or config information, which makes it easier to put the class in daf_butler (where it needs to be for Butler.query_provenance()). Instead it just has string task labels that can be looked up in a Pipeline or PipelineGraph, which are stored as regular butler datasets in the same RUN collection as the quanta they correspond to.
- Its interface is designed to allow implementations to load the DataCoordinate and DatasetRef information associated with a node only on demand. Our profiling has shown that saving and loading that information constitutes the vast majority of the time it takes to serialize and deserialize graphs, and we want to give future implementations the freedom to simply not do a lot of that work unless it’s actually needed. We expect many provenance use cases to involve traversing a large graph while only requiring details about a small subset of the nodes.
- It records status for each node via the new QuantumStatus and DatasetStatus enums. These are not intended to provide human-readable error reports - this is a known gap in this proposal I’d like to resolve later - but they do provide a sufficiently rich set of states to represent human-driven responses to errors during processing as state transitions (e.g. PipelineWorkspaceButler.accept_failed_quanta(), poison_successful_quanta(), and reset_quanta()), as well as a way to record those responses in provenance.
- Many of the current lsst.pipe.base.QuantumGraph accessors are missing, having been replaced by a bipartite networkx.MultiDiGraph accessor that includes nodes for datasets as well as quanta (see the sketch after this list). Some of these may return as convenience methods for particularly common operations, but I like the idea of embracing the networkx graphs as the primary interface, having grown quite fond of them while working on PipelineGraph.
- In order to be suitable for provenance query results that can span collections, a particular QuantumGraph instance is no longer restricted to a single RUN collection.
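The sketch promised in the networkx item above: a traversal that finds everything blocked by failed quanta, using only the prototype QuantumGraph interface and standard networkx calls, and paying for data ID loading only for the nodes actually reported (graph here is the query_provenance() result from the earlier sketch).

import networkx as nx

from lsst.daf.butler import QuantumStatus  # as prototyped in daf_butler below

xgraph = graph.get_bipartite_xgraph(annotate=False)

# Dataset nodes carry bipartite=0 and quantum nodes bipartite=1.
quanta = {n for n, d in xgraph.nodes(data=True) if d["bipartite"] == 1}
failed = {n for n in quanta if graph.get_quantum_status(n) is QuantumStatus.FAILED}

blocked: set = set()
for node in failed:
    blocked |= nx.descendants(xgraph, node)  # downstream quanta and datasets alike

for node in blocked & quanta:
    print(xgraph.nodes[node]["label"], graph.get_data_id(node))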
The prototype here defines QuantumGraph as an ABC, which may or may not be ultimately necessary.
It may be that a single implementation could satisfy all quantum-oriented concrete workspaces as well as Butler.query_provenance().
How to handle the storage of committed quantum graphs is a question this technote does not attempt to fully answer. The best answer probably involves a new registry “manager” interface and implementation, despite this implying the introduction of file-based storage to the registry (for the implementation we imagine doing first; a database-graph-storage option as in [2] should be possible as well, if we find we need it). We want changing the quantum graph storage format to be tracked as a repository migration, and using the registry manager paradigm is our established way of doing that. Storing quantum graph content in database blob columns may be another good first-implementation option; this still saves us from having to fully map the quantum graph data structure to tables, while still allowing us to pull particularly useful/stable quantities into separate columns.
3.3 Pipeline workspace (pipe_base
)¶
The PipelineWorkspaceButler that defines the new interface for low-level task execution has been prototyped here as an ABC living in pipe_base.
We could implement this ABC fully in ctrl_mpexec using modified versions of tools (e.g. SingleQuantumExecutor) already defined there, but it may also make sense to move most or all of the implementation to pipe_base, perhaps leaving only multiprocessing execution to ctrl_mpexec while implementing serial, in-process execution in pipe_base.
Even if the base class ends up concrete and capable of simple multiprocessing, I do expect it to support subclassing for specialized pipeline execution contexts (BPS, prompt processing, mocking), though composition or even command-line delegation may be a better option for some of these.
A pipeline workspace is initialized with a Pipeline or PipelineGraph, and upon creation it stores these and (usually) the software versions currently in use as datasets (with empty data IDs) to the workspace.
Getter methods (get_pipeline(), get_pipeline_graph(), get_packages()) provide access to the corresponding in-memory objects (these are not properties because they may do I/O).
A pipeline workspace can be initialized in or converted (irreversibly) to development_mode, which disables the saving of software versions.
The pipeline associated with the workspace may be reset only in development mode.
Committing a development-mode pipeline workspace does not save provenance or configs to the central repository, because these are in general unreliable.
Often development-mode workspaces will ultimately be abandoned instead of committed.
After initialization, pipeline workspace operations proceed in roughly three stages:
- Tasks are activated, which writes their init-outputs (including configs) to the workspace and marks them for inclusion (by default) in the next step. activate_tasks() may be called multiple times as long as the order of the calls is consistent with the pipeline graph’s topological ordering. Init-input and init-output dataset references are accessible via get_init_input_refs() and get_init_output_refs(). It is a major change here that these are written before a quantum graph is generated. This has always made more sense, since init datasets do not depend on data IDs or the other criteria that go into quantum graph generation, but we defer init-output writes today until after quantum graph generation in order to defer the first central-repository write operations as long as possible (particularly until after we are past the possibility of errors in quantum graph generation). With workspaces deferring committing any datasets to the central repository already, this is no longer a concern and we are free to move init-output writes earlier.
- Quanta are built for the active tasks or a subset thereof via one or more calls to build_quanta(). Quanta are persisted to the workspace in an implementation-defined way (the first implementation will use files similar to the .qgraph files we currently save externally). In a sharded workspace, the quanta for different disjoint sets of shard data IDs (not arbitrary data IDs!) may be built separately - at first incrementally, by serial calls to build_quanta(), but eventually concurrent calls or multiprocessing parallelism should be possible as well. We also eventually expect to support building the quanta for subsequent tasks in separate (but serial) calls, allowing different where expressions (etc.) for different tasks within the same graph. This is actually a really big deal: it’s a major step towards finally addressing the infamous DM-21904 problem, which is what prevents us from building a correct quantum graph for a full DRP pipeline on even small amounts of data. Very large scale productions (of the sort handled by Campaign Management) will continue to be split up into multiple workspaces/collections with external sharding, but smaller CI and developer-initiated runs of the full pipeline should be possible within a single workspace, with a single batch submission and a single output RUN collection. WorkspaceButler.transfer_inputs() does nothing on a PipelineWorkspaceButler unless quanta have already been generated.
- Quanta are executed via one or more calls to run_quanta(). Here the base class does specify correct behavior in the presence of concurrent calls, if the quanta matching the arguments to those calls are disjoint: implementations must not attempt to execute a matching quantum until its upstream quanta have executed successfully, and must block until this is the case. Quanta that have already been run are automatically skipped, while those that fail continue to block everything downstream. run_quanta() accepts both simple UUIDs and rich expressions in its specification of which quanta to run, but only the former is guaranteed to be O(1) in the size of the graph; anything else could require traversing the full graph, which is O(N). Passing quantum UUIDs to PipelineWorkspaceButler is what we expect BPS to do under the hood, so this has to be fast, while everything else is a convenience.
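A sketch of the incremental building described in the second stage, with different constraints for different tasks in the same workspace graph (an eventual capability; the task expressions and where strings are illustrative):

workspace.build_quanta(tasks="isr..calibrate", where="exposure.observation_type = 'science'")
workspace.build_quanta(tasks="..makeWarp..assembleCoadd", where="tract = 9813")
workspace.build_quanta(tasks="detection..")  # remaining tasks, no extra constraint

# Execution can be split the same way; BPS would pass explicit quantum UUIDs
# here to keep each call O(1) in the size of the graph.
workspace.run_quanta()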
PipelineWorkspaceButler has several methods for tracking these state transitions:
- active_tasks: the set of currently active tasks;
- get_built_quanta_summary(): the set of task and shard data ID combinations for which quanta have already been built;
- query_quanta(): the execution status of all built quanta. This method’s signature is identical to that of query_provenance(), but it is limited to the workspace’s own quanta. Since it reports on quanta that could be executing concurrently, the base class makes very weak guarantees about how up-to-date its information may be. It may also be far less efficient than WMS-based approaches to getting status, especially for queries where the entire quantum graph has to be traversed. The ideal scenario (but probably not a near-term one) for BPS would be a WMS-specific workspace provided by a BPS plugin implementing this method to use WMS approaches first, delegating to PipelineWorkspaceButler via composition or inheritance with narrower queries only when necessary.
In development mode, task init-outputs (including configs) are rewritten on every call to build_quanta() or run_quanta(), because we have no way to check whether their contents would change when software versions and configs are not controlled.
Quanta are not automatically rebuilt, because graph building is often slow, most development changes to tasks do not change graph structure, and it is reasonable to expect developers to be aware of when it does.
We can efficiently detect most changes to the PipelineGraph due to code or config changes and warn or fail when the quantum graph is not rebuilt when it should be, but this is not rigorously true: changes to an adjustQuantum() implementation are invisible unless the quantum graph is actually rebuilt.
Rebuilding the quantum graph for a shard-task combination that already has quanta is permitted only in development mode, where it immediately deletes all outputs that may exist from executing the previous quanta for that combination.
PipelineWorkspaceButler also provides methods for changing the status of already-executed quanta, even outside development mode, with consistent changes to (including, sometimes, removal of) their output datasets:
- accept_failed_quanta() marks failed quanta as successful (while remembering their original failed state and error conditions), allowing downstream quanta to be run as long as they can do so without the outputs the failed quanta would have produced. Invoking this on all failed quanta is broadly equivalent to committing all successfully-produced datasets to the central data repository and starting a new workspace with the committed RUN collection as input, which is effectively how failures are accepted today. It is worth considering whether we should require all failed quanta to be accepted before a workspace is committed (outside development mode) to make this explicit rather than implicit.
- poison_successful_quanta() marks successful quanta as failures, as might need to occur if QA on their outputs revealed a serious problem that did not result in an exception. Downstream quanta that had already executed are also marked as failed - we assume bad inputs imply bad outputs. This does not immediately remove output datasets, but it does mark them as INVALIDATED, preventing them from being committed unless they are again explicitly reclassified.
- reset_quanta() resets all matching quanta to the just-built PREDICTED state, deleting any existing outputs.
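A sketch of these transitions in use; the where keyword and the status-keyword/identifier syntax inside it are assumptions based on the expression language described in this technote, not prototyped signatures.

workspace.run_quanta()

# Accept quanta that failed for understood, tolerable reasons, unblocking any
# downstream quanta that can run without the missing outputs.
workspace.accept_failed_quanta(where="FAILED & isr..")

# QA later shows a 'calibrate' quantum silently produced bad outputs: mark it
# and everything downstream as failed, and its outputs as INVALIDATED.
workspace.poison_successful_quanta(where="calibrate@{visit=1228, detector=42}..")

# After a fix, reset the failures for another attempt and rerun.
workspace.reset_quanta(where="FAILED")
workspace.run_quanta()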
The activate_tasks() and build_quanta() methods accept a parsed string PipelineGraphExpression similar to (but simpler than) the QuantumGraphExpression described in 3.2 Quantum graph and provenance queries (daf_butler), which just uses dataset types and task labels as identifiers, since data IDs and UUIDs are irrelevant.
We also expect to enable this expression language to be used in the definition of labeled subsets in pipeline YAML files in the future.
4 Command-line interface¶
This technote does not yet include a prototype command-line interface for PipelineWorkspaceButler, despite this being the main way we expect most users to interact with it.
We do expect the CLI to involve a suite of new butler subcommands (possibly another level of nesting, i.e. butler workspace build-quanta, butler workspace run-quanta), and for the current pipetask tool to be deprecated in full rather than migrated.
Details will be added to this technote on a future ticket.
5 Implementation notes¶
While the interface of PipelineWorkspaceButler is intended to permit implementations that store their persistent state in other ways, such as a NoSQL database (Redis seems particularly well suited), the initial implementation will use files.
We’ll need a file-based implementation in the long term anyway to make it easy to set up a minimal middleware environment without the sort of administrative responsibilities nearly all databases involve.
These files will sometimes be managed by a Datastore; certainly this will be the case for the file artifacts of datasets that could be eventually committed as-is back to the central repository, including the workspace-written datasets like packages and the new pipeline and pipeline_graph datasets.
Quantum graphs don’t fit as well into Datastore management.
This is partly a conceptual issue - we expect quantum graph files to continue to provide the sort of dataset metadata (data IDs, datastore records) the database provides for the central repository (as in today’s QBB), so putting quantum graph files into a Datastore is a little like putting a SQLite database file into a Datastore.
And this isn’t entirely conceptual - like SQLite database files, we may want to be able to modify quantum graph files in-place (even if that’s just to append), and that’s not a door we want to open for the butler dataset concept.
Quantum graph files also exist on the opposite end of the spectrum from regular pipeline output datasets in terms of whether we will want to rewrite them rather than just ingest them on commit.
Even if provenance quantum graphs in the central repository are stored in files initially (the plan in mind here), we probably want to strip out all of the dimension record data they hold that is wholly redundant with what’s in the database, and since concerns about modifying the graphs disappear after commit, we probably want to consolidate the graph information into fewer files.
This last point also holds for a new category of file-based state we’ll need to add for PipelineWorkspaceButler: per-quantum status files that are written after each quantum is executed.
In addition to status flags and exception information for failures, these can hold output-dataset datastore records, and their existence is what downstream quanta will block on in PipelineWorkspaceButler.run_quanta.
These will be scanned as-is by PipelineWorkspaceButler.query_quanta, since that’s all it can do, but we really want to merge this status information with the rest of the quantum graph on commit (while dropping the redundant dimension records, and moving datastore records into the central database), potentially making Butler.query_provenance much more efficient in terms of storage and access cost.
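To make this concrete, a per-quantum status file could be as small as the following pydantic model; this is a hypothetical sketch of the content such a file might carry, not a prototyped interface.

import uuid

import pydantic


class QuantumReport(pydantic.BaseModel):
    """Hypothetical per-quantum status file, written once the quantum finishes."""

    quantum_id: uuid.UUID
    status: str  # name of a QuantumStatus value, e.g. "SUCCEEDED" or "FAILED"
    exception_type: str | None = None  # populated only for failures
    message: str | None = None
    traceback: str | None = None
    # Opaque datastore records for the outputs actually written, so downstream
    # quanta and the commit step never need to perform existence checks.
    datastore_records: dict[str, list[dict]] = pydantic.Field(default_factory=dict)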
6 Prototype code¶
This technote’s git repository includes two Python files with stubs representing new interfaces for the daf_butler and pipe_base packages.
These can be inspected directly, but the most important parts are included here so classes and methods can be referenced by the preceding sections.
This occasionally includes proto-implementations, but only when these are particularly illustrative of the relationships between interfaces.
6.1 daf_butler¶
- class Butler¶
- make_workspace()¶
def make_workspace( self, name: str, input_collections: Sequence[str], factory: WorkspaceFactory, *, shard_by: Iterable[str] | DimensionGraph | None = None, chain: str | None = None, root: ResourcePathExpression | None = None, ) -> WorkspaceButler: """Make a new workspace. Parameters ---------- name : `str` Name of the workspace. This can be used to retrieve it later (via `get_workspace`) and will be used as the name of the ``RUN`` collection created when the workspace is committed. input_collections : `~collections.abc.Sequence` [ `str` ] Collections whose inputs may be used by the workspace. factory : `~collections.abc.Callable` Callback that is passed all arguments to this method (after transforming some defaults) as well as ``self`` and the butler configuration to construct the actual workspace. shard_by : `~collections.abc.Iterable` [ `str` ] or `DimensionGraph`, \ optional Dimensions for data IDs that can be separately committed from this workspace. chain : `str`, optional Name of a chained collection to create or modify to include the new output run and the input collections on commit. root : convertible to `lsst.resources.ResourcePath`, optional If not `None`, a path to an external directory where the workspace's file artifacts should be stored. This marks the workspace as "external", which means that those artifacts must be transferred in order to be committed, but it is easier to set up a standalone data repository from them instead (see `WorkspaceButler.export`). Returns ------- workspace : `WorkspaceButler` Butler client for the new workspace. Notes ----- We may be able to add some syntactic sugar to make this pattern a little simpler for users, but we may not have to if they always use a CLI command defined in ``pipe_base`` to do it anyway. The idea of having a callback here is that the central data repository's configuration may have options that workspace factories could read when a new workspace is constructed, like queues for a BPS workspace. """ # Standardize arguments. match shard_by: case None: shard_by = self.dimensions.empty case DimensionGraph(): # type: ignore[misc] pass case _: shard_by = self.dimensions.extract(shard_by) if root is None: # This is an internal workspace. config_uri = self._config.get_workspace_config_uri(name) # For internal workspaces, we insert a row identifying the # workspace into a database table before it is actually created, # and fail if such a row already exists. This is effectively a # per-name concurrency lock on the creation of workspaces. self._insert_workspace_record(name) else: # This is an external workspace. root = ResourcePath(root, forceDirectory=True) config_uri = root.join(WorkspaceConfig.FILENAME) # Delegate to the factory object to do most of the work. This writes # persistent state (e.g. to files or a database). try: workspace_butler, workspace_config = factory( name, input_collections, shard_by, chain, root, self, self._config ) except HandledWorkspaceFactoryError as err: # An error occurred, but the factory cleaned up its own mess. We # can remove the record of the workspace from the database and # just re-raise the original exception. self._remove_workspace_record(name) raise cast(BaseException, err.__cause__) except BaseException as err: # An error occurred and the factory cannot guarantee anything about # the persistent state. Make it clear that administrative action # is needed. 
# # Note that this state is recognizable for internal workspaces from # the existence of a central database row for the workspace and the # absence of a config file, and that the database row needs to be # deleted for an administrator to mark it as cleaned up. For # external workspaces we expect the user to just 'rm -rf' (or # equivalent) the workspace directory. raise CorruptedWorkspaceError( f"New workspace {name} with root {root} was corrupted during construction." ) from err try: # Save a configuration file for the workspace to allow the # WorkspaceButler to be reconstructed without the full Butler in # the future. with config_uri.open("w") as stream: stream.write(workspace_config.json()) except BaseException: # If we fail here, try to clean up. try: workspace_butler._remove_remaining_content() except BaseException as err: # Couldn't clean up. raise CorruptedWorkspaceError( f"New workspace {name} with root {root} was corrupted after to write {config_uri}." ) from err # Successfully cleaned up workspace persistent state, try to remove # from database as well if this is an internal workspace. if root is None: self._remove_workspace_record(name) raise return workspace_butler
- get_workspace()¶
def get_workspace(self, name: str) -> WorkspaceButler:
    """Return an existing internal workspace butler.

    Parameters
    ----------
    name : `str`
        Name used to construct the workspace.

    Returns
    -------
    workspace : `WorkspaceButler`
        Butler client for the workspace.
    """
    return self._config.make_workspace_butler(
        name,
        parent_write_butler=self if self.is_writeable else None,
        parent_read_butler=self if not self.is_writeable else None,
    )
- query_provenance()¶
def query_provenance( self, collections: str | Sequence[str], *, where: QuantumGraphExpression | str | EllipsisType = ..., bind: Mapping[str, Any] | None = None, data_id: DataId | None = None, **kwargs: Any, ) -> QuantumGraph: """Query for provenance as a directed-acyclic graph of datasets and 'quanta' that produce them. Parameters ---------- collections Collection or collections to search, in order. Overall-input datasets may also be included, and shadowed quanta or datasets may also appear if they are ancestors or descendants of a non-shadowed quantum or dataset identified in the expression. where : `QuantumGraphExpression`, `str`, or ``...``, optional A `QuantumGraphExpression` that restricts the quanta to return, or a string expression that can be parsed into one. ``...`` can be used to select all quanta consistent with criteria from other arguments. bind : `~collections.abc.Mapping`, optional Mapping from identifiers appearing in ``where`` to the values they should be substituted with. data_id : `~collections.abc.Mapping` or \ `~lsst.daf.butler.DataCoordinate`, optional Data ID that constrains the data IDs of all quanta returned and the data IDs of any dataset data ID literalsappearin in ``where`` (e.g. providing ``instrument`` so it doesn't have to be repeated every time a ``visit`` is referenced). Quanta with dimensions not constrained by this data ID's dimensions are still constrained by it via spatiotemporal relationships, just as they are in query methods. **kwargs Additional keyword arguments are interpreted as data ID key-value pairs, overriding and extending those in ``data_id``. Returns ------- quanta : `QuantumGraph` Matching quanta and all of their inputs and outputs. Note that while datasets may be used in ``where`` to control which quanta are returned via ancestors/descendant relationships, we never return a quantum without including all of its inputs and outputs, and hence arguments like ``data_id`` to not directly constrain the datasets in the graph. Notes ----- This only considers provenance that has been committed to the central data repository. The implementation is not expected to be optimized for the case of querying all (or a very large number) of collections with a highly constraining ``where`` expression; provenance graph storage will probably be run-by-run rather than indexed by data ID or dataset/quantum UUID. This means it may be much more efficient to first query for datasets using data ID expressions in order to extract their ``RUN`` collection names, before querying for their provenance in a second pass. """ raise NotImplementedError()
- class ButlerConfig¶
- make_workspace_butler()¶
def make_workspace_butler(
    self,
    name: str,
    parent_read_butler: Butler | None = None,
    parent_write_butler: Butler | None = None,
) -> WorkspaceButler:
    """Make a butler client for an existing internal workspace."""
    config_uri = self.get_workspace_config_uri(name)
    with config_uri.open("r") as stream:
        config_data = json.load(stream)
    config_cls = doImportType(config_data["type_name"])
    config: WorkspaceConfig = config_cls(**config_data)
    return config.make_butler(
        self, parent_read_butler=parent_read_butler, parent_write_butler=parent_write_butler
    )
- class WorkspaceFactory¶
class WorkspaceFactory(Protocol):
    """An interface for callables that construct new workspaces.

    Implementations of this protocol may be regular methods or functions, but
    we expect them to frequently be full types so instance state can be used
    to hold workspace-specific initialization state.
    """

    def __call__(
        self,
        name: str,
        input_collections: Sequence[str],
        shard_by: DimensionGraph,
        chain: str | None,
        path: ResourcePath | None,
        parent: Butler,
        parent_config: ButlerConfig,
    ) -> tuple[WorkspaceButler, WorkspaceConfig]:
        ...
- class WorkspaceConfig¶
class WorkspaceConfig(BaseModel, ABC): """A configuration object that can construct a `WorkspaceButler` client instance for an existing workspace. """ FILENAME: ClassVar[str] = "butler-workspace.json" """Filename used for all workspace butler configuration files.""" type_name: str """Fully-qualified Python type for this `WorkspaceConfig` subclass. This is populated automatically by the constructor and stored in this attribute to make sure it is automatically saved to disk. """ parent_config: dict[str, Any] """Parent configuration, as a nested dictionary.""" name: str """Name of the workspace This will become the RUN collection name on commit. """ input_collections: list[str] """Sequence of collections to search for inputs. """ chain: str | None """CHAINED collection to create from input collections and the output RUN collection on commit. """ def __init__(self, **kwargs: Any) -> None: super().__init__(type_name=get_full_type_name(self), **kwargs) @abstractmethod def make_butler( self, parent_config: ButlerConfig | None = None, parent_read_butler: Butler | None = None, parent_write_butler: Butler | None = None, ) -> WorkspaceButler: """Construct a `WorkspaceButler` instance from this configuration.""" raise NotImplementedError()
- class WorkspaceButler¶
- make_external()¶
@classmethod
def make_external(cls, root: ResourcePathExpression) -> Self:
    """Construct a workspace butler of the appropriate derived type from an
    external workspace path.
    """
    root = ResourcePath(root, forceDirectory=True)
    with root.join(WorkspaceConfig.FILENAME).open("r") as stream:
        config_data = json.load(stream)
    config_cls = doImportType(config_data["type_name"])
    config: WorkspaceConfig = config_cls(**config_data)
    butler = config.make_butler()
    if not isinstance(butler, cls):
        raise TypeError(
            f"Unexpected workspace butler class; config file specifies {type(butler).__name__}, but "
            f"make_external was called on {cls.__name__}."
        )
    return butler
- abandon()¶
@final
def abandon(self) -> None:
    """Delete the workspace and all of its contents.

    Notes
    -----
    This operation invalidates the workspace object.
    """
    try:
        self._remove_remaining_content()
    except BaseException as err:
        raise CorruptedWorkspaceError(
            f"Workspace {self.name} left in an inconsistent state by 'abandon'."
        ) from err
    self._get_parent_write_butler()._remove_workspace_record(self.name)
- commit()¶
@final def commit( self, shards: Iterable[DataCoordinate] | EllipsisType = ..., transfer: str | None = None ) -> None: """Register all datasets produced in this workspace with the central data repository, and invalidate the workspace. Parameters ---------- data_id : `~lsst.daf.butler.DataCoordinate` or mapping, optional If provided, only commit certain data IDs (which must correspond to the workspace's sharding dimensions), and do not invalidate the workspace as a whole. Datasets and provenance with these data IDs are removed from the workspace and added to a ``RUN`` collection in the central repository with the same name. transfer : `str`, optional Transfer method used to ingest datasets into the central data repository. Ignored unless if this is an internal workspace. Notes ----- Committing a workspace also transfers provenance information to the registry, which may (or may not!) include moving content between databases, between databases and files, or consolidating files, even when the workspace is internal. This method writes to the central data repository's database. """ if shards is not ...: raise NotImplementedError("TODO: check and then update committed shards.") parent = self._get_parent_write_butler() self._transfer_content(parent, shards=shards, transfer=transfer) if shards is ...: self.abandon()
- export()¶
@final def export( self, *, root: ResourcePathExpression | None = None, transfer: str | None = None, **kwargs: Any ) -> Butler: """Create a new independent data repository from the output datasets in this workspace. Parameters ---------- root : convertible to `lsst.resources.ResourcePath`, optional Root for the new data repository. Must be provided if this is an internal data repository. transfer : `str` or `None`, optional Transfer mode for ingesting file artifacts into the new data repository. Ignored if ``root`` is `None` or the same as the workspace's current root. **kwargs Forwarded to `Butler.makeRepo`. Returns ------- butler : `Butler` A full butler client for the new data repository. Notes ----- This operation invalidates the workspace object. """ if root is None: root = self.root if root is None: raise ValueError("Cannot export an internal workspace without a root.") else: root = ResourcePath(root, forceDirectory=True) if root != self.root and transfer is None: raise ValueError( "A transfer mode is required to export a repository from a workspace with a different root." ) full_config = ButlerConfig(Butler.makeRepo(root, **kwargs)) full_butler = Butler(full_config, writeable=True) self._insert_dimension_records(full_butler) self._transfer_content(full_butler, shards=..., transfer=transfer) self.abandon() return full_butler
- transfer_inputs()¶
@abstractmethod
def transfer_inputs(self, transfer: str | None = "copy") -> None:
    """Transfer file artifacts for all overall-input datasets to the
    workspace root and update the URIs used to fetch them during execution
    accordingly.

    Notes
    -----
    This method can only be called if the workspace root is outside the
    central data repository root.
    """
    raise NotImplementedError()
- class QuantumGraph¶
class QuantumGraph(ABC): """A directed acyclic graph of datasets and the quanta that produce and consume them. Notes ----- Datasets are included in a quantum graph if and only if a producing or consuming quantum is included; there are never isolated dataset nodes with no edges in a graph, and any quantum in the graph can be assumed to have all of its nodes. The attributes here do not necessarily map to how the graph is stored, which is workspace- and (for committed graphs) registry-defined. This base class needs to live in ``daf_butler`` so it can be used as the return type for provenance queries; note that it sees quanta only as entities that consume and produce datasets that are identified by a UUID or the combination of a task label and a data ID, and it sees tasks as just string labels. """ @property @abstractmethod def universe(self) -> DimensionUniverse: """Definitions for any dimensions used by this graph.""" raise NotImplementedError() @abstractmethod def get_bipartite_xgraph(self, *, annotate: bool = False) -> nx.MultiDiGraph: """Return a directed acyclic graph with UUID keys for both quanta and datasets. Parameters ---------- annotate : `bool`, optional If `True`, add expanded `lsst.daf.butler.DataCoordinate` objects and `lsst.daf.butler.DatasetRef` to nodes (the latter only for dataset nodes) as ``data_id`` and ``ref`` attributes. Returns ------- graph : `networkx.MultiDiGraph` NetworkX directed graph with quantum and dataset nodes, where edges only connect quanta to datasets and datasets to quanta. This is a `networkx.MultiDiGraph` to represent the possibility of a quantum consuming a single dataset through multiple connections (edge keys are connection names). Notes ----- Nodes have a ``bipartite`` attribute that is set to ``0`` for datasets and ``1`` for quanta. This can be used by the functions in the `networkx.algorithms.bipartite` package. Nodes representing quanta also have a ``label`` attribute, while nodes representing datasets have a ``dataset_type_name`` attribute. All nodes have a ``run`` attribute with the name of the run collection or workspace they belong to. """ raise NotImplementedError() @abstractmethod def get_quantum_xgraph(self, annotate: bool = False) -> nx.DiGraph: """Return a directed acyclic graph with UUIDs key for quanta only. Parameters ---------- annotate : `bool`, optional If `True`, add expanded `lsst.daf.butler.DataCoordinate` objects to nodes as ``data_id`` attributes. Returns ------- graph : `networkx.MultiDiGraph` NetworkX directed graph with quantum nodes only, with edges representing one or more datasets produced by one quantum and consumed by the other. Nodes ----- Unless ``annotate=True``, nodes have ``label`` and ``run`` attributes only. Edges have no attributes. """ raise NotImplementedError() @abstractmethod def get_data_id(self, node_id: uuid.UUID) -> DataCoordinate: """Return the data ID for a quantum or dataset. Data IDs are guaranteed to be fully expanded. """ raise NotImplementedError() @abstractmethod def get_dataset_ref(self, dataset_id: uuid.UUID) -> DatasetRef: """Return the `~lsst.daf.butler.DatasetRef` object for a dataset node. Data IDs are guaranteed to be fully expanded. """ raise NotImplementedError() @abstractmethod def get_quanta_for_task(self, label: str) -> Set[uuid.UUID]: """Return all quanta for a single task.""" raise NotImplementedError() @abstractmethod def find_quantum(self, label: str, data_id: DataId) -> uuid.UUID: """Find a single quantum by data ID. 
This method will raise if the result is not unique, which can happen in rare cases if the graph spans multiple collections. `find_quanta` is usually less convenient but can be used to handle this case. """ raise NotImplementedError() @abstractmethod def find_quanta(self, label: str, data_id: DataId) -> Set[uuid.UUID]: """Find all quanta with a particular task and data ID.""" raise NotImplementedError() @abstractmethod def get_datasets_for_type(self, dataset_type_name: str) -> Set[uuid.UUID]: """Return all datasets for a dataset type.""" raise NotImplementedError() @abstractmethod def find_dataset(self, dataset_type_name: str, data_id: DataId) -> uuid.UUID: """Find a single dataset by data ID. This method will raise if the result is not unique, which can happen in rare cases if the graph spans multiple collections. `find_datasets` is usually less convenient but can be used to handle this case. """ raise NotImplementedError() @abstractmethod def find_datasets(self, label: str, data_id: DataId) -> Set[uuid.UUID]: """Find all datasets with a particular dataset type and data ID.""" raise NotImplementedError() @abstractmethod def get_quantum_status(self, quantum_id: uuid.UUID) -> QuantumStatus: """Get the current status or a quantum.""" return self.get_quantum_history(quantum_id)[-1] @abstractmethod def get_dataset_status(self, dataset_id: uuid.UUID) -> DatasetStatus: """Get the current status for a dataset.""" return self.get_dataset_history(dataset_id)[-1] @abstractmethod def get_quantum_history(self, quantum_id: uuid.UUID) -> list[QuantumStatus]: """Get the history of this quantum's status. Current status is last. """ raise NotImplementedError() @abstractmethod def get_dataset_history(self, dataset_id: uuid.UUID) -> list[DatasetStatus]: """Get the history of this dataset's status. Current status is last. """ raise NotImplementedError() @abstractmethod def filtered( self, where: QuantumGraphExpression = ..., bind: Mapping[str, Any] | None = None, data_id: DataId | None = None, **kwargs: Any, ) -> QuantumGraph: """Filter the quanta and datasets in this graph according to an expression. """ raise NotImplementedError()
- class QuantumGraphExpression¶
class QuantumGraphExpression:
    """Placeholder for a parsed expression that subsets a quantum graph.

    This is expected to include:

    - the standard set-theory operations (intersection, union, difference,
      symmetric difference, inversion);
    - literal sets of quanta and datasets;
    - range syntax for ancestors and descendants of quanta and datasets (see
      `PipelineGraphExpression`).

    In both literal sets and range expressions, quanta and datasets could be
    identified via:

    - UUID literals;
    - combination of task label or dataset type name and data ID (e.g.
      ``makeWarp@{tract=9813, patch=42, visit=12}``);
    - bind aliases for UUIDs, sets of UUIDs, task labels, dataset type names,
      and data IDs;
    - the values of the `QuantumStatus` and `DatasetStatus` enums as keywords,
      representing the set of all quanta or datasets with that status.

    Data ID keys that are common to all selected nodes can be provided by
    butler defaults or a separate single data ID, which should be accepted by
    any method accepting an expression of this type.

    In queries that span multiple collections, dataset and quantum identifiers
    that use data IDs are resolved in the order they appear in the collection
    sequence, but range expressions can pick up datasets and quanta that are
    shadowed.
    """
- class QuantumStatus¶
class QuantumStatus(enum.Enum):
    """Valid states for a quantum node.

    This is intended only to capture status to the extent necessary to
    represent state transitions when executing or retrying quanta; full status
    information that's useful for diagnostic purposes does not belong here.
    """

    BUILT = enum.auto()
    """Quantum has been predicted but is not known to have been started or
    finished.
    """

    STARTED = enum.auto()
    """Execution has started but is not complete.

    Workspace implementations are not required to ever set this state.
    """

    SUCCEEDED = enum.auto()
    """Execution completed successfully.

    This includes the case where the quantum had no work to do, which can be
    determined by looking at the status of downstream dataset nodes.
    """

    FAILED = enum.auto()
    """Execution was started but raised an exception or otherwise failed.

    When this status is set on a quantum node, the node may have an
    ``exception_type`` attribute with the exception type, a `str` ``message``
    attribute with the exception message, and a ``traceback`` attribute with
    the exception traceback, *if* an exception was caught.
    """
- class DatasetStatus¶
class DatasetStatus(enum.Flag):
    """Valid states for a dataset node in a quantum graph."""

    PREDICTED = enum.auto()
    """Dataset has been predicted to exist but is not known to have been
    written or invalidated.
    """

    PRESENT = enum.auto()
    """Dataset has been produced, and the producing quantum was either
    successful or is still running.

    This is also the state for all overall-inputs to the workspace that were
    present in the central data repository.
    """

    INVALIDATED = enum.auto()
    """Dataset has been produced but the producing task later failed.

    This can also occur if this quantum or an upstream one was poisoned
    (manually marked as failed after it had apparently succeeded).

    Invalidated datasets are not transferred to the central data repository on
    workspace commit, but they may be reclassified again prior to commit.
    """
6.2 pipe_base¶
- class PipelineWorkspaceButler¶
- development_mode¶
@development_mode.setter
@abstractmethod
def development_mode(self, value: bool) -> None:
    """Whether this workspace is in development mode, in which version and
    configuration changes are permitted but provenance is limited.

    This can be set to `True` to enter development mode at any time, but
    doing so is irreversible.
    """
    raise NotImplementedError()
- get_pipeline_graph()¶
@abstractmethod
def get_pipeline_graph(self) -> PipelineGraph:
    """Return the pipeline graph associated with this workspace.

    This is always the complete pipeline graph; in general it includes tasks
    that have not been activated.
    """
    raise NotImplementedError()
- get_pipeline()¶
@abstractmethod
def get_pipeline(self) -> Pipeline | None:
    """Return the pipeline associated with this workspace.

    This is always the complete pipeline; in general it includes tasks that
    have not been activated.  There may not be any pipeline if only a
    pipeline graph was provided at workspace construction.
    """
    raise NotImplementedError()
- reset_pipeline()¶
@abstractmethod
def reset_pipeline(
    self,
    pipeline: ResourcePathExpression | Pipeline | PipelineGraph,
) -> None:
    """Update the pipeline associated with this workspace.

    This is only permitted if the workspace is in development mode.

    This operation requires read access to the central data repository
    database (to resolve dataset types).
    """
    raise NotImplementedError()
- get_packages()¶
@abstractmethod
def get_packages(self) -> Packages | None:
    """Return the software versions frozen with this workspace.

    This is `None` if and only if the workspace is in development mode.
    """
    raise NotImplementedError()
- active_tasks¶
@property
@abstractmethod
def active_tasks(self) -> Set[str]:
    """The labels of the tasks considered active in this workspace.

    This is always a subset of the pipeline graph's task labels.  Active
    tasks have had their init-outputs (including configuration) written, and
    only active tasks may have quanta built.
    """
    raise NotImplementedError()
- activate_tasks()¶
@abstractmethod
def activate_tasks(
    self,
    spec: PipelineGraphExpression | str | Iterable[str] | EllipsisType = ...,
    /,
) -> None:
    """Activate tasks matching the given pattern.

    This writes init-outputs for the given tasks.  Activating a task whose
    init-inputs are not available either in the input collections or the
    workspace itself is an error.

    Reactivating an already-active task in development mode causes its
    init-outputs to be re-written.  Outside development mode, reactivation
    checks that software versions have not changed (and hence that
    init-outputs do not need to be re-written), and raises if they have.
    """
    raise NotImplementedError()
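A sketch of activating a subset of tasks by label; the task labels are illustrative and the module path is assumed:

    from lsst.pipe.base import PipelineWorkspaceButler  # module path assumed


    def activate_coaddition(workspace: PipelineWorkspaceButler) -> None:
        # Activate two tasks by explicit label, writing their init-outputs
        # (including configuration datasets).
        workspace.activate_tasks(["makeWarp", "assembleCoadd"])
        # Passing ``...`` (the default) would activate every task in the
        # pipeline graph instead.
        print(sorted(workspace.active_tasks))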
- get_init_input_refs()¶
@abstractmethod
def get_init_input_refs(self, task_label: str) -> Mapping[str, DatasetRef]:
    """Return the init-input dataset references for an activated task.

    Mapping keys are connection names.
    """
    raise NotImplementedError()
- get_init_output_refs()¶
@abstractmethod
def get_init_output_refs(self, task_label: str) -> Mapping[str, DatasetRef]:
    """Return the init-output dataset references for an activated task.

    Mapping keys are connection names.
    """
    raise NotImplementedError()
- build_quanta()¶
@abstractmethod
def build_quanta(
    self,
    *,
    tasks: PipelineGraphExpression | str | Iterable[str] | EllipsisType | None = None,
    where: str = "",
    bind: Mapping[str, Any] | None = None,
    data_id: DataId | None = None,
    shards: Iterable[DataCoordinate] | None = None,
    **kwargs: Any,
) -> None:
    """Build a quantum graph, extending any existing graph.

    Parameters
    ----------
    tasks : `PipelineGraphExpression`, `str`, \
            `~collections.abc.Iterable` [ `str` ], or ``...``, optional
        Specification for the tasks to include.  If `None`, all active tasks
        are used.  For any other value, matching tasks that are not already
        active will be activated.
    where : `str`, optional
        Data ID query string.
    bind : `~collections.abc.Mapping`, optional
        Values to substitute for aliases in ``where``.
    data_id : `~lsst.daf.butler.DataCoordinate` or mapping, optional
        Data ID that constrains all quanta.
    shards : `~collections.abc.Iterable` [ \
            `~lsst.daf.butler.DataCoordinate` ], optional
        Data IDs that identify the sharding dimensions; must be provided if
        the sharding dimensions for this workspace are not empty.
    **kwargs
        Additional data ID key-value pairs, overriding and extending
        ``data_id``.

    Notes
    -----
    This may be called multiple times with different tasks or shards, but
    only with tasks in topological order (for each shard data ID).  This
    allows a large quantum graph to be built incrementally, with different
    data ID constraints for different tasks.

    Rebuilding the quanta for a task-shard combination that has already been
    built does nothing unless the workspace is in development mode.  In
    development mode, rebuilding quanta for a task-shard combination will
    delete all downstream quanta that have not been executed, and raise if
    any downstream quanta have already been executed (`reset_quanta` can be
    used to deal with already-executed quanta in advance).  Rebuilt quanta
    and their output datasets are assigned new UUIDs.

    This operation requires read access and temporary-table write access to
    the central data repository database.
    """
    raise NotImplementedError()
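A sketch of incremental graph building for a workspace whose sharding dimension is assumed to be ``tract``; the task labels, the skymap name, and the assumption that the workspace exposes a dimension universe are all illustrative:

    from lsst.daf.butler import DataCoordinate
    from lsst.pipe.base import PipelineWorkspaceButler  # module path assumed


    def build_per_tract(workspace: PipelineWorkspaceButler, tracts: list[int]) -> None:
        for tract in tracts:
            # One sharding data ID per call; dimension-universe access is
            # assumed to work as it does for other butlers.
            shard = DataCoordinate.standardize(
                {"skymap": "hsc_rings_v1", "tract": tract}, universe=workspace.dimensions
            )
            # Within a shard, tasks must be built in topological order.
            workspace.build_quanta(tasks="makeWarp", shards=[shard])
            workspace.build_quanta(tasks="assembleCoadd", shards=[shard])
        # Inspect which task/shard combinations have been built so far.
        for task_label, data_id in workspace.get_built_quanta_summary():
            print(task_label, data_id)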
- get_built_quanta_summary()¶
@abstractmethod
def get_built_quanta_summary(self) -> Set[tuple[str, DataCoordinate]]:
    """Report the combinations of task labels and sharding data IDs for
    which quantum graphs have already been built.

    Empty data IDs are returned for workspaces with no sharding dimensions.
    """
    raise NotImplementedError()
- query_quanta()¶
@abstractmethod
def query_quanta(
    self,
    *,
    where: QuantumGraphExpression | str | EllipsisType = ...,
    bind: Mapping[str, Any] | None = None,
    data_id: DataId | None = None,
    **kwargs: Any,
) -> QuantumGraph:
    """Query for quanta that have already been built and possibly executed.

    See `Butler.query_provenance` for parameter details.

    Notes
    -----
    The returned `QuantumGraph` is a snapshot of the workspace's state, not a
    view.  If quanta are being executed when this is called, the statuses of
    different quanta may not reflect the same instant in time, but the states
    for a single quantum and its output datasets are always mutually
    consistent (though possibly already out of date by the time the method
    returns).  The base class does not specify what happens when quanta are
    being built while this method is called, but derived classes may support
    this.
    """
    raise NotImplementedError()
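A sketch of querying for failed quanta in one tract, using the status-keyword syntax from the placeholder expression grammar; the expression string, the data ID value, and iteration over the result are all assumptions:

    from lsst.pipe.base import PipelineWorkspaceButler  # module path assumed


    def report_failures(workspace: PipelineWorkspaceButler) -> None:
        # "FAILED" is the QuantumStatus keyword from the placeholder grammar;
        # the tract constraint is passed as an extra data ID key-value pair.
        graph = workspace.query_quanta(where="FAILED", tract=9813)
        for node in graph:  # iteration over quantum nodes assumed
            print(node)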
- run_quanta()¶
@abstractmethod
def run_quanta(
    self,
    *,
    quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
    where: QuantumGraphExpression | str | EllipsisType = ...,
    bind: Mapping[str, Any] | None = None,
    data_id: DataId | None = None,
    **kwargs: Any,
) -> None:
    """Execute tasks on quanta.

    TODO
    """
    raise NotImplementedError()
- accept_failed_quanta()¶
@abstractmethod
def accept_failed_quanta(
    self,
    *,
    quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
    where: QuantumGraphExpression | str | EllipsisType = ...,
    bind: Mapping[str, Any] | None = None,
    data_id: DataId | None = None,
    **kwargs: Any,
) -> None:
    """Change the status of matching quanta that currently have status
    `~QuantumStatus.FAILED` to `~QuantumStatus.SUCCEEDED`.

    Existing outputs (which should already be marked as
    `~DatasetStatus.INVALIDATED`) will have their status set to
    `~DatasetStatus.PRESENT`.
    """
    raise NotImplementedError()
- poison_successful_quanta()¶
@abstractmethod
def poison_successful_quanta(
    self,
    *,
    quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
    where: QuantumGraphExpression | str | EllipsisType = ...,
    bind: Mapping[str, Any] | None = None,
    data_id: DataId | None = None,
    **kwargs: Any,
) -> None:
    """Change the status of matching quanta that currently have status
    `~QuantumStatus.SUCCEEDED` to `~QuantumStatus.FAILED`, and set all
    downstream quanta back to `~QuantumStatus.BUILT`.

    Existing outputs of the poisoned quanta (and of any downstream quanta)
    are marked `~DatasetStatus.INVALIDATED`.
    """
    raise NotImplementedError()
- reset_quanta()¶
@abstractmethod
def reset_quanta(
    self,
    *,
    quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
    where: QuantumGraphExpression | str | EllipsisType = ...,
    bind: Mapping[str, Any] | None = None,
    data_id: DataId | None = None,
    **kwargs: Any,
) -> None:
    """Change the status of matching quanta to `~QuantumStatus.BUILT` and
    delete all of their existing outputs.
    """
    raise NotImplementedError()
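Taken together, these status-manipulation methods support a simple retry workflow. The sketch below is illustrative only: the expression strings use the placeholder grammar, and nothing here is prescribed by this technote:

    from lsst.pipe.base import PipelineWorkspaceButler  # module path assumed


    def retry_failures(workspace: PipelineWorkspaceButler) -> None:
        # Return every failed quantum to the BUILT state, deleting its
        # outputs, then execute everything that is waiting to run.
        workspace.reset_quanta(where="FAILED")
        workspace.run_quanta(where="BUILT")
        # Quanta whose failures are considered harmless could instead be
        # accepted, which also marks their invalidated outputs as present:
        # workspace.accept_failed_quanta(where="FAILED")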
- class PipelineGraphExpression¶
class PipelineGraphExpression:
    """Placeholder for a parsed expression that subsets a pipeline graph.

    This is expected to include:

    - the standard set-theory operations (intersection, union, difference,
      symmetric difference, inversion);
    - literal sets of task labels and dataset type names;
    - regular expressions or shell-style globs on task labels and dataset
      type names;
    - range syntax for ancestors and descendants of task labels and dataset
      type names (e.g. ``..b`` and ``a..``), possibly with a shortcut for an
      intersection of the two (e.g. ``a..b``);
    - bind aliases for task labels, dataset type names, and sets thereof, to
      be satisfied with an extra dictionary mapping bind alias to `str` or
      ``collections.abc.Set[str]``.
    """
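A few pipeline-graph expression strings that such a grammar might accept; the task labels and dataset type names are illustrative and the precise syntax remains an assumption:

    # Everything upstream of assembleCoadd (ancestor range).
    "..assembleCoadd"

    # Tasks and dataset types between makeWarp and assembleCoadd
    # (range-intersection shortcut).
    "makeWarp..assembleCoadd"

    # All coadd-like dataset types except one, via a glob and set difference.
    "deepCoadd* - deepCoadd_calexp"

    # A bind alias satisfied with an extra mapping such as
    # {"my_tasks": {"isr", "calibrate"}}.
    "my_tasks"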
7 References¶
[DMTN-249]. Jim Bosch. Revisiting division of responsibilities in Butler components. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-249.lsst.io/
[DMTN-205]. Jim Bosch, Tim Jenness, Michelle Gower, and Andy Salnikov. Tracking Provenance in Butler. 2022. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-205.lsst.io/
[DMTN-177]. Tim Jenness. Limiting Registry Access During Workflow Execution. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-177.lsst.io/