import os
import re
import copy
import logging.config
import contextlib
import yaml
import entrypoints
from . import timeline
from .exceptions import ConfigurationError
logger = logging.getLogger(__name__)
class ReloadableSettings:
    """
    Settings read from ``kibitzr.yml`` that can be re-read on demand.

    Keeps the parsed check definitions next to the credentials store and
    offers a lazily created shared object through :meth:`instance`.
    """

    _instance = None
    # Directories probed for the configuration file, in priority order.
    # The empty entry yields a path relative to the working directory.
    CONFIG_DIRS = (
        '',
        '~/.config/kibitzr/',
        '~/',
    )
    CONFIG_FILENAME = 'kibitzr.yml'
    RE_PUNCTUATION = re.compile(r'\W+')
    UNNAMED_PATTERN = 'Unnamed check {0}'

    def __init__(self, config_dir):
        """
        Bind to the configuration file inside ``config_dir`` and load it.
        """
        self.filename = os.path.join(config_dir, self.CONFIG_FILENAME)
        self.checks = None
        self.creds = CompositeCreds(config_dir)
        self.parser = SettingsParser()
        self.reread()

    @classmethod
    def detect_config_dir(cls):
        """
        Return the first directory of ``CONFIG_DIRS`` (after ``~``
        expansion) that contains the configuration file.

        Raises ConfigurationError naming every probed path when the file
        exists in none of them.
        """
        probed_paths = []
        for raw_directory in cls.CONFIG_DIRS:
            directory = os.path.expanduser(raw_directory)
            file_path = os.path.join(directory, cls.CONFIG_FILENAME)
            if os.path.exists(file_path):
                return directory
            probed_paths.append(file_path)
        locations = ", ".join(probed_paths)
        raise ConfigurationError(
            "kibitzr.yml not found in following locations: {}"  # pylint: disable=consider-using-f-string
            .format(locations)
        )

    @classmethod
    def instance(cls):
        """
        Return the shared settings object, building it on the first call.
        """
        if cls._instance is not None:
            return cls._instance
        cls._instance = cls(cls.detect_config_dir())
        return cls._instance

    def reread(self):
        """
        Reload the configuration file and the credentials.

        Returns True when the parsed checks differ from the ones held so
        far; otherwise returns what the credentials reread reported.
        """
        logger.debug("Loading settings from %s",
                     os.path.abspath(self.filename))
        conf = self.read_conf()
        creds_changed = self.creds.reread()
        fresh_checks = self.parser.parse_checks(conf)
        checks_changed = self.checks != fresh_checks
        if checks_changed:
            self.checks = fresh_checks
        return True if checks_changed else creds_changed

    def read_conf(self):
        """
        Open the configuration file and return its content as parsed by
        ``yaml.safe_load``.
        """
        with self.open_conf() as stream:
            parsed = yaml.safe_load(stream)
        return parsed

    @contextlib.contextmanager
    def open_conf(self):
        """
        Context manager yielding the configuration file opened for
        reading as UTF-8 text.
        """
        with open(self.filename, encoding='utf8') as stream:
            yield stream
class CompositeCreds:
    """
    Credentials lookup combining extension stores with the plain YAML
    credentials file.

    Extension stores are loaded from the ``kibitzr.creds`` entry point
    group and registered under their entry point name; on lookup they
    take precedence over keys of the YAML file.
    """

    def __init__(self, config_dir):
        """
        Create the YAML-backed store for ``config_dir`` and load all
        registered extension stores.
        """
        self.plain = PlainYamlCreds(config_dir)
        self.extensions = {}
        self.load_extensions()

    def reread(self):
        """
        Refresh every credentials source.

        The plain YAML store is always re-read; an extension store is
        re-read only when it exposes a callable ``reread`` attribute.

        Returns True when at least one source reported a change.
        """
        changed = bool(self.plain.reread())
        # Iterate the store objects, not the dict keys: the keys are
        # entry point names (strings) and never have a ``reread`` method.
        for extension in self.extensions.values():
            reread_method = getattr(extension, 'reread', None)
            if callable(reread_method) and reread_method():
                changed = True
        return changed

    def load_extensions(self):
        """
        Instantiate every factory registered in the ``kibitzr.creds``
        entry point group and store it under the entry point name.
        """
        for point in entrypoints.get_group_all("kibitzr.creds"):
            factory = point.load()
            self.extensions[point.name] = factory()

    def get(self, key, default=None):
        """
        Return credentials for ``key`` or ``default`` when not found.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def __getitem__(self, key):
        """
        Return the extension store named ``key`` if there is one,
        otherwise the entry ``key`` of the plain YAML credentials.

        Raises KeyError when neither source knows the key.
        """
        if key in self.extensions:
            return self.extensions[key]
        if key in self.plain:
            return self.plain[key]
        raise KeyError(f"Credentials not found: {key}")
class PlainYamlCreds:
    """
    Credentials read from the ``kibitzr-creds.yml`` file located in the
    configuration directory.
    """

    CREDENTIALS_FILENAME = 'kibitzr-creds.yml'

    def __init__(self, config_dir):
        """
        Remember the credentials file path inside ``config_dir`` and
        load it.
        """
        super().__init__()
        self.creds = {}
        self.creds_filename = os.path.join(config_dir,
                                           self.CREDENTIALS_FILENAME)
        self.reread()

    def __contains__(self, key):
        return key in self.creds

    def __getitem__(self, key):
        return self.creds[key]

    def reread(self):
        """
        Read and parse credentials file.
        If something goes wrong, log exception and continue.

        A missing, unreadable, unparsable or empty file results in empty
        credentials.

        Returns True when the loaded credentials differ from the ones
        held before the call, False otherwise.
        """
        logger.debug("Loading credentials from %s",
                     os.path.abspath(self.creds_filename))
        creds = {}
        try:
            with self.open_creds() as fp:
                # safe_load returns None for an empty document; keep the
                # store a mapping so ``in`` and ``[]`` lookups still work.
                creds = yaml.safe_load(fp) or {}
        except OSError:
            logger.info("No credentials file found at %s",
                        os.path.abspath(self.creds_filename))
        except Exception:  # pylint: disable=broad-except
            # Best effort by design, but do not swallow KeyboardInterrupt
            # or SystemExit the way a bare ``except:`` would.
            logger.exception("Error loading credentials file")
        if creds != self.creds:
            self.creds = creds
            return True
        return False

    @contextlib.contextmanager
    def open_creds(self):
        """
        Context manager yielding the credentials file opened for reading
        as UTF-8 text.
        """
        with open(self.creds_filename, encoding='utf8') as fp:
            yield fp
def settings():
    """
    Return the shared ReloadableSettings object.
    """
    shared_settings = ReloadableSettings.instance()
    return shared_settings
class SettingsParser:
    """
    Converts the human-friendly configuration mapping into strict check
    definitions.
    """

    RE_PUNCTUATION = re.compile(r'\W+')
    UNNAMED_PATTERN = 'Unnamed check {0}'

    def parse_checks(self, conf):
        """
        Unpack configuration from human-friendly form
        to strict check definitions.

        ``conf`` is the parsed configuration mapping. Checks are taken
        from its ``checks`` key, falling back to ``pages``. An empty
        configuration (``None``) and sections that are present but empty
        are treated as having no entries.

        Returns the list of expanded check dictionaries.
        """
        conf = conf or {}
        checks = conf.get('checks', conf.get('pages', [])) or []
        templates = conf.get('templates') or {}
        scenarios = conf.get('scenarios') or {}
        notifiers = conf.get('notifiers') or {}
        checks = list(self.unpack_batches(checks))
        checks = list(self.unpack_templates(checks, templates))
        self.inject_missing_names(checks)
        for check in checks:
            self.inject_scenarios(check, scenarios)
            self.inject_notifiers(check, notifiers)
            self.expand_schedule(check)
        return checks

    @staticmethod
    def inject_notifiers(check, notifiers):
        """
        Replace references to shared notifiers inside ``check['notify']``
        with their definitions from ``notifiers``, in place.

        Only mapping entries are considered; their first key is the
        notifier type and their first value is the candidate reference.
        """
        for notify in check.get('notify') or ():
            if not hasattr(notify, 'keys'):
                continue
            notify_type = next(iter(notify.keys()))
            notify_param = next(iter(notify.values()))
            try:
                notify[notify_type] = notifiers[notify_param]
            except (TypeError, KeyError):
                # notify_param is not a predefined notifier name
                # Save it as is:
                notify[notify_type] = notify_param

    @staticmethod
    def inject_scenarios(check, scenarios):
        """
        Replace ``check['scenario']`` with the shared scenario of that
        name, in place; leave the check untouched when there is none.
        """
        try:
            shared_scenario = scenarios[check['scenario']]
        except (KeyError, TypeError):
            # No scenario key, not a shared scenario name, or an
            # unhashable inline scenario: keep the check as is.
            pass
        else:
            check['scenario'] = shared_scenario

    @staticmethod
    def expand_schedule(check):
        """
        Store the result of ``timeline.parse_check`` under
        ``check['schedule']`` and drop the ``period`` key, in place.
        """
        # Compute the schedule before removing 'period', as in the
        # original statement order.
        check_schedule = timeline.parse_check(check)
        check.pop('period', None)
        check['schedule'] = check_schedule

    @staticmethod
    def unpack_batches(checks):
        """
        Yield checks, expanding each batch definition into one check per
        item.

        A batch check carries ``batch`` (name pattern), ``url-pattern``
        and ``items``; every item is formatted into both patterns to
        produce the ``name`` and ``url`` of a generated check. The input
        checks are not modified.
        """
        for check in checks:
            if 'batch' in check:
                base = copy.deepcopy(check)
                batch = base.pop('batch')
                url_pattern = base.pop('url-pattern')
                items = base.pop('items')
                for item in items:
                    yield dict(
                        copy.deepcopy(base),
                        name=batch.format(item),
                        url=url_pattern.format(item),
                    )
            else:
                yield check

    @classmethod
    def inject_missing_names(cls, checks):
        """
        Give every check without a name one derived from its URL, or a
        numbered placeholder when it has no URL either. Works in place.
        """
        unnamed_check_counter = 1
        for check in checks:
            if not check.get('name'):
                if check.get('url'):
                    check['name'] = cls.url_to_name(check['url'])
                else:
                    check['name'] = cls.UNNAMED_PATTERN.format(unnamed_check_counter)
                    unnamed_check_counter += 1

    @classmethod
    def url_to_name(cls, url):
        """
        Return ``url`` with every run of non-word characters replaced by
        a single dash.
        """
        return cls.RE_PUNCTUATION.sub('-', url)

    @staticmethod
    def unpack_templates(checks, templates):
        """
        Yield checks, merging each templated check over a deep copy of
        the template it references; keys of the check win.

        Raises ConfigurationError when a referenced template is not
        defined.
        """
        for check in checks:
            if 'template' not in check:
                yield check
                continue
            template_name = check['template']
            if template_name not in templates:
                # Names are injected only after templates are unpacked,
                # so 'name' may legitimately be absent here.
                label = check.get('name') or check.get('url') or '<unnamed>'
                raise ConfigurationError(
                    f"Template {template_name} not found. "
                    f"Referenced in check {label}"
                )
            # update() instead of dict(..., **check): keyword expansion
            # rejects non-string keys, which YAML mappings may contain.
            templated_check = dict(copy.deepcopy(templates[template_name]))
            templated_check.update(check)
            del templated_check['template']
            yield templated_check
# Logging setup applied as an import-time side effect: one stream handler
# on the root logger, with explicit levels for the 'sh' loggers.
_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'standard',
        },
    },
    'loggers': {
        # The empty name addresses the root logger.
        '': {
            'handlers': ['default'],
            'level': 'INFO',
            'propagate': True,
        },
        'sh': {
            'level': 'INFO',
        },
        'sh.command': {
            'level': 'WARNING',
        },
    }
}
logging.config.dictConfig(_LOGGING_CONFIG)