Source code for xnatctl.services.pipelines

"""Pipeline service for XNAT pipeline operations."""

from __future__ import annotations

import builtins
import time
from collections.abc import Callable
from typing import Any

from xnatctl.core.exceptions import OperationError, ResourceNotFoundError

from .base import BaseService


class PipelineService(BaseService):
    """Service for XNAT pipeline operations.

    Every method builds a REST path under ``/data`` and delegates the HTTP
    call to the ``_get`` / ``_post`` helpers inherited from ``BaseService``;
    list-style responses are unwrapped with ``_extract_results``.
    """

    # Job states, as they look after ``str.capitalize()`` normalisation, that
    # stop the polling loop in ``wait``.
    _TERMINAL_STATES = frozenset({"Complete", "Failed", "Error", "Killed"})
    # Subset of the terminal states that ``wait`` reports as an error.
    _FAILURE_STATES = frozenset({"Failed", "Error"})

    def list(
        self,
        project: str | None = None,
    ) -> builtins.list[dict[str, Any]]:
        """List available pipelines.

        Args:
            project: Filter by project ID. When omitted (or empty) the
                site-wide ``/data/pipelines`` listing is requested.

        Returns:
            List of pipeline dicts
        """
        if project:
            path = f"/data/projects/{project}/pipelines"
        else:
            path = "/data/pipelines"

        data = self._get(path, params={"format": "json"})
        return self._extract_results(data)

    def get(
        self,
        pipeline_name: str,
        project: str | None = None,
    ) -> dict[str, Any]:
        """Get pipeline details.

        Args:
            pipeline_name: Pipeline name
            project: Project ID. When omitted the pipeline is looked up
                under ``/data/pipelines``.

        Returns:
            Pipeline details dict. A dict response is returned as-is;
            otherwise the first extracted result is returned.

        Raises:
            ResourceNotFoundError: If the request fails with an error whose
                text contains "404", or if the response holds no results.
        """
        if project:
            path = f"/data/projects/{project}/pipelines/{pipeline_name}"
        else:
            path = f"/data/pipelines/{pipeline_name}"

        # Only the HTTP call is guarded: errors raised while interpreting the
        # response (including our own "not found" below) must not be run
        # through the "404" string match.
        try:
            data = self._get(path, params={"format": "json"})
        except Exception as e:
            if "404" in str(e):
                raise ResourceNotFoundError("pipeline", pipeline_name) from e
            raise

        if isinstance(data, dict):
            return data

        results = self._extract_results(data)
        if results:
            return results[0]
        raise ResourceNotFoundError("pipeline", pipeline_name)

    def run(
        self,
        pipeline_name: str,
        experiment_id: str,
        project: str | None = None,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Run a pipeline on an experiment.

        Args:
            pipeline_name: Pipeline name
            experiment_id: Experiment/session ID
            project: Project ID. NOTE(review): accepted for interface
                compatibility but currently not used when building the
                request -- confirm whether the endpoint needs it.
            params: Additional pipeline parameters, sent as request
                parameters. The caller's dict is copied, not mutated.

        Returns:
            Job information dict with keys ``success``, ``pipeline``,
            ``experiment``, ``job_id`` and ``result`` (the raw response).
            ``job_id`` is ``None`` when it cannot be derived from the
            response.
        """
        path = f"/data/experiments/{experiment_id}/pipelines/{pipeline_name}"
        request_params: dict[str, Any] = dict(params) if params else {}

        result = self._post(path, params=request_params)

        # The job ID comes back either inside a dict ("jobId" preferred,
        # "id" as fallback) or as the bare response text.
        job_id = None
        if isinstance(result, dict):
            job_id = result.get("jobId") or result.get("id")
        elif isinstance(result, str):
            job_id = result.strip()

        return {
            "success": True,
            "pipeline": pipeline_name,
            "experiment": experiment_id,
            "job_id": job_id,
            "result": result,
        }

    def status(
        self,
        job_id: str,
    ) -> dict[str, Any]:
        """Get pipeline job status.

        Args:
            job_id: Job ID

        Returns:
            Job status dict. A dict response is returned as-is; otherwise
            the first extracted result is returned. When the response holds
            no results, ``{"job_id": job_id, "status": "unknown"}`` is
            returned instead of raising.
        """
        path = f"/data/pipelines/jobs/{job_id}"
        data = self._get(path, params={"format": "json"})

        if isinstance(data, dict):
            return data

        results = self._extract_results(data)
        if results:
            return results[0]
        return {"job_id": job_id, "status": "unknown"}

    def wait(
        self,
        job_id: str,
        timeout: int = 3600,
        poll_interval: int = 30,
        progress_callback: Callable[[dict[str, Any]], None] | None = None,
    ) -> dict[str, Any]:
        """Wait for a pipeline job to complete.

        Polls ``status`` until the job reaches a terminal state
        (Complete, Failed, Error or Killed; compared after
        ``str.capitalize()``).

        Args:
            job_id: Job ID
            timeout: Maximum wait time in seconds
            poll_interval: Seconds between status checks
            progress_callback: Called with status on each poll

        Returns:
            Final job status dict (for the Complete and Killed states)

        Raises:
            OperationError: If the job ends in the Failed or Error state,
                or if ``timeout`` elapses first.
        """
        # A monotonic clock keeps the deadline immune to wall-clock
        # adjustments (NTP, DST, manual changes) during long waits.
        deadline = time.monotonic() + timeout

        while True:
            if time.monotonic() > deadline:
                raise OperationError(
                    "pipeline_wait",
                    f"Job {job_id} timed out after {timeout}s",
                )

            status = self.status(job_id)

            if progress_callback:
                progress_callback(status)

            # The "status" value may be present but null or non-string;
            # treat anything that is not a string as "not terminal yet"
            # instead of crashing on ``.capitalize()``.
            raw_state = status.get("status")
            job_status = raw_state.capitalize() if isinstance(raw_state, str) else ""

            if job_status in self._TERMINAL_STATES:
                if job_status in self._FAILURE_STATES:
                    raise OperationError(
                        "pipeline_run",
                        f"Job {job_id} failed: {status.get('message', 'Unknown error')}",
                    )
                return status

            # Never sleep past the deadline, so the timeout is honoured to
            # within one status request rather than one full poll interval.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(poll_interval, remaining)))

    def cancel(
        self,
        job_id: str,
    ) -> bool:
        """Cancel a running pipeline job.

        Posts ``action=kill`` to the job's path. The response body is not
        inspected; any failure surfaces as whatever ``_post`` raises.

        Args:
            job_id: Job ID

        Returns:
            True if cancelled successfully
        """
        path = f"/data/pipelines/jobs/{job_id}"
        self._post(path, params={"action": "kill"})
        return True

    def list_jobs(
        self,
        experiment_id: str | None = None,
        project: str | None = None,
        status: str | None = None,
        limit: int = 100,
    ) -> builtins.list[dict[str, Any]]:
        """List pipeline jobs.

        Args:
            experiment_id: Filter by experiment. Takes precedence over
                ``project`` when both are given.
            project: Filter by project
            status: Filter by status (sent only when non-empty)
            limit: Maximum results

        Returns:
            List of job dicts
        """
        if experiment_id:
            path = f"/data/experiments/{experiment_id}/pipelines/jobs"
        elif project:
            path = f"/data/projects/{project}/pipelines/jobs"
        else:
            path = "/data/pipelines/jobs"

        params: dict[str, Any] = {"format": "json", "limit": limit}
        if status:
            params["status"] = status

        data = self._get(path, params=params)
        return self._extract_results(data)