Source code for deftlariat.core

"""Main module."""
__version__ = '0.0.1'

from abc import ABC, abstractmethod
from hamcrest import anything, match_equality, equal_to, has_item, starts_with, \
    greater_than, greater_than_or_equal_to, less_than, less_than_or_equal_to, \
    close_to, contains_string, string_contains_in_order, equal_to_ignoring_case, \
    equal_to_ignoring_whitespace, not_none, none, any_of, all_of, is_not, is_

import logging

from enum import Enum


[docs]class MatcherType(Enum): NOTHING = 'Nothing' ANYTHING = 'Anything' EQUAL_TO = 'EqualTo' STARTS_WITH = 'StartsWith' CONTAINS_STRING = 'ContainsString' CONTAINS_STRING_IN_ORDER = 'ContainsStringInOrder' EQUAL_TO_IGNORE_CASE = 'EqualToIgnoreCase' EQUAL_TO_IGNORE_WHITESPACE = 'EqualToIgnoreWhitespace' GREATER_THAN = 'GreaterThan' GREATER_THAN_EQUAL_TO = 'GreaterThanEqualTo' LESS_THAN = 'LessThan' LESS_THAN_EQUAL_TO = 'LessThanEqualTo' CLOSE_TO = 'CloseTo' NONE = 'None' NONE_OR_EMPTY = 'NoneOrEmpty' NOT_NONE = 'NotNone' NOT_NONE_OR_EMPTY = 'NotNoneOrEmpty'
# pull values from list...support for list of input parameters and *list syntax
[docs]def pull_val(x): return x
[docs]class Matcher(ABC): def __init__(self, match_col_key): self.matcher_type = MatcherType.NOTHING self.match_col_key = match_col_key self.my_logger = logging.getLogger('matching')
[docs] @abstractmethod def is_match(self, match_values, data_record) -> bool: pass
[docs] def validate_key_exists(self, data_record) -> bool: """ Validate match-key-column exists in data record""" if self.match_col_key not in data_record: self.my_logger.warning((f"'{self.match_col_key}' not present" f" in data record \n\n{data_record}\n\n")) return False else: return True
[docs] def get_key_val(self): """ Generate a value suitable for hashing, dictionary key""" return frozenset((self.matcher_type.value, self.match_col_key))
def __repr__(self): return (f'{self.__class__.__name__}(' f'{self.matcher_type!r}, {self.match_col_key!r})') def __str__(self): return (f'Matcher for {self.matcher_type.value!r} ' f'matching on field {self.match_col_key!r}') def __eq__(self, other): if other.__class__ is self.__class__: return (self.matcher_type, self.match_col_key) == \ (other.matcher_type, other.match_col_key) else: return NotImplemented def __hash__(self): return hash((self.__class__, self.matcher_type, self.match_col_key))
[docs]class NothingMatcher(Matcher): """ Matcher never successfully matches any input. Always returns False. """ def __init__(self, match_col_key): super().__init__(match_col_key) self.match_col_key = match_col_key self.matcher_type = MatcherType.NOTHING
[docs] def is_match(self, match_values, data_record) -> bool: self.my_logger.info("No Matcher set, defaults to Nothing Matcher. Always False.") return False
[docs]class AnythingMatcher(Matcher): """ Matcher always successfully matches any input. Always returns True. """ def __init__(self, match_col_key): super().__init__(match_col_key) self.match_col_key = match_col_key self.matcher_type = MatcherType.ANYTHING self.my_matcher = anything(f"Anything for {match_col_key}")
[docs] def is_match(self, match_values, data_record) -> bool: return match_equality(self.my_matcher) == data_record
[docs]class EqualTo(Matcher): """ Equal To matching style. Cast everything to str. """ def __init__(self, match_col_key, ): super().__init__(match_col_key, ) self.match_col_key = match_col_key self.matcher_type = MatcherType.EQUAL_TO
[docs] def is_match(self, match_values, data_record) -> bool: if not self.validate_key_exists(data_record): return False if len(match_values) == 0: self.my_logger.warning("No Match Values provided, raising Error") raise NotImplementedError("Cannot use Equal To to check for empty " "string. Use None or Not_None.") elif isinstance(match_values, list): if len(match_values) == 0: # covered by len ==0 above pass elif len(match_values) == 1: q_match_values = pull_val(*match_values) return (match_equality(equal_to(q_match_values)) == str(data_record[self.match_col_key])) else: # has_item will iterate a sequence ... return match_equality( has_item(equal_to( data_record[self.match_col_key]))) == [ str(x) for x in match_values] else: return (match_equality(equal_to(match_values)) == str(data_record[self.match_col_key]))
[docs]class TextComparer(Matcher): def __init__(self, match_col_key, matcher_type): super().__init__(match_col_key) self.match_col_key = match_col_key if matcher_type == MatcherType.STARTS_WITH: self.matcher_type = MatcherType.STARTS_WITH self.my_matcher = starts_with elif matcher_type == MatcherType.CONTAINS_STRING: self.matcher_type = MatcherType.CONTAINS_STRING self.my_matcher = contains_string elif matcher_type == MatcherType.CONTAINS_STRING_IN_ORDER: self.matcher_type = MatcherType.CONTAINS_STRING_IN_ORDER self.my_matcher = string_contains_in_order elif matcher_type == MatcherType.EQUAL_TO_IGNORE_CASE: self.matcher_type = MatcherType.EQUAL_TO_IGNORE_CASE self.my_matcher = equal_to_ignoring_case elif matcher_type == MatcherType.EQUAL_TO_IGNORE_WHITESPACE: self.matcher_type = MatcherType.EQUAL_TO_IGNORE_WHITESPACE self.my_matcher = equal_to_ignoring_whitespace else: raise NotImplementedError(f"Matcher for {matcher_type} not implemented")
[docs] def is_match(self, match_values, data_record) -> bool: if not self.validate_key_exists(data_record): return False if len(match_values) == 0: self.my_logger.warning("No Match Values provided, raising Error") raise NotImplementedError(f"Cannot use {self.matcher_type.value} to check for " "empty string. Use None or Not_None.") elif isinstance(match_values, list): if len(match_values) == 1: q_match_values = pull_val(*match_values) return (match_equality(self.my_matcher(q_match_values)) == str(data_record[self.match_col_key])) else: matches_list = [q for q in match_values if match_equality(self.my_matcher(q)) == str(data_record[self.match_col_key])] if len(matches_list) > 0: return True else: return False else: return (match_equality(self.my_matcher(match_values)) == str(data_record[self.match_col_key]))
[docs]class NumberComparer(Matcher): def __init__(self, match_col_key, matcher_type, convert_none_to=None): super().__init__(match_col_key) self.match_col_key = match_col_key self.replacement_val = None if convert_none_to is None: self.convert_none = False else: self.convert_none = True self.replacement_val = convert_none_to if matcher_type == MatcherType.GREATER_THAN: self.matcher_type = MatcherType.GREATER_THAN self.my_matcher = greater_than elif matcher_type == MatcherType.GREATER_THAN_EQUAL_TO: self.matcher_type = MatcherType.GREATER_THAN_EQUAL_TO self.my_matcher = greater_than_or_equal_to elif matcher_type == MatcherType.LESS_THAN: self.matcher_type = MatcherType.LESS_THAN self.my_matcher = less_than elif matcher_type == MatcherType.LESS_THAN_EQUAL_TO: self.matcher_type = MatcherType.LESS_THAN_EQUAL_TO self.my_matcher = less_than_or_equal_to elif matcher_type == MatcherType.CLOSE_TO: self.matcher_type = MatcherType.CLOSE_TO self.my_matcher = close_to else: raise NotImplementedError(f"Matcher for {matcher_type} not implemented")
[docs] def get_record_value(self, data_record) -> int: """ If you want to convert a None to an Int, set a replacement value. """ if self.convert_none and data_record[self.match_col_key] is None: return_val = self.replacement_val else: return_val = data_record[self.match_col_key] return return_val
[docs] def is_match(self, match_values, data_record) -> bool: if not self.validate_key_exists(data_record): return False if isinstance(match_values, list): if len(match_values) == 0: self.my_logger.warning("No Match Values provided, raising Error") cls_name = self.__class__.__name__ raise NotImplementedError(fr"Cannot use {cls_name} to check for " "empty string. Use None or Not_None.") elif len(match_values) == 1: q_match_values = pull_val(*match_values) test_val = self.get_record_value(data_record) return (match_equality(self.my_matcher(q_match_values)) == test_val) else: cls_name = self.__class__.__name__ raise NotImplementedError(fr"Cannot use {cls_name} to check " " a list of values") else: if self.matcher_type == MatcherType.CLOSE_TO: # Expect a tuple for Close To for Num, Delta...so unpack values test_val = self.get_record_value(data_record) return (match_equality(self.my_matcher(*match_values)) == test_val) else: test_val = self.get_record_value(data_record) return (match_equality(self.my_matcher(match_values)) == test_val)
[docs]class ExistsMatchers(Matcher): def __init__(self, match_col_key, matcher_type): super().__init__(match_col_key) self.match_col_key = match_col_key if matcher_type in (MatcherType.NONE, MatcherType.NONE_OR_EMPTY): self.matcher_type = MatcherType(matcher_type) self.my_matcher = none elif matcher_type in (MatcherType.NOT_NONE, MatcherType.NOT_NONE_OR_EMPTY): self.matcher_type = MatcherType(matcher_type) self.my_matcher = not_none else: raise NotImplementedError(f"Matcher for {matcher_type} not implemented")
[docs] def is_match(self, data_record) -> bool: if not self.validate_key_exists(data_record): return False if self.matcher_type in (MatcherType.NONE, MatcherType.NOT_NONE): return (match_equality(self.my_matcher()) == data_record[self.match_col_key]) else: r""" Special case of chaining equal to with none """ if self.matcher_type == MatcherType.NONE_OR_EMPTY: result = match_equality(any_of( self.my_matcher(), equal_to(''), equal_to([]), equal_to({}), equal_to(()) )) == data_record[self.match_col_key] return result elif self.matcher_type == MatcherType.NOT_NONE_OR_EMPTY: result = match_equality(all_of( self.my_matcher(), is_not(equal_to('')), is_not(equal_to([])), is_not(equal_to({})), is_not(equal_to(())) )) == data_record[self.match_col_key] return result else: raise NotImplementedError(f"Matcher for {self.matcher_type} not implemented")