Source code for kibitzr.fetcher.browser.fetcher

import os
import time
import shutil
import logging
import traceback
import collections

from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
)
from selenium.webdriver.common.by import By
from jinja2 import Template

from kibitzr.conf import settings
from .launcher import firefox, PROFILE_DIR
from .trigger import prompt_return


# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Page loaded right after Firefox starts in persistent profile mode.
HOME_PAGE = 'https://kibitzr.github.io/'


def persistent_firefox():
    """
    Run a visible Firefox and save its profile into ``PROFILE_DIR``.

    The browser is opened on ``HOME_PAGE``. Every time ``prompt_return()``
    returns, the live profile is copied into ``PROFILE_DIR`` via
    ``update_profile``. The loop ends once the browser no longer answers,
    which is how a closed window is detected.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(PROFILE_DIR, exist_ok=True)
    print("Kibitzr is running Firefox in persistent profile mode.")
    with firefox(headless=False) as driver:
        driver.get(HOME_PAGE)
        while True:
            prompt_return()
            try:
                # Property raises when browser is closed:
                driver.title  # pylint: disable=pointless-statement
            except Exception:  # pylint: disable=broad-except
                # All kinds of things happen when closing Firefox.
                # Exception (not a bare except) keeps Ctrl-C and SystemExit
                # from being mistaken for a closed browser.
                break
            else:
                update_profile(driver)
                print(
                    "Firefox profile is saved. "
                    "Close browser now to reuse profile in further runs."
                )
    print(
        "Firefox is closed. "
        "To delete saved profile remove following directory: "
        f"{os.path.abspath(PROFILE_DIR)}"
    )
def update_profile(driver):
    """
    Replace the contents of ``PROFILE_DIR`` with the driver's live profile.

    Any existing ``PROFILE_DIR`` is removed first, then the directory named
    by the driver's ``moz:profile`` capability is copied in its place.
    """
    # Lock files and SQLite shm/wal sidecar files are left out of the copy
    # (presumably because they only make sense for the running instance).
    skip_volatile = shutil.ignore_patterns(
        "parent.lock",
        "lock",
        ".parentlock",
        "*.sqlite-shm",
        "*.sqlite-wal",
    )
    if os.path.exists(PROFILE_DIR):
        shutil.rmtree(PROFILE_DIR)
    live_profile = driver.capabilities['moz:profile']
    shutil.copytree(live_profile, PROFILE_DIR, ignore=skip_volatile)
def firefox_fetcher(conf):
    """
    Fetch the page described by ``conf`` using a Firefox driver.

    The browser runs headless unless ``conf['headless']`` says otherwise.
    Returns whatever ``FirefoxFetcher.fetch`` returns for ``conf``.
    """
    run_headless = conf.get('headless', True)
    with firefox(run_headless) as browser:
        return FirefoxFetcher(browser).fetch(conf)
class FirefoxFetcher:
    """
    Fetch a page through a Selenium driver and return its HTML.

    The driver is supplied by the caller; this class never quits it,
    it only closes the tab it used.
    """

    def __init__(self, driver):
        self.driver = driver

    def fetch(self, conf):
        """
        1. Fetch URL
        2. Run automation.
        3. Return HTML.
        4. Close the tab.

        Returns ``(True, html)`` on success, or ``(False, traceback_text)``
        when the automation or the HTML extraction raised an exception.
        Errors raised while sizing the window or loading the URL propagate
        to the caller.
        """
        url = conf['url']
        # If Firefox is broken, it will raise here, causing kibitzr restart:
        self.driver.set_window_size(1366, 800)
        self.driver.implicitly_wait(2)
        self.driver.get(url)
        try:
            self._run_automation(conf)
            html = self._get_html()
        except Exception:  # pylint: disable=broad-except
            # Exception rather than a bare except: Ctrl-C and SystemExit
            # must not be reported as an ordinary fetch failure.
            logger.exception(
                "Exception occurred while fetching"
            )
            return False, traceback.format_exc()
        finally:
            self._close_tab()
        return True, html

    def _run_automation(self, conf):
        """
        1. Fill form.
        2. Run scenario.
        3. Delay.
        """
        self._fill_form(self._find_form(conf))
        self._run_scenario(conf)
        self._delay(conf)

    @staticmethod
    def _fill_form(form):
        """
        Fill all inputs with the already rendered field texts.
        If no field had click key, submit last element.

        ``form`` is the list of dictionaries produced by ``_find_form``
        with keys ``element``, ``text`` and ``click``.
        """
        clicked = False
        last_element = None
        for field in form:
            element = field['element']
            if field['text']:
                element.clear()
                element.send_keys(field['text'])
            if field['click']:
                element.click()
                clicked = True
            last_element = element
        if last_element and not clicked:
            last_element.submit()

    def _find_form(self, conf):
        """
        Find elements defined in conf['form'].
        Render all Jinja2 templates from field['value'].
        Save all field['click'] triggers.
        If all found, return them as a list of dictionaries.
        If at least one was not found, return empty list.
        """
        form = conf.get('form', [])
        fields = []
        creds = settings().creds
        for index, field in enumerate(form):
            click = field.get('click')
            text = self._parse_field_text(field, conf, creds)
            selector_type, selector = self._parse_field_selector(field)
            if not selector:
                logger.warning("Failed to parse selector %r", field)
                continue
            # Only the first field is required to be visible.
            element = self._find_element(
                selector,
                selector_type,
                check_displayed=(index == 0),
            )
            if element:
                fields.append({
                    'element': element,
                    'text': text,
                    'click': click,
                })
            else:
                logger.warning("Element {%s: %s} not found",
                               selector_type, selector)
        if len(fields) == len(form):
            return fields
        logger.info(
            "Skipped form filling because not all fields were found"
        )
        return []

    @staticmethod
    def _parse_field_selector(field):
        """
        Check field keys: css, xpath and id.
        Return the first found along with its value,
        or ``(None, None)`` when the field defines none of them.
        """
        for selector_type in ('css', 'xpath', 'id'):
            selector = field.get(selector_type)
            if selector:
                return selector_type, selector
        logger.warning("Form field does not define any selector "
                       "(id, css and xpath available): %r", field)
        return None, None

    @staticmethod
    def _parse_field_text(field, conf, creds):
        """
        Return form field value from 3 options:

        1. If value key is present, render it as a Jinja2 template.
           The template has access to ``conf`` and ``creds`` dictionaries.
        2. If creds key is present, use it as a dot-delimited path
           in creds dictionary (e.g. ``account.login``).
        3. Otherwise return None
        """
        value = field.get('value')
        if value:
            template = Template(value)
            return template.render(
                conf=conf,
                creds=creds,
            )
        creds_path = field.get('creds')
        if creds_path:
            node = creds
            # Walk the nested mapping one path component at a time.
            for crumb in creds_path.split('.'):
                node = node[crumb]
            return node
        return None

    def _run_scenario(self, conf):
        """
        Execute ``conf['scenario']`` if it is defined.

        The scenario is either the code itself or a mapping with the code
        under ``python`` and optional named selectors under ``elements``.
        """
        scenario = conf.get('scenario')
        if isinstance(scenario, collections.abc.Mapping):
            code = scenario['python']
            elements = scenario.get('elements', {})
        else:
            code = scenario
            elements = {}
        element_objects = self._find_elements(elements) if elements else {}
        if code:
            self._exec_scenario(code, conf, element_objects)

    @staticmethod
    def _delay(conf):
        """Sleep for ``conf['delay']`` seconds when a delay is configured."""
        delay = conf.get('delay')
        if delay:
            time.sleep(delay)

    def _get_html(self):
        """
        Return the outer HTML of the first element matched by ``//*``.

        Makes up to three attempts, one second apart. A lookup error on the
        last attempt is re-raised; a missing ``outerHTML`` attribute on the
        last attempt yields an empty string.
        """
        total_attempts = 3
        for attempt in range(1, total_attempts + 1):
            last_attempt = attempt == total_attempts
            try:
                elem = self.driver.find_element(By.XPATH, "//*")
                html = elem.get_attribute("outerHTML")
            except (NoSuchElementException, StaleElementReferenceException):
                # Crazy (but stable) race condition,
                # new page loaded after call to find_element
                # Just retry:
                if last_attempt:
                    raise
                time.sleep(1)
                continue
            if html is not None:
                return html
            if last_attempt:
                return ""
            time.sleep(1)
        # Not reached: the last attempt always returns or raises.
        return None

    def _find_elements(self, elements):
        """
        Resolve scenario elements into a ``{name: element}`` dictionary.

        Each key has the form ``name|selector_type``; the value is the
        selector. Elements that are not found map to ``None``.
        """
        logger.info("Finding elements")
        result = {}
        for key, selector in elements.items():
            name, _, selector_type = key.partition("|")
            result[name] = self._find_element(selector, selector_type.lower())
        return result

    def _find_element(self, selector, selector_type, check_displayed=False):
        """
        Return first matching element or None if nothing found.

        With ``check_displayed`` set, elements that are not displayed or
        whose size values do not sum to a positive number are skipped.
        Raises RuntimeError for a selector type other than
        ``css``, ``xpath`` or ``id``.
        """
        if selector_type == 'css':
            elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
        elif selector_type == 'xpath':
            elements = self.driver.find_elements(By.XPATH, selector)
        elif selector_type == 'id':
            elements = self.driver.find_elements(By.CSS_SELECTOR,
                                                 '#' + selector)
        else:
            raise RuntimeError(
                f"Unknown selector_type: {selector_type} for selector: {selector}"
            )
        for element in elements:
            if check_displayed and (
                    not element.is_displayed()
                    or sum(element.size.values()) <= 0):
                continue
            return element
        return None

    def _exec_scenario(self, code, conf, elements):
        """
        Run scenario ``code`` with ``conf``, ``creds``, ``driver``,
        ``elements`` and ``By`` available as global names.
        """
        logger.info("Executing custom scenario")
        logger.debug(code)
        # NOTE(review): the code comes straight from the check configuration
        # and runs with the full privileges of this process; only load
        # configuration from trusted sources.
        exec(  # pylint: disable=exec-used
            code,
            {
                'conf': conf,
                'creds': settings().creds,
                'driver': self.driver,
                'elements': elements,
                'By': By,
            },
        )

    def _close_tab(self):
        """
        Create a new tab and close the old one
        to avoid idle page resource usage
        """
        old_tab = self.driver.current_window_handle
        self.driver.execute_script('''window.open("about:blank", "_blank");''')
        self.driver.switch_to.window(old_tab)
        self.driver.close()
        self.driver.switch_to.window(self.driver.window_handles[0])