Files
mediacms/uploader/views.py
Markos Gogoulos 5fecda02d6 w
2026-01-09 13:40:20 +02:00

96 lines
3.7 KiB
Python

# -*- coding: utf-8 -*-
import os
import shutil
from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.core.files import File
from django.http import JsonResponse
from django.views import generic
from files import helpers
from files.helpers import rm_file
from files.methods import user_allowed_to_upload
from files.models import Category, Media, Tag
from .fineuploader import ChunkedFineUploader
from .forms import FineUploaderUploadForm, FineUploaderUploadSuccessForm
class FineUploaderView(generic.FormView):
http_method_names = ("post",)
form_class_upload = FineUploaderUploadForm
form_class_upload_success = FineUploaderUploadSuccessForm
@property
def concurrent(self):
return settings.CONCURRENT_UPLOADS
@property
def chunks_done(self):
return self.chunks_done_param_name in self.request.GET
@property
def chunks_done_param_name(self):
return settings.CHUNKS_DONE_PARAM_NAME
def make_response(self, data, **kwargs):
return JsonResponse(data, **kwargs)
def get_form(self, form_class=None):
if self.chunks_done:
form_class = self.form_class_upload_success
else:
form_class = self.form_class_upload
return form_class(**self.get_form_kwargs())
def dispatch(self, request, *args, **kwargs):
if not user_allowed_to_upload(request):
raise PermissionDenied # HTTP 403
return super(FineUploaderView, self).dispatch(request, *args, **kwargs)
def form_valid(self, form):
self.upload = ChunkedFineUploader(form.cleaned_data, self.concurrent)
if self.upload.concurrent and self.chunks_done:
try:
self.upload.combine_chunks()
except FileNotFoundError:
data = {"success": False, "error": "Error with File Uploading"}
return self.make_response(data, status=400)
elif self.upload.total_parts == 1:
self.upload.save()
else:
self.upload.save()
return self.make_response({"success": True})
media_file = os.path.join(settings.MEDIA_ROOT, self.upload.real_path)
with open(media_file, "rb") as f:
myfile = File(f)
new = Media.objects.create(media_file=myfile, user=self.request.user, title=self.upload.original_filename)
publish_to_category = self.request.POST.get('publish_to_category', '') or self.request.GET.get('publish_to_category', '')
publish_to_category = publish_to_category.strip()
if publish_to_category:
category_uids = [uid.strip() for uid in publish_to_category.split(',') if uid.strip()]
for category_uid in category_uids:
category = Category.objects.filter(uid=category_uid).first()
if category:
has_access = self.request.user.has_contributor_access_to_category(category) or category.is_rbac_category is False
if has_access:
new.category.add(category)
if category.is_lms_course:
# Transform the title before get_or_create to match what Tag.save() does
tag_title = helpers.get_alphanumeric_and_spaces(category.title)
tag, created = Tag.objects.get_or_create(title=tag_title)
new.tags.add(tag)
rm_file(media_file)
shutil.rmtree(os.path.join(settings.MEDIA_ROOT, self.upload.file_path))
return self.make_response({"success": True, "media_url": new.get_absolute_url()})
def form_invalid(self, form):
data = {"success": False, "error": "%s" % repr(form.errors)}
return self.make_response(data, status=400)