"""SCIM 2.0 views: discovery endpoints and CRUD for Users and Groups."""
from __future__ import annotations
import json
import logging
from typing import Any
from django.http import HttpRequest, HttpResponse, JsonResponse
from django.utils.decorators import method_decorator
from django.utils.module_loading import import_string
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from pydantic import BaseModel
from scim2_models import Group as SCIMGroupModel
from scim2_models import ListResponse, ResourceType, Schema
from scim2_models import User as SCIMUserModel
from django_scim2_server.conf import app_settings
from django_scim2_server.constants import (
RESOURCE_TYPE_GROUP,
RESOURCE_TYPE_USER,
SCHEMA_GROUP,
SCHEMA_USER,
SCIM_CONTENT_TYPE,
SERVICE_PROVIDER_CONFIG,
URN_PATCH_OP,
)
from django_scim2_server.exceptions import (
BadRequestError,
NotFoundError,
SCIMError,
scim_error_response,
)
from django_scim2_server.filters import parse_filter
logger = logging.getLogger(__name__)
# Bound page size to mitigate resource-exhaustion requests while remaining generous.
MAX_PAGE_SIZE = 1000
# Bound offset to avoid pathological deep-offset scans on large tables.
# One million keeps compatibility for large tenants while still rejecting extreme abuse.
MAX_START_INDEX = 1000000
def _get_adapter(dotted_path: str) -> Any:
"""Import and instantiate an adapter class from a dotted path."""
cls = import_string(dotted_path)
return cls()
def _parse_pagination(request: HttpRequest) -> tuple[int, int]:
"""Parse and validate SCIM pagination query parameters."""
try:
start_index = int(request.GET.get("startIndex", 1))
count = int(request.GET.get("count", 100))
except (TypeError, ValueError) as exc:
raise BadRequestError("startIndex and count must be integers") from exc
if start_index < 1:
raise BadRequestError("startIndex must be >= 1")
if start_index > MAX_START_INDEX:
raise BadRequestError(f"startIndex must be <= {MAX_START_INDEX}")
if count < 0:
raise BadRequestError("count must be >= 0")
if count > MAX_PAGE_SIZE:
raise BadRequestError(f"count must be <= {MAX_PAGE_SIZE}")
return start_index, count
[docs]
@method_decorator(csrf_exempt, name="dispatch")
class SCIMView(View):
"""Base view for SCIM endpoints."""
[docs]
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
"""Dispatch with SCIM error handling and content type."""
auth_check = import_string(app_settings.SCIM2_SERVER_AUTH_CHECK)
if not auth_check(request):
error = SCIMError(detail="Authentication required", status=401)
return scim_error_response(error)
try:
response = super().dispatch(request, *args, **kwargs)
except SCIMError as exc:
return scim_error_response(exc)
except json.JSONDecodeError:
return scim_error_response(BadRequestError("Invalid JSON in request body"))
return response
[docs]
def scim_response(
self,
data: BaseModel | dict[str, Any],
status: int = 200,
) -> JsonResponse:
"""Return a JsonResponse with SCIM content type."""
if isinstance(data, BaseModel):
json_data = data.model_dump(mode="json", by_alias=True, exclude_none=True)
else:
json_data = data
return JsonResponse(json_data, status=status, content_type=SCIM_CONTENT_TYPE)
[docs]
def parse_body(self, request: HttpRequest) -> dict[str, Any]:
"""Parse JSON body from request."""
return json.loads(request.body)
# Discovery views
[docs]
class ServiceProviderConfigView(SCIMView):
"""GET /ServiceProviderConfig - SCIM service provider configuration."""
[docs]
def get(self, request: HttpRequest) -> JsonResponse:
"""Return the service provider configuration."""
return self.scim_response(SERVICE_PROVIDER_CONFIG)
[docs]
class ResourceTypesView(SCIMView):
"""GET /ResourceTypes - available SCIM resource types."""
[docs]
def get(self, request: HttpRequest) -> JsonResponse:
"""Return the list of resource types."""
response = ListResponse[ResourceType](
total_results=2,
resources=[RESOURCE_TYPE_USER, RESOURCE_TYPE_GROUP],
)
return self.scim_response(response)
[docs]
class SchemasView(SCIMView):
"""GET /Schemas - SCIM schema definitions."""
[docs]
def get(self, request: HttpRequest) -> JsonResponse:
"""Return the list of schemas."""
response = ListResponse[Schema](
total_results=2,
resources=[SCHEMA_USER, SCHEMA_GROUP],
)
return self.scim_response(response)
# Resource views
[docs]
class UserListView(SCIMView):
"""GET /Users (list+filter) and POST /Users (create)."""
[docs]
def get(self, request: HttpRequest) -> JsonResponse:
"""List users with optional filtering and pagination."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER)
qs = adapter.get_queryset()
# Filtering
filter_expr = request.GET.get("filter")
if filter_expr:
q = parse_filter(filter_expr, adapter.filter_map)
qs = qs.filter(q)
# Pagination (SCIM uses 1-based startIndex)
total = qs.count()
start_index, count = _parse_pagination(request)
offset = start_index - 1
page = qs[offset : offset + count]
resources = [adapter.to_scim(obj, request) for obj in page]
response = ListResponse[SCIMUserModel](
total_results=total,
start_index=start_index,
items_per_page=len(resources),
resources=resources,
)
return self.scim_response(response)
[docs]
def post(self, request: HttpRequest) -> JsonResponse:
"""Create a new user."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER)
data = self.parse_body(request)
scim_obj = adapter.from_scim(data)
return self.scim_response(adapter.to_scim(scim_obj, request), status=201)
[docs]
class UserDetailView(SCIMView):
"""GET/PUT/PATCH/DELETE /Users/<scim_id>."""
[docs]
def get(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse:
"""Return a single user."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
return self.scim_response(adapter.to_scim(scim_obj, request))
[docs]
def put(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse:
"""Replace a user."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
data = self.parse_body(request)
scim_obj = adapter.from_scim(data, scim_obj)
return self.scim_response(adapter.to_scim(scim_obj, request))
[docs]
def patch(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse:
"""Partially update a user via SCIM PatchOp."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
data = self.parse_body(request)
self._validate_patch(data)
scim_obj = adapter.patch(scim_obj, data["Operations"])
return self.scim_response(adapter.to_scim(scim_obj, request))
[docs]
def delete(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> HttpResponse:
"""Delete (deactivate) a user."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
adapter.delete(scim_obj)
return HttpResponse(status=204)
def _get_object(self, adapter: Any, scim_id: str) -> Any:
try:
return adapter.get_queryset().get(id=scim_id)
except adapter.get_queryset().model.DoesNotExist:
raise NotFoundError(f"User {scim_id} not found") from None
def _validate_patch(self, data: dict[str, Any]) -> None:
schemas = data.get("schemas", [])
if URN_PATCH_OP not in schemas:
raise BadRequestError("PatchOp schema required")
if "Operations" not in data:
raise BadRequestError("Operations is required")
[docs]
class GroupListView(SCIMView):
"""GET /Groups (list+filter) and POST /Groups (create)."""
[docs]
def get(self, request: HttpRequest) -> JsonResponse:
"""List groups with optional filtering and pagination."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER)
qs = adapter.get_queryset()
# Filtering
filter_expr = request.GET.get("filter")
if filter_expr:
q = parse_filter(filter_expr, adapter.filter_map)
qs = qs.filter(q)
# Pagination
total = qs.count()
start_index, count = _parse_pagination(request)
offset = start_index - 1
page = qs[offset : offset + count]
resources = [adapter.to_scim(obj, request) for obj in page]
response = ListResponse[SCIMGroupModel](
total_results=total,
start_index=start_index,
items_per_page=len(resources),
resources=resources,
)
return self.scim_response(response)
[docs]
def post(self, request: HttpRequest) -> JsonResponse:
"""Create a new group."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER)
data = self.parse_body(request)
scim_obj = adapter.from_scim(data)
return self.scim_response(adapter.to_scim(scim_obj, request), status=201)
[docs]
class GroupDetailView(SCIMView):
"""GET/PUT/PATCH/DELETE /Groups/<scim_id>."""
[docs]
def get(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse:
"""Return a single group."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
return self.scim_response(adapter.to_scim(scim_obj, request))
[docs]
def put(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse:
"""Replace a group."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
data = self.parse_body(request)
scim_obj = adapter.from_scim(data, scim_obj)
return self.scim_response(adapter.to_scim(scim_obj, request))
[docs]
def patch(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse:
"""Partially update a group via SCIM PatchOp."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
data = self.parse_body(request)
self._validate_patch(data)
scim_obj = adapter.patch(scim_obj, data["Operations"])
return self.scim_response(adapter.to_scim(scim_obj, request))
[docs]
def delete(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> HttpResponse:
"""Delete a group."""
adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER)
scim_obj = self._get_object(adapter, scim_id)
adapter.delete(scim_obj)
return HttpResponse(status=204)
def _get_object(self, adapter: Any, scim_id: str) -> Any:
try:
return adapter.get_queryset().get(id=scim_id)
except adapter.get_queryset().model.DoesNotExist:
raise NotFoundError(f"Group {scim_id} not found") from None
def _validate_patch(self, data: dict[str, Any]) -> None:
schemas = data.get("schemas", [])
if URN_PATCH_OP not in schemas:
raise BadRequestError("PatchOp schema required")
if "Operations" not in data:
raise BadRequestError("Operations is required")