From 8e5e7991b7a4388c097fd45fd5e2d9d46c5a381e Mon Sep 17 00:00:00 2001 From: Markos Gogoulos Date: Tue, 30 Dec 2025 14:49:40 +0200 Subject: [PATCH] a --- lti/adapters.py | 13 ------ lti/apps.py | 2 - lti/handlers.py | 76 ++++++++++++------------------- lti/models.py | 19 -------- lti/serializers.py | 42 ----------------- lti/services.py | 41 +---------------- lti/views.py | 110 +++------------------------------------------ 7 files changed, 34 insertions(+), 269 deletions(-) delete mode 100644 lti/serializers.py diff --git a/lti/adapters.py b/lti/adapters.py index c66cc976..2e72a36c 100644 --- a/lti/adapters.py +++ b/lti/adapters.py @@ -31,7 +31,6 @@ class DjangoRequest(Request): def get_param(self, key): """Get parameter from GET or POST""" - # Check both POST and GET, POST takes priority value = self._request.POST.get(key) or self._request.GET.get(key) return value @@ -81,7 +80,6 @@ class DjangoMessageLaunch: def validate(self): """Validate the LTI launch message""" - # Create custom MessageLaunch that properly implements _get_request_param class CustomMessageLaunch(MessageLaunch): def _get_request_param(self, key): """Override to properly get request parameters""" @@ -210,7 +208,6 @@ class DjangoToolConfig(ToolConfAbstract): def check_iss_has_many_clients(self, iss): """Check if issuer has multiple clients""" - # For now, we support one client per issuer return False def find_registration_by_issuer(self, iss, *args, **kwargs): @@ -219,7 +216,6 @@ class DjangoToolConfig(ToolConfAbstract): return None config = self._config[iss] - # Create Registration object from config dict registration = Registration() registration.set_issuer(iss) registration.set_client_id(config.get('client_id')) @@ -229,12 +225,10 @@ class DjangoToolConfig(ToolConfAbstract): registration.set_auth_audience(config.get('auth_audience')) registration.set_key_set_url(config.get('key_set_url')) - # Set tool's private key for signing (e.g., Deep Linking responses) key_obj = LTIToolKeys.get_or_create_keys() jwk_obj = jwk.JWK(**key_obj.private_key_jwk) pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None) - # Set both the key and kid directly on Registration internal attributes registration._tool_private_key = pem_bytes.decode('utf-8') registration._tool_private_key_kid = key_obj.private_key_jwk['kid'] @@ -249,7 +243,6 @@ class DjangoToolConfig(ToolConfAbstract): if config.get('client_id') != client_id: return None - # Create Registration object from config dict registration = Registration() registration.set_issuer(iss) registration.set_client_id(config.get('client_id')) @@ -259,12 +252,10 @@ class DjangoToolConfig(ToolConfAbstract): registration.set_auth_audience(config.get('auth_audience')) registration.set_key_set_url(config.get('key_set_url')) - # Set tool's private key for signing (e.g., Deep Linking responses) key_obj = LTIToolKeys.get_or_create_keys() jwk_obj = jwk.JWK(**key_obj.private_key_jwk) pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None) - # Set both the key and kid directly on Registration internal attributes registration._tool_private_key = pem_bytes.decode('utf-8') registration._tool_private_key_kid = key_obj.private_key_jwk['kid'] @@ -299,7 +290,6 @@ class DjangoToolConfig(ToolConfAbstract): def get_jwks(self, iss, client_id=None): """Get JWKS from configuration - returns None to fetch from URL""" - # No caching - PyLTI1p3 will fetch from key_set_url return None def get_iss(self): @@ -313,14 +303,11 @@ class DjangoToolConfig(ToolConfAbstract): PyLTI1p3 calls this to get the tool's private key for signing Returns a cryptography RSA key object that PyJWT can use directly """ - # Load JWK and convert to PEM bytes key_obj = LTIToolKeys.get_or_create_keys() jwk_obj = jwk.JWK(**key_obj.private_key_jwk) - # Export to PEM bytes pem_bytes = jwk_obj.export_to_pem(private_key=True, password=None) - # Load as cryptography key object (PyJWT accepts this) private_key = serialization.load_pem_private_key(pem_bytes, password=None, backend=default_backend()) return private_key diff --git a/lti/apps.py b/lti/apps.py index 03735307..c61acaa0 100644 --- a/lti/apps.py +++ b/lti/apps.py @@ -10,9 +10,7 @@ class LtiConfig(AppConfig): def ready(self): """Initialize LTI app - ensure keys exist""" - # Ensure RSA key pair exists for signing Deep Linking responses try: ensure_keys_exist() except Exception: - # Don't block startup if key generation fails pass diff --git a/lti/handlers.py b/lti/handlers.py index cd8a82ec..6c6df712 100644 --- a/lti/handlers.py +++ b/lti/handlers.py @@ -21,7 +21,6 @@ from users.models import User from .models import LTIResourceLink, LTIRoleMapping, LTIUserMapping -# Default LTI role mappings DEFAULT_LTI_ROLE_MAPPINGS = { 'Instructor': {'global_role': '', 'group_role': 'manager'}, 'TeachingAssistant': {'global_role': '', 'group_role': 'contributor'}, @@ -54,20 +53,16 @@ def provision_lti_user(platform, claims): family_name = claims.get('family_name', '') name = claims.get('name', f"{given_name} {family_name}").strip() - # Check for existing mapping mapping = LTIUserMapping.objects.filter(platform=platform, lti_user_id=lti_user_id).select_related('user').first() if mapping: - # Update existing user user = mapping.user update_fields = [] - # Update email if changed and not empty if email and user.email != email: user.email = email update_fields.append('email') - # Update name fields if changed if given_name and user.first_name != given_name: user.first_name = given_name update_fields.append('first_name') @@ -84,24 +79,19 @@ def provision_lti_user(platform, claims): user.save(update_fields=update_fields) else: - # Create new user username = generate_username_from_lti(lti_user_id, email, given_name, family_name) - # Check if username already exists if User.objects.filter(username=username).exists(): - # Add random suffix username = f"{username}_{hashlib.md5(lti_user_id.encode()).hexdigest()[:6]}" user = User.objects.create_user(username=username, email=email or '', first_name=given_name, last_name=family_name, name=name or username, is_active=True) - # Mark email as verified via allauth if email: try: EmailAddress.objects.create(user=user, email=email, verified=True, primary=True) except Exception: pass - # Create mapping LTIUserMapping.objects.create(platform=platform, lti_user_id=lti_user_id, user=user) return user @@ -110,22 +100,18 @@ def provision_lti_user(platform, claims): def generate_username_from_lti(lti_user_id, email, given_name, family_name): """Generate a username from LTI user info""" - # Try email username if email and '@' in email: username = email.split('@')[0] - # Clean up username - only alphanumeric, underscore, hyphen username = ''.join(c if c.isalnum() or c in '_-' else '_' for c in username) if len(username) >= 4: return username[:30] # Max 30 chars - # Try first.last if given_name and family_name: username = f"{given_name}.{family_name}".lower() username = ''.join(c if c.isalnum() or c in '_-.' else '_' for c in username) if len(username) >= 4: return username[:30] - # Use hashed LTI user ID as fallback user_hash = hashlib.md5(lti_user_id.encode()).hexdigest()[:10] return f"lti_user_{user_hash}" @@ -152,26 +138,39 @@ def provision_lti_context(platform, claims, resource_link_id): context_title = context.get('title', '') context_label = context.get('label', '') - # Try to get existing resource link first to reuse category/group - try: - resource_link = LTIResourceLink.objects.get( - platform=platform, - context_id=context_id, - resource_link_id=resource_link_id, - ) - category = resource_link.category - rbac_group = resource_link.rbac_group + existing_link = LTIResourceLink.objects.filter( + platform=platform, + context_id=context_id, + ).first() - # Update context title if changed - if context_title and resource_link.context_title != context_title: - resource_link.context_title = context_title - resource_link.save(update_fields=['context_title']) + if existing_link: + category = existing_link.category + rbac_group = existing_link.rbac_group + + if context_title and existing_link.context_title != context_title: + existing_link.context_title = context_title + existing_link.save(update_fields=['context_title']) if category and category.title != context_title: category.title = context_title category.save(update_fields=['title']) - except LTIResourceLink.DoesNotExist: - # Create new category and RBAC group with auto-generated UIDs + resource_link, created = LTIResourceLink.objects.get_or_create( + platform=platform, + context_id=context_id, + resource_link_id=resource_link_id, + defaults={ + 'context_title': context_title, + 'context_label': context_label, + 'category': category, + 'rbac_group': rbac_group, + }, + ) + + if not created and context_title and resource_link.context_title != context_title: + resource_link.context_title = context_title + resource_link.save(update_fields=['context_title']) + + else: category = Category.objects.create( title=context_title or context_label or f"Course {context_id}", description=f"Auto-created from {platform.name}: {context_title}", @@ -187,10 +186,8 @@ def provision_lti_context(platform, claims, resource_link_id): description=f"LTI course group from {platform.name}", ) - # Link category to RBAC group rbac_group.categories.add(category) - # Create resource link resource_link = LTIResourceLink.objects.create( platform=platform, context_id=context_id, @@ -219,8 +216,6 @@ def apply_lti_roles(user, platform, lti_roles, rbac_group): if not lti_roles: lti_roles = [] - # Extract short role names from URIs - # e.g., "http://purl.imsglobal.org/vocab/lis/v2/membership#Instructor" -> "Instructor" short_roles = [] for role in lti_roles: if '#' in role: @@ -230,7 +225,6 @@ def apply_lti_roles(user, platform, lti_roles, rbac_group): else: short_roles.append(role) - # Get custom role mappings from database custom_mappings = {} for mapping in LTIRoleMapping.objects.filter(platform=platform): custom_mappings[mapping.lti_role] = { @@ -238,10 +232,8 @@ def apply_lti_roles(user, platform, lti_roles, rbac_group): 'group_role': mapping.group_role, } - # Combine default and custom mappings (custom takes precedence) all_mappings = {**DEFAULT_LTI_ROLE_MAPPINGS, **custom_mappings} - # Determine highest privilege global role global_role = 'user' for role in short_roles: if role in all_mappings: @@ -251,7 +243,6 @@ def apply_lti_roles(user, platform, lti_roles, rbac_group): user.set_role_from_mapping(global_role) - # Determine group role group_role = 'member' for role in short_roles: if role in all_mappings: @@ -259,26 +250,20 @@ def apply_lti_roles(user, platform, lti_roles, rbac_group): if role_group: group_role = get_higher_privilege_group(group_role, role_group) - # Create or update RBAC membership (defensive: handle multiple memberships) memberships = RBACMembership.objects.filter(user=user, rbac_group=rbac_group) if memberships.exists(): - # Check if any membership already has the correct role if not memberships.filter(role=group_role).exists(): - # None have the correct role, update the first one first_membership = memberships.first() first_membership.role = group_role try: first_membership.save() except Exception: - # Save failed (e.g., uniqueness constraint), skip pass else: - # No existing membership, create new one try: RBACMembership.objects.create(user=user, rbac_group=rbac_group, role=group_role) except Exception: - # Creation failed (e.g., uniqueness constraint), skip pass return global_role, group_role @@ -318,15 +303,12 @@ def create_lti_session(request, user, launch_data, platform): Pattern: Uses Django's session framework """ - # Django login (creates session in Redis) login(request, user, backend='django.contrib.auth.backends.ModelBackend') - # Extract key context info context = launch_data.get_launch_data().get('https://purl.imsglobal.org/spec/lti/claim/context', {}) resource_link = launch_data.get_launch_data().get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {}) roles = launch_data.get_launch_data().get('https://purl.imsglobal.org/spec/lti/claim/roles', []) - # Store LTI context in session request.session['lti_session'] = { 'platform_id': platform.id, 'platform_name': platform.name, @@ -337,7 +319,6 @@ def create_lti_session(request, user, launch_data, platform): 'launch_time': timezone.now().isoformat(), } - # Session timeout from settings or default 1 hour timeout = getattr(settings, 'LTI_SESSION_TIMEOUT', 3600) request.session.set_expiry(timeout) @@ -356,7 +337,6 @@ def validate_lti_session(request): if not lti_session: return None - # Check if session has expired (Django handles this, but double-check) if not request.user.is_authenticated: return None diff --git a/lti/models.py b/lti/models.py index c7fd854c..a5f23ca2 100755 --- a/lti/models.py +++ b/lti/models.py @@ -26,7 +26,6 @@ class LTIPlatform(models.Model): remove_from_groups_on_unenroll = models.BooleanField(default=False, help_text="Remove users from RBAC groups when they're no longer in the course") - # Timestamps created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) @@ -56,16 +55,13 @@ class LTIResourceLink(models.Model): platform = models.ForeignKey(LTIPlatform, on_delete=models.CASCADE, related_name='resource_links') - # LTI context (course) context_id = models.CharField(max_length=255, db_index=True, help_text="LTI context ID (typically course ID)") context_title = models.CharField(max_length=255, blank=True, help_text="Course title") context_label = models.CharField(max_length=100, blank=True, help_text="Course short name/code") - # Resource link resource_link_id = models.CharField(max_length=255, db_index=True, help_text="LTI resource link ID") resource_link_title = models.CharField(max_length=255, blank=True, help_text="Resource link title") - # MediaCMS mappings category = models.ForeignKey('files.Category', on_delete=models.SET_NULL, null=True, blank=True, related_name='lti_resource_links', help_text="Mapped MediaCMS category") rbac_group = models.ForeignKey('rbac.RBACGroup', on_delete=models.SET_NULL, null=True, blank=True, related_name='lti_resource_links', help_text="RBAC group for course members") @@ -115,10 +111,8 @@ class LTIRoleMapping(models.Model): platform = models.ForeignKey(LTIPlatform, on_delete=models.CASCADE, related_name='role_mappings') lti_role = models.CharField(max_length=255, help_text="LTI role URI or short name (e.g., 'Instructor', 'Learner')") - # Global role (optional) global_role = models.CharField(max_length=20, blank=True, choices=GLOBAL_ROLE_CHOICES, help_text="MediaCMS global role to assign") - # Group role for RBAC group_role = models.CharField(max_length=20, blank=True, choices=GROUP_ROLE_CHOICES, help_text="RBAC group role to assign") class Meta: @@ -174,7 +168,6 @@ class LTIToolKeys(models.Model): key_id = models.CharField(max_length=255, unique=True, default='mediacms-lti-key', help_text='Key identifier') - # JWK format keys private_key_jwk = models.JSONField(help_text='Private key in JWK format (for signing)') public_key_jwk = models.JSONField(help_text='Public key in JWK format (for JWKS endpoint)') @@ -196,7 +189,6 @@ class LTIToolKeys(models.Model): defaults={'private_key_jwk': {}, 'public_key_jwk': {}}, # Will be populated by save() ) - # If keys are empty, generate them if created or not key_obj.private_key_jwk or not key_obj.public_key_jwk: key_obj.generate_keys() @@ -204,32 +196,21 @@ class LTIToolKeys(models.Model): def generate_keys(self): """Generate new RSA key pair""" - # Generate RSA key pair private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend()) - public_key = private_key.public_key() - - # Convert to PEM private_pem = private_key.private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) - public_pem = public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) - - # Convert to JWK private_jwk = jwk.JWK.from_pem(private_pem) public_jwk = jwk.JWK.from_pem(public_pem) - - # Add metadata private_jwk_dict = json.loads(private_jwk.export()) private_jwk_dict['kid'] = self.key_id private_jwk_dict['alg'] = 'RS256' private_jwk_dict['use'] = 'sig' - public_jwk_dict = json.loads(public_jwk.export_public()) public_jwk_dict['kid'] = self.key_id public_jwk_dict['alg'] = 'RS256' public_jwk_dict['use'] = 'sig' - # Save to database self.private_key_jwk = private_jwk_dict self.public_key_jwk = public_jwk_dict self.save() diff --git a/lti/serializers.py b/lti/serializers.py deleted file mode 100644 index 6b684297..00000000 --- a/lti/serializers.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -REST API Serializers for LTI - -Currently minimal - can be expanded for API endpoints if needed -""" - -from rest_framework import serializers - -from .models import LTIPlatform, LTIResourceLink, LTIUserMapping - - -class LTIPlatformSerializer(serializers.ModelSerializer): - """Serializer for LTI Platform""" - - class Meta: - model = LTIPlatform - fields = ['id', 'name', 'platform_id', 'active', 'enable_nrps', 'enable_deep_linking'] - read_only_fields = ['id'] - - -class LTIResourceLinkSerializer(serializers.ModelSerializer): - """Serializer for LTI Resource Link""" - - platform_name = serializers.CharField(source='platform.name', read_only=True) - category_title = serializers.CharField(source='category.title', read_only=True) - - class Meta: - model = LTIResourceLink - fields = ['id', 'platform', 'platform_name', 'context_id', 'context_title', 'category', 'category_title', 'launch_count', 'last_launch'] - read_only_fields = ['id', 'launch_count', 'last_launch'] - - -class LTIUserMappingSerializer(serializers.ModelSerializer): - """Serializer for LTI User Mapping""" - - username = serializers.CharField(source='user.username', read_only=True) - platform_name = serializers.CharField(source='platform.name', read_only=True) - - class Meta: - model = LTIUserMapping - fields = ['id', 'platform', 'platform_name', 'lti_user_id', 'user', 'username', 'email', 'name', 'last_login'] - read_only_fields = ['id', 'last_login'] diff --git a/lti/services.py b/lti/services.py index 01d7c46c..ac6d8437 100644 --- a/lti/services.py +++ b/lti/services.py @@ -32,7 +32,6 @@ class LTINRPSClient: self.platform = platform self.launch_claims = launch_claims - # Extract NRPS claim self.nrps_claim = launch_claims.get('https://purl.imsglobal.org/spec/lti-nrps/claim/namesroleservice') def can_sync(self): @@ -60,14 +59,9 @@ class LTINRPSClient: return [] try: - # Use PyLTI1p3's NRPS service - # Note: This requires proper configuration in the tool config tool_config = DjangoToolConfig.from_platform(self.platform) - - # Pass the entire NRPS claim as service_data, not just the URL nrps = NamesRolesProvisioningService(tool_config, self.nrps_claim) - # Fetch members members = nrps.get_members() return members @@ -101,10 +95,8 @@ class LTINRPSClient: processed_users.add(user.id) - # Get roles from member roles = member.get('roles', []) - # Apply role mapping apply_lti_roles(user, self.platform, roles, rbac_group) synced_count += 1 @@ -112,7 +104,6 @@ class LTINRPSClient: except Exception: continue - # Remove unenrolled users if configured removed_count = 0 if self.platform.remove_from_groups_on_unenroll: removed = RBACMembership.objects.filter(rbac_group=rbac_group).exclude(user_id__in=processed_users) @@ -138,26 +129,21 @@ class LTINRPSClient: if not user_id: return None - # Get user details from NRPS data name = member.get('name', '') email = member.get('email', '') given_name = member.get('given_name', '') family_name = member.get('family_name', '') - # Check for existing mapping mapping = LTIUserMapping.objects.filter(platform=self.platform, lti_user_id=user_id).select_related('user').first() if mapping: - # Update existing user details if they changed user = mapping.user update_fields = [] - # Update email if changed and not empty if email and user.email != email: user.email = email update_fields.append('email') - # Update name fields if changed if given_name and user.first_name != given_name: user.first_name = given_name update_fields.append('first_name') @@ -173,42 +159,17 @@ class LTINRPSClient: if update_fields: user.save(update_fields=update_fields) - # Update mapping cache - mapping_update_fields = [] - if email and mapping.email != email: - mapping.email = email - mapping_update_fields.append('email') - if given_name and mapping.given_name != given_name: - mapping.given_name = given_name - mapping_update_fields.append('given_name') - if family_name and mapping.family_name != family_name: - mapping.family_name = family_name - mapping_update_fields.append('family_name') - if name and mapping.name != name: - mapping.name = name - mapping_update_fields.append('name') - - if mapping_update_fields: - mapping.save(update_fields=mapping_update_fields) - return user - # Create new user from NRPS data - - # Generate username username = generate_username_from_lti(user_id, email, given_name, family_name) - # Check if username exists if User.objects.filter(username=username).exists(): username = f"{username}_{hashlib.md5(user_id.encode()).hexdigest()[:6]}" - # Create user user = User.objects.create_user(username=username, email=email or '', first_name=given_name, last_name=family_name, name=name or username, is_active=True) - # Create mapping - LTIUserMapping.objects.create(platform=self.platform, lti_user_id=user_id, user=user, email=email, given_name=given_name, family_name=family_name, name=name) + LTIUserMapping.objects.create(platform=self.platform, lti_user_id=user_id, user=user) - # Mark email as verified if email: try: EmailAddress.objects.create(user=user, email=email, verified=True, primary=True) diff --git a/lti/views.py b/lti/views.py index 3014197d..7f4a3c62 100644 --- a/lti/views.py +++ b/lti/views.py @@ -22,6 +22,7 @@ from django.utils.decorators import method_decorator from django.views import View from django.views.decorators.clickjacking import xframe_options_exempt from django.views.decorators.csrf import csrf_exempt +from jwcrypto import jwk from pylti1p3.exception import LtiException from pylti1p3.message_launch import MessageLaunch from pylti1p3.oidc_login import OIDCLogin @@ -42,7 +43,7 @@ from .handlers import ( validate_lti_session, ) from .keys import get_jwks -from .models import LTILaunchLog, LTIPlatform, LTIResourceLink +from .models import LTILaunchLog, LTIPlatform, LTIResourceLink, LTIToolKeys from .services import LTINRPSClient @@ -73,67 +74,38 @@ class OIDCLoginView(View): def handle_oidc_login(self, request): """Handle OIDC login initiation""" try: - print("=" * 80) - print("OIDC LOGIN INITIATED") - print("=" * 80) - - # Get target_link_uri and other OIDC params target_link_uri = request.GET.get('target_link_uri') or request.POST.get('target_link_uri') iss = request.GET.get('iss') or request.POST.get('iss') client_id = request.GET.get('client_id') or request.POST.get('client_id') login_hint = request.GET.get('login_hint') or request.POST.get('login_hint') lti_message_hint = request.GET.get('lti_message_hint') or request.POST.get('lti_message_hint') - print(f"Target Link URI: {target_link_uri}") - print(f"Issuer (iss): {iss}") - print(f"Client ID: {client_id}") - print(f"Login Hint: {login_hint}") - print(f"LTI Message Hint: {lti_message_hint}") - if not all([target_link_uri, iss, client_id]): - print("ERROR: Missing required OIDC parameters") return JsonResponse({'error': 'Missing required OIDC parameters'}, status=400) - # Get platform configuration - print(f"Looking for platform with iss={iss}, client_id={client_id}") try: platform = LTIPlatform.objects.get(platform_id=iss, client_id=client_id) - print(f"Platform found: {platform.name}") except LTIPlatform.DoesNotExist: - print(f"ERROR: No platform found with iss={iss}, client_id={client_id}") - print("Available platforms:") - for p in LTIPlatform.objects.all(): - print(f" - {p.name}: platform_id={p.platform_id}, client_id={p.client_id}") return JsonResponse({'error': 'Platform not found'}, status=404) - # Create tool config for this platform tool_config = DjangoToolConfig.from_platform(platform) - print("Tool config created") - # Wrap Django request for PyLTI1p3 lti_request = DjangoRequest(request) - # Create OIDC login handler with session and cookie services session_service = DjangoSessionService(request) cookie_service = DjangoSessionService(request) # Using same service for cookies oidc_login = OIDCLogin(lti_request, tool_config, session_service=session_service, cookie_service=cookie_service) - - # Redirect to platform's authorization endpoint try: oidc_with_cookies = oidc_login.enable_check_cookies() redirect_url = oidc_with_cookies.redirect(target_link_uri) if not redirect_url: - # Manual OIDC redirect construction with all required OAuth 2.0 parameters - state = str(uuid.uuid4()) nonce = str(uuid.uuid4()) - # Store state and nonce in session for validation session_service.save_launch_data(f'state-{state}', {'target_link_uri': target_link_uri, 'nonce': nonce}) - # Build redirect URL with all required parameters params = { 'response_type': 'id_token', 'redirect_uri': target_link_uri, @@ -146,7 +118,6 @@ class OIDCLoginView(View): 'nonce': nonce, } - # Add optional parameters if present if lti_message_hint: params['lti_message_hint'] = lti_message_hint @@ -157,11 +128,9 @@ class OIDCLoginView(View): raise except LtiException as e: - print(f"LtiException during OIDC login: {str(e)}") traceback.print_exc() return render(request, 'lti/launch_error.html', {'error': 'OIDC Login Failed', 'message': str(e)}, status=400) - except Exception as e: - print(f"Exception during OIDC login: {str(e)}") + except Exception as e: # noqa traceback.print_exc() return JsonResponse({'error': 'Internal server error during OIDC login'}, status=500) @@ -183,51 +152,25 @@ class LaunchView(View): claims = {} try: - print("=" * 80) - print("LTI LAUNCH INITIATED") - print("=" * 80) - - # Get issuer from request id_token = request.POST.get('id_token') if not id_token: - print("ERROR: Missing id_token in launch request") raise ValueError("Missing id_token in launch request") - print(f"Received id_token (first 50 chars): {id_token[:50]}...") - - # Decode JWT to get issuer (without validation first) - unverified = jwt.decode(id_token, options={"verify_signature": False}) iss = unverified.get('iss') aud = unverified.get('aud') - - print(f"Decoded JWT - Issuer (iss): {iss}") - print(f"Decoded JWT - Audience (aud): {aud}") - - # Get platform - print(f"Looking for platform with platform_id={iss}, client_id={aud}") try: platform = LTIPlatform.objects.get(platform_id=iss, client_id=aud) - print(f"Platform found: {platform.name}") except LTIPlatform.DoesNotExist: - print(f"ERROR: No platform found with platform_id={iss}, client_id={aud}") - print("Available platforms:") - for p in LTIPlatform.objects.all(): - print(f" - {p.name}: platform_id={p.platform_id}, client_id={p.client_id}") raise - # Create tool config tool_config = DjangoToolConfig.from_platform(platform) - print("Tool config created") - # Wrap Django request for PyLTI1p3 lti_request = DjangoRequest(request) - # Validate JWT and get launch data session_service = DjangoSessionService(request) cookie_service = DjangoSessionService(request) - # Create custom MessageLaunch that properly implements _get_request_param class CustomMessageLaunch(MessageLaunch): def _get_request_param(self, key): """Override to properly get request parameters""" @@ -235,55 +178,40 @@ class LaunchView(View): message_launch = CustomMessageLaunch(lti_request, tool_config, session_service=session_service, cookie_service=cookie_service) - # Get validated launch data launch_data = message_launch.get_launch_data() claims = self.sanitize_claims(launch_data) - # Extract key claims resource_link = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {}) resource_link_id = resource_link.get('id', 'default') roles = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/roles', []) - # Check launch type message_type = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/message_type') if message_type == 'LtiDeepLinkingRequest': - # Deep linking request - handle separately return self.handle_deep_linking_launch(request, message_launch, platform, launch_data) user = provision_lti_user(platform, launch_data) - # Provision context (category + RBAC group) if 'https://purl.imsglobal.org/spec/lti/claim/context' in launch_data: category, rbac_group, resource_link_obj = provision_lti_context(platform, launch_data, resource_link_id) - # Apply roles apply_lti_roles(user, platform, roles, rbac_group) else: - # No context - might be a direct media embed resource_link_obj = None - # Create session create_lti_session(request, user, message_launch, platform) - # Log successful launch LTILaunchLog.objects.create(platform=platform, user=user, resource_link=resource_link_obj, launch_type='resource_link', success=True, claims=claims) - # Determine where to redirect redirect_url = self.determine_redirect(launch_data, resource_link_obj) return HttpResponseRedirect(redirect_url) - except LtiException as e: - error_message = f"LTI Launch Error: {str(e)}" - print(f"LtiException during launch: {error_message}") + except LtiException as e: # noqa traceback.print_exc() - except Exception as e: - error_message = f"Launch Error: {str(e)}" - print(f"Exception during launch: {error_message}") + except Exception as e: # noqa traceback.print_exc() - # Log failed launch if platform: LTILaunchLog.objects.create(platform=platform, user=user, launch_type='resource_link', success=False, error_message=error_message, claims=claims) @@ -292,25 +220,20 @@ class LaunchView(View): def sanitize_claims(self, claims): """Remove sensitive data from claims before logging""" safe_claims = claims.copy() - # Remove any sensitive keys if needed return safe_claims def determine_redirect(self, launch_data, resource_link): """Determine where to redirect after successful launch""" - # Check for custom parameters indicating what to show custom = launch_data.get('https://purl.imsglobal.org/spec/lti/claim/custom', {}) - # Check for custom redirect URL (any MediaCMS path) custom_path = custom.get('redirect_path') if custom_path: - # Ensure it starts with / and doesn't include domain if not custom_path.startswith('/'): custom_path = '/' + custom_path return custom_path - # Check if specific media is requested media_id = custom.get('media_id') or custom.get('media_friendly_token') if media_id: try: @@ -319,12 +242,10 @@ class LaunchView(View): except Media.DoesNotExist: pass - # Default: redirect to my media return reverse('lti:my_media') def handle_deep_linking_launch(self, request, message_launch, platform, launch_data): """Handle deep linking request""" - # Get deep linking settings from launch data deep_linking_settings = launch_data.get('https://purl.imsglobal.org/spec/lti-dl/claim/deep_linking_settings', {}) if not deep_linking_settings: @@ -335,7 +256,6 @@ class LaunchView(View): if not deep_link_return_url: raise ValueError("Missing deep_link_return_url in deep linking settings") - # Store deep link data in session for use in SelectMediaView request.session['lti_deep_link'] = { 'deep_link_return_url': deep_link_return_url, 'deployment_id': launch_data.get('https://purl.imsglobal.org/spec/lti/claim/deployment_id'), @@ -343,7 +263,6 @@ class LaunchView(View): 'message_launch_data': launch_data, # Store full launch data for JWT creation } - # Redirect to media selection page return HttpResponseRedirect(reverse('lti:select_media')) @@ -356,7 +275,6 @@ class JWKSView(View): def get(self, request): """Return tool's public JWK Set""" - # Return public keys for signature validation jwks = get_jwks() return JsonResponse(jwks, content_type='application/json') @@ -369,19 +287,12 @@ class PublicKeyPEMView(View): def get(self, request): """Return public key in PEM format""" - from jwcrypto import jwk - - from .models import LTIToolKeys - - # Get key from database key_obj = LTIToolKeys.get_or_create_keys() - # Convert to PEM jwk_obj = jwk.JWK(**key_obj.public_key_jwk) pem_bytes = jwk_obj.export_to_pem() pem_string = pem_bytes.decode('utf-8') - # Return as plain text for easy copy/paste return HttpResponse( f"MediaCMS LTI Public Key (PEM Format)\n" f"{'=' * 80}\n\n" @@ -406,14 +317,11 @@ class MyMediaLTIView(View): def get(self, request): """Display my media page""" - # Validate LTI session lti_session = validate_lti_session(request) if not lti_session: return JsonResponse({'error': 'Not authenticated via LTI'}, status=403) - # Redirect to user's profile page - # The existing user profile page is already iframe-compatible profile_url = f"/user/{request.user.username}" return HttpResponseRedirect(profile_url) @@ -430,7 +338,6 @@ class EmbedMediaLTIView(View): """Display embedded media""" media = get_object_or_404(Media, friendly_token=friendly_token) - # Check LTI session lti_session = validate_lti_session(request) if lti_session and request.user.is_authenticated: @@ -445,7 +352,6 @@ class EmbedMediaLTIView(View): if not can_view: return JsonResponse({'error': 'Access denied', 'message': 'You do not have permission to view this media'}, status=403) - # Redirect to media view page return HttpResponseRedirect(f"/view?m={friendly_token}") @@ -462,16 +368,13 @@ class ManualSyncView(APIView): def post(self, request, platform_id, context_id): """Manually trigger NRPS sync""" try: - # Get platform platform = get_object_or_404(LTIPlatform, id=platform_id) - # Find resource link by context resource_link = LTIResourceLink.objects.filter(platform=platform, context_id=context_id).first() if not resource_link: return Response({'error': 'Context not found', 'message': f'No resource link found for context {context_id}'}, status=status.HTTP_404_NOT_FOUND) - # Verify user has manager role in the course rbac_group = resource_link.rbac_group if not rbac_group: return Response({'error': 'No RBAC group', 'message': 'This context does not have an associated RBAC group'}, status=status.HTTP_400_BAD_REQUEST) @@ -481,17 +384,14 @@ class ManualSyncView(APIView): if not is_manager: return Response({'error': 'Insufficient permissions', 'message': 'You must be a course manager to sync members'}, status=status.HTTP_403_FORBIDDEN) - # Check NRPS is enabled if not platform.enable_nrps: return Response({'error': 'NRPS disabled', 'message': 'Names and Role Provisioning Service is disabled for this platform'}, status=status.HTTP_400_BAD_REQUEST) - # Get last successful launch for NRPS endpoint last_launch = LTILaunchLog.objects.filter(platform=platform, resource_link=resource_link, success=True).order_by('-created_at').first() if not last_launch: return Response({'error': 'No launch data', 'message': 'No successful launch data found for NRPS'}, status=status.HTTP_400_BAD_REQUEST) - # Perform NRPS sync nrps_client = LTINRPSClient(platform, last_launch.claims) result = nrps_client.sync_members_to_rbac_group(rbac_group)