# Source code for kibitzr.app
import os
import logging
import signal
import time
import code
import psutil
import entrypoints
from .conf import settings, SettingsParser
from .fetcher import cleanup_fetchers, persistent_firefox
from .checker import Checker
from .bootstrap import create_boilerplate
from . import timeline
# Names exported by ``from kibitzr.app import *``.
__all__ = ['Application']

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class Application:
    """
    Runs configured checks and reacts to process signals.

    State shared between the main loop and the signal handlers lives in
    ``self.signals``:

    * ``reload_conf_pending`` - set by SIGUSR1, consumed by :meth:`run`;
    * ``interrupted`` - set by SIGINT/SIGTERM, stops the loops;
    * ``open_backdoor`` - set by SIGUSR2, consumed by :meth:`check_forever`;
    * ``orig`` - handlers that were installed before :meth:`connect_signals`
      replaced them (``None`` until recorded).
    """

    def __init__(self):
        self.signals = {
            'reload_conf_pending': False,
            'interrupted': False,
            'open_backdoor': False,
            'orig': {
                signal.SIGINT: None,
                signal.SIGTERM: None,
            }
        }
        try:
            self.signals['orig'].update({
                signal.SIGUSR1: None,
                signal.SIGUSR2: None,
            })
        except AttributeError:
            # Unavailable on Windows
            pass

    @staticmethod
    def bootstrap():
        """Delegate to ``create_boilerplate`` from the bootstrap module."""
        create_boilerplate()

    def run(self, once=False, log_level=logging.INFO, names=None):
        """
        Execute checks until interrupted.

        :param once: when true, run every check a single time and return.
        :param log_level: level applied to the root logger.
        :param names: passed through to ``Checker.create_from_settings``.
        :returns: ``0`` after a completed ``once`` pass; ``1`` when the run
            was interrupted or no checkers were created.
        """
        # Reset global state for testability:
        self.signals.update({
            'reload_conf_pending': False,
            'interrupted': False,
            'open_backdoor': False,
        })
        self.setup_logger(log_level)
        self.connect_signals()
        try:
            # The loop is left only through ``return``; each iteration
            # rebuilds the checkers, so a configuration reload takes effect
            # as soon as ``check_forever`` hands control back.
            while True:
                if self.signals['interrupted']:
                    return 1
                if self.signals['reload_conf_pending']:
                    settings().reread()
                    self.signals['reload_conf_pending'] = False
                checkers = Checker.create_from_settings(
                    checks=settings().checks,
                    names=names
                )
                if not checkers:
                    logger.warning("No checks defined. Exiting")
                    return 1
                self.before_start(checkers)
                self.execute_all(checkers)
                if once:
                    return 0
                self.check_forever(checkers)
        finally:
            cleanup_fetchers()

    def disconnect_signals(self):
        """
        Reinstall the handlers recorded by :meth:`connect_signals`.

        Signals without a recorded handler (still ``None``) are left
        untouched, because ``signal.signal`` rejects ``None`` as a handler.
        """
        for signum, handler in self.signals['orig'].items():
            if handler is not None:
                signal.signal(signum, handler)

    def connect_signals(self):
        """
        Install the application's handlers and remember the previous ones.

        SIGINT and SIGTERM go to :meth:`on_interrupt`; where available,
        SIGUSR1 goes to :meth:`on_reload_config` and SIGUSR2 to
        :meth:`on_backdoor`.
        """
        handlers = {
            signal.SIGINT: self.on_interrupt,
            signal.SIGTERM: self.on_interrupt,
        }
        try:
            handlers[signal.SIGUSR1] = self.on_reload_config
            handlers[signal.SIGUSR2] = self.on_backdoor
        except AttributeError:
            # Unavailable on Windows
            pass
        for signum, handler in handlers.items():
            previous = signal.signal(signum, handler)
            # On a repeated call the "previous" handler is our own; keep the
            # handler recorded first so disconnect_signals() can restore it.
            if previous != handler:
                self.signals['orig'][signum] = previous

    @staticmethod
    def execute_conf(conf):
        """
        Parse checks from ``conf`` and run each of them once.

        Logging is configured at WARNING level before the checks run.
        """
        logging.basicConfig(level=logging.WARNING)
        logging.getLogger('').handlers[0].level = logging.WARNING
        checks = SettingsParser().parse_checks(conf)
        for check in checks:
            Checker(check).check()

    def run_firefox(self):
        """Set INFO logging and call ``persistent_firefox``."""
        self.setup_logger(logging.INFO)
        persistent_firefox()

    @staticmethod
    def telegram_chat():
        """Call ``get_chat_id`` from the telegram notifier module."""
        from .notifier.telegram import get_chat_id  # pylint: disable=import-outside-toplevel
        get_chat_id()

    @staticmethod
    def setup_logger(log_level=logging.INFO):
        """Set the root logger level to ``log_level``."""
        logging.getLogger("").setLevel(log_level)

    def check_forever(self, checkers):
        """
        Schedule ``checkers`` and run pending jobs once per second.

        Returns when a configuration reload is pending or the application
        was interrupted.
        """
        timeline.schedule_checks(checkers)
        logger.info("Starting infinite loop")
        while not self.signals['reload_conf_pending']:
            if self.signals['interrupted']:
                break
            if self.signals['open_backdoor']:
                self.signals['open_backdoor'] = False
                # Interactive console with access to this frame's locals
                # (``self`` and ``checkers``).
                code.interact(
                    banner="Kibitzr debug shell",
                    local=locals(),
                )
            timeline.run_pending()
            # Re-check so an interrupt raised while jobs ran is not delayed
            # by the sleep below.
            if self.signals['interrupted']:
                break
            time.sleep(1)

    def execute_all(self, checkers):
        """Run ``check()`` on each checker, stopping once interrupted."""
        for checker in checkers:
            if self.signals['interrupted']:
                break
            checker.check()

    def on_reload_config(self, *args, **kwargs):
        """Signal handler: flag a pending configuration reload."""
        del args, kwargs
        logger.info("Received SIGUSR1. Flagging configuration reload")
        self.signals['reload_conf_pending'] = True

    def on_backdoor(self, *args, **kwargs):
        """Signal handler: flag the debug shell to be opened."""
        del args, kwargs
        logger.info("Received SIGUSR2. Flagging backdoor to open")
        self.signals['open_backdoor'] = True

    def on_interrupt(self, *args, **kwargs):
        """
        Signal handler: request a stop, escalating on repetition.

        The first call sets the ``interrupted`` flag; a later call restores
        the previously recorded handlers.
        """
        del args, kwargs
        if not self.signals['interrupted']:
            self.signals['interrupted'] = True
        else:
            # Third Ctrl+C to hard stop:
            self.disconnect_signals()

    def before_start(self, checkers):
        """
        Loads entry points named kibitzr.before_start
        and call each one with two arguments:

        1. Application instance;
        2. List of configured checkers
        """
        for point in entrypoints.get_group_all("kibitzr.before_start"):
            entry = point.load()
            entry(self, checkers)

    @staticmethod
    def send_reload():
        """
        Sends SIGUSR1 to all other processes that execute kibitzr

        Only processes named "kibitzr" whose first uid entry equals the
        effective uid of the caller are signalled.
        """
        user_id = os.geteuid()
        own_pid = os.getpid()
        for proc in psutil.process_iter(['uids', 'name', 'pid']):
            info = proc.info
            uids = info['uids']
            # Attributes may come back as None when psutil is denied access.
            if info['name'] != "kibitzr" or uids is None:
                continue
            if info['pid'] == own_pid:
                # Do not signal ourselves: this process may have no SIGUSR1
                # handler installed, in which case the signal would end it.
                continue
            if uids[0] != user_id:
                continue
            try:
                proc.send_signal(signal.SIGUSR1)
            except (psutil.NoSuchProcess, psutil.AccessDenied) as exc:
                # The process exited or is not ours to signal; keep going.
                logger.warning(
                    "Failed to send SIGUSR1 to process %r: %s",
                    info['pid'], exc,
                )
                continue
            logger.info("Sent signal SIGUSR1 to process: %r", info['pid'])