from crontab import CronTab from datetime import datetime import json import getpass user = getpass.getuser() class CronOpt: #创建定时任务 def __init__(self,user=user): self.initialize(user) #初始化 def initialize(self,user=user): self.cron = CronTab(user) #查询列表 def select(self,reInit = False): if reInit != False: # 强制重新读取列表 self.initialize() cronArray = [] for job in self.cron: # print job.command schedule = job.schedule(date_from=datetime.now()) cronDict = { "task" : (job.command).replace(r'>/dev/null 2>&1', ''), "next" : str(schedule.get_next()), "prev" : str(schedule.get_prev()), "comment": job.comment } cronArray.append(cronDict) return cronArray #新增定时任务 def add(self, command, timeStr, commentName): # 创建任务 job = self.cron.new(command=command) # 设置任务执行周期 job.setall(timeStr) # 备注,也是命令id job.set_comment(commentName) # 写入到定时任务 self.cron.write() # 返回更新后的列表 return self.select(True) #删除定时任务 def delCron(self,commentName): for job in self.cron : if job.comment == commentName: self.cron.remove(job) self.cron.write() return self.select(True) #更新定时任务 def update(self, command, timeStr, commentName): # 先删除任务 self.delCron(commentName) # 再创建任务,以达到更新的结果 return self.add(command, timeStr, commentName) if __name__ == '__main__': c = CronOpt() #print(c.select()) # 新增定时任务 #print(c.add("ls /home","*/10 * * * *","测试")) # 删除定时任务 print(c.delCron("测试"))