Source code for hestia_earth.calculation.assessment_hierarchies

import datetime
import copy
import concurrent.futures
from functools import partial
from collections import OrderedDict
from hestia_earth.schema import ImpactAssessmentJSONLD, SchemaType, IndicatorJSONLD
from hestia_earth.utils.api import download_hestia

from .version import VERSION
from .abstract_hierarchy import Hierarchy
from .representation import DataStore
from .utils import instantiate_model, EngineDict, EngineList, primary_product
from .emissionsResourceUses import EMISSIONSRESOURCEUSE_ORDER
from .impacts import IMPACTS_ORDER


class Assessments_hierarchy(Hierarchy):
    """Hierarchy that builds impact assessments.

    On construction it instantiates one model per entry of
    ``EMISSIONSRESOURCEUSE_ORDER`` (package ``emissionsResourceUses``) and
    one per entry of ``IMPACTS_ORDER`` (package ``impacts``).  Models are
    later run in that same order.

    Instance attributes:
        resourceUse_models: instantiated emissions/resource-use models.
        impacts_models: instantiated impact models.
        calc_data: ``DataStore`` receiving the built impact assessments.
        impact_assessments: list of impact assessments built so far; it is
            never cleared here, so it accumulates across calls.
        counter: running integer used as ``@id`` of the next assessment.
    """

    def __init__(self):
        """Instantiate the emissions/resource-use and impact models.

        Model instantiation is fanned out over a thread pool;
        ``executor.map`` yields results in input order, so the model lists
        keep the order of the ``*_ORDER`` constants.
        """
        super().__init__()

        func = partial(instantiate_model, package="emissionsResourceUses")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            self.resourceUse_models = list(
                executor.map(func, EMISSIONSRESOURCEUSE_ORDER))

        func = partial(instantiate_model, package="impacts")
        with concurrent.futures.ThreadPoolExecutor() as executor:
            self.impacts_models = list(executor.map(func, IMPACTS_ORDER))

        self.calc_data = DataStore()
        self.impact_assessments = []
        self.counter = 0

    def process_hierarchy(self, data):
        """Run the hierarchy for ``data``.

        Arguments:
            data: object exposing ``representation_cycle``,
                ``representation_assessment`` and
                ``update_representation_assessments``.

        Returns:
            None.  Results are pushed back through
            ``data.update_representation_assessments``.
        """
        if data.representation_cycle is not None:
            self.process_hierarchy_with_cycle(data)
        else:
            self.process_hierarchy_no_cycle(data)

    def process_hierarchy_no_cycle(self, data):
        """Build impact assessments when ``data`` carries no cycle.

        Each existing assessment supplies its own ``product`` and
        ``emissionsResourceUse``; only the impact indicators are computed.
        Products whose ``termType`` is ``'cropResidue'`` are skipped.
        """
        assessments = data.representation_assessment.evalues()
        products = [assessment['product'] for assessment in assessments]
        emissionsResourceUses = EngineList(
            [assessment['emissionsResourceUse'] for assessment in assessments])

        for product, emissionsResourceUse in zip(products,
                                                 emissionsResourceUses):
            if product['termType'] == 'cropResidue':
                continue

            prodnorm = copy.deepcopy(product)
            if 'term' in prodnorm and 'defaultProperties' in prodnorm['term']:
                prodnorm['term']['defaultProperties'] = \
                    product['term']['defaultProperties'].evalues()

            impact_assessment = self.build_impact_assessment(None, prodnorm)
            impact_assessment['emissionsResourceUse'] = \
                emissionsResourceUse.evalues()
            # No cycle is available, so an empty EngineDict stands in for it.
            indicators = self.calculate_impact_indicators(
                EngineDict(), prodnorm, emissionsResourceUse)
            impact_assessment['impacts'] = indicators.evalues()
            self.impact_assessments.append(impact_assessment)

        # NOTE(review): the original indentation was lost; these two calls
        # are assumed to run once after the loop -- confirm.
        self.calc_data.import_assessment(self.impact_assessments)
        data.update_representation_assessments(self.calc_data)

    def process_hierarchy_with_cycle(self, data):
        """Build impact assessments from the products of ``data``'s cycle.

        If ``data`` already holds assessments, only the cycle products whose
        term ``@id`` matches one of their products are processed; otherwise
        every cycle product is.  A product is skipped when its
        ``economicValueShare`` equals ``{}`` or its term ``termType`` is
        ``'cropResidue'``.
        """
        cycle = data.representation_cycle
        ids_products_to_process = [
            assessment['product']['@id']
            for assessment in data.representation_assessment.evalues()
        ]

        if not ids_products_to_process:
            products_to_process = cycle['products'].evalues()
        else:
            products_to_process = [
                product for product in cycle['products'].evalues()
                if product['term']['@id'] in ids_products_to_process
            ]

        for product in products_to_process:
            if product['economicValueShare'] == {} \
                    or product['term']['termType'] == 'cropResidue':
                continue

            prodnorm = copy.deepcopy(product)
            prodnorm['term']['defaultProperties'] = \
                product['term']['defaultProperties'].evalues()

            impact_assessment = self.build_impact_assessment(cycle, prodnorm)
            emissionsResourceUses = self.calculate_emissionsResourceUses(
                cycle, prodnorm)
            impact_assessment['emissionsResourceUse'] = \
                emissionsResourceUses.evalues()
            indicators = self.calculate_impact_indicators(
                cycle, prodnorm, emissionsResourceUses)
            impact_assessment['impacts'] = indicators.evalues()
            self.impact_assessments.append(impact_assessment)

        # NOTE(review): the original indentation was lost; these two calls
        # are assumed to run once after the loop -- confirm.
        self.calc_data.import_assessment(self.impact_assessments)
        data.update_representation_assessments(self.calc_data)

    def calculate_emissionsResourceUses(self, cycle, product):
        """Run every applicable emissions/resource-use model.

        A model contributes only when ``model.check(cycle, product)`` is
        truthy; its ``calculate()`` result is merged into one dict, which is
        then turned into indicators by ``build_indicators_array``.
        """
        emissionsResourceUses = {}
        for model in self.resourceUse_models:
            if model.check(cycle, product):
                emissionsResourceUses.update(model.calculate())
        return self.build_indicators_array(emissionsResourceUses)

    def calculate_impact_indicators(self, cycle, product,
                                    emissionsResourceUses):
        """Run every applicable impact model.

        A model contributes only when its ``check`` is truthy and its
        ``calculate()`` result is strictly positive.  Results are keyed by
        ``model.id`` and turned into indicators by
        ``build_indicators_array``.
        """
        results = {}
        for model in self.impacts_models:
            if model.check(cycle, product, emissionsResourceUses):
                result = model.calculate()
                if result > 0:
                    results[model.id] = result
        return self.build_indicators_array(results)

    def build_impact_assessment(self, cycle, product):
        """Create the skeleton of an ImpactAssessment node.

        Arguments:
            cycle: the cycle the assessment belongs to, or ``None``; when
                ``None`` the cycle-derived fields get empty placeholders.
            product: either a product holding a ``'term'`` entry or a bare
                term.

        Returns:
            An ``OrderedDict`` with ``'@context'`` first, followed by the
            ``ImpactAssessmentJSONLD`` fields.  ``self.counter`` supplies
            the ``@id`` and is incremented afterwards.
        """
        # '@context' is inserted first so it leads the serialized node.
        impact_assessment = OrderedDict([('@context', None)])
        impact_assessment.update(ImpactAssessmentJSONLD().fields)

        has_cycle = cycle is not None
        impact_assessment['@context'] = \
            Hierarchy.context + "/ImpactAssessment.jsonld"
        impact_assessment['@id'] = str(self.counter)
        impact_assessment['@type'] = SchemaType.IMPACTASSESSMENT.value
        impact_assessment['name'] = self.get_name(cycle) if has_cycle else ''
        impact_assessment['version'] = VERSION
        impact_assessment['cycle'] = \
            self.get_reference(cycle) if has_cycle else {}
        impact_assessment['endDate'] = \
            datetime.datetime.today().strftime('%Y-%m-%d')
        impact_assessment['country'] = \
            cycle['site']['country'] if has_cycle else ''
        impact_assessment['product'] = \
            product['term'] if 'term' in product else product
        impact_assessment['source'] = \
            self.get_reference(dict(cycle['defaultSource'])) \
            if has_cycle else {}
        impact_assessment['functionalUnitMeasure'] = "kg"
        impact_assessment['functionalUnitQuantity'] = 1
        impact_assessment['allocationMethod'] = "economic"
        impact_assessment['systemBoundary'] = True
        # The original first stored the string "false" here and immediately
        # overwrote it; only the boolean was ever observable.
        impact_assessment['dataPrivate'] = False

        self.counter += 1
        return impact_assessment

    def build_indicator(self, id, uses):
        """Build one Indicator node for the term ``id``.

        Arguments:
            id: term identifier; also the key into ``uses``.  The name
                shadows the builtin but is kept for caller compatibility.
            uses: mapping from term identifier to calculated value.

        Returns:
            The ``IndicatorJSONLD`` fields filled with a reference to the
            downloaded term and the value rounded to 10 significant digits.
        """
        result = uses[id]
        indicator = IndicatorJSONLD().fields
        term = download_hestia(id)
        term.pop('@context', None)
        indicator['term'] = self.get_reference(term)
        indicator['value'] = self.round_to_significant(result, 10)
        indicator['recalculated'] = ['term', 'value']
        indicator['recalculatedVersion'] = [VERSION, VERSION]
        return indicator

    def build_indicators_array(self, dictionary):
        """Turn ``{term_id: value}`` into an engine dict of indicators.

        One ``build_indicator`` call per key is dispatched to a thread pool
        (each call downloads its term).
        """
        func = partial(self.build_indicator, uses=dictionary)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            indicators = EngineList(
                executor.map(func, list(dictionary.keys())))
        return indicators.to_edict()

    def get_name(self, cycle):
        """Compose the human-readable name of an impact assessment.

        The name combines the primary product's term name, the lower-cased
        cycle description, the cycle ``endDate`` and the default source
        name.  Missing parts are replaced by an empty string.
        """
        product = primary_product(cycle['products']) \
            if cycle['products'] != {} else {}
        # Fall back to "" (not {}) so the concatenation below cannot fail.
        name = product['term']['name'] if product != {} else ""
        description = \
            cycle['description'].lower() if 'description' in cycle else ""
        # Test for the key on the cycle itself; the original tested for the
        # substring 'endDate' inside the date value, which never matched.
        year = cycle['endDate'] if 'endDate' in cycle else ""
        source = cycle['defaultSource']['name'] \
            if 'name' in cycle['defaultSource'] else ""
        return ("Impact Assessment for, '" + name + "' , under " + description
                + " in " + year + " from " + source)