import logging
import datetime as dt
from multiprocessing.util import Finalize
import sqlalchemy
from celery import current_app
from celery import schedules
from celery.beat import Scheduler, ScheduleEntry
from celery.five import values, items
from celery.utils.encoding import safe_str, safe_repr
from celery.utils.log import get_logger
from celery.utils.time import maybe_make_aware
from kombu.utils.json import dumps, loads
from .session import session_cleanup
from .session import SessionManager
from .models import (
PeriodicTask,
PeriodicTaskChanged,
CrontabSchedule,
IntervalSchedule,
SolarSchedule,
)
# This scheduler must wake up more frequently than the regular
# 5 minutes because it needs to take external changes to the
# schedule (made directly in the database) into account.
DEFAULT_MAX_INTERVAL = 5 # seconds
# Database URI used by DatabaseScheduler when neither a ``dburi`` argument
# nor the ``BEAT_DBURI`` app setting is provided.
DEFAULT_BEAT_DBURI = "sqlite:///schedule.db"
# Log template used when an entry cannot be added to the schedule; it is
# filled with the entry name, the raised exception and the entry fields.
ADD_ENTRY_ERROR = """\
Cannot add entry %r to database schedule: %r. Contents: %r
"""
# Module-wide session manager; DatabaseScheduler uses it to create the
# engine/session factory and to prepare the models.
session_manager = SessionManager()
# session = session_manager()
logger = get_logger("timApp.celery_sqlalchemy_scheduler.schedulers")
class ModelEntry(ScheduleEntry):
    """Scheduler entry taken from database row.

    Mirrors one ``PeriodicTask`` row: the row's name, task, schedule,
    arguments and routing options are copied onto the entry, and run
    statistics are written back to the row through :meth:`save`.
    """

    model_schedules = (
        # (schedule_type, model_type, model_field)
        (schedules.crontab, CrontabSchedule, "crontab"),
        (schedules.schedule, IntervalSchedule, "interval"),
        (schedules.solar, SolarSchedule, "solar"),
    )
    # Fields that :meth:`save` always copies from the model to the database.
    save_fields = ["last_run_at", "total_run_count", "no_changes"]

    def __init__(self, model, Session, app=None, **kw):
        """Initialize the model entry.

        :param model: the ``PeriodicTask`` row this entry represents.
        :param Session: session factory, used whenever no open session
            is available.
        :param app: Celery app; defaults to the current app.
        :param kw: may carry ``session``, an already open session that
            :meth:`_disable` reuses instead of opening a new one.
        """
        self.app = app or current_app._get_current_object()
        self.session = kw.get("session")
        self.Session = Session
        self.model = model
        self.name = model.name
        self.task = model.task

        try:
            self.schedule = model.schedule
            logger.debug(f"schedule: {self.schedule}")
        except Exception as e:
            logger.error(e)
            logger.error(
                "Disabling schedule %s that was removed from database",
                self.name,
            )
            self._disable(model)

        # Start from empty arguments so both attributes hold usable values
        # even if deserialization fails part-way through.
        self.args = []
        self.kwargs = {}
        try:
            self.args = loads(model.args or "[]")
            self.kwargs = loads(model.kwargs or "{}")
        except ValueError as exc:
            logger.exception(
                "Removing schedule %s for argument deseralization error: %r",
                self.name,
                exc,
            )
            self._disable(model)

        # Only options that are actually set on the row are forwarded.
        self.options = {}
        for option in ("queue", "exchange", "routing_key", "expires", "priority"):
            value = getattr(model, option)
            if value is not None:
                self.options[option] = value

        self.total_run_count = model.total_run_count
        self.enabled = model.enabled

        if not model.last_run_at:
            model.last_run_at = self._default_now()
        # The last_run_at value read from the database may carry no timezone
        # information, so the app timezone is attached here.
        self.last_run_at = model.last_run_at.replace(tzinfo=self.app.timezone)
        # NOTE: the same would apply to self.options["expires"]; that
        # conversion is currently not performed.

    def _disable(self, model):
        """Flag *model* as disabled and commit that change.

        Uses the session handed to ``__init__`` when there is one,
        otherwise a fresh session from the session factory.
        """
        model.no_changes = True
        self.model.enabled = self.enabled = model.enabled = False
        if self.session:
            self.session.add(model)
            self.session.commit()
        else:
            session = self.Session()
            with session_cleanup(session):
                session.add(model)
                session.commit()

    def is_due(self):
        """Return a ``schedstate`` telling whether the task should run now."""
        if not self.model.enabled:
            # 5 second delay for re-enable.
            return schedules.schedstate(False, 5.0)

        # START DATE: only run after the `start_time`, if one exists.
        if self.model.start_time is not None:
            now = maybe_make_aware(self._default_now())
            start_time = self.model.start_time.replace(tzinfo=self.app.timezone)
            if now < start_time:
                # The datetime is before the start date - don't run.
                _, delay = self.schedule.is_due(self.last_run_at)
                # use original delay for re-check
                return schedules.schedstate(False, delay)

        # ONE OFF TASK: Disable one off tasks after they've ran once
        if self.model.one_off and self.model.enabled and self.model.total_run_count > 0:
            self.model.enabled = False  # disable
            self.model.total_run_count = 0  # Reset
            self.model.no_changes = False  # Mark the model entry as changed
            save_fields = ("enabled",)  # the additional fields to save
            self.save(save_fields)
            return schedules.schedstate(False, None)  # Don't recheck

        return self.schedule.is_due(self.last_run_at)

    def _default_now(self):
        """Return the app's current time with the app timezone attached."""
        now = self.app.now()
        # Only the tzinfo is swapped; the wall-clock fields are untouched.
        # Keep in mind that timezone arithmetic with a localized timezone
        # may be inaccurate.
        return now.replace(tzinfo=self.app.timezone)

    def __next__(self):
        """Record a run on the model and return a fresh entry for it."""
        # should be use `self._default_now()` or `self.app.now()` ?
        self.model.last_run_at = self.app.now()
        self.model.total_run_count += 1
        self.model.no_changes = True
        # Pass the app along so the new entry stays bound to the same app
        # instead of falling back to whatever the current app happens to be.
        return self.__class__(self.model, Session=self.Session, app=self.app)

    next = __next__  # for 2to3

    def save(self, fields=()):
        """Write the tracked fields of the model back to the database.

        :params fields: tuple, the additional fields to save
        """
        session = self.Session()
        with session_cleanup(session):
            # Object may not be synchronized, so only
            # change the fields we care about.
            obj = session.query(PeriodicTask).get(self.model.id)
            if obj is None:
                # The row is gone; there is nothing to update.
                logger.warning(
                    "Cannot save schedule %s: row %r no longer exists",
                    self.name,
                    self.model.id,
                )
                return
            for field in (*self.save_fields, *fields):
                setattr(obj, field, getattr(self.model, field))
            session.add(obj)
            session.commit()

    @classmethod
    def to_model_schedule(cls, session, schedule):
        """Convert *schedule* to its model row.

        :returns: ``(model_schedule, model_field)`` where ``model_field``
            is the matching name from :attr:`model_schedules`.
        :raises ValueError: if the schedule type is not supported.
        """
        # Normalise once; the result does not depend on the loop variable.
        schedule = schedules.maybe_schedule(schedule)
        for schedule_type, model_type, model_field in cls.model_schedules:
            if isinstance(schedule, schedule_type):
                model_schedule = model_type.from_schedule(session, schedule)
                return model_schedule, model_field
        raise ValueError(f"Cannot convert schedule type {schedule!r} to model")

    @classmethod
    def from_entry(cls, name, Session, app=None, **entry):
        """Create or update the ``PeriodicTask`` called *name* from *entry*.

        **entry sample:

            {'task': 'celery.backend_cleanup',
             'schedule': schedules.crontab('0', '4', '*'),
             'options': {'expires': 43200}}

        A failing commit is logged and rolled back; an entry is returned
        in either case.
        """
        session = Session()
        with session_cleanup(session):
            periodic_task = session.query(PeriodicTask).filter_by(name=name).first()
            if not periodic_task:
                periodic_task = PeriodicTask(name=name)
            periodic_task.update(**cls._unpack_fields(session, **entry))
            session.add(periodic_task)
            try:
                session.commit()
            except Exception as exc:
                # Covers integrity errors as well as any other failure.
                logger.error(exc)
                session.rollback()
            return cls(periodic_task, app=app, Session=Session, session=session)

    @classmethod
    def _unpack_fields(
        cls,
        session,
        schedule,
        args=None,
        kwargs=None,
        relative=None,
        options=None,
        **entry,
    ):
        """Translate schedule-entry fields into ``PeriodicTask`` fields.

        **entry sample:

            {'task': 'celery.backend_cleanup',
             'schedule': <crontab: 0 4 * * * (m/h/d/dM/MY)>,
             'options': {'expires': 43200}}

        ``relative`` is accepted but not stored.
        """
        model_schedule, model_field = cls.to_model_schedule(session, schedule)
        entry.update(
            # the model_id which to relationship
            {model_field + "_id": model_schedule.id},
            args=dumps(args or []),
            kwargs=dumps(kwargs or {}),
            **cls._unpack_options(**options or {}),
        )
        return entry

    @classmethod
    def _unpack_options(
        cls,
        queue=None,
        exchange=None,
        routing_key=None,
        priority=None,
        one_off=None,
        expires=None,
        **kwargs,
    ):
        """Pick the supported task options; unknown ones are ignored.

        ``expires`` may be a number of seconds from now (int or float)
        or a ``datetime``; a number is converted to a naive UTC datetime.

        :raises ValueError: if ``expires`` has any other type.
        """
        data = {
            "queue": queue,
            "exchange": exchange,
            "routing_key": routing_key,
            "priority": priority,
            "one_off": one_off,
        }
        if expires:
            if isinstance(expires, (int, float)):
                # Naive UTC "now", without the deprecated datetime.utcnow().
                now_utc = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
                expires = now_utc + dt.timedelta(seconds=expires)
            elif not isinstance(expires, dt.datetime):
                raise ValueError("expires value error")
            data["expires"] = expires
        return data

    def __repr__(self):
        return "<ModelEntry: {} {}(*{}, **{}) {}>".format(
            safe_str(self.name),
            self.task,
            safe_repr(self.args),
            safe_repr(self.kwargs),
            self.schedule,
        )
class DatabaseScheduler(Scheduler):
    """Beat scheduler whose schedule is stored in a SQLAlchemy database."""

    Entry = ModelEntry
    Model = PeriodicTask
    Changes = PeriodicTaskChanged

    _schedule = None
    _last_timestamp = None
    _initial_read = True
    _heap_invalidated = False

    def __init__(self, *args, **kwargs):
        """Initialize the database scheduler.

        Reads ``app`` (required), ``dburi`` and ``max_interval`` from the
        arguments; everything is forwarded unchanged to ``Scheduler``.

        :raises KeyError: if no app is given at all.
        """
        # Prefer the keyword; fall back to the first positional argument
        # (presumably the app, as in ``Scheduler.__init__`` - confirm).
        self.app = kwargs["app"] if "app" in kwargs or not args else args[0]
        self.dburi = (
            kwargs.get("dburi") or self.app.conf.get("BEAT_DBURI") or DEFAULT_BEAT_DBURI
        )
        self.engine, self.Session = session_manager.create_session(self.dburi)
        session_manager.prepare_models(self.engine)

        self._dirty = set()
        Scheduler.__init__(self, *args, **kwargs)
        # Flush pending entries when the process exits.
        self._finalize = Finalize(self, self.sync, exitpriority=5)
        self.max_interval = (
            kwargs.get("max_interval")
            or self.app.conf.beat_max_loop_interval
            or DEFAULT_MAX_INTERVAL
        )

    def setup_schedule(self):
        """Install the default entries and those from ``beat_schedule``."""
        logger.info("setup_schedule")
        self.install_default_entries(self.schedule)
        self.update_from_dict(self.app.conf.beat_schedule)

    def all_as_schedule(self):
        """Return ``{name: entry}`` for every enabled row in the database."""
        session = self.Session()
        with session_cleanup(session):
            logger.debug("DatabaseScheduler: Fetching database schedule")
            # get all enabled PeriodicTask
            models = session.query(self.Model).filter_by(enabled=True).all()
            s = {}
            for model in models:
                try:
                    s[model.name] = self.Entry(
                        model, app=self.app, Session=self.Session, session=session
                    )
                except ValueError as exc:
                    # Leave the entry out, but say so instead of hiding it.
                    logger.warning("Skipping schedule %s: %r", model.name, exc)
            return s

    def schedule_changed(self):
        """Return True if the change marker row is newer than last seen.

        The marker row (id 1) is created when missing. The remembered
        timestamp is updated on every call.
        """
        session = self.Session()
        with session_cleanup(session):
            changes = session.query(self.Changes).get(1)
            if not changes:
                changes = self.Changes(id=1)
                session.add(changes)
                session.commit()
                return False

            last, ts = self._last_timestamp, changes.last_update
            try:
                # With no previous timestamp, ts is compared to itself,
                # so the first observation never counts as a change.
                if ts and ts > (last if last else ts):
                    return True
            finally:
                self._last_timestamp = ts
            return False

    def reserve(self, entry):
        """Advance *entry* and remember it as needing to be saved.

        It will be called in parent class.
        """
        new_entry = next(entry)
        # Need to store entry by name, because the entry may change
        # in the mean time.
        self._dirty.add(new_entry.name)
        return new_entry

    def sync(self):
        """Save every dirty entry to the database.

        Names missing from the schedule are put back for a later retry;
        any other error is logged and stops the current pass.
        """
        logger.info("Writing entries...")
        failed = set()
        try:
            while self._dirty:
                name = self._dirty.pop()
                try:
                    self.schedule[name].save()  # save to database
                    logger.debug("%s save to database", name)
                except KeyError as exc:
                    logger.error(exc)
                    failed.add(name)
        except sqlalchemy.exc.IntegrityError as exc:
            logger.exception("Database error while sync: %r", exc)
        except Exception as exc:
            logger.exception(exc)
        finally:
            # retry later, only for the failed ones
            self._dirty |= failed

    def update_from_dict(self, mapping):
        """Store the entries of *mapping* and add the enabled ones.

        Each value looks like:

            {'task': 'celery.backend_cleanup',
             'schedule': schedules.crontab('0', '4', '*'),
             'options': {'expires': 43200}}

        An entry that cannot be created is logged and skipped.
        """
        s = {}
        for name, entry_fields in mapping.items():
            try:
                entry = self.Entry.from_entry(
                    name, Session=self.Session, app=self.app, **entry_fields
                )
                if entry.model.enabled:
                    s[name] = entry
            except Exception as exc:
                logger.error(ADD_ENTRY_ERROR, name, exc, entry_fields)

        # update self.schedule
        self.schedule.update(s)

    def install_default_entries(self, data):
        """Add ``celery.backend_cleanup`` when ``result_expires`` is set.

        ``data`` is not used.
        """
        entries = {}
        if self.app.conf.result_expires:
            entries["celery.backend_cleanup"] = {
                "task": "celery.backend_cleanup",
                "schedule": schedules.crontab("0", "4", "*"),
                "options": {"expires": 12 * 3600},
            }
        self.update_from_dict(entries)

    def schedules_equal(self, *args, **kwargs):
        """Report inequality once after the heap has been invalidated."""
        if self._heap_invalidated:
            self._heap_invalidated = False
            return False
        return super().schedules_equal(*args, **kwargs)

    @property
    def schedule(self):
        """Current schedule, reloaded on first access and after changes."""
        initial = update = False
        if self._initial_read:
            logger.debug("DatabaseScheduler: initial read")
            initial = update = True
            self._initial_read = False
        elif self.schedule_changed():
            # when you updated the `PeriodicTasks` model's `last_update` field
            logger.info("DatabaseScheduler: Schedule changed.")
            update = True

        if update:
            # Flush pending run statistics before replacing the entries.
            self.sync()
            self._schedule = self.all_as_schedule()
            # the schedule changed, invalidate the heap in Scheduler.tick
            if not initial:
                self._heap = []
                self._heap_invalidated = True
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "Current schedule:\n%s",
                    "\n".join(repr(entry) for entry in self._schedule.values()),
                )
        return self._schedule

    @property
    def info(self):
        """Return a one-line description of the schedule's database."""
        return f"    . db -> {self.dburi}"