"""Admin service for XNAT administrative operations."""
from __future__ import annotations
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, cast
from .base import BaseService
[docs]
class AdminService(BaseService):
"""Service for XNAT administrative operations."""
[docs]
def refresh_catalogs(
self,
project: str,
experiments: list[str] | None = None,
options: list[str] | None = None,
limit: int | None = None,
parallel: bool = True,
workers: int = 4,
progress_callback: Callable[[int, int, str], None] | None = None,
) -> dict[str, Any]:
"""Refresh catalog XMLs for project experiments.
Args:
project: Project ID
experiments: Specific experiment IDs (or all if None)
options: Refresh options (checksum, delete, append, populateStats)
limit: Limit number of experiments
parallel: Use parallel execution
workers: Number of parallel workers
progress_callback: Callback(current, total, experiment_id)
Returns:
Summary dict with refreshed, failed, errors
"""
# Get experiments if not specified
if not experiments:
path = f"/data/projects/{project}/experiments"
params = {"format": "json", "columns": "ID"}
data = self._get(path, params=params)
experiment_rows = self._extract_results(data)
experiments = [str(r["ID"]) for r in experiment_rows if r.get("ID")]
if limit:
experiments = experiments[:limit]
total = len(experiments)
summary: dict[str, Any] = {
"refreshed": [],
"failed": [],
"errors": [],
"total": total,
}
# Build options string
option_str = ",".join(options) if options else ""
def refresh_experiment(exp_id: str) -> tuple[str, bool, str]:
"""Refresh a single experiment and return status."""
try:
path = f"/data/experiments/{exp_id}"
params: dict[str, Any] = {"pullDataFromHeaders": "true"}
if option_str:
params["options"] = option_str
self._put(path, params=params)
return (exp_id, True, "")
except Exception as e:
return (exp_id, False, str(e))
if parallel and total > 1:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {
executor.submit(refresh_experiment, exp_id): exp_id for exp_id in experiments
}
for i, future in enumerate(as_completed(futures)):
exp_id, success, error = future.result()
if success:
summary["refreshed"].append(exp_id)
else:
summary["failed"].append(exp_id)
summary["errors"].append({"experiment": exp_id, "error": error})
if progress_callback:
progress_callback(i + 1, total, exp_id)
else:
for i, exp_id in enumerate(experiments):
exp_id, success, error = refresh_experiment(exp_id)
if success:
summary["refreshed"].append(exp_id)
else:
summary["failed"].append(exp_id)
summary["errors"].append({"experiment": exp_id, "error": error})
if progress_callback:
progress_callback(i + 1, total, exp_id)
return summary
[docs]
def add_user_to_groups(
self,
username: str,
groups: list[str],
projects: list[str] | None = None,
role: str = "member",
) -> dict[str, Any]:
"""Add a user to XNAT groups.
Args:
username: XNAT username
groups: Group names to add user to
projects: Project IDs (expands group names per project)
role: Role (owner, member, collaborator)
Returns:
Summary dict with added, failed, errors
"""
results: dict[str, Any] = {
"added": [],
"failed": [],
"errors": [],
}
# Expand groups with projects if provided
target_groups: list[str] = []
if projects:
for project in projects:
for group in groups:
target_groups.append(f"{project}_{group}")
else:
target_groups = groups
for group in target_groups:
try:
path = f"/data/projects/{group.split('_')[0]}/users/{role}/{username}"
self._put(path)
results["added"].append(group)
except Exception as e:
results["failed"].append(group)
results["errors"].append({"group": group, "error": str(e)})
return results
[docs]
def remove_user_from_groups(
self,
username: str,
groups: list[str],
projects: list[str] | None = None,
) -> dict[str, Any]:
"""Remove a user from XNAT groups.
Args:
username: XNAT username
groups: Group names to remove user from
projects: Project IDs
Returns:
Summary dict with removed, failed, errors
"""
results: dict[str, Any] = {
"removed": [],
"failed": [],
"errors": [],
}
target_groups: list[str] = []
if projects:
for project in projects:
for group in groups:
target_groups.append(f"{project}_{group}")
else:
target_groups = groups
for group in target_groups:
try:
parts = group.split("_")
if len(parts) >= 2:
project = parts[0]
path = f"/data/projects/{project}/users/{username}"
self._delete(path)
results["removed"].append(group)
except Exception as e:
results["failed"].append(group)
results["errors"].append({"group": group, "error": str(e)})
return results
[docs]
def list_users(
self,
project: str | None = None,
) -> list[dict[str, Any]]:
"""List users.
Args:
project: Filter by project
Returns:
List of user dicts
"""
if project:
path = f"/data/projects/{project}/users"
else:
path = "/data/users"
params = {"format": "json"}
data = self._get(path, params=params)
return self._extract_results(data)
[docs]
def get_user(
self,
username: str,
) -> dict[str, Any]:
"""Get user details.
Args:
username: Username
Returns:
User details dict
"""
path = f"/data/users/{username}"
params = {"format": "json"}
data = self._get(path, params=params)
if isinstance(data, dict):
return data
results = self._extract_results(data)
if results:
return results[0]
return {}
[docs]
def audit_log(
self,
project: str | None = None,
username: str | None = None,
action: str | None = None,
since: str | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
"""Get audit log entries.
Args:
project: Filter by project
username: Filter by username
action: Filter by action type
since: Time filter (e.g., "7d", "2024-01-01")
limit: Maximum results
Returns:
List of audit log entries
"""
path = "/data/audit"
params: dict[str, Any] = {"format": "json", "limit": limit}
if project:
params["project"] = project
if username:
params["username"] = username
if action:
params["action"] = action
if since:
params["since"] = since
data = self._get(path, params=params)
return self._extract_results(data)
[docs]
def get_server_info(self) -> dict[str, Any]:
"""Get XNAT server information.
Returns:
Server info dict with version, build info, etc.
"""
path = "/xapi/siteConfig/buildInfo/version"
return cast(dict[str, Any], self._get(path))
[docs]
def get_site_config(
self,
key: str | None = None,
) -> dict[str, Any]:
"""Get site configuration.
Args:
key: Specific config key (or all if None)
Returns:
Configuration dict
"""
if key:
path = f"/xapi/siteConfig/{key}"
else:
path = "/xapi/siteConfig"
return cast(dict[str, Any], self._get(path))
[docs]
def set_site_config(
self,
key: str,
value: Any,
) -> bool:
"""Set site configuration value.
Args:
key: Config key
value: Config value
Returns:
True if successful
"""
path = f"/xapi/siteConfig/{key}"
self._put(path, json=value)
return True