qbit-ci/qbit_ci/pipeline.py
2022-10-02 19:25:41 +02:00

35 lines
1.1 KiB
Python

import typing
import qbittorrentapi
from qbit_ci.change_map import ChangeMap
from qbit_ci.errors import PipelineNonZeroExit
from qbit_ci.pipeline_step import CommandStep
from qbit_ci.pipeline_step import expr_to_step_condition
from qbit_ci.pipeline_step import GenericStep
AnyMap = typing.Mapping[str, typing.Any]
class Pipeline:
name: str
steps: typing.List[GenericStep] = []
def __init__(self, pipeline_config: AnyMap) -> None:
self.name = pipeline_config['name']
for step in pipeline_config['steps']:
conds = [*map(expr_to_step_condition, step["when"])]
self.steps.append(CommandStep(step["name"], step["commands"], conds))
def execute(self, torrent: qbittorrentapi.TorrentDictionary, changes: ChangeMap):
state = {"torrent": torrent, "changes": changes}
for step in self.steps:
if not step.should_invoke(state):
continue
exit_code = step.invoke(state)
if exit_code != 0:
raise PipelineNonZeroExit("Someone fucked up")
__all__ = ("Pipeline",)