# Source code for hestia_earth.calculation.utils

import importlib
import inspect
import copy
from collections import OrderedDict
from statistics import mean
from dateutil import parser
from hestia_earth.utils.lookup import download_lookup, get_table_value


# Fallback depth used by shallowest_measurement() for measurements that carry
# no 'depthUpper' key.
MAX_DEPTH = 1000
# Fallback end date (a string parsed with dateutil's isoparse) used by
# most_recents() for measurements that carry no 'endDate' key.
OLDEST_DATE = '1800'


def average(data):
    """Return the arithmetic mean of the non-dictionary items of a list.

    Dictionary items (plain ``dict`` and any subclass, which includes
    ``EngineDict`` because it derives from ``OrderedDict``) stand for missing
    values and are ignored.

    Arguments:
        data: the values to average; expected to be a list.

    Returns the mean of the remaining items, or an empty dictionary when
    ``data`` is not a list, is empty, or contains only dictionaries.
    """
    if not isinstance(data, list):
        return {}
    # A single filter pass covers both the "all numeric" and the "mixed" case;
    # the items are only read, so no defensive deep copy is required.
    numbers = [item for item in data if not isinstance(item, dict)]
    return mean(numbers) if numbers else {}
def format_lookup(lookup_name, column_name):
    """Build a dictionary from a lookup table for one of its columns.

    The table is fetched with ``download_lookup``. For every row, the first
    field is used as the key and as the value matched against the ``termid``
    column when reading ``column_name`` with ``get_table_value``.

    Arguments:
        lookup_name: name of the lookup, passed to ``download_lookup``.
        column_name: name of the column whose values are collected.

    Returns a dictionary mapping the first field of each row to its value in
    ``column_name``; rows for which the value is ``None`` are left out.
    """
    lookup = download_lookup(lookup_name)
    formatted = {}
    for row in lookup:
        term_id = row[0]
        # Query the table once per row instead of once for the filter and a
        # second time for the stored value.
        value = get_table_value(lookup, 'termid', term_id, column_name)
        if value is not None:
            formatted[term_id] = value
    return formatted
def instantiate_model(module, package=None):
    """Import a model module and return an instance of its first class.

    The module is looked up under ``hestia_earth.calculation``; when
    ``package`` is given it is inserted between that prefix and ``module``.
    Only classes defined in the imported module itself (not classes it merely
    imports) are considered, in the name order produced by
    ``inspect.getmembers``.

    Arguments:
        module: name of the module to import.
        package: optional sub-package that contains the module. When omitted,
            the module is imported directly from ``hestia_earth.calculation``.

    Returns a new instance of the first class defined in the module, created
    without constructor arguments.

    Raises IndexError when the module defines no class of its own.
    """
    # Skip an absent package so the path never contains the literal "None".
    module_path = '.'.join(
        part for part in ('hestia_earth.calculation', package, module) if part
    )
    imported = importlib.import_module(module_path)
    local_classes = [
        member
        for _, member in inspect.getmembers(imported, inspect.isclass)
        if member.__module__ == module_path
    ]
    if not local_classes:
        raise IndexError(f"no class is defined in module '{module_path}'")
    # Instantiate only the class that is actually returned.
    return local_classes[0]()
def instantiate_hierarchy(hierarchy):
    """Create an ``Emissions_field_hierarchy`` for the given hierarchy.

    The class is resolved at call time from the
    ``hestia_earth.calculation.emission_hierarchies`` module.

    Arguments:
        hierarchy: value handed to the ``Emissions_field_hierarchy`` constructor.

    Returns the constructed object.
    """
    hierarchies = importlib.import_module('hestia_earth.calculation.emission_hierarchies')
    return hierarchies.Emissions_field_hierarchy(hierarchy)
def instantiate_hierarchy_transformation(hierarchy):
    """Create an ``Emissions_transformation_hierarchy`` for the given hierarchy.

    The class is resolved at call time from the
    ``hestia_earth.calculation.emission_hierarchies`` module.

    Arguments:
        hierarchy: value handed to the ``Emissions_transformation_hierarchy``
            constructor.

    Returns the constructed object.
    """
    hierarchies = importlib.import_module('hestia_earth.calculation.emission_hierarchies')
    return hierarchies.Emissions_transformation_hierarchy(hierarchy)
def most_recents(measurements, cycle_date):
    """Return the measurements whose end date is closest to the cycle date.

    Closeness is the absolute number of days between the measurement's
    ``endDate`` and ``cycle_date``, so a measurement taken after the cycle
    date counts the same as one taken equally long before it. Measurements
    without an ``endDate`` key are treated as if they ended at ``OLDEST_DATE``.

    Arguments:
        measurements: list of dictionary-like measurements.
        cycle_date: datetime that the parsed end dates are subtracted from.

    Returns the list of measurements sharing the smallest distance, in their
    original order, or an empty list when there are no measurements.
    """
    def days_from_cycle(measurement):
        end_date = measurement['endDate'] if 'endDate' in measurement else OLDEST_DATE
        return abs((parser.isoparse(end_date) - cycle_date).days)

    # Compute each distance once; the closest group is then a simple filter
    # rather than a sort followed by index-based bucketing.
    distances = [days_from_cycle(measurement) for measurement in measurements]
    if not distances:
        return []
    closest = min(distances)
    return [
        measurement
        for measurement, distance in zip(measurements, distances)
        if distance == closest
    ]
def parse_methodTier(value: str):
    """Extract the first whitespace-separated all-digit token as an integer.

    Arguments:
        value: text to scan, split on whitespace.

    Returns the integer value of the first token made up only of digits.

    Raises IndexError when no token is made up only of digits.
    """
    # TODO: methodTier can be 'background' / 'measured', code won't work with these
    tiers = list(map(int, filter(str.isdigit, value.split())))
    return tiers[0]
def most_relevant_measurement(measurement, date):
    """Return the averaged value of the most relevant measurement.

    A single measurement (an ``EngineDict``) is averaged directly. For a
    collection of measurements, the ones closest in time to ``date`` are
    selected with ``most_recents``, then the shallowest of those with
    ``shallowest_measurement``, and the first remaining one is averaged.

    Arguments:
        measurement: an ``EngineDict`` or a collection of measurements.
        date: date string parsed with ``isoparse``; an empty dictionary means
            that no date is available.

    Returns the result of ``average`` on the chosen measurement's ``value``,
    or an empty dictionary when the collection is empty or no date is given.
    """
    if isinstance(measurement, EngineDict):
        return average(measurement['value'])

    if len(measurement) == 0 or date == {}:
        return {}

    cycle_date = parser.isoparse(date)
    closest_in_time = most_recents(measurement, cycle_date)
    shallowest = shallowest_measurement(closest_in_time)
    return average(shallowest[0]['value'])
def shallowest_measurement(measurements):
    """Return the measurements that have the smallest ``depthUpper``.

    Measurements without a ``depthUpper`` key are treated as if their depth
    were ``MAX_DEPTH``.

    Arguments:
        measurements: list of dictionary-like measurements.

    Returns the list of measurements sharing the smallest depth, in their
    original order, or an empty list when there are no measurements.
    """
    # Read each depth once; the shallowest group is then a simple filter
    # rather than a sort followed by index-based bucketing.
    depths = [
        measurement['depthUpper'] if 'depthUpper' in measurement else MAX_DEPTH
        for measurement in measurements
    ]
    if not depths:
        return []
    shallowest = min(depths)
    return [
        measurement
        for measurement, depth in zip(measurements, depths)
        if depth == shallowest
    ]
def summation(data):
    """Return the sum of the non-dictionary items of a list.

    Dictionary items (plain ``dict`` and any subclass, which includes
    ``EngineDict`` because it derives from ``OrderedDict``) stand for missing
    values and are ignored.

    Arguments:
        data: the values to add up; expected to be a list.

    Returns the sum of the remaining items, or an empty dictionary when
    ``data`` is not a list, is empty, or contains only dictionaries.
    """
    if not isinstance(data, list):
        return {}
    # A single filter pass covers both the "all numeric" and the "mixed" case;
    # the items are only read, so no defensive deep copy is required.
    numbers = [item for item in data if not isinstance(item, dict)]
    return sum(numbers) if numbers else {}
def residue_nitrogen(products):
    """Return the combined above- and below-ground crop residue nitrogen.

    Arguments:
        products: products container handed unchanged to
            ``residue_nitrogen_aboveground`` and ``residue_nitrogen_belowground``.

    Returns the ``summation`` of both partial results.
    """
    above_ground = residue_nitrogen_aboveground(products)
    below_ground = residue_nitrogen_belowground(products)
    return summation([above_ground, below_ground])
def residue_nitrogen_aboveground(products):
    """Return the nitrogen held in the above-ground crop residue.

    When a value is present for residue left on the field or incorporated,
    the residue amount is the sum of those two. Otherwise it is the total
    above-ground residue minus the burnt and removed amounts, where each of
    them is available. The amount is then multiplied by the ``nitrogenContent``
    property of ``aboveGroundCropResidueTotal`` divided by 100.

    Arguments:
        products: mapping of product identifiers to products; an empty
            dictionary is the marker for a missing value.

    Returns the residue nitrogen, or an empty dictionary when either the
    residue amount or the nitrogen content is missing.
    """
    field_residue_reported = (
        products['aboveGroundCropResidueLeftOnField']['value'] != {}
        or products['aboveGroundCropResidueIncorporated']['value'] != {}
    )

    if field_residue_reported:
        left_on_field = summation(products['aboveGroundCropResidueLeftOnField']['value'])
        incorporated = summation(products['aboveGroundCropResidueIncorporated']['value'])
        residue = summation([left_on_field, incorporated])
    else:
        total = summation(products['aboveGroundCropResidueTotal']['value'])
        burnt = summation(products['aboveGroundCropResidueBurnt']['value'])
        removed = summation(products['aboveGroundCropResidueRemoved']['value'])
        residue = total if total != {} else {}
        # Subtract each known deduction, but only while a residue amount exists.
        for deduction in (burnt, removed):
            if deduction != {} and residue != {}:
                residue = residue - deduction

    ncontent = products['aboveGroundCropResidueTotal']['properties']['nitrogenContent']['value']
    if ncontent != {} and residue != {}:
        return residue * float(ncontent) / 100
    return {}
def residue_nitrogen_belowground(products):
    """Return the nitrogen held in the below-ground crop residue.

    The summed ``belowGroundCropResidue`` value is multiplied by that
    product's ``nitrogenContent`` property divided by 100.

    Arguments:
        products: mapping of product identifiers to products; an empty
            dictionary is the marker for a missing value.

    Returns the residue nitrogen, or an empty dictionary when the residue
    value or the nitrogen content is missing.
    """
    residue_values = products['belowGroundCropResidue']['value']
    if residue_values == {}:
        return {}

    residue = summation(residue_values)
    ncontent = products['belowGroundCropResidue']['properties']['nitrogenContent']['value']
    if ncontent != {} and residue != {}:
        return residue * float(ncontent) / 100
    return {}
def primary_product(products):
    """Return a copy of the first product flagged as primary.

    The products container is deep-copied before it is read, so the returned
    product is independent of the caller's data.

    Arguments:
        products: container exposing ``evalues()``, which yields the products.

    Returns the first product whose ``'primary'`` entry is truthy, or an empty
    dictionary when there is none.
    """
    snapshot = copy.deepcopy(products)
    flagged = list(filter(lambda product: product['primary'], snapshot.evalues()))
    if flagged:
        return flagged[0]
    return {}
class EngineDict(OrderedDict):
    """Ordered dictionary that tolerates lookups of absent keys.

    Reading a key that is not present returns a new, empty instance instead
    of raising ``KeyError``. The class can also flatten its values into an
    ``EngineList``.
    """

    def __missing__(self, key):
        """Provide the value for a key that is not found.

        Arguments:
            key: the key that was looked up.

        Returns a new empty dictionary of the same class; it is not stored
        under ``key``.
        """
        return type(self)()

    def evalues(self):
        """Generate an EngineList from the values of this EngineDict.

        Values that are lists contribute each of their items; any other value
        is added as a single item.

        Arguments:
            None

        Returns EngineList.
        """
        flattened = EngineList()
        for key in self:
            entry = self[key]
            if isinstance(entry, list):
                flattened.extend(entry)
            else:
                flattened.append(entry)
        return flattened
class EngineList(list):
    """List that can be converted to an EngineDict.

    EngineDict is the data structure used for the Engine representation.
    """

    def to_edict(self):
        """Generate an EngineDict from this EngineList.

        Elements are keyed by ``element['term']['@id']``. Elements sharing the
        same id are collected in a list under that id. Elements without a
        ``term`` or without a term ``@id`` are stored under generated keys of
        the form ``no-term-no_id-<n>``.

        Arguments:
            None

        Returns EngineDict.
        """
        grouped = EngineDict()
        anonymous_count = 0
        for element in self:
            has_term_id = 'term' in element and '@id' in element['term']
            if not has_term_id:
                grouped[f"no-term-no_id-{anonymous_count}"] = element
                anonymous_count += 1
                continue

            term_id = element['term']['@id']
            if term_id not in grouped:
                grouped[term_id] = element
                continue

            existing = grouped[term_id]
            if isinstance(existing, list):
                existing.append(element)
            else:
                # Second element for this id: switch from a single entry to a list.
                grouped[term_id] = [existing, element]
        return grouped
def parse_float(value: str, default=0):
    """Convert a value to float, falling back to a default for missing data.

    Arguments:
        value: the value to convert; ``None`` and ``'-'`` mean "no value".
        default: what to return when there is no value.

    Returns ``float(value)``, or ``default`` when ``value`` is ``None`` or ``'-'``.
    """
    if value is None or value == '-':
        return default
    return float(value)