You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
7.0 KiB
Python

# -*- coding: utf-8 -*-
import logging
from typing import List, Dict, Any, Union
from sqlalchemy import text
from website.db_mysql import get_session, to_json_list, to_json
"""
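-- NOTE: reference DDL only. The queries in this module also read and write
-- `suid` and `entity_id` columns that are not listed in this definition.
CREATE TABLE `enterprise_node` (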
`id` int NOT NULL AUTO_INCREMENT,
`entity_suid` int NOT NULL COMMENT '企业uuid',
`name` varchar(255) DEFAULT '' COMMENT '企业name',
`parent` int DEFAULT NULL,
`addr` varchar(255) DEFAULT '',
`lola` varchar(255) DEFAULT '',
`contact` varchar(255) DEFAULT '',
`phone` varchar(255) DEFAULT '',
`comment` varchar(255) DEFAULT '',
`del` int DEFAULT '0',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='企业节点表'
"""
class EnterpriseNodeRepository(object):
    """Data-access helpers for the ``enterprise_node`` table.

    The public entry points open their own session through ``get_session()``
    and run parameterised SQL built with ``text()``. Deletion is soft: the
    ``del`` column is set to 1 and most read queries filter on ``del=0``.
    """

    # Column list and self-join shared by the tree queries; each caller
    # appends its own WHERE clause.
    _TREE_SELECT = """
        SELECT
            n.id, n.name, n.parent, p.name AS parent_name, n.suid
        FROM enterprise_node n
        LEFT JOIN enterprise_node p ON n.parent = p.id
    """

    @staticmethod
    def _tree_node(row: Dict[str, Any], children: List[Any]) -> Dict[str, Any]:
        """Build one tree entry from a query row and its already-built children."""
        return {
            "id": row["id"],
            "name": row["name"],
            "suid": row["suid"],
            "parent": row["parent"],
            "parent_name": row["parent_name"],
            "children": children,
        }

    def insert_node(self, node: dict) -> int:
        """Insert one row into ``enterprise_node``.

        Args:
            node: Bind parameters for the statement. It is passed to the
                driver unchanged and must supply ``suid``, ``entity_id``,
                ``entity_suid``, ``name``, ``parent``, ``addr``, ``lola``,
                ``contact``, ``phone`` and ``comment``.

        Returns:
            Always ``0``; the generated primary key is not reported.
        """
        sql = (
            "insert into "
            "enterprise_node(suid, entity_id, entity_suid, name, parent, addr, lola, contact, phone, comment) "
            "values (:suid, :entity_id, :entity_suid, :name, :parent, :addr, :lola, :contact, :phone, :comment)"
        )
        with get_session() as session:
            # NOTE(review): unlike delete_node there is no explicit commit
            # here; presumably get_session() commits on exit -- confirm.
            session.execute(text(sql), node)
        return 0

    def update_node(self, node: dict) -> int:
        """Update the editable columns of the row identified by ``node["id"]``.

        Args:
            node: Mapping that must contain ``id``, ``name``, ``parent``,
                ``addr``, ``lola``, ``contact``, ``phone`` and ``comment``.
                Any other keys are ignored.

        Returns:
            Always ``0``.

        Raises:
            KeyError: If one of the required keys is missing from ``node``.
        """
        sql = (
            "update enterprise_node "
            "set name=:name, parent=:parent, addr=:addr, lola=:lola, contact=:contact, phone=:phone, comment=:comment where id=:id"
        )
        columns = ("id", "name", "parent", "addr", "lola", "contact", "phone", "comment")
        # Copy only the bound columns so extra keys in `node` never reach the driver.
        param = {column: node[column] for column in columns}
        with get_session() as session:
            # NOTE(review): no explicit commit here either; see insert_node.
            session.execute(text(sql), param)
        return 0

    def select_tree(self, entity_id: int, name: str = "") -> List[Dict[str, Any]]:
        """Return the node tree of one entity as nested dicts.

        Without a ``name`` the roots are the entity's non-deleted nodes whose
        ``parent`` is 0. With a ``name`` the roots are every non-deleted node
        of the entity whose name contains it, at whatever depth. Either way
        each root carries its complete subtree under ``"children"``.

        Args:
            entity_id: Value matched against ``enterprise_node.entity_id``.
            name: Optional substring to search for. Any falsy value
                (``""`` or ``None``) selects the top-level nodes.

        Returns:
            A list of dicts with the keys ``id``, ``name``, ``suid``,
            ``parent``, ``parent_name`` and ``children``; empty when nothing
            matches.
        """
        param: Dict[str, Any] = {"entity_id": entity_id}
        sql = self._TREE_SELECT + " WHERE n.entity_id=:entity_id AND n.del=0"
        if name:
            sql += " and n.name like :name"
            param["name"] = f"%{name}%"
        else:
            # A single branch on purpose: two separate tests (`== ""` and
            # truthiness) let a None name skip both filters and return every
            # node as a root.
            sql += " and n.parent=0"

        roots = []
        with get_session() as session:
            rows = to_json_list(session.execute(text(sql), param)) or []
            for row in rows:
                roots.append(
                    self._tree_node(row, self.build_tree(session, row, name))
                )
        return roots

    def build_tree(
        self, session: Any, node: Dict[str, Any], name: str = ""
    ) -> List[Any]:
        """Recursively load the non-deleted descendants of ``node``.

        One query is issued per visited node. The recursion stops only when a
        node has no children, so a cycle in the ``parent`` links would recurse
        without bound.

        Args:
            session: Open session used to run the queries.
            node: Row dict of the parent; only ``node["id"]`` is read.
            name: Not used for filtering; it is only forwarded to the
                recursive calls.

        Returns:
            A list of child dicts shaped like the entries of ``select_tree``.
        """
        sql = self._TREE_SELECT + " WHERE n.parent=:parent AND n.del=0"
        result = session.execute(text(sql), {"parent": node["id"]})
        rows = to_json_list(result) or []
        return [
            self._tree_node(child, self.build_tree(session, child, name))
            for child in rows
        ]

    def select_node(self, node_id: int) -> Union[Dict[str, Any], None]:
        """Fetch one non-deleted node together with its parent's name.

        Args:
            node_id: Primary key of the node.

        Returns:
            The first row produced by ``to_json_list``, or ``None`` when no
            non-deleted row has this id.
        """
        sql = """
            SELECT
                n.id, n.name, n.parent, p.name AS parent_name, n.addr, n.lola, n.contact, n.phone, n.comment, n.suid
            FROM enterprise_node n
            LEFT JOIN enterprise_node p ON n.parent = p.id
            WHERE n.id=:id AND n.del=0
        """
        with get_session() as session:
            rows = to_json_list(session.execute(text(sql), {"id": node_id})) or []
        return rows[0] if rows else None

    def delete_node(self, node_id: int) -> int:
        """Soft-delete a node by setting its ``del`` column to 1.

        Children of the node are not touched by this statement.

        Args:
            node_id: Primary key of the node.

        Returns:
            Always ``0``.
        """
        with get_session() as session:
            session.execute(
                text("update enterprise_node set del=1 where id=:id"),
                {"id": node_id},
            )
            session.commit()
        return 0

    def get_node_by_id(self, node_id: int) -> dict:
        """Look up ``suid`` and ``name`` for a node id.

        The query does not filter on ``del``.

        Args:
            node_id: Primary key of the node.

        Returns:
            Whatever ``to_json`` produces for the result (presumably a single
            row dict -- confirm against ``website.db_mysql``).
        """
        with get_session() as session:
            result = session.execute(
                text("select suid, name from enterprise_node where id=:id"),
                {"id": node_id},
            )
            return to_json(result)

    def get_entity_suid_by_node_id(self, node_id: int) -> dict:
        """Look up ``suid`` and ``entity_suid`` for a node id.

        The query does not filter on ``del``.

        Args:
            node_id: Primary key of the node.

        Returns:
            Whatever ``to_json`` produces for the result (presumably a single
            row dict -- confirm against ``website.db_mysql``).
        """
        with get_session() as session:
            result = session.execute(
                text("select suid, entity_suid from enterprise_node where id=:id"),
                {"id": node_id},
            )
            return to_json(result)

    def simple_list(self, entity_id: int) -> List[Any]:
        """List ``id`` and ``name`` of every non-deleted node of an entity.

        Args:
            entity_id: Value matched against ``enterprise_node.entity_id``.

        Returns:
            The rows produced by ``to_json_list``; an empty list when that
            helper returns a falsy value, matching the other list-returning
            methods of this class.
        """
        sql = (
            "select id, name from enterprise_node where del=0 and entity_id=:entity_id"
        )
        with get_session() as session:
            result = session.execute(text(sql), {"entity_id": entity_id})
            return to_json_list(result) or []