# -*- coding: utf-8 -*-
""" Classes that specify the requirements for each column in a dataset.
They take care of validation, and produce the settings required to
perform the hashing.
"""
from __future__ import unicode_literals
import abc
import re
from datetime import datetime
from typing import Any, Dict, Iterable, Optional, Text, cast
from future.builtins import range, super
from six import add_metaclass
from clkhash.backports import raise_from, re_compile_full, strftime
class InvalidEntryError(ValueError):
    """Raised when an entry in the data file violates its field's schema.

    :cvar field_spec: Defaults to ``None`` on the class. The validation
        code in this module assigns the offending field specification
        to this attribute on the instance before raising it.
    """

    # Class-level default; overwritten per instance by whoever raises.
    field_spec = None  # type: Optional[FieldSpec]
class InvalidSchemaError(ValueError):
    """Raised when the schema of a field specification is invalid.

    A typical cause is a regular expression in the schema that is not
    syntactically correct.

    :cvar json_field_spec: Defaults to ``None``. Code raising this
        error may attach the offending JSON field definition here.
    :cvar field_spec_index: Defaults to ``None``. Not assigned anywhere
        in this module; presumably the position of the field within the
        schema, set by the caller -- TODO confirm.
    """

    # Class-level defaults; overwritten per instance where known.
    json_field_spec = None  # type: Optional[dict]
    field_spec_index = None  # type: Optional[int]
class MissingValueSpec(object):
    """Describe how a missing value is recognised and what replaces it.

    :ivar str sentinel: The string that identifies a missing value,
        e.g. ``'N/A'`` or ``''``. The sentinel is not validated against
        the feature format definition.
    :ivar str replace_with: The string substituted for the sentinel.
        When no replacement is given it equals the sentinel, so the
        value is effectively left unchanged.
    """

    def __init__(self,
                 sentinel,  # type: str
                 replace_with=None  # type: Optional[str]
                 ):
        # type: (...) -> None
        """Store the sentinel and its replacement.

        :param sentinel: String marking a missing value.
        :param replace_with: Replacement string; ``None`` means "keep
            the sentinel".
        """
        self.sentinel = sentinel
        if replace_with is None:
            replace_with = sentinel
        self.replace_with = replace_with

    @classmethod
    def from_json_dict(cls, json_dict):
        # type: (Dict[str, Any]) -> MissingValueSpec
        """Build a :class:`MissingValueSpec` from a dictionary.

        :param json_dict: Must contain a ``'sentinel'`` key; may
            contain a ``'replaceWith'`` key.
        :return: The corresponding :class:`MissingValueSpec`.
        :raises KeyError: When ``'sentinel'`` is absent.
        """
        sentinel = json_dict['sentinel']
        replacement = cast(Optional[str], json_dict.get('replaceWith'))
        return cls(sentinel=sentinel, replace_with=replacement)
class FieldHashingProperties(object):
    """
    Stores the settings used to hash a field.

    This includes the encoding and tokenisation parameters.

    :ivar str encoding: The encoding to use when converting the
        string to bytes. Refer to
        `Python's documentation <https://docs.python.org/3/library/codecs.html#standard-encodings>`_
        for possible values.
    :ivar int ngram: The n in n-gram. Possible values are 0, 1, and
        2.
    :ivar bool positional: Controls whether the n-grams are
        positional.
    :ivar str hash_type: Name of the hash function to use.
    :ivar prevent_singularity: Only permitted together with the
        ``'doubleHash'`` hash type; ``None`` otherwise.
    :ivar int num_bits: dynamic k = num_bits / number of n-grams
    :ivar int k: max number of bits per n-gram
    :ivar MissingValueSpec missing_value: How missing values are
        detected and replaced, or ``None``.
    """
    _DEFAULT_ENCODING = 'utf-8'
    _DEFAULT_POSITIONAL = False

    def __init__(self,
                 ngram,  # type: int
                 encoding=_DEFAULT_ENCODING,  # type: str
                 positional=_DEFAULT_POSITIONAL,  # type: bool
                 hash_type='blakeHash',  # type: str
                 prevent_singularity=None,  # type: Optional[bool]
                 num_bits=None,  # type: Optional[int]
                 k=None,  # type: Optional[int]
                 missing_value=None  # type: Optional[MissingValueSpec]
                 ):
        # type: (...) -> None
        """ Make a :class:`FieldHashingProperties` object, setting its
            attributes to values specified in keyword arguments.

            :raises ValueError: When `ngram` is not 0, 1 or 2; when
                `encoding` is unknown to Python; when
                `prevent_singularity` is given for a hash type other
                than ``'doubleHash'``; or when neither `num_bits` nor
                `k` is given.
        """
        if ngram not in range(3):
            msg = 'ngram is {} but is expected to be 0, 1, or 2.'
            raise ValueError(msg.format(ngram))

        # Encoding an empty string is a cheap way to ask the codec
        # registry whether the encoding exists.
        try:
            ''.encode(encoding)
        except LookupError as e:
            msg = '{} is not a valid Python encoding.'
            raise_from(ValueError(msg.format(encoding)), e)

        if prevent_singularity is not None and hash_type != 'doubleHash':
            raise ValueError("Prevent_singularity must only be specified"
                             " with hash_type doubleHash.")

        # Falsy values (None, 0) count as "not specified".
        if not num_bits and not k:
            raise ValueError('One of num_bits or k must be specified.')

        self.ngram = ngram
        self.encoding = encoding
        self.positional = positional
        self.hash_type = hash_type
        self.prevent_singularity = prevent_singularity
        self.num_bits = num_bits
        self.k = k
        self.missing_value = missing_value

    def ks(self, num_ngrams):
        # type: (int) -> list
        """
        Provide a k for each ngram in the field value.

        With `num_bits` set, the bits are spread as evenly as possible:
        the first ``num_bits % num_ngrams`` entries get one extra bit so
        that the sum is exactly `num_bits`. Otherwise every n-gram gets
        the fixed `k`.

        :param num_ngrams: number of ngrams in the field value
        :return: [ k, ... ] a k value for each of num_ngrams. An empty
            list when there are no n-grams.
        """
        # No n-grams means nothing to distribute. Without this guard the
        # num_bits strategy would divide by zero, whereas the fixed-k
        # strategy already yields an empty list.
        if num_ngrams <= 0:
            return []
        if self.num_bits:
            # Exact integer arithmetic; avoids the float round trip of
            # int(a / b).
            base, residue = divmod(self.num_bits, num_ngrams)
            return [base + 1] * residue + [base] * (num_ngrams - residue)
        return [self.k if self.k else 0] * num_ngrams

    def replace_missing_value(self, str_in):
        # type: (Text) -> Text
        """ Return `str_in` unless it equals the sentinel of the
            configured missing-value specification, in which case the
            specification's replacement value is returned.

            :param str_in: The raw field value.
            :return: str_in or the missingValue replacement value
        """
        spec = self.missing_value
        if spec is not None and spec.sentinel == str_in:
            return spec.replace_with
        return str_in
def fhp_from_json_dict(
        json_dict  # type: Dict[str, Any]
):
    # type: (...) -> FieldHashingProperties
    """
    Make a :class:`FieldHashingProperties` object from a dictionary.

    :param dict json_dict:
        Conforming to the `hashingConfig` definition
        in the `v2` linkage schema. The keys read here are
        `'strategy'` (with optional `'numBits'` / `'k'`), `'ngram'`,
        and the optional `'hash'`, `'positional'` and `'missingValue'`.
    :return: A :class:`FieldHashingProperties` instance.
    """
    strategy = json_dict['strategy']
    # Without an explicit hash section, fall back to the blake hash.
    hash_config = json_dict.get('hash', {'type': 'blakeHash'})
    bits_per_field = strategy.get('numBits')
    bits_per_ngram = strategy.get('k')

    ngram = json_dict['ngram']
    positional = json_dict.get(
        'positional', FieldHashingProperties._DEFAULT_POSITIONAL)
    hash_type = hash_config['type']
    prevent_singularity = hash_config.get('prevent_singularity')

    missing_value = None  # type: Optional[MissingValueSpec]
    if 'missingValue' in json_dict:
        missing_value = MissingValueSpec.from_json_dict(
            json_dict['missingValue'])

    return FieldHashingProperties(
        ngram=ngram,
        positional=positional,
        hash_type=hash_type,
        prevent_singularity=prevent_singularity,
        num_bits=bits_per_field,
        k=bits_per_ngram,
        missing_value=missing_value
    )
@add_metaclass(abc.ABCMeta)
class FieldSpec(object):
    """ Abstract base class representing the specification of a column
        in the dataset. Subclasses validate entries, and modify the
        `hashing_properties` ivar to customise hashing procedures.

        :ivar str identifier: The name of the field.
        :ivar str description: Description of the field format.
        :ivar FieldHashingProperties hashing_properties: The properties
            for hashing. None if field ignored.
    """

    def __init__(self,
                 identifier,  # type: str
                 hashing_properties,  # type: Optional[FieldHashingProperties]
                 description=None  # type: Optional[str]
                 ):
        # type: (...) -> None
        """ Make a FieldSpec object, setting its attributes to the
            values given as arguments.
        """
        self.identifier = identifier
        self.hashing_properties = hashing_properties
        self.description = description

    @classmethod
    def from_json_dict(cls,
                       field_dict  # type: Dict[str, Any]
                       ):
        # type: (...) -> FieldSpec
        """ Initialise a FieldSpec object from a dictionary of
            properties.

            The instance is created with ``cls.__new__``, so the
            subclass ``__init__`` is *not* run; subclasses are expected
            to fill in their own attributes afterwards.

            :param dict field_dict: The properties dictionary to use.
                Must contain `'identifier'` and `'format'` keys. A
                `'hashing'` key, when present, must meet the
                requirements of :class:`FieldHashingProperties`; when
                absent, `hashing_properties` is set to `None`.
                Subclasses may require further keys.
            :raises InvalidSchemaError: When the `properties`
                dictionary contains invalid values. Exactly what that
                means is decided by the subclasses.
        """
        identifier = field_dict['identifier']
        description = field_dict['format'].get('description')

        if 'hashing' in field_dict:
            hashing_properties = fhp_from_json_dict(field_dict['hashing'])
        else:
            hashing_properties = None

        spec = cls.__new__(cls)  # type: ignore
        spec.identifier = identifier
        spec.hashing_properties = hashing_properties
        spec.description = description
        return spec

    @abc.abstractmethod
    def validate(self, str_in):
        # type: (Text) -> None
        """ Validates an entry in the field.

            Raises :class:`InvalidEntryError` iff the entry is invalid.

            Subclasses must override this method with their own
            validation. They should call the parent's `validate` method
            via `super`. This base implementation only checks that the
            entry can be encoded with the field's encoding.

            :param str str_in: String to validate.
            :raises InvalidEntryError: When entry is invalid.
        """
        properties = self.hashing_properties
        if not properties:
            # No hashing properties: nothing to check against.
            return

        try:
            str_in.encode(encoding=properties.encoding)
        except UnicodeEncodeError as err:
            msg = ("Expected entry that can be encoded in {}. Read '{}'."
                   .format(properties.encoding, str_in))
            e_new = InvalidEntryError(msg)
            e_new.field_spec = self
            raise_from(e_new, err)

    def is_missing_value(self, str_in):
        # type: (Text) -> bool
        """ Tests if 'str_in' is the sentinel value for this field.

            :param str str_in: String to test if it stands for missing
                value
            :return: True if a missing value is defined for this field
                and str_in matches this value
        """
        properties = self.hashing_properties
        if properties is None or properties.missing_value is None:
            return False
        return properties.missing_value.sentinel == str_in

    def _format_regular_value(self, str_in):
        # type: (Text) -> Text
        """ Hook for subclasses that need to modify 'str_in' before
            hashing. The default implementation returns it unchanged.

            :param str_in: The field value.
            :return: a string representation of 'str_in' which is ready
                to be hashed
        """
        return str_in
class StringSpec(FieldSpec):
    """ Represents a field that holds strings.

        One way to specify the format of the entries is to provide a
        regular expression that they must conform to. Another is to
        provide zero or more of: minimum length, maximum length, casing
        (lower, upper, mixed).

        Each string field also specifies an encoding used when turning
        characters into bytes. This is stored in `hashing_properties`
        since it is needed for hashing.

        :ivar regex: Compiled regular expression that entries must
            conform to. Present only if the specification is
            regex-based.
        :ivar str case: The casing of the entries. One of `'lower'`,
            `'upper'`, or `'mixed'`. Default is `'mixed'`. Present only
            if the specification is not regex-based.
        :ivar int min_length: The minimum length of the string. `None`
            if there is no minimum length. Present only if the
            specification is not regex-based.
        :ivar int max_length: The maximum length of the string. `None`
            if there is no maximum length. Present only if the
            specification is not regex-based.
        :ivar bool regex_based: Whether validation uses `regex` or the
            case/length attributes.
    """
    _DEFAULT_CASE = 'mixed'
    _DEFAULT_MIN_LENGTH = 0
    _PERMITTED_CASE_STYLES = {'lower', 'upper', 'mixed'}

    def __init__(self,
                 identifier,  # type: str
                 hashing_properties,  # type: FieldHashingProperties
                 description=None,  # type: Optional[str]
                 regex=None,  # type: Optional[str]
                 case=_DEFAULT_CASE,  # type: str
                 min_length=_DEFAULT_MIN_LENGTH,  # type: int
                 max_length=None  # type: Optional[int]
                 ):
        # type: (...) -> None
        """ Make a StringSpec object, setting its attributes to values
            specified in keyword arguments.

            :raises ValueError: When `regex` is combined with a
                non-default `case`, `min_length` or `max_length`; when
                `case` is not a permitted style; when `min_length` is
                negative; or when `max_length` is not positive.
            :raises InvalidEntryError: When `regex` does not compile.
        """
        # noinspection PyCompatibility,PyArgumentList
        super().__init__(identifier=identifier,
                         description=description,
                         hashing_properties=hashing_properties)

        regex_based = regex is not None

        if regex_based and (case != self._DEFAULT_CASE
                            or min_length != self._DEFAULT_MIN_LENGTH
                            or max_length is not None):
            msg = ('regex cannot be passed along with case, min_length, or'
                   ' max_length.')
            raise ValueError(msg)

        if case not in self._PERMITTED_CASE_STYLES:
            # The original literal lacked the space before 'mixed'.
            msg = ("the case is {}, but should be 'lower', 'upper', or"
                   " 'mixed'")
            raise ValueError(msg.format(case))

        if min_length < 0:
            msg = 'min_length must be non-negative, but is {}'
            raise ValueError(msg.format(min_length))

        # type checker thinks max_length is of type None
        # noinspection PyTypeChecker
        if max_length is not None and max_length <= 0:
            msg = 'max_length must be positive, but is {}'
            raise ValueError(msg.format(max_length))

        if regex_based:
            regex_str = cast(str, regex)
            try:
                self.regex = re_compile_full(regex_str)
            except (SyntaxError, re.error) as e:
                msg = "invalid regular expression '{}.'".format(regex_str)
                raise_from(self._invalid_entry(msg), e)
        else:
            self.case = case
            self.min_length = min_length
            self.max_length = max_length

        self.regex_based = regex_based

    def _invalid_entry(self, msg):
        # type: (Text) -> InvalidEntryError
        """ Build an :class:`InvalidEntryError` tagged with this spec. """
        error = InvalidEntryError(msg)
        error.field_spec = self
        return error

    @classmethod
    def from_json_dict(cls,
                       json_dict  # type: Dict[str, Any]
                       ):
        # type: (...) -> StringSpec
        """ Make a StringSpec object from a dictionary containing its
            properties.

            :param dict json_dict: Must contain the keys required by
                :meth:`FieldSpec.from_json_dict`. Its `'format'` entry
                may contain an `'encoding'` key naming a
                Python-conformant encoding, and either a `'pattern'`
                key or any of `'case'`, `'minLength'`, and
                `'maxLength'`.
            :raises InvalidSchemaError: When a regular expression is
                provided but is not a valid pattern, or when the
                encoding is not known to Python.
        """
        # noinspection PyCompatibility
        result = cast(StringSpec,  # Go away, Mypy.
                      super().from_json_dict(json_dict))

        format_ = json_dict['format']
        if 'encoding' in format_ and result.hashing_properties:
            encoding = format_['encoding']
            # Reject unknown encodings while loading the schema, as the
            # FieldHashingProperties constructor does, instead of
            # failing later on the first validated entry.
            try:
                ''.encode(encoding)
            except LookupError as e:
                msg = '{} is not a valid Python encoding.'.format(encoding)
                e_new = InvalidSchemaError(msg)
                e_new.json_field_spec = json_dict
                raise_from(e_new, e)
            result.hashing_properties.encoding = encoding

        if 'pattern' in format_:
            pattern = format_['pattern']
            try:
                result.regex = re_compile_full(pattern)
            except (SyntaxError, re.error) as e:
                msg = "Invalid regular expression '{}.'".format(pattern)
                e_new = InvalidSchemaError(msg)
                e_new.json_field_spec = json_dict
                raise_from(e_new, e)
            result.regex_based = True
        else:
            result.case = format_.get('case', StringSpec._DEFAULT_CASE)
            result.min_length = format_.get('minLength')
            result.max_length = format_.get('maxLength')
            result.regex_based = False

        return result

    def validate(self, str_in):
        # type: (Text) -> None
        """ Validates an entry in the field.

            Raises `InvalidEntryError` iff the entry is invalid.

            An entry is invalid iff (1) a pattern is part of the
            specification of the field and the string does not match
            it; (2) the string does not match the provided casing,
            minimum length, or maximum length; or (3) the specified
            encoding cannot represent the string. The missing-value
            sentinel is always accepted.

            :param str str_in: String to validate.
            :raises InvalidEntryError: When entry is invalid.
            :raises ValueError: When self.case is not one of the
                permitted values (`'lower'`, `'upper'`, or `'mixed'`).
        """
        if self.is_missing_value(str_in):
            return

        # noinspection PyCompatibility
        super().validate(str_in)  # Validate encoding.

        if self.regex_based:
            if self.regex.match(str_in) is None:
                raise self._invalid_entry(
                    'Expected entry that conforms to regular expression '
                    "'{}'. Read '{}'.".format(self.regex.pattern, str_in))
            return

        str_len = len(str_in)
        if self.min_length is not None and str_len < self.min_length:
            raise self._invalid_entry(
                "Expected string length of at least {}. Read string '{}' "
                'of length {}.'.format(self.min_length, str_in, str_len))

        if self.max_length is not None and str_len > self.max_length:
            raise self._invalid_entry(
                "Expected string length of at most {}. Read string '{}' "
                'of length {}.'.format(self.max_length, str_in, str_len))

        if self.case == 'upper':
            if str_in.upper() != str_in:
                raise self._invalid_entry(
                    "Expected upper case string. Read '{}'.".format(str_in))
        elif self.case == 'lower':
            if str_in.lower() != str_in:
                raise self._invalid_entry(
                    "Expected lower case string. Read '{}'.".format(str_in))
        elif self.case != 'mixed':
            raise ValueError(
                'Invalid case property {}.'.format(self.case))
class IntegerSpec(FieldSpec):
    """ Represents a field that holds integers.

        Minimum and maximum values may be specified.

        :ivar int minimum: The minimum permitted value or None.
        :ivar int maximum: The maximum permitted value or None.
    """

    def __init__(self,
                 identifier,  # type: str
                 hashing_properties,  # type: FieldHashingProperties
                 description=None,  # type: Optional[str]
                 minimum=None,  # type: Optional[int]
                 maximum=None,  # type: Optional[int]
                 **kwargs  # type: Dict[str, Any]
                 ):
        # type: (...) -> None
        """ Make an IntegerSpec object, setting its attributes to values
            specified in keyword arguments. Extra keyword arguments are
            accepted and ignored.
        """
        # noinspection PyCompatibility,PyArgumentList
        super().__init__(identifier=identifier,
                         description=description,
                         hashing_properties=hashing_properties)
        self.minimum = minimum
        self.maximum = maximum

    def _invalid_entry(self, msg):
        # type: (Text) -> InvalidEntryError
        """ Build an :class:`InvalidEntryError` tagged with this spec. """
        error = InvalidEntryError(msg)
        error.field_spec = self
        return error

    @classmethod
    def from_json_dict(cls,
                       json_dict  # type: Dict[str, Any]
                       ):
        # type: (...) -> IntegerSpec
        """ Make an IntegerSpec object from a dictionary containing its
            properties.

            :param dict json_dict: The properties dictionary. Its
                `'format'` entry may contain `'minimum'` and
                `'maximum'` keys. The remaining keys are handled by
                :meth:`FieldSpec.from_json_dict`.
        """
        # noinspection PyCompatibility
        result = cast(IntegerSpec,  # For Mypy.
                      super().from_json_dict(json_dict))

        bounds = json_dict['format']
        result.minimum = bounds.get('minimum')
        result.maximum = bounds.get('maximum')
        return result

    def validate(self, str_in):
        # type: (Text) -> None
        """ Validates an entry in the field.

            Raises `InvalidEntryError` iff the entry is invalid.

            An entry is invalid iff (1) the string does not represent a
            base-10 integer; or (2) the integer is not between
            `self.minimum` and `self.maximum`, if those exist. The
            missing-value sentinel is always accepted.

            :param str str_in: String to validate.
            :raises InvalidEntryError: When entry is invalid.
        """
        if self.is_missing_value(str_in):
            return

        # noinspection PyCompatibility
        super().validate(str_in)

        try:
            number = int(str_in, base=10)
        except ValueError as parse_error:
            msg = "Invalid integer. Read '{}'.".format(str_in)
            raise_from(self._invalid_entry(msg), parse_error)
            return  # unreachable; keeps linters happy about `number`

        lower, upper = self.minimum, self.maximum
        if lower is not None and number < lower:
            raise self._invalid_entry(
                "Expected integer value of at least {}. Read '{}'."
                .format(lower, number))

        if upper is not None and number > upper:
            raise self._invalid_entry(
                "Expected integer value of at most {}. Read '{}'."
                .format(upper, number))

    def _format_regular_value(self, str_in):
        # type: (Text) -> Text
        """ Normalise an integer string before hashing.

            Different strings can denote the same integer: all of
            '+13', ' 13' and '13' parse to 13. The value is therefore
            parsed and converted back to text, giving an unambiguous
            representation (no whitespace, leading '-' for negative
            numbers, no leading '+').

            :param str_in: integer string
            :return: integer string without whitespaces, leading '-' for
                negative numbers, no leading '+'
            :raises InvalidEntryError: When `str_in` is not a base-10
                integer.
        """
        try:
            number = int(str_in, base=10)
        except ValueError as parse_error:
            msg = "Invalid integer. Read '{}'.".format(str_in)
            raise_from(self._invalid_entry(msg), parse_error)
        else:
            return str(number)
class DateSpec(FieldSpec):
    """ Represents a field that holds dates.

        Dates are specified as full-dates in a format that can be
        described as a *strptime()* (C89 standard) compatible format
        string.
        E.g.: the format for the standard internet format `RFC3339
        <https://tools.ietf.org/html/rfc3339>`_
        (e.g. 1996-12-19) is '%Y-%m-%d'.

        :ivar str format: The format of the date.
    """
    # Format in which dates are rendered before hashing.
    OUTPUT_FORMAT = '%Y%m%d'

    def __init__(self,
                 identifier,  # type: str
                 hashing_properties,  # type: FieldHashingProperties
                 format,  # type: str
                 description=None  # type: Optional[str]
                 ):
        # type: (...) -> None
        """ Make a DateSpec object, setting its attributes to values
            specified in keyword arguments.
        """
        # noinspection PyCompatibility,PyArgumentList
        super().__init__(identifier=identifier,
                         description=description,
                         hashing_properties=hashing_properties)
        self.format = format

    @classmethod
    def from_json_dict(cls,
                       json_dict  # type: Dict[str, Any]
                       ):
        # type: (...) -> DateSpec
        """ Make a DateSpec object from a dictionary containing its
            properties.

            :param dict json_dict: The properties dictionary. Its
                `'format'` entry must itself contain a `'format'` key
                holding the date format string. The remaining keys are
                handled by :meth:`FieldSpec.from_json_dict`.
        """
        # noinspection PyCompatibility
        result = cast(DateSpec,  # For Mypy.
                      super().from_json_dict(json_dict))

        result.format = json_dict['format']['format']
        return result

    def validate(self, str_in):
        # type: (Text) -> None
        """ Validates an entry in the field.

            Raises `InvalidEntryError` iff the entry is invalid.

            An entry is invalid iff (1) the string does not represent a
            date in the correct format; or (2) the date it represents
            is invalid (such as 30 February). The missing-value
            sentinel is always accepted.

            :param str str_in: String to validate.
            :raises InvalidEntryError: Iff entry is invalid.
        """
        if self.is_missing_value(str_in):
            return

        # noinspection PyCompatibility
        super().validate(str_in)

        try:
            datetime.strptime(str_in, self.format)
        except ValueError as parse_error:
            failure = InvalidEntryError(
                "Validation error for date type: {}".format(parse_error))
            failure.field_spec = self
            raise_from(failure, parse_error)

    def _format_regular_value(self, str_in):
        # type: (Text) -> Text
        """ Re-render the date in `DateSpec.OUTPUT_FORMAT` so that only
            the digits are hashed, without fillers like '-' or '/'.

            :param str str_in: date string
            :return: str date string with format DateSpec.OUTPUT_FORMAT
            :raises InvalidEntryError: When the date cannot be parsed
                or rendered.
        """
        try:
            parsed = datetime.strptime(str_in, self.format)
            return strftime(parsed, DateSpec.OUTPUT_FORMAT)
        except ValueError as format_error:
            failure = InvalidEntryError(
                "Unable to format date value '{}'. Reason: {}".format(
                    str_in, format_error))
            failure.field_spec = self
            raise_from(failure, format_error)
class EnumSpec(FieldSpec):
    """ Represents a field that holds an enum.

        The finite collection of permitted values must be specified.

        :ivar values: The set of permitted values.
    """

    def __init__(self,
                 identifier,  # type: str
                 hashing_properties,  # type: FieldHashingProperties
                 values,  # type: Iterable[str]
                 description=None  # type: Optional[str]
                 ):
        # type: (...) -> None
        """ Make an EnumSpec object, setting its attributes to values
            specified in keyword arguments.
        """
        # noinspection PyCompatibility,PyArgumentList
        super().__init__(identifier=identifier,
                         description=description,
                         hashing_properties=hashing_properties)
        self.values = set(values)

    @classmethod
    def from_json_dict(cls,
                       json_dict  # type: Dict[str, Any]
                       ):
        # type: (...) -> EnumSpec
        """ Make an EnumSpec object from a dictionary containing its
            properties.

            :param dict json_dict: The properties dictionary. Its
                `'format'` entry must contain a `'values'` key listing
                the permitted values. The remaining keys are handled
                by :meth:`FieldSpec.from_json_dict`.
        """
        # noinspection PyCompatibility
        result = cast(EnumSpec,  # Appease the gods of Mypy.
                      super().from_json_dict(json_dict))

        permitted = json_dict['format']['values']
        result.values = set(permitted)
        return result

    def validate(self, str_in):
        # type: (Text) -> None
        """ Validates an entry in the field.

            Raises `InvalidEntryError` iff the entry is invalid.

            An entry is invalid iff it is not one of the permitted
            values. The missing-value sentinel is always accepted.

            :param str str_in: String to validate.
            :raises InvalidEntryError: When entry is invalid.
        """
        if self.is_missing_value(str_in):
            return

        # noinspection PyCompatibility
        super().validate(str_in)

        if str_in in self.values:
            return

        failure = InvalidEntryError(
            "Expected enum value to be one of {}. Read '{}'."
            .format(list(self.values), str_in))
        failure.field_spec = self
        raise failure
class Ignore(FieldSpec):
    """
    Represents a field which will be ignored throughout the clk
    processing. It carries no hashing properties.
    """

    def __init__(self,
                 identifier=None  # type: Optional[str]
                 ):
        # type: (...) -> None
        """ Make an Ignore object.

            :param identifier: Name of the field; an empty string is
                stored when it is `None`.
        """
        if identifier is None:
            identifier = ''
        # noinspection PyCompatibility
        super().__init__(identifier, None)

    def validate(self, str_in):
        # type: (Text) -> None
        """ Accept any entry: ignored fields are never validated. """
        pass
# Map each field type string (as defined in the master schema) to the
# FieldSpec subclass implementing it. `spec_from_json_dict` looks up
# the `format.type` value of a feature definition in this table.
FIELD_TYPE_MAP = {
    'string': StringSpec,
    'integer': IntegerSpec,
    'date': DateSpec,
    'enum': EnumSpec,
}
def spec_from_json_dict(
        json_dict  # type: Dict[str, Any]
):
    # type: (...) -> FieldSpec
    """ Turns a dictionary into the appropriate FieldSpec object.

        A definition whose `'ignored'` entry is truthy becomes an
        :class:`Ignore`. Otherwise the `format.type` value selects the
        FieldSpec subclass from `FIELD_TYPE_MAP`, and that subclass'
        `from_json_dict` builds the object.

        :param dict json_dict: A dictionary with properties.
        :raises InvalidSchemaError: When a required key is missing or
            the field type is not supported. The offending definition
            is attached as `json_field_spec`.
        :returns: An initialised instance of the appropriate FieldSpec
            subclass.
    """
    # Records whether the KeyError came from a missing key or from an
    # unknown type, so the two cases get distinct messages.
    type_found = False
    type_str = None
    try:
        if json_dict.get('ignored', False):
            return Ignore(json_dict['identifier'])
        type_str = json_dict['format']['type']
        type_found = True
        spec_type = FIELD_TYPE_MAP[type_str]
    except KeyError as e:
        if type_found:
            msg = ("the feature definition {} has the unsupported type "
                   "'{}'. Must be one of: {}"
                   .format(json_dict, type_str, sorted(FIELD_TYPE_MAP)))
        else:
            msg = ("the feature definition {} is incomplete. "
                   "Must contain: {}".format(json_dict, e))
        e_new = InvalidSchemaError(msg)
        e_new.json_field_spec = json_dict
        raise_from(e_new, e)

    return spec_type.from_json_dict(json_dict)