You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

72 lines
2.0 KiB
Python

from crontab import CronTab
from datetime import datetime
import json
import getpass
user = getpass.getuser()
class CronOpt:
#创建定时任务
def __init__(self,user=user):
self.initialize(user)
#初始化
def initialize(self,user=user):
self.cron = CronTab(user)
#查询列表
def select(self,reInit = False):
if reInit != False:
# 强制重新读取列表
self.initialize()
cronArray = []
for job in self.cron:
# print job.command
schedule = job.schedule(date_from=datetime.now())
cronDict = {
"task" : (job.command).replace(r'>/dev/null 2>&1', ''),
"next" : str(schedule.get_next()),
"prev" : str(schedule.get_prev()),
"comment": job.comment
}
cronArray.append(cronDict)
return cronArray
#新增定时任务
def add(self, command, timeStr, commentName):
# 创建任务
job = self.cron.new(command=command)
# 设置任务执行周期
job.setall(timeStr)
# 备注也是命令id
job.set_comment(commentName)
# 写入到定时任务
self.cron.write()
# 返回更新后的列表
return self.select(True)
#删除定时任务
def delCron(self,commentName):
for job in self.cron :
if job.comment == commentName:
self.cron.remove(job)
self.cron.write()
return self.select(True)
#更新定时任务
def update(self, command, timeStr, commentName):
# 先删除任务
self.delCron(commentName)
# 再创建任务,以达到更新的结果
return self.add(command, timeStr, commentName)
if __name__ == '__main__':
c = CronOpt()
#print(c.select())
# 新增定时任务
#print(c.add("ls /home","*/10 * * * *","测试"))
# 删除定时任务
print(c.delCron("测试"))