You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

85 lines
3.1 KiB
Python

# -*- coding: utf-8 -*-
import logging
import hashlib
import re
import os
from sqlalchemy import text
from website import errors
from website import db
from website import settings
from website.handler import APIHandler, authenticated
class UploadHandler(APIHandler):
    """Store an uploaded file on disk and register it in the ``files`` table.

    The MD5 digest of the file content is used as the de-duplication key:
    when a row with the same ``md5_str`` already exists, nothing is written
    and the existing digest is returned.
    """

    # Characters stripped from the client-supplied file name. Path separators
    # ('/' and '\\') are part of the set, so the sanitized name is always a
    # single path component. Raw string: the backslash is a literal member.
    _FILENAME_STRIP_RE = re.compile(
        '[%s]' % re.escape(r"""!"#$%&'()*+,/:;<=>?@[\]^`{|}~ """)
    )

    # Upper bound on the accepted body size, in bytes (300 MiB).
    _MAX_UPLOAD_BYTES = 300 * 1024 * 1024

    @authenticated
    def post(self):
        """Handle a file upload sent in the ``file`` form field.

        Only the first part of the ``file`` field is stored; its size, MD5
        digest and name all refer to that same part.

        Responds with ``{"result": <md5 hex digest>}``.

        Raises:
            errors.HTTPAPIError: with ``ERROR_BAD_REQUEST`` when no file was
                sent or the body is larger than 300 MiB.
        """
        file_metas = self.request.files.get('file', None)
        if not file_metas:
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "请选择文件")
        upload = file_metas[0]
        body = upload.body

        # Drop '..' sequences first, then every character in the strip set.
        filename = self._FILENAME_STRIP_RE.sub("", upload.filename.replace('..', ''))

        file_size = len(body)
        logging.info("file_size: %s", file_size)
        if file_size > self._MAX_UPLOAD_BYTES:
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, 'Exceed 300M size limit')

        # Extension is the text after the last dot; a name without any dot has
        # no extension (previously the whole file name was stored instead).
        _, dot, extension = filename.rpartition(".")
        filetype = extension if dot else ""

        file_upload_dir = settings.file_upload_dir
        os.makedirs(file_upload_dir, exist_ok=True)

        # MD5 serves as a content fingerprint for de-duplication here.
        md5_str = hashlib.md5(body).hexdigest()

        with self.app_mysql.connect() as conn:
            sql = text("select id from files where md5_str=:md5_str")
            row = conn.execute(sql, {"md5_str": md5_str}).fetchone()

        if not row:
            filepath = os.path.join(file_upload_dir, md5_str + '_' + filename)
            if not os.path.exists(filepath):
                # Write exactly the part that was measured and hashed above.
                # The old loop wrote every part to this same path, so with
                # several parts the stored bytes did not match md5_str.
                with open(filepath, 'wb') as f:
                    f.write(body)
            with self.app_mysql.connect() as conn:
                sql = text("insert into files(filename, filepath, md5_str, filetype, user) values(:filename, :filepath, :md5_str, :filetype, :user)")
                conn.execute(sql, {"filename": filename, "filepath": filepath, "md5_str": md5_str, "filetype": filetype, "user": self.current_user.id})
                conn.commit()

        self.finish({"result": md5_str})
class DeleteHandler(APIHandler):
    """Delete an uploaded file, looked up by the MD5 digest of its content."""

    @authenticated
    def post(self):
        """Remove the file on disk and its row in the ``files`` table.

        Reads the digest from the ``file_md5`` request argument.

        Raises:
            errors.HTTPAPIError: with ``ERROR_BAD_REQUEST`` when ``file_md5``
                is missing or no matching row exists.
        """
        md5_str = self.get_escaped_argument("file_md5", "")
        if not md5_str:
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "file md5 is required")
        logging.info("md5_str: %s", md5_str)

        # NOTE(review): the lookup and the delete filter on md5_str only, so
        # any authenticated user can delete any file regardless of who
        # uploaded it -- confirm this is intended.
        # Lookup, file removal and row delete share one connection, so they
        # all run inside the same ``with`` block.
        with self.app_mysql.connect() as conn:
            sql = text("select filepath from files where md5_str=:md5_str")
            cur = conn.execute(sql, {"md5_str": md5_str})
            # presumably db.to_json returns a mapping for the matched row and
            # a falsy value when there is none -- verify against the helper
            row = db.to_json(cur)
            if not row:
                raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "file not found")

            # Attempt the removal directly instead of testing for existence
            # first: a file that is already gone (or vanishes between a check
            # and the call) is not an error, the row is still cleaned up.
            try:
                os.remove(row["filepath"])
            except FileNotFoundError:
                pass

            sql_del = text("delete from files where md5_str=:md5_str")
            conn.execute(sql_del, {"md5_str": md5_str})
            conn.commit()

        self.finish()