mirror of
https://github.com/mediacms-io/mediacms.git
synced 2026-01-20 07:12:58 -05:00
a
This commit is contained in:
@@ -5,16 +5,19 @@ Provides Django-specific implementations for PyLTI1p3 interfaces
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import time
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives import serialization
|
from cryptography.hazmat.primitives import serialization
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
from jwcrypto import jwk
|
from jwcrypto import jwk, jwt
|
||||||
from pylti1p3.message_launch import MessageLaunch
|
from pylti1p3.message_launch import MessageLaunch
|
||||||
from pylti1p3.oidc_login import OIDCLogin
|
from pylti1p3.oidc_login import OIDCLogin
|
||||||
from pylti1p3.registration import Registration
|
from pylti1p3.registration import Registration
|
||||||
from pylti1p3.request import Request
|
from pylti1p3.request import Request
|
||||||
|
from pylti1p3.service_connector import ServiceConnector
|
||||||
from pylti1p3.tool_config import ToolConfAbstract
|
from pylti1p3.tool_config import ToolConfAbstract
|
||||||
|
|
||||||
from .models import LTIPlatform, LTIToolKeys
|
from .models import LTIPlatform, LTIToolKeys
|
||||||
@@ -180,6 +183,84 @@ class DjangoCacheDataStorage:
|
|||||||
return cache_key in self._cache
|
return cache_key in self._cache
|
||||||
|
|
||||||
|
|
||||||
|
class DjangoServiceConnector(ServiceConnector):
|
||||||
|
def __init__(self, registration):
|
||||||
|
super().__init__(registration)
|
||||||
|
self._registration = registration
|
||||||
|
self._access_token = None
|
||||||
|
self._access_token_expires = 0
|
||||||
|
|
||||||
|
def get_access_token(self, scopes):
|
||||||
|
if self._access_token and time.time() < self._access_token_expires:
|
||||||
|
return self._access_token
|
||||||
|
|
||||||
|
key_obj = LTIToolKeys.get_or_create_keys()
|
||||||
|
jwk_obj = jwk.JWK(**key_obj.private_key_jwk)
|
||||||
|
|
||||||
|
now = int(time.time())
|
||||||
|
claims = {
|
||||||
|
'iss': self._registration.get_client_id(),
|
||||||
|
'sub': self._registration.get_client_id(),
|
||||||
|
'aud': self._registration.get_auth_token_url(),
|
||||||
|
'iat': now,
|
||||||
|
'exp': now + 300,
|
||||||
|
'jti': str(time.time()),
|
||||||
|
}
|
||||||
|
|
||||||
|
jwt_token = jwt.JWT(header={'alg': 'RS256', 'kid': key_obj.private_key_jwk['kid']}, claims=claims)
|
||||||
|
jwt_token.make_signed_token(jwk_obj)
|
||||||
|
client_assertion = jwt_token.serialize()
|
||||||
|
|
||||||
|
token_url = self._registration.get_auth_token_url()
|
||||||
|
data = {
|
||||||
|
'grant_type': 'client_credentials',
|
||||||
|
'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
|
||||||
|
'client_assertion': client_assertion,
|
||||||
|
'scope': ' '.join(scopes),
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.post(token_url, data=data, timeout=10)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
token_data = response.json()
|
||||||
|
self._access_token = token_data['access_token']
|
||||||
|
expires_in = token_data.get('expires_in', 3600)
|
||||||
|
self._access_token_expires = time.time() + expires_in - 10
|
||||||
|
|
||||||
|
return self._access_token
|
||||||
|
|
||||||
|
def make_service_request(self, scopes, url, is_post=False, data=None, **kwargs):
|
||||||
|
access_token = self.get_access_token(scopes)
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {access_token}',
|
||||||
|
}
|
||||||
|
|
||||||
|
if 'accept' in kwargs:
|
||||||
|
headers['Accept'] = kwargs['accept']
|
||||||
|
|
||||||
|
if is_post:
|
||||||
|
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||||||
|
else:
|
||||||
|
response = requests.get(url, headers=headers, timeout=10)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
next_page_url = None
|
||||||
|
link_header = response.headers.get('Link')
|
||||||
|
if link_header:
|
||||||
|
for link in link_header.split(','):
|
||||||
|
if 'rel="next"' in link:
|
||||||
|
next_page_url = link.split(';')[0].strip('<> ')
|
||||||
|
|
||||||
|
return {
|
||||||
|
'body': response.json(),
|
||||||
|
'status_code': response.status_code,
|
||||||
|
'headers': dict(response.headers),
|
||||||
|
'next_page_url': next_page_url,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class DjangoToolConfig(ToolConfAbstract):
|
class DjangoToolConfig(ToolConfAbstract):
|
||||||
"""Tool configuration from Django models"""
|
"""Tool configuration from Django models"""
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from pylti1p3.names_roles import NamesRolesProvisioningService
|
|||||||
from rbac.models import RBACMembership
|
from rbac.models import RBACMembership
|
||||||
from users.models import User
|
from users.models import User
|
||||||
|
|
||||||
from .adapters import DjangoToolConfig
|
from .adapters import DjangoServiceConnector, DjangoToolConfig
|
||||||
from .handlers import apply_lti_roles, generate_username_from_lti
|
from .handlers import apply_lti_roles, generate_username_from_lti
|
||||||
from .models import LTIUserMapping
|
from .models import LTIUserMapping
|
||||||
|
|
||||||
@@ -49,19 +49,18 @@ class LTINRPSClient:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def fetch_members(self):
|
def fetch_members(self):
|
||||||
"""
|
|
||||||
Fetch all course members from Moodle via NRPS
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of member dicts with keys: user_id, name, email, roles, etc.
|
|
||||||
"""
|
|
||||||
if not self.can_sync():
|
if not self.can_sync():
|
||||||
return []
|
return []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tool_config = DjangoToolConfig.from_platform(self.platform)
|
tool_config = DjangoToolConfig.from_platform(self.platform)
|
||||||
nrps = NamesRolesProvisioningService(tool_config, self.nrps_claim)
|
registration = tool_config.find_registration_by_issuer(self.platform.platform_id)
|
||||||
|
|
||||||
|
if not registration:
|
||||||
|
return []
|
||||||
|
|
||||||
|
service_connector = DjangoServiceConnector(registration)
|
||||||
|
nrps = NamesRolesProvisioningService(service_connector, self.nrps_claim)
|
||||||
members = nrps.get_members()
|
members = nrps.get_members()
|
||||||
|
|
||||||
return members
|
return members
|
||||||
|
|||||||
Reference in New Issue
Block a user