Source code for xnatctl.services.resources

"""Resource service for XNAT resource operations."""

from __future__ import annotations

import builtins
from collections.abc import Mapping
from pathlib import Path

import httpx

from xnatctl.core.exceptions import ResourceNotFoundError
from xnatctl.models.resource import Resource, ResourceFile

from .base import BaseService
from .hierarchy import HierarchyService


[docs] class ResourceService(BaseService): """Service for XNAT resource operations.""" @staticmethod def _parse_optional_int(value: object) -> int | None: """Parse an optional integer from XNAT-ish API values. XNAT sometimes returns empty strings or non-numeric strings for numeric fields; those should be treated as missing. Args: value: Input value from API. Returns: Parsed integer, or None if not parseable. """ if value is None: return None if isinstance(value, bool): return None if isinstance(value, int): return value if isinstance(value, str): stripped = value.strip() if not stripped: return None try: return int(stripped) except ValueError: return None return None @classmethod def _normalize_resource_row( cls, row: Mapping[str, object], session_id: str ) -> dict[str, object]: """Normalize a resource row so it is safe to validate with Resource. Args: row: Raw row from XNAT ResultSet. session_id: Parent session ID used for stable fallback IDs. Returns: Normalized copy of the row. """ normalized: dict[str, object] = dict(row) normalized["file_count"] = cls._parse_optional_int(normalized.get("file_count")) normalized["file_size"] = cls._parse_optional_int(normalized.get("file_size")) fallback_uri = normalized.get("xnat_abstractresource_id") or normalized.get("id") if not fallback_uri: fallback_uri = normalized.get("URI") or normalized.get("uri") raw_id = normalized.get("ID") if isinstance(raw_id, str) and not raw_id.strip(): raw_id = None if raw_id is None: label_value = normalized.get("label") or normalized.get("Label") label = label_value.strip() if isinstance(label_value, str) else None if not label: label = None if fallback_uri: normalized["ID"] = str(fallback_uri) elif label is not None: normalized["ID"] = f"{session_id}:{label}" else: normalized["ID"] = session_id else: normalized["ID"] = str(raw_id) return normalized
[docs] def list( self, session_id: str, scan_id: str | None = None, project: str | None = None, ) -> builtins.list[Resource]: """List resources for a session or scan. Args: session_id: Session ID scan_id: Scan ID (for scan-level resources) project: Project ID Returns: List of Resource objects """ if scan_id: if project: path = ( f"/data/projects/{project}/experiments/{session_id}/scans/{scan_id}/resources" ) else: path = f"/data/experiments/{session_id}/scans/{scan_id}/resources" else: if project: path = f"/data/projects/{project}/experiments/{session_id}/resources" else: path = f"/data/experiments/{session_id}/resources" params: dict[str, str] = {"format": "json"} data = self._get(path, params=params) results = HierarchyService.extract_rows(data) resources = [] for r in results: r = self._normalize_resource_row(r, session_id=session_id) r["session_id"] = session_id if scan_id: r["scan_id"] = scan_id if project: r["project"] = project resources.append(Resource.model_validate(r)) return resources
[docs] def get( self, session_id: str, resource_label: str, scan_id: str | None = None, project: str | None = None, ) -> Resource: """Get resource details. Args: session_id: Session ID resource_label: Resource label scan_id: Scan ID (for scan-level resources) project: Project ID Returns: Resource object Raises: ResourceNotFoundError: If resource not found """ resources = self.list(session_id, scan_id=scan_id, project=project) for resource in resources: if resource.label == resource_label: return resource raise ResourceNotFoundError("resource", resource_label)
[docs] def list_files( self, session_id: str, resource_label: str, scan_id: str | None = None, project: str | None = None, ) -> builtins.list[ResourceFile]: """List files in a resource. Args: session_id: Session ID resource_label: Resource label scan_id: Scan ID (for scan-level resources) project: Project ID Returns: List of ResourceFile objects """ if scan_id: if project: path = f"/data/projects/{project}/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}/files" else: path = f"/data/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}/files" else: if project: path = f"/data/projects/{project}/experiments/{session_id}/resources/{resource_label}/files" else: path = f"/data/experiments/{session_id}/resources/{resource_label}/files" params: dict[str, str] = {"format": "json"} data = self._get(path, params=params) results = HierarchyService.extract_rows(data) return [ResourceFile(**r) for r in results]
[docs] def create( self, session_id: str, resource_label: str, scan_id: str | None = None, project: str | None = None, format: str | None = None, content: str | None = None, ) -> Resource: """Create a new resource. Args: session_id: Session ID resource_label: Resource label scan_id: Scan ID (for scan-level resources) project: Project ID format: Resource format content: Content type Returns: Created Resource object """ if scan_id: if project: path = f"/data/projects/{project}/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}" else: path = f"/data/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}" else: if project: path = ( f"/data/projects/{project}/experiments/{session_id}/resources/{resource_label}" ) else: path = f"/data/experiments/{session_id}/resources/{resource_label}" params: dict[str, str] = {} if format: params["format"] = format if content: params["content"] = content try: self._put(path, params=params) except httpx.HTTPStatusError as exc: if exc.response.status_code != 409: raise # 409 Conflict means the resource already exists; proceed to return it return self.get(session_id, resource_label, scan_id=scan_id, project=project)
[docs] def delete( self, session_id: str, resource_label: str, scan_id: str | None = None, project: str | None = None, remove_files: bool = True, ) -> bool: """Delete a resource. Args: session_id: Session ID resource_label: Resource label scan_id: Scan ID (for scan-level resources) project: Project ID remove_files: Also remove files from filesystem Returns: True if successful """ if scan_id: if project: path = f"/data/projects/{project}/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}" else: path = f"/data/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}" else: if project: path = ( f"/data/projects/{project}/experiments/{session_id}/resources/{resource_label}" ) else: path = f"/data/experiments/{session_id}/resources/{resource_label}" params: dict[str, str] = {} if remove_files: params["removeFiles"] = "true" return self._delete(path, params=params)
[docs] def upload_file( self, session_id: str, resource_label: str, file_path: Path, scan_id: str | None = None, project: str | None = None, extract: bool = False, overwrite: bool = False, ) -> dict[str, object]: """Upload a file to a resource. Args: session_id: Session ID resource_label: Resource label file_path: Local file path scan_id: Scan ID (for scan-level resources) project: Project ID extract: Extract ZIP/TAR files after upload overwrite: Overwrite existing files Returns: Upload result dict """ if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") if scan_id: if project: path = f"/data/projects/{project}/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}/files/{file_path.name}" else: path = f"/data/experiments/{session_id}/scans/{scan_id}/resources/{resource_label}/files/{file_path.name}" else: if project: path = f"/data/projects/{project}/experiments/{session_id}/resources/{resource_label}/files/{file_path.name}" else: path = f"/data/experiments/{session_id}/resources/{resource_label}/files/{file_path.name}" params: dict[str, str] = {} if extract: params["extract"] = "true" if overwrite: params["overwrite"] = "true" file_size = file_path.stat().st_size # Determine content type content_type = "application/octet-stream" suffix = file_path.suffix.lower() if suffix == ".zip": content_type = "application/zip" elif suffix in (".tar", ".tar.gz", ".tgz"): content_type = "application/x-tar" elif suffix in (".json",): content_type = "application/json" elif suffix in (".xml",): content_type = "application/xml" elif suffix in (".txt", ".csv"): content_type = "text/plain" with open(file_path, "rb") as f: self.client.put( path, params=params, data=f, headers={"Content-Type": content_type}, ) return { "success": True, "file": file_path.name, "size": file_size, "extracted": extract, }
[docs] def upload_directory( self, session_id: str, resource_label: str, directory_path: Path, scan_id: str | None = None, project: str | None = None, overwrite: bool = False, ) -> dict[str, object]: """Upload a directory to a resource (creates ZIP first). Args: session_id: Session ID resource_label: Resource label directory_path: Local directory path scan_id: Scan ID (for scan-level resources) project: Project ID overwrite: Overwrite existing files Returns: Upload result dict """ import shutil import tempfile if not directory_path.is_dir(): raise NotADirectoryError(f"Not a directory: {directory_path}") # Create temporary ZIP with tempfile.TemporaryDirectory() as tmp_dir: zip_path = Path(tmp_dir) / f"{directory_path.name}.zip" shutil.make_archive( str(zip_path.with_suffix("")), "zip", directory_path, ) return self.upload_file( session_id=session_id, resource_label=resource_label, file_path=zip_path, scan_id=scan_id, project=project, extract=True, overwrite=overwrite, )