Source code for ternip.rule_engine.rule_engine

#!/usr/bin/env python

from collections import defaultdict
from glob import glob
import imp
import os.path

[docs]class RuleEngine(object): """ A base class for rule engines to use """ _block_type = None def __init__(self): self._rules = [] self.num_rules = 0
[docs] def load_rules(self, path): """ Do rule loading. Loads all files ending in .pyrule as 'complex' rules (direct Python code), .rule using the documented rule format, and .ruleblock as blocks which contain sequences of rules. For direct Python code, the rule must be a class called 'rule'. Throws rule_load_errors containing errors for all rules that failed to load. """ errors = [] # First load simple rules for filename in glob(os.path.join(path, '*.rule')): # don't bail out after one load failure, load them all and report # all at once with open(filename) as fd: try: self._rules.append(self._load_rule(filename, fd.readlines())) self.num_rules += 1 except RuleLoadError as e: errors.append(e) # Then rule blocks for file in glob(os.path.join(path, '*.ruleblock')): try: self._rules.append(self._load_block(file)) self.num_rules += len(self._rules[-1]._rules) except RuleLoadError as e: errors.append(e) except RuleLoadErrors as e: for error in e.errors: errors.append(e) # Then complex rules for file in glob(os.path.join(path, '*.pyrule')): (dir, modname) = os.path.split(file) modname = modname[:-7] self._rules.append(imp.load_source(modname, file).rule()) self.num_rules += 1 # Now, check the rule's we've just loaded for consistency try: self._check_rule_consistency() except RuleLoadErrors as e: for error in e.errors: errors.append(e) # Bulk raise any errors that occurred if len(errors) > 0: raise RuleLoadErrors(errors)
[docs] def load_rule(self, filename): """ Load a rule, then check for consistency Throws rule_load_error if a rule fails to load """ self._rules.append(self._load_rule(filename)) self._check_rule_consistency()
[docs] def load_block(self, filename): """ Load a block of rules, then check for consistency Throws rule_load_errors if a rule fails to load """ self._rules.append(self._load_block(filename)) self._check_rule_consistency()
def _load_block(self, filename): """ Load a block of rules """ errors = [] # split the files up until rules, separated by '---' with open(filename) as fd: parts = [] part = [] for line in fd: if line.startswith('---'): parts.append(part) part = [] else: part.append(line) parts.append(part) header = parts[0] parts = parts[1:] rules = [] # First block is considered to be the header header = self._parse_rule(filename, header) # Defaults block_type = None id = filename after = [] for key in header: # 'Block-Type' is a compulsory field with limited acceptable values if key == 'block-type': if (len(header[key]) != 1): raise RuleLoadError(filename, "There must be exactly 1 'Block-Type' field") else: type = header[key][0].lower() if type == 'run-all': type = 'all' elif type == 'run-until-success': type = 'until-success' else: errors.append( RuleLoadError(filename, "Only 'run-all' or 'run-until-success' are valid block types")) # ID is an optional field, which can only exist once elif key == 'id': if (len(header[key]) == 1): id = header[key][0] elif (len(header[key]) > 1): errors.append(RuleLoadError(filename, "Too many 'ID' fields")) # After takes multiple, optional values elif key == 'after': after = header[key] # Reject anything else else: errors.append(RuleLoadError(filename, "Key '" + key + "' is not valid in a block header")) # Now load all other parts i = 0 for part in parts: i += 1 try: rules.append(self._load_rule(filename + ":" + str(i), part)) except RuleLoadError as e: errors.append(e) # ID and After are invalid in individual rules for rule in rules: if rule.id[:len(filename)] != filename: errors.append( RuleLoadError(filename, "'ID' fields are invalid outside of the block header in a rule block")) if len(rule.after) > 0: errors.append( RuleLoadError(filename, "'After' fields are invalid outside of the block header in a rule block")) if len(errors) > 0: raise RuleLoadErrors(errors) else: return self._block_type(id, header['after'], type, rules) def _check_rule_consistency(self): """ Check that the rules are internally consistent """ errors = [] # First, get all rule IDs and then all IDs mentioned as after IDs rule_ids = dict() for rule in self._rules: if rule.id in rule_ids: errors.append(RuleLoadError(rule.id, 'Duplicate ID!')) else: rule_ids[rule.id] = rule # Now check each referred to after ID exists for rule in self._rules: circular_check = True for after in rule.after: if after not in rule_ids: errors.append(RuleLoadError(rule.id, 'Reference made to non-existant rule')) # If this happens, don't check for circular references, as # there are dangling references and it causes errors circular_check = False # and check each rule for any circular references if circular_check and self._circular_check(rule.id, rule, rule_ids): errors.append(RuleLoadError(rule.id, 'Circular dependency - rule must run after itself')) # Bulk raise any errors that occurred if len(errors) > 0: raise RuleLoadErrors(errors) def _parse_rule(self, filename, rulelines): """ Private function that takes the lines of a 'simple' rule file, parses the key/value pairs, and then returns them as a dictionary. Does no kind of type or validity checking, and assumes that everything can have multiple values. It's then the caller's responsibility to check what gets returned. """ d = defaultdict(list) for line in rulelines: # ignore empty lines and comments if line.rstrip() != '' and not line.startswith('#'): try: [key, value] = line.split(':', 1) except ValueError: raise RuleLoadError(filename, 'Malformed key-value pair on line: ' + line.rstrip()) d[key.lower()].append(value.strip()) return d def _circular_check(self, search_for, rule, rule_ids): """ Check for any circular references """ if search_for in rule.after: return True else: for after in rule.after: res = self._circular_check(search_for, rule_ids[after], rule_ids) if res: return True return False
[docs]class RuleLoadError(Exception): """ Error for when a rule fails to load """ def __init__(self, filename, errorstr): self._filename = filename self._errorstr = errorstr def __str__(self): return 'Error when loading rule ' + self._filename + ': ' + self._errorstr
[docs]class RuleLoadErrors(Exception): """ Error which bundles multiple rule_load_errors together. Allows for delayed exit on multiple load errors. """ def __init__(self, errors): self.errors = errors def __str__(self): return '\n'.join([str(error) for error in self.errors])