# -*- coding: utf-8 -*- import os import shutil from django.conf import settings from django.core.exceptions import PermissionDenied from django.core.files import File from django.http import JsonResponse from django.views import generic from files import helpers from files.helpers import rm_file from files.methods import user_allowed_to_upload from files.models import Category, Media, Tag from .fineuploader import ChunkedFineUploader from .forms import FineUploaderUploadForm, FineUploaderUploadSuccessForm class FineUploaderView(generic.FormView): http_method_names = ("post",) form_class_upload = FineUploaderUploadForm form_class_upload_success = FineUploaderUploadSuccessForm @property def concurrent(self): return settings.CONCURRENT_UPLOADS @property def chunks_done(self): return self.chunks_done_param_name in self.request.GET @property def chunks_done_param_name(self): return settings.CHUNKS_DONE_PARAM_NAME def make_response(self, data, **kwargs): return JsonResponse(data, **kwargs) def get_form(self, form_class=None): if self.chunks_done: form_class = self.form_class_upload_success else: form_class = self.form_class_upload return form_class(**self.get_form_kwargs()) def dispatch(self, request, *args, **kwargs): if not user_allowed_to_upload(request): raise PermissionDenied # HTTP 403 return super(FineUploaderView, self).dispatch(request, *args, **kwargs) def form_valid(self, form): self.upload = ChunkedFineUploader(form.cleaned_data, self.concurrent) if self.upload.concurrent and self.chunks_done: try: self.upload.combine_chunks() except FileNotFoundError: data = {"success": False, "error": "Error with File Uploading"} return self.make_response(data, status=400) elif self.upload.total_parts == 1: self.upload.save() else: self.upload.save() return self.make_response({"success": True}) media_file = os.path.join(settings.MEDIA_ROOT, self.upload.real_path) with open(media_file, "rb") as f: myfile = File(f) new = Media.objects.create(media_file=myfile, user=self.request.user, title=self.upload.original_filename) publish_to_category = self.request.POST.get('publish_to_category', '') or self.request.GET.get('publish_to_category', '') publish_to_category = publish_to_category.strip() if publish_to_category: category_uids = [uid.strip() for uid in publish_to_category.split(',') if uid.strip()] for category_uid in category_uids: category = Category.objects.filter(uid=category_uid).first() if category: has_access = self.request.user.has_contributor_access_to_category(category) or category.is_rbac_category is False if has_access: new.category.add(category) if category.is_lms_course: # Transform the title before get_or_create to match what Tag.save() does tag_title = helpers.get_alphanumeric_and_spaces(category.title) tag, created = Tag.objects.get_or_create(title=tag_title) new.tags.add(tag) rm_file(media_file) shutil.rmtree(os.path.join(settings.MEDIA_ROOT, self.upload.file_path)) return self.make_response({"success": True, "media_url": new.get_absolute_url()}) def form_invalid(self, form): data = {"success": False, "error": "%s" % repr(form.errors)} return self.make_response(data, status=400)