Source code for intelligent_tracker.persistence

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (C) 2017 David Toro <davsamirtor@gmail.com>

# compatibility with python 2 and 3
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from past.builtins import basestring

# import build-in modules
import sys

# import third party modules
import json
from six import reraise
from json import JSONEncoder

# special variables
# __all__ = []
__author__ = "David Toro"
# __copyright__ = "Copyright 2017, The <name> Project"
# __credits__ = [""]
__license__ = "GPL"
# __version__ = "1.0.0"
__maintainer__ = "David Toro"
__email__ = "davsamirtor@gmail.com"
# __status__ = "Pre-release"

### JSON support
# these methods were not packed into a class to prevent customization
# thus setting an standard for dev extended json files

# create json support variables
extended_json_classes = {}
extended_json_watched_classes = {}
# this is implemented to be readable only and
# to set the dev extended json protocol
json_class_id = "__json_class__"  # field in json file for object class name
json_class_data = "__json_data__"  # field in json file for object data
json_class_module = "__json_module__"  # field in json file for object module
json_enco_name = "__json_enco__"  # name of encoding method in supporting class
json_deco_name = "__json_deco__"  # name of decoding method in supporting class


[docs]def get_obj_name(obj): # this should return the same as class_obj.__name__ from it instance obj try: return obj.__class__.__name__ except AttributeError: return type(obj).__name__
[docs]def get_obj_module_name(obj): try: return obj.__module__ # for new-style except AttributeError: return type(obj).__module__ # for old-style
[docs]def register_json_class(class_obj, compatibility_name=None): extended_json_classes[class_obj.__name__] = class_obj if compatibility_name is not None: if not isinstance(compatibility_name, basestring): raise TypeError("compatibility_name must be string") extended_json_classes[compatibility_name] = class_obj
[docs]def register_json_watcher(class_obj, enco=None, deco=None): extended_json_watched_classes[class_obj.__name__] = (class_obj, enco, deco)
# register tuple watcher # by default dev extended json files support tuple # but they can be deactivated by removing the tuple watcher. register_json_watcher(tuple, deco=tuple)
[docs]class DEVJSONEncoder(JSONEncoder): """ Extended json decoder with support for instance classes with encoding methods. """ # https://stackoverflow.com/a/3768975/5288758 # https://docs.python.org/2/library/json.html
[docs] def encode(self, obj): # https://stackoverflow.com/a/15721641/5288758 def hint_tuples(item): if isinstance(item, tuple): return self.default(item) # serialize with default if isinstance(item, list): return [hint_tuples(e) for e in item] # walk list for tuples else: return item return super(DEVJSONEncoder, self).encode(hint_tuples(obj))
[docs] def default(self, o): class_name = get_obj_name(o) module_name = get_obj_module_name(o) try: # process if is a extended class return {json_class_data: getattr(o, json_enco_name)(), # get object data json_class_id: class_name, # get object class json_class_module: module_name # get module } except AttributeError: # watched classes try: if class_name in extended_json_watched_classes: enco = extended_json_watched_classes[class_name][1] if enco is None: data = o else: try: data = enco(o) except Exception: e, msg, traceback = sys.exc_info() msg.args = ( msg.args[0] + ". Watcher encoder failed",) reraise(e, msg, traceback) return {json_class_data: data, # get object data json_class_id: class_name, # get object class json_class_module: module_name # get module } except Exception: e, msg, traceback = sys.exc_info() msg.args = ( msg.args[0] + ". Watched object of class '{}' could not be " "json serialized".format(class_name),) reraise(e, msg, traceback) # special case # if tuple is in register_json_watcher it can be handled there # it gives freedom to the user to return tuple original behaviour if class_name == "tuple": return o return JSONEncoder.default(self, o)
[docs]def object_hook_DEVJSONDecoder(json_object): """ object_hook compatible with DEVJSONEncoder :param json_object: :return: """ # is it a extended object? try: custom_class_name = json_object[json_class_id] json_data = json_object[json_class_data] module_name = json_object[json_class_module] except KeyError: return json_object # try to find extended object class to recreate recreate = None recreate_method = " " to_unpack = False try: cls = extended_json_classes[custom_class_name] recreate_method += "extended list" except KeyError: # special case "tuple" if custom_class_name == "tuple": recreate = tuple elif custom_class_name in extended_json_watched_classes: # find in watched classes recreate, _, deco = extended_json_watched_classes[custom_class_name] if deco is None: to_unpack = True else: recreate = deco recreate_method += "watched list" else: # if not found in priority find in general modules cls_list = [] for module_path in (i for i in sys.modules.keys() if module_name in i): cls = getattr(sys.modules[module_path], custom_class_name, None) if cls is not None: cls_list.append(cls) if len(cls_list) == 0: raise ValueError( "Object of class '{}' could not be found. " "Make sure to register it.".format(custom_class_name)) elif len(cls_list) > 1: raise ValueError( "Multiple inferred '{}' objects classes in {}. " "Make sure to register it.".format(custom_class_name, cls_list)) else: cls = cls_list[0] recreate_method += "modules" if recreate is None: # ensure class has method try: recreate = getattr(cls, json_deco_name) # custom recreation method recreate_method = "{} in{}".format(json_deco_name, recreate_method) except AttributeError: recreate = cls # class __init__ method recreate_method = "__init__ in" + recreate_method to_unpack = True else: recreate_method = "DECO in" + recreate_method # try to recreate object try: if to_unpack: if isinstance(json_data, dict): # unpack dictionary return recreate(**json_data) else: # unpack iterable return recreate(*json_data) else: # provide data "as is" return recreate(json_data) except Exception: e, msg, traceback = sys.exc_info() msg.args = (msg.args[0] + ". Object of class '{}' could not be recreated " "from '{}' method".format(custom_class_name, recreate_method),) reraise(ValueError, msg, traceback)
[docs]def save_configuration(path, data, indent=2, separators=(',', ': '), cls=DEVJSONEncoder, **kwargs): """ Save dev extended json serialization. :param path: save path :param data: custom data :param indent: 2 :param separators: (',', ': ') :param cls: DEVJSONEncoder :param kwargs: additional arguments for json.dump :return: """ json_data = json.dumps(data, indent=indent, # organize data by indent separators=separators, # prevent whitespace cls=cls, **kwargs ) # save only if serialization was successful with open(path, "w") as f: f.write(json_data)
[docs]def load_configuration(path, data=None, object_hook=object_hook_DEVJSONDecoder, cls_enco=DEVJSONEncoder, **kwargs): """ Load dev extended json file with default data if file is not found. :param path: load and save path :param data: any data supported by the extended json format implemented by dev :param object_hook: object_hook_DEVJSONDecoder :param cls_enco: DEVJSONEncoder :param kwargs: additional arguments for json.load :return: deserialized json data """ try: with open(path, "r") as f: loaded = json.load(f, object_hook=object_hook, **kwargs) except IOError: if data is not None: save_configuration(path, data, cls=cls_enco) with open(path, "r") as f: loaded = json.load(f, object_hook=object_hook, **kwargs) return loaded