import datetime as dt
import pytz
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy.event import listen
from sqlalchemy.orm import relationship, foreign, remote
from sqlalchemy.sql import select, insert, update
from celery import schedules
from celery.utils.log import get_logger
from .tzcrontab import TzAwareCrontab
from .session import ModelBase
from ..item.block import Block
from ..plugin.taskid import TaskId
from ..util.utils import cached_property
# Module-level logger obtained from Celery's ``get_logger`` helper under a
# fixed, explicit logger name.
logger = get_logger("celery_sqlalchemy_scheduler.models")
def cronexp(field):
    """Return the compact textual form of a single cron expression field.

    Spaces are stripped from ``str(field)``. An unset field (``None``, an
    empty or whitespace-only string, or any other falsy non-integer value)
    is rendered as the wildcard ``"*"``.

    An integer ``0`` is a legitimate cron value (minute 0, hour 0, ...) and
    is rendered as ``"0"`` rather than being mistaken for "unset".

    :param field: the cron field value (usually a string, possibly an int
        or ``None``)
    :return: the field as a string without spaces, or ``"*"``
    """
    # bool is a subclass of int; keep treating False as "unset".
    is_integer = isinstance(field, int) and not isinstance(field, bool)
    if not field and not is_integer:
        return "*"
    compact = str(field).replace(" ", "")
    # A whitespace-only string collapses to "" and falls back to the wildcard.
    return compact if compact else "*"
class ModelMixin:
    """Small convenience helpers shared by the scheduler models."""

    @classmethod
    def create(cls, **kw):
        """Instantiate ``cls`` with the given keyword arguments and return it."""
        instance = cls(**kw)
        return instance

    def update(self, **kw):
        """Set each keyword argument as an attribute on this object.

        :return: ``self``, so calls can be chained
        """
        for attr in kw:
            setattr(self, attr, kw[attr])
        return self
class IntervalSchedule(ModelBase, ModelMixin):
    """Schedule that repeats every ``every`` units of ``period``."""

    __tablename__ = "celery_interval_schedule"
    __table_args__ = {"sqlite_autoincrement": True}

    # Names usable as ``period``; each one matches a keyword argument of
    # ``datetime.timedelta``, which is how ``schedule`` consumes it.
    DAYS = "days"
    HOURS = "hours"
    MINUTES = "minutes"
    SECONDS = "seconds"
    MICROSECONDS = "microseconds"

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    every = sa.Column(sa.Integer, nullable=False)
    period = sa.Column(sa.String(24))

    def __repr__(self):
        if self.every != 1:
            return f"every {self.every} {self.period}"
        return f"every {self.period_singular}"

    @property
    def period_singular(self):
        """``period`` with its last character removed (``"days"`` -> ``"day"``)."""
        return self.period[:-1]

    @property
    def schedule(self):
        """Build the Celery ``schedule`` object for this interval."""
        run_every = dt.timedelta(**{self.period: self.every})
        return schedules.schedule(run_every)

    @classmethod
    def from_schedule(cls, session, schedule, period=SECONDS):
        """Return the stored row matching ``schedule``, creating it if absent.

        ``every`` is taken from ``schedule.run_every.total_seconds()``
        (clamped to be non-negative). A newly created row is added to
        ``session`` and committed.

        :param session: SQLAlchemy session used for the lookup and insert
        :param schedule: object exposing a ``run_every`` timedelta
        :param period: value stored in the ``period`` column
        :return: the matching or newly created ``IntervalSchedule``
        """
        seconds = max(schedule.run_every.total_seconds(), 0)
        lookup = session.query(IntervalSchedule).filter_by(
            every=seconds, period=period
        )
        existing = lookup.first()
        if existing:
            return existing
        created = cls(every=seconds, period=period)
        session.add(created)
        session.commit()
        return created
class CrontabSchedule(ModelBase, ModelMixin):
    """Cron-style schedule stored as five text fields plus a timezone name."""

    __tablename__ = "celery_crontab_schedule"
    __table_args__ = {"sqlite_autoincrement": True}

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    minute = sa.Column(sa.String(60 * 4), default="*")
    hour = sa.Column(sa.String(24 * 4), default="*")
    day_of_week = sa.Column(sa.String(64), default="*")
    day_of_month = sa.Column(sa.String(31 * 4), default="*")
    month_of_year = sa.Column(sa.String(64), default="*")
    timezone = sa.Column(sa.String(64), default="UTC")

    def __repr__(self):
        # Field order matches the "(m/h/d/dM/MY)" legend in the template.
        fields = (
            self.minute,
            self.hour,
            self.day_of_week,
            self.day_of_month,
            self.month_of_year,
        )
        rendered = [cronexp(value) for value in fields]
        return "{} {} {} {} {} (m/h/d/dM/MY) {}".format(
            *rendered, str(self.timezone)
        )

    @property
    def schedule(self):
        """Build a ``TzAwareCrontab`` from the stored fields and timezone."""
        zone = pytz.timezone(self.timezone)
        return TzAwareCrontab(
            minute=self.minute,
            hour=self.hour,
            day_of_week=self.day_of_week,
            day_of_month=self.day_of_month,
            month_of_year=self.month_of_year,
            tz=zone,
        )

    @classmethod
    def from_schedule(cls, session, schedule):
        """Return the stored row matching ``schedule``, creating it if absent.

        The lookup uses the schedule's ``_orig_*`` field values and, when
        ``schedule.tz`` is truthy, its ``zone`` name. A newly created row is
        added to ``session`` and committed.

        :param session: SQLAlchemy session used for the lookup and insert
        :param schedule: crontab-like schedule exposing ``_orig_*`` and ``tz``
        :return: the matching or newly created ``CrontabSchedule``
        """
        spec = dict(
            minute=schedule._orig_minute,
            hour=schedule._orig_hour,
            day_of_week=schedule._orig_day_of_week,
            day_of_month=schedule._orig_day_of_month,
            month_of_year=schedule._orig_month_of_year,
        )
        if schedule.tz:
            spec["timezone"] = schedule.tz.zone
        existing = session.query(CrontabSchedule).filter_by(**spec).first()
        if existing:
            return existing
        created = cls(**spec)
        session.add(created)
        session.commit()
        return created
class SolarSchedule(ModelBase, ModelMixin):
    """Schedule tied to a solar event at a given latitude/longitude."""

    __tablename__ = "celery_solar_schedule"
    __table_args__ = {"sqlite_autoincrement": True}

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    event = sa.Column(sa.String(24))
    latitude = sa.Column(sa.Float())
    longitude = sa.Column(sa.Float())

    def __repr__(self):
        return f"{self.event} ({self.latitude}, {self.longitude})"

    @property
    def schedule(self):
        """Build the Celery ``solar`` schedule, using ``datetime.now`` as clock."""
        position = (self.latitude, self.longitude)
        return schedules.solar(self.event, *position, nowfun=dt.datetime.now)

    @classmethod
    def from_schedule(cls, session, schedule):
        """Return the stored row matching ``schedule``, creating it if absent.

        A newly created row is added to ``session`` and committed.

        :param session: SQLAlchemy session used for the lookup and insert
        :param schedule: solar schedule exposing ``event``, ``lat`` and ``lon``
        :return: the matching or newly created ``SolarSchedule``
        """
        spec = dict(
            event=schedule.event,
            latitude=schedule.lat,
            longitude=schedule.lon,
        )
        existing = session.query(SolarSchedule).filter_by(**spec).first()
        if existing:
            return existing
        created = cls(**spec)
        session.add(created)
        session.commit()
        return created
class PeriodicTaskChanged(ModelBase, ModelMixin):
    """Helper table for tracking updates to periodic tasks.

    A single marker row with ``id == 1`` stores the time of the most recent
    change; ``update_changed`` writes it and ``last_change`` reads it.
    """

    __tablename__ = "celery_periodic_task_changed"

    id = sa.Column(sa.Integer, primary_key=True)
    last_update = sa.Column(
        sa.DateTime(timezone=True), nullable=False, default=dt.datetime.now
    )

    @classmethod
    def changed(cls, mapper, connection, target):
        """Record a change unless ``target.no_changes`` is set.

        :param mapper: the Mapper which is the target of this event
        :param connection: the Connection being used
        :param target: the mapped instance being persisted
        """
        if not target.no_changes:
            cls.update_changed(mapper, connection, target)

    @classmethod
    def update_changed(cls, mapper, connection, target):
        """Set the marker row's ``last_update`` to now, creating the row if needed.

        :param mapper: the Mapper which is the target of this event
        :param connection: the Connection being used
        :param target: the mapped instance being persisted
        """
        now = dt.datetime.now()
        # The object returned by ``execute`` is always truthy, so fetch the
        # actual row to find out whether the marker exists yet.
        marker = connection.execute(
            select([PeriodicTaskChanged]).where(PeriodicTaskChanged.id == 1).limit(1)
        ).first()
        if marker is None:
            # Pin the primary key so the row is the one that the UPDATE
            # below and ``last_change`` look up.
            connection.execute(
                insert(PeriodicTaskChanged).values(id=1, last_update=now)
            )
        else:
            connection.execute(
                update(PeriodicTaskChanged)
                .where(PeriodicTaskChanged.id == 1)
                .values(last_update=now)
            )

    @classmethod
    def last_change(cls, session):
        """Return the stored ``last_update`` value, or ``None`` if no row exists.

        :param session: SQLAlchemy session used for the lookup
        """
        marker = session.query(PeriodicTaskChanged).get(1)
        if marker:
            return marker.last_update
        return None
class PeriodicTask(ModelBase, ModelMixin):
    """A named Celery task together with the schedule that triggers it.

    At most one of ``interval``, ``crontab`` and ``solar`` is consulted, in
    that order, when resolving ``schedule``.
    """

    __tablename__ = "celery_periodic_task"
    __table_args__ = {"sqlite_autoincrement": True}

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)

    block_id = sa.Column(sa.Integer, sa.ForeignKey("block.id"), nullable=True)
    block = relationship(Block)

    # Unique name of this periodic task entry.
    name = sa.Column(sa.String(255), unique=True)
    # Name of the Celery task to run.
    task = sa.Column(sa.String(255))

    # The schedule references below are plain integer columns without a
    # ForeignKey constraint; each relationship spells out its join instead.
    interval_id = sa.Column(sa.Integer)
    interval = relationship(
        IntervalSchedule,
        uselist=False,
        primaryjoin=foreign(interval_id) == remote(IntervalSchedule.id),
    )

    crontab_id = sa.Column(sa.Integer)
    crontab = relationship(
        CrontabSchedule,
        uselist=False,
        primaryjoin=foreign(crontab_id) == remote(CrontabSchedule.id),
    )

    solar_id = sa.Column(sa.Integer)
    solar = relationship(
        SolarSchedule,
        uselist=False,
        primaryjoin=foreign(solar_id) == remote(SolarSchedule.id),
    )

    args = sa.Column(sa.Text(), default="[]")
    kwargs = sa.Column(sa.Text(), default="{}")

    # Celery routing options: queue, exchange and routing key.
    queue = sa.Column(sa.String(255))
    exchange = sa.Column(sa.String(255))
    routing_key = sa.Column(sa.String(255))

    priority = sa.Column(sa.Integer())
    expires = sa.Column(sa.DateTime(timezone=True))

    # Run only once.
    one_off = sa.Column(sa.Boolean(), default=False)
    start_time = sa.Column(sa.DateTime(timezone=True))
    enabled = sa.Column(sa.Boolean(), default=True)
    last_run_at = sa.Column(sa.DateTime(timezone=True))
    total_run_count = sa.Column(sa.Integer(), nullable=False, default=0)

    # Modification time.
    date_changed = sa.Column(
        sa.DateTime(timezone=True), default=func.now(), onupdate=func.now()
    )
    description = sa.Column(sa.Text(), default="")

    # Plain class attribute (not a column). ``PeriodicTaskChanged.changed``
    # reads it and skips recording a change when it is truthy.
    no_changes = False

    def __repr__(self):
        if self.interval:
            return "{0.name}: {0.interval}".format(self)
        if self.crontab:
            return "{0.name}: {0.crontab}".format(self)
        if self.solar:
            return "{0.name}: {0.solar}".format(self)
        return "{0.name}: {{no schedule}}".format(self)

    @property
    def task_name(self):
        """Alias for the ``task`` column."""
        return self.task

    @task_name.setter
    def task_name(self, value):
        self.task = value

    @property
    def schedule(self):
        """Return the schedule of the first set interval, crontab or solar.

        :raises ValueError: if none of the three schedule relations is set
        """
        source = self.interval or self.crontab or self.solar
        if source:
            return source.schedule
        raise ValueError(f"{self.name} schedule is None!")

    @cached_property
    def task_id(self):
        """Returns the TIM task id. Only valid for tasks added by users."""
        return TaskId.parse(self.name, allow_block_hint=False, allow_type=False)
# Keep the change marker up to date whenever a task or schedule row is written.
# PeriodicTask updates go through ``changed`` so that instances flagged with
# ``no_changes`` are skipped; everything else records the change directly.
listen(PeriodicTask, "after_insert", PeriodicTaskChanged.update_changed)
listen(PeriodicTask, "after_delete", PeriodicTaskChanged.update_changed)
listen(PeriodicTask, "after_update", PeriodicTaskChanged.changed)

for _schedule_model in (IntervalSchedule, CrontabSchedule, SolarSchedule):
    for _event_name in ("after_insert", "after_delete", "after_update"):
        listen(_schedule_model, _event_name, PeriodicTaskChanged.update_changed)