Source code for django_scim2_server.views

"""SCIM 2.0 views: discovery endpoints and CRUD for Users and Groups."""

from __future__ import annotations

import json
import logging
from typing import Any

from django.http import HttpRequest, HttpResponse, JsonResponse
from django.utils.decorators import method_decorator
from django.utils.module_loading import import_string
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from pydantic import BaseModel
from scim2_models import Group as SCIMGroupModel
from scim2_models import ListResponse, ResourceType, Schema
from scim2_models import User as SCIMUserModel

from django_scim2_server.conf import app_settings
from django_scim2_server.constants import (
    RESOURCE_TYPE_GROUP,
    RESOURCE_TYPE_USER,
    SCHEMA_GROUP,
    SCHEMA_USER,
    SCIM_CONTENT_TYPE,
    SERVICE_PROVIDER_CONFIG,
    URN_PATCH_OP,
)
from django_scim2_server.exceptions import (
    BadRequestError,
    NotFoundError,
    SCIMError,
    scim_error_response,
)
from django_scim2_server.filters import parse_filter

logger = logging.getLogger(__name__)

# Bound page size to mitigate resource-exhaustion requests while remaining generous.
MAX_PAGE_SIZE = 1000
# Bound offset to avoid pathological deep-offset scans on large tables.
# One million keeps compatibility for large tenants while still rejecting extreme abuse.
MAX_START_INDEX = 1000000


def _get_adapter(dotted_path: str) -> Any:
    """Import and instantiate an adapter class from a dotted path."""
    cls = import_string(dotted_path)
    return cls()


def _parse_pagination(request: HttpRequest) -> tuple[int, int]:
    """Parse and validate SCIM pagination query parameters."""
    try:
        start_index = int(request.GET.get("startIndex", 1))
        count = int(request.GET.get("count", 100))
    except (TypeError, ValueError) as exc:
        raise BadRequestError("startIndex and count must be integers") from exc

    if start_index < 1:
        raise BadRequestError("startIndex must be >= 1")
    if start_index > MAX_START_INDEX:
        raise BadRequestError(f"startIndex must be <= {MAX_START_INDEX}")
    if count < 0:
        raise BadRequestError("count must be >= 0")
    if count > MAX_PAGE_SIZE:
        raise BadRequestError(f"count must be <= {MAX_PAGE_SIZE}")
    return start_index, count


[docs] @method_decorator(csrf_exempt, name="dispatch") class SCIMView(View): """Base view for SCIM endpoints."""
[docs] def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: """Dispatch with SCIM error handling and content type.""" auth_check = import_string(app_settings.SCIM2_SERVER_AUTH_CHECK) if not auth_check(request): error = SCIMError(detail="Authentication required", status=401) return scim_error_response(error) try: response = super().dispatch(request, *args, **kwargs) except SCIMError as exc: return scim_error_response(exc) except json.JSONDecodeError: return scim_error_response(BadRequestError("Invalid JSON in request body")) return response
[docs] def scim_response( self, data: BaseModel | dict[str, Any], status: int = 200, ) -> JsonResponse: """Return a JsonResponse with SCIM content type.""" if isinstance(data, BaseModel): json_data = data.model_dump(mode="json", by_alias=True, exclude_none=True) else: json_data = data return JsonResponse(json_data, status=status, content_type=SCIM_CONTENT_TYPE)
[docs] def parse_body(self, request: HttpRequest) -> dict[str, Any]: """Parse JSON body from request.""" return json.loads(request.body)
# Discovery views
[docs] class ServiceProviderConfigView(SCIMView): """GET /ServiceProviderConfig - SCIM service provider configuration."""
[docs] def get(self, request: HttpRequest) -> JsonResponse: """Return the service provider configuration.""" return self.scim_response(SERVICE_PROVIDER_CONFIG)
[docs] class ResourceTypesView(SCIMView): """GET /ResourceTypes - available SCIM resource types."""
[docs] def get(self, request: HttpRequest) -> JsonResponse: """Return the list of resource types.""" response = ListResponse[ResourceType]( total_results=2, resources=[RESOURCE_TYPE_USER, RESOURCE_TYPE_GROUP], ) return self.scim_response(response)
[docs] class SchemasView(SCIMView): """GET /Schemas - SCIM schema definitions."""
[docs] def get(self, request: HttpRequest) -> JsonResponse: """Return the list of schemas.""" response = ListResponse[Schema]( total_results=2, resources=[SCHEMA_USER, SCHEMA_GROUP], ) return self.scim_response(response)
# Resource views
[docs] class UserListView(SCIMView): """GET /Users (list+filter) and POST /Users (create)."""
[docs] def get(self, request: HttpRequest) -> JsonResponse: """List users with optional filtering and pagination.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER) qs = adapter.get_queryset() # Filtering filter_expr = request.GET.get("filter") if filter_expr: q = parse_filter(filter_expr, adapter.filter_map) qs = qs.filter(q) # Pagination (SCIM uses 1-based startIndex) total = qs.count() start_index, count = _parse_pagination(request) offset = start_index - 1 page = qs[offset : offset + count] resources = [adapter.to_scim(obj, request) for obj in page] response = ListResponse[SCIMUserModel]( total_results=total, start_index=start_index, items_per_page=len(resources), resources=resources, ) return self.scim_response(response)
[docs] def post(self, request: HttpRequest) -> JsonResponse: """Create a new user.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER) data = self.parse_body(request) scim_obj = adapter.from_scim(data) return self.scim_response(adapter.to_scim(scim_obj, request), status=201)
[docs] class UserDetailView(SCIMView): """GET/PUT/PATCH/DELETE /Users/<scim_id>."""
[docs] def get(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse: """Return a single user.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER) scim_obj = self._get_object(adapter, scim_id) return self.scim_response(adapter.to_scim(scim_obj, request))
[docs] def put(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse: """Replace a user.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER) scim_obj = self._get_object(adapter, scim_id) data = self.parse_body(request) scim_obj = adapter.from_scim(data, scim_obj) return self.scim_response(adapter.to_scim(scim_obj, request))
[docs] def patch(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse: """Partially update a user via SCIM PatchOp.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER) scim_obj = self._get_object(adapter, scim_id) data = self.parse_body(request) self._validate_patch(data) scim_obj = adapter.patch(scim_obj, data["Operations"]) return self.scim_response(adapter.to_scim(scim_obj, request))
[docs] def delete(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> HttpResponse: """Delete (deactivate) a user.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_USER_ADAPTER) scim_obj = self._get_object(adapter, scim_id) adapter.delete(scim_obj) return HttpResponse(status=204)
def _get_object(self, adapter: Any, scim_id: str) -> Any: try: return adapter.get_queryset().get(id=scim_id) except adapter.get_queryset().model.DoesNotExist: raise NotFoundError(f"User {scim_id} not found") from None def _validate_patch(self, data: dict[str, Any]) -> None: schemas = data.get("schemas", []) if URN_PATCH_OP not in schemas: raise BadRequestError("PatchOp schema required") if "Operations" not in data: raise BadRequestError("Operations is required")
[docs] class GroupListView(SCIMView): """GET /Groups (list+filter) and POST /Groups (create)."""
[docs] def get(self, request: HttpRequest) -> JsonResponse: """List groups with optional filtering and pagination.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER) qs = adapter.get_queryset() # Filtering filter_expr = request.GET.get("filter") if filter_expr: q = parse_filter(filter_expr, adapter.filter_map) qs = qs.filter(q) # Pagination total = qs.count() start_index, count = _parse_pagination(request) offset = start_index - 1 page = qs[offset : offset + count] resources = [adapter.to_scim(obj, request) for obj in page] response = ListResponse[SCIMGroupModel]( total_results=total, start_index=start_index, items_per_page=len(resources), resources=resources, ) return self.scim_response(response)
[docs] def post(self, request: HttpRequest) -> JsonResponse: """Create a new group.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER) data = self.parse_body(request) scim_obj = adapter.from_scim(data) return self.scim_response(adapter.to_scim(scim_obj, request), status=201)
[docs] class GroupDetailView(SCIMView): """GET/PUT/PATCH/DELETE /Groups/<scim_id>."""
[docs] def get(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse: """Return a single group.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER) scim_obj = self._get_object(adapter, scim_id) return self.scim_response(adapter.to_scim(scim_obj, request))
[docs] def put(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse: """Replace a group.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER) scim_obj = self._get_object(adapter, scim_id) data = self.parse_body(request) scim_obj = adapter.from_scim(data, scim_obj) return self.scim_response(adapter.to_scim(scim_obj, request))
[docs] def patch(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> JsonResponse: """Partially update a group via SCIM PatchOp.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER) scim_obj = self._get_object(adapter, scim_id) data = self.parse_body(request) self._validate_patch(data) scim_obj = adapter.patch(scim_obj, data["Operations"]) return self.scim_response(adapter.to_scim(scim_obj, request))
[docs] def delete(self, request: HttpRequest, scim_id: str, **kwargs: Any) -> HttpResponse: """Delete a group.""" adapter = _get_adapter(app_settings.SCIM2_SERVER_GROUP_ADAPTER) scim_obj = self._get_object(adapter, scim_id) adapter.delete(scim_obj) return HttpResponse(status=204)
def _get_object(self, adapter: Any, scim_id: str) -> Any: try: return adapter.get_queryset().get(id=scim_id) except adapter.get_queryset().model.DoesNotExist: raise NotFoundError(f"Group {scim_id} not found") from None def _validate_patch(self, data: dict[str, Any]) -> None: schemas = data.get("schemas", []) if URN_PATCH_OP not in schemas: raise BadRequestError("PatchOp schema required") if "Operations" not in data: raise BadRequestError("Operations is required")