Source code for hestia_earth.calculation.representation

import copy
from .version import VERSION
from hestia_earth.schema import EmissionMethodTier
from hestia_earth.calculation.utils import EngineDict, EngineList, parse_methodTier


[docs]class DataStore(): def __init__(self): """Constructor method. Arguments: None Returns instance.""" self.cycle = None self.site = None self.assessment = None self.representation_cycle = None self.representation_assessment = None self.output_cycle = None self.output_assessmet = None
[docs] def clean(self, clean_fields, key): """Method that eliminates the non-used fields of the emissions output nodes dictionaries. Arguments: clean_fields: The dictionary where the field to be cleaned is in. key: the key of the field to be cleaned. Returns nothind.""" if clean_fields[key] == "" or clean_fields[key] is None or clean_fields[key] == "-": del (clean_fields[key]) elif isinstance(clean_fields[key], dict) or isinstance(clean_fields[key], list): if len(clean_fields[key]) == 0: del (clean_fields[key])
[docs] def inspect_field(self, clean_fields, key): """Method that inspects if non-used fields are present in a particular node. Arguments: clean_fields: The dictionary where the field to be cleaned is in. key: the key of the field to be cleaned. Returns a dictionary without the unused fields.""" if isinstance(clean_fields[key], list) and not clean_fields[key] == []: for object in clean_fields[key]: if not isinstance(object, str) and not isinstance(object, int) and\ not isinstance(object, float) and not isinstance(object, list): self.clean_crawler_fields(object) elif isinstance(clean_fields[key], dict) and not clean_fields[key] == {}: self.clean_crawler_fields(clean_fields[key]) else: self.clean(clean_fields, key)
[docs] def clean_crawler_fields(self, mobject): """Method that crawls nodes looking for empty fields. Arguments: mobject: The output emission node. Returns a dictionary without the unused fields.""" clean_fields = mobject for key in list(clean_fields.keys()): self.inspect_field(clean_fields, key) return clean_fields
[docs] def converter_edict(self, object): """Method that converts a dictionary to an EngineDict if it is a dictionary, or a list of dictionaries to a list of EngineDicts. If it is not a dictionary or a collection of dictionaries returns the object itself. Arguments: object: The object to be converted to EngineDict. Returns the object converted to EngineDict or the object otherwise.""" if isinstance(object, dict): EngineDict(object) return self.converter_crawler_edict(object) elif isinstance(object, list): return [self.converter_edict(item) for item in object] else: return object
[docs] def converter_crawler_edict(self, object): """Method that converts all the dictionaries in a cycle to EngineDict. Arguments: object: A Hestia cycle. Returns a cycle where all the dictionaries are EngineDict.""" mobject = EngineDict(object) for key in list(mobject.keys()): mobject[key] = self.converter_edict(mobject[key]) return mobject
[docs] def converter_dict(self, object): """Method that converts an EngineDict to a dictionary if it is an EngineDict, or a list (or elist) of EngingeDicts to a list of dictionaries. If it is not a dictionary or a collection of dictionaries returns the object itself. Arguments: object: The object to be converted to dictionary. Returns the object converted to dictionary or the object otherwise.""" if isinstance(object, EngineDict): dict(object) return self.converter_crawler_dict(object) elif isinstance(object, dict): return self.converter_crawler_dict(object) elif isinstance(object, list) or isinstance(object, EngineList): return [self.converter_dict(item) for item in object] else: return object
[docs] def converter_crawler_dict(self, object): """Method that converts all the EngineDict in a cycle to dictionaries. Arguments: object: A Hestia cycle. Returns a cycle where all the EngineDict are dictionaries.""" mobject = dict(object) if object is not None else {} for key in list(mobject.keys()): mobject[key] = self.converter_dict(mobject[key]) return mobject
[docs] def converter_dict_to_list(self, node, key): """Method that converts a key of a node (that is a dictionary) into a list defined in Hestia's Schema. Arguments: node: A Hestia node key: A key of the node Returns nothing.""" if key in node: node[key] = node[key].evalues()
[docs] def converter_list_to_dict(self, node, key): """Method that converts a subkey of a node (that is a dictionary) into a list defined in Hestia's Schema. Arguments: node: A Hestia node key: A subkey of the node Returns nothing.""" if key in node: elist = EngineList(node[key]) node[key] = elist.to_edict()
[docs] def converter_subdict_to_list(self, node, key): """Method that converts a key of the subnodes of a node (that are lists) into a dictionary to easily access data. Arguments: node: A Hestia node key: A key of the subnodes Returns nothing.""" subnodes = list(node.values()) for subnode in subnodes: if key in subnode: self.converter_dict_to_list(subnode, key)
[docs] def converter_subsubdict_to_list(self, node, subkey, subsubkey): """Method that converts a subsubkey of the subnodes of a node (that are lists) into a dictionary to easily access data. Arguments: node: A Hestia node subkey: A subkey of the subnodes Returns nothing.""" subnodes = list(node.values()) for subnode in subnodes: if subkey in subnode and subsubkey in subnode[subkey]: self.converter_dict_to_list(subnode[subkey], subsubkey)
[docs] def converter_sublist_to_dict(self, node, key): """Method that converts a subkey of the subnodes of a node (that are lists) into a dictionary to easily access data. Arguments: node: A Hestia node key: A subkey of the subnodes Returns nothing.""" subnodes = list(node.values()) for subnode in subnodes: if key in subnode: self.converter_list_to_dict(subnode, key)
[docs] def converter_subsublist_to_dict(self, node, subkey, subsubkey): """Method that converts a subsubkey of the subnodes of a node (that are lists) into a dictionary to easily access data. Arguments: node: A Hestia node subkey: A subkey of the subnodes subsubkey: A subsubkey of the subnodes Returns nothing.""" subnodes = list(node.values()) for subnode in subnodes: if subkey in subnode and subsubkey in subnode[subkey]: self.converter_list_to_dict(subnode[subkey], subsubkey)
[docs] def import_assessment(self, assessment): """Method that imports a jsonld file into the representation required to perform the Hestia calculations. Arguments: file: A jsonld file Returns nothing.""" if assessment is not None: self.save_original_assessment(assessment) else: self.representation_assessment = None if self.assessment is not None and len(self.assessment) > 0 and 'cycle' in self.assessment[0]: self.representation_cycle = self.import_cycle(self.cycle, None) if self.assessment != [] and self.assessment is not None: self.representation_assessment =\ EngineDict({element['product']['@id']: EngineDict(element) for element in self.assessment}) for element in self.representation_assessment.evalues(): self.converter_list_to_dict(element['product'], 'defaultProperties') self.converter_list_to_dict(element, 'emissionsResourceUse') self.converter_subsublist_to_dict(element['emissionsResourceUse'], 'term', 'defaultProperties') self.converter_list_to_dict(element, 'impacts') elif self.assessment == [] and self.assessment is not None: self.representation_assessment = EngineDict() return self.representation_assessment
[docs] def import_cycle(self, cycle, site=None): """Method that imports a jsonld file into the representation required to perform the Hestia calculations. Arguments: file: A jsonld file Returns nothing.""" self.save_original_cycle(cycle, site) if 'site' in self.cycle and self.cycle['site'] is not None: self.converter_list_to_dict(self.cycle['site'], 'measurements') self.converter_subsublist_to_dict(self.cycle['site']['measurements'], 'term', 'defaultProperties') if 'inputs' in self.cycle and self.cycle['inputs'] is not None: self.converter_list_to_dict(self.cycle, 'inputs') self.converter_sublist_to_dict(self.cycle['inputs'], 'properties') self.converter_subsublist_to_dict(self.cycle['inputs'], 'term', 'defaultProperties') if 'products' in self.cycle and self.cycle['products'] is not None: self.converter_list_to_dict(self.cycle, 'products') self.converter_sublist_to_dict(self.cycle['products'], 'properties') self.converter_subsublist_to_dict(self.cycle['products'], 'term', 'defaultProperties') if 'emissions' in self.cycle and self.cycle['emissions'] is not None: self.converter_list_to_dict(self.cycle, 'emissions') self.converter_sublist_to_dict(self.cycle['emissions'], 'properties') self.converter_subsublist_to_dict(self.cycle['emissions'], 'term', 'defaultProperties') if 'practices' in self.cycle and self.cycle['practices'] is not None: self.converter_list_to_dict(self.cycle, 'practices') self.representation_cycle = self.cycle self.safe_representation = self.trasnform_engine_dict(self.cycle) return self.representation_cycle
[docs] def export_assessment(self): """Method that imports a jsonld file into the representation required to perform the Hestia calculations. Arguments: file: A jsonld file Returns nothing.""" self.output_assessmet = copy.deepcopy(self.representation_assessment) if self.output_assessmet is not None: for element in self.output_assessmet.evalues(): self.converter_subsubdict_to_list(element['emissionsResourceUse'], 'term', 'defaultProperties') self.converter_dict_to_list(element, 'emissionsResourceUse') self.converter_dict_to_list(element['product'], 'defaultProperties') self.converter_dict_to_list(element, 'impacts') self.clean_crawler_fields(element) return list(self.converter_crawler_dict(self.output_assessmet).values())
[docs] def export_cycle(self): """Method that exports the Engine representation into the Hestia Schema. Arguments: None Returns a dict representing the recalculated cycle.""" self.output_cycle = copy.deepcopy(self.representation_cycle) if 'site' in self.cycle and self.cycle['site'] is not None: self.converter_subsubdict_to_list(self.output_cycle['site']['measurements'], 'term', 'defaultProperties') self.converter_dict_to_list(self.output_cycle['site'], 'measurements') if 'inputs' in self.cycle and self.cycle['inputs'] is not None: self.converter_subsubdict_to_list(self.output_cycle['inputs'], 'term', 'defaultProperties') self.converter_subdict_to_list(self.output_cycle['inputs'], 'properties') self.converter_dict_to_list(self.output_cycle, 'inputs') if 'products' in self.cycle and self.cycle['products'] is not None: self.converter_subsubdict_to_list(self.output_cycle['products'], 'term', 'defaultProperties') self.converter_subdict_to_list(self.output_cycle['products'], 'properties') self.converter_dict_to_list(self.output_cycle, 'products') if 'emissions' in self.cycle and self.cycle['emissions'] is not None: self.converter_subsubdict_to_list(self.output_cycle['emissions'], 'term', 'defaultProperties') self.converter_subdict_to_list(self.output_cycle['emissions'], 'properties') self.converter_dict_to_list(self.output_cycle, 'emissions') if 'practices' in self.cycle and self.cycle['practices'] is not None: self.converter_dict_to_list(self.output_cycle, 'practices') return self.converter_crawler_dict(self.clean_crawler_fields(self.output_cycle))
[docs] def save_original_assessment(self, assessment): """Method that saves the original data in the engine. Arguments: cycle: the original cycle site: the original site Returns nothing.""" self.assessment = copy.deepcopy(assessment) if len(assessment) > 0: self.cycle = copy.deepcopy(assessment[0]['cycle']) if 'cycle' in assessment[0] else None if 'site' in assessment[0]: self.site = copy.deepcopy(assessment[0]['site']) elif self.site is not None and 'site' in self.cycle: self.site = self.cycle['site'] else: self.site = None
[docs] def save_original_cycle(self, cycle, site): """Method that saves the original data in the engine. Arguments: cycle: the original cycle site: the original site Returns nothing.""" self.cycle = copy.deepcopy(cycle) if site is not None: self.site = copy.deepcopy(site) elif cycle is not None and 'site' in cycle: self.site = copy.deepcopy(cycle['site']) else: self.site = None if self.site is not None: self.cycle["site"] = self.site self.cycle = self.converter_crawler_edict(self.cycle)
[docs] def trasnform_engine_dict(self, representation): """Method to convert all the nested dictionaries to EngineDict. Arguments: key: a cycle representation Returns a cycle representation with all the dictionaries converted to EngineDict.""" endict = EngineDict(representation) for key, node in endict.items(): if isinstance(node, dict) or isinstance(node, EngineDict): endict[key] = self.trasnform_engine_dict(node) if isinstance(node, list) or isinstance(node, EngineList): for i, v in enumerate(node): if isinstance(v, dict): endict[key][i] = self.trasnform_engine_dict(v) return endict
[docs] def update_representation_assessments(self, calc_data): """Method that updates the impact assessments of an assessment representation. Arguments: calc_data: A dataStore object containing the new assessments. Returns nothing.""" if self.representation_assessment is None: self.representation_assessment = EngineDict() for impact_assessment in calc_data.representation_assessment.evalues(): if self.representation_assessment is not None and\ impact_assessment['product']['@id'] not in self.representation_assessment: self.representation_assessment[impact_assessment['product']['@id']] = impact_assessment else: self.update_indicators(impact_assessment, 'emissionsResourceUse') self.update_indicators(impact_assessment, 'impacts')
[docs] def update_indicators(self, assessment_new, key): """Method that updates the indicators with a recalculated impact assessment. Arguments: assessment_new: The new impact assessment with the new indicators. Returns nothing.""" pid = assessment_new['product']['@id'] for indicator in assessment_new[key]: if indicator in self.representation_assessment[pid][key] and \ 'dataState' in self.representation_assessment[pid][key][indicator] and \ 'dataVersion' in self.representation_assessment[pid][key][indicator] and\ self.representation_assessment[pid][key][indicator]['dataState'] == "original" and\ self.representation_assessment[pid][key][indicator]['dataVersion'] == VERSION and\ not self.is_different([self.representation_assessment[pid][key][indicator]['value']], [assessment_new[key][indicator]['value']]): pass elif key not in self.representation_assessment[pid]: self.representation_assessment[pid][key] = EngineDict() self.representation_assessment[pid][key][indicator] = assessment_new[key][indicator] else: self.representation_assessment[pid][key][indicator] = assessment_new[key][indicator]
[docs] def update_representation_emission(self, emission): """Method that updates a emission of the cycle representation Arguments: emision: A node with the emission to update. Returns nothing.""" no_rewriting = [EmissionMethodTier.BACKGROUND.value, EmissionMethodTier.MEASURED.value] eid = emission['term']['@id'] tier_new = emission['methodTier'] if self.representation_cycle['emissions'] == {}: self.representation_cycle['emissions'] = EngineDict() if self.representation_cycle['emissions'].get(eid) is not None and \ self.representation_cycle['emissions'][eid].get('methodTier') and\ self.representation_cycle['emissions'][eid].get('methodTier') not in no_rewriting and\ parse_methodTier(tier_new) >=\ parse_methodTier(self.representation_cycle['emissions'][eid].get('methodTier')) and\ self.is_different(self.representation_cycle['emissions'][eid]['value'], emission['value']): emission_versions = self.representation_cycle['emissions'][eid].get('dataVersion') if self.check_version(emission_versions): self.representation_cycle['emissions'][eid] = emission elif self.representation_cycle['emissions'].get(eid) is None: self.representation_cycle['emissions'][eid] = emission
[docs] def is_different(self, original_list, calculated_list): """Method that checks if two values are equal. Arguments: original_list: The original list. calculated_list: The recalculated list. Returns boolean.""" original = sum(original_list) calculated = sum(calculated_list) diff = abs(original - calculated) if calculated > 0.001: return True if diff/calculated > 0.01 else False else: return True if diff > 0.001/2 else False
[docs] def check_version(self, emission_versions): """Method that checks if two the node is using the most recent version of the schema. Arguments: emission_versions: The version of the the node to be checked. Returns boolean.""" if emission_versions is [] or emission_versions is None: return True elif emission_versions[-1] < VERSION: return True else: return False