"""
.. module:: engine
:platform: Unix, Windows
:synopsis: Main rule engine class
"""
# Python Standard Libraries
import copy
import logging
from typing import Tuple
# 3rd Party Libraries
# Project Specific Libraries
from elevaso_spine.rule.base import RuleEngineBase
from elevaso_spine.rule.explain import explain_rule
from elevaso_spine.rule.eval import (
format_case_sensitive,
format_none,
format_trim,
format_value,
)
from elevaso_spine.rule.operators import CONDITIONS, OPERATORS
from elevaso_spine.rule.validate import validate_rule
LOGGER = logging.getLogger(__name__)
[docs]
class RuleEngine(RuleEngineBase):
"""
A class to evaluate a set of records against a complex set of rules to
identify matches
"""
[docs]
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.__rule_set__ = []
self.__record_set__ = []
self.__output = []
[docs]
def add_rule(self, rule: dict, rule_num: str = None):
"""Add a single rule
Args:
rule (dict): Single rule in dictionary format
rule_num (str, Optional): Rule number, defaults to the order
number when added
"""
if rule_num is None:
rule_num = self.rule_len + 1
# TODO Check if the rule_num already exists and throw error
self.add_rule_set([{"rule_num": rule_num, "rule": rule}])
[docs]
def add_rule_set(self, rule_set: list):
"""Add a list of rules pre-formatted
Args:
rule_set (list): List of dictionary containing rule_num, and rule
Raises:
TypeError if rule_set is not in list format
"""
if not isinstance(rule_set, list):
self.__raise_exception__(
TypeError("rule_set must be in list format")
)
for item in rule_set:
try:
validate_rule(item)
except Exception as exc:
self.__raise_exception__(exc)
rule_set = copy.deepcopy(rule_set)
self.__rule_set__.extend(rule_set)
[docs]
def add_record(self, record: dict):
"""Add a single record
Args:
record (dict): Single record in dictionary format
"""
if not isinstance(record, dict):
self.__raise_exception__(
TypeError("record must be in dictionary format")
)
self.add_record_set([record])
[docs]
def add_record_set(self, record_set: list):
"""Add list of records to the rule engine
Args:
record_set (list): List of records in dictionary format
"""
if not isinstance(record_set, list):
self.__raise_exception__(
TypeError("record_set must be in list format")
)
if not isinstance(record_set[0], dict):
self.__raise_exception__(
TypeError("record_set items must be in dictionary format")
)
record_set = copy.deepcopy(record_set)
self.__record_set__.extend(record_set)
def __check_values__(self) -> Tuple[list, list]:
"""Check to make sure rule(s) and record(s) exist
Returns:
list representing the formatted rule_set
list representing the formatted record_set
"""
if len(self.__rule_set__) == 0:
self.__raise_exception__(ValueError("Rules must be provided"))
if len(self.__record_set__) == 0:
self.__raise_exception__(ValueError("Records must be provided"))
rule_set = [self.__apply_settings__(item)
for item in self.__rule_set__]
record_set = [
self.__apply_settings__(item) for item in self.__record_set__
]
return rule_set, record_set
[docs]
def eval(self):
"""Perform evaluation of rule_set against record_set"""
rule_set, record_set = self.__check_values__()
self.__output = []
for record in record_set:
matching_rules = []
for rule in rule_set:
if self.__eval_rule__(record, rule["rule"]):
matching_rules.append(rule["rule_num"])
self.__output.append((record, matching_rules))
def __eval_rule__(self, record: dict, rule: dict) -> bool:
"""Evaluate a single rule against a record
Args:
record (dict): Dictionary representing the record
rule (dict): Rule to evaluate
Returns:
bool if the record matches the rule
"""
output = False
for key, value in rule.items():
if key in CONDITIONS.keys():
output = self.__eval_condition__(key, record, value)
elif key in OPERATORS.keys():
output = self.__eval_operator__(key, record, value)
return output
def __eval_condition__(
self, condition: str, record: str, rule: dict
) -> bool:
"""Evaluate a condition statement
Args:
condition (str): Condition key
record (dict): Dictionary representing the record
rule (dict): Remainder of the rule
Returns:
bool if the record matches the rule
"""
output = []
for key, value in rule.items():
if key in CONDITIONS.keys():
output.append(self.__eval_condition__(key, record, value))
elif key in OPERATORS.keys():
output.append(
self.__eval_operator__(key, record, value, condition)
)
if len(output) > 1:
condition_func = CONDITIONS.get(condition, "AND")
return condition_func(*output)
return output[0]
def __eval_operator__(
self, operator: str, record: str, rule: dict, condition: str = None
) -> bool:
"""Evaluate a condition statement
Args:
operator (str): Operator key
record (dict): Dictionary representing the record
rule (dict): Remainder of the rule
condition (str, Optional): Condition key, defaults to None
Returns:
bool if the record matches the rule
"""
output = []
operator_func = OPERATORS[operator]
if isinstance(rule, dict):
rule = [rule]
for item in rule:
for key, value in item.items():
format_value(
key,
record,
output,
cls=self,
operator_func=operator_func,
value=value,
)
if len(output) > 1:
condition_func = CONDITIONS.get(condition, "AND")
return condition_func(*output)
return output[0]
[docs]
def explain_rule(self, rule_num: object) -> str:
"""Explain a single rule in simple terms
Args:
rule_num (object): Rule number to explain
Returns:
str representing the explained rule
"""
rule = list(
filter(lambda x: x["rule_num"] == rule_num, self.__rule_set__)
)
if len(rule) == 1:
return explain_rule(rule[0]["rule"])
if len(rule) == 0:
self.__raise_exception__(
ValueError(f"{rule_num} not found in rule_set")
)
return ""
def __apply_settings__(self, item: dict) -> dict:
"""Apply settings to a single item (rule or record)
Args:
item (dict): Dictionary of the item
Returns:
dict representing the formmatted item
"""
output = {}
for key, value in item.items():
formatted_key = self.__apply_settings_to_value__(key)
if isinstance(value, dict):
formatted_value = self.__apply_settings__(value)
elif isinstance(value, list):
if len(value) == 0:
self.__raise_exception__(
ValueError("value cannot be empty list")
)
elif isinstance(value[0], (dict, list)):
formatted_value = [
self.__apply_settings__(item) for item in value
]
else:
formatted_value = [
self.__apply_settings_to_value__(item)
for item in value
]
else:
formatted_value = self.__apply_settings_to_value__(value)
output[formatted_key] = formatted_value
return output
def __apply_settings_to_value__(self, value: object) -> object:
"""Apply rule engine settings to a single value
Args:
value (object): Object to apply settings to
Returns:
object representing the formatted value
"""
output = value
output = format_case_sensitive(output, self.case_sensitive)
output = format_trim(output, self.trim_strings)
output = format_none(output, self.empty_equals_none)
return output
@property
def rule_len(self) -> int:
"""Number of rules in the engine
Returns:
int representing the number of rules
"""
return len(self.__rule_set__)
@property
def record_len(self) -> int:
"""Number of records in the engine
Returns:
int representing the number of records
"""
return len(self.__record_set__)
@property
def output(self) -> list:
"""Return output from eval function
Returns:
list of tuple containing the record and list of rules matched
"""
return self.__output