You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
import base64
10 months ago
import json
import logging
import uuid
10 months ago
from io import BytesIO
from sqlalchemy import text
10 months ago
from website import db_mysql
from website import errors
from website import settings
10 months ago
from website.handler import APIHandler, authenticated
from website.util import aes
from website.util import shortuuid
10 months ago
from website.util.captcha import create_validate_code
class CaptchaHandler(APIHandler):
    """Issue a login captcha as a base64-encoded PNG plus a signed token."""

    def get(self):
        """Create a captcha, cache its text, and return image and token.

        The string returned by ``create_validate_code`` is stored through
        ``self.r_app`` under ``logincaptcha:<id>`` with a 120-second expiry,
        where ``<id>`` is a random UUID in hex form. The client receives that
        id only as a value signed under the name ``logc``.
        """
        # NOTE(review): the payload passed to finish() below is a dict, not
        # raw PNG bytes -- confirm this Content-Type is really intended.
        self.set_header("Content-Type", "image/png")

        captcha_image, captcha_text = create_validate_code()
        captcha_id = uuid.uuid4().hex
        signed_id = self.create_signed_value("logc", captcha_id)
        self.r_app.set("logincaptcha:%s" % captcha_id, captcha_text, ex=120)

        # Save the captcha image as PNG into an in-memory buffer.
        png_buffer = BytesIO()
        captcha_image.save(png_buffer, 'png')
        encoded_png = base64.b64encode(png_buffer.getvalue())

        self.finish({
            "token": self.tostr(signed_id),
            "captcha": self.tostr(encoded_png),
        })
10 months ago
class LogoutHandler(APIHandler):
    """Terminate the caller's session."""

    def get(self):
        """Delete the stored session of the current user, if any, then finish.

        ``finish()`` is called with no payload whether or not a user was
        logged in.
        """
        user = self.current_user
        if user:
            session_key = settings.session_key_prefix % user.uuid
            self.r_app.delete(session_key)
        self.finish()
10 months ago
class LoginHandler(APIHandler):
    """Authenticate a user by name and password and open a session."""

    def post(self):
        """Validate credentials and reply with a signed session token.

        Reads the ``username`` and ``pwd`` request arguments, looks up a
        matching ``sys_user`` row, stores the session payload through
        ``self.r_app`` under ``settings.session_key_prefix % uid`` with an
        expiry of ``settings.session_ttl``, and finishes with
        ``{"token": ..., "username": ...}``.

        Raises:
            errors.HTTPAPIError: ``ERROR_BAD_REQUEST`` when either credential
                is missing or no row matches the credentials;
                ``ERROR_FORBIDDEN`` when the matched row has
                ``available == 0``.
        """
        request_id = shortuuid.ShortUUID().random(10)
        logging.info(request_id)

        username = self.get_escaped_argument("username")
        password = self.get_escaped_argument("pwd")
        # The request body, arguments and password are intentionally NOT
        # logged: they contain the plaintext credential and must never reach
        # log files.

        # NOTE(review): captcha verification and the wrong-password lockout
        # existed here only as commented-out code, so no brute-force
        # protection is applied on this endpoint -- confirm that is intended.
        if not username or not password:
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "请输入用户名和密码")

        username = self.tostr(username)
        password = self.tostr(password)
        # NOTE(review): the lookup matches on the output of aes.encrypt, which
        # suggests passwords are stored with reversible encryption. Consider a
        # one-way password hash; changing it needs a data migration.
        pwd_enc = aes.encrypt(settings.pwd_aes_key, password)

        with self.app_mysql.connect() as conn:
            cur = conn.execute(
                text("select id, uid, available from sys_user where name=:name and pwd=:pwd"),
                {"name": username, "pwd": pwd_enc},
            )
            try:
                row = db_mysql.to_json(cur)
            finally:
                # Release the cursor even if row conversion fails.
                cur.close()

        if not row:
            # Same message for unknown user and wrong password, so the reply
            # does not reveal which of the two was wrong.
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "用户名或者密码错误")
        if row["available"] == 0:
            raise errors.HTTPAPIError(errors.ERROR_FORBIDDEN, "当前用户被禁用")

        user_id = row["id"]
        session_id = row["uid"]

        # The signed value handed to the client carries the user's uid, which
        # is also the suffix of the server-side session key.
        secure_cookie = self.create_signed_value(
            settings.secure_cookie_name, str(session_id)
        )
        self.r_app.set(
            settings.session_key_prefix % session_id,
            json.dumps({
                "id": user_id,
                "name": username,
                "uuid": row["uid"],
            }),
            ex=settings.session_ttl,
        )

        self.finish({
            "token": str(secure_cookie, encoding="utf-8"),
            "username": username,
        })
class UserInfoHandler(APIHandler):
    """Resolve a session token to the user's name and role."""

    def post(self):
        """Finish with ``{"name": ..., "role": ...}`` for the ``token`` argument.

        Raises:
            errors.HTTPAPIError: ``ERROR_UNAUTHORIZED`` when
                ``get_current_user`` returns nothing for the token.
        """
        raw_token = self.get_argument("token")
        token_body = self.tostr(raw_token)
        resolved = self.get_current_user(token_body=token_body)
        if resolved:
            self.finish({"name": resolved.name, "role": resolved.role})
            return
        raise errors.HTTPAPIError(errors.ERROR_UNAUTHORIZED)
10 months ago
class UserListHandler(APIHandler):
    """List the names of all rows in ``sys_user``."""

    @authenticated
    def post(self):
        """Finish with ``{"data": [...]}`` holding every ``name`` in ``sys_user``."""
        query = text("select name from sys_user")
        with self.app_mysql.connect() as conn:
            records = db_mysql.to_json_list(conn.execute(query))
        self.finish({"data": [record["name"] for record in records]})