Source code for kibitzr.conf

import os
import re
import copy
import logging.config
import contextlib

import yaml
import entrypoints

from . import timeline
from .exceptions import ConfigurationError

logger = logging.getLogger(__name__)


[docs]class ReloadableSettings: _instance = None CONFIG_DIRS = ( '', '~/.config/kibitzr/', '~/', ) CONFIG_FILENAME = 'kibitzr.yml' RE_PUNCTUATION = re.compile(r'\W+') UNNAMED_PATTERN = 'Unnamed check {0}' def __init__(self, config_dir): self.filename = os.path.join(config_dir, self.CONFIG_FILENAME) self.checks = None self.creds = CompositeCreds(config_dir) self.parser = SettingsParser() self.reread()
[docs] @classmethod def detect_config_dir(cls): candidates = [ (directory, os.path.join(directory, cls.CONFIG_FILENAME)) for directory in map(os.path.expanduser, cls.CONFIG_DIRS) ] for directory, file_path in candidates: if os.path.exists(file_path): return directory raise ConfigurationError( "kibitzr.yml not found in following locations: {}" # pylint: disable=consider-using-f-string .format(", ".join([x[1] for x in candidates])) )
[docs] @classmethod def instance(cls): if cls._instance is None: config_dir = cls.detect_config_dir() cls._instance = cls(config_dir) return cls._instance
[docs] def reread(self): """ Read configuration file and substitute references into checks conf """ logger.debug("Loading settings from %s", os.path.abspath(self.filename)) conf = self.read_conf() changed = self.creds.reread() checks = self.parser.parse_checks(conf) if self.checks != checks: self.checks = checks return True return changed
[docs] def read_conf(self): """ Read and parse configuration file """ with self.open_conf() as fp: return yaml.safe_load(fp)
[docs] @contextlib.contextmanager def open_conf(self): with open(self.filename, encoding='utf8') as fp: yield fp
[docs]class CompositeCreds: def __init__(self, config_dir): self.plain = PlainYamlCreds(config_dir) self.extensions = {} self.load_extensions()
[docs] def reread(self): changed = False for extension in self.extensions: reread_method = getattr(extension, 'reread', None) if reread_method: changed |= reread_method() return changed
[docs] def load_extensions(self): for point in entrypoints.get_group_all("kibitzr.creds"): factory = point.load() self.extensions[point.name] = factory()
[docs] def get(self, key, default=None): try: return self[key] except KeyError: return default
def __getitem__(self, key): if key in self.extensions: return self.extensions[key] if key in self.plain: return self.plain[key] raise KeyError(f"Credentials not found: {key}")
[docs]class PlainYamlCreds: CREDENTIALS_FILENAME = 'kibitzr-creds.yml' def __init__(self, config_dir): super().__init__() self.creds = {} self.creds_filename = os.path.join(config_dir, self.CREDENTIALS_FILENAME) self.reread() def __contains__(self, key): return key in self.creds def __getitem__(self, key): return self.creds[key]
[docs] def reread(self): """ Read and parse credentials file. If something goes wrong, log exception and continue. """ logger.debug("Loading credentials from %s", os.path.abspath(self.creds_filename)) creds = {} try: with self.open_creds() as fp: creds = yaml.safe_load(fp) except IOError: logger.info("No credentials file found at %s", os.path.abspath(self.creds_filename)) except: logger.exception("Error loading credentials file") if creds != self.creds: self.creds = creds return True return False
[docs] @contextlib.contextmanager def open_creds(self): with open(self.creds_filename, encoding='utf8') as fp: yield fp
[docs]def settings(): """ Returns singleton instance of settings """ return ReloadableSettings.instance()
[docs]class SettingsParser: RE_PUNCTUATION = re.compile(r'\W+') UNNAMED_PATTERN = 'Unnamed check {0}'
[docs] def parse_checks(self, conf): """ Unpack configuration from human-friendly form to strict check definitions. """ checks = conf.get('checks', conf.get('pages', [])) checks = list(self.unpack_batches(checks)) checks = list(self.unpack_templates(checks, conf.get('templates', {}))) self.inject_missing_names(checks) for check in checks: self.inject_scenarios(check, conf.get('scenarios', {})) self.inject_notifiers(check, conf.get('notifiers', {})) self.expand_schedule(check) return checks
[docs] @staticmethod def inject_notifiers(check, notifiers): if 'notify' in check: for notify in check['notify']: if hasattr(notify, 'keys'): notify_type = next(iter(notify.keys())) notify_param = next(iter(notify.values())) try: notify[notify_type] = notifiers[notify_param] except (TypeError, KeyError): # notify_param is not a predefined notifier name # Save it as is: notify[notify_type] = notify_param
[docs] @staticmethod def inject_scenarios(check, scenarios): try: shared_scenario = scenarios[check['scenario']] except (KeyError, TypeError): pass else: check['scenario'] = shared_scenario
[docs] @staticmethod def expand_schedule(check): check_schedule = timeline.parse_check(check) if 'period' in check: del check['period'] check['schedule'] = check_schedule
[docs] @staticmethod def unpack_batches(checks): for check in checks: if 'batch' in check: base = copy.deepcopy(check) batch = base.pop('batch') url_pattern = base.pop('url-pattern') items = base.pop('items') for item in items: yield dict( copy.deepcopy(base), name=batch.format(item), url=url_pattern.format(item), ) else: yield check
[docs] @classmethod def inject_missing_names(cls, checks): unnamed_check_counter = 1 for check in checks: if not check.get('name'): if check.get('url'): check['name'] = cls.url_to_name(check['url']) else: check['name'] = cls.UNNAMED_PATTERN.format(unnamed_check_counter) unnamed_check_counter += 1
[docs] @classmethod def url_to_name(cls, url): return cls.RE_PUNCTUATION.sub('-', url)
[docs] @staticmethod def unpack_templates(checks, templates): for check in checks: if 'template' in check: if check['template'] in templates: templated_check = dict( copy.deepcopy(templates[check['template']]), **check ) del templated_check['template'] yield templated_check else: raise ConfigurationError( f"Template {check['template']} not found. " f"Referenced in check {check['name']}" ) else: yield check
logging.config.dictConfig({ 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'standard': { 'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s' }, }, 'handlers': { 'default': { 'level': 'DEBUG', 'class': 'logging.StreamHandler', 'formatter': 'standard', }, }, 'loggers': { '': { 'handlers': ['default'], 'level': 'INFO', 'propagate': True, }, 'sh': { 'level': 'INFO', }, 'sh.command': { 'level': 'WARNING', }, } })