# Source file: mediacms/lti/views.py
# Last commit: 748d3b39ba ("wtv") by Markos Gogoulos, 2025-12-29 19:42:17 +02:00
# Size: 574 lines, 22 KiB (Python)

"""
LTI 1.3 Views for MediaCMS
Implements the LTI 1.3 / LTI Advantage flow:
- OIDC Login Initiation
- LTI Launch (JWT validation and processing)
- JWKS endpoint (public keys)
- My Media view (iframe-compatible)
- Embed Media view (LTI-authenticated)
- Manual NRPS Sync
"""
import traceback
import uuid
from urllib.parse import urlencode
import jwt
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.clickjacking import xframe_options_exempt
from django.views.decorators.csrf import csrf_exempt
from pylti1p3.exception import LtiException
from pylti1p3.message_launch import MessageLaunch
from pylti1p3.oidc_login import OIDCLogin
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from files.models import Media
from rbac.models import RBACMembership
from .adapters import DjangoRequest, DjangoSessionService, DjangoToolConfig
from .handlers import (
apply_lti_roles,
create_lti_session,
provision_lti_context,
provision_lti_user,
validate_lti_session,
)
from .keys import get_jwks
from .models import LTILaunchLog, LTIPlatform, LTIResourceLink
from .services import LTINRPSClient
def get_client_ip(request):
    """Return the client IP address for ``request``.

    The first entry of the ``X-Forwarded-For`` header is preferred when the
    header is present; otherwise ``REMOTE_ADDR`` is used.

    Args:
        request: Object exposing a ``META`` mapping (e.g. a Django request).

    Returns:
        The address string with surrounding whitespace removed, or whatever
        ``META`` holds for ``REMOTE_ADDR`` (``None`` when it is absent).
    """
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # Entries are comma-separated and commonly padded with spaces
        # ("client, proxy1, proxy2"); strip so callers get a clean address.
        first_hop = forwarded_for.split(',')[0].strip()
        if first_hop:
            return first_hop
    # Header missing, or its first entry was blank: use the socket peer.
    return request.META.get('REMOTE_ADDR')
@method_decorator(csrf_exempt, name='dispatch')
class OIDCLoginView(View):
    """
    OIDC Login Initiation - first step of an LTI 1.3 launch.

    The platform calls this endpoint (GET or POST); on success the browser is
    redirected to the platform's authorization endpoint.
    """

    def get(self, request):
        """Handle a login initiation sent via GET."""
        return self.handle_oidc_login(request)

    def post(self, request):
        """Handle a login initiation sent via POST."""
        return self.handle_oidc_login(request)

    def handle_oidc_login(self, request):
        """Validate the OIDC parameters and redirect to the platform.

        Returns:
            A redirect response on success; a JSON 400 when required
            parameters are missing; a JSON 404 when no platform matches
            ``iss``/``client_id``; the launch error page (400) on
            ``LtiException``; a JSON 500 on any other exception.
        """

        def read_param(name):
            # The query string takes precedence; the POST body is the fallback.
            return request.GET.get(name) or request.POST.get(name)

        try:
            separator = "=" * 80
            print(separator)
            print("OIDC LOGIN INITIATED")
            print(separator)

            target_link_uri = read_param('target_link_uri')
            iss = read_param('iss')
            client_id = read_param('client_id')
            login_hint = read_param('login_hint')
            lti_message_hint = read_param('lti_message_hint')

            print(f"Target Link URI: {target_link_uri}")
            print(f"Issuer (iss): {iss}")
            print(f"Client ID: {client_id}")
            print(f"Login Hint: {login_hint}")
            print(f"LTI Message Hint: {lti_message_hint}")

            if not (target_link_uri and iss and client_id):
                print("ERROR: Missing required OIDC parameters")
                return JsonResponse({'error': 'Missing required OIDC parameters'}, status=400)

            # Resolve the platform registration for this issuer/client pair.
            print(f"Looking for platform with iss={iss}, client_id={client_id}")
            try:
                platform = LTIPlatform.objects.get(platform_id=iss, client_id=client_id)
            except LTIPlatform.DoesNotExist:
                print(f"ERROR: No platform found with iss={iss}, client_id={client_id}")
                print("Available platforms:")
                for known in LTIPlatform.objects.all():
                    print(f" - {known.name}: platform_id={known.platform_id}, client_id={known.client_id}")
                return JsonResponse({'error': 'Platform not found'}, status=404)
            else:
                print(f"Platform found: {platform.name}")

            tool_config = DjangoToolConfig.from_platform(platform)
            print("Tool config created")

            # PyLTI1p3 works on its own request/session/cookie abstractions;
            # the session-backed service is used for both storage roles.
            lti_request = DjangoRequest(request)
            session_service = DjangoSessionService(request)
            cookie_service = DjangoSessionService(request)
            oidc_login = OIDCLogin(
                lti_request,
                tool_config,
                session_service=session_service,
                cookie_service=cookie_service,
            )

            redirect_url = oidc_login.enable_check_cookies().redirect(target_link_uri)
            if redirect_url:
                return HttpResponseRedirect(redirect_url)

            # The library returned nothing usable: assemble the authorization
            # request by hand with the required OAuth 2.0 parameters.
            state = str(uuid.uuid4())
            nonce = str(uuid.uuid4())

            # Persist state and nonce so the launch step can validate them.
            session_service.save_launch_data(
                f'state-{state}',
                {'target_link_uri': target_link_uri, 'nonce': nonce},
            )

            params = {
                'response_type': 'id_token',
                'redirect_uri': target_link_uri,
                'state': state,
                'client_id': client_id,
                'login_hint': login_hint,
                'scope': 'openid',
                'response_mode': 'form_post',
                'prompt': 'none',
                'nonce': nonce,
            }
            if lti_message_hint:
                # Optional parameter, forwarded only when the platform sent it.
                params['lti_message_hint'] = lti_message_hint

            return HttpResponseRedirect(f"{platform.auth_login_url}?{urlencode(params)}")
        except LtiException as exc:
            print(f"LtiException during OIDC login: {str(exc)}")
            traceback.print_exc()
            return render(
                request,
                'lti/launch_error.html',
                {'error': 'OIDC Login Failed', 'message': str(exc)},
                status=400,
            )
        except Exception as exc:
            print(f"Exception during OIDC login: {str(exc)}")
            traceback.print_exc()
            return JsonResponse({'error': 'Internal server error during OIDC login'}, status=500)
@method_decorator(csrf_exempt, name='dispatch')
@method_decorator(xframe_options_exempt, name='dispatch')
class LaunchView(View):
    """
    LTI Launch Handler - Step 3 of LTI 1.3 launch
    Flow: Moodle → This endpoint (with JWT) → Validate → Provision → Session → Redirect
    """

    @staticmethod
    def _resolve_client_id(aud, azp=None):
        """Pick the tool's client_id out of the ``aud``/``azp`` claims.

        ``aud`` may be a single string or an array of audiences. For an array,
        ``azp`` is used when it is one of the listed audiences, otherwise the
        first audience is used. An empty array yields ``None``.
        """
        if isinstance(aud, (list, tuple)):
            if azp in aud:
                return azp
            return aud[0] if aud else None
        return aud

    def post(self, request):
        """Handle LTI launch with JWT validation.

        On success the user (and, when a context claim is present, the
        category/RBAC group) is provisioned, an LTI session is created, the
        launch is logged and the browser is redirected. Deep linking requests
        are handed to ``handle_deep_linking_launch``. On any exception the
        failure is logged (when the platform is known) and the launch error
        page is rendered with status 400.
        """
        platform = None
        user = None
        error_message = ''
        claims = {}
        try:
            print("=" * 80)
            print("LTI LAUNCH INITIATED")
            print("=" * 80)

            id_token = request.POST.get('id_token')
            if not id_token:
                print("ERROR: Missing id_token in launch request")
                raise ValueError("Missing id_token in launch request")
            print(f"Received id_token (first 50 chars): {id_token[:50]}...")

            # Peek at the unverified token only to find out which platform
            # configuration to validate against; full validation happens in
            # MessageLaunch below.
            unverified = jwt.decode(id_token, options={"verify_signature": False})
            iss = unverified.get('iss')
            aud = unverified.get('aud')
            # ``aud`` may be an array of audiences; reduce it to one client_id
            # before using it in the database lookup.
            client_id = self._resolve_client_id(aud, unverified.get('azp'))
            print(f"Decoded JWT - Issuer (iss): {iss}")
            print(f"Decoded JWT - Audience (aud): {aud}")

            print(f"Looking for platform with platform_id={iss}, client_id={client_id}")
            try:
                platform = LTIPlatform.objects.get(platform_id=iss, client_id=client_id)
                print(f"Platform found: {platform.name}")
            except LTIPlatform.DoesNotExist:
                print(f"ERROR: No platform found with platform_id={iss}, client_id={client_id}")
                print("Available platforms:")
                for p in LTIPlatform.objects.all():
                    print(f" - {p.name}: platform_id={p.platform_id}, client_id={p.client_id}")
                raise

            tool_config = DjangoToolConfig.from_platform(platform)
            print("Tool config created")

            # Wrap Django request for PyLTI1p3
            lti_request = DjangoRequest(request)
            session_service = DjangoSessionService(request)
            cookie_service = DjangoSessionService(request)

            class CustomMessageLaunch(MessageLaunch):
                """MessageLaunch that reads parameters via the request adapter."""

                def _get_request_param(self, key):
                    """Override to properly get request parameters"""
                    return self._request.get_param(key)

            message_launch = CustomMessageLaunch(
                lti_request,
                tool_config,
                session_service=session_service,
                cookie_service=cookie_service,
            )

            # Get validated launch data
            launch_data = message_launch.get_launch_data()
            claims = self.sanitize_claims(launch_data)

            resource_link = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {})
            resource_link_id = resource_link.get('id', 'default')
            roles = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/roles', [])

            message_type = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/message_type')
            if message_type == 'LtiDeepLinkingRequest':
                # Deep linking request - handled separately, no provisioning here
                return self.handle_deep_linking_launch(request, message_launch, platform, launch_data)

            user = provision_lti_user(platform, launch_data)

            if 'https://purl.imsglobal.org/spec/lti/claim/context' in launch_data:
                # Provision context (category + RBAC group), then apply roles
                category, rbac_group, resource_link_obj = provision_lti_context(platform, launch_data, resource_link_id)
                apply_lti_roles(user, platform, roles, rbac_group)
            else:
                # No context - might be a direct media embed
                resource_link_obj = None

            create_lti_session(request, user, message_launch, platform)

            LTILaunchLog.objects.create(
                platform=platform,
                user=user,
                resource_link=resource_link_obj,
                launch_type='resource_link',
                success=True,
                claims=claims,
            )

            redirect_url = self.determine_redirect(launch_data, resource_link_obj)
            return HttpResponseRedirect(redirect_url)
        except LtiException as e:
            error_message = f"LTI Launch Error: {str(e)}"
            print(f"LtiException during launch: {error_message}")
            traceback.print_exc()
        except Exception as e:
            error_message = f"Launch Error: {str(e)}"
            print(f"Exception during launch: {error_message}")
            traceback.print_exc()

        # Only reached on failure. A log row needs a platform, so failures
        # before the platform lookup succeeded are not recorded.
        if platform:
            LTILaunchLog.objects.create(
                platform=platform,
                user=user,
                launch_type='resource_link',
                success=False,
                error_message=error_message,
                claims=claims,
            )
        return render(request, 'lti/launch_error.html', {'error': 'LTI Launch Failed', 'message': error_message}, status=400)

    def sanitize_claims(self, claims):
        """Return a shallow copy of ``claims`` for logging.

        Nothing is removed at present; this is the hook for stripping
        sensitive keys.
        """
        safe_claims = claims.copy()
        # Remove any sensitive keys if needed
        return safe_claims

    def determine_redirect(self, launch_data, resource_link):
        """Determine where to redirect after successful launch.

        Precedence: the ``redirect_path`` custom parameter (local paths only),
        then ``media_id`` / ``media_friendly_token`` when the media exists,
        then the "my media" page.

        Returns:
            A site-relative URL string.
        """
        # Function-scope import keeps this block self-contained.
        from django.utils.http import url_has_allowed_host_and_scheme

        custom = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/custom', {})

        custom_path = custom.get('redirect_path')
        if custom_path:
            if not custom_path.startswith('/'):
                custom_path = '/' + custom_path
            # A leading slash alone does not make the target local:
            # "//host/path" and "/\\host" are treated as absolute URLs by
            # browsers. Accept only URLs without a host or scheme.
            if url_has_allowed_host_and_scheme(custom_path, allowed_hosts=None):
                return custom_path
            print(f"Ignoring unsafe redirect_path in LTI custom claim: {custom_path!r}")

        media_id = custom.get('media_id') or custom.get('media_friendly_token')
        if media_id:
            try:
                media = Media.objects.get(friendly_token=media_id)
                return reverse('lti:embed_media', args=[media.friendly_token])
            except Media.DoesNotExist:
                # Unknown media: fall through to the default destination.
                pass

        # Default: redirect to my media
        return reverse('lti:my_media')

    def handle_deep_linking_launch(self, request, message_launch, platform, launch_data):
        """Handle deep linking request.

        Stores the deep linking settings in the session and redirects to the
        media selection page.

        Raises:
            ValueError: If the deep linking settings or their
                ``deep_link_return_url`` are missing from ``launch_data``.
        """
        deep_linking_settings = launch_data.get('https://purl.imsglobal.org/spec/lti-dl/claim/deep_linking_settings', {})
        if not deep_linking_settings:
            raise ValueError("Missing deep linking settings in launch data")

        deep_link_return_url = deep_linking_settings.get('deep_link_return_url')
        if not deep_link_return_url:
            raise ValueError("Missing deep_link_return_url in deep linking settings")

        # Store deep link data in session for use in SelectMediaView
        request.session['lti_deep_link'] = {
            'deep_link_return_url': deep_link_return_url,
            'deployment_id': launch_data.get('https://purl.imsglobal.org/spec/lti/claim/deployment_id'),
            'platform_id': platform.id,
            'message_launch_data': launch_data,  # Store full launch data for JWT creation
        }

        return HttpResponseRedirect(reverse('lti:select_media'))
class JWKSView(View):
    """
    JWKS endpoint exposing the tool's public keys.

    Platforms fetch this to validate signatures produced by MediaCMS
    (e.g. Deep Linking responses).
    """

    def get(self, request):
        """Return the tool's public JWK Set as JSON."""
        return JsonResponse(get_jwks(), content_type='application/json')
class PublicKeyPEMView(View):
    """
    Show the tool's public key as PEM text, ready to paste into Moodle.
    """

    def get(self, request):
        """Return a plain-text page with the PEM public key and instructions."""
        from jwcrypto import jwk

        from .models import LTIToolKeys

        # Load the stored key pair and export its public half as PEM text.
        stored_keys = LTIToolKeys.get_or_create_keys()
        public_jwk = jwk.JWK(**stored_keys.public_key_jwk)
        pem_string = public_jwk.export_to_pem().decode('utf-8')

        page = (
            f"MediaCMS LTI Public Key (PEM Format)\n"
            f"{'=' * 80}\n\n"
            f"{pem_string}\n"
            f"{'=' * 80}\n\n"
            f"Instructions:\n"
            f"1. Copy the entire key above (including BEGIN/END lines)\n"
            f"2. In Moodle LTI tool configuration, change 'Public key type' to 'Public key'\n"
            f"3. Paste the key into the 'Public key' field\n"
            f"4. Save and try Deep Linking again\n"
        )
        return HttpResponse(page, content_type='text/plain')
class TestKeysView(View):
    """
    Diagnostic view: sign a throwaway JWT with the tool's private key and
    verify it with the public key.
    """

    def get(self, request):
        """Return a plain-text report of the sign/verify round trip."""
        import time

        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import serialization
        from jwcrypto import jwk

        from .models import LTIToolKeys

        try:
            stored_keys = LTIToolKeys.get_or_create_keys()

            # Private half: JWK -> PEM -> cryptography key object for signing.
            private_pem = jwk.JWK(**stored_keys.private_key_jwk).export_to_pem(private_key=True, password=None)
            private_key = serialization.load_pem_private_key(private_pem, password=None, backend=default_backend())

            # Public half, converted the same way for verification.
            public_pem = jwk.JWK(**stored_keys.public_key_jwk).export_to_pem()
            public_key = serialization.load_pem_public_key(public_pem, backend=default_backend())

            payload = {
                'iss': 'test',
                'aud': 'test',
                'exp': int(time.time()) + 3600,
                'iat': int(time.time()),
                'test': 'data',
            }

            kid = stored_keys.private_key_jwk['kid']
            token = jwt.encode(payload, private_key, algorithm='RS256', headers={'kid': kid})

            try:
                decoded = jwt.decode(token, public_key, algorithms=['RS256'], options={'verify_aud': False})
                report_lines = [
                    "SUCCESS: JWT signature verified correctly!\n\n",
                    f"Signed with kid: {kid}\n",
                    f"Token: {token[:100]}...\n\n",
                    f"Decoded payload: {decoded}\n\n",
                    "✓ Your key pair is valid and working correctly.\n",
                    "✓ The issue must be with Moodle's configuration or how it's fetching the public key.\n",
                ]
            except Exception as verify_error:
                report_lines = [
                    "FAILED: Could not verify JWT signature!\n\n",
                    f"Error: {str(verify_error)}\n\n",
                    "✗ Your key pair is BROKEN - private and public keys don't match!\n",
                    "✗ You need to regenerate the keys.\n",
                ]
            return HttpResponse("".join(report_lines), content_type='text/plain')
        except Exception as exc:
            # NOTE(review): the full traceback goes to the client with HTTP 200;
            # confirm this endpoint is not reachable by untrusted users.
            return HttpResponse(f"ERROR: {str(exc)}\n\n{traceback.format_exc()}", content_type='text/plain')
@method_decorator(xframe_options_exempt, name='dispatch')
class MyMediaLTIView(View):
    """
    "My Media" entry point for LTI-authenticated users, served inside an iframe.
    """

    def get(self, request):
        """Redirect to the user's profile page, or return 403 without an LTI session."""
        if not validate_lti_session(request):
            return JsonResponse({'error': 'Not authenticated via LTI'}, status=403)

        # The profile page itself is what gets shown in the iframe.
        return HttpResponseRedirect(f"/user/{request.user.username}")
@method_decorator(xframe_options_exempt, name='dispatch')
class EmbedMediaLTIView(View):
    """
    Embed media with LTI authentication
    Pattern: Extends existing /embed functionality
    """

    def get(self, request, friendly_token):
        """Redirect to the media view page when the caller may see the media.

        Access is granted when the media state is "public" or "unlisted", or
        when there is a valid LTI session and the authenticated user has
        member access to the media. Otherwise a JSON 403 is returned. An
        unknown ``friendly_token`` raises Http404 via ``get_object_or_404``.
        """
        media = get_object_or_404(Media, friendly_token=friendly_token)

        lti_session = validate_lti_session(request)

        # Deny by default, so every path below leaves ``can_view`` bound.
        can_view = False
        if lti_session and request.user.is_authenticated:
            can_view = bool(request.user.has_member_access_to_media(media))

        if media.state in ["public", "unlisted"]:
            can_view = True

        if not can_view:
            return JsonResponse({'error': 'Access denied', 'message': 'You do not have permission to view this media'}, status=403)

        return HttpResponseRedirect(f"/view?m={friendly_token}")
class ManualSyncView(APIView):
    """
    Manual NRPS sync for course members/roles
    Endpoint: POST /lti/sync/<platform_id>/<context_id>/
    Requires: User must be manager in the course RBAC group
    """

    permission_classes = [IsAuthenticated]

    def post(self, request, platform_id, context_id):
        """Manually trigger NRPS sync for one course context.

        Returns:
            200 with sync counts on success; 404 when the platform or context
            is unknown; 400 when the context has no RBAC group, NRPS is
            disabled, or no successful launch is on record; 403 when the
            caller is not a manager of the RBAC group; 500 when the sync
            raises.
        """
        # Looked up outside the try block: Http404 is an Exception subclass,
        # so inside it the catch-all handler would turn it into a 500.
        platform = get_object_or_404(LTIPlatform, id=platform_id)

        try:
            resource_link = LTIResourceLink.objects.filter(platform=platform, context_id=context_id).first()
            if not resource_link:
                return Response(
                    {'error': 'Context not found', 'message': f'No resource link found for context {context_id}'},
                    status=status.HTTP_404_NOT_FOUND,
                )

            rbac_group = resource_link.rbac_group
            if not rbac_group:
                return Response(
                    {'error': 'No RBAC group', 'message': 'This context does not have an associated RBAC group'},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Verify user has manager role in the course
            is_manager = RBACMembership.objects.filter(user=request.user, rbac_group=rbac_group, role='manager').exists()
            if not is_manager:
                return Response(
                    {'error': 'Insufficient permissions', 'message': 'You must be a course manager to sync members'},
                    status=status.HTTP_403_FORBIDDEN,
                )

            if not platform.enable_nrps:
                return Response(
                    {'error': 'NRPS disabled', 'message': 'Names and Role Provisioning Service is disabled for this platform'},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # The claims logged with the latest successful launch are handed
            # to the NRPS client.
            last_launch = LTILaunchLog.objects.filter(platform=platform, resource_link=resource_link, success=True).order_by('-created_at').first()
            if not last_launch:
                return Response(
                    {'error': 'No launch data', 'message': 'No successful launch data found for NRPS'},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            nrps_client = LTINRPSClient(platform, last_launch.claims)
            result = nrps_client.sync_members_to_rbac_group(rbac_group)

            return Response(
                {
                    'status': 'success',
                    'message': f'Successfully synced {result["synced"]} members',
                    'synced_count': result['synced'],
                    'removed_count': result.get('removed', 0),
                    'synced_at': result['synced_at'],
                },
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            return Response({'error': 'Sync failed', 'message': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)