更新rsa签名

release
周平 12 months ago
parent 103ee99d26
commit cb4011a1c1

@ -108,6 +108,8 @@ services:
# - /data/app/lemon:/app/lemon # 如果由相同的设置则会覆盖Dockfile中COPY的文件 # - /data/app/lemon:/app/lemon # 如果由相同的设置则会覆盖Dockfile中COPY的文件
- /data/app/log:/app/log - /data/app/log:/app/log
- /data/app/fileupload:/app/fileupload - /data/app/fileupload:/app/fileupload
- /usr/sbin/dmidecode:/usr/sbin/dmidecode # dmidecode命令需要root权限否则会报错
- /dev/mem:/dev/mem # /dev/mem需要root权限否则会报错
environment: environment:
- TZ=Asia/Shanghai - TZ=Asia/Shanghai
networks: # 配置网络 networks: # 配置网络

@ -16,6 +16,9 @@ services:
# - /data/app/lemon:/app/lemon # - /data/app/lemon:/app/lemon
- /data/app/log:/app/log - /data/app/log:/app/log
- /data/app/fileupload:/app/fileupload - /data/app/fileupload:/app/fileupload
- ./private_key.pem:/app/lemon/private_key.pem
- /usr/sbin/dmidecode:/usr/sbin/dmidecode
- /dev/mem:/dev/mem
environment: environment:
- TZ=Asia/Shanghai - TZ=Asia/Shanghai
networks: networks:

@ -31,6 +31,8 @@ mysql_app = {
} }
redis_app = ("lemon_redis", 6379, 0, "hgkiYY87") redis_app = ("lemon_redis", 6379, 0, "hgkiYY87")
file_upload_dir = "/app/fileupload" file_upload_dir = "/app/fileupload"
rsa_private_file = "/app/lemon/private_key.pem"
rsa_license_file = "/app/lemon/license"
EOF EOF
fi fi

@ -40,4 +40,9 @@ device_status_map = {
1002: u"离线", 1002: u"离线",
1003: u"运行中", 1003: u"运行中",
1004: u"故障", 1004: u"故障",
} }
# 系统状态
system_status_not_active = 9000 # 未激活
system_status_activated = 9001 # 已激活
system_status_expire_atall = 9003 # 完全过期

@ -1,21 +1,19 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""系统信息""" """系统信息"""
import datetime
import json
import logging import logging
import uuid
import time
import re import re
import os import time
import json
import hashlib
import datetime from sqlalchemy import text
from website import consts
from website import errors from website import errors
from website import settings from website import settings
from website.handler import APIHandler, WebHandler, authenticated, operation_log, permission
from website.db_mysql import to_json_list from website.db_mysql import to_json_list
from website.util import sysinfo, rsa from website.handler import APIHandler, authenticated
from sqlalchemy import text from website.util import sysinfo, rsa_oaep
class VersionHandler(APIHandler): class VersionHandler(APIHandler):
@ -24,8 +22,12 @@ class VersionHandler(APIHandler):
def post(self): def post(self):
self.finish() self.finish()
class IdentifycodeHandler(APIHandler): class IdentifycodeHandler(APIHandler):
@authenticated """系统识别码"""
# @authenticated
# @permission([100014, 100015]) # @permission([100014, 100015])
# @operation_log("资产管理中心", "系统激活", "查询", "查询本地识别码", "查询本地识别码") # @operation_log("资产管理中心", "系统激活", "查询", "查询本地识别码", "查询本地识别码")
def post(self): def post(self):
@ -33,6 +35,7 @@ class IdentifycodeHandler(APIHandler):
self.finish({"result": code}) self.finish({"result": code})
class LicenseUploadHandler(APIHandler): class LicenseUploadHandler(APIHandler):
@authenticated @authenticated
# @permission([100014, 100015]) # @permission([100014, 100015])
@ -55,45 +58,44 @@ class LicenseUploadHandler(APIHandler):
if file_size > 10 * 1024 * 1024: if file_size > 10 * 1024 * 1024:
raise errors.HTTPAPIError(errors.ERROR_METHOD_NOT_ALLOWED, 'Exceed 10M size limit') raise errors.HTTPAPIError(errors.ERROR_METHOD_NOT_ALLOWED, 'Exceed 10M size limit')
md5_str = hashlib.md5(file.body).hexdigest()
filepath = settings.rsa_license_file filepath = settings.rsa_license_file
try: try:
body = file['body'] body = file['body']
public_key = rsa.load_pub_key_string(open(settings.rsa_public_file).read().strip('\n').encode('utf-8')) plaintext = rsa_oaep.decrypt_message_pri(
plaintext = rsa.decrypt(public_key, body) open(settings.rsa_private_file).read().strip('\n').encode('utf-8'), body)
plaintext_json = json.loads(self.tostr(plaintext)) plaintext_json = json.loads(self.tostr(plaintext))
syscode = plaintext_json["syscode"] syscode = plaintext_json["sys_code"]
expireat = plaintext_json["expireat"] expireat = plaintext_json["expire_at"]
current_syscode = sysinfo.get_identify_code() current_syscode = sysinfo.get_idntify_code_v2()
if syscode != current_syscode: if syscode != current_syscode:
raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "license激活失败请重新激活") raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "license激活失败请重新激活")
row = self.db_app.get("select id from license where syscode=%s", syscode) with self.app_mysql.connect() as conn:
if row: conn.execute(text(
self.db_app.update( "update sys_license set syscode=:syscode, expireat=:expireat, status=:status",
"update license set expireat=%s where syscode=%s", str(expireat), syscode {
) "syscode": syscode,
else: "expireat": expireat,
self.db_app.insert( "status": consts.system_status_activated
"insert into license(syscode, expireat) values(%s, %s)", }
syscode, expireat ))
) conn.commit()
self.r_app.set("system:license", json.dumps({"syscode":syscode, "expireat":expireat}))
self.r_app.set("system:license", json.dumps({"syscode": syscode, "expireat": expireat}))
with open(filepath, 'wb') as f: with open(filepath, 'wb') as f:
f.write(file['body']) f.write(file['body'])
logging.info(plaintext_json)
except Exception as e: except Exception as e:
logging.info(e) logging.info(e)
raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "license激活失败请重新激活") raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "license激活失败请重新激活")
self.finish() self.finish()
class ActivateInfoHandler(APIHandler): class ActivateInfoHandler(APIHandler):
@authenticated @authenticated
# @permission([100014, 100015]) # @permission([100014, 100015])
@ -128,7 +130,6 @@ class ActivateInfoHandler(APIHandler):
class InfoHandler(APIHandler): class InfoHandler(APIHandler):
def post(self): def post(self):
self.finish() self.finish()
@ -167,4 +168,4 @@ class LogHandler(APIHandler):
res = conn.execute(text(sql), p) res = conn.execute(text(sql), p)
data = to_json_list(res) data = to_json_list(res)
self.finish({"count": count, "data": data}) self.finish({"count": count, "data": data})

@ -8,7 +8,6 @@ handlers = [
("/system/license/upload", handler.LicenseUploadHandler), ("/system/license/upload", handler.LicenseUploadHandler),
("/system/activate/info", handler.ActivateInfoHandler), ("/system/activate/info", handler.ActivateInfoHandler),
("/system/info", handler.InfoHandler), ("/system/info", handler.InfoHandler),
("/system/log", handler.LogHandler), ("/system/log", handler.LogHandler),
] ]

@ -78,8 +78,11 @@ enterprise_aes_key = "FquMBlcVoIkTAmL7"
file_upload_dir = "/data/fileupload" file_upload_dir = "/data/fileupload"
rsa_public_file = "/home/app/public" system_salt = "5bVQmI0ATh+QITf75WgVchT6TPN1DEOasSmrtMcTsPQ="
# rsa_public_file = "/home/app/public"
rsa_license_file = "/home/app/license" rsa_license_file = "/home/app/license"
rsa_private_file = "/home/app/private_key.pem"
# hashlib.sha256(base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes)).hexdigest() # hashlib.sha256(base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes)).hexdigest()

@ -1,22 +1,60 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from M2Crypto import RSA
from M2Crypto import BIO
from binascii import a2b_hex, b2a_hex from binascii import a2b_hex, b2a_hex
from M2Crypto import BIO
from M2Crypto import RSA
def load_pub_key_string(string): def load_pub_key_string(string):
bio = BIO.MemoryBuffer(string) bio = BIO.MemoryBuffer(string)
return RSA.load_pub_key_bio(bio) return RSA.load_pub_key_bio(bio)
def block_data(texts, block_size): def block_data(texts, block_size):
for i in range(0, len(texts), block_size): for i in range(0, len(texts), block_size):
yield texts[i:i + block_size] yield texts[i:i + block_size]
def decrypt(publick_key, texts):
def encrypt(texts):
ciphertext = b""
block_size = 256 - 11
for text in block_data(texts.encode('utf-8'), block_size):
current_text = pri_key.private_encrypt(text, RSA.pkcs1_padding)
ciphertext += current_text
return b2a_hex(ciphertext)
def decrypt(texts):
plaintext = b"" plaintext = b""
block_size = 256 block_size = 256
for text in block_data(a2b_hex(texts), block_size): for text in block_data(a2b_hex(texts), block_size):
current_text = publick_key.public_decrypt(text, RSA.pkcs1_padding) current_text = pub_key.public_decrypt(text, RSA.pkcs1_padding)
plaintext += current_text plaintext += current_text
return plaintext return plaintext
if __name__ == '__main__':
# 2048代表生成密钥的位数65537代表公钥的指数
key = RSA.gen_key(2048, 65537)
key.save_key("private_key", None)
key.save_pub_key("public_key")
prikey = open("private_key").read()
pubkey = open("public_key").read()
pri_key = RSA.load_key_string(prikey.strip('\n').encode('utf-8'))
pub_key = load_pub_key_string(pubkey.strip('\n').encode('utf-8'))
texts = "hellohellohellohellohellohellohellohellohellohellohellohellohello" \
"hellohellohellohellohellohellohellohellohellohellohellohellohello" \
"hellohellohellohellohellohellohellohellohellohellohellohellohello" \
"hellohellohellohellohellohellohellohellohellohellohello"
ciphertext = encrypt(texts)
print(ciphertext)
plaintext = decrypt(ciphertext)
print(plaintext)

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
import base64
"""
RSA 加密中,有两种常见的填充方式:PKCS1_v1_5 PKCS1_OAEP这两种填充方式在安全性和性能方面都有一些差异
PKCS1_v1_5 填充方式:
这是较早的 RSA 填充方式,相对简单且性能较好
但是它存在一些安全隐患,比如可能会受到选择密文攻击(Chosen Ciphertext Attack, CCA)
PKCS1_OAEP 填充方式:
PKCS1_OAEP 是一种更加安全的填充方式,它使用了随机填充来提高安全性
PKCS1_OAEP 可以抵御选择密文攻击(CCA)和其他一些攻击方式,因此被认为更加安全
但是,PKCS1_OAEP 的性能略低于 PKCS1_v1_5,因为它需要进行更多的计算
"""
# 生成密钥对
def generate_keys():
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
return private_key, public_key
# 加密消息message为bytes类型
def encrypt_message(public_key, message):
cipher = PKCS1_OAEP.new(RSA.import_key(public_key))
encrypted_message = base64.b64encode(cipher.encrypt(message))
print("Encrypted message:", encrypted_message.decode())
return encrypted_message
# 解密消息
def decrypt_message(private_key, encrypted_message):
decipher = PKCS1_OAEP.new(RSA.import_key(private_key))
decrypted_message = decipher.decrypt(base64.b64decode(encrypted_message))
print("Decrypted message:", decrypted_message.decode())
return decrypted_message.decode()
# 主程序
if __name__ == "__main__":
# 生成密钥对
private_key, public_key = generate_keys()
print(private_key)
print(public_key)
# 序列化公钥和私钥
# private_pem = private_key.private_bytes(
# encoding=serialization.Encoding.PEM,
# format=serialization.PrivateFormat.PKCS8,
# encryption_algorithm=serialization.NoEncryption()
# )
# public_pem = public_key.public_bytes(
# encoding=serialization.Encoding.PEM,
# format=serialization.PublicFormat.SubjectPublicKeyInfo
# )
# 打印公钥和私钥
print("Private Key:")
print(private_key.decode())
print("Public Key:")
print(public_key.decode())
# 待加密消息
message = b"Hello, RSA!"
# 加密消息
encrypted_message = encrypt_message(public_key, message)
print("Encrypted Message:")
print(encrypted_message)
# 解密消息
decrypted_message = decrypt_message(private_key, encrypted_message)
print("Decrypted Message:")
print(decrypted_message)

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
import argparse
import base64
import json
import sys
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
"""
RSA 加密中,有两种常见的填充方式:PKCS1_v1_5 PKCS1_OAEP这两种填充方式在安全性和性能方面都有一些差异
PKCS1_v1_5 填充方式:
这是较早的 RSA 填充方式,相对简单且性能较好
但是它存在一些安全隐患,比如可能会受到选择密文攻击(Chosen Ciphertext Attack, CCA)
PKCS1_OAEP 填充方式:
PKCS1_OAEP 是一种更加安全的填充方式,它使用了随机填充来提高安全性
PKCS1_OAEP 可以抵御选择密文攻击(CCA)和其他一些攻击方式,因此被认为更加安全
但是,PKCS1_OAEP 的性能略低于 PKCS1_v1_5,因为它需要进行更多的计算
"""
# 生成密钥对
def generate_keys():
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
return private_key, public_key
# 公钥加密消息message为bytes类型
def encrypt_message_pub(public_key, message):
cipher = PKCS1_OAEP.new(RSA.import_key(public_key))
encrypted_message = base64.b64encode(cipher.encrypt(message))
# print("Encrypted message:", encrypted_message.decode())
return encrypted_message
# 私钥解密消息
def decrypt_message_pri(private_key, encrypted_message):
decipher = PKCS1_OAEP.new(RSA.import_key(private_key))
decrypted_message = decipher.decrypt(base64.b64decode(encrypted_message))
# print("Decrypted message:", decrypted_message.decode())
return decrypted_message.decode()
def test():
# 生成密钥对
private_key, public_key = generate_keys()
print(private_key)
print(public_key)
# 打印公钥和私钥
print("Private Key:")
print(private_key.decode())
print("Public Key:")
print(public_key.decode())
# 待加密消息
# message = b"Hello, RSA!"
message = "Hello, RSA!".encode()
# 加密消息
encrypted_message = encrypt_message_pub(public_key, message)
print("Encrypted Message:")
print(encrypted_message)
# 解密消息
decrypted_message = decrypt_message_pri(private_key, encrypted_message)
print("Decrypted Message:")
print(decrypted_message)
# 主程序
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='manual to sign enterprise license')
parser.add_argument("--o", type=str, help="operation type", required=True) # name, t/test, s/sign, g/generate key, d/decrypt
parser.add_argument("--c", type=str, help="enterprise's sys code") # code
parser.add_argument("--e", type=str, help="expire date") # expire
args = parser.parse_args()
operation = args.o
if operation == "t":
test()
elif operation == "g":
private_key, public_key = generate_keys()
with open("private_key.pem", "wb") as f:
f.write(private_key)
with open("public_key.pem", "wb") as f:
f.write(public_key)
elif operation == "s":
code = args.c
expire = args.e
if not code or not expire:
print("sys code and expire date are required")
sys.exit(1)
pub_key = open("public_key.pem", "r").read()
license = encrypt_message_pub(pub_key.strip('\n').encode('utf-8'),
json.dumps({"sys_code": code, "expire_at": expire}).encode("utf-8"))
with open("license", "wb") as f:
f.write(license)
elif operation == "d":
private_key = open("private_key.pem", "r").read()
with open("license", "rb") as f:
license = f.read()
# 解密消息
body = decrypt_message_pri(private_key.strip('\n').encode('utf-8'), license)
json_body = json.loads(body)
print(json_body)

@ -4,10 +4,10 @@ import os
import socket import socket
import subprocess import subprocess
import uuid import uuid
from website import settings
def get_cpu_id(): def get_cpu_id():
p = subprocess.Popen(["dmidecode -t 4 | grep ID"], p = subprocess.Popen(["dmidecode -t 4 | grep ID | tail -1"],
shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
data = p.stdout data = p.stdout
lines = [] lines = []
@ -23,6 +23,75 @@ def get_cpu_id():
return lines return lines
def get_system_uuid():
p = subprocess.Popen(["dmidecode -s system-uuid"],
shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
data = p.stdout
lines = []
while True:
line = str(data.readline(), encoding="utf-8")
if line == '\n':
break
if line:
lines.append(line)
else:
break
print("system uuid {}".format(lines))
return lines
def get_system_manufacture():
p = subprocess.Popen(["dmidecode -s processor-manufacturer | tail -1"],
shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
data = p.stdout
lines = []
while True:
line = str(data.readline(), encoding="utf-8")
if line == '\n':
break
if line:
lines.append(line)
else:
break
return lines
def get_board_manufacturer():
p = subprocess.Popen(["dmidecode -s baseboard-manufacturer"],
shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
data = p.stdout
lines = []
while True:
line = str(data.readline(), encoding="utf-8")
if line == '\n':
break
if line:
lines.append(line)
else:
break
return lines
def get_board_serial_number():
p = subprocess.Popen(["dmidecode -s baseboard-serial-number"],
shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
data = p.stdout
lines = []
while True:
line = str(data.readline(), encoding="utf-8")
if line == '\n':
break
if line:
lines.append(line)
else:
break
return lines
def get_board_serialnumber(): def get_board_serialnumber():
p = subprocess.Popen(["dmidecode -t 2 | grep Serial"], p = subprocess.Popen(["dmidecode -t 2 | grep Serial"],
shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@ -40,6 +109,37 @@ def get_board_serialnumber():
return lines return lines
def get_idntify_code_v2():
cpuid = get_cpu_id()
system_uuid = get_system_uuid()
system_manufacture = get_system_manufacture()
board_manufacturer = get_board_manufacturer()
board_serial_number = get_board_serial_number()
s = ""
if cpuid:
s += cpuid[0]["ID"]
print("cpuid: ", cpuid[0]["ID"])
if system_uuid:
print(system_uuid)
s += system_uuid[0].strip()
if system_manufacture:
print(system_manufacture)
s += system_manufacture[0].strip()
if board_manufacturer:
print(board_manufacturer)
s += board_manufacturer[0].strip()
if board_serial_number:
print(board_serial_number)
s += board_serial_number[0].strip()
s += settings.system_salt
# system_salt = "5bVQmI0ATh+QITf75WgVchT6TPN1DEOasSmrtMcTsPQ="
# s += system_salt
code = hashlib.sha256(s.encode("utf8")).hexdigest()
return code
def get_identify_code(): def get_identify_code():
mac = uuid.UUID(int=uuid.getnode()).hex[-12:] mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
mac_addr = ":".join([mac[e:e + 2] for e in range(0, 11, 2)]) mac_addr = ":".join([mac[e:e + 2] for e in range(0, 11, 2)])
@ -65,14 +165,6 @@ def get_identify_code():
return code return code
def get_system_uuid():
# 获取系统uuid
# 这个 UUID 是与硬件相关的,因此即使在 Docker 容器中,它也应该是唯一的,可以用来标识宿主机,而不是容器本身。
with open("/sys/class/dmi/id/product_uuid", "r") as f:
host_uuid = f.read().strip()
return host_uuid
def get_docker_container_id(): def get_docker_container_id():
# 获取当前 Docker 容器的 ID # 获取当前 Docker 容器的 ID
cmd = "cat /proc/self/cgroup" cmd = "cat /proc/self/cgroup"
@ -84,3 +176,7 @@ def get_docker_container_id():
else: else:
container_id = container_message.strip().split("docker/")[-1] container_id = container_message.strip().split("docker/")[-1]
return container_id return container_id
if __name__ == "__main__":
print(get_idntify_code_v2())
Loading…
Cancel
Save