""" Schema loading and validation.
"""
import base64
import json
import pkgutil
from typing import Any, Dict, Hashable, Optional, Sequence, Text, TextIO
from copy import deepcopy
import jsonschema
from clkhash.field_formats import FieldSpec, spec_from_json_dict, InvalidSchemaError
from clkhash.key_derivation import DEFAULT_KEY_SIZE as DEFAULT_KDF_KEY_SIZE
MASTER_SCHEMA_FILE_NAMES = {1: 'v1.json',
2: 'v2.json',
3: 'v3.json'} # type: Dict[Hashable, Text]
[docs]class SchemaError(Exception):
""" The user-defined schema is invalid.
"""
def __init__(self,
msg: str,
errors: Optional[Sequence[InvalidSchemaError]] = None
) -> None:
self.msg = msg
self.errors = [] if errors is None else errors
super().__init__(msg)
def __str__(self) -> str:
detail = ""
for i, e in enumerate(self.errors, start=1):
detail += f"Error {i} in feature at index {e.field_spec_index} - {str(e)}\n"
detail += f"Invalid spec:\n{e.json_field_spec}\n---\n"
return self.msg + '\n\n' + detail
[docs]class MasterSchemaError(Exception):
""" Master schema missing? Corrupted? Otherwise surprising? This is
the exception for you!
"""
[docs]class Schema:
"""Linkage Schema which describes how to encode plaintext identifiers.
:ivar fields: the features or field definitions
:ivar int l: The length of the resulting encoding in bits. This is the length after XOR folding.
:ivar int xor_folds: The number of XOR folds to perform on the hash.
:ivar str kdf_type: The key derivation function to use. Currently,
the only permitted value is 'HKDF'.
:ivar str kdf_hash: The hash function to use in key derivation. The
options are 'SHA256' and 'SHA512'.
:ivar bytes kdf_info: The info for key derivation. See documentation
of :func:`key_derivation.hkdf` for details.
:ivar bytes kdf_salt: The salt for key derivation. See documentation
of :func:`key_derivation.hkdf` for details.
:ivar int kdf_key_size: The size of the derived keys in bytes.
"""
def __init__(self,
fields: Sequence[FieldSpec],
l: int,
xor_folds: int = 0,
kdf_type: str = 'HKDF',
kdf_hash: str = 'SHA256',
kdf_info: Optional[bytes] = None,
kdf_salt: Optional[bytes] = None,
kdf_key_size: int = DEFAULT_KDF_KEY_SIZE
) -> None:
self.fields = fields
self.l = l
self.xor_folds = xor_folds
self.kdf_type = kdf_type
self.kdf_type = kdf_type
self.kdf_hash = kdf_hash
self.kdf_info = kdf_info
self.kdf_salt = kdf_salt
self.kdf_key_size = kdf_key_size
def __repr__(self):
return f"<Schema (v3): {len(self.fields)} fields>"
def _convert_v1_to_v2(schema_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert v1 schema dict to v2 schema dict.
:param schema_dict: v1 schema dict
:return: v2 schema dict
"""
schema_dict = deepcopy(schema_dict)
version = schema_dict['version']
if version != 1:
raise ValueError(f'Version {version} not 1')
clk_config = schema_dict['clkConfig']
k = clk_config.pop('k')
clk_hash = clk_config['hash']
def convert_feature(f):
if f.get('ignored', False):
return f
hashing = f['hashing']
weight = hashing.get('weight', 1.0)
if weight == 0:
return {
'identifier': f['identifier'],
'ignored': True
}
x = deepcopy(f)
hashing = x['hashing']
if 'weight' in hashing:
del hashing['weight']
hashing['strategy'] = {}
hashing['strategy']['k'] = int(round(weight * k))
hashing['hash'] = clk_hash
return x
result = {
'version': 2,
'clkConfig': {
'l': clk_config['l'],
'xor_folds': clk_config.get('xor_folds', 0),
'kdf': clk_config['kdf']
},
'features': list(map(convert_feature, schema_dict['features']))
}
return result
def _convert_v2_to_v3(schema_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert v2 schema dict to v3 schema dict.
:param schema_dict: v2 schema dict
:return: v3 schema dict
"""
schema_dict = deepcopy(schema_dict)
version = schema_dict['version']
if version != 2:
raise ValueError(f'Version {version} not 2')
schema_dict['version'] = 3
for feature in schema_dict['features']:
if feature.get('ignored', False):
continue
strategy = feature['hashing']['strategy']
if 'k' in strategy:
strategy['bitsPerToken'] = strategy.pop('k')
elif 'numBits' in strategy:
strategy['bitsPerFeature'] = strategy.pop('numBits')
ngrams = feature['hashing']['ngram']
feature['hashing']['comparison'] = {'type': 'ngram', 'n': feature['hashing'].pop('ngram'), 'positional': feature['hashing'].pop('positional', False)}
return schema_dict
[docs]def convert_to_latest_version(schema_dict: Dict[str, Any], validate_result: Optional[bool] = False) -> Dict[str, Any]:
""" Convert the given schema to latest schema version.
:param schema_dict: A dictionary describing a linkage schema. This dictionary must have a `'version'` key
containing a master schema version. The rest of the schema dict must conform to the corresponding
master schema.
:param validate_result: validate converted schema against schema specification
:return: schema dict of the latest version
:raises SchemaError: if schema version is not supported
"""
version = schema_dict.get('version', "'not specified'")
if version not in MASTER_SCHEMA_FILE_NAMES.keys():
msg = ('Schema version {} is not supported. '
'Consider updating clkhash.').format(version)
raise SchemaError(msg)
if schema_dict['version'] == 1:
schema_dict = _convert_v1_to_v2(schema_dict)
if schema_dict['version'] == 2:
schema_dict = _convert_v2_to_v3(schema_dict)
if validate_result:
validate_schema_dict(schema_dict)
return schema_dict
[docs]def from_json_dict(dct: Dict[str, Any], validate: bool = True) -> Schema:
""" Create a Schema of the most recent version according to dct
if the provided schema dict is of an older version, then it will be automatically converted to the latest.
:param dct: This dictionary must have a `'features'`
key specifying the columns of the dataset. It must have
a `'version'` key containing the master schema version
that this schema conforms to. It must have a `'hash'`
key with all the globals.
:param validate: (default True) Raise an exception if the
schema does not conform to the master schema.
:raises SchemaError: An exception containing details about why
the schema is not valid.
:return: the Schema
"""
if validate:
# This raises iff the schema is invalid.
validate_schema_dict(dct)
dct = convert_to_latest_version(dct)
if validate:
validate_schema_dict(dct)
clk_config = dct['clkConfig']
l = clk_config['l']
xor_folds = clk_config.get('xor_folds', 0)
kdf = clk_config['kdf']
kdf_type = kdf['type']
kdf_hash = kdf.get('hash', 'SHA256')
kdf_info_string = kdf.get('info')
kdf_info = (base64.b64decode(kdf_info_string)
if kdf_info_string is not None
else None)
kdf_salt_string = kdf.get('salt')
kdf_salt = (base64.b64decode(kdf_salt_string)
if kdf_salt_string is not None
else None)
kdf_key_size = kdf.get('keySize', DEFAULT_KDF_KEY_SIZE)
# Try to parse each feature config and store any errors encountered
# for reporting.
feature_errors = []
feature_configs = []
for i, feature_config in enumerate(dct['features']):
try:
feature_configs.append(spec_from_json_dict(feature_config))
except InvalidSchemaError as e:
e.field_spec_index = i
e.json_field_spec = feature_config
feature_errors.append(e)
if len(feature_errors):
raise SchemaError("Schema was invalid", feature_errors)
return Schema(feature_configs, l, xor_folds,
kdf_type, kdf_hash, kdf_info, kdf_salt, kdf_key_size)
[docs]def from_json_file(schema_file: TextIO, validate: bool = True) -> Schema:
""" Load a Schema object from a json file.
:param schema_file: A JSON file containing the schema.
:param validate: (default True) Raise an exception if the
schema does not conform to the master schema.
:raises SchemaError: When the schema is invalid.
:return: the Schema
"""
try:
schema_dict = json.load(schema_file)
except ValueError as e: # In Python 3 we can be more specific
# with json.decoder.JSONDecodeError,
# but that doesn't exist in Python 2.
msg = 'The schema is not a valid JSON file.'
raise SchemaError(msg) from e
return from_json_dict(schema_dict, validate=validate)
def _get_master_schema(version: Hashable) -> dict:
""" Loads the master schema of given version
:param version: The version of the master schema whose path we
wish to retrieve.
:raises SchemaError: When the schema version is unknown. This
usually means that either (a) clkhash is out of date, or (b)
the schema version listed is incorrect.
:raises MasterSchemaError: When the master schema is invalid.
:return: Dict object of the (json) master schema.
"""
try:
file_name = MASTER_SCHEMA_FILE_NAMES[version]
except (TypeError, KeyError) as e:
msg = ('Schema version {} is not supported. '
'Consider updating clkhash.').format(version)
raise SchemaError(msg) from e
try:
schema_bytes = pkgutil.get_data('clkhash', f'schemas/{file_name}')
if schema_bytes is None:
msg = ('The master schema could not be loaded. The schema cannot be '
'validated. Please file a bug report.')
raise MasterSchemaError(msg)
except FileNotFoundError as e:
msg = ('The master schema could not be found. The schema cannot be '
'validated. Please file a bug report.')
raise MasterSchemaError(msg) from e
try:
master_schema = json.loads(schema_bytes.decode('utf-8'))
return master_schema
except json.decoder.JSONDecodeError as e:
msg = ('The master schema is not a valid JSON file. The schema cannot '
'be validated. Please file a bug report.')
raise MasterSchemaError(msg) from e
[docs]def validate_schema_dict(schema: Dict[str, Any]) -> None:
""" Validate the schema.
This raises iff either the schema or the master schema are
invalid. If it's successful, it returns nothing.
:param schema: The schema to validate, as parsed by `json`.
:raises SchemaError: When the schema is invalid.
:raises MasterSchemaError: When the master schema is invalid.
"""
if not isinstance(schema, dict):
msg = ('The top level of the schema file is a {}, whereas a dict is '
'expected.'.format(type(schema).__name__))
raise SchemaError(msg)
if 'version' in schema:
version = schema['version']
else:
raise SchemaError('A format version is expected in the schema.')
master_schema = _get_master_schema(version)
try:
jsonschema.validate(schema, master_schema)
except jsonschema.exceptions.ValidationError as e:
raise SchemaError('The schema is not valid.\n\n' + str(e)) from e
except jsonschema.exceptions.SchemaError as e:
msg = ('The master schema is not valid. The schema cannot be '
'validated. Please file a bug report.')
raise MasterSchemaError(msg) from e