DMTN-271: Butler management of quantum graph storage and execution

  • Jim Bosch

Latest Revision: 2023-09-22

Note

This technote is a work-in-progress.

1 Abstract

This technote proposes a new model for executing pipelines against butler repositories, in which both quantum graphs and output datasets are managed by a butler workspace before being committed back to the central data repository. Workspaces generalize the “quantum-backed butler” system (DMTN-177 [3]) to all butler-backed execution with a more powerful and use-case-focused interface than the current pipetask tool. By storing quantum graphs in a butler-managed location, workspaces also pave the way to storing quantum provenance in butler.

2 Conceptual overview

2.1 Origin in BPS/QBB

The execution model of the Batch Production Service (BPS) with quantum-backed butler (QBB, see DMTN-177 [3]) can be seen as three intuitive steps, analogous to transactional operations in SQL:

  1. bps prepare starts the process by defining a moderately large unit of processing via a sequence of related directed acyclic graphs (DAGs), starting with the quantum graph. These graphs and other WMS-specific files are saved to a “submit” directory, whose location is managed by the user even though the contents are managed by BPS. This can be seen as the beginning of a transaction for a run - no modifications are made to the central data repository.

  2. bps submit executes that processing by delegating to a third-party workflow management system (WMS). All of these batch jobs (aside from the very last) write the output datasets as file artifacts, but do not read from or write to the central data repository’s database at all.

  3. The last batch job inserts records describing the produced file artifacts to the central database, effectively committing the transaction by putting all produced datasets into a single RUN collection.

These steps are not always visible to users - bps submit can run the prepare step itself, and usually does, and in the absence of errors the division between the regular execution batch jobs and the final “transfer” job is invisible. What matters is that this model provides a sound foundation for higher-level tools, with the existence of a discrete “commit” step being the aspect most notably lacking in non-BPS execution.

2.2 Workspace basics

The middleware team has long wanted to unify the bps and pipetask interfaces, and the proposal here is to do just that, by introducing a new workspace concept that generalizes the BPS/QBB flow to all execution. A workspace is best thought of as an under-construction, uncommitted RUN collection, known to the central data repository but not included in its databases (except perhaps as a lightweight record of which workspaces exist). Users interact with a workspace via a WorkspaceButler client, which subclasses LimitedButler. When a workspace is committed, a regular RUN collection is created and the workspace ceases to exist. Workspaces can hold quantum graphs, output file artifacts (via a datastore), and provenance/status information.

Workspaces can have multiple implementations and capabilities. WorkspaceButler itself will not assume anything about tasks or pipelines, as appropriate for a class defined in daf_butler. Our main focus here will be a PipelineWorkspaceButler subclass that serves as a pipetask replacement, and we imagine it storing its persistent state (mostly quantum graphs) using files (see 5   Implementation notes). We envision BPS and its plugins delegating to the PipelineWorkspaceButler via a command-line interface, much as they currently do with pipetask, but we hope that eventually the front-end interface to BPS could be a WorkspaceButler subclass as well.

Workspaces may read from the central repository database for certain operations after they are created, but they must not write to it except when committed, and it is expected that many core operations will not access the central database at all (such as actually running tasks with PipelineWorkspaceButler.run_quanta()).
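
The intended lifecycle can be sketched with the interfaces prototyped in 6   Prototype code; the repository path, workspace name, and PipelineWorkspaceFactory used here are placeholders rather than settled API:

    from lsst.daf.butler import Butler

    butler = Butler("/repo/main", writeable=True)

    # Create a workspace: a record is made in the central repository, but no
    # RUN collection exists yet.  'PipelineWorkspaceFactory' stands in for the
    # factory a concrete WorkspaceButler implementation would provide.
    workspace = butler.make_workspace(
        "u/jbosch/DM-12345",
        input_collections=["HSC/defaults"],
        factory=PipelineWorkspaceFactory(...),
    )

    # ... run tasks against the workspace, with no central-database writes ...

    # Turn the workspace into a regular RUN collection, or throw it away.
    workspace.commit()
    # workspace.abandon()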

2.3 Provenance

One clear advantage of the workspace model is that the “commit” step provides an opportunity for provenance to be gathered and possibly rewritten into a form more focused on post-execution reads than at-scale execution writes. This is a problem largely ignored by DMTN-205 [2], which describes a central database schema for provenance but lacks a complete plan for populating it. In addition to the workspace interfaces, this technote will describe new methods on the Butler class that provide some provenance query support. These could be implemented by storing quantum graphs and quantum status in files after workspaces are committed, while leaving room for database storage of provenance graphs as described in DMTN-205 [2] in the future.

2.4 Development mode

Workspaces would also be useful in pipeline software development and small-scale testing, especially if we could (sometimes) relax our consistency controls on software versions and pipeline configuration prior to “commit”, a state we will call “development mode”. Furthermore, many development runs would not be committed at all, because the data products themselves are not the real outcome of that work - working code is. This suggests that workspaces should also support an analog of SQL’s ROLLBACK as an alternative to commit (though we will use WorkspaceButler.abandon() as the nomenclature here to avoid confusion with the very different relationship between commit and rollback in other contexts, e.g. some version control systems).

2.5 External workspaces

If we provide an option to create a workspace in an external location instead of inside a central data repository, we get an intriguing possibility for execution in the Rubin Science Platform (RSP) or on a science user’s local compute resources. The quantum graph stored in the workspace knows about all needed input datasets, and these could be transferred to the external location as well, providing a self-contained proto-data-repository that could be manually moved and ultimately executed anywhere, since it contains all of the information needed to reconstitute a standalone data repository with its own database. Alternatively, if the workspace is left in a location where it can read from the central data repository but not write to it, input datasets could be left in the central data repository and the user could choose to either create a standalone repository that merely references them or commit by transferring output dataset file artifacts to the central repository root.

2.6 Sharded workspaces and Prompt Processing

We have one major use case that does not fit as well with this model, at least as described so far: prompt processing runs one visit or one {visit, detector} at a time, appending these results to a single RUN collection that typically covers an entire night’s processing. The switch from pipetask to workspaces wouldn’t actually require any major changes to the prompt processing framework’s architecture, because each prompt processing worker has its own full SQL-backed butler, with the fan-out to workers and long-term persistence represented as transfers between data repositories. Workspaces would be created and committed wholly within individual workers, requiring some changes to low-level tooling like SeparablePipelineExecutor. The existence of a full Python interface for workspace-based execution may actually remove the need for separate pipeline executor classes entirely.

An alternative approach in which workspaces are more tightly integrated with prompt processing is both intriguing and more disruptive. In this model, prompt processing would have a custom WorkspaceButler implementation that represents a client of a central-butler workspace existing on a worker node. This would give prompt-processing complete control over quantum graph generation and how it is interleaved with execution, allowing the graph to be built incrementally as new information arrives (first the nextVisit event, then a raw file artifact, and then perhaps a raw for a second snap), with each processing step starting as soon as possible. There would be no SQL database on the workers at all, and the WorkspaceButler implementation would provide its own get() and put() implementations for use by tasks. This would be a natural place to put in-memory caching of datasets, while delegating to a regular Datastore when actual file I/O is desired.

This model would not work if a RUN collection is the (only) unit for commits and quantum graph generation, since we would want to commit results from each worker to the central butler on a per-data ID basis. To address this, a workspace may be created with sharding dimensions, which define data IDs that may be committed independently. This restricts the tasks that may be run in such a workspace to those that have at least those dimensions, ensuring that all output datasets in the workspace can be divided into disjoint sets according to those data IDs. Committing a data ID from a sharded workspace moves all datasets and provenance with that data ID to the corresponding RUN collection in the central repo, removing it from the workspace (note that file artifacts remain where they are unless this is an external workspace), allowing the RUN collection and its workspace to coexist for a while.
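
As a sketch of how a sharded workspace might be created and committed shard-by-shard (the PromptWorkspaceFactory name and the collection names are placeholders; the make_workspace and commit signatures are those prototyped in 6.1   daf_butler):

    from lsst.daf.butler import Butler, DataCoordinate

    butler = Butler("/repo/embargo", writeable=True)

    # Sharding dimensions define the data IDs that may be committed
    # independently; only tasks with at least these dimensions may be run.
    workspace = butler.make_workspace(
        "LSSTCam/prompt/2025-06-01",
        input_collections=["LSSTCam/defaults"],
        factory=PromptWorkspaceFactory(...),  # hypothetical implementation
        shard_by={"instrument", "visit", "detector"},
    )

    # ... build and run quanta for one {visit, detector} as data arrives ...

    # Commit just that shard; the workspace stays alive for the rest of the
    # night's shards, and the RUN collection and workspace coexist.
    shard = DataCoordinate.standardize(
        instrument="LSSTCam", visit=2025060100001, detector=42,
        universe=butler.dimensions,
    )
    workspace.commit(shards=[shard])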

Sharded workspaces may also be useful for data release production campaign management. Our current campaign management tooling prefers to do its own sharding, resulting in a different RUN collection for each shard (where a shard is typically hundreds or thousands of data IDs), and this model will continue to be viable and probably preferred due to the simplicity of a one-to-one correspondence between RUN collections (and in the future, workspaces) and batch submissions. Sharding within a workspace just gives us another option, and declaring sharding dimensions even with the current campaign management sharding approach still has the advantage of restricting tasks in exactly the way we’d like, to guard against bad pipeline step definitions and operator error.

An unsharded workspace logically has empty sharding dimensions, and one shard identified by the empty data ID. It permits all tasks, since the empty set of dimensions is a subset of every dimension set.

2.7 Data repository consistency

A serious but often ignored problem with QBB (and its predecessor, “execution butler”) is that it is inconsistent with the nominal butler consistency model, in that it allows file artifacts to exist in a datastore root without having any record of those artifacts in the corresponding registry. This consistency model is only “nominal” because it was never actually sound (deletions in particular have always been a problem), and hence we’ve considered it an acceptable casualty when working to limit database access during execution (i.e. DMTN-177 [3]). DMTN-249 [1] attempts to address this issue by defining a new consistency model that explicitly permits (and actually requires) file artifacts to exist prior to their datasets’ inclusion in the registry, as long as those datasets are tracked as possibly existing in some other way that allows them to be found by any client of the central data repository (not merely, e.g. one that knows the user-provided location of an external quantum graph file, as is the case today). The workspace concept described here is a realization of this new consistency model: the central data repository will have a record of all active (internal) workspaces (probably in a registry database table) and its full butler clients will have the ability to construct a WorkspaceButler for these by name. Workspace implementations are required to support the WorkspaceButler.abandon() operation, which effectively requires that each have a way to find all of its file artifacts.

2.8 Concurrency and workspace consistency

Different workspaces belonging to the same central data repository are fully independent, and may be operated on concurrently in different processes with no danger of corruption. This includes commits and workspace creation: races to create the same workspace should result in at least one of the attempts failing explicitly, and in no case should hardware failures or concurrency yield an incorrectly-created workspace that can masquerade as a correct one.

Operations on workspaces should be idempotent either by being atomic or by being resumable from a partial state after interruption, with exceptions to this rule clearly documented.

Most operations on a workspace are not expected to support or guard against concurrent operations on the same workspace, unless they operate on disjoint sharding data IDs, in which case concurrency is required of all sharded workspace implementations. In addition, some workspace-specific interfaces are expected to explicitly support concurrency under certain conditions; this is, after all, much of the reason we require workspaces to avoid central database access whenever possible. Most prominent of these is the PipelineWorkspaceButler.run_quanta() method, which guarantees correct behavior in the presence of concurrent calls as long as those calls identify disjoint sets of quanta to run, allowing it to be used as an execution primitive by higher-level logic capable of generating those disjoint sets (e.g. BPS).

In fact, we will define PipelineWorkspaceButler.run_quanta() to block when it is given a quantum whose inputs are produced by other quanta that have not been run. This is possible because this workspace implementation will also record quantum status (e.g. in per-quantum report files) for provenance, and we can modify the single-quantum execution logic to wait for those records (e.g. files) to appear upstream. This actually addresses a possible soundness problem in our current execution model: ResourcePath backends other than POSIX and S3 may not guarantee that an output file artifact’s write will propagate to all servers by the time a read or existence check is performed on a downstream node, let alone by the time the write call returns, and we think we’ve seen this problem in some WebDAV storage in the past. With our current approach to execution this can result in silently incorrect execution, because the dataset will be assumed not to have been produced and may be ignored by the downstream quantum.
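
A sketch of how a higher-level driver (e.g. a BPS plugin) might exploit this guarantee, continuing the workspace sketch from 2.2 and assuming run_quanta() accepts an iterable of quantum UUIDs; partition_quanta is a hypothetical helper that splits the built quanta into disjoint groups:

    from concurrent.futures import ProcessPoolExecutor
    from uuid import UUID

    from lsst.daf.butler import ButlerConfig

    def run_group(repo: str, workspace_name: str, quantum_ids: list[UUID]) -> None:
        # Each worker constructs its own client for the same workspace and runs
        # its disjoint set of quanta; run_quanta blocks on any quantum whose
        # upstream quanta (possibly in another group) have not yet finished.
        workspace = ButlerConfig(repo).make_workspace_butler(workspace_name)
        workspace.run_quanta(quantum_ids)

    groups = partition_quanta(workspace, n_groups=4)  # hypothetical helper
    with ProcessPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(run_group, "/repo/main", "u/jbosch/DM-12345", group)
            for group in groups
        ]
        for future in futures:
            future.result()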

2.9 Phasing out prediction-mode datastore

Allowing file datastore to operate in “prediction mode” (where a datastore client assumes datasets were written with the same formatters and file templates it is configured to use) was a necessary part of getting QBB working. With workspaces, we hope to retire it, by saving datastore records along with the quantum status information the workspace will already be required to store, and which downstream quanta will be required to read. This will allow us to simplify the datastore code and eliminate existence checks both during execution and when committing a quantum-graph-based workflow back to the central data repository.

2.10 Pipeline workspace actions

Creating a new pipeline workspace will not require that the quantum graph be produced immediately, though we expect a convenience operation that combines all of the steps described below to be frequently exercised. Instead, only the pipeline must be provided up-front, and even that can be overwritten in development mode. Tasks within that pipeline are activated next, which writes their init output file artifacts to the workspace (note that this now happens before a quantum graph is built). Quanta can then be built, shard-by-shard; the workspace will remember which shards have quanta already. Even executed quanta are not immutable until the workspace is committed - PipelineWorkspaceButler will provide methods for quantum state transitions, like resetting a failed quantum so it can be attempted again.

Development mode will provide much more flexibility in the order in which these steps can be performed, generally by resetting downstream steps and allowing output datasets to be clobbered.

3 Python components and operation details

3.1 Workspace construction and completion (daf_butler)

Workspaces are expected to play a central role in the DMTN-249 [1] consistency model, in which file artifacts are written prior to the insertion of their metadata into the database, but only after some record is made in the central repository that those artifacts may exist. This means workspace construction and removal operations need to be very careful about consistency in the presence of errors and concurrency. This is complicated by the fact that workspaces are extensible; all concrete implementations will live downstream of daf_butler.

We envision workspace creation to be aided by two helper classes that are typically defined along with a new WorkspaceButler implementation: a WorkspaceFactory callable that writes the workspace’s initial persistent state, and a WorkspaceConfig that can later reconstruct a WorkspaceButler client for that workspace (both are prototyped in 6.1   daf_butler).

After a factory is used to write the rest of the workspace’s initial state, the configuration is written to a JSON file in a location predefined by a URI template in the central repository’s butler configuration. This file is read and used to make a new WorkspaceButler instance without requiring access to the full butler.

This is all driven by a make_workspace() method on the full Butler class, for which the prototyping here includes a nominal implementation with detailed comments about how responsibility (especially for error-handling) is shared by different methods.

After an internal workspace has been created, a client WorkspaceButler can be obtained from a full Butler to the central repository by calling Butler.get_workspace(), or without ever making a full butler by calling ButlerConfig.make_workspace_butler(). External workspace construction goes through WorkspaceButler.make_external().
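
For example (a sketch using the prototyped methods, with placeholder repository path and workspace name):

    from lsst.daf.butler import Butler, ButlerConfig

    # Reattach to an existing internal workspace through a full Butler ...
    workspace = Butler("/repo/main", writeable=True).get_workspace("u/jbosch/DM-12345")

    # ... or directly from the repository configuration, without a full Butler.
    workspace = ButlerConfig("/repo/main").make_workspace_butler("u/jbosch/DM-12345")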

We expect most concrete workspace implementations to define a butler subcommand for their creation, and for most end users to interact only with that command-line interface.

Committing a workspace goes through WorkspaceButler.commit(), and abandoning one goes through WorkspaceButler.abandon(). Creating a new standalone repository goes through WorkspaceButler.export(). All of these have nominal implementations in the prototype, showing that they delegate to many of the same abstract methods to transfer their content to full butlers and remove anything that remains from the workspace.

To make exporting and external workspaces more useful, WorkspaceButler.transfer_inputs() may be used to transfer the file artifacts of input datasets used by the workspace into the workspace itself. This will allow the external workspace (depending on its implementation) to be used after fully severing its connection to the central data repository (e.g. copying it to a laptop). These datasets are also included in the standalone data repository created by WorkspaceButler.export().
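
A sketch of severing an external workspace from the central repository, using the transfer_inputs() and export() methods prototyped below (the concrete PipelineWorkspaceButler implementation and the workspace path are assumptions):

    from lsst.pipe.base import PipelineWorkspaceButler  # proposed, not yet real

    workspace = PipelineWorkspaceButler.make_external("/scratch/jbosch/my-workspace")

    # Copy the file artifacts of all overall-input datasets into the workspace,
    # so nothing needs to be read from the central repository afterwards.
    workspace.transfer_inputs(transfer="copy")

    # Reconstitute a standalone data repository (with its own database) from
    # the workspace contents; this invalidates the workspace object.
    standalone_butler = workspace.export()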

3.2 Quantum graph and provenance queries (daf_butler)

This technote includes a simplified (relative to [2]) proposal for provenance querying on the full butler, primarily because PipelineWorkspaceButler will require major changes to the QuantumGraph class, and it makes sense to include changes directed at using the same class (or at least a common base class) to report provenance to avoid yet another round of disruption.

The entry point is Butler.query_provenance(), which delegates much of its functionality to a new parsed string-expression language, represented here by the QuantumGraphExpression placeholder class. I am quite optimistic that this will actually be pretty easy to implement, with one important caveat: we do not promise to efficiently resolve these expressions against large (let alone unbounded) sequences of collections, allowing us to implement expression resolution by something as simple as reading per-collection quantum-graph files into memory and calling networkx methods (which are generally O(N) in the size of the graph we’ve loaded). A moderately complex expression could look something like this:

isr..(..warp@{tract=9813, patch=22, visit=1228} | ..warp@{tract=9813, patch=22, visit=1230})

which evaluates to “all quanta and datasets downstream of the isr task that are also upstream of either of two particular warp datasets”. The standard set operators have their usual meanings, and .. is used (just as in the butler data ID query language) to specify ranges. In the context of a DAG, half-open ranges mean ancestors or descendants, while:

a..b

is a shortcut for:

a.. & ..b

The return type is the new QuantumGraph, which has very little in common with its lsst.pipe.base.QuantumGraph predecessor in terms of its Python API, though of course they are conceptually very similar. They may have more in common under the hood, especially with regards to serialization. Major differences include:

  • It does not attempt to hold task or config information, which makes it easier to put the class in daf_butler (where it needs to be for Butler.query_provenance()). Instead it just has string task labels that can be looked up in a Pipeline or PipelineGraph, which are stored as regular butler datasets in the same RUN collection as the quanta they correspond to.

  • Its interface is designed to allow implementations to load the DataCoordinate and DatasetRef information associated with a node only on demand. Our profiling has shown that saving and loading that information constitutes the vast majority of the time it takes to serialize and deserialize graphs, and we want to give future implementations the freedom to simply not do a lot of that work unless it’s actually needed. We expect many provenance use cases to involve traversing a large graph while only requiring details about a small subset of the nodes.

  • It records status for each node via the new QuantumStatus and DatasetStatus enums. These are not intended to provide human-readable error reports - this is a known gap in this proposal I’d like to resolve later - but they do provide a sufficiently rich set of states to represent human-driven responses to errors during processing as state transitions (e.g. PipelineWorkspaceButler.accept_failed_quanta(), poison_successful_quanta(), and reset_quanta()), as well as a way to record those responses in provenance.

  • Many of the current lsst.pipe.base.QuantumGraph accessors are missing, having been replaced by a bipartite networkx.MultiDiGraph accessor that includes nodes for datasets as well as quanta. Some of these may return as convenience methods for particularly common operations, but I like the idea of embracing the networkx graphs as the primary interface, having grown quite fond of them while working on PipelineGraph.

  • In order to be suitable for provenance query results that can span collections, a particular QuantumGraph instance is no longer restricted to a single RUN collection.

The prototype here defines QuantumGraph as an ABC, which may or may not be ultimately necessary. It may be that a single implementation could satisfy all quantum-oriented concrete workspaces as well as Butler.query_provenance().
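
Continuing the provenance-query sketch above, the bipartite accessor might be traversed like this, using only the node attributes documented in the prototype (bipartite, label, dataset_type_name, run, and data_id/ref when annotate=True):

    xgraph = provenance.get_bipartite_xgraph(annotate=True)

    # Print the datasets written by each 'calibrate' quantum.
    for node_id, attrs in xgraph.nodes(data=True):
        if attrs["bipartite"] == 1 and attrs["label"] == "calibrate":
            for dataset_id in xgraph.successors(node_id):
                print(
                    xgraph.nodes[dataset_id]["dataset_type_name"],
                    xgraph.nodes[dataset_id]["ref"],
                )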

How to handle the storage of committed quantum graphs is a question this technote does not attempt to fully answer. The best answer probably involves a new registry “manager” interface and implementation, despite this implying the introduction of file-based storage to the registry (for the implementation we imagine doing first; a database-graph-storage option as in [2] should be possible as well, if we find we need it). We want changing the quantum graph storage format to be tracked as a repository migration, and using the registry manager paradigm is our established way of doing that. Storing quantum graph content in database blob columns may be another good first-implementation option; this still saves us from having to fully map the quantum graph data structure to tables, while still allowing us to pull particularly useful/stable quantities into separate columns.

3.3 Pipeline workspace (pipe_base)

The PipelineWorkspaceButler that defines the new interface for low-level task execution has been prototyped here as an ABC living in pipe_base. We could implement this ABC fully in ctrl_mpexec using modified versions of tools (e.g. SingleQuantumExecutor) already defined there, but it may also make sense to move most or all of the implementation to pipe_base, perhaps leaving only multiprocessing execution to ctrl_mpexec while implementing serial, in-process execution in pipe_base. Even if the base class ends up concrete and capable of simple multiprocessing, I do expect it to support subclassing for specialized pipeline execution contexts (BPS, prompt processing, mocking), though composition or even command-line delegation may be a better option for some of these.

A pipeline workspace is initialized with a Pipeline or PipelineGraph, and upon creation it stores these and (usually) the software versions currently in use as datasets (with empty data IDs) to the workspace. Getter methods (get_pipeline(), get_pipeline_graph(), get_packages()) provide access to the corresponding in-memory objects (these are not properties because they may do I/O).

A pipeline workspace can be initialized in or converted (irreversibly) to development_mode, which disables the saving of pipeline versions. The pipeline associated with the workspace may be reset only in development mode. Committing a development-mode pipeline workspace does not save provenance or configs to the central repository, because these are in general unreliable. Often development-mode workspaces will ultimately be abandoned instead of committed.

After initialization, pipeline workspace operations proceed in roughly three stages (a sketch of the full sequence in code follows the list):

  1. Tasks are activated, which writes their init-outputs (including configs) to the workspace and marks them for inclusion (by default) in the next step. activate_tasks() may be called multiple times as long as the order of the calls is consistent with the pipeline graph’s topological ordering. Init-input and init-output dataset references are accessible via get_init_input_refs() and get_init_output_refs(). It is a major change here that these are written before a quantum graph is generated. This has always made more sense, since init datasets do not depend on data IDs or the other criteria that go into quantum graph generation, but we defer init-output writes today to after quantum graph generation in order to defer the first central-repository write operations as long as possible (particularly until after we are past the possibility of errors in quantum graph generation). With workspaces deferring committing any datasets to the central repository already, this is no longer a concern and we are free to move init-output writes earlier.

  2. Quanta are built for the active tasks or a subset thereof via one or more calls to build_quanta(). Quanta are persisted to the workspace in an implementation-defined way (the first implementation will use files similar to the .qgraph files we currently save externally). In a sharded workspace, the quanta for different disjoint sets of shard data IDs (not arbitrary data IDs!) may be built separately - at first incrementally, by serial calls to build_quanta(), but eventually concurrent calls or multiprocessing parallelism should be possible as well. We also eventually expect to support building the quanta for subsequent tasks in separate (but serial) calls, allowing different where expressions (etc) for different tasks within the same graph. This is actually a really big deal: it’s a major step towards finally addressing the infamous DM-21904 problem, which is what prevents us from building a correct quantum graph for a full DRP pipeline on even small amounts of data. Very large scale productions (of the sort handled by Campaign Management) will continue to be split up into multiple workspaces/collections with external sharding, but smaller CI and developer-initiated runs of the full pipeline should be possible within a single workspace, with a single batch submission and a single output RUN collection. WorkspaceButler.transfer_inputs() does nothing on a PipelineWorkspaceButler unless quanta have already been generated.

  3. Quanta are executed via one or more calls to run_quanta(). Here the base class does specify correct behavior in the presence of concurrent calls, if the quanta matching the arguments to those calls are disjoint: implementations must not attempt to execute a matching quantum until its upstream quanta have executed successfully, and must block until this is the case. Quanta that have already been run are automatically skipped, while those that fail continue to block everything downstream. run_quanta() accepts both simple UUIDs and rich expressions in its specification of which quanta to run, but only the former is guaranteed to be O(1) in the size of the graph; anything else could require traversing the full graph, which is O(N). Passing quantum UUIDs to PipelineWorkspaceButler is what we expect BPS to do under the hood, so this has to be fast, while everything else is a convenience.
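
A sketch of the full sequence, assuming a full Butler client for the central repository; PipelineWorkspaceFactory, the keyword-argument names, and the expression strings are placeholders rather than settled API:

    from lsst.daf.butler import Butler

    butler = Butler("/repo/main", writeable=True)
    workspace = butler.make_workspace(
        "u/jbosch/DM-12345",
        input_collections=["HSC/defaults"],
        factory=PipelineWorkspaceFactory(...),  # hypothetical factory
    )

    # Stage 1: activate tasks, writing their init-outputs (including configs).
    # The expression uses task labels, as described later in this section.
    workspace.activate_tasks("isr..calibrate")

    # Stage 2: build quanta for the active tasks and persist them.
    workspace.build_quanta(where="instrument='HSC' AND exposure=903342")

    # Stage 3: execute; already-run quanta are skipped, failures block
    # everything downstream.
    workspace.run_quanta()

    # Finally, commit everything to a new RUN collection.
    workspace.commit()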

PipelineWorkspaceButler has several methods for tracking these state transitions:

  • active_tasks: the set of currently active tasks;

  • get_built_quanta_summary(): the set of task and shard data ID combinations for which quanta have already been built;

  • query_quanta(): the execution status of all built quanta. This method’s signature is identical to that of query_provenance(), but it is limited to the workspace’s own quanta. Since it reports on quanta that could be executing concurrently, the base class makes very weak guarantees about how up-to-date its information may be. It may also be far less efficient than WMS-based approaches to getting status, especially for queries where the entire quantum graph has to be traversed. The ideal scenario (but probably not a near-term one) for BPS would be a WMS-specific workspace provided by a BPS plugin implementing this method to use WMS approaches first, delegating to PipelineWorkspaceButler via composition or inheritance with narrower queries only when necessary.

In development mode, task init-outputs (including configs) are rewritten on every call to build_quanta() or run_quanta(), because we have no way to check whether their contents would change when software versions and configs are not controlled. Quanta are not automatically rebuilt, because graph building is often slow, most development changes to tasks do not change graph structure, and it is reasonable to expect developers to be aware of when it does. We can efficiently detect most changes to the PipelineGraph due to code or config changes and warn or fail when the quantum graph is not rebuilt when it should be, but this is not rigorously true: changes to an adjustQuantum() implementation are invisible unless the quantum graph is actually rebuilt. Rebuilding the quantum graph for a shard-task combination that already has quanta is permitted only in development mode, where it immediately deletes all outputs that may exist from executing the previous quanta for that combination.

PipelineWorkspaceButler also provides methods for changing the status of already-executed quanta, even outside development mode, with consistent changes to (including, sometimes, removal of) their output datasets (a brief usage sketch follows the list):

  • accept_failed_quanta() marks failed quanta as successful (while remembering their original failed state and error conditions), allowing downstream quanta to be run as long as they can do so without the outputs the failed quanta would have produced. Invoking this on all failed quanta is broadly equivalent to committing all successfully-produced datasets to the central data repository and starting a new workspace with the committed RUN collection as input, which is effectively how failures are accepted today. It is worth considering whether we should require all failed quanta to be accepted before a workspace is committed (outside development mode) to make this explicit rather than implicit.

  • poison_successful_quanta() marks successful quanta as failures, as might need to occur if QA on their outputs revealed a serious problem that did not result in an exception. Downstream quanta that had already executed are also marked as failed - we assume bad inputs implies bad outputs. This does not immediately remove output datasets, but it does mark them as INVALIDATED, preventing them from being committed unless they are again explicitly reclassified.

  • reset_quanta() resets all matching quanta to the just-built PREDICTED state, deleting any existing outputs.
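
A brief sketch of how these might be used after a run with failures, assuming a PipelineWorkspaceButler client named workspace as in the earlier sketch; the argument forms are assumptions, since the prototype does not fix these signatures:

    # Quantum UUIDs for the failures, e.g. identified via query_quanta().
    failed_ids = [...]

    # Either accept the failures so downstream quanta can run without the
    # outputs the failed quanta would have produced ...
    workspace.accept_failed_quanta(failed_ids)
    workspace.run_quanta()

    # ... or reset them to PREDICTED (deleting any existing outputs) and rerun
    # them after fixing the underlying problem.
    workspace.reset_quanta(failed_ids)
    workspace.run_quanta(failed_ids)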

The activate_tasks() and build_quanta() methods accept a parsed string PipelineGraphExpression similar to (but simpler than) the QuantumGraphExpression described in 3.2   Quantum graph and provenance queries (daf_butler), which just uses dataset types and task labels as identifiers since data IDs and UUIDs are irrelevant. We also expect to enable this expression language to be used in the definition of labeled subsets in pipeline YAML files in the future.

4 Command-line interface

This technote does not yet include a prototype command-line interface for PipelineWorkspaceButler, despite this being the main way we expect most users to interact with it. We do expect the CLI to involve a suite of new butler subcommands (possibly another level of nesting, i.e. butler workspace build-quanta, butler workspace run-quanta), and for the current pipetask tool to be deprecated in full rather than migrated. Details will be added to this technote on a future ticket.

5 Implementation notes

While the interface of PipelineWorkspaceButler is intended to permit implementations that store their persistent state in other ways, such as a NoSQL database (Redis seems particularly well suited), the initial implementation will use files. We’ll need a file-based implementation in the long term anyway to make it easy to set up a minimal middleware environment without the sort of administrative responsibilities nearly all databases involve.

These files will sometimes be managed by a Datastore; certainly this will be the case for the file artifacts of datasets that could be eventually committed as-is back to the central repository, including the workspace-written datasets like packages and the new pipeline and pipeline_graph datasets.

Quantum graphs don’t fit as well into Datastore management. This is partly a conceptual issue - we expect quantum graph files to continue to provide the sort of dataset metadata (data IDs, datastore records) the database provides for the central repository (as in today’s QBB), so putting quantum graph files into a Datastore is a little like putting a SQLite database file into a Datastore. And this isn’t entirely conceptual - like SQLite database files, we may want to be able to modify quantum graph files in-place (even if that’s just to append), and that’s not a door we want to open for the butler dataset concept. Quantum graph files also exist on the opposite end of the spectrum from regular pipeline output datasets in terms of whether we will want to rewrite them rather than just ingest them on commit. Even if provenance quantum graphs in the central repository are stored in files initially (the plan in mind here), we probably want to strip out all of the dimension record data they hold that is wholly redundant with what’s in the database, and since concerns about modifying the graphs disappear after commit, we probably want to consolidate the graph information into fewer files.

This last point also holds for a new category of file-based state we’ll need to add for PipelineWorkspaceButler: per-quantum status files that are written after each quantum is executed. In addition to status flags and exception information for failures, these can hold output-dataset datastore records, and their existence is what downstream quanta will block on in PipelineWorkspaceButler.run_quanta. These will be scanned as-is by PipelineWorkspaceButler.query_quanta, since that’s all it can do, but we really want to merge this status information with the rest of the quantum graph on commit (while dropping the redundant dimension records, and moving datastore records into the central database), potentially making Butler.query_provenance much more efficient in terms of storage and access cost.
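
Nothing here should be read as a schema, but a minimal pydantic sketch (matching the BaseModel-based WorkspaceConfig in the prototype) of what a per-quantum status file might hold:

    import uuid

    from pydantic import BaseModel

    class QuantumStatusFile(BaseModel):
        """Hypothetical per-quantum status file written after execution.

        The real content and layout are implementation details of the
        file-backed PipelineWorkspaceButler.
        """

        quantum_id: uuid.UUID
        status: str  # name of a QuantumStatus enum value
        exception_type: str | None = None
        exception_message: str | None = None
        # Datastore records for the quantum's outputs, keyed by dataset UUID,
        # so downstream quanta and the commit step need no existence checks.
        datastore_records: dict[uuid.UUID, dict] = {}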

6 Prototype code

This technote’s git repository includes two Python files with stubs representing new interfaces for the daf_butler and pipe_base packages. These can be inspected directly, but the most important parts are included here so classes and methods can be referenced by the preceding sections. This occasionally includes proto-implementations, but only when these are particularly illustrative of the relationships between interfaces.

6.1 daf_butler

class Butler
make_workspace()
    def make_workspace(
        self,
        name: str,
        input_collections: Sequence[str],
        factory: WorkspaceFactory,
        *,
        shard_by: Iterable[str] | DimensionGraph | None = None,
        chain: str | None = None,
        root: ResourcePathExpression | None = None,
    ) -> WorkspaceButler:
        """Make a new workspace.

        Parameters
        ----------
        name : `str`
            Name of the workspace.  This can be used to retrieve it later (via
            `get_workspace`) and will be used as the name of the ``RUN``
            collection created when the workspace is committed.
        input_collections : `~collections.abc.Sequence` [ `str` ]
            Collections whose inputs may be used by the workspace.
        factory : `~collections.abc.Callable`
            Callback that is passed all arguments to this method (after
            transforming some defaults) as well as ``self`` and the butler
            configuration to construct the actual workspace.
        shard_by : `~collections.abc.Iterable` [ `str` ] or `DimensionGraph`, \
                optional
            Dimensions for data IDs that can be separately committed from this
            workspace.
        chain : `str`, optional
            Name of a chained collection to create or modify to include the new
            output run and the input collections on commit.
        root : convertible to `lsst.resources.ResourcePath`, optional
            If not `None`, a path to an external directory where the
            workspace's file artifacts should be stored.  This marks the
            workspace as "external", which means that those artifacts must be
            transferred in order to be committed, but it is easier to set up a
            standalone data repository from them instead (see
            `WorkspaceButler.export`).

        Returns
        -------
        workspace : `WorkspaceButler`
            Butler client for the new workspace.

        Notes
        -----
        We may be able to add some syntactic sugar to make this pattern a
        little simpler for users, but we may not have to if they always use a
        CLI command defined in ``pipe_base`` to do it anyway.  The idea of
        having a callback here is that the central data repository's
        configuration may have options that workspace factories could read when
        a new workspace is constructed, like queues for a BPS workspace.
        """
        # Standardize arguments.
        match shard_by:
            case None:
                shard_by = self.dimensions.empty
            case DimensionGraph():  # type: ignore[misc]
                pass
            case _:
                shard_by = self.dimensions.extract(shard_by)
        if root is None:
            # This is an internal workspace.
            config_uri = self._config.get_workspace_config_uri(name)
            # For internal workspaces, we insert a row identifying the
            # workspace into a database table before it is actually created,
            # and fail if such a row already exists.  This is effectively a
            # per-name concurrency lock on the creation of workspaces.
            self._insert_workspace_record(name)
        else:
            # This is an external workspace.
            root = ResourcePath(root, forceDirectory=True)
            config_uri = root.join(WorkspaceConfig.FILENAME)
        # Delegate to the factory object to do most of the work.  This writes
        # persistent state (e.g. to files or a database).
        try:
            workspace_butler, workspace_config = factory(
                name, input_collections, shard_by, chain, root, self, self._config
            )
        except HandledWorkspaceFactoryError as err:
            # An error occurred, but the factory cleaned up its own mess.  We
            # can remove the record of the workspace from the database and
            # just re-raise the original exception.
            self._remove_workspace_record(name)
            raise cast(BaseException, err.__cause__)
        except BaseException as err:
            # An error occurred and the factory cannot guarantee anything about
            # the persistent state.  Make it clear that administrative action
            # is needed.
            #
            # Note that this state is recognizable for internal workspaces from
            # the existence of a central database row for the workspace and the
            # absence of a config file, and that the database row needs to be
            # deleted for an administrator to mark it as cleaned up.  For
            # external workspaces we expect the user to just 'rm -rf' (or
            # equivalent) the workspace directory.
            raise CorruptedWorkspaceError(
                f"New workspace {name} with root {root} was corrupted during construction."
            ) from err
        try:
            # Save a configuration file for the workspace to allow the
            # WorkspaceButler to be reconstructed without the full Butler in
            # the future.
            with config_uri.open("w") as stream:
                stream.write(workspace_config.json())
        except BaseException:
            # If we fail here, try to clean up.
            try:
                workspace_butler._remove_remaining_content()
            except BaseException as err:
                # Couldn't clean up.
                raise CorruptedWorkspaceError(
                    f"New workspace {name} with root {root} was corrupted after to write {config_uri}."
                ) from err
            # Successfully cleaned up workspace persistent state, try to remove
            # from database as well if this is an internal workspace.
            if root is None:
                self._remove_workspace_record(name)
            raise
        return workspace_butler
get_workspace()
    def get_workspace(self, name: str) -> WorkspaceButler:
        """Return an existing internal workspace butler.

        Parameters
        ----------
        name : `str`
            Name used to construct the workspace.

        Returns
        -------
        workspace : `WorkspaceButler`
            Butler client for the workspace.
        """
        return self._config.make_workspace_butler(
            name,
            parent_write_butler=self if self.is_writeable else None,
            parent_read_butler=self if not self.is_writeable else None,
        )
query_provenance()
    def query_provenance(
        self,
        collections: str | Sequence[str],
        *,
        where: QuantumGraphExpression | str | EllipsisType = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> QuantumGraph:
        """Query for provenance as a directed-acyclic graph of datasets and
        'quanta' that produce them.

        Parameters
        ----------
        collections : `str` or `~collections.abc.Sequence` [ `str` ]
            Collection or collections to search, in order.  Overall-input
            datasets may also be included, and shadowed quanta or datasets may
            also appear if they are ancestors or descendants of a non-shadowed
            quantum or dataset identified in the expression.
        where : `QuantumGraphExpression`, `str`, or ``...``, optional
            A `QuantumGraphExpression` that restricts the quanta to return, or
            a string expression that can be parsed into one.  ``...`` can be
            used to select all quanta consistent with criteria from other
            arguments.
        bind : `~collections.abc.Mapping`, optional
            Mapping from identifiers appearing in ``where`` to the values they
            should be substituted with.
        data_id : `~collections.abc.Mapping` or \
                `~lsst.daf.butler.DataCoordinate`, optional
            Data ID that constrains the data IDs of all quanta returned and the
            data IDs of any dataset data ID literals appearing in ``where`` (e.g.
            providing ``instrument`` so it doesn't have to be repeated every
            time a ``visit`` is referenced).  Quanta with dimensions not
            constrained by this data ID's dimensions are still constrained by
            it via spatiotemporal relationships, just as they are in query
            methods.
        **kwargs
            Additional keyword arguments are interpreted as data ID key-value
            pairs, overriding and extending those in ``data_id``.

        Returns
        -------
        quanta : `QuantumGraph`
            Matching quanta and all of their inputs and outputs.  Note that
            while datasets may be used in ``where`` to control which quanta are
            returned via ancestors/descendant relationships, we never return a
            quantum without including all of its inputs and outputs, and hence
            arguments like ``data_id`` do not directly constrain the datasets
            in the graph.

        Notes
        -----
        This only considers provenance that has been committed to the central
        data repository.

        The implementation is not expected to be optimized for the case of
        querying all (or a very large number) of collections with a highly
        constraining ``where`` expression; provenance graph storage will
        probably be run-by-run rather than indexed by data ID or
        dataset/quantum UUID.  This means it may be much more efficient to
        first query for datasets using data ID expressions in order to extract
        their ``RUN`` collection names, before querying for their provenance in
        a second pass.
        """
        raise NotImplementedError()
class ButlerConfig
make_workspace_butler()
    def make_workspace_butler(
        self,
        name: str,
        parent_read_butler: Butler | None = None,
        parent_write_butler: Butler | None = None,
    ) -> WorkspaceButler:
        """Make a butler client for an existing internal workspace."""
        config_uri = self.get_workspace_config_uri(name)
        with config_uri.open("r") as stream:
            config_data = json.load(stream)
        config_cls = doImportType(config_data["type_name"])
        config: WorkspaceConfig = config_cls(**config_data)
        return config.make_butler(
            self, parent_read_butler=parent_read_butler, parent_write_butler=parent_write_butler
        )
class WorkspaceFactory
class WorkspaceFactory(Protocol):
    """An interface for callables that construct new workspaces.

    Implementations of this protocol may be regular methods or functions, but
    we expect them to frequently be full types so instance state can be used to
    hold workspace-specific initialization state.
    """

    def __call__(
        self,
        name: str,
        input_collections: Sequence[str],
        shard_by: DimensionGraph,
        chain: str | None,
        path: ResourcePath | None,
        parent: Butler,
        parent_config: ButlerConfig,
    ) -> tuple[WorkspaceButler, WorkspaceConfig]:
        ...
class WorkspaceConfig
class WorkspaceConfig(BaseModel, ABC):
    """A configuration object that can construct a `WorkspaceButler` client
    instance for an existing workspace.
    """

    FILENAME: ClassVar[str] = "butler-workspace.json"
    """Filename used for all workspace butler configuration files."""

    type_name: str
    """Fully-qualified Python type for this `WorkspaceConfig` subclass.

    This is populated automatically by the constructor and stored in this
    attribute to make sure it is automatically saved to disk.
    """

    parent_config: dict[str, Any]
    """Parent configuration, as a nested dictionary."""

    name: str
    """Name of the workspace

    This will become the RUN collection name on commit.
    """

    input_collections: list[str]
    """Sequence of collections to search for inputs.
    """

    chain: str | None
    """CHAINED collection to create from input collections and the output
    RUN collection on commit.
    """

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(type_name=get_full_type_name(self), **kwargs)

    @abstractmethod
    def make_butler(
        self,
        parent_config: ButlerConfig | None = None,
        parent_read_butler: Butler | None = None,
        parent_write_butler: Butler | None = None,
    ) -> WorkspaceButler:
        """Construct a `WorkspaceButler` instance from this configuration."""
        raise NotImplementedError()
class WorkspaceButler
make_external()
    @classmethod
    def make_external(cls, root: ResourcePathExpression) -> Self:
        """Construct a workspace butler of the appropriate derived type from
        an external workspace path.
        """
        root = ResourcePath(root, forceDirectory=True)
        with root.join(WorkspaceConfig.FILENAME).open("r") as stream:
            config_data = json.load(stream)
        config_cls = doImportType(config_data["type_name"])
        config: WorkspaceConfig = config_cls(**config_data)
        butler = config.make_butler()
        if not isinstance(butler, cls):
            raise TypeError(
                f"Unexpected workspace butler class; config file specifies {type(butler).__name__}, but "
                f"make_external was called on {cls.__name__}."
            )
        return butler
abandon()
    @final
    def abandon(self) -> None:
        """Delete the workspace and all of its contents.

        Notes
        -----
        This operation invalidates the workspace object.
        """
        try:
            self._remove_remaining_content()
        except BaseException as err:
            raise CorruptedWorkspaceError(
                f"Workspace {self.name} left in an inconsistent state by 'abandon'."
            ) from err
        self._get_parent_write_butler()._remove_workspace_record(self.name)
commit()
    @final
    def commit(
        self, shards: Iterable[DataCoordinate] | EllipsisType = ..., transfer: str | None = None
    ) -> None:
        """Register all datasets produced in this workspace with the central
        data repository, and invalidate the workspace.

        Parameters
        ----------
        shards : `~collections.abc.Iterable` [ `DataCoordinate` ] or ``...``, \
                optional
            If provided, only commit these shard data IDs (which must
            correspond to the workspace's sharding dimensions), and do not
            invalidate the workspace as a whole.  Datasets and provenance with
            these data IDs are removed from the workspace and added to a
            ``RUN`` collection in the central repository with the same name.
        transfer : `str`, optional
            Transfer method used to ingest datasets into the central data
            repository.  Ignored if this is an internal workspace.

        Notes
        -----
        Committing a workspace also transfers provenance information to the
        registry, which may (or may not!) include moving content between
        databases, between databases and files, or consolidating files, even
        when the workspace is internal.

        This method writes to the central data repository's database.
        """
        if shards is not ...:
            raise NotImplementedError("TODO: check and then update committed shards.")
        parent = self._get_parent_write_butler()
        self._transfer_content(parent, shards=shards, transfer=transfer)
        if shards is ...:
            self.abandon()
export()
    @final
    def export(
        self, *, root: ResourcePathExpression | None = None, transfer: str | None = None, **kwargs: Any
    ) -> Butler:
        """Create a new independent data repository from the output datasets in
        this workspace.

        Parameters
        ----------
        root : convertible to `lsst.resources.ResourcePath`, optional
            Root for the new data repository.  Must be provided if this is an
            internal workspace.
        transfer : `str` or `None`, optional
            Transfer mode for ingesting file artifacts into the new data
            repository.  Ignored if ``root`` is `None` or the same as the
            workspace's current root.
        **kwargs
            Forwarded to `Butler.makeRepo`.

        Returns
        -------
        butler : `Butler`
            A full butler client for the new data repository.

        Notes
        -----
        This operation invalidates the workspace object.
        """
        if root is None:
            root = self.root
            if root is None:
                raise ValueError("Cannot export an internal workspace without a root.")
        else:
            root = ResourcePath(root, forceDirectory=True)
        if root != self.root and transfer is None:
            raise ValueError(
                "A transfer mode is required to export a repository from a workspace with a different root."
            )
        full_config = ButlerConfig(Butler.makeRepo(root, **kwargs))
        full_butler = Butler(full_config, writeable=True)
        self._insert_dimension_records(full_butler)
        self._transfer_content(full_butler, shards=..., transfer=transfer)
        self.abandon()
        return full_butler
transfer_inputs()
    @abstractmethod
    def transfer_inputs(self, transfer: str | None = "copy") -> None:
        """Transfer file artifacts for all overall-input datasets to the
        workspace root and update the URIs used to fetch them during execution
        accordingly.

        Notes
        -----
        This method can only be called if the workspace root is outside the
        central data repository root.
        """
        raise NotImplementedError()
class QuantumGraph
class QuantumGraph(ABC):
    """A directed acyclic graph of datasets and the quanta that produce and
    consume them.

    Notes
    -----
    Datasets are included in a quantum graph if and only if a producing or
    consuming quantum is included; there are never isolated dataset nodes with
    no edges in a graph, and any quantum in the graph can be assumed to have
    all of its nodes.

    The attributes here do not necessarily map to how the graph is stored,
    which is workspace- and (for committed graphs) registry-defined.

    This base class needs to live in ``daf_butler`` so it can be used as the
    return type for provenance queries; note that it sees quanta only as
    entities that consume and produce datasets that are identified by a UUID or
    the combination of a task label and a data ID, and it sees tasks as just
    string labels.
    """

    @property
    @abstractmethod
    def universe(self) -> DimensionUniverse:
        """Definitions for any dimensions used by this graph."""
        raise NotImplementedError()

    @abstractmethod
    def get_bipartite_xgraph(self, *, annotate: bool = False) -> nx.MultiDiGraph:
        """Return a directed acyclic graph with UUID keys for both quanta and
        datasets.

        Parameters
        ----------
        annotate : `bool`, optional
            If `True`, add expanded `lsst.daf.butler.DataCoordinate` objects
            and `lsst.daf.butler.DatasetRef` to nodes (the latter only for
            dataset nodes) as ``data_id`` and ``ref`` attributes.

        Returns
        -------
        graph : `networkx.MultiDiGraph`
            NetworkX directed graph with quantum and dataset nodes, where edges
            only connect quanta to datasets and datasets to quanta.  This is a
            `networkx.MultiDiGraph` to represent the possibility of a quantum
            consuming a single dataset through multiple connections (edge keys
            are connection names).

        Notes
        -----
        Nodes have a ``bipartite`` attribute that is set to ``0`` for datasets
        and ``1`` for quanta.  This can be used by the functions in the
        `networkx.algorithms.bipartite` package.  Nodes representing quanta
        also have a ``label`` attribute, while nodes representing datasets have
        a ``dataset_type_name`` attribute.  All nodes have a ``run`` attribute
        with the name of the run collection or workspace they belong to.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_quantum_xgraph(self, *, annotate: bool = False) -> nx.DiGraph:
        """Return a directed acyclic graph with UUID keys for quanta only.

        Parameters
        ----------
        annotate : `bool`, optional
            If `True`, add expanded `lsst.daf.butler.DataCoordinate` objects
            to nodes as ``data_id`` attributes.

        Returns
        -------
        graph : `networkx.DiGraph`
            NetworkX directed graph with quantum nodes only, with edges
            representing one or more datasets produced by one quantum and
            consumed by the other.

        Notes
        -----
        Unless ``annotate=True``, nodes have ``label`` and ``run`` attributes
        only.  Edges have no attributes.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_data_id(self, node_id: uuid.UUID) -> DataCoordinate:
        """Return the data ID for a quantum or dataset.

        Data IDs are guaranteed to be fully expanded.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_dataset_ref(self, dataset_id: uuid.UUID) -> DatasetRef:
        """Return the `~lsst.daf.butler.DatasetRef` object for a dataset node.

        Data IDs are guaranteed to be fully expanded.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_quanta_for_task(self, label: str) -> Set[uuid.UUID]:
        """Return all quanta for a single task."""
        raise NotImplementedError()

    @abstractmethod
    def find_quantum(self, label: str, data_id: DataId) -> uuid.UUID:
        """Find a single quantum by data ID.

        This method will raise if the result is not unique, which can happen
        in rare cases if the graph spans multiple collections.  `find_quanta`
        is usually less convenient but can be used to handle this case.
        """
        raise NotImplementedError()

    @abstractmethod
    def find_quanta(self, label: str, data_id: DataId) -> Set[uuid.UUID]:
        """Find all quanta with a particular task and data ID."""
        raise NotImplementedError()

    @abstractmethod
    def get_datasets_for_type(self, dataset_type_name: str) -> Set[uuid.UUID]:
        """Return all datasets for a dataset type."""
        raise NotImplementedError()

    @abstractmethod
    def find_dataset(self, dataset_type_name: str, data_id: DataId) -> uuid.UUID:
        """Find a single dataset by data ID.

        This method will raise if the result is not unique, which can happen
        in rare cases if the graph spans multiple collections.  `find_datasets`
        is usually less convenient but can be used to handle this case.
        """
        raise NotImplementedError()

    @abstractmethod
    def find_datasets(self, dataset_type_name: str, data_id: DataId) -> Set[uuid.UUID]:
        """Find all datasets with a particular dataset type and data ID."""
        raise NotImplementedError()

    @abstractmethod
    def get_quantum_status(self, quantum_id: uuid.UUID) -> QuantumStatus:
        """Get the current status or a quantum."""
        return self.get_quantum_history(quantum_id)[-1]

    @abstractmethod
    def get_dataset_status(self, dataset_id: uuid.UUID) -> DatasetStatus:
        """Get the current status for a dataset."""
        return self.get_dataset_history(dataset_id)[-1]

    @abstractmethod
    def get_quantum_history(self, quantum_id: uuid.UUID) -> list[QuantumStatus]:
        """Get the history of this quantum's status.

        Current status is last.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_dataset_history(self, dataset_id: uuid.UUID) -> list[DatasetStatus]:
        """Get the history of this dataset's status.

        Current status is last.
        """
        raise NotImplementedError()

    @abstractmethod
    def filtered(
        self,
        where: QuantumGraphExpression = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> QuantumGraph:
        """Filter the quanta and datasets in this graph according to an
        expression.
        """
        raise NotImplementedError()
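
To make the intended use of this interface concrete, here is a small, purely illustrative sketch that splits the bipartite graph by node kind, tallies quantum statuses, and walks the outputs of one quantum; the ``graph`` handle, the task label, and the data ID values are assumptions for the example only.

from collections import Counter

# 'graph' is assumed to be a QuantumGraph obtained from a workspace or a
# provenance query (not shown).
xg = graph.get_bipartite_xgraph(annotate=True)

# Split nodes using the documented 'bipartite' attribute (0 = dataset, 1 = quantum).
quanta = {n for n, d in xg.nodes(data=True) if d["bipartite"] == 1}

# Tally quanta by (task label, current status).
status_counts = Counter(
    (xg.nodes[q]["label"], graph.get_quantum_status(q)) for q in quanta
)

# Look up one quantum by task label and data ID and inspect its outputs;
# edge keys in the multigraph are connection names.
q_id = graph.find_quantum("makeWarp", {"tract": 9813, "patch": 42, "visit": 12})
for _, dataset_id, connection in xg.out_edges(q_id, keys=True):
    print(connection, graph.get_dataset_ref(dataset_id))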
class QuantumGraphExpression
class QuantumGraphExpression:
    """Placeholder for a parsed expression that subsets a quantum graph.

    This is expected to include:

    - the standard set-theory operations (intersection, union, difference,
      symmetric difference, inversion);
    - literal sets of quanta and datasets;
    - range syntax for ancestors and descendants of quanta and datasets (see
      `PipelineGraphExpression`).

    In both literal sets and range expressions, quanta and datasets could be
    identified via:

    - UUID literals;
    - combination of task label or dataset type name and data ID (e.g.
      ``makeWarp@{tract=9813, patch=42, visit=12}``);
    - bind aliases for UUIDs, sets of UUIDs, task labels, dataset type names,
      and data IDs;
    - the values of the `QuantumStatus` and `DatasetStatus` enums as keywords,
      representing the set of all quanta or datasets with that status.

    Data ID keys that are common to all selected nodes can be provided by
    butler defaults or a separate single data ID, which should be accepted by
    any method accepting an expression of this type.

    In queries that span multiple collections, dataset and quantum identifiers
    that use data IDs are resolved in the order they appear in the collection
    sequence, but range expressions can pick up datasets and quanta that are
    shadowed.
    """
class QuantumStatus
class QuantumStatus(enum.Enum):
    """Valid states for a quantum node.

    This is intended only to capture status to the extent necessary to
    represent state transitions when executing or retrying quanta; full status
    information that's useful for diagnostic purposes does not belong here.
    """

    BUILT = enum.auto()
    """Quantum has been predicted but is not known to have been started or
    finished.
    """

    STARTED = enum.auto()
    """Execution has started but is not complete.

    Workspace implementations are not required to ever set this state.
    """

    SUCCEEDED = enum.auto()
    """Execution completed successfully.

    This includes the case where the quantum had no work to do, which can be
    determined by looking at the status of downstream dataset nodes.
    """

    FAILED = enum.auto()
    """Execution was started but raised an exception or otherwise failed.

    When this status is set on a quantum node, the node may have an
    ``exception_type`` attribute with the exception type, a `str`
    ``message`` attribute with the exception message, and a ``traceback``
    attribute with the exception traceback, *if* an exception was caught.
    """
class DatasetStatus
class DatasetStatus(enum.Flag):
    """Valid states for a dataset node in quantum graph."""

    PREDICTED = enum.auto()
    """Dataset has been predicted to exist but is not known to have been
    written or invalidated.
    """

    PRESENT = enum.auto()
    """Dataset has been produced, and the producing quantum was either
    successful or is still running.

    This is also the state for all overall-inputs to the workspace that were
    present in the central data repository.
    """

    INVALIDATED = enum.auto()
    """Dataset has been produced but the producing task later failed.

    This can also occur if the producing quantum or an upstream one was
    poisoned (manually marked as failed after it had apparently succeeded).

    Invalidated datasets are not transferred to the central data repository
    on workspace commit, but they may be reclassified again prior to commit.
    """

6.2 pipe_base

class PipelineWorkspaceButler
development_mode
    @development_mode.setter
    @abstractmethod
    def development_mode(self, value: bool) -> None:
        """Whether this workspace is in development mode, in which version and
        configuration changes are permitted but provenance is limited.

        This can be set to `True` to enter development mode at any time, but
        doing so is irreversible.
        """
        raise NotImplementedError()
get_pipeline_graph()
    @abstractmethod
    def get_pipeline_graph(self) -> PipelineGraph:
        """Return the pipeline graph associated with this workspace.

        This is always the complete pipeline; it in general includes tasks that
        have not been activated.
        """
        raise NotImplementedError()
get_pipeline()
    @abstractmethod
    def get_pipeline(self) -> Pipeline | None:
        """Return the pipeline associated with this workspace.

        This is always the complete pipeline; it in general includes tasks that
        have not been activated.  There may not be any pipeline if only a
        pipeline graph was provided at workspace construction.
        """
        raise NotImplementedError()
reset_pipeline()
    @abstractmethod
    def reset_pipeline(
        self,
        pipeline: ResourcePathExpression | Pipeline | PipelineGraph,
    ) -> None:
        """Update the pipeline associated with this workspace.

        This is only permitted if the workspace is in development mode.

        This operation requires read access to the central data repository
        database (to resolve dataset types).
        """
        raise NotImplementedError()
get_packages()
    @abstractmethod
    def get_packages(self) -> Packages | None:
        """Return the software versions frozen with this workspace.

        This is `None` if and only if the workspace is in development mode.
        """
        raise NotImplementedError()
active_tasks
    @property
    @abstractmethod
    def active_tasks(self) -> Set[str]:
        """The labels of the tasks considered active in this workspace.

        This is always a subset of the pipeline graph's task labels.  Active
        tasks have had their init-outputs (including configuration) written,
        and only active tasks may have quanta built.
        """
        raise NotImplementedError()
activate_tasks()
    @abstractmethod
    def activate_tasks(
        self,
        spec: PipelineGraphExpression | str | Iterable[str] | EllipsisType = ...,
        /,
    ) -> None:
        """Activate tasks matching the given pattern.

        This writes init-outputs for the given tasks.  Activating a task whose
        init-inputs are not available either in the input collections or the
        workspace itself is an error.

        Reactivating an already-active task in development mode causes its
        init-outputs to be re-written.  Outside development mode, reactivation
        checks that software versions have not changed (in which case
        init-outputs do not need to be re-written) and raises if they have.
        """
        raise NotImplementedError()
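
A brief sketch of the activation step as it might look in practice follows; the ``workspace`` handle and the task labels are illustrative assumptions.

# Writes init-outputs (including configs) for the named tasks.
workspace.activate_tasks(["isr", "calibrate"])
# Or activate every task in the pipeline graph (the default pattern).
workspace.activate_tasks(...)
# Only active tasks may have quanta built.
print(workspace.active_tasks)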
get_init_input_refs()
    @abstractmethod
    def get_init_input_refs(self, task_label: str) -> Mapping[str, DatasetRef]:
        """Return the init-input dataset references for an activated task.

        Mapping keys are connection names.
        """
        raise NotImplementedError()
get_init_output_refs()
    @abstractmethod
    def get_init_output_refs(self, task_label: str) -> Mapping[str, DatasetRef]:
        """Return the init-output dataset references for an activated task.

        Mapping keys are connection names.
        """
        raise NotImplementedError()
build_quanta()
    @abstractmethod
    def build_quanta(
        self,
        *,
        tasks: PipelineGraphExpression | str | Iterable[str] | EllipsisType | None = None,
        where: str = "",
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        shards: Iterable[DataCoordinate] | None = None,
        **kwargs: Any,
    ) -> None:
        """Build a quantum graph, extending any existing graph.

        Parameters
        ----------
        tasks : `PipelineGraphExpression`, `str`, iterable of `str`, \
                or ``...``, optional
            Specification for the tasks to include.  If `None`, all active
            tasks are used.  For any other value, matching tasks that are not
            already active will be activated.
        where : `str`, optional
            Data ID query string.
        bind : `~collections.abc.Mapping`, optional
            Values to substitute for aliases in ``where``.
        data_id : `~lsst.daf.butler.DataCoordinate` or mapping, optional
            Data ID that constrains all quanta.
        shards : `~collections.abc.Iterable` [ \
                `~lsst.daf.butler.DataCoordinate` ], optional
            Data IDs over the workspace's sharding dimensions that identify the
            shards to build; must be provided if the sharding dimensions for
            this workspace are not empty.
        **kwargs
            Additional data ID key-value pairs, overriding and extending
            ``data_id``.

        Notes
        -----
        This may be called multiple times with different tasks or shards, but
        only with tasks in topological order (for each shard data ID).  This
        allows a large quantum graph to be built incrementally, with different
        data ID constraints for different tasks.

        Rebuilding the quanta for a task-shard combination that has already
        been built does nothing unless the workspace is in development mode. In
        development mode, rebuilding quanta for a task-shard combination will
        delete all downstream quanta that have not been executed, and raise if
        any downstream quanta have already been executed (`reset_quanta` can be
        used to deal with already-executed quanta in advance). Rebuilt quanta
        and their output datasets are assigned new UUIDs.

        This operation requires read access and temporary-table write access to
        the central data repository database.
        """
        raise NotImplementedError()
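
To make the incremental-building contract concrete, the sketch below builds a sharded graph one task and shard at a time, in topological order per shard; the sharding dimension, task labels, and the ``shard_data_ids`` iterable are illustrative assumptions.

# Assumes the workspace was created with sharding dimensions {"tract"} and that
# 'shard_data_ids' is an iterable of expanded tract data IDs from a registry
# query (not shown).
for shard in shard_data_ids:
    # Within each shard, tasks must be built in topological order.
    workspace.build_quanta(tasks="makeWarp", shards=[shard])
    workspace.build_quanta(tasks="assembleCoadd", shards=[shard])

# (task, shard) combinations that are already built are no-ops outside
# development mode.
print(workspace.get_built_quanta_summary())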
get_built_quanta_summary()
    @abstractmethod
    def get_built_quanta_summary(self) -> Set[tuple[str, DataCoordinate]]:
        """Report the combinations of task labels and sharding data IDs for
        which quantum graphs have already been built.

        Empty data IDs are returned for workspaces with no sharding dimensions.
        """
        raise NotImplementedError()
query_quanta()
    @abstractmethod
    def query_quanta(
        self,
        *,
        where: QuantumGraphExpression | str | EllipsisType = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> QuantumGraph:
        """Query for quanta that have already been built and possibly
        executed.

        See `Butler.query_provenance` for parameter details.

        Notes
        -----
        The returned QuantumGraph is a snapshot of the workspace's state, not a
        view.  If quanta are currently being executed when this is called, the
        status for different quanta may not reflect the same instant in time,
        but the states for a single quantum and its output datasets are always
        consistent (but possibly already out-of-date by the time the method
        returns).

        The base class does not specify the behavior when quanta are being
        built while this method is called, but derived classes may support it.
        """
        raise NotImplementedError()
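
The snapshot semantics can be illustrated with a short sketch; the ``workspace`` handle and the expression string are assumptions, and the Counter-based summary is just one way to digest the result.

from collections import Counter

# Snapshot everything built so far; statuses of different quanta may not
# reflect the same instant, but each quantum is internally consistent.
snapshot = workspace.query_quanta()
xg = snapshot.get_bipartite_xgraph()
quanta = [n for n, d in xg.nodes(data=True) if d["bipartite"] == 1]
print(Counter(snapshot.get_quantum_status(q) for q in quanta))

# Or snapshot just the failures and their descendants, using the illustrative
# expression syntax sketched for QuantumGraphExpression.
failures = workspace.query_quanta(where="FAILED..")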
run_quanta()
    @abstractmethod
    def run_quanta(
        self,
        *,
        quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
        where: QuantumGraphExpression | str | EllipsisType = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> None:
        """Execute tasks on quanta.

        TODO
        """
        raise NotImplementedError()
accept_failed_quanta()
    @abstractmethod
    def accept_failed_quanta(
        self,
        *,
        quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
        where: QuantumGraphExpression = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> None:
        """Change the status of matching quanta that currently also have status
        `~QuantumStatus.FAILED` to `~QuantumStatus.SUCCEEDED`.

        Existing outputs (which should already be marked as
        `~DatasetStatus.INVALIDATED`) will have their status set to
        `~DatasetStatus.PRESENT`.
        """
        raise NotImplementedError()
poison_successful_quanta()
    @abstractmethod
    def poison_successful_quanta(
        self,
        *,
        quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
        where: QuantumGraphExpression = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> None:
        """Change the status of matching quanta that currently also have status
        `~QuantumStatus.SUCCESSFUL` to `~QuantumStatus.FAILED`, set all
        downstream quanta to `~QuantumStatus.PREDICTED`
        """
        raise NotImplementedError()
reset_quanta()
    @abstractmethod
    def reset_quanta(
        self,
        *,
        quanta: uuid.UUID | Iterable[uuid.UUID] | EllipsisType = ...,
        where: QuantumGraphExpression = ...,
        bind: Mapping[str, Any] | None = None,
        data_id: DataId | None = None,
        **kwargs: Any,
    ) -> None:
        """Change the status of matching quanta to `~QuantumStatus.PREDICTED`
        and delete all existing outputs.
        """
        raise NotImplementedError()
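
Pulling the status-manipulation methods together, here is a sketch of a typical recovery loop after a partially failed run; all handles are illustrative and the expression string assumes the placeholder syntax described above. The two options at the end are alternatives, not a sequence.

# Execute everything that has been built.
workspace.run_quanta()

# Snapshot the failures and collect their quantum UUIDs.
failed = workspace.query_quanta(where="FAILED")
failed_ids = {
    n for n, d in failed.get_bipartite_xgraph().nodes(data=True) if d["bipartite"] == 1
}

# Option 1: the failures are understood and acceptable; mark them succeeded so
# the workspace can be committed as-is.
workspace.accept_failed_quanta(quanta=failed_ids)

# Option 2: fix the underlying problem, then reset and re-run the same quanta.
workspace.reset_quanta(quanta=failed_ids)
workspace.run_quanta(quanta=failed_ids)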
class PipelineGraphExpression
class PipelineGraphExpression:
    """Placeholder for a parsed expression that subsets a pipeline graph.

    This is expected to include:

    - the standard set-theory operations (intersection, union, difference,
      symmetric difference, inversion);
    - literal sets of task labels and dataset type names;
    - regular expressions or shell-style globs on task labels and dataset type
      names;
    - range syntax for ancestors and descendants of task labels and dataset
      type names (e.g. ``..b`` and ``a..``), possibly with a shortcut for
      an intersection of the two (e.g. ``a..b``);
    - bind aliases for task labels, dataset type names, and sets thereof, to
      be satisfied with an extra dictionary mapping bind alias to `str` or
      ``collections.abc.Set[str]``.
    """

7 References

[1] [DMTN-249]. Jim Bosch. Revisiting division of responsibilities in Butler components. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-249.lsst.io/

[2] [DMTN-205]. Jim Bosch, Tim Jenness, Michelle Gower, and Andy Salnikov. Tracking Provenance in Butler. 2022. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-205.lsst.io/

[3] [DMTN-177]. Tim Jenness. Limiting Registry Access During Workflow Execution. 2023. Vera C. Rubin Observatory Data Management Technical Note. URL: https://dmtn-177.lsst.io/