# -*- coding: utf-8 -*-
import logging

from sqlalchemy import Column, Integer, String, DateTime, func
from sqlalchemy.ext.declarative import declarative_base

from website import errors
from website.db.device_classification.device_classification import DeviceClassification
from website.db.enterprise_entity.enterprise_entity import EnterpriseEntityRepository
from website.db.enterprise_node.enterprise_node import EnterpriseNodeRepository
from website.db_mysql import get_session
from website.util import shortuuid


def row2dict(row):
    """Return a dict of ``{column name: str(value)}`` for a mapped row.

    Keys are the database column names of ``row.__table__``. Values are read
    through the mapped attribute for each column, so models whose attribute
    name differs from the column name (e.g. ``EnterpriseDevice.delete`` mapped
    to column ``del``) are handled. Every value is passed through ``str``, so
    ``None`` becomes the string ``"None"``.
    """
    mapper = row.__mapper__
    return {
        column.name: str(getattr(row, mapper.get_property_by_column(column).key))
        for column in row.__table__.columns
    }


Base = declarative_base()

# Reference DDL of the table mapped by EnterpriseDevice below. The string has
# no runtime effect; it is kept only as schema documentation.
"""
CREATE TABLE `enterprise_device` (
  `id` int NOT NULL AUTO_INCREMENT,
  `suid` varchar(10) DEFAULT NULL COMMENT '设备suid',
  `entity_id` int NOT NULL COMMENT '企业id',
  `entity_suid` varchar(10) NOT NULL COMMENT '企业suid',
  `node_id` int NOT NULL COMMENT '节点id',
  `node_suid` varchar(10) NOT NULL COMMENT '节点suid',
  `classification` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '分类suid',
  `name` varchar(255) NOT NULL,
  `addr` varchar(255) DEFAULT '',
  `device_model` varchar(255) DEFAULT '' COMMENT '设备型号',
  `param` varchar(255) DEFAULT '',
  `comment` varchar(255) DEFAULT '',
  `del` int DEFAULT '0',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='企业设备表';
"""


class EnterpriseDevice(Base):
    """ORM mapping of the ``enterprise_device`` table."""

    __tablename__ = 'enterprise_device'

    id = Column(Integer, primary_key=True)
    suid = Column(String(length=10), default="")
    entity_id = Column(Integer)
    entity_suid = Column(String)
    node_id = Column(Integer)
    node_suid = Column(String)
    # Holds the suid of a DeviceClassification row (see list_devices join).
    classification = Column(String)
    name = Column(String)
    addr = Column(String)
    device_model = Column(String)
    param = Column(String)
    comment = Column(String)
    # Soft-delete flag; the attribute is ``delete`` but the column is ``del``.
    delete = Column("del", Integer, default=0)
    create_time = Column(DateTime, default=func.now())
    # The table DDL has no ON UPDATE clause, so the refresh on UPDATE is
    # requested here; otherwise update_time would never change after insert.
    update_time = Column(DateTime, default=func.now(), onupdate=func.now())


class EnterpriseDeviceRepository(object):
    """Data-access helpers for ``EnterpriseDevice`` rows.

    Every method opens its own session through ``get_session()``.
    NOTE(review): no method commits explicitly, so ``get_session()`` is
    presumably responsible for commit/rollback on exit -- confirm.
    """

    def add_device(self, device):
        """Insert a new device described by the dict *device*.

        *device* must contain ``entity_id``, ``node_id`` and ``name``; its
        keys are passed as keyword arguments to ``EnterpriseDevice``. The
        dict is modified in place: ``entity_suid``, ``node_suid`` and a
        freshly generated 10-character ``suid`` are added to it.

        Raises:
            errors.HTTPAPIError: if the node cannot be found, or if a
                non-deleted device with the same ``node_id`` and ``name``
                already exists.
        """
        entity_id = device["entity_id"]
        node_id = device["node_id"]
        name = device["name"]

        entity_suid = EnterpriseEntityRepository().get_entity_suid(entity_id)
        node = EnterpriseNodeRepository().get_node_by_id(node_id)
        if not node:
            # Fail with an API error instead of a TypeError on node["suid"].
            logging.error(f"Failed to add device: node with node_id={node_id} not found")
            raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "节点不存在")

        device["entity_suid"] = entity_suid
        device["node_suid"] = node["suid"]
        device["suid"] = shortuuid.ShortUUID().random(10)

        with get_session() as session:
            existing_device = (
                session.query(EnterpriseDevice)
                .filter_by(node_id=node_id, name=name, delete=0)
                .first()
            )
            if existing_device:
                logging.error(f"Failed to add device: device with node_id={node_id} and name={name} already exists")
                raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "设备已存在")

            new_device = EnterpriseDevice(**device)
            try:
                session.add(new_device)
            except Exception:
                logging.exception("Failed to add device")
                raise
        return

    def edit_device(self, device):
        """Update the row whose id is ``device["id"]`` with the values in *device*.

        The whole dict is handed to ``Query.update``, so its keys must be
        attribute names of ``EnterpriseDevice``. Any exception is logged with
        its traceback and re-raised.
        """
        device_id = device["id"]
        with get_session() as session:
            try:
                session.query(EnterpriseDevice) \
                    .filter(EnterpriseDevice.id == device_id) \
                    .update(device)
            except Exception:
                logging.exception("Failed to edit device")
                raise
        return

    def delete_device(self, device_id):
        """Soft-delete the device with id *device_id* by setting its ``delete`` flag to 1.

        Any exception is logged with its traceback and re-raised.
        """
        with get_session() as session:
            try:
                session.query(EnterpriseDevice) \
                    .filter(EnterpriseDevice.id == device_id) \
                    .update({"delete": 1})
            except Exception:
                logging.exception("Failed to delete device")
                raise
        return

    def list_devices(self, node_id: int, pageNo: int, pageSize: int) -> dict:
        """Return one page of the non-deleted devices of a node, newest id first.

        Args:
            node_id: id of the node whose devices are listed.
            pageNo: 1-based page number; values below 1 are treated as the
                first page.
            pageSize: maximum number of devices per page.

        Returns:
            ``{"devices": [...], "total_count": n}`` where ``total_count`` is
            the number of non-deleted devices of the node and each entry of
            ``devices`` is a plain dict. ``classification`` holds the
            classification *name* (empty string when no classification row
            matches); ``create_time`` is formatted ``%Y-%m-%d %H:%M:%S`` or
            ``None`` when the column is NULL.
        """
        # A negative OFFSET is rejected by the database; clamp to the first row.
        offset = max((pageNo - 1) * pageSize, 0)

        with get_session() as session:
            try:
                total_count = (
                    session.query(EnterpriseDevice)
                    .filter(EnterpriseDevice.node_id == node_id, EnterpriseDevice.delete != 1)
                    .count()
                )
                # LEFT OUTER JOIN so that devices without a matching
                # classification row are still listed; an inner join would
                # drop them while total_count above still counts them.
                devices = (
                    session.query(EnterpriseDevice, DeviceClassification.name.label("classification_name"))
                    .outerjoin(DeviceClassification, EnterpriseDevice.classification == DeviceClassification.suid)
                    .filter(EnterpriseDevice.node_id == node_id, EnterpriseDevice.delete != 1)
                    .order_by(EnterpriseDevice.id.desc())
                    .limit(pageSize)
                    .offset(offset)
                    .all()
                )
            except Exception:
                logging.exception("Failed to list devices")
                raise

            # Convert to plain dicts while the session is still open so the
            # result does not depend on ORM instances outliving the session.
            device_dicts = [
                {
                    "id": device.id,
                    "suid": device.suid,
                    "entity_id": device.entity_id,
                    "entity_suid": device.entity_suid,
                    "node_id": device.node_id,
                    "node_suid": device.node_suid,
                    "classification": classification_name or "",
                    "name": device.name,
                    "addr": device.addr,
                    "device_model": device.device_model,
                    "param": device.param,
                    "comment": device.comment,
                    "delete": device.delete,
                    # create_time is nullable in the DDL; avoid calling
                    # strftime on None.
                    "create_time": (
                        device.create_time.strftime('%Y-%m-%d %H:%M:%S')
                        if device.create_time else None
                    ),
                    "update_time": str(device.update_time),
                }
                for device, classification_name in devices
            ]

        logging.info(device_dicts)
        return {
            "devices": device_dicts,
            "total_count": total_count
        }

    def get_device(self, device_id: int) -> dict:
        """Return selected fields of the non-deleted device with id *device_id*.

        Returns:
            A dict with keys ``name``, ``addr``, ``device_model``, ``param``,
            ``comment`` and ``classification`` (the stored classification
            suid, not its name), or an empty dict when no such device exists.
        """
        with get_session() as session:
            try:
                device = (
                    session.query(EnterpriseDevice)
                    .filter(EnterpriseDevice.id == device_id, EnterpriseDevice.delete != 1)
                    .first()
                )
            except Exception:
                logging.exception("Failed to get device")
                raise

            if not device:
                return {}

            # Read the attributes while the session is still open.
            return {
                "name": device.name,
                "addr": device.addr,
                "device_model": device.device_model,
                "param": device.param,
                "comment": device.comment,
                "classification": device.classification
            }