# -*- coding: utf-8 -*-
"""HTTP API handlers for captcha issuing, login, logout and user queries."""
import base64
import json
import logging
import uuid
from io import BytesIO

from sqlalchemy import text

from website import db_mysql
from website import errors
from website import settings
from website.handler import APIHandler, authenticated
from website.util import aes
from website.util import shortuuid
from website.util.captcha import create_validate_code


class CaptchaHandler(APIHandler):
    """Issue a login captcha image together with a signed lookup token."""

    def get(self):
        """Create a captcha, cache its text for 120 seconds and return it.

        The response is a dict with ``token`` (signed value wrapping the
        random cache id) and ``captcha`` (base64-encoded PNG bytes).
        """
        # NOTE(review): the payload passed to finish() is a dict, not raw PNG
        # bytes; stock Tornado would replace this header with a JSON content
        # type -- confirm what APIHandler.finish does before relying on it.
        self.set_header("Content-Type", "image/png")
        image, image_str = create_validate_code()
        c = uuid.uuid4().hex
        token = self.create_signed_value("logc", c)
        # The captcha text is stored under a random id and expires after 120s.
        self.r_app.set("logincaptcha:%s" % c, image_str, ex=120)
        buffered = BytesIO()
        # Save the captcha image into the in-memory buffer.
        image.save(buffered, 'png')
        img_b64 = base64.b64encode(buffered.getvalue())
        # for line in buffered.getvalue():
        #     self.write(line)
        # output.close()
        self.finish({"token": self.tostr(token), "captcha": self.tostr(img_b64)})


class LogoutHandler(APIHandler):
    """Terminate the session of the current user."""

    def get(self):
        """Delete the cached session entry of the current user, if any."""
        if self.current_user:
            # self.db_app.insert(
            #     "insert into system_log(user, ip, first_module, second_module, op_type, op_content, description) "
            #     "values(%s, %s, %s, %s, %s, %s, %s)",
            #     self.current_user.name, self.request.remote_ip, "平台管理中心", "账号管理", "登出", "系统登出", "系统登出"
            # )
            self.r_app.delete(settings.session_key_prefix % self.current_user.uuid)
        self.finish()


class LoginHandler(APIHandler):
    """Authenticate a user by name/password and open a session."""

    def post(self):
        """Validate credentials, cache the session and return a signed token.

        Reads the ``username`` and ``pwd`` request arguments.

        Raises:
            errors.HTTPAPIError: ``ERROR_BAD_REQUEST`` when a credential is
                missing or no matching user row is found;
                ``ERROR_FORBIDDEN`` when the row's ``available`` column is 0.
        """
        suid = shortuuid.ShortUUID().random(10)
        logging.info(suid)
        username = self.get_escaped_argument("username")
        password = self.get_escaped_argument("pwd")
        # captcha = self.get_escaped_argument("captcha", "")
        # captcha_token = self.get_escaped_argument("captcha_token", "")
        # wrong_time_lock = self.r_app.get("pwd:wrong:time:%s:lock" % self.tostr(username))
        # if wrong_time_lock:
        #     raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "账号处于冷却期,请稍后再试")

        # Security: the raw request body, the argument map and the password
        # are deliberately NOT logged -- they contain the plaintext credential.

        if not username or not password:
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "请输入用户名和密码")
        # if not captcha:
        #     raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "请输入验证码")
        # if not captcha_token:
        #     raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "缺少参数")
        # c = tornado.web.decode_signed_value(
        #     settings.cookie_secret,
        #     "logc",
        #     self.tostr(captcha_token)
        # )
        # code = self.r_app.get("logincaptcha:%s" % self.tostr(c))
        # Clear the cached captcha code.
        # self.r_app.delete("logincaptcha:%s" % c)
        # if not code:
        #     raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "验证码已过期")
        # Check that the submitted captcha matches the cached one.
        # if self.tostr(captcha).lower() != self.tostr(code).lower():
        #     raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "验证码错误")

        username = self.tostr(username)
        password = self.tostr(password)
        # NOTE(review): passwords are compared in AES-encrypted (reversible)
        # form; a one-way password hash would be safer -- needs a data
        # migration, so it is only flagged here.
        pwd_enc = aes.encrypt(settings.pwd_aes_key, password)

        with self.app_mysql.connect() as conn:
            cur = conn.execute(
                text("select id, uid, available from sys_user where name=:name and pwd=:pwd"),
                {"name": username, "pwd": pwd_enc}
            )
            # keys = list(cur.keys())
            #
            # one = cur.fetchone()
            # row = dict(zip(keys, one))
            # logging.info(db.Row(itertools.zip_longest(keys, one)))
            # presumably returns the matched row as a dict, falsy when there
            # is no match -- verify against db_mysql.to_json
            row = db_mysql.to_json(cur)
            cur.close()
            # data = [dict(zip(keys, res)) for res in cur.fetchall()]

        if not row:
            # wrong_time = self.r_app.get("pwd:wrong:time:%s" % username)
            # logging.info(wrong_time)
            # logging.info(settings.pwd_error_limit - 1)
            # if wrong_time and int(wrong_time) > settings.pwd_error_limit - 1:
            #     self.r_app.set("pwd:wrong:time:%s:lock" % username, 1, ex=3600)
            #     self.r_app.delete("pwd:wrong:time:%s" % username)
            # else:
            #     self.r_app.incr("pwd:wrong:time:%s" % username)
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "用户名或者密码错误")

        if row["available"] == 0:
            raise errors.HTTPAPIError(errors.ERROR_FORBIDDEN, "当前用户被禁用")

        # row_role = self.db_app.get("select role from user_role where userid=%s", row["id"])
        # user_role = row_role["role"]
        user_id = row["id"]
        jsessionid = row["uid"]

        # create sign value admin_login_sign
        secure_cookie = self.create_signed_value(settings.secure_cookie_name, str(jsessionid))
        # Cache the session payload under the user's uid with the configured TTL.
        self.r_app.set(
            settings.session_key_prefix % jsessionid,
            json.dumps({
                "id": user_id,
                "name": username,
                "uuid": row["uid"],
                # "role": user_role
            }),
            ex=settings.session_ttl
        )
        # self.db_app.insert(
        #     "insert into system_log(user, ip, first_module, second_module, op_type, op_content, description) "
        #     "values(%s, %s, %s, %s, %s, %s, %s)",
        #     username, self.request.remote_ip, "平台管理中心", "账号管理", "登录", "系统登录", "系统登录"
        # )

        # license_row = self.db_app.get(
        #     "select expireat from license limit 1"
        # )
        # system_status = get_license_status(license_row)

        render_data = {
            "token": str(secure_cookie, encoding="utf-8"),
            # "role": user_role,
            "username": username,
            # "system_status": system_status,
            # 9000/not activated, 9001/activated, 9002/expired but viewable, 9003/fully expired
        }
        self.finish(render_data)


class UserInfoHandler(APIHandler):
    """Return basic profile data of the current user."""

    def post(self):
        """Respond with the current user's name.

        Raises:
            errors.HTTPAPIError: ``ERROR_UNAUTHORIZED`` when no user is
                resolved for the request.
        """
        # token = self.get_argument("token")
        # user = self.get_current_user(token_body=self.tostr(token))
        user = self.get_current_user()
        if not user:
            raise errors.HTTPAPIError(errors.ERROR_UNAUTHORIZED)
        # NOTE(review): "avtar" looks like a misspelling of "avatar"; it is a
        # response key clients may already read, so it is left unchanged.
        self.finish({"name": user.name, "avtar": ""})


class UserListHandler(APIHandler):
    """List the names of all users."""

    @authenticated
    def post(self):
        """Respond with ``{"data": [...]}`` holding every ``sys_user.name``."""
        with self.app_mysql.connect() as conn:
            cur = conn.execute(
                text(
                    "select name from sys_user"
                )
            )
            res = db_mysql.to_json_list(cur)
        names = [row["name"] for row in res]
        self.finish({"data": names})