Source code for kibitzr.timeline

import collections
import logging
import re

import six
import pytimeparse
import schedule

from .exceptions import ConfigurationError

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# One scheduling rule: run every `interval` `unit`s, optionally `at` a
# given time of day (None when no time is pinned).
TimelineRule = collections.namedtuple(
    "TimelineRule",
    ["interval", "unit", "at"],
)


class Timeline:
    """Translate check configuration into jobs on a ``schedule`` scheduler.

    ``parse_check`` turns the ``period`` / ``schedule`` keys of a check
    dictionary into a list of ``TimelineRule`` tuples, and
    ``schedule_checks`` registers one job per rule on ``self.scheduler``.
    """

    # Start-anchored pattern for the "at" value: one or two digits, a colon,
    # two digits.  Used with ``match``, so trailing text is not rejected here.
    RE_TIME = re.compile(r'\d?\d:\d\d')

    def __init__(self, scheduler=None):
        """Store ``scheduler``, or ``schedule.default_scheduler`` if falsy."""
        self.scheduler = scheduler or schedule.default_scheduler

    @classmethod
    def parse_check(cls, check):
        """Build the list of ``TimelineRule`` for one check configuration.

        * Neither ``period`` nor ``schedule`` present: a single rule of
          300 seconds.
        * ``period`` present: one rule with that many seconds; a string
          period is converted to seconds with ``pytimeparse.parse``.
        * ``schedule`` present: a dict yields one rule, a list yields one
          rule per item.  These are added after the ``period`` rule when
          both keys are given.

        Raises:
            ConfigurationError: if a string ``period`` cannot be parsed, or
                if ``schedule`` (or one of its items) is malformed.
        """
        check_schedule = []
        if "period" not in check and "schedule" not in check:
            check_schedule.append(TimelineRule(
                interval=300,
                unit='seconds',
                at=None
            ))
        elif "period" in check:
            period = check["period"]
            if isinstance(period, six.string_types):
                seconds = pytimeparse.parse(period)
                if seconds is None:
                    # pytimeparse signals an unparseable expression with
                    # None; report it as a configuration problem instead of
                    # letting int(None) raise a bare TypeError.
                    raise ConfigurationError(
                        f"Check {check['name']} has invalid period: {period}"
                    )
                seconds = int(seconds)
                logger.debug('Parsed "%s" to %d seconds', period, seconds)
                period = seconds
            check_schedule.append(TimelineRule(
                interval=period,
                unit='seconds',
                at=None
            ))
        if "schedule" in check:
            schedule_value = check['schedule']
            if isinstance(schedule_value, dict):
                check_schedule.append(
                    cls._clean_single_schedule(check, schedule_value)
                )
            elif isinstance(schedule_value, list):
                check_schedule.extend([
                    cls._clean_single_schedule(check, item)
                    for item in schedule_value
                ])
            else:
                raise ConfigurationError(
                    f"Check {check['name']} has invalid schedule configuration: {check['schedule']}"
                )
        return check_schedule

    @classmethod
    def _clean_single_schedule(cls, check, schedule_dict):
        """Validate one ``schedule`` entry and return it as a ``TimelineRule``.

        ``schedule_dict`` must be a dict with an ``every`` key.  A string
        ``every`` is treated as the unit with an interval of 1; otherwise
        the unit is read from the ``unit`` key.  The optional ``at`` key is
        validated by ``_clean_at``.

        Raises:
            ConfigurationError: if the entry is not a dict, lacks ``every``,
                or has an invalid unit or ``at`` value.
        """
        if not isinstance(schedule_dict, dict):
            raise ConfigurationError(
                f"Check {check['name']} has invalid schedule configuration: {schedule_dict}"
            )
        try:
            every = schedule_dict['every']
        except KeyError as exc:
            raise ConfigurationError(
                f"Check {check['name']} has invalid schedule format: {schedule_dict}"
            ) from exc
        if isinstance(every, six.string_types):
            # "every: day" shortcut
            unit, every = every, 1
        else:
            unit = schedule_dict.get('unit')
        unit = cls._clean_unit(check, unit)
        at = cls._clean_at(check, schedule_dict.get('at'))
        rule = TimelineRule(every, unit, at)
        logger.debug('Parsed schedule "%r" to %r', schedule_dict, rule)
        return rule

    @staticmethod
    def _clean_unit(check, unit):
        """Return ``unit`` if it is an attribute of a ``schedule.every(1)`` job.

        Raises:
            ConfigurationError: if looking the attribute up fails for any
                reason (unknown name, ``None``, ...).
        """
        try:
            # Probe with an interval of 1; any failure means the unit
            # cannot be used to build a job.
            getattr(schedule.every(1), unit)
        except Exception as exc:
            raise ConfigurationError(
                f"Unit {unit} is not one of valid options. Referenced in check {check['name']}"
            ) from exc
        return unit

    @classmethod
    def _clean_at(cls, check, at):
        """Return ``at`` unchanged if it is ``None`` or matches ``RE_TIME``.

        Raises:
            ConfigurationError: if ``at`` is not a string (for example a
                number produced by the config loader) or does not start
                with a time matching ``RE_TIME``.
        """
        if at is None:
            return at
        # Guard the regex call: re.match raises TypeError on non-strings,
        # which would bypass the ConfigurationError below.
        if isinstance(at, six.string_types) and cls.RE_TIME.match(at):
            return at
        raise ConfigurationError(
            f"Check {check['name']} schedule has invalid value for 'at': {at}"
        )

    def schedule_checks(self, checkers):
        """Clear the scheduler and register a job for every rule of every checker.

        For each checker, reads ``checker.conf['schedule']`` (an iterable of
        objects with ``interval``, ``unit`` and ``at`` attributes) and
        schedules ``checker.check`` accordingly.
        """
        self.scheduler.clear()
        for checker in checkers:
            conf = checker.conf
            for rule in conf['schedule']:
                job = getattr(self.scheduler.every(rule.interval), rule.unit)
                if rule.at is not None:
                    logger.info(
                        "Scheduling checks for %r every %r %s at %r",
                        conf["name"],
                        rule.interval,
                        rule.unit,
                        rule.at
                    )
                    job.at(rule.at)
                else:
                    logger.info(
                        "Scheduling checks for %r every %r %s",
                        conf["name"],
                        rule.interval,
                        rule.unit,
                    )
                # Add job to scheduler
                job.do(checker.check)
# Module-level shortcuts: the functions below delegate to this shared
# instance so callers do not have to create a Timeline themselves.
# With scheduler=None, Timeline falls back to schedule.default_scheduler.
default_timeline = Timeline(scheduler=None)
def parse_check(check):
    """Parse the timing options of ``check`` using ``default_timeline``.

    Shortcut for ``default_timeline.parse_check(check)``; returns whatever
    that call returns and propagates whatever it raises.
    """
    rules = default_timeline.parse_check(check)
    return rules
def run_pending():
    """Call ``run_pending()`` on the scheduler held by ``default_timeline``.

    Returns the result of that call.
    """
    scheduler = default_timeline.scheduler
    return scheduler.run_pending()
def schedule_checks(checkers):
    """Register ``checkers`` on ``default_timeline``.

    Shortcut for ``default_timeline.schedule_checks(checkers)``; returns
    ``None``.
    """
    timeline = default_timeline
    timeline.schedule_checks(checkers)