import copy
from .version import VERSION
from hestia_earth.schema import EmissionMethodTier
from hestia_earth.calculation.utils import EngineDict, EngineList, parse_methodTier
class DataStore:
    """Container for a Hestia cycle, site and impact assessments.

    The store keeps a deep copy of the data it is given, builds an "engine
    representation" (nested ``EngineDict`` objects in which selected list fields are
    turned into dictionaries through ``EngineList.to_edict()``) and converts that
    representation back into plain dictionaries/lists on export.
    """

    def __init__(self):
        """Constructor method.

        Arguments:
            None
        Returns instance."""
        self.cycle = None
        self.site = None
        self.assessment = None
        self.representation_cycle = None
        self.representation_assessment = None
        self.output_cycle = None
        # The misspelt name is kept on purpose: it is a public attribute that
        # external code may already read.
        self.output_assessmet = None
        # Assigned by import_cycle(); initialised here so the attribute always exists.
        self.safe_representation = None

    def clean(self, clean_fields, key):
        """Delete ``clean_fields[key]`` when its value counts as unused.

        A value is unused when it is ``None``, the empty string, the string ``"-"``,
        or an empty dict/list.

        Arguments:
            clean_fields: The dictionary where the field to be cleaned is in.
            key: The key of the field to be cleaned (must be present).
        Returns nothing."""
        value = clean_fields[key]
        if value is None or value == "" or value == "-":
            del clean_fields[key]
        elif isinstance(value, (dict, list)) and len(value) == 0:
            del clean_fields[key]

    def inspect_field(self, clean_fields, key):
        """Clean one field of a node, descending into nested containers.

        Non-empty lists have each element crawled unless the element is a str, int,
        float or list; non-empty dicts are crawled directly; anything else is handed
        to ``clean``.

        Arguments:
            clean_fields: The dictionary where the field to be cleaned is in.
            key: The key of the field to be inspected.
        Returns nothing (``clean_fields`` is modified in place)."""
        value = clean_fields[key]
        if isinstance(value, list) and not value == []:
            for item in value:
                # Scalars and nested lists are left untouched; everything else is
                # treated as a node and crawled.
                if not isinstance(item, (str, int, float, list)):
                    self.clean_crawler_fields(item)
        elif isinstance(value, dict) and not value == {}:
            self.clean_crawler_fields(value)
        else:
            self.clean(clean_fields, key)

    def clean_crawler_fields(self, mobject):
        """Crawl a node and remove its unused fields in place.

        Arguments:
            mobject: The node (dictionary) to be cleaned.
        Returns the same object that was passed in, after cleaning."""
        # Iterate over a snapshot of the keys because fields are deleted on the way.
        for key in list(mobject.keys()):
            self.inspect_field(mobject, key)
        return mobject

    def converter_edict(self, object):
        """Convert dictionaries to ``EngineDict``, recursively.

        Arguments:
            object: A dictionary, a list (converted element by element) or any other
                value.
        Returns the converted dictionary, a new list of converted elements, or the
        object itself when it is neither a dict nor a list."""
        if isinstance(object, dict):
            return self.converter_crawler_edict(object)
        if isinstance(object, list):
            return [self.converter_edict(item) for item in object]
        return object

    def converter_crawler_edict(self, object):
        """Wrap a dictionary in an ``EngineDict`` and convert every value it holds.

        Arguments:
            object: The dictionary to convert (for example a Hestia cycle).
        Returns an ``EngineDict`` whose values were passed through
        ``converter_edict``."""
        mobject = EngineDict(object)
        for key in list(mobject.keys()):
            mobject[key] = self.converter_edict(mobject[key])
        return mobject

    def converter_dict(self, object):
        """Convert ``EngineDict`` objects back to plain dictionaries, recursively.

        Arguments:
            object: An ``EngineDict``/dict, a list or ``EngineList`` (converted element
                by element) or any other value.
        Returns the converted dictionary, a new plain list of converted elements, or
        the object itself otherwise."""
        if isinstance(object, (EngineDict, dict)):
            return self.converter_crawler_dict(object)
        if isinstance(object, (list, EngineList)):
            return [self.converter_dict(item) for item in object]
        return object

    def converter_crawler_dict(self, object):
        """Copy a mapping into a plain dict and convert every value it holds.

        Arguments:
            object: The mapping to convert, or ``None``.
        Returns a plain dict (empty when ``object`` is ``None``) whose values were
        passed through ``converter_dict``."""
        mobject = dict(object) if object is not None else {}
        for key in list(mobject.keys()):
            mobject[key] = self.converter_dict(mobject[key])
        return mobject

    def converter_dict_to_list(self, node, key):
        """Replace ``node[key]`` with the result of calling ``evalues()`` on it.

        Used on export to turn a keyed field back into the list form of the schema.
        NOTE(review): ``evalues()`` comes from ``EngineDict``; it is presumably the
        list of its values - confirm against ``hestia_earth.calculation.utils``.

        Arguments:
            node: A Hestia node.
            key: A key of the node; nothing happens when it is absent.
        Returns nothing."""
        if key in node:
            node[key] = node[key].evalues()

    def converter_list_to_dict(self, node, key):
        """Replace ``node[key]`` with ``EngineList(node[key]).to_edict()``.

        Used on import to turn a list field into a keyed field.
        NOTE(review): how ``to_edict()`` chooses the keys is defined by ``EngineList``
        and is not visible here.

        Arguments:
            node: A Hestia node.
            key: A key of the node; nothing happens when it is absent.
        Returns nothing."""
        if key in node:
            elist = EngineList(node[key])
            node[key] = elist.to_edict()

    def converter_subdict_to_list(self, node, key):
        """Apply ``converter_dict_to_list`` to ``key`` on every value of ``node``.

        Arguments:
            node: A mapping whose values are the subnodes.
            key: A key of the subnodes.
        Returns nothing."""
        for subnode in list(node.values()):
            if key in subnode:
                self.converter_dict_to_list(subnode, key)

    def converter_subsubdict_to_list(self, node, subkey, subsubkey):
        """Apply ``converter_dict_to_list`` to ``subnode[subkey][subsubkey]`` for
        every value of ``node``.

        Arguments:
            node: A mapping whose values are the subnodes.
            subkey: A key of the subnodes.
            subsubkey: A key of ``subnode[subkey]``.
        Returns nothing."""
        for subnode in list(node.values()):
            if subkey in subnode and subsubkey in subnode[subkey]:
                self.converter_dict_to_list(subnode[subkey], subsubkey)

    def converter_sublist_to_dict(self, node, key):
        """Apply ``converter_list_to_dict`` to ``key`` on every value of ``node``.

        Arguments:
            node: A mapping whose values are the subnodes.
            key: A key of the subnodes.
        Returns nothing."""
        for subnode in list(node.values()):
            if key in subnode:
                self.converter_list_to_dict(subnode, key)

    def converter_subsublist_to_dict(self, node, subkey, subsubkey):
        """Apply ``converter_list_to_dict`` to ``subnode[subkey][subsubkey]`` for
        every value of ``node``.

        Arguments:
            node: A mapping whose values are the subnodes.
            subkey: A key of the subnodes.
            subsubkey: A key of ``subnode[subkey]``.
        Returns nothing."""
        for subnode in list(node.values()):
            if subkey in subnode and subsubkey in subnode[subkey]:
                self.converter_list_to_dict(subnode[subkey], subsubkey)

    def import_assessment(self, assessment):
        """Import impact assessments into the engine representation.

        Arguments:
            assessment: A list of impact assessment nodes, or ``None``. Each node is
                keyed in the representation by ``node['product']['@id']``; the first
                node may carry the ``cycle`` to import as well.
        Returns the assessment representation (``EngineDict`` or ``None``)."""
        if assessment is not None:
            self.save_original_assessment(assessment)
        else:
            self.representation_assessment = None

        if self.assessment is not None and len(self.assessment) > 0 and 'cycle' in self.assessment[0]:
            self.representation_cycle = self.import_cycle(self.cycle, None)

        if self.assessment != [] and self.assessment is not None:
            self.representation_assessment = EngineDict(
                {element['product']['@id']: EngineDict(element) for element in self.assessment}
            )
            for element in self.representation_assessment.evalues():
                self.converter_list_to_dict(element['product'], 'defaultProperties')
                self.converter_list_to_dict(element, 'emissionsResourceUse')
                self.converter_subsublist_to_dict(element['emissionsResourceUse'], 'term', 'defaultProperties')
                self.converter_list_to_dict(element, 'impacts')
        elif self.assessment == [] and self.assessment is not None:
            self.representation_assessment = EngineDict()
        return self.representation_assessment

    def import_cycle(self, cycle, site=None):
        """Import a cycle (and optionally its site) into the engine representation.

        Arguments:
            cycle: The cycle node.
            site: The site node; when ``None`` the site is taken from ``cycle['site']``
                if present.
        Returns the cycle representation."""
        self.save_original_cycle(cycle, site)

        if 'site' in self.cycle and self.cycle['site'] is not None:
            self.converter_list_to_dict(self.cycle['site'], 'measurements')
            self.converter_subsublist_to_dict(self.cycle['site']['measurements'], 'term', 'defaultProperties')

        # inputs, products and emissions all get the same three conversions.
        for field in ('inputs', 'products', 'emissions'):
            if field in self.cycle and self.cycle[field] is not None:
                self.converter_list_to_dict(self.cycle, field)
                self.converter_sublist_to_dict(self.cycle[field], 'properties')
                self.converter_subsublist_to_dict(self.cycle[field], 'term', 'defaultProperties')

        if 'practices' in self.cycle and self.cycle['practices'] is not None:
            self.converter_list_to_dict(self.cycle, 'practices')

        self.representation_cycle = self.cycle
        # NOTE(review): `trasnform_engine_dict` is not defined in the part of the class
        # visible here; the call is kept exactly as written - confirm where it lives.
        self.safe_representation = self.trasnform_engine_dict(self.cycle)
        return self.representation_cycle

    def export_assessment(self):
        """Export the assessment representation back into lists and plain dicts.

        Works on a deep copy stored in ``self.output_assessmet``; the representation
        itself is left untouched.

        Arguments:
            None
        Returns a list of cleaned impact assessment dictionaries (empty when there is
        no representation)."""
        self.output_assessmet = copy.deepcopy(self.representation_assessment)
        if self.output_assessmet is not None:
            for element in self.output_assessmet.evalues():
                # Innermost fields first, so the containers are still keyed while
                # their children are converted.
                self.converter_subsubdict_to_list(element['emissionsResourceUse'], 'term', 'defaultProperties')
                self.converter_dict_to_list(element, 'emissionsResourceUse')
                self.converter_dict_to_list(element['product'], 'defaultProperties')
                self.converter_dict_to_list(element, 'impacts')
                self.clean_crawler_fields(element)
        return list(self.converter_crawler_dict(self.output_assessmet).values())

    def export_cycle(self):
        """Export the cycle representation back into lists and plain dicts.

        Works on a deep copy stored in ``self.output_cycle``.

        Arguments:
            None
        Returns a dict representing the recalculated cycle."""
        self.output_cycle = copy.deepcopy(self.representation_cycle)

        if 'site' in self.cycle and self.cycle['site'] is not None:
            self.converter_subsubdict_to_list(self.output_cycle['site']['measurements'], 'term', 'defaultProperties')
            self.converter_dict_to_list(self.output_cycle['site'], 'measurements')

        # Mirror of import_cycle(): undo the conversions innermost-first.
        for field in ('inputs', 'products', 'emissions'):
            if field in self.cycle and self.cycle[field] is not None:
                self.converter_subsubdict_to_list(self.output_cycle[field], 'term', 'defaultProperties')
                self.converter_subdict_to_list(self.output_cycle[field], 'properties')
                self.converter_dict_to_list(self.output_cycle, field)

        if 'practices' in self.cycle and self.cycle['practices'] is not None:
            self.converter_dict_to_list(self.output_cycle, 'practices')

        return self.converter_crawler_dict(self.clean_crawler_fields(self.output_cycle))

    def save_original_assessment(self, assessment):
        """Store a deep copy of the assessments plus the cycle/site of the first one.

        Arguments:
            assessment: A list of impact assessment nodes. When it is empty only
                ``self.assessment`` is updated.
        Returns nothing."""
        self.assessment = copy.deepcopy(assessment)
        if len(assessment) > 0:
            first = assessment[0]
            self.cycle = copy.deepcopy(first['cycle']) if 'cycle' in first else None
            if 'site' in first:
                self.site = copy.deepcopy(first['site'])
            elif self.cycle is not None and 'site' in self.cycle:
                # Fall back to the site embedded in the cycle. The guard must test the
                # cycle (not the previous site) or the fallback can never trigger on a
                # fresh store and fails when the cycle is missing.
                self.site = self.cycle['site']
            else:
                self.site = None

    def save_original_cycle(self, cycle, site):
        """Store deep copies of the cycle and site and convert the cycle to EngineDict.

        Arguments:
            cycle: The original cycle.
            site: The original site; when ``None`` the site found under
                ``cycle['site']`` (if any) is used.
        Returns nothing."""
        self.cycle = copy.deepcopy(cycle)
        if site is not None:
            self.site = copy.deepcopy(site)
        elif cycle is not None and 'site' in cycle:
            self.site = copy.deepcopy(cycle['site'])
        else:
            self.site = None

        if self.site is not None:
            self.cycle["site"] = self.site
        self.cycle = self.converter_crawler_edict(self.cycle)

    def update_representation_assessments(self, calc_data):
        """Merge the impact assessments of another store into this representation.

        Assessments for products not yet present are added whole; for products that
        are present, the ``emissionsResourceUse`` and ``impacts`` indicators are
        updated through ``update_indicators``.

        Arguments:
            calc_data: A DataStore object containing the new assessments.
        Returns nothing."""
        if self.representation_assessment is None:
            self.representation_assessment = EngineDict()
        for impact_assessment in calc_data.representation_assessment.evalues():
            pid = impact_assessment['product']['@id']
            if pid not in self.representation_assessment:
                self.representation_assessment[pid] = impact_assessment
            else:
                self.update_indicators(impact_assessment, 'emissionsResourceUse')
                self.update_indicators(impact_assessment, 'impacts')

    def update_indicators(self, assessment_new, key):
        """Update the indicators stored under ``key`` with recalculated ones.

        An existing indicator is kept only when it is flagged ``dataState ==
        "original"``, its ``dataVersion`` equals ``VERSION`` and its value does not
        differ (see ``is_different``) from the recalculated value; in every other case
        the recalculated indicator replaces it.

        Arguments:
            assessment_new: The new impact assessment with the new indicators.
            key: The field holding the indicators (e.g. ``'impacts'``).
        Returns nothing."""
        pid = assessment_new['product']['@id']
        stored = self.representation_assessment[pid]
        for indicator in assessment_new[key]:
            new_indicator = assessment_new[key][indicator]
            # Create the container first: the checks below index stored[key] and
            # would otherwise fail before a "key missing" branch could be reached.
            if key not in stored:
                stored[key] = EngineDict()
            existing = stored[key]
            keep_existing = (
                indicator in existing
                and 'dataState' in existing[indicator]
                and 'dataVersion' in existing[indicator]
                and existing[indicator]['dataState'] == "original"
                and existing[indicator]['dataVersion'] == VERSION
                and not self.is_different([existing[indicator]['value']], [new_indicator['value']])
            )
            if not keep_existing:
                existing[indicator] = new_indicator

    def update_representation_emission(self, emission):
        """Insert or replace one emission of the cycle representation.

        A missing emission is always inserted. An existing one is replaced only when
        it has a method tier that is neither background nor measured, the new tier
        ranks at least as high (per ``parse_methodTier``), the values differ and
        ``check_version`` accepts the stored ``dataVersion``.

        Arguments:
            emission: A node with the emission to update.
        Returns nothing."""
        no_rewriting = [EmissionMethodTier.BACKGROUND.value, EmissionMethodTier.MEASURED.value]
        eid = emission['term']['@id']
        tier_new = emission['methodTier']

        if self.representation_cycle['emissions'] == {}:
            self.representation_cycle['emissions'] = EngineDict()
        emissions = self.representation_cycle['emissions']

        current = emissions.get(eid)
        if current is None:
            emissions[eid] = emission
            return

        current_tier = current.get('methodTier')
        replaceable = (
            current_tier
            and current_tier not in no_rewriting
            and parse_methodTier(tier_new) >= parse_methodTier(current_tier)
            and self.is_different(current['value'], emission['value'])
        )
        if replaceable and self.check_version(current.get('dataVersion')):
            emissions[eid] = emission

    def is_different(self, original_list, calculated_list):
        """Tell whether the sums of two lists of numbers differ beyond tolerance.

        When the calculated sum is above 0.001 the difference is judged relative to
        it (more than 1%); otherwise an absolute threshold of 0.0005 is used.
        NOTE(review): a negative calculated sum also takes the absolute branch -
        confirm this is intended for negative values.

        Arguments:
            original_list: The original list.
            calculated_list: The recalculated list.
        Returns boolean."""
        original = sum(original_list)
        calculated = sum(calculated_list)
        diff = abs(original - calculated)
        if calculated > 0.001:
            return diff / calculated > 0.01
        return diff > 0.001 / 2

    def check_version(self, emission_versions):
        """Tell whether a node was produced with an older version than ``VERSION``.

        NOTE(review): the comparison is a plain ``<`` on the last element; if versions
        are strings this is lexicographic ("0.10" < "0.9") - confirm the data type.

        Arguments:
            emission_versions: The versions recorded on the node, or ``None``.
        Returns True when no version is recorded (``None`` or empty) or when the last
        recorded version is lower than ``VERSION``; False otherwise."""
        # `x is []` is always False (identity against a fresh list), which let empty
        # lists reach the index below and raise IndexError; test emptiness instead.
        if not emission_versions:
            return True
        return emission_versions[-1] < VERSION