diff --git a/lti/adapters.py b/lti/adapters.py index 2e72a36c..5dc05487 100644 --- a/lti/adapters.py +++ b/lti/adapters.py @@ -5,16 +5,19 @@ Provides Django-specific implementations for PyLTI1p3 interfaces """ import json +import time from typing import Any, Dict, Optional +import requests from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from django.core.cache import cache -from jwcrypto import jwk +from jwcrypto import jwk, jwt from pylti1p3.message_launch import MessageLaunch from pylti1p3.oidc_login import OIDCLogin from pylti1p3.registration import Registration from pylti1p3.request import Request +from pylti1p3.service_connector import ServiceConnector from pylti1p3.tool_config import ToolConfAbstract from .models import LTIPlatform, LTIToolKeys @@ -180,6 +183,84 @@ class DjangoCacheDataStorage: return cache_key in self._cache +class DjangoServiceConnector(ServiceConnector): + def __init__(self, registration): + super().__init__(registration) + self._registration = registration + self._access_token = None + self._access_token_expires = 0 + + def get_access_token(self, scopes): + if self._access_token and time.time() < self._access_token_expires: + return self._access_token + + key_obj = LTIToolKeys.get_or_create_keys() + jwk_obj = jwk.JWK(**key_obj.private_key_jwk) + + now = int(time.time()) + claims = { + 'iss': self._registration.get_client_id(), + 'sub': self._registration.get_client_id(), + 'aud': self._registration.get_auth_token_url(), + 'iat': now, + 'exp': now + 300, + 'jti': str(time.time()), + } + + jwt_token = jwt.JWT(header={'alg': 'RS256', 'kid': key_obj.private_key_jwk['kid']}, claims=claims) + jwt_token.make_signed_token(jwk_obj) + client_assertion = jwt_token.serialize() + + token_url = self._registration.get_auth_token_url() + data = { + 'grant_type': 'client_credentials', + 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + 'client_assertion': client_assertion, + 'scope': ' '.join(scopes), + } + + response = requests.post(token_url, data=data, timeout=10) + response.raise_for_status() + + token_data = response.json() + self._access_token = token_data['access_token'] + expires_in = token_data.get('expires_in', 3600) + self._access_token_expires = time.time() + expires_in - 10 + + return self._access_token + + def make_service_request(self, scopes, url, is_post=False, data=None, **kwargs): + access_token = self.get_access_token(scopes) + + headers = { + 'Authorization': f'Bearer {access_token}', + } + + if 'accept' in kwargs: + headers['Accept'] = kwargs['accept'] + + if is_post: + response = requests.post(url, json=data, headers=headers, timeout=10) + else: + response = requests.get(url, headers=headers, timeout=10) + + response.raise_for_status() + + next_page_url = None + link_header = response.headers.get('Link') + if link_header: + for link in link_header.split(','): + if 'rel="next"' in link: + next_page_url = link.split(';')[0].strip('<> ') + + return { + 'body': response.json(), + 'status_code': response.status_code, + 'headers': dict(response.headers), + 'next_page_url': next_page_url, + } + + class DjangoToolConfig(ToolConfAbstract): """Tool configuration from Django models""" diff --git a/lti/services.py b/lti/services.py index ac6d8437..4f3dd92a 100644 --- a/lti/services.py +++ b/lti/services.py @@ -13,7 +13,7 @@ from pylti1p3.names_roles import NamesRolesProvisioningService from rbac.models import RBACMembership from users.models import User -from .adapters import DjangoToolConfig +from .adapters import DjangoServiceConnector, DjangoToolConfig from .handlers import apply_lti_roles, generate_username_from_lti from .models import LTIUserMapping @@ -49,19 +49,18 @@ class LTINRPSClient: return True def fetch_members(self): - """ - Fetch all course members from Moodle via NRPS - - Returns: - List of member dicts with keys: user_id, name, email, roles, etc. - """ if not self.can_sync(): return [] try: tool_config = DjangoToolConfig.from_platform(self.platform) - nrps = NamesRolesProvisioningService(tool_config, self.nrps_claim) + registration = tool_config.find_registration_by_issuer(self.platform.platform_id) + if not registration: + return [] + + service_connector = DjangoServiceConnector(registration) + nrps = NamesRolesProvisioningService(service_connector, self.nrps_claim) members = nrps.get_members() return members