Source code for kibitzr.storage

import os
import itertools
import io
import shutil

from kibitzr.compat import sh

from .utils import normalize_filename


def report_changes(conf, content):
    """Store *content* in the page history built from *conf*.

    Creates a ``PageHistory`` for the given configuration and returns
    whatever its ``report_changes`` method returns for *content*.
    """
    history = PageHistory(conf)
    return history.report_changes(content)
class PageHistory:
    """Single file changes history using git.

    Every instance works inside its own directory,
    ``<storage_dir>/<normalize_filename(conf['name'])>``, which holds a git
    repository tracking a single file named ``content``.
    """

    # Default root directory (relative to the current working directory)
    # under which per-page repositories are created.
    STORAGE_DIR = "pages"

    def __init__(self, conf, storage_dir=None, style=None):
        """Prepare the repository, commit message and reporter for *conf*.

        :param conf: mapping with a required ``'name'`` key and an
            optional ``'url'`` key.
        :param storage_dir: root directory for repositories; falls back to
            ``STORAGE_DIR`` when falsy.
        :param style: passed through unchanged to ``ChangesReporter``.
        """
        self.storage_dir = storage_dir or self.STORAGE_DIR
        self.cwd = os.path.join(
            self.storage_dir,
            normalize_filename(conf['name']),
        )
        self.target = os.path.join(self.cwd, "content")
        # Every git invocation runs without a pager, inside the page's
        # own directory.
        self.git = sh.Command('git').bake(
            '--no-pager',
            _cwd=self.cwd,
        )
        self.ensure_repo_exists()
        if conf.get('url'):
            self.commit_msg = f"{conf['name']} at {conf.get('url')}"
        else:
            self.commit_msg = conf['name']
        self.reporter = ChangesReporter(
            self.git,
            self.commit_msg,
            style,
        )

    def report_changes(self, content):
        """Save contents and commit it to git.

        1. Write changes in file.
        2. Commit changes in git.
        3. If something changed, return tuple(True, changes).
        4. If nothing changed, return tuple(False, None).

        ``changes`` is whatever ``self.reporter.report()`` returns.
        """
        self.write(content)
        if self.commit():
            return True, self.reporter.report()
        return False, None

    def write(self, content):
        """Save content on disk, always terminated by a newline."""
        with io.open(self.target, 'w', encoding='utf-8') as fp:
            fp.write(content)
            if not content.endswith('\n'):
                fp.write('\n')

    def commit(self):
        """git commit and return whether there were changes.

        ``git commit`` failing with exit status 1 is reported as
        "no changes" (``False``); any other failure propagates.
        """
        self.git.add('-A', '.')
        try:
            self.git.commit('-m', self.commit_msg)
            return True
        except sh.ErrorReturnCode_1:
            return False

    def ensure_repo_exists(self):
        """Create git repo if one does not exist yet."""
        # exist_ok avoids the check-then-create race of isdir()+makedirs().
        os.makedirs(self.cwd, exist_ok=True)
        if not os.path.isdir(os.path.join(self.cwd, ".git")):
            self.git.init()
            # Placeholder identity stored in the repository's own config.
            self.git.config("user.email", "you@example.com")
            self.git.config("user.name", "Your Name")

    @staticmethod
    def clean():
        """Remove storage dir (delete all git repos).

        Does nothing when the storage directory does not exist.
        """
        try:
            shutil.rmtree(PageHistory.STORAGE_DIR)
        except FileNotFoundError:
            # Nothing has been stored yet (or it was already cleaned):
            # the desired end state is reached, so this is not an error.
            pass
def ensure_unicode(text):
    """Return *text* decoded from UTF-8 when it offers ``decode``.

    Values without a ``decode`` attribute are handed back untouched:
    on Windows + Python 3 stdout is Unicode and can't be decoded.
    """
    try:
        decoded = text.decode("utf-8")
    except AttributeError:
        decoded = text
    return decoded
class ChangesReporter:
    """Format the latest change of the tracked ``content`` file.

    ``report`` is bound at construction time to one of the public
    formatting methods (``default``, ``word``, ``verbose``, ``new``).
    """

    def __init__(self, git, subject, style=None):
        """Remember the git command and subject and pick the formatter.

        :param git: object whose ``log``/``diff``/``show`` attributes run
            the corresponding git sub-commands.
        :param subject: headline used by ``verbose`` and ``new``.
        :param style: name of the formatting method; anything that is not
            the name of a public method falls back to ``default``.
        """
        self.git = git
        self.subject = subject
        self.report = self._resolve_style(style)

    def _resolve_style(self, style):
        """Return the bound formatter named *style*, or ``default``."""
        if isinstance(style, str) and style and not style.startswith('_'):
            # Resolve against the class, not the instance, so instance
            # attributes such as ``git`` or ``subject`` are never mistaken
            # for a formatter.
            if callable(getattr(type(self), style, None)):
                return getattr(self, style)
        return self.default

    def word(self):
        """Return last changes with word diff"""
        try:
            output = ensure_unicode(self.git.diff(
                '--no-color',
                '--word-diff=plain',
                'HEAD~1:content',
                'HEAD:content',
            ).stdout)
        except sh.ErrorReturnCode_128:
            # The diff against HEAD~1 failed (presumably there is no
            # previous commit yet): report the whole current content.
            result = ensure_unicode(self.git.show(
                "HEAD:content"
            ).stdout)
        else:
            ago = ensure_unicode(self.git.log(
                '-2',
                '--pretty=format:last change was %cr',
                'content'
            ).stdout).splitlines()
            lines = output.splitlines()
            result = '\n'.join(
                itertools.chain(
                    # Skip everything up to and including the first
                    # '@@' hunk header.
                    itertools.islice(
                        itertools.dropwhile(
                            lambda x: not x.startswith('@@'),
                            lines[1:],
                        ),
                        1,
                        None,
                    ),
                    # Drop the first log line (the commit just made) and
                    # keep the line for the commit before it.
                    itertools.islice(ago, 1, None),
                )
            )
        return result

    def default(self):
        """Return last changes in truncated unified diff format"""
        output = ensure_unicode(self.git.log(
            '-1',
            '-p',
            '--no-color',
            '--format=%s',
        ).stdout)
        lines = output.splitlines()
        return '\n'.join(
            itertools.chain(
                # First line is the commit subject.
                lines[:1],
                # Skip the patch header up to and including the '+++' line.
                itertools.islice(
                    itertools.dropwhile(
                        lambda x: not x.startswith('+++'),
                        lines[1:],
                    ),
                    1,
                    None,
                ),
            )
        )

    def verbose(self):
        """Return changes in human-friendly format #14"""
        try:
            before = self.git.show('HEAD~1:content').strip()
        except sh.ErrorReturnCode_128:
            # HEAD~1:content could not be shown; treat it as "no old value".
            before = None
        after = self.git.show('HEAD:content').strip()
        if before is not None:
            return (
                f"{self.subject}\nNew value:\n{after}\n"
                f"Old value:\n{before}\n"
            )
        return '\n'.join([self.subject, after])

    def new(self):
        """Return the subject followed by the current content only."""
        content = self.git.show('HEAD:content').strip()
        return '\n'.join([self.subject, content])