import datetime
import copy
import concurrent.futures
from functools import partial
from collections import OrderedDict
from hestia_earth.schema import ImpactAssessmentJSONLD, SchemaType, IndicatorJSONLD
from hestia_earth.utils.api import download_hestia
from .version import VERSION
from .abstract_hierarchy import Hierarchy
from .representation import DataStore
from .utils import instantiate_model, EngineDict, EngineList, primary_product
from .emissionsResourceUses import EMISSIONSRESOURCEUSE_ORDER
from .impacts import IMPACTS_ORDER
class Assessments_hierarchy(Hierarchy):
    """Hierarchy that turns cycle / assessment representations into impact assessments.

    On construction it instantiates one model per entry of
    ``EMISSIONSRESOURCEUSE_ORDER`` and ``IMPACTS_ORDER``. Processing then walks
    the products, runs the models in that order and stores one impact
    assessment per eligible product.

    NOTE(review): the original docstring described this as an abstract base for
    all Hestia hierarchies, running from higher tier to lower tier; that is not
    visible in this class itself - confirm against ``Hierarchy``.
    """

    def __init__(self):
        """Instantiate the emission/resource-use and impact models and reset state."""
        super().__init__()

        # Models are instantiated through a thread pool; ``executor.map`` keeps
        # the results in the same order as the *_ORDER sequences.
        func = partial(instantiate_model, package="emissionsResourceUses")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            self.resourceUse_models = list(executor.map(func, EMISSIONSRESOURCEUSE_ORDER))

        func = partial(instantiate_model, package="impacts")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            self.impacts_models = list(executor.map(func, IMPACTS_ORDER))

        self.calc_data = DataStore()
        self.impact_assessments = []
        # Used to hand out sequential '@id' values to the built assessments.
        self.counter = 0

    def process_hierarchy(self, data):
        """Run the hierarchy for ``data`` and write the results back into it.

        Arguments:
            data: object exposing ``representation_cycle``,
                ``representation_assessment`` and
                ``update_representation_assessments``. When
                ``representation_cycle`` is ``None`` the assessments already
                present in ``data`` are used as input instead of a cycle.

        Returns ``None``; ``data`` is updated in place.
        """
        if data.representation_cycle is not None:
            self.process_hierarchy_with_cycle(data)
        else:
            self.process_hierarchy_no_cycle(data)

    def process_hierarchy_no_cycle(self, data):
        """Build impact assessments from the assessments already held by ``data``.

        Each existing assessment contributes its ``product`` and its
        ``emissionsResourceUse``; only the impact indicators are calculated.
        Products whose ``termType`` is ``'cropResidue'`` are skipped.
        """
        products = [assessment['product'] for assessment in data.representation_assessment.evalues()]
        emissionsResourceUses = EngineList([assessment['emissionsResourceUse'] for assessment in
                                            data.representation_assessment.evalues()])

        for (product, emissionsResourceUse) in zip(products, emissionsResourceUses):
            if product['termType'] == 'cropResidue':
                continue

            # Work on a copy so the input representation is left untouched.
            prodnorm = copy.deepcopy(product)
            if 'term' in prodnorm and 'defaultProperties' in prodnorm['term']:
                prodnorm['term']['defaultProperties'] = product['term']['defaultProperties'].evalues()

            impact_assessment = self.build_impact_assessment(None, prodnorm)
            impact_assessment['emissionsResourceUse'] = emissionsResourceUse.evalues()
            # There is no cycle here, so the impact models receive an empty EngineDict.
            indicators = self.calculate_impact_indicators(EngineDict(), prodnorm, emissionsResourceUse)
            impact_assessment['impacts'] = indicators.evalues()
            self.impact_assessments.append(impact_assessment)

        # NOTE(review): the source lost its indentation; these two calls are
        # taken to run once, after all products were processed - confirm.
        self.calc_data.import_assessment(self.impact_assessments)
        data.update_representation_assessments(self.calc_data)

    def process_hierarchy_with_cycle(self, data):
        """Build impact assessments for the products of ``data.representation_cycle``.

        If ``data`` already holds assessments, only the cycle products whose
        term ``@id`` matches one of their products are processed; otherwise all
        cycle products are. Products with an empty ``economicValueShare`` or a
        ``'cropResidue'`` term type are skipped.
        """
        cycle = data.representation_cycle
        ids_products_to_process = [assessment['product']['@id']
                                   for assessment in data.representation_assessment.evalues()]

        if not ids_products_to_process:
            products_to_process = cycle['products'].evalues()
        else:
            products_to_process = [product for product in cycle['products'].evalues()
                                   if product['term']['@id'] in ids_products_to_process]

        for product in products_to_process:
            if product['economicValueShare'] == {} or product['term']['termType'] == 'cropResidue':
                continue

            # Work on a copy so the cycle representation is left untouched.
            prodnorm = copy.deepcopy(product)
            prodnorm['term']['defaultProperties'] = product['term']['defaultProperties'].evalues()

            impact_assessment = self.build_impact_assessment(cycle, prodnorm)
            emissionsResourceUses = self.calculate_emissionsResourceUses(cycle, prodnorm)
            impact_assessment['emissionsResourceUse'] = emissionsResourceUses.evalues()
            indicators = self.calculate_impact_indicators(cycle, prodnorm, emissionsResourceUses)
            impact_assessment['impacts'] = indicators.evalues()
            self.impact_assessments.append(impact_assessment)

        # NOTE(review): the source lost its indentation; these two calls are
        # taken to run once, after all products were processed - confirm.
        self.calc_data.import_assessment(self.impact_assessments)
        data.update_representation_assessments(self.calc_data)

    def calculate_emissionsResourceUses(self, cycle, product):
        """Run every applicable emission/resource-use model and return the indicators.

        A model is run only when its ``check(cycle, product)`` is truthy. The
        results of ``calculate()`` are merged into one mapping, so a later model
        overwrites an earlier one that produced the same key.
        """
        emissionsResourceUses = {}
        for model in self.resourceUse_models:
            if model.check(cycle, product):
                emissionsResourceUses.update(model.calculate())
        return self.build_indicators_array(emissionsResourceUses)

    def calculate_impact_indicators(self, cycle, product, emissionsResourceUses):
        """Run every applicable impact model and return the indicators.

        A model is run only when its ``check(cycle, product,
        emissionsResourceUses)`` is truthy, and its result is kept (under
        ``model.id``) only when it is strictly positive.
        """
        results = {}
        for model in self.impacts_models:
            if not model.check(cycle, product, emissionsResourceUses):
                continue
            result = model.calculate()
            if result > 0:
                results[model.id] = result
        return self.build_indicators_array(results)

    def build_impact_assessment(self, cycle, product):
        """Create the skeleton of an impact assessment for ``product``.

        Arguments:
            cycle: the cycle representation, or ``None`` when there is none; in
                that case name/country are empty strings and cycle/source are
                empty dicts.
            product: the product; its ``'term'`` entry is stored when present,
                otherwise the product itself.

        Returns an ``OrderedDict`` starting with ``'@context'`` followed by the
        ``ImpactAssessmentJSONLD`` fields. Increments ``self.counter``.
        """
        if cycle is not None:
            name = self.get_name(cycle)
            cycle_reference = self.get_reference(cycle)
            country = cycle['site']['country']
            source_reference = self.get_reference(dict(cycle['defaultSource']))
        else:
            name, cycle_reference, country, source_reference = '', {}, '', {}

        # '@context' must come first, then the schema fields in their own order.
        impact_assessment = OrderedDict([('@context', None)])
        impact_assessment.update(ImpactAssessmentJSONLD().fields)

        impact_assessment['@context'] = Hierarchy.context + "/ImpactAssessment.jsonld"
        impact_assessment['@id'] = str(self.counter)
        impact_assessment['@type'] = SchemaType.IMPACTASSESSMENT.value
        impact_assessment['name'] = name
        impact_assessment['version'] = VERSION
        impact_assessment['cycle'] = cycle_reference
        impact_assessment['endDate'] = datetime.datetime.today().strftime('%Y-%m-%d')
        impact_assessment['country'] = country
        impact_assessment['product'] = product['term'] if 'term' in product else product
        impact_assessment['source'] = source_reference
        impact_assessment['functionalUnitMeasure'] = "kg"
        impact_assessment['functionalUnitQuantity'] = 1
        impact_assessment['allocationMethod'] = "economic"
        impact_assessment['systemBoundary'] = True
        # The original first stored the string "false" and immediately replaced
        # it; only the boolean ever reached callers.
        impact_assessment['dataPrivate'] = False

        self.counter += 1
        return impact_assessment

    def build_indicator(self, id, uses):
        """Build one indicator for the term ``id`` with the value ``uses[id]``.

        The term is fetched with ``download_hestia``, stripped of its
        ``'@context'`` and stored as a reference; the value is rounded with
        ``round_to_significant(value, 10)``.
        """
        result = uses[id]
        indicator = IndicatorJSONLD().fields
        term = download_hestia(id)
        term.pop('@context', None)
        indicator['term'] = self.get_reference(term)
        indicator['value'] = self.round_to_significant(result, 10)
        indicator['recalculated'] = ['term', 'value']
        indicator['recalculatedVersion'] = [VERSION, VERSION]
        return indicator

    def build_indicators_array(self, dictionary):
        """Build an indicator for every key of ``dictionary``.

        The indicators are built through a thread pool (each one downloads its
        term) and returned as ``EngineList(...).to_edict()``.
        """
        func = partial(self.build_indicator, uses=dictionary)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            indicators = EngineList(executor.map(func, list(dictionary)))
        return indicators.to_edict()

    def get_name(self, cycle):
        """Compose the display name of the impact assessment of ``cycle``.

        Uses the name of the primary product, the lower-cased description, the
        end date and the default source name; each part that is missing is
        replaced by an empty string.
        """
        product = primary_product(cycle['products']) if cycle['products'] != {} else {}
        # Fall back to "" (not {}) so the concatenation below cannot fail.
        name = product['term']['name'] if product != {} else ""
        description = cycle['description'].lower() if 'description' in cycle else ""
        # Test for the key on the cycle itself, not inside the date value.
        year = cycle['endDate'] if 'endDate' in cycle else ""
        source = cycle['defaultSource']['name'] if 'name' in cycle['defaultSource'] else ""
        impact_assessment_name = \
            "Impact Assessment for, '" + name + "' , under " + description + " in " + year + " from " + source
        return impact_assessment_name