mirror of
https://github.com/mediacms-io/mediacms.git
synced 2026-03-09 06:27:21 -04:00
373 lines
13 KiB
Python
373 lines
13 KiB
Python
"""
|
|
LTI Launch Handlers for User and Context Provisioning
|
|
|
|
Provides functions to:
|
|
- Create/update MediaCMS users from LTI launches
|
|
- Create/update categories and RBAC groups for courses
|
|
- Apply role mappings from LTI to MediaCMS
|
|
- Create and manage LTI sessions
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
|
|
from allauth.account.models import EmailAddress
|
|
from django.conf import settings
|
|
from django.contrib.auth import login
|
|
from django.utils import timezone
|
|
|
|
from files.models import Category
|
|
from rbac.models import RBACGroup, RBACMembership
|
|
from users.models import User
|
|
|
|
from .models import LTIResourceLink, LTIRoleMapping, LTIUserMapping
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Built-in role table: maps an LTI role name (or a Moodle role shortname) to
# the MediaCMS roles it grants. Every built-in entry leaves the global role
# empty and only assigns a group role. Each key gets its own fresh dict so
# that mutating one entry never affects another.
DEFAULT_LTI_ROLE_MAPPINGS = {
    lti_role: {'global_role': '', 'group_role': mapped_group_role}
    for lti_role, mapped_group_role in (
        # LTI role names (used in standard launches)
        ('Instructor', 'manager'),
        ('TeachingAssistant', 'contributor'),
        ('Learner', 'member'),
        ('Student', 'member'),
        ('Administrator', 'manager'),
        ('Faculty', 'manager'),
        # Moodle role shortnames (used in custom_publishdata from My Media launches)
        ('student', 'member'),
        ('guest', 'member'),
        ('teacher', 'manager'),
        ('editingteacher', 'manager'),
        ('manager', 'manager'),
        ('coursecreator', 'manager'),
        ('ta', 'contributor'),
    )
}
|
|
|
|
|
|
def _ensure_course_context(platform, context_id, title, label, resource_link_id):
    """
    Find or create the LTIResourceLink, Category, and RBACGroup for a course.

    The lookup key is (platform, context_id). When a record already exists
    (e.g. created by a bulk My Media launch), it is reused and its metadata is
    kept in sync: a non-empty title/label that differs from the stored value
    overwrites it, and the linked category's title follows the title. The
    stored resource_link_id is replaced whenever the incoming ID is non-empty,
    does not start with 'bulk_', and differs from the stored one, so a bulk
    placeholder never overwrites a real launch ID.

    When nothing exists yet, the category, group and resource link are created
    inside one transaction so a failure part-way through cannot leave an
    orphaned Category behind (which would be duplicated on the next launch).

    Args:
        platform: LTI platform the course belongs to (its ``name`` is used in
            generated titles/descriptions).
        context_id: Course/context identifier as reported by the LMS.
        title: Course title; may be empty.
        label: Course short label; may be empty.
        resource_link_id: Launch resource link ID or a 'bulk_*' placeholder.

    Returns:
        Tuple of (category, rbac_group, resource_link)
    """
    # Function-scope import keeps this block self-contained; Django is already
    # a dependency of this module.
    from django.db import transaction

    resource_link = LTIResourceLink.objects.filter(
        platform=platform,
        context_id=context_id,
    ).first()

    if resource_link:
        category = resource_link.category
        rbac_group = resource_link.rbac_group

        rl_updates = []
        if title and resource_link.context_title != title:
            resource_link.context_title = title
            rl_updates.append('context_title')
        if label and resource_link.context_label != label:
            resource_link.context_label = label
            rl_updates.append('context_label')
        # Promote from bulk placeholder to real resource link ID.
        if resource_link_id and not resource_link_id.startswith('bulk_') and resource_link.resource_link_id != resource_link_id:
            resource_link.resource_link_id = resource_link_id
            rl_updates.append('resource_link_id')
        if rl_updates:
            # Only write the columns that actually changed.
            resource_link.save(update_fields=rl_updates)

        if title and category and category.title != title:
            category.title = title
            category.save(update_fields=['title'])

        return category, rbac_group, resource_link

    # One fallback chain shared by the category and the group, so the group is
    # never named " (platform)" / "None (platform)" when the LMS sends neither
    # a title nor a label.
    display_name = title or label or f'Course {context_id}'

    with transaction.atomic():
        category = Category.objects.create(
            title=display_name,
            description=f'Auto-created from {platform.name}: {display_name}',
            is_global=False,
            is_rbac_category=True,
            is_lms_course=True,
            lti_platform=platform,
            lti_context_id=context_id,
        )
        rbac_group = RBACGroup.objects.create(
            name=f'{display_name} ({platform.name})',
            description=f'LTI course group from {platform.name}',
        )
        rbac_group.categories.add(category)
        resource_link = LTIResourceLink.objects.create(
            platform=platform,
            context_id=context_id,
            resource_link_id=resource_link_id,
            context_title=title,
            context_label=label,
            category=category,
            rbac_group=rbac_group,
        )

    return category, rbac_group, resource_link
|
|
|
|
|
|
def _ensure_membership(user, rbac_group, group_role):
    """
    Ensure the user is a member of rbac_group with at least group_role.

    If a membership already exists, its role is replaced by whatever
    get_higher_privilege_group() picks between the stored role and
    group_role, and saved only when that differs from the stored role.
    Otherwise a new membership is created with group_role.

    Creation is best-effort: a failure is logged and swallowed so that a
    membership problem does not abort the surrounding launch.
    """
    existing = RBACMembership.objects.filter(user=user, rbac_group=rbac_group).first()
    if existing:
        final_role = get_higher_privilege_group(existing.role, group_role)
        if final_role != existing.role:
            existing.role = final_role
            existing.save(update_fields=['role'])
        return

    try:
        RBACMembership.objects.create(user=user, rbac_group=rbac_group, role=group_role)
    except Exception as exc:
        # Deliberately non-fatal, but no longer silent: leave a trace so a
        # missing membership can be diagnosed.
        logger.warning(
            '_ensure_membership: could not add user %s to group %s with role %s: %s',
            user,
            rbac_group,
            group_role,
            exc,
        )
|
|
|
|
|
|
# ── Public API ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def provision_lti_user(platform, claims):
    """
    Provision MediaCMS user from LTI launch claims.

    Looks up the LTIUserMapping for (platform, claims['sub']). If one exists,
    the mapped user's email / first_name / last_name / name are updated from
    any non-empty claim that differs. Otherwise a new active user is created,
    its email (when given) is recorded as a verified primary EmailAddress, and
    a new LTIUserMapping is stored.

    Args:
        platform: LTI platform the launch came from.
        claims: Mapping of launch claims; reads 'sub', 'email', 'given_name',
            'family_name' and 'name'.

    Returns:
        User instance

    Raises:
        ValueError: if the 'sub' claim is missing or empty.
    """
    lti_user_id = claims.get('sub')
    if not lti_user_id:
        raise ValueError("Missing 'sub' claim in LTI launch")

    # Use `or ''` instead of a .get() default: a claim that is present but
    # null would otherwise reach .strip() / the f-string as None and either
    # crash or produce the literal name "None None".
    email = claims.get('email') or ''
    given_name = claims.get('given_name') or ''
    family_name = claims.get('family_name') or ''
    name = (claims.get('name') or f"{given_name} {family_name}").strip()

    mapping = LTIUserMapping.objects.filter(platform=platform, lti_user_id=lti_user_id).select_related('user').first()

    if mapping:
        user = mapping.user
        update_fields = []

        # Empty claims never blank out data that is already stored.
        for field_name, new_value in (
            ('email', email),
            ('first_name', given_name),
            ('last_name', family_name),
            ('name', name),
        ):
            if new_value and getattr(user, field_name) != new_value:
                setattr(user, field_name, new_value)
                update_fields.append(field_name)

        if update_fields:
            user.save(update_fields=update_fields)

        return user

    username = generate_username_from_lti(lti_user_id, email, given_name, family_name)
    if User.objects.filter(username=username).exists():
        # Disambiguate with a short digest of the LTI user id (not used for
        # anything security-sensitive).
        username = f"{username}_{hashlib.md5(lti_user_id.encode()).hexdigest()[:6]}"

    user = User.objects.create_user(
        username=username,
        email=email,
        first_name=given_name,
        last_name=family_name,
        name=name or username,
        is_active=True,
    )

    if email:
        try:
            EmailAddress.objects.create(user=user, email=email, verified=True, primary=True)
        except Exception as exc:
            # Best-effort: the account is still usable without the allauth
            # record, but record why it could not be created.
            logger.warning(
                'provision_lti_user: could not create EmailAddress for user %s: %s',
                username,
                exc,
            )

    LTIUserMapping.objects.create(platform=platform, lti_user_id=lti_user_id, user=user)

    return user
|
|
|
|
|
|
def generate_username_from_lti(lti_user_id, email, given_name, family_name):
    """
    Derive a username from the identity details of an LTI launch.

    Candidates are tried in order and the first one that is at least four
    characters long wins (truncated to 30 characters):

    1. the local part of the email, with anything other than alphanumerics,
       '_' and '-' replaced by '_';
    2. "<given>.<family>" lower-cased, with anything other than
       alphanumerics, '_', '-' and '.' replaced by '_'.

    If neither qualifies, the result is 'lti_user_' followed by the first ten
    hex digits of the MD5 of lti_user_id.
    """

    def _scrub(text, allowed_punctuation):
        return ''.join(ch if (ch.isalnum() or ch in allowed_punctuation) else '_' for ch in text)

    if email and '@' in email:
        local_part = _scrub(email.split('@')[0], '_-')
        if len(local_part) >= 4:
            return local_part[:30]

    if given_name and family_name:
        dotted = _scrub(f"{given_name}.{family_name}".lower(), '_-.')
        if len(dotted) >= 4:
            return dotted[:30]

    digest = hashlib.md5(lti_user_id.encode()).hexdigest()
    return f"lti_user_{digest[:10]}"
|
|
|
|
|
|
def provision_lti_context(platform, claims, resource_link_id):
    """
    Provision the MediaCMS category and RBAC group for the course (LTI
    context) named in a launch.

    Reads the LTI context claim from ``claims`` and hands its id (as a
    string), title and label to _ensure_course_context().

    Returns:
        Tuple of (category, rbac_group, resource_link)

    Raises:
        ValueError: if the context claim carries no id.
    """
    course_claim = claims.get('https://purl.imsglobal.org/spec/lti/claim/context', {})
    raw_context_id = course_claim.get('id')

    if raw_context_id:
        return _ensure_course_context(
            platform,
            str(raw_context_id),
            course_claim.get('title', ''),
            course_claim.get('label', ''),
            resource_link_id,
        )

    raise ValueError("Missing context ID in LTI launch")
|
|
|
|
|
|
def provision_lti_bulk_contexts(platform, user, publish_data_raw):
    """
    Bulk-provision categories, groups, and memberships for every course the
    user is enrolled in, as reported by the LMS via custom_publishdata.

    ``publish_data_raw`` is expected to be base64 text (trailing '=' padding
    may be missing) that decodes to a UTF-8 JSON list of course objects. Each
    object is read for 'id', 'fullname', 'shortname' and 'role'; the role is
    translated through DEFAULT_LTI_ROLE_MAPPINGS and defaults to 'member'.

    The function is best-effort throughout: undecodable payloads, payloads
    that are not a list, malformed entries and per-course failures are logged
    and skipped; nothing is raised to the caller and nothing is returned.

    NOTE(review): an earlier docstring said the Moodle site course (ID 1) is
    skipped, but this function contains no such filter — confirm whether the
    LMS already omits it or whether a filter is missing here.
    """
    try:
        # Restore any '=' padding the LMS stripped; (-len) % 4 is 0 when the
        # length is already a multiple of four.
        padded = publish_data_raw + '=' * (-len(publish_data_raw) % 4)
        courses = json.loads(base64.b64decode(padded).decode('utf-8'))
    except Exception as exc:
        logger.warning('provision_lti_bulk_contexts: failed to decode publishdata: %s', exc)
        return

    if not isinstance(courses, list):
        logger.warning('provision_lti_bulk_contexts: publishdata is not a list')
        return

    for course in courses:
        # Guard before the try: a non-dict entry used to raise inside the
        # handler's own course.get('id') call, which escaped this function
        # and aborted the whole launch.
        if not isinstance(course, dict):
            logger.warning('provision_lti_bulk_contexts: skipping malformed course entry: %r', course)
            continue

        try:
            course_id = str(course.get('id', '')).strip()
            if not course_id:
                continue

            fullname = course.get('fullname', '')
            shortname = course.get('shortname', '')
            group_role = DEFAULT_LTI_ROLE_MAPPINGS.get(course.get('role', 'student'), {}).get('group_role', 'member')

            _, rbac_group, _ = _ensure_course_context(
                platform=platform,
                context_id=course_id,
                title=fullname,
                label=shortname,
                # Placeholder ID; replaced once a real launch for this course arrives.
                resource_link_id=f'bulk_{course_id}',
            )

            if rbac_group:
                _ensure_membership(user, rbac_group, group_role)

        except Exception as exc:
            # One bad course must not prevent the remaining ones from being provisioned.
            logger.warning(
                'provision_lti_bulk_contexts: error processing course %s: %s',
                course.get('id'),
                exc,
            )
|
|
|
|
|
|
def apply_lti_roles(user, platform, lti_roles, rbac_group):
    """
    Translate the LTI roles of a launch into MediaCMS roles and apply them.

    Each role URI is shortened to the text after its last '#' (or, when it
    has no '#', after its last '/'). The short names are looked up in
    DEFAULT_LTI_ROLE_MAPPINGS overlaid with the platform's LTIRoleMapping
    rows; the highest global role and highest group role found win, starting
    from 'user' and 'member'. The global role is then set on the user and the
    group membership is ensured.

    Returns:
        Tuple of (global_role, group_role)
    """

    def _short_name(role_uri):
        for separator in ('#', '/'):
            if separator in role_uri:
                return role_uri.split(separator)[-1]
        return role_uri

    role_names = [_short_name(role_uri) for role_uri in (lti_roles or [])]

    # Platform-specific rows take precedence over the built-in defaults.
    role_table = dict(DEFAULT_LTI_ROLE_MAPPINGS)
    for row in LTIRoleMapping.objects.filter(platform=platform):
        role_table[row.lti_role] = {'global_role': row.global_role, 'group_role': row.group_role}

    global_role, group_role = 'user', 'member'
    for role_name in role_names:
        if role_name not in role_table:
            continue
        entry = role_table[role_name]

        mapped_global = entry.get('global_role')
        if mapped_global:
            global_role = get_higher_privilege_global(global_role, mapped_global)

        mapped_group = entry.get('group_role')
        if mapped_group:
            group_role = get_higher_privilege_group(group_role, mapped_group)

    user.set_role_from_mapping(global_role)
    _ensure_membership(user, rbac_group, group_role)

    return global_role, group_role
|
|
|
|
|
|
def get_higher_privilege_global(role1, role2):
    """
    Pick the more privileged of two global roles.

    Roles rank, lowest to highest: user, advancedUser, editor, manager,
    admin. If either argument is not one of these, role2 is returned as-is.
    """
    ranking = ('user', 'advancedUser', 'editor', 'manager', 'admin')
    if role1 not in ranking or role2 not in ranking:
        return role2
    return ranking[max(ranking.index(role1), ranking.index(role2))]
|
|
|
|
|
|
def get_higher_privilege_group(role1, role2):
    """
    Pick the more privileged of two group roles.

    Roles rank, lowest to highest: member, contributor, manager. If either
    argument is not one of these, role2 is returned as-is.
    """
    ranking = ('member', 'contributor', 'manager')
    if role1 not in ranking or role2 not in ranking:
        return role2
    return ranking[max(ranking.index(role1), ranking.index(role2))]
|
|
|
|
|
|
def create_lti_session(request, user, launch_data, platform):
    """
    Log the user in and record the LTI launch details in the session.

    After login, an 'lti_session' dict is stored in request.session holding
    the platform id/name, the context id/title, the resource link id, the
    launch roles and the launch time (ISO format). The session expiry is set
    from settings.LTI_SESSION_TIMEOUT (3600 when unset) and the session is
    saved explicitly.

    Returns:
        True
    """
    login(request, user, backend='django.contrib.auth.backends.ModelBackend')

    payload = launch_data.get_launch_data()
    course_claim = payload.get('https://purl.imsglobal.org/spec/lti/claim/context', {})
    link_claim = payload.get('https://purl.imsglobal.org/spec/lti/claim/resource_link', {})
    role_uris = payload.get('https://purl.imsglobal.org/spec/lti/claim/roles', [])

    session = request.session
    session['lti_session'] = {
        'platform_id': platform.id,
        'platform_name': platform.name,
        'context_id': course_claim.get('id'),
        'context_title': course_claim.get('title'),
        'resource_link_id': link_claim.get('id'),
        'roles': role_uris,
        'launch_time': timezone.now().isoformat(),
    }

    timeout_seconds = getattr(settings, 'LTI_SESSION_TIMEOUT', 3600)
    session.set_expiry(timeout_seconds)
    session.modified = True
    session.save()

    return True
|
|
|
|
|
|
def validate_lti_session(request):
    """
    Fetch the LTI session data attached to the request, if any.

    Returns:
        The 'lti_session' entry of request.session when the request's user is
        authenticated; None when the user is not authenticated or no such
        entry exists.
    """
    if request.user.is_authenticated:
        return request.session.get('lti_session')
    return None
|