""" LTI 1.3 Views for MediaCMS Implements the LTI 1.3 / LTI Advantage flow: - OIDC Login Initiation - LTI Launch (JWT validation and processing) - JWKS endpoint (public keys) - My Media view (iframe-compatible) - Embed Media view (LTI-authenticated) - Manual NRPS Sync """ import logging from django.http import HttpResponseRedirect, JsonResponse from django.shortcuts import get_object_or_404, render from django.urls import reverse from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.clickjacking import xframe_options_exempt from django.views.decorators.csrf import csrf_exempt from pylti1p3.exception import LtiException from pylti1p3.message_launch import MessageLaunch from pylti1p3.oidc_login import OIDCLogin from rest_framework import status from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.views import APIView from files.models import Media from rbac.models import RBACMembership from .adapters import DjangoRequest, DjangoSessionService, DjangoToolConfig from .handlers import ( apply_lti_roles, create_lti_session, provision_lti_context, provision_lti_user, validate_lti_session, ) from .models import LTILaunchLog, LTIPlatform, LTIResourceLink from .services import LTINRPSClient logger = logging.getLogger(__name__) def get_client_ip(request): """Get client IP address from request""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0] else: ip = request.META.get('REMOTE_ADDR') return ip @method_decorator(csrf_exempt, name='dispatch') class OIDCLoginView(View): """ OIDC Login Initiation - Step 1 of LTI 1.3 launch Flow: Moodle → This endpoint → Redirect to Moodle auth endpoint """ def get(self, request): return self.handle_oidc_login(request) def post(self, request): return self.handle_oidc_login(request) def handle_oidc_login(self, request): """Handle OIDC login initiation""" print("=== OIDC Login Started ===", flush=True) logger.info("=== OIDC Login Started ===") try: # Get target_link_uri and other OIDC params target_link_uri = request.GET.get('target_link_uri') or request.POST.get('target_link_uri') iss = request.GET.get('iss') or request.POST.get('iss') client_id = request.GET.get('client_id') or request.POST.get('client_id') print(f"OIDC params - iss: {iss}, client_id: {client_id}, target: {target_link_uri}", flush=True) logger.info(f"OIDC params - iss: {iss}, client_id: {client_id}, target: {target_link_uri}") if not all([target_link_uri, iss, client_id]): print("ERROR: Missing OIDC parameters", flush=True) logger.error("Missing OIDC parameters") return JsonResponse({'error': 'Missing required OIDC parameters'}, status=400) # Get platform configuration platform = get_object_or_404(LTIPlatform, platform_id=iss, client_id=client_id, active=True) print(f"Found platform: {platform.name}", flush=True) logger.info(f"Found platform: {platform.name}") # Create tool config for this platform tool_config = DjangoToolConfig.from_platform(platform) print(f"Tool config: {tool_config._config}", flush=True) # Wrap Django request for PyLTI1p3 lti_request = DjangoRequest(request) # Create OIDC login handler with session and cookie services session_service = DjangoSessionService(request) cookie_service = DjangoSessionService(request) # Using same service for cookies oidc_login = OIDCLogin(lti_request, tool_config, session_service=session_service, cookie_service=cookie_service) # Redirect to platform's authorization endpoint print(f"Target link URI: {target_link_uri}", flush=True) print(f"Auth login URL: {platform.auth_login_url}", flush=True) try: redirect_url = oidc_login.enable_check_cookies().redirect(target_link_uri) print(f"OIDC redirect URL type: {type(redirect_url)}", flush=True) print(f"OIDC redirecting to: {redirect_url}", flush=True) logger.info(f"OIDC redirecting to: {redirect_url}") if not redirect_url: print("ERROR: Empty redirect URL!", flush=True) return JsonResponse({'error': 'Failed to generate OIDC redirect URL'}, status=500) return HttpResponseRedirect(redirect_url) except Exception as e: print(f"ERROR in OIDC redirect: {str(e)}", flush=True) import traceback traceback.print_exc() raise except LtiException as e: logger.error(f"LTI OIDC Login Error: {str(e)}") return render(request, 'lti/launch_error.html', {'error': 'OIDC Login Failed', 'message': str(e)}, status=400) except Exception as e: logger.error(f"OIDC Login Error: {str(e)}", exc_info=True) return JsonResponse({'error': 'Internal server error during OIDC login'}, status=500) @method_decorator(csrf_exempt, name='dispatch') @method_decorator(xframe_options_exempt, name='dispatch') class LaunchView(View): """ LTI Launch Handler - Step 3 of LTI 1.3 launch Flow: Moodle → This endpoint (with JWT) → Validate → Provision → Session → Redirect """ def post(self, request): """Handle LTI launch with JWT validation""" print("=== LTI Launch Started ===", flush=True) logger.info("=== LTI Launch Started ===") platform = None user = None error_message = '' claims = {} try: # Get issuer from request id_token = request.POST.get('id_token') if not id_token: raise ValueError("Missing id_token in launch request") # Decode JWT to get issuer (without validation first) import jwt unverified = jwt.decode(id_token, options={"verify_signature": False}) iss = unverified.get('iss') aud = unverified.get('aud') # Get platform platform = get_object_or_404(LTIPlatform, platform_id=iss, client_id=aud, active=True) print(f"Launch from platform: {platform.name}", flush=True) logger.info(f"Launch from platform: {platform.name}") # Create tool config tool_config = DjangoToolConfig.from_platform(platform) # Wrap Django request for PyLTI1p3 lti_request = DjangoRequest(request) # Validate JWT and get launch data session_service = DjangoSessionService(request) cookie_service = DjangoSessionService(request) message_launch = MessageLaunch(lti_request, tool_config, session_service=session_service, cookie_service=cookie_service) # Get validated launch data launch_data = message_launch.get_launch_data() claims = self.sanitize_claims(launch_data) # Extract key claims sub = launch_data.get('sub') resource_link = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {}) resource_link_id = resource_link.get('id', 'default') roles = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/roles', []) # Check launch type message_type = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/message_type') if message_type == 'LtiDeepLinkingRequest': # Deep linking request - handle separately return self.handle_deep_linking_launch(request, message_launch, platform, launch_data) # Provision user print(f"Provisioning user, sub: {sub}", flush=True) logger.info(f"Provisioning user, sub: {sub}") if platform.auto_create_users: user = provision_lti_user(platform, launch_data) print(f"User provisioned: {user.username}", flush=True) logger.info(f"User provisioned: {user.username}") else: # Must find existing user from .models import LTIUserMapping mapping = LTIUserMapping.objects.filter(platform=platform, lti_user_id=sub).first() if not mapping: raise ValueError("User auto-creation disabled and no existing mapping found") user = mapping.user # Provision context (category + RBAC group) if 'https://purl.imsglobal.org/spec/lti/claim/context' in launch_data: logger.info("Provisioning context...") category, rbac_group, resource_link_obj = provision_lti_context(platform, launch_data, resource_link_id) logger.info(f"Context provisioned: category={category.title if category else None}") # Apply roles apply_lti_roles(user, platform, roles, rbac_group) logger.info(f"Roles applied: {roles}") else: # No context - might be a direct media embed resource_link_obj = None # Create session create_lti_session(request, user, message_launch, platform) logger.info("LTI session created") # Log successful launch LTILaunchLog.objects.create(platform=platform, user=user, resource_link=resource_link_obj, launch_type='resource_link', success=True, claims=claims, ip_address=get_client_ip(request)) logger.info("Launch logged") # Determine where to redirect redirect_url = self.determine_redirect(launch_data, resource_link_obj) print(f"=== Launch Success - Redirecting to: {redirect_url} ===", flush=True) logger.info(f"=== Launch Success - Redirecting to: {redirect_url} ===") return HttpResponseRedirect(redirect_url) except LtiException as e: error_message = f"LTI Launch Error: {str(e)}" logger.error(error_message) except Exception as e: error_message = f"Launch Error: {str(e)}" logger.error(error_message, exc_info=True) # Log failed launch if platform: LTILaunchLog.objects.create(platform=platform, user=user, launch_type='resource_link', success=False, error_message=error_message, claims=claims, ip_address=get_client_ip(request)) return render(request, 'lti/launch_error.html', {'error': 'LTI Launch Failed', 'message': error_message}, status=400) def sanitize_claims(self, claims): """Remove sensitive data from claims before logging""" safe_claims = claims.copy() # Remove any sensitive keys if needed return safe_claims def determine_redirect(self, launch_data, resource_link): """Determine where to redirect after successful launch""" # Check for custom parameters indicating what to show custom = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/custom', {}) # Check if specific media is requested media_id = custom.get('media_id') or custom.get('media_friendly_token') if media_id: try: media = Media.objects.get(friendly_token=media_id) return reverse('lti:embed_media', args=[media.friendly_token]) except Media.DoesNotExist: pass # Check resource link for linked media if resource_link and resource_link.media: return reverse('lti:embed_media', args=[resource_link.media.friendly_token]) # Default: redirect to my media return reverse('lti:my_media') def handle_deep_linking_launch(self, request, message_launch, platform, launch_data): """Handle deep linking request""" # Store deep link data in session deep_link = message_launch.get_deep_link() request.session['lti_deep_link'] = { 'deep_link_return_url': deep_link.get_response_url(), 'deployment_id': launch_data.get('https://purl.imsglobal.org/spec/lti/claim/deployment_id'), 'platform_id': platform.id, } # Redirect to media selection page return HttpResponseRedirect(reverse('lti:select_media')) class JWKSView(View): """ JWKS Endpoint - Provides tool's public keys Used by Moodle to validate signatures from MediaCMS """ def get(self, request): """Return tool's public JWK Set""" # For now, return empty JWKS since we're not signing responses # In the future, we can generate and store keys for signing deep linking responses jwks = {"keys": []} return JsonResponse(jwks, content_type='application/json') @method_decorator(xframe_options_exempt, name='dispatch') class MyMediaLTIView(View): """ My Media page for LTI-authenticated users Shows user's media profile in iframe """ def get(self, request): """Display my media page""" print(f"=== My Media LTI View - User: {request.user} ===", flush=True) logger.info(f"=== My Media LTI View - User: {request.user} ===") # Validate LTI session lti_session = validate_lti_session(request) print(f"LTI session valid: {bool(lti_session)}", flush=True) logger.info(f"LTI session valid: {bool(lti_session)}") if not lti_session: print("ERROR: LTI session validation failed", flush=True) logger.error("LTI session validation failed") return JsonResponse({'error': 'Not authenticated via LTI'}, status=403) # Redirect to user's profile page # The existing user profile page is already iframe-compatible profile_url = f"/user/{request.user.username}" print(f"Redirecting to profile: {profile_url}", flush=True) logger.info(f"Redirecting to profile: {profile_url}") return HttpResponseRedirect(profile_url) @method_decorator(xframe_options_exempt, name='dispatch') class EmbedMediaLTIView(View): """ Embed media with LTI authentication Pattern: Extends existing /embed functionality """ def get(self, request, friendly_token): """Display embedded media""" media = get_object_or_404(Media, friendly_token=friendly_token) # Check LTI session lti_session = validate_lti_session(request) if lti_session and request.user.is_authenticated: # Check RBAC access via course membership if request.user.has_member_access_to_media(media): can_view = True else: can_view = False else: # Fall back to public state check can_view = media.state == 'public' if not can_view: return JsonResponse({'error': 'Access denied', 'message': 'You do not have permission to view this media'}, status=403) # Redirect to existing embed page # The existing embed page already handles media display in iframes return HttpResponseRedirect(f"/embed?m={friendly_token}") class ManualSyncView(APIView): """ Manual NRPS sync for course members/roles Endpoint: POST /lti/sync/// Requires: User must be manager in the course RBAC group """ permission_classes = [IsAuthenticated] def post(self, request, platform_id, context_id): """Manually trigger NRPS sync""" try: # Get platform platform = get_object_or_404(LTIPlatform, id=platform_id, active=True) # Find resource link by context resource_link = LTIResourceLink.objects.filter(platform=platform, context_id=context_id).first() if not resource_link: return Response({'error': 'Context not found', 'message': f'No resource link found for context {context_id}'}, status=status.HTTP_404_NOT_FOUND) # Verify user has manager role in the course rbac_group = resource_link.rbac_group if not rbac_group: return Response({'error': 'No RBAC group', 'message': 'This context does not have an associated RBAC group'}, status=status.HTTP_400_BAD_REQUEST) is_manager = RBACMembership.objects.filter(user=request.user, rbac_group=rbac_group, role='manager').exists() if not is_manager: return Response({'error': 'Insufficient permissions', 'message': 'You must be a course manager to sync members'}, status=status.HTTP_403_FORBIDDEN) # Check NRPS is enabled if not platform.enable_nrps: return Response({'error': 'NRPS disabled', 'message': 'Names and Role Provisioning Service is disabled for this platform'}, status=status.HTTP_400_BAD_REQUEST) # Get last successful launch for NRPS endpoint last_launch = LTILaunchLog.objects.filter(platform=platform, resource_link=resource_link, success=True).order_by('-created_at').first() if not last_launch: return Response({'error': 'No launch data', 'message': 'No successful launch data found for NRPS'}, status=status.HTTP_400_BAD_REQUEST) # Perform NRPS sync nrps_client = LTINRPSClient(platform, last_launch.claims) result = nrps_client.sync_members_to_rbac_group(rbac_group) return Response( { 'status': 'success', 'message': f'Successfully synced {result["synced"]} members', 'synced_count': result['synced'], 'removed_count': result.get('removed', 0), 'synced_at': result['synced_at'], }, status=status.HTTP_200_OK, ) except Exception as e: logger.error(f"NRPS sync error: {str(e)}", exc_info=True) return Response({'error': 'Sync failed', 'message': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)