feat: LTI support and Moodle plugin

This commit is contained in:
Markos Gogoulos
2026-05-11 12:47:09 +03:00
committed by GitHub
parent b7427869b6
commit 55ab7ff34f
307 changed files with 19966 additions and 3748 deletions
+6
View File
@@ -0,0 +1,6 @@
"""
LTI 1.3 Integration for MediaCMS
Enables integration with Learning Management Systems like Moodle
"""
default_app_config = 'lti.apps.LtiConfig'
+461
View File
@@ -0,0 +1,461 @@
"""
PyLTI1p3 Django adapters for MediaCMS
Provides Django-specific implementations for PyLTI1p3 interfaces
"""
import json
import logging
import time
from typing import Any, Dict, Optional
import jwt
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from django.core.cache import cache
from jwcrypto import jwk
from pylti1p3.message_launch import MessageLaunch
from pylti1p3.oidc_login import OIDCLogin
from pylti1p3.registration import Registration
from pylti1p3.request import Request
from pylti1p3.service_connector import ServiceConnector
from pylti1p3.tool_config import ToolConfAbstract
from .models import LTIPlatform, LTIToolKeys
logger = logging.getLogger(__name__)
class DjangoRequest(Request):
"""Django request adapter for PyLTI1p3"""
def __init__(self, request):
super().__init__()
self._request = request
self._cookies = request.COOKIES
self._session = request.session
def get_param(self, key):
"""Get parameter from GET or POST"""
value = self._request.POST.get(key) or self._request.GET.get(key)
return value
def get_cookie(self, key):
"""Get cookie value"""
return self._cookies.get(key)
def is_secure(self):
"""Check if request is secure (HTTPS)"""
return self._request.is_secure()
@property
def session(self):
"""Get session"""
return self._session
def _get_request_param(self, key):
"""Internal method for PyLTI1p3 compatibility"""
return self.get_param(key)
class DjangoOIDCLogin:
"""Handles OIDC login initiation"""
def __init__(self, request, tool_config, launch_data_storage=None):
self.request = request
self.lti_request = DjangoRequest(request)
self.tool_config = tool_config
self.launch_data_storage = launch_data_storage or DjangoSessionService(request)
def get_redirect(self, redirect_url):
"""Get the redirect object for OIDC login"""
oidc_login = OIDCLogin(self.lti_request, self.tool_config, session_service=self.launch_data_storage, cookie_service=self.launch_data_storage)
return oidc_login.enable_check_cookies().redirect(redirect_url)
class DjangoMessageLaunch:
"""Handles LTI message launch validation"""
def __init__(self, request, tool_config, launch_data_storage=None):
self.request = request
self.lti_request = DjangoRequest(request)
self.tool_config = tool_config
self.launch_data_storage = launch_data_storage or DjangoSessionService(request)
def validate(self):
"""Validate the LTI launch message"""
class CustomMessageLaunch(MessageLaunch):
def _get_request_param(self, key):
"""Override to properly get request parameters"""
return self._request.get_param(key)
message_launch = CustomMessageLaunch(self.lti_request, self.tool_config, session_service=self.launch_data_storage, cookie_service=self.launch_data_storage)
return message_launch
class DjangoSessionService:
"""
Launch data storage using Django cache for state/nonce (to avoid race conditions)
and Django sessions for other data
"""
def __init__(self, request):
self.request = request
self._session_key_prefix = 'lti1p3_'
self._cache_prefix = 'lti1p3_cache_'
def _use_cache_for_key(self, key):
"""Determine if this key should use cache (for concurrent access safety)"""
# Use cache for state and nonce to avoid race conditions in concurrent launches
return key.startswith('state-') or key.startswith('nonce-')
def get_launch_data(self, key):
"""Get launch data from cache or session depending on key type"""
if self._use_cache_for_key(key):
# Get from cache (atomic, no race condition)
cache_key = self._cache_prefix + key
data = cache.get(cache_key)
else:
# Get from session (for non-concurrent data)
session_key = self._session_key_prefix + key
data = self.request.session.get(session_key)
return json.loads(data) if data else None
def save_launch_data(self, key, data):
"""Save launch data to cache or session depending on key type"""
if self._use_cache_for_key(key):
# Save to cache with 10 minute expiration (atomic operation, no race condition)
cache_key = self._cache_prefix + key
cache.set(cache_key, json.dumps(data), timeout=600)
else:
# Save to session (for non-concurrent data)
session_key = self._session_key_prefix + key
self.request.session[session_key] = json.dumps(data)
self.request.session.modified = True
return True
def check_launch_data_storage_exists(self, key):
"""Check if launch data exists in cache or session"""
if self._use_cache_for_key(key):
# Check cache
cache_key = self._cache_prefix + key
return cache.get(cache_key) is not None
else:
# Check session
session_key = self._session_key_prefix + key
return session_key in self.request.session
def check_state_is_valid(self, state, nonce):
"""Check if state is valid - state is for CSRF protection, nonce is validated separately by JWT"""
state_key = f'state-{state}'
state_data = self.get_launch_data(state_key)
if not state_data:
return False
# State exists - that's sufficient for CSRF protection
# Nonce validation is handled by PyLTI1p3 through JWT signature and claims validation
return True
def check_nonce(self, nonce):
"""Check if nonce is valid (not used before) and mark it as used"""
nonce_key = f'nonce-{nonce}'
# Check if nonce was already used
if self.check_launch_data_storage_exists(nonce_key):
return False
# Mark nonce as used
self.save_launch_data(nonce_key, {'used': True})
return True
def set_state_valid(self, state, id_token_hash):
"""Mark state as valid and associate it with the id_token_hash"""
state_key = f'state-{state}'
self.save_launch_data(state_key, {'valid': True, 'id_token_hash': id_token_hash})
return True
def get_cookie(self, key):
"""Get cookie value (for cookie service compatibility)"""
return self.request.COOKIES.get(key)
def set_cookie(self, key, value, exp=3600):
"""Set cookie value (for cookie service compatibility)"""
# Note: Actual cookie setting happens in the response, not here
# This is just for interface compatibility
return True
class DjangoCacheDataStorage:
"""Key/value storage using Django cache"""
def __init__(self, cache_name='default', **kwargs):
self._cache = cache
self._prefix = 'lti1p3_cache_'
def get_value(self, key):
"""Get value from cache"""
cache_key = self._prefix + key
return self._cache.get(cache_key)
def set_value(self, key, value, exp=3600):
"""Set value in cache with expiration"""
cache_key = self._prefix + key
return self._cache.set(cache_key, value, timeout=exp)
def check_value(self, key):
"""Check if value exists in cache"""
cache_key = self._prefix + key
return cache_key in self._cache
class DjangoServiceConnector(ServiceConnector):
def __init__(self, registration):
super().__init__(registration)
self._registration = registration
self._access_token = None
self._access_token_expires = 0
def get_access_token(self, scopes):
if self._access_token and time.time() < self._access_token_expires:
return self._access_token
key_obj = LTIToolKeys.get_or_create_keys()
jwk_obj = jwk.JWK(**key_obj.private_key_jwk)
pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None)
private_key = serialization.load_pem_private_key(pem_bytes, password=None, backend=default_backend())
now = int(time.time())
payload = {
'iss': self._registration.get_client_id(),
'sub': self._registration.get_client_id(),
'aud': self._registration.get_auth_token_url(),
'iat': now,
'exp': now + 300,
'jti': str(time.time()),
}
client_assertion = jwt.encode(payload, private_key, algorithm='RS256', headers={'kid': key_obj.private_key_jwk['kid']})
token_url = self._registration.get_auth_token_url()
data = {
'grant_type': 'client_credentials',
'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
'client_assertion': client_assertion,
'scope': ' '.join(scopes),
}
response = requests.post(token_url, data=data, timeout=10)
response.raise_for_status()
token_data = response.json()
self._access_token = token_data['access_token']
expires_in = token_data.get('expires_in', 3600)
self._access_token_expires = time.time() + expires_in - 10
return self._access_token
def make_service_request(self, scopes, url, is_post=False, data=None, **kwargs):
access_token = self.get_access_token(scopes)
headers = {
'Authorization': f'Bearer {access_token}',
}
if 'accept' in kwargs:
headers['Accept'] = kwargs['accept']
if is_post:
response = requests.post(url, json=data, headers=headers, timeout=10)
else:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
try:
response_body = response.json()
except ValueError:
raise ValueError(f"NRPS endpoint returned non-JSON response. Status: {response.status_code}, Content-Type: {response.headers.get('Content-Type')}, Body: {response.text[:500]}")
next_page_url = None
link_header = response.headers.get('Link')
if link_header:
for link in link_header.split(','):
if 'rel="next"' in link:
next_page_url = link.split(';')[0].strip('<> ')
return {
'body': response_body,
'status_code': response.status_code,
'headers': dict(response.headers),
'next_page_url': next_page_url,
}
class DjangoToolConfig(ToolConfAbstract):
"""Tool configuration from Django models"""
def __init__(self, platforms_dict: Optional[Dict[str, Any]] = None):
"""
Initialize with platforms configuration
Args:
platforms_dict: Dictionary mapping platform_id to config
{
'https://moodle.example.com': {
'client_id': '...',
'auth_login_url': '...',
'auth_token_url': '...',
'key_set_url': '...',
'deployment_ids': [...],
}
}
"""
super().__init__()
self._config = platforms_dict or {}
def check_iss_has_one_client(self, iss):
"""Check if issuer has exactly one client"""
return iss in self._config and len([self._config[iss]]) == 1
def check_iss_has_many_clients(self, iss):
"""Check if issuer has multiple clients"""
return False
def find_registration_by_issuer(self, iss, *args, **kwargs):
"""Find registration by issuer"""
if iss not in self._config:
return None
config = self._config[iss]
registration = Registration()
registration.set_issuer(iss)
registration.set_client_id(config.get('client_id'))
registration.set_auth_login_url(config.get('auth_login_url'))
registration.set_auth_token_url(config.get('auth_token_url'))
if config.get('auth_audience'):
registration.set_auth_audience(config.get('auth_audience'))
registration.set_key_set_url(config.get('key_set_url'))
key_obj = LTIToolKeys.get_or_create_keys()
jwk_obj = jwk.JWK(**key_obj.private_key_jwk)
pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None)
registration._tool_private_key = pem_bytes.decode('utf-8')
registration._tool_private_key_kid = key_obj.private_key_jwk['kid']
return registration
def find_registration_by_params(self, iss, client_id, *args, **kwargs):
"""Find registration by issuer and client ID"""
if iss not in self._config:
return None
config = self._config[iss]
if config.get('client_id') != client_id:
return None
registration = Registration()
registration.set_issuer(iss)
registration.set_client_id(config.get('client_id'))
registration.set_auth_login_url(config.get('auth_login_url'))
registration.set_auth_token_url(config.get('auth_token_url'))
if config.get('auth_audience'):
registration.set_auth_audience(config.get('auth_audience'))
registration.set_key_set_url(config.get('key_set_url'))
key_obj = LTIToolKeys.get_or_create_keys()
jwk_obj = jwk.JWK(**key_obj.private_key_jwk)
pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None)
registration._tool_private_key = pem_bytes.decode('utf-8')
registration._tool_private_key_kid = key_obj.private_key_jwk['kid']
return registration
def find_deployment(self, iss, deployment_id):
"""Find deployment by issuer and deployment ID"""
if iss not in self._config:
return None
config_dict = self._config[iss]
deployment_ids = config_dict.get('deployment_ids', [])
if deployment_id not in deployment_ids:
return None
return self.find_registration_by_issuer(iss)
def find_deployment_by_params(self, iss, deployment_id, client_id, *args, **kwargs):
"""Find deployment by parameters"""
if iss not in self._config:
return None
config_dict = self._config[iss]
if config_dict.get('client_id') != client_id:
return None
deployment_ids = config_dict.get('deployment_ids', [])
if deployment_id not in deployment_ids:
return None
return self.find_registration_by_params(iss, client_id)
def get_jwks(self, iss, client_id=None):
"""Get JWKS from configuration - returns None to fetch from URL"""
return None
def get_iss(self):
"""Get all issuers"""
return list(self._config.keys())
def get_jwk(self, iss=None, client_id=None):
"""
Get private key for signing Deep Linking responses
PyLTI1p3 calls this to get the tool's private key for signing
Returns a cryptography RSA key object that PyJWT can use directly
"""
key_obj = LTIToolKeys.get_or_create_keys()
jwk_obj = jwk.JWK(**key_obj.private_key_jwk)
pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None)
private_key = serialization.load_pem_private_key(pem_bytes, password=None, backend=default_backend())
return private_key
def get_kid(self, iss=None, client_id=None):
"""
Get key ID for JWT header
PyLTI1p3 calls this to get the kid to include in JWT headers
"""
key_obj = LTIToolKeys.get_or_create_keys()
return key_obj.private_key_jwk.get('kid')
@classmethod
def from_platform(cls, platform):
"""Create ToolConfig from LTIPlatform model instance"""
if isinstance(platform, LTIPlatform):
config = {platform.platform_id: platform.get_lti_config()}
return cls(config)
raise ValueError("Must provide LTIPlatform instance")
@classmethod
def from_all_platforms(cls):
"""Create ToolConfig with all platforms"""
platforms = LTIPlatform.objects.filter()
config = {}
for platform in platforms:
config[platform.platform_id] = platform.get_lti_config()
return cls(config)
+192
View File
@@ -0,0 +1,192 @@
"""
Django Admin for LTI models
"""
from django.contrib import admin, messages
from django.utils.html import format_html
from .models import (
LTILaunchLog,
LTIPlatform,
LTIResourceLink,
LTIRoleMapping,
LTIToolKeys,
LTIUserMapping,
)
@admin.register(LTIPlatform)
class LTIPlatformAdmin(admin.ModelAdmin):
"""Admin for LTI Platforms (Moodle instances)"""
list_display = ['name', 'platform_id', 'client_id', 'nrps_enabled', 'deep_linking_enabled', 'created_at']
list_filter = ['enable_nrps', 'enable_deep_linking', 'created_at']
search_fields = ['name', 'platform_id', 'client_id']
readonly_fields = ['created_at', 'updated_at']
fieldsets = (
('Basic Information', {'fields': ('name', 'platform_id', 'client_id')}),
('OIDC Endpoints', {'fields': ('auth_login_url', 'auth_token_url', 'auth_audience')}),
('JWK Configuration', {'fields': ('key_set_url',), 'classes': ('collapse',)}),
('Deployment & Features', {'fields': ('deployment_ids', 'enable_nrps', 'enable_deep_linking')}),
('Auto-Provisioning Settings', {'fields': ('remove_from_groups_on_unenroll',)}),
('Timestamps', {'fields': ('created_at', 'updated_at'), 'classes': ('collapse',)}),
)
def nrps_enabled(self, obj):
return '' if obj.enable_nrps else ''
nrps_enabled.short_description = 'NRPS'
def deep_linking_enabled(self, obj):
return '' if obj.enable_deep_linking else ''
deep_linking_enabled.short_description = 'Deep Link'
@admin.register(LTIResourceLink)
class LTIResourceLinkAdmin(admin.ModelAdmin):
"""Admin for LTI Resource Links"""
list_display = ['context_title', 'platform', 'category_link', 'rbac_group_link']
list_filter = ['platform']
search_fields = ['context_id', 'context_title', 'resource_link_id']
fieldsets = (
('Platform', {'fields': ('platform',)}),
('Context (Course)', {'fields': ('context_id', 'context_title', 'context_label')}),
('Resource Link', {'fields': ('resource_link_id', 'resource_link_title')}),
('MediaCMS Mappings', {'fields': ('category', 'rbac_group')}),
)
def category_link(self, obj):
if obj.category:
return format_html('<a href="/admin/files/category/{}/change/">{}</a>', obj.category.id, obj.category.title)
return '-'
category_link.short_description = 'Category'
def rbac_group_link(self, obj):
if obj.rbac_group:
return format_html('<a href="/admin/rbac/rbacgroup/{}/change/">{}</a>', obj.rbac_group.id, obj.rbac_group.name)
return '-'
rbac_group_link.short_description = 'RBAC Group'
@admin.register(LTIUserMapping)
class LTIUserMappingAdmin(admin.ModelAdmin):
"""Admin for LTI User Mappings"""
list_display = ['user_link', 'lti_user_id', 'platform', 'user_email', 'last_login']
list_filter = ['platform', 'created_at', 'last_login']
search_fields = ['lti_user_id', 'user__username', 'user__email']
readonly_fields = ['created_at', 'last_login']
fieldsets = (
('Mapping', {'fields': ('platform', 'lti_user_id', 'user')}),
('Timestamps', {'fields': ('created_at', 'last_login')}),
)
def user_link(self, obj):
return format_html('<a href="/admin/users/user/{}/change/">{}</a>', obj.user.id, obj.user.username)
user_link.short_description = 'MediaCMS User'
def user_email(self, obj):
return obj.user.email
user_email.short_description = 'User Email'
@admin.register(LTIRoleMapping)
class LTIRoleMappingAdmin(admin.ModelAdmin):
"""Admin for LTI Role Mappings"""
list_display = ['lti_role', 'platform', 'global_role', 'group_role']
list_filter = ['platform', 'global_role', 'group_role']
search_fields = ['lti_role']
fieldsets = (
('LTI Role', {'fields': ('platform', 'lti_role')}),
('MediaCMS Roles', {'fields': ('global_role', 'group_role'), 'description': 'Map this LTI role to MediaCMS global and group roles'}),
)
@admin.register(LTILaunchLog)
class LTILaunchLogAdmin(admin.ModelAdmin):
"""Admin for LTI Launch Logs"""
list_display = ['created_at', 'platform', 'user_link', 'launch_type', 'success_badge']
list_filter = ['success', 'launch_type', 'platform', 'created_at']
search_fields = ['user__username', 'error_message']
readonly_fields = ['created_at', 'claims']
date_hierarchy = 'created_at'
fieldsets = (
('Launch Info', {'fields': ('platform', 'user', 'resource_link', 'launch_type', 'success', 'created_at')}),
('Error Details', {'fields': ('error_message',), 'classes': ('collapse',)}),
('Claims Data', {'fields': ('claims',), 'classes': ('collapse',)}),
)
def success_badge(self, obj):
if obj.success:
return format_html('<span style="color: green;">✓ Success</span>')
return format_html('<span style="color: red;">✗ Failed</span>')
success_badge.short_description = 'Status'
def user_link(self, obj):
if obj.user:
return format_html('<a href="/admin/users/user/{}/change/">{}</a>', obj.user.id, obj.user.username)
return '-'
user_link.short_description = 'User'
def has_add_permission(self, request):
"""Disable manual creation of launch logs"""
return False
def has_change_permission(self, request, obj=None):
"""Make launch logs read-only"""
return False
@admin.register(LTIToolKeys)
class LTIToolKeysAdmin(admin.ModelAdmin):
"""Admin for LTI Tool RSA Keys"""
list_display = ['key_id', 'created_at', 'updated_at']
readonly_fields = ['key_id', 'created_at', 'updated_at', 'public_key_display']
fieldsets = (
('Key Information', {'fields': ('key_id', 'created_at', 'updated_at')}),
('Public Key (for JWKS)', {'fields': ('public_key_display',)}),
('Private Key (Keep Secure!)', {'fields': ('private_key_jwk',), 'classes': ('collapse',), 'description': '⚠️ This is your private signing key. Do not share it!'}),
)
actions = ['regenerate_keys']
def public_key_display(self, obj):
"""Display public key in readable format"""
import json
return format_html('<pre>{}</pre>', json.dumps(obj.public_key_jwk, indent=2))
public_key_display.short_description = 'Public Key (JWK)'
def regenerate_keys(self, request, queryset):
"""Regenerate keys for selected instances"""
for key_obj in queryset:
key_obj.generate_keys()
self.message_user(request, f"Keys regenerated for {key_obj.key_id}", messages.SUCCESS)
regenerate_keys.short_description = 'Regenerate RSA keys'
def has_add_permission(self, request):
"""Only allow one key pair - disable manual add if exists"""
return not LTIToolKeys.objects.exists()
def has_delete_permission(self, request, obj=None):
"""Prevent accidental deletion of keys"""
return False
+16
View File
@@ -0,0 +1,16 @@
from django.apps import AppConfig
from .keys import ensure_keys_exist
class LtiConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'lti'
verbose_name = 'LTI 1.3 Integration'
def ready(self):
"""Initialize LTI app - ensure keys exist"""
try:
ensure_keys_exist()
except Exception:
pass
+177
View File
@@ -0,0 +1,177 @@
"""
LTI Deep Linking 2.0 for MediaCMS
Allows instructors to select media from MediaCMS library and embed in Moodle courses
"""
import time
import traceback
import uuid
from urllib.parse import quote
import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from django.contrib.auth.decorators import login_required
from django.http import HttpResponseRedirect, JsonResponse
from django.shortcuts import render
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from jwcrypto import jwk
from files.models import Media
from .models import LTIPlatform, LTIToolKeys
@method_decorator(login_required, name='dispatch')
class SelectMediaView(View):
"""
UI for instructors to select media for deep linking
Flow: Instructor clicks "Add MediaCMS" in Moodle → Deep link launch →
This view → Instructor selects media → Return to Moodle
"""
def get(self, request):
"""Display media selection interface - redirects to user's profile page"""
profile_url = f"/user/{request.user.username}?mode=lms_embed_mode&action=select_media"
lti_session = request.session.get('lti_session', {})
lti_context_id = lti_session.get('context_id', '')
if lti_context_id:
profile_url += f"&lti_context_id={quote(str(lti_context_id))}"
return HttpResponseRedirect(profile_url)
@method_decorator(csrf_exempt)
def post(self, request):
"""Return selected media as deep linking content items"""
deep_link_data = request.session.get('lti_deep_link')
if not deep_link_data:
return JsonResponse({'error': 'Invalid session'}, status=400)
selected_ids = request.POST.getlist('media_ids[]')
if not selected_ids:
return JsonResponse({'error': 'No media selected'}, status=400)
content_items = []
for media_id in selected_ids:
try:
media = Media.objects.get(id=media_id)
# Build launch URL (must be an LTI launch endpoint that handles POST with id_token)
# The /lti/launch/ endpoint will use the custom parameter to redirect to the correct media
launch_url = request.build_absolute_uri(reverse('lti:launch'))
content_item = {
'type': 'ltiResourceLink',
'title': media.title,
'url': launch_url,
'custom': {
'media_friendly_token': media.friendly_token,
},
}
if media.thumbnail_url:
thumbnail_url = media.thumbnail_url
if not thumbnail_url.startswith('http'):
thumbnail_url = request.build_absolute_uri(thumbnail_url)
content_item['thumbnail'] = {'url': thumbnail_url, 'width': 344, 'height': 194}
content_item['iframe'] = {'width': 960, 'height': 540}
content_items.append(content_item)
except Media.DoesNotExist:
continue
if not content_items:
return JsonResponse({'error': 'No valid media found'}, status=400)
# Full implementation would use PyLTI1p3's DeepLink response builder
jwt_response = self.create_deep_link_jwt(deep_link_data, content_items, request)
context = {
'return_url': deep_link_data['deep_link_return_url'],
'jwt': jwt_response,
}
return render(request, 'lti/deep_link_return.html', context)
def create_deep_link_jwt(self, deep_link_data, content_items, request):
"""
Create JWT response for deep linking - manual implementation
"""
try:
platform_id = deep_link_data['platform_id']
platform = LTIPlatform.objects.get(id=platform_id)
deployment_id = deep_link_data['deployment_id']
message_launch_data = deep_link_data['message_launch_data']
deep_linking_settings = message_launch_data.get('https://purl.imsglobal.org/spec/lti-dl/claim/deep_linking_settings', {})
key_obj = LTIToolKeys.get_or_create_keys()
jwk_obj = jwk.JWK(**key_obj.private_key_jwk)
pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None)
private_key = serialization.load_pem_private_key(pem_bytes, password=None, backend=default_backend())
now = int(time.time())
lti_content_items = []
for item in content_items:
lti_item = {
'type': item['type'],
'title': item['title'],
'url': item['url'],
}
if item.get('custom'):
lti_item['custom'] = item['custom']
if item.get('thumbnail'):
lti_item['thumbnail'] = item['thumbnail']
if item.get('iframe'):
lti_item['iframe'] = item['iframe']
lti_content_items.append(lti_item)
tool_issuer = platform.client_id
audience = platform.platform_id
sub = message_launch_data.get('sub')
payload = {
'iss': tool_issuer,
'aud': audience,
'exp': now + 3600,
'iat': now,
'nonce': str(uuid.uuid4()),
'https://purl.imsglobal.org/spec/lti/claim/message_type': 'LtiDeepLinkingResponse',
'https://purl.imsglobal.org/spec/lti/claim/version': '1.3.0',
'https://purl.imsglobal.org/spec/lti/claim/deployment_id': deployment_id,
'https://purl.imsglobal.org/spec/lti-dl/claim/content_items': lti_content_items,
}
if sub:
payload['sub'] = sub
if 'data' in deep_linking_settings:
payload['https://purl.imsglobal.org/spec/lti-dl/claim/data'] = deep_linking_settings['data']
kid = key_obj.private_key_jwk['kid']
response_jwt = jwt.encode(payload, private_key, algorithm='RS256', headers={'kid': kid})
return response_jwt
except Exception as e:
traceback.print_exc()
raise ValueError(f"Failed to create Deep Linking JWT: {str(e)}")
+390
View File
@@ -0,0 +1,390 @@
"""
LTI Launch Handlers for User and Context Provisioning
Provides functions to:
- Create/update MediaCMS users from LTI launches
- Create/update categories and RBAC groups for courses
- Apply role mappings from LTI to MediaCMS
- Create and manage LTI sessions
"""
import base64
import hashlib
import json
import logging
from allauth.account.models import EmailAddress
from django.conf import settings
from django.contrib.auth import login
from django.utils import timezone
from files.models import Category
from rbac.models import RBACGroup, RBACMembership, RBACRole
from users.models import User
from .models import LTIResourceLink, LTIRoleMapping, LTIUserMapping
logger = logging.getLogger(__name__)
DEFAULT_LTI_ROLE_MAPPINGS = {
# LTI role names (used in standard launches)
'Instructor': {'global_role': '', 'group_role': 'manager'},
'TeachingAssistant': {'global_role': '', 'group_role': 'contributor'},
'Learner': {'global_role': '', 'group_role': 'member'},
'Student': {'global_role': '', 'group_role': 'member'},
'Administrator': {'global_role': '', 'group_role': 'manager'},
'Faculty': {'global_role': '', 'group_role': 'manager'},
# Moodle role shortnames (used in custom_publishdata from My Media launches)
'student': {'global_role': '', 'group_role': 'member'},
'guest': {'global_role': '', 'group_role': 'member'},
'teacher': {'global_role': '', 'group_role': 'manager'},
'editingteacher': {'global_role': '', 'group_role': 'manager'},
'manager': {'global_role': '', 'group_role': 'manager'},
'coursecreator': {'global_role': '', 'group_role': 'manager'},
'ta': {'global_role': '', 'group_role': 'contributor'},
}
def _ensure_course_context(platform, context_id, title, label, resource_link_id):
"""
Find or create the LTIResourceLink, Category, and RBACGroup for a course.
When a record already exists (e.g. created by a bulk My Media launch), it is
reused and its metadata is kept in sync. The resource_link_id is only
promoted when a real launch ID arrives to replace a 'bulk_*' placeholder.
Returns:
Tuple of (category, rbac_group, resource_link)
"""
resource_link = LTIResourceLink.objects.filter(
platform=platform,
context_id=context_id,
).first()
if resource_link:
category = resource_link.category
rbac_group = resource_link.rbac_group
rl_updates = []
if title and resource_link.context_title != title:
resource_link.context_title = title
rl_updates.append('context_title')
if label and resource_link.context_label != label:
resource_link.context_label = label
rl_updates.append('context_label')
# Promote from bulk placeholder to real resource link ID.
if resource_link_id and not resource_link_id.startswith('bulk_') and resource_link.resource_link_id != resource_link_id:
resource_link.resource_link_id = resource_link_id
rl_updates.append('resource_link_id')
if rl_updates:
resource_link.save(update_fields=rl_updates)
if title and category and category.title != title:
category.title = title
category.save(update_fields=['title'])
else:
category = Category.objects.create(
title=title or label or f'Course {context_id}',
description=f'Auto-created from {platform.name}: {title}',
is_global=False,
is_rbac_category=True,
is_lms_course=True,
lti_platform=platform,
lti_context_id=context_id,
)
rbac_group = RBACGroup.objects.create(
name=f'{title or label} ({platform.name})',
description=f'LTI course group from {platform.name}',
)
rbac_group.categories.add(category)
resource_link = LTIResourceLink.objects.create(
platform=platform,
context_id=context_id,
resource_link_id=resource_link_id,
context_title=title,
context_label=label,
category=category,
rbac_group=rbac_group,
)
return category, rbac_group, resource_link
_VALID_RBAC_ROLES = {r.value for r in RBACRole}
def _ensure_membership(user, rbac_group, group_role):
"""
Ensure the user is a member of rbac_group with group_role.
Updates the role if it differs from the current one, in either direction.
"""
if group_role not in _VALID_RBAC_ROLES:
return
existing = RBACMembership.objects.filter(user=user, rbac_group=rbac_group).first()
if existing:
if group_role != existing.role:
existing.role = group_role
existing.save(update_fields=['role'])
else:
try:
RBACMembership.objects.create(user=user, rbac_group=rbac_group, role=group_role)
except Exception:
pass
def provision_lti_user(platform, claims):
"""
Provision MediaCMS user from LTI launch claims.
Returns:
User instance
"""
lti_user_id = claims.get('sub')
if not lti_user_id:
raise ValueError("Missing 'sub' claim in LTI launch")
email = claims.get('email', '')
given_name = claims.get('given_name', '')
family_name = claims.get('family_name', '')
name = claims.get('name', f"{given_name} {family_name}").strip()
mapping = LTIUserMapping.objects.filter(platform=platform, lti_user_id=lti_user_id).select_related('user').first()
if mapping:
user = mapping.user
update_fields = []
if email and user.email != email:
user.email = email
update_fields.append('email')
if given_name and user.first_name != given_name:
user.first_name = given_name
update_fields.append('first_name')
if family_name and user.last_name != family_name:
user.last_name = family_name
update_fields.append('last_name')
if name and user.name != name:
user.name = name
update_fields.append('name')
if update_fields:
user.save(update_fields=update_fields)
else:
username = generate_username_from_lti(lti_user_id, email, given_name, family_name)
if User.objects.filter(username=username).exists():
username = f"{username}_{hashlib.md5(lti_user_id.encode()).hexdigest()[:6]}"
user = User.objects.create_user(
username=username,
email=email or '',
first_name=given_name,
last_name=family_name,
name=name or username,
is_active=True,
)
if email:
try:
EmailAddress.objects.create(user=user, email=email, verified=True, primary=True)
except Exception:
pass
LTIUserMapping.objects.create(platform=platform, lti_user_id=lti_user_id, user=user)
return user
def generate_username_from_lti(lti_user_id, email, given_name, family_name):
"""Generate a username from LTI user info."""
if email and '@' in email:
username = email.split('@')[0]
username = ''.join(c if c.isalnum() or c in '_-' else '_' for c in username)
if len(username) >= 4:
return username[:30]
if given_name and family_name:
username = f"{given_name}.{family_name}".lower()
username = ''.join(c if c.isalnum() or c in '_-.' else '_' for c in username)
if len(username) >= 4:
return username[:30]
return f"lti_user_{hashlib.md5(lti_user_id.encode()).hexdigest()[:10]}"
def provision_lti_context(platform, claims, resource_link_id):
"""
Provision MediaCMS category and RBAC group for an LTI context (course).
Returns:
Tuple of (category, rbac_group, resource_link)
"""
context = claims.get('https://purl.imsglobal.org/spec/lti/claim/context', {})
context_id = context.get('id')
if not context_id:
raise ValueError("Missing context ID in LTI launch")
return _ensure_course_context(
platform=platform,
context_id=str(context_id),
title=context.get('title', ''),
label=context.get('label', ''),
resource_link_id=resource_link_id,
)
def provision_lti_bulk_contexts(platform, user, publish_data_raw):
"""
Bulk-provision categories, groups, and memberships for every course the
user is enrolled in, as reported by the LMS via custom_publishdata.
Called on My Media launches. Skips the Moodle site course (ID 1).
"""
try:
padding = 4 - len(publish_data_raw) % 4
if padding != 4:
publish_data_raw += '=' * padding
courses = json.loads(base64.b64decode(publish_data_raw).decode('utf-8'))
except Exception as exc:
logger.warning('provision_lti_bulk_contexts: failed to decode publishdata: %s', exc)
return
if not isinstance(courses, list):
logger.warning('provision_lti_bulk_contexts: publishdata is not a list')
return
seen_group_ids = set()
for course in courses:
try:
course_id = str(course.get('id', '')).strip()
if not course_id:
continue
fullname = course.get('fullname', '')
shortname = course.get('shortname', '')
group_role = DEFAULT_LTI_ROLE_MAPPINGS.get(course.get('role', 'student'), {}).get('group_role', 'member')
_, rbac_group, _ = _ensure_course_context(
platform=platform,
context_id=course_id,
title=fullname,
label=shortname,
resource_link_id=f'bulk_{course_id}',
)
if rbac_group:
_ensure_membership(user, rbac_group, group_role)
seen_group_ids.add(rbac_group.id)
except Exception as exc:
logger.warning(
'provision_lti_bulk_contexts: error processing course %s: %s',
course.get('id'),
exc,
)
# Remove memberships for groups that belong to this platform but were absent
# from the current publishdata — meaning the user is no longer enrolled.
if seen_group_ids:
platform_group_ids = set(LTIResourceLink.objects.filter(platform=platform, rbac_group__isnull=False).values_list('rbac_group_id', flat=True))
stale_ids = platform_group_ids - seen_group_ids
if stale_ids:
deleted, _ = RBACMembership.objects.filter(user=user, rbac_group_id__in=stale_ids).delete()
if deleted:
logger.info(
'provision_lti_bulk_contexts: removed %d stale membership(s) for user %s',
deleted,
user.username,
)
def apply_lti_roles(user, platform, lti_roles, rbac_group):
"""
Apply role mappings from LTI role URIs to MediaCMS global and group roles.
Returns:
Tuple of (global_role, group_role)
"""
short_roles = []
for role in lti_roles or []:
if '#' in role:
short_roles.append(role.split('#')[-1])
elif '/' in role:
short_roles.append(role.split('/')[-1])
else:
short_roles.append(role)
custom_mappings = {m.lti_role: {'global_role': m.global_role, 'group_role': m.group_role} for m in LTIRoleMapping.objects.filter(platform=platform)}
all_mappings = {**DEFAULT_LTI_ROLE_MAPPINGS, **custom_mappings}
global_role = 'user'
group_role = 'member'
for role in short_roles:
if role in all_mappings:
if all_mappings[role].get('global_role'):
global_role = get_higher_privilege_global(global_role, all_mappings[role]['global_role'])
if all_mappings[role].get('group_role'):
group_role = resolve_group_role(group_role, all_mappings[role]['group_role'])
user.set_role_from_mapping(global_role)
_ensure_membership(user, rbac_group, group_role)
return global_role, group_role
def get_higher_privilege_global(role1, role2):
"""Return the higher privilege global role."""
privilege_order = ['user', 'advancedUser', 'editor', 'manager', 'admin']
try:
return privilege_order[max(privilege_order.index(role1), privilege_order.index(role2))]
except ValueError:
return role2
def resolve_group_role(role1, role2):
"""Return whichever of role1/role2 has higher privilege."""
privilege_order = ['member', 'contributor', 'manager']
try:
return privilege_order[max(privilege_order.index(role1), privilege_order.index(role2))]
except ValueError:
return role2
def create_lti_session(request, user, launch_data, platform):
"""Create a MediaCMS session from an LTI launch."""
login(request, user, backend='django.contrib.auth.backends.ModelBackend')
ld = launch_data.get_launch_data()
context = ld.get('https://purl.imsglobal.org/spec/lti/claim/context', {})
resource_link = ld.get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {})
roles = ld.get('https://purl.imsglobal.org/spec/lti/claim/roles', [])
request.session['lti_session'] = {
'platform_id': platform.id,
'platform_name': platform.name,
'context_id': context.get('id'),
'context_title': context.get('title'),
'resource_link_id': resource_link.get('id'),
'roles': roles,
'launch_time': timezone.now().isoformat(),
}
request.session.set_expiry(getattr(settings, 'LTI_SESSION_TIMEOUT', 3600))
request.session.modified = True
request.session.save()
return True
def validate_lti_session(request):
"""
Validate that an LTI session exists and is valid.
Returns:
Dict of LTI session data or None
"""
if not request.user.is_authenticated:
return None
return request.session.get('lti_session')
+30
View File
@@ -0,0 +1,30 @@
"""
LTI Key Management for MediaCMS
Manages RSA keys for signing Deep Linking responses (stored in database)
"""
def load_public_key():
"""Load public key from database"""
from .models import LTIToolKeys
key_obj = LTIToolKeys.get_or_create_keys()
return key_obj.public_key_jwk
def get_jwks():
"""
Get JWKS (JSON Web Key Set) for public keys
Returns public keys in JWKS format for the /lti/jwks/ endpoint
"""
public_key = load_public_key()
return {'keys': [public_key]}
def ensure_keys_exist():
"""Ensure key pair exists in database, generate if not"""
from .models import LTIToolKeys
LTIToolKeys.get_or_create_keys()
+190
View File
@@ -0,0 +1,190 @@
# Generated by Django 5.2.6 on 2025-12-29 16:15
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('files', '0015_category_is_lms_course_category_lti_context_id'),
('rbac', '0003_alter_rbacgroup_members'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='LTIToolKeys',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('key_id', models.CharField(default='mediacms-lti-key', help_text='Key identifier', max_length=255, unique=True)),
('private_key_jwk', models.JSONField(help_text='Private key in JWK format (for signing)')),
('public_key_jwk', models.JSONField(help_text='Public key in JWK format (for JWKS endpoint)')),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'verbose_name': 'LTI Tool Keys',
'verbose_name_plural': 'LTI Tool Keys',
},
),
migrations.CreateModel(
name='LTIPlatform',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(help_text="Platform name (e.g., 'Moodle Production')", max_length=255, unique=True)),
('platform_id', models.URLField(help_text="Platform's issuer URL (iss claim, e.g., https://moodle.example.com)")),
('client_id', models.CharField(help_text='Client ID provided by the platform', max_length=255)),
('auth_login_url', models.URLField(help_text='OIDC authentication endpoint URL')),
('auth_token_url', models.URLField(help_text='OAuth2 token endpoint URL')),
('auth_audience', models.URLField(blank=True, help_text='OAuth2 audience (optional)', null=True)),
('key_set_url', models.URLField(help_text="Platform's public JWK Set URL")),
('deployment_ids', models.JSONField(default=list, help_text='List of deployment IDs for this platform')),
('enable_nrps', models.BooleanField(default=True, help_text='Enable Names and Role Provisioning Service')),
('enable_deep_linking', models.BooleanField(default=True, help_text='Enable Deep Linking 2.0')),
('remove_from_groups_on_unenroll', models.BooleanField(default=False, help_text="Remove users from RBAC groups when they're no longer in the course")),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'verbose_name': 'LTI Platform',
'verbose_name_plural': 'LTI Platforms',
'unique_together': {('platform_id', 'client_id')},
},
),
migrations.CreateModel(
name='LTIResourceLink',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('context_id', models.CharField(db_index=True, help_text='LTI context ID (typically course ID)', max_length=255)),
('context_title', models.CharField(blank=True, help_text='Course title', max_length=255)),
('context_label', models.CharField(blank=True, help_text='Course short name/code', max_length=100)),
('resource_link_id', models.CharField(db_index=True, help_text='LTI resource link ID', max_length=255)),
('resource_link_title', models.CharField(blank=True, help_text='Resource link title', max_length=255)),
(
'category',
models.ForeignKey(
blank=True, help_text='Mapped MediaCMS category', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='lti_resource_links', to='files.category'
),
),
('platform', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='resource_links', to='lti.ltiplatform')),
(
'rbac_group',
models.ForeignKey(
blank=True, help_text='RBAC group for course members', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='lti_resource_links', to='rbac.rbacgroup'
),
),
],
options={
'verbose_name': 'LTI Resource Link',
'verbose_name_plural': 'LTI Resource Links',
},
),
migrations.CreateModel(
name='LTILaunchLog',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('launch_type', models.CharField(choices=[('resource_link', 'Resource Link Launch'), ('deep_linking', 'Deep Linking')], default='resource_link', max_length=50)),
('success', models.BooleanField(db_index=True, default=True, help_text='Whether the launch was successful')),
('error_message', models.TextField(blank=True, help_text='Error message if launch failed')),
('claims', models.JSONField(help_text='Sanitized LTI claims from the launch')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
(
'user',
models.ForeignKey(
blank=True,
help_text='MediaCMS user (null if launch failed before user creation)',
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name='lti_launch_logs',
to=settings.AUTH_USER_MODEL,
),
),
('platform', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='launch_logs', to='lti.ltiplatform')),
('resource_link', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='launch_logs', to='lti.ltiresourcelink')),
],
options={
'verbose_name': 'LTI Launch Log',
'verbose_name_plural': 'LTI Launch Logs',
'ordering': ['-created_at'],
},
),
migrations.CreateModel(
name='LTIRoleMapping',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('lti_role', models.CharField(help_text="LTI role URI or short name (e.g., 'Instructor', 'Learner')", max_length=255)),
(
'global_role',
models.CharField(
blank=True,
choices=[('advancedUser', 'Advanced User'), ('editor', 'MediaCMS Editor'), ('manager', 'MediaCMS Manager'), ('admin', 'MediaCMS Administrator')],
help_text='MediaCMS global role to assign',
max_length=20,
),
),
(
'group_role',
models.CharField(blank=True, choices=[('member', 'Member'), ('contributor', 'Contributor'), ('manager', 'Manager')], help_text='RBAC group role to assign', max_length=20),
),
('platform', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='role_mappings', to='lti.ltiplatform')),
],
options={
'verbose_name': 'LTI Role Mapping',
'verbose_name_plural': 'LTI Role Mappings',
},
),
migrations.CreateModel(
name='LTIUserMapping',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('lti_user_id', models.CharField(db_index=True, help_text="LTI 'sub' claim (unique user identifier from platform)", max_length=255)),
('created_at', models.DateTimeField(auto_now_add=True)),
('last_login', models.DateTimeField(auto_now=True)),
('platform', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='user_mappings', to='lti.ltiplatform')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='lti_mappings', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'LTI User Mapping',
'verbose_name_plural': 'LTI User Mappings',
},
),
migrations.AddIndex(
model_name='ltiresourcelink',
index=models.Index(fields=['platform', 'context_id'], name='lti_ltireso_platfor_4a3f27_idx'),
),
migrations.AddIndex(
model_name='ltiresourcelink',
index=models.Index(fields=['context_id'], name='lti_ltireso_context_c6f9e2_idx'),
),
migrations.AlterUniqueTogether(
name='ltiresourcelink',
unique_together={('platform', 'context_id', 'resource_link_id')},
),
migrations.AddIndex(
model_name='ltilaunchlog',
index=models.Index(fields=['-created_at'], name='lti_ltilaun_created_94c574_idx'),
),
migrations.AddIndex(
model_name='ltilaunchlog',
index=models.Index(fields=['platform', 'user'], name='lti_ltilaun_platfor_5240bf_idx'),
),
migrations.AlterUniqueTogether(
name='ltirolemapping',
unique_together={('platform', 'lti_role')},
),
migrations.AddIndex(
model_name='ltiusermapping',
index=models.Index(fields=['platform', 'lti_user_id'], name='lti_ltiuser_platfor_9c70bb_idx'),
),
migrations.AddIndex(
model_name='ltiusermapping',
index=models.Index(fields=['user'], name='lti_ltiuser_user_id_b06d01_idx'),
),
migrations.AlterUniqueTogether(
name='ltiusermapping',
unique_together={('platform', 'lti_user_id')},
),
]
View File
Executable
+218
View File
@@ -0,0 +1,218 @@
import json
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from django.db import models
from jwcrypto import jwk
class LTIPlatform(models.Model):
"""LTI 1.3 Platform (Moodle instance) configuration"""
name = models.CharField(max_length=255, unique=True, help_text="Platform name (e.g., 'Moodle Production')")
platform_id = models.URLField(help_text="Platform's issuer URL (iss claim, e.g., https://moodle.example.com)")
client_id = models.CharField(max_length=255, help_text="Client ID provided by the platform")
auth_login_url = models.URLField(help_text="OIDC authentication endpoint URL")
auth_token_url = models.URLField(help_text="OAuth2 token endpoint URL")
auth_audience = models.URLField(blank=True, null=True, help_text="OAuth2 audience (optional)")
key_set_url = models.URLField(help_text="Platform's public JWK Set URL")
deployment_ids = models.JSONField(default=list, help_text="List of deployment IDs for this platform")
enable_nrps = models.BooleanField(default=True, help_text="Enable Names and Role Provisioning Service")
enable_deep_linking = models.BooleanField(default=True, help_text="Enable Deep Linking 2.0")
remove_from_groups_on_unenroll = models.BooleanField(default=False, help_text="Remove users from RBAC groups when they're no longer in the course")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
verbose_name = 'LTI Platform'
verbose_name_plural = 'LTI Platforms'
unique_together = [['platform_id', 'client_id']]
def __str__(self):
return f"{self.name} ({self.platform_id})"
def get_lti_config(self):
"""Return configuration dict for PyLTI1p3"""
return {
'platform_id': self.platform_id,
'client_id': self.client_id,
'auth_login_url': self.auth_login_url,
'auth_token_url': self.auth_token_url,
'auth_audience': self.auth_audience,
'key_set_url': self.key_set_url,
'deployment_ids': self.deployment_ids,
}
class LTIResourceLink(models.Model):
"""Specific LTI resource link (e.g., MediaCMS in a Moodle course)"""
platform = models.ForeignKey(LTIPlatform, on_delete=models.CASCADE, related_name='resource_links')
context_id = models.CharField(max_length=255, db_index=True, help_text="LTI context ID (typically course ID)")
context_title = models.CharField(max_length=255, blank=True, help_text="Course title")
context_label = models.CharField(max_length=100, blank=True, help_text="Course short name/code")
resource_link_id = models.CharField(max_length=255, db_index=True, help_text="LTI resource link ID")
resource_link_title = models.CharField(max_length=255, blank=True, help_text="Resource link title")
category = models.ForeignKey('files.Category', on_delete=models.SET_NULL, null=True, blank=True, related_name='lti_resource_links', help_text="Mapped MediaCMS category")
rbac_group = models.ForeignKey('rbac.RBACGroup', on_delete=models.SET_NULL, null=True, blank=True, related_name='lti_resource_links', help_text="RBAC group for course members")
class Meta:
verbose_name = 'LTI Resource Link'
verbose_name_plural = 'LTI Resource Links'
unique_together = [['platform', 'context_id', 'resource_link_id']]
indexes = [
models.Index(fields=['platform', 'context_id']),
models.Index(fields=['context_id']),
]
def __str__(self):
return f"{self.context_title or self.context_id} - {self.resource_link_title or self.resource_link_id}"
class LTIUserMapping(models.Model):
"""Maps LTI user identities (sub claim) to MediaCMS users"""
platform = models.ForeignKey(LTIPlatform, on_delete=models.CASCADE, related_name='user_mappings')
lti_user_id = models.CharField(max_length=255, db_index=True, help_text="LTI 'sub' claim (unique user identifier from platform)")
user = models.ForeignKey('users.User', on_delete=models.CASCADE, related_name='lti_mappings')
created_at = models.DateTimeField(auto_now_add=True)
last_login = models.DateTimeField(auto_now=True)
class Meta:
verbose_name = 'LTI User Mapping'
verbose_name_plural = 'LTI User Mappings'
unique_together = [['platform', 'lti_user_id']]
indexes = [
models.Index(fields=['platform', 'lti_user_id']),
models.Index(fields=['user']),
]
def __str__(self):
return f"{self.user.username} ({self.platform.name})"
class LTIRoleMapping(models.Model):
"""Maps LTI institutional roles to MediaCMS roles"""
GLOBAL_ROLE_CHOICES = [('advancedUser', 'Advanced User'), ('editor', 'MediaCMS Editor'), ('manager', 'MediaCMS Manager'), ('admin', 'MediaCMS Administrator')]
GROUP_ROLE_CHOICES = [('member', 'Member'), ('contributor', 'Contributor'), ('manager', 'Manager')]
platform = models.ForeignKey(LTIPlatform, on_delete=models.CASCADE, related_name='role_mappings')
lti_role = models.CharField(max_length=255, help_text="LTI role URI or short name (e.g., 'Instructor', 'Learner')")
global_role = models.CharField(max_length=20, blank=True, choices=GLOBAL_ROLE_CHOICES, help_text="MediaCMS global role to assign")
group_role = models.CharField(max_length=20, blank=True, choices=GROUP_ROLE_CHOICES, help_text="RBAC group role to assign")
class Meta:
verbose_name = 'LTI Role Mapping'
verbose_name_plural = 'LTI Role Mappings'
unique_together = [['platform', 'lti_role']]
def __str__(self):
return f"{self.lti_role}{self.global_role or 'none'}/{self.group_role or 'none'} ({self.platform.name})"
class LTILaunchLog(models.Model):
"""Audit log for LTI launches"""
LAUNCH_TYPE_CHOICES = [
('resource_link', 'Resource Link Launch'),
('deep_linking', 'Deep Linking'),
]
platform = models.ForeignKey(LTIPlatform, on_delete=models.CASCADE, related_name='launch_logs')
user = models.ForeignKey('users.User', on_delete=models.CASCADE, null=True, blank=True, related_name='lti_launch_logs', help_text="MediaCMS user (null if launch failed before user creation)")
resource_link = models.ForeignKey(LTIResourceLink, on_delete=models.SET_NULL, null=True, blank=True, related_name='launch_logs')
launch_type = models.CharField(max_length=50, choices=LAUNCH_TYPE_CHOICES, default='resource_link')
success = models.BooleanField(default=True, db_index=True, help_text="Whether the launch was successful")
error_message = models.TextField(blank=True, help_text="Error message if launch failed")
claims = models.JSONField(help_text="Sanitized LTI claims from the launch")
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
class Meta:
verbose_name = 'LTI Launch Log'
verbose_name_plural = 'LTI Launch Logs'
ordering = ['-created_at']
indexes = [
models.Index(fields=['-created_at']),
models.Index(fields=['platform', 'user']),
]
def __str__(self):
status = "" if self.success else ""
user_str = self.user.username if self.user else "Unknown"
return f"{status} {user_str} @ {self.platform.name} ({self.created_at.strftime('%Y-%m-%d %H:%M')})"
class LTIToolKeys(models.Model):
"""
Stores MediaCMS's RSA key pair for signing LTI responses (e.g., Deep Linking)
Only one instance should exist (singleton pattern)
"""
key_id = models.CharField(max_length=255, unique=True, default='mediacms-lti-key', help_text='Key identifier')
private_key_jwk = models.JSONField(help_text='Private key in JWK format (for signing)')
public_key_jwk = models.JSONField(help_text='Public key in JWK format (for JWKS endpoint)')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
verbose_name = 'LTI Tool Keys'
verbose_name_plural = 'LTI Tool Keys'
def __str__(self):
return f"LTI Keys ({self.key_id})"
@classmethod
def get_or_create_keys(cls):
"""Get or create the default key pair"""
key_obj, created = cls.objects.get_or_create(
key_id='mediacms-lti-key',
defaults={'private_key_jwk': {}, 'public_key_jwk': {}}, # Will be populated by save()
)
if created or not key_obj.private_key_jwk or not key_obj.public_key_jwk:
key_obj.generate_keys()
return key_obj
def generate_keys(self):
"""Generate new RSA key pair"""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
public_key = private_key.public_key()
private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption())
public_pem = public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
private_jwk = jwk.JWK.from_pem(private_pem)
public_jwk = jwk.JWK.from_pem(public_pem)
private_jwk_dict = json.loads(private_jwk.export())
private_jwk_dict['kid'] = self.key_id
private_jwk_dict['alg'] = 'RS256'
private_jwk_dict['use'] = 'sig'
public_jwk_dict = json.loads(public_jwk.export_public())
public_jwk_dict['kid'] = self.key_id
public_jwk_dict['alg'] = 'RS256'
public_jwk_dict['use'] = 'sig'
self.private_key_jwk = private_jwk_dict
self.public_key_jwk = public_jwk_dict
self.save()
return private_jwk_dict, public_jwk_dict
+19
View File
@@ -0,0 +1,19 @@
"""
LTI 1.3 URL Configuration for MediaCMS
"""
from django.urls import path
from . import deep_linking, views
app_name = 'lti'
urlpatterns = [
path('oidc/login/', views.OIDCLoginView.as_view(), name='oidc_login'),
path('launch/', views.LaunchView.as_view(), name='launch'),
path('jwks/', views.JWKSView.as_view(), name='jwks'),
path('public-key/', views.PublicKeyPEMView.as_view(), name='public_key_pem'),
path('select-media/', deep_linking.SelectMediaView.as_view(), name='select_media'),
path('my-media/', views.MyMediaLTIView.as_view(), name='my_media'),
path('embed/<str:friendly_token>/', views.EmbedMediaLTIView.as_view(), name='embed_media'),
]
+774
View File
@@ -0,0 +1,774 @@
"""
LTI 1.3 Views for MediaCMS
Implements the LTI 1.3 / LTI Advantage flow:
- OIDC Login Initiation
- LTI Launch (JWT validation and processing)
- JWKS endpoint (public keys)
- My Media view (iframe-compatible)
- Embed Media view (LTI-authenticated)
- Manual NRPS Sync
"""
import base64
import json
import logging
import traceback
import uuid
from urllib.parse import quote, urlencode
import jwt
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.views import View
from django.views.decorators.clickjacking import xframe_options_exempt
from django.views.decorators.csrf import csrf_exempt
from jwcrypto import jwk
from pylti1p3.exception import LtiException
from pylti1p3.message_launch import MessageLaunch
from pylti1p3.oidc_login import OIDCLogin
from files.models import Media, MediaPermission
from rbac.models import RBACMembership
from .adapters import DjangoRequest, DjangoSessionService, DjangoToolConfig
from .handlers import (
apply_lti_roles,
create_lti_session,
provision_lti_bulk_contexts,
provision_lti_context,
provision_lti_user,
validate_lti_session,
)
from .keys import get_jwks
from .models import LTILaunchLog, LTIPlatform, LTIResourceLink, LTIToolKeys
logger = logging.getLogger(__name__)
def get_client_ip(request):
"""Get client IP address from request"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
@method_decorator(csrf_exempt, name='dispatch')
class OIDCLoginView(View):
"""
OIDC Login Initiation - Step 1 of LTI 1.3 launch
Flow: Moodle → This endpoint → Redirect to Moodle auth endpoint
"""
def get(self, request):
return self.handle_oidc_login(request)
def post(self, request):
return self.handle_oidc_login(request)
def handle_oidc_login(self, request):
"""Handle OIDC login initiation"""
try:
target_link_uri = request.GET.get('target_link_uri') or request.POST.get('target_link_uri')
iss = request.GET.get('iss') or request.POST.get('iss')
client_id = request.GET.get('client_id') or request.POST.get('client_id')
login_hint = request.GET.get('login_hint') or request.POST.get('login_hint')
lti_message_hint = request.GET.get('lti_message_hint') or request.POST.get('lti_message_hint')
cmid = request.GET.get('cmid') or request.POST.get('cmid')
media_token = request.GET.get('media_token') or request.POST.get('media_token')
# Extract embed parameters from request
embed_show_title = request.GET.get('embed_show_title') or request.POST.get('embed_show_title')
embed_show_related = request.GET.get('embed_show_related') or request.POST.get('embed_show_related')
embed_show_user_avatar = request.GET.get('embed_show_user_avatar') or request.POST.get('embed_show_user_avatar')
embed_link_title = request.GET.get('embed_link_title') or request.POST.get('embed_link_title')
embed_start_time = request.GET.get('embed_start_time') or request.POST.get('embed_start_time')
embed_width = request.GET.get('embed_width') or request.POST.get('embed_width')
embed_height = request.GET.get('embed_height') or request.POST.get('embed_height')
show_media_page = request.GET.get('show_media_page') or request.POST.get('show_media_page')
if not all([target_link_uri, iss, client_id]):
return JsonResponse({'error': 'Missing required OIDC parameters'}, status=400)
try:
platform = LTIPlatform.objects.get(platform_id=iss, client_id=client_id)
except LTIPlatform.DoesNotExist:
return JsonResponse({'error': 'Platform not found'}, status=404)
tool_config = DjangoToolConfig.from_platform(platform)
lti_request = DjangoRequest(request)
session_service = DjangoSessionService(request)
cookie_service = DjangoSessionService(request) # Using same service for cookies
oidc_login = OIDCLogin(lti_request, tool_config, session_service=session_service, cookie_service=cookie_service)
try:
oidc_with_cookies = oidc_login.enable_check_cookies()
redirect_url = oidc_with_cookies.redirect(target_link_uri)
if not redirect_url:
# Generate base state UUID
state_uuid = str(uuid.uuid4())
nonce = str(uuid.uuid4())
# Encode lti_message_hint IN the state parameter for retry reliability
# This survives session/cookie issues since it's passed through URLs
state_data = {'uuid': state_uuid}
if lti_message_hint:
state_data['hint'] = lti_message_hint
if media_token:
state_data['media_token'] = media_token
# Add embed parameters to state
if embed_show_title:
state_data['embed_show_title'] = embed_show_title
if embed_show_related:
state_data['embed_show_related'] = embed_show_related
if embed_show_user_avatar:
state_data['embed_show_user_avatar'] = embed_show_user_avatar
if embed_link_title:
state_data['embed_link_title'] = embed_link_title
if embed_start_time:
state_data['embed_start_time'] = embed_start_time
if embed_width:
state_data['embed_width'] = embed_width
if embed_height:
state_data['embed_height'] = embed_height
if show_media_page:
state_data['show_media_page'] = show_media_page
# Encode as base64 URL-safe string
state = base64.urlsafe_b64encode(json.dumps(state_data).encode()).decode().rstrip('=')
launch_data = {'target_link_uri': target_link_uri, 'nonce': nonce}
# Store cmid if provided (including 0 for filter-based launches)
if cmid is not None:
launch_data['cmid'] = cmid
# Store lti_message_hint for retry mechanism
if lti_message_hint:
launch_data['lti_message_hint'] = lti_message_hint
# CRITICAL: Store using the FULL encoded state, not just the UUID
# PyLTI1p3 looks for the full state value during validation
session_service.save_launch_data(f'state-{state}', launch_data)
# Also store lti_message_hint in regular session for retry mechanism
# (state-specific storage might be lost due to cookie issues)
if lti_message_hint:
request.session['lti_last_message_hint'] = lti_message_hint
request.session.modified = True
params = {
'response_type': 'id_token',
'redirect_uri': target_link_uri,
'state': state,
'client_id': client_id,
'login_hint': login_hint,
'scope': 'openid',
'response_mode': 'form_post',
'prompt': 'none',
'nonce': nonce,
}
if lti_message_hint:
params['lti_message_hint'] = lti_message_hint
redirect_url = f"{platform.auth_login_url}?{urlencode(params)}"
return HttpResponseRedirect(redirect_url)
except Exception:
raise
except LtiException as e:
traceback.print_exc()
return render(request, 'lti/launch_error.html', {'error': 'OIDC Login Failed', 'message': str(e)}, status=400)
except Exception as e: # noqa
traceback.print_exc()
return JsonResponse({'error': 'Internal server error during OIDC login'}, status=500)
@method_decorator(csrf_exempt, name='dispatch')
@method_decorator(xframe_options_exempt, name='dispatch')
class LaunchView(View):
"""
LTI Launch Handler - Step 3 of LTI 1.3 launch
Flow: Moodle → This endpoint (with JWT) → Validate → Provision → Session → Redirect
"""
@staticmethod
def extract_embed_params_from_dict(params_dict):
"""Extract embed parameters from a dictionary and return as query string list."""
embed_params = []
param_mapping = {
'embed_show_title': 'showTitle',
'embed_show_related': 'showRelated',
'embed_show_user_avatar': 'showUserAvatar',
'embed_link_title': 'linkTitle',
'embed_start_time': 't',
'embed_width': 'width',
'embed_height': 'height',
'showTitle': 'showTitle',
'showRelated': 'showRelated',
'showUserAvatar': 'showUserAvatar',
'linkTitle': 'linkTitle',
't': 't',
'width': 'width',
'height': 'height',
'show_media_page': 'show_media_page',
'embed_share_media': 'share_media',
'parent_media_base': 'parent_media_base',
}
url_encode_keys = {'parent_media_base'}
for key, param_name in param_mapping.items():
value = params_dict.get(key)
if value:
encoded_value = quote(str(value), safe='') if key in url_encode_keys else value
param_str = f"{param_name}={encoded_value}"
if param_str not in embed_params:
embed_params.append(param_str)
return embed_params
@staticmethod
def build_url_with_embed_params(base_url, embed_params):
"""Build URL with embed parameters."""
# Check if base_url already has query parameters
separator = '&' if '?' in base_url else '?'
query_parts = ['mode=lms_embed_mode']
query_parts.extend(embed_params)
return f"{base_url}{separator}{'&'.join(query_parts)}"
def post(self, request):
"""Handle LTI launch with JWT validation"""
platform = None
user = None
error_message = ''
claims = {}
# Extract media_token and embed parameters from state parameter if present (for filter launches)
media_token_from_state = None
embed_params_from_state = {}
state = request.POST.get('state')
if state:
try:
# Add padding if needed for base64 decode
padding = 4 - (len(state) % 4)
if padding and padding != 4:
state_padded = state + ('=' * padding)
else:
state_padded = state
state_decoded = base64.urlsafe_b64decode(state_padded.encode()).decode()
state_data = json.loads(state_decoded)
media_token_from_state = state_data.get('media_token')
# Extract embed parameters from state
for key in [
'embed_show_title',
'embed_show_related',
'embed_show_user_avatar',
'embed_link_title',
'embed_start_time',
'embed_width',
'embed_height',
'show_media_page',
'embed_share_media',
]:
if key in state_data:
embed_params_from_state[key] = state_data[key]
except Exception:
pass
try:
id_token = request.POST.get('id_token')
if not id_token:
raise ValueError("Missing id_token in launch request")
unverified = jwt.decode(id_token, options={"verify_signature": False})
iss = unverified.get('iss')
aud = unverified.get('aud')
try:
platform = LTIPlatform.objects.get(platform_id=iss, client_id=aud)
except LTIPlatform.DoesNotExist:
raise
tool_config = DjangoToolConfig.from_platform(platform)
lti_request = DjangoRequest(request)
session_service = DjangoSessionService(request)
cookie_service = DjangoSessionService(request)
class CustomMessageLaunch(MessageLaunch):
def _get_request_param(self, key):
"""Override to properly get request parameters"""
return self._request.get_param(key)
message_launch = CustomMessageLaunch(lti_request, tool_config, session_service=session_service, cookie_service=cookie_service)
launch_data = message_launch.get_launch_data()
claims = self.sanitize_claims(launch_data)
# Extract custom claims and inject media_token and embed params from state if present
try:
custom_claims = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/custom', {})
# Inject media_token from state if present (for filter launches)
if media_token_from_state and not custom_claims.get('media_friendly_token'):
custom_claims['media_friendly_token'] = media_token_from_state
# Inject embed parameters from state if present
for key, value in embed_params_from_state.items():
if key not in custom_claims:
custom_claims[key] = value
# Update launch_data with the modified custom claims
launch_data['https://purl.imsglobal.org/spec/lti/claim/custom'] = custom_claims
except Exception:
custom_claims = {}
resource_link = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {})
resource_link_id = resource_link.get('id', 'default')
roles = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/roles', [])
# IMPORTANT: Provision user and create session BEFORE handling deep linking
# This ensures filter launches (which are deep linking) have authenticated user
user = provision_lti_user(platform, launch_data)
context_claim = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/context', {})
# Detect My Media launches: publishdata is only sent on My Media launches.
publish_data_raw = custom_claims.get('publishdata') or custom_claims.get('custom_publishdata')
if publish_data_raw:
# My Media launch: provision all enrolled courses from publishdata.
# Skip individual context provisioning to avoid double-provisioning.
resource_link_obj = None
provision_lti_bulk_contexts(platform, user, publish_data_raw)
elif context_claim:
# Normal course launch: provision only this context.
category, rbac_group, resource_link_obj = provision_lti_context(platform, launch_data, resource_link_id)
apply_lti_roles(user, platform, roles, rbac_group)
else:
resource_link_obj = None
create_lti_session(request, user, message_launch, platform)
message_type = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/message_type')
if message_type == 'LtiDeepLinkingRequest':
return self.handle_deep_linking_launch(request, message_launch, platform, launch_data)
# Clear retry counter on successful launch
if 'lti_retry_count' in request.session:
del request.session['lti_retry_count']
LTILaunchLog.objects.create(platform=platform, user=user, resource_link=resource_link_obj, launch_type='resource_link', success=True, claims=claims)
redirect_url = self.determine_redirect(launch_data, resource_link_obj)
if redirect_url is None:
return HttpResponse('This media no longer exists', status=404, content_type='text/plain; charset=utf-8')
# Use HTML meta refresh instead of HTTP redirect to ensure session cookie is sent
# In cross-site/iframe contexts, HTTP 302 redirects may not preserve session cookies
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="refresh" content="0;url={redirect_url}">
<title>Loading...</title>
<style>
body {{
font-family: sans-serif;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
margin: 0;
background: #f5f5f5;
}}
.loader {{
text-align: center;
}}
</style>
</head>
<body>
<div class="loader">
<p>Loading MediaCMS...</p>
<p><small>If you are not redirected, <a href="{redirect_url}">click here</a></small></p>
</div>
</body>
</html>
"""
response = HttpResponse(html_content, content_type='text/html')
# Ensure session cookie is set in this response
request.session.modified = True
return response
except LtiException as e: # noqa
error_message = str(e)
traceback.print_exc()
# Attempt automatic retry for state errors (handles concurrent launches and session issues)
if "State not found" in error_message or "state not found" in error_message.lower():
return self.handle_state_not_found(request, platform)
except Exception as e: # noqa
traceback.print_exc()
if platform:
LTILaunchLog.objects.create(platform=platform, user=user, launch_type='resource_link', success=False, error_message=error_message, claims=claims)
return render(request, 'lti/launch_error.html', {'error': 'LTI Launch Failed', 'message': error_message}, status=400)
def sanitize_claims(self, claims):
"""Remove sensitive data from claims before logging"""
safe_claims = claims.copy()
return safe_claims
def determine_redirect(self, launch_data, resource_link):
"""Determine where to redirect after successful launch"""
custom = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/custom', {})
custom_path = custom.get('redirect_path')
if custom_path:
if not custom_path.startswith('/'):
custom_path = '/' + custom_path
return custom_path
# Check custom claims for media token (from both deep linking and filter launches)
media_id = custom.get('media_id') or custom.get('media_friendly_token')
if media_id:
try:
media = Media.objects.get(friendly_token=media_id)
embed_params = self.extract_embed_params_from_dict(custom)
base_url = reverse('lti:embed_media', args=[media.friendly_token])
return self.build_url_with_embed_params(base_url, embed_params)
except Media.DoesNotExist:
return None
my_media_url = reverse('lti:my_media') + '?mode=lms_embed_mode'
if custom.get('embed_share_media') == '0':
my_media_url += '&share_media=0'
return my_media_url
def handle_state_not_found(self, request, platform=None):
"""
Handle state not found errors by attempting to restart the OIDC flow.
This can happen when:
- Cookies are blocked/deleted
- Session expired
- Browser privacy settings interfere
"""
try:
# Check retry count to prevent infinite loops
retry_count = request.session.get('lti_retry_count', 0)
MAX_RETRIES = 5 # Increased for concurrent launches (e.g., multiple videos on same page)
if retry_count >= MAX_RETRIES:
return render(
request,
'lti/launch_error.html',
{
'error': 'Authentication Failed',
'message': (
'Unable to establish a secure session after multiple attempts. '
'This may be due to browser cookie settings or privacy features. Please try:\n\n'
'1. Enabling cookies for this site\n'
'2. Disabling tracking protection for this site\n'
'3. Using a different browser\n'
'4. Contacting your administrator if the issue persists'
),
'is_cookie_error': True,
},
status=400,
)
# Extract launch parameters from the POST request
id_token = request.POST.get('id_token')
state = request.POST.get('state')
if not id_token:
raise ValueError("No id_token available for retry")
# Decode state to extract media_token (encoded during OIDC login)
media_token_from_retry = None
try:
# Add padding if needed for base64 decode
padding = 4 - (len(state) % 4)
if padding and padding != 4:
state_padded = state + ('=' * padding)
else:
state_padded = state
state_decoded = base64.urlsafe_b64decode(state_padded.encode()).decode()
state_data = json.loads(state_decoded)
media_token_from_retry = state_data.get('media_token')
except Exception:
# State might be a plain UUID from older code, that's OK
pass
# Decode JWT to extract issuer and target info (no verification needed for this)
unverified = jwt.decode(id_token, options={"verify_signature": False})
iss = unverified.get('iss')
aud = unverified.get('aud') # This is the client_id
target_link_uri = unverified.get('https://purl.imsglobal.org/spec/lti/claim/target_link_uri')
# Get login_hint and lti_message_hint if available
login_hint = request.POST.get('login_hint') or unverified.get('sub')
if not all([iss, aud, target_link_uri]):
raise ValueError("Missing required parameters for OIDC retry")
# Try to identify platform
if not platform:
try:
platform = LTIPlatform.objects.get(platform_id=iss, client_id=aud)
except LTIPlatform.DoesNotExist:
raise ValueError(f"Platform not found: {iss}/{aud}")
# Increment retry counter
request.session['lti_retry_count'] = retry_count + 1
request.session.modified = True
# Build OIDC login URL with all parameters
oidc_login_url = request.build_absolute_uri(reverse('lti:oidc_login'))
params = {
'iss': iss,
'client_id': aud,
'target_link_uri': target_link_uri,
'login_hint': login_hint,
}
# DON'T pass lti_message_hint in retry - it's single-use and causes Moodle 404
# The launchid in lti_message_hint is only valid for one authentication flow
# Moodle will handle the retry without the hint
# Pass media_token in retry for filter launches (our custom parameter, not Moodle's)
if media_token_from_retry:
params['media_token'] = media_token_from_retry
# Add retry indicator
params['retry'] = retry_count + 1
redirect_url = f"{oidc_login_url}?{urlencode(params)}"
return HttpResponseRedirect(redirect_url)
except Exception as retry_error:
traceback.print_exc()
return render(
request,
'lti/launch_error.html',
{
'error': 'LTI Launch Failed',
'message': f'State validation failed and automatic retry was unsuccessful: {str(retry_error)}',
},
status=400,
)
def handle_deep_linking_launch(self, request, message_launch, platform, launch_data):
"""Handle deep linking request"""
# Clear retry counter on successful launch
if 'lti_retry_count' in request.session:
del request.session['lti_retry_count']
deep_linking_settings = launch_data.get('https://purl.imsglobal.org/spec/lti-dl/claim/deep_linking_settings', {})
if not deep_linking_settings:
raise ValueError("Missing deep linking settings in launch data")
deep_link_return_url = deep_linking_settings.get('deep_link_return_url')
if not deep_link_return_url:
raise ValueError("Missing deep_link_return_url in deep linking settings")
request.session['lti_deep_link'] = {
'deep_link_return_url': deep_link_return_url,
'deployment_id': launch_data.get('https://purl.imsglobal.org/spec/lti/claim/deployment_id'),
'platform_id': platform.id,
'message_launch_data': launch_data, # Store full launch data for JWT creation
}
# Check if we have a media_friendly_token from filter launches
custom_claims = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/custom', {})
media_token = custom_claims.get('media_friendly_token')
if media_token:
embed_params = self.extract_embed_params_from_dict(custom_claims)
base_url = reverse('lti:embed_media', args=[media_token])
redirect_url = self.build_url_with_embed_params(base_url, embed_params)
else:
redirect_url = reverse('lti:select_media') + '?mode=lms_embed_mode'
# Use HTML meta refresh to ensure session cookie is preserved in cross-site contexts
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="refresh" content="0;url={redirect_url}">
<title>Loading...</title>
</head>
<body>
<p>Loading...</p>
</body>
</html>
"""
request.session.modified = True
return HttpResponse(html_content, content_type='text/html')
class JWKSView(View):
"""
JWKS Endpoint - Provides tool's public keys
Used by Moodle to validate signatures from MediaCMS (e.g., Deep Linking responses)
"""
def get(self, request):
"""Return tool's public JWK Set"""
jwks = get_jwks()
return JsonResponse(jwks, content_type='application/json')
class PublicKeyPEMView(View):
"""
Display public key in PEM format for easy copy/paste into Moodle
"""
def get(self, request):
"""Return public key in PEM format"""
key_obj = LTIToolKeys.get_or_create_keys()
jwk_obj = jwk.JWK(**key_obj.public_key_jwk)
pem_bytes = jwk_obj.export_to_pem()
pem_string = pem_bytes.decode('utf-8')
return HttpResponse(
f"MediaCMS LTI Public Key (PEM Format)\n"
f"{'=' * 80}\n\n"
f"{pem_string}\n"
f"{'=' * 80}\n\n"
f"Instructions:\n"
f"1. Copy the entire key above (including BEGIN/END lines)\n"
f"2. In Moodle LTI tool configuration, change 'Public key type' to 'Public key'\n"
f"3. Paste the key into the 'Public key' field\n"
f"4. Save and try Deep Linking again\n",
content_type='text/plain',
)
@method_decorator(xframe_options_exempt, name='dispatch')
class MyMediaLTIView(View):
"""
My Media page for LTI-authenticated users
Shows user's media profile in iframe
"""
def get(self, request):
"""Display my media page"""
lti_session = validate_lti_session(request)
if not lti_session:
return JsonResponse({'error': 'Not authenticated via LTI'}, status=403)
profile_url = f"/user/{request.user.username}?mode=lms_embed_mode"
share_media = request.GET.get('share_media')
if share_media == '0':
profile_url += '&share_media=0'
return HttpResponseRedirect(profile_url)
@method_decorator(xframe_options_exempt, name='dispatch')
class EmbedMediaLTIView(View):
"""
Embed media with LTI authentication
Pattern: Extends existing /embed functionality
"""
def get(self, request, friendly_token):
"""Display embedded media"""
media = get_object_or_404(Media, friendly_token=friendly_token)
lti_session = validate_lti_session(request)
if media.state in ["public", "unlisted"]:
can_view = True
else:
can_view = False
if lti_session and request.user.is_authenticated:
context_id = lti_session.get('context_id')
platform_id = lti_session.get('platform_id')
# MediaPermission has to be added so that the user is able to visit the media
if media.is_shared and context_id and platform_id:
try:
resource_link = (
LTIResourceLink.objects.filter(
platform_id=platform_id,
context_id=context_id,
)
.select_related('rbac_group')
.first()
)
if resource_link and resource_link.rbac_group:
has_course_access = RBACMembership.objects.filter(
user=request.user,
rbac_group=resource_link.rbac_group,
).exists()
if has_course_access:
# create an entry so it shows up under shared with me
MediaPermission.objects.get_or_create(
user=request.user,
media=media,
defaults={
'owner_user': media.user,
'permission': 'viewer',
'source': MediaPermission.SOURCE_LTI_EMBED,
},
)
can_view = True
except Exception:
logger.exception('EmbedMediaLTIView: error checking course access for user=%s media=%s', request.user, friendly_token)
if not can_view and request.user.has_member_access_to_media(media):
can_view = True
if not can_view:
return HttpResponse('You do not have permission to view this media', status=403, content_type='text/plain; charset=utf-8')
# Build embed URL with parameters from the request
embed_params = LaunchView.extract_embed_params_from_dict(request.GET)
show_media_page = request.GET.get('show_media_page')
if show_media_page == 'true':
base_path = f"/view?m={friendly_token}"
else:
base_path = f"/embed?m={friendly_token}"
embed_url = LaunchView.build_url_with_embed_params(base_path, embed_params)
return HttpResponseRedirect(embed_url)