# -*- coding: utf-8 -*-
"""ORM model and repository for the ``enterprise_device`` table."""
import logging

from sqlalchemy import Column, Integer, String, DateTime, func
from sqlalchemy.ext.declarative import declarative_base

from website import errors
from website.db.device_classification.device_classification import DeviceClassification
from website.db.enterprise_entity.enterprise_entity import EnterpriseEntityRepository
from website.db.enterprise_node.enterprise_node import EnterpriseNodeRepository
from website.db_mysql import get_session
from website.util import shortuuid


def row2dict(row):
    """Return a dict mapping each table column name of *row* to ``str(value)``.

    Every value is passed through ``str``, so a ``None`` attribute becomes
    the string ``"None"``.
    """
    return {
        column.name: str(getattr(row, column.name))
        for column in row.__table__.columns
    }


Base = declarative_base()

# Reference DDL for the mapped table (kept verbatim as a no-op string).
# NOTE(review): the DDL below has no `status` column although the model maps
# one -- confirm the live schema.
"""
CREATE TABLE `enterprise_device` (
  `id` int NOT NULL AUTO_INCREMENT,
  `suid` varchar(10) DEFAULT NULL COMMENT '设备suid',
  `entity_id` int NOT NULL COMMENT '企业id',
  `entity_suid` varchar(10) NOT NULL COMMENT '企业suid',
  `node_id` int NOT NULL COMMENT '节点id',
  `node_suid` varchar(10) NOT NULL COMMENT '节点suid',
  `classification` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT '' COMMENT '分类suid',
  `name` varchar(255) NOT NULL,
  `addr` varchar(255) DEFAULT '',
  `device_model` varchar(255) DEFAULT '' COMMENT '设备型号',
  `param` varchar(255) DEFAULT '',
  `comment` varchar(255) DEFAULT '',
  `del` int DEFAULT '0',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='企业设备表';
"""


class EnterpriseDevice(Base):
    """Declarative mapping of one row of ``enterprise_device``."""

    __tablename__ = "enterprise_device"

    id = Column(Integer, primary_key=True)
    suid = Column(String(length=10), default="")
    entity_id = Column(Integer)
    entity_suid = Column(String)
    node_id = Column(Integer)
    node_suid = Column(String)
    classification = Column(String)  # joined against DeviceClassification.suid
    name = Column(String)
    addr = Column(String)
    device_model = Column(String)
    param = Column(String)
    comment = Column(String)
    status = Column(Integer)
    # Soft-delete flag; the attribute is `delete`, the DB column is `del`.
    delete = Column("del", Integer, default=0)
    create_time = Column(DateTime, default=func.now())
    update_time = Column(DateTime, default=func.now())


class EnterpriseDeviceRepository(object):
    """Data-access helpers for ``EnterpriseDevice`` rows.

    Every method opens its own session through ``get_session()``.
    Rows whose ``delete`` flag equals 1 are treated as removed by the
    read methods.
    """

    def add_device(self, device):
        """Insert a new device built from the *device* dict.

        *device* must contain ``entity_id``, ``node_id`` and ``name``; its
        items are passed as keyword arguments to ``EnterpriseDevice``.
        The dict is mutated in place: ``entity_suid``, ``node_suid`` and a
        freshly generated 10-character ``suid`` are written into it.

        Raises:
            errors.HTTPAPIError: a non-deleted device with the same
                ``node_id`` and ``name`` already exists.
        """
        entity_id = device["entity_id"]
        node_id = device["node_id"]
        name = device["name"]

        entity_suid = EnterpriseEntityRepository().get_entity_suid(entity_id)
        node = EnterpriseNodeRepository().get_node_by_id(node_id)
        device["entity_suid"] = entity_suid
        device["node_suid"] = node["suid"]
        device["suid"] = shortuuid.ShortUUID().random(10)

        with get_session() as session:
            existing_device = (
                session.query(EnterpriseDevice)
                .filter_by(node_id=node_id, name=name, delete=0)
                .first()
            )
            if existing_device:
                logging.error(
                    "Failed to add device: device with node_id=%s and name=%s already exists",
                    node_id,
                    name,
                )
                raise errors.HTTPAPIError(errors.ERROR_BAD_REQUEST, "设备已存在")

            new_device = EnterpriseDevice(**device)
            try:
                session.add(new_device)
            except Exception:
                logging.exception("Failed to add device")
                raise
        return

    def edit_device(self, device):
        """Update the row whose id is ``device["id"]`` with the other items.

        The ``id`` key only selects the row; it is not written back, so the
        primary key is never part of the UPDATE. If *device* carries nothing
        besides ``id`` there is nothing to update and the method returns.
        """
        device_id = device["id"]
        values = {key: value for key, value in device.items() if key != "id"}
        if not values:
            return

        with get_session() as session:
            try:
                session.query(EnterpriseDevice).filter(
                    EnterpriseDevice.id == device_id
                ).update(values)
            except Exception:
                logging.exception("Failed to edit device")
                raise
        return

    def delete_device(self, device_id):
        """Soft-delete the device with id *device_id* by setting ``delete`` to 1."""
        with get_session() as session:
            try:
                session.query(EnterpriseDevice).filter(
                    EnterpriseDevice.id == device_id
                ).update({"delete": 1})
            except Exception:
                logging.exception("Failed to delete device")
                raise
        return

    def list_devices(self, node_id: int, pageNo: int, pageSize: int) -> dict:
        """Return one page of non-deleted devices of a node, newest id first.

        Args:
            node_id: node whose devices are listed.
            pageNo: 1-based page number; values below 1 are treated as 1.
            pageSize: maximum number of devices in the page.

        Returns:
            ``{"devices": [...], "total_count": int}``. In each device dict
            ``classification`` holds the classification *name*.
        """
        # A negative OFFSET is not valid SQL, so clamp to the first page.
        offset = max(pageNo - 1, 0) * pageSize

        with get_session() as session:
            try:
                # NOTE(review): this count is not joined with
                # DeviceClassification, while the page query below uses an
                # inner join -- devices without a matching classification
                # are counted here but never listed. Confirm which is intended.
                total_count = (
                    session.query(EnterpriseDevice)
                    .filter(
                        EnterpriseDevice.node_id == node_id,
                        EnterpriseDevice.delete != 1,
                    )
                    .count()
                )
                devices = (
                    session.query(
                        EnterpriseDevice,
                        DeviceClassification.name.label("classification_name"),
                    )
                    .join(
                        DeviceClassification,
                        EnterpriseDevice.classification == DeviceClassification.suid,
                    )
                    .filter(
                        EnterpriseDevice.node_id == node_id,
                        EnterpriseDevice.delete != 1,
                    )
                    .order_by(EnterpriseDevice.id.desc())
                    .limit(pageSize)
                    .offset(offset)
                    .all()
                )
            except Exception:
                logging.exception("Failed to list devices")
                raise

            device_dicts = []
            for device, classification_name in devices:
                created = device.create_time
                device_dicts.append(
                    {
                        "id": device.id,
                        "suid": device.suid,
                        "entity_id": device.entity_id,
                        "entity_suid": device.entity_suid,
                        "node_id": device.node_id,
                        "node_suid": device.node_suid,
                        "classification": classification_name,
                        "name": device.name,
                        "addr": device.addr,
                        "device_model": device.device_model,
                        "param": device.param,
                        "comment": device.comment,
                        "delete": device.delete,
                        # create_time may be NULL; do not crash on strftime.
                        "create_time": (
                            created.strftime("%Y-%m-%d %H:%M:%S") if created else None
                        ),
                        "update_time": str(device.update_time),
                    }
                )

        logging.info(device_dicts)
        return {"devices": device_dicts, "total_count": total_count}

    def list_simple_devices(self, node_id: int) -> list:
        """Return id, name and classification name of a node's devices.

        Only non-deleted devices that join to a ``DeviceClassification``
        row are returned.
        """
        with get_session() as session:
            try:
                rows = (
                    session.query(
                        EnterpriseDevice,
                        DeviceClassification.name.label("classification_name"),
                    )
                    .join(
                        DeviceClassification,
                        EnterpriseDevice.classification == DeviceClassification.suid,
                    )
                    .filter(
                        EnterpriseDevice.node_id == node_id,
                        EnterpriseDevice.delete != 1,
                    )
                    .all()
                )
            except Exception:
                logging.exception("Failed to list devices")
                raise

            # Each row is an (EnterpriseDevice, classification_name) pair;
            # the device fields live on the entity, not on the row itself.
            return [
                {
                    "id": device.id,
                    "name": device.name,
                    "classification_name": classification_name,
                }
                for device, classification_name in rows
            ]

    def get_device(self, device_id: int) -> dict:
        """Return selected fields of the non-deleted device with *device_id*.

        Returns an empty dict when no such device exists.
        """
        with get_session() as session:
            try:
                device = (
                    session.query(EnterpriseDevice)
                    .filter(
                        EnterpriseDevice.id == device_id,
                        EnterpriseDevice.delete != 1,
                    )
                    .first()
                )
            except Exception:
                logging.exception("Failed to get device")
                raise

            if not device:
                return {}
            return {
                "suid": device.suid,
                "name": device.name,
                "addr": device.addr,
                "device_model": device.device_model,
                "param": device.param,
                "comment": device.comment,
                "classification": device.classification,
            }

    def get_devices(self, device_ids: list) -> list:
        """Return selected fields of the non-deleted devices whose id is in *device_ids*."""
        with get_session() as session:
            try:
                devices = (
                    session.query(EnterpriseDevice)
                    .filter(
                        EnterpriseDevice.id.in_(device_ids),
                        EnterpriseDevice.delete != 1,
                    )
                    .all()
                )
            except Exception:
                logging.exception("Failed to get devices")
                raise

            return [
                {
                    "id": device.id,
                    "name": device.name,
                    "addr": device.addr,
                    "device_model": device.device_model,
                    "param": device.param,
                    "comment": device.comment,
                }
                for device in devices
            ]

    def get_all_device_count(self) -> int:
        """Return the number of non-deleted devices.

        Best effort: on any query failure the error is logged and 0 is
        returned instead of raising.
        """
        with get_session() as session:
            try:
                return (
                    session.query(EnterpriseDevice)
                    .filter(EnterpriseDevice.delete != 1)
                    .count()
                )
            except Exception as e:
                logging.error("Failed to get all device count, error: {}".format(e))
                return 0

    def list_entity_devices(self, entity_id: int, pageNo: int, pageSize: int, classification: str, status: int) -> dict:
        """Return one page of an enterprise's non-deleted devices, newest id first.

        Args:
            entity_id: enterprise whose devices are listed.
            pageNo: 1-based page number; values below 1 are treated as 1.
            pageSize: maximum number of devices in the page.
            classification: when truthy, keep only devices with this
                classification value.
            status: when truthy, keep only devices with this status
                (a falsy value such as 0 or None means "no status filter").

        Returns:
            ``{"count": int, "devices": [EnterpriseDevice, ...]}`` where
            ``count`` is the number of rows matching the same filters.
        """
        # A negative OFFSET is not valid SQL, so clamp to the first page.
        offset = max(pageNo - 1, 0) * pageSize

        with get_session() as session:
            try:
                query = session.query(EnterpriseDevice).filter(
                    EnterpriseDevice.entity_id == entity_id,
                    EnterpriseDevice.delete != 1,
                )
                # Query.filter() returns a NEW query; the result must be
                # reassigned or the condition is silently dropped.
                if classification:
                    query = query.filter(
                        EnterpriseDevice.classification == classification
                    )
                if status:
                    query = query.filter(EnterpriseDevice.status == status)

                count = query.count()
                devices = (
                    query.order_by(EnterpriseDevice.id.desc())
                    .limit(pageSize)
                    .offset(offset)
                    .all()
                )
            except Exception:
                logging.exception("Failed to list devices")
                raise

        # NOTE(review): ORM instances are handed back after the session
        # block ends; whether their attributes stay loadable depends on how
        # get_session() is configured -- confirm against callers.
        return {"count": count, "devices": devices}