Source code for calcey.GUI_test

import tkinter as tk
import pandas as pd
from osgeo import gdal
import netCDF4
import numpy as np
import os
import openpyxl
from geopy.geocoders import Photon
import country_converter as coco

import bw2data as bd
import bw2calc as bc
import bw2analyzer as ba
import bw2io as bi
import numpy as np
from scipy import sparse
import uuid
from geopy.geocoders import Nominatim

[docs] class calcey: def __init__(self): bd.projects.set_current("calcey") self.brightway_checkin() self.root = tk.Tk() self.frame = tk.Frame(self.root) self.frame.pack() self.input_field_lat = tk.Entry(self.frame, width=20) self.input_field_lat.pack() self.input_field_lon = tk.Entry(self.frame, width=20) self.input_field_lon.pack() self.input_field_crop = tk.Entry(self.frame, width=20) self.input_field_crop.pack() self.button = tk.Button(self.frame, text="CLICK ME", fg="red", command=self.print_result) self.button.pack(side=tk.LEFT)
[docs] def print_result(self): # get all inputs from the UI value_lat = float(self.input_field_lat.get()) value_lon = float(self.input_field_lon.get()) crop = self.input_field_crop.get() # input_yield = self.input_field_yield.get() n_fert_input self.create_main_process(self.name_database, value_lat, value_lon, crop) # yield_ = input_yield n_fert_input print("create_main_process finished") self.dataframe_exchanges(latitude=value_lat, longitude=value_lon, crop=crop) print("dataframe_exchanges finished") self.generate_edges_to_product() print("generate_edges_to_product finished") self.calculate_LCA_results()
[docs] def get_background_data_yield(self, input_lat: float, input_lon: float, crop: str): # get yield data df = pd.read_csv("../data/FAOSTAT_data_yield.csv") country = self.country_from_coordinates(Latitude=input_lat, Longitude=input_lon) FAO_yield = float(df.loc[(df['Area'] == country) & (df['Year'] == 2022) & (df['Item'] == crop), 'Value'].iloc[0]) return FAO_yield
[docs] def run(self): self.root.mainloop()
[docs] def country_from_coordinates(self, Latitude: float, Longitude: float): lat_str = str(Latitude) lon_str = str(Longitude) geolocator = Nominatim(user_agent="GetLoc") location = geolocator.reverse(lat_str + "," + lon_str, language="en") print("location:" , location) address = location.address print("address:" , address) data_address = address.split(',') print("data_address:" , data_address) country = data_address[len(data_address)-1].lstrip().rstrip() country = coco.convert(country, to="name_short") return(country)
[docs] def ISO3_country_from_coordinates(self, Latitude: float,Longitude:float): country = self.country_from_coordinates(Latitude, Longitude) country_ISO3 = coco.convert(country,to='ISO3') print(country, country_ISO3) return(country_ISO3)
####### crop mapping #################
[docs] def get_crop_mapping_file(self, sheet_name: str): crop_mapping_df = pd.read_excel("../data/Mapping_data_Calcey.xlsx", sheet_name=sheet_name, index_col = 0) return crop_mapping_df
[docs] def crop_proxy_excelwithsheet(self, crop:str, sheet_name: str ,level:int=1): ''' crop_mapping_file should be something like crop_mapping_file = pd.read_excel([path_file], sheet_name = [sheet_name], index_col = 0) ''' crop_mapping_file = self.get_crop_mapping_file(sheet_name) return(crop_mapping_file.loc[crop,"Level_"+str(level)])
####### country mapping #################
[docs] def country_fertilizer_proxy_location(self, Latitude: float, Longitude: float): country_ISO3 = self.ISO3_country_from_coordinates(Latitude, Longitude) country_mapping_file = pd.read_excel("../data/Fertilizer_mapping_country.xlsx", sheet_name = "Mapping_nearest_neighbor", index_col = 0) country_proxy_ISO3 = country_mapping_file.loc[country_ISO3,"iso3_nearest"][0] return(country_proxy_ISO3)
################## get background data ###########################
[docs] def get_background_data_fert(self, Latitude: float, Longitude: float, crop:str): """ Latitude, Longitude, crop """ # get fetilizer data df_amount_fert_crop_country = pd.read_excel("../data/Ludemann2022_fertilizer_Country_clean_ISO3.xlsx") df_amount_fert_crop_country = df_amount_fert_crop_country.set_index(["Country_ISO3","Crop"]) # get the crop mapping file crop_mapping_file = self.get_crop_mapping_file(sheet_name="Mapping_Fertilizer") # get country proxy in ISO 3 format country_proxy_ISO3 = self.country_fertilizer_proxy_location(Latitude, Longitude) print(country_proxy_ISO3) try: return(df_amount_fert_crop_country.loc[(country_proxy_ISO3,crop)]) except KeyError: for level in range(1,len(crop_mapping_file.columns)-1): try: crop_proxy_value = self.crop_proxy_excelwithsheet(crop=crop,sheet_name="Mapping_Fertilizer",level=level) return(df_amount_fert_crop_country.loc[(country_proxy_ISO3,crop_proxy_value)]) except KeyError: pass
def get_background_data_yield(self, input_lat: float, input_lon: float, crop: str): # get yield data df = pd.read_csv("../data/FAOSTAT_data_yield.csv") country = self.country_from_coordinates(Latitude=input_lat, Longitude=input_lon) FAO_yield = float(df.loc[(df['Area'] == country) & (df['Year'] == 2022) & (df['Item'] == crop), 'Value'].iloc[0]) return FAO_yield
[docs] def get_background_data_water(self, input_lat: float, input_lon: float, crop: str): path_to_folder = "../data/NC_background_water/" crop_water = self.crop_proxy_excelwithsheet(crop=crop, sheet_name="Mapping_water") df_list = [] for filename in os.listdir(path_to_folder): # Check if the file is a .nc file if filename.endswith('.nc') and crop_water in filename: # Open the file ds = netCDF4.Dataset(path_to_folder + filename, 'r') # identify the correct grid element lat_var = ds.variables['lat'][:] lon_var = ds.variables['lon'][:] lat_idx = np.argmin(np.abs(lat_var - input_lat)) lon_idx = np.argmin(np.abs(lon_var - input_lon)) try: # Get the raster data (assuming it's in the same variable) value = ds.variables['wf_unit_irrigated_blue'][:].data[lat_idx, lon_idx] except KeyError: value = np.NaN # 1e20 is the null value in this dataset, so we have to replace it value = 0 if value > 10000000000000 else value # Create a new DataFrame for each iteration new_df = pd.DataFrame({'filename': [filename], 'value': [value]}) # Append the new DataFrame to the list df_list.append(new_df) # Close the file ds.close() # Concatenate the DataFrames in the list df = pd.concat(df_list, ignore_index=True) sep = crop_water + "_" df[['crop_name', 'water_footprint']] = df['filename'].str.split(sep, n=1, expand=True) df['water_footprint'] = df['water_footprint'].str.rstrip('.nc') df = df.drop(['filename', 'crop_name'], axis=1) df = df.loc[:, ['water_footprint', 'value']] return df
[docs] def get_background_data_pest(self, input_lat: float, input_lon: float, crop: str): """ test """ path_to_folder = "../data/NC_background_pest/" crop_pest = self.crop_proxy_excelwithsheet(crop=crop, sheet_name="Mapping_pesticides") df_list = [] for filename in os.listdir(path_to_folder): # Check if the file is a .nc file if filename.endswith('.nc') and crop_pest in filename: # Open the file ds = netCDF4.Dataset(path_to_folder + filename, 'r') # identify the correct grid element lat_var = ds.variables['lat'][:] lon_var = ds.variables['lon'][:] lat_idx = np.argmin(np.abs(lat_var - input_lat)) lon_idx = np.argmin(np.abs(lon_var - input_lon)) try: # Get the raster data (assuming it's in the same variable) high_est = ds.variables['apr_H'][:].data[1] # Get the local value (using the same lat and lon coordinates) local_high_value = high_est[lat_idx, lon_idx] local_high_value = 0 if local_high_value == -1 else local_high_value except KeyError: local_high_value = np.NaN try: # Get the raster data (assuming it's in the same variable) low_est = ds.variables['apr_L'][:].data[1] # Get the local value (using the same lat and lon coordinates) local_low_value = low_est[lat_idx, lon_idx] local_low_value = 0 if local_low_value == -1 else local_low_value except KeyError: local_low_value = np.NaN value = (local_high_value + local_low_value) / 2 # Create a new DataFrame for each iteration new_df = pd.DataFrame({'filename': [filename], 'value': [value]}) # Append the new DataFrame to the list df_list.append(new_df) # Close the file ds.close() # Concatenate the DataFrames in the list df = pd.concat(df_list, ignore_index=True) sep = crop_pest + "_" df[['crop_name', 'pesticide_name']] = df['filename'].str.split(sep, n=1, expand=True) df['pesticide_name'] = df['pesticide_name'].str.rstrip('.nc') df = df.drop(['filename', 'crop_name'], axis=1) df = df.loc[:, ['pesticide_name', 'value']] pest_total = df['value'].sum() return df, pest_total
[docs] def create_final_dataframe(self, yield_data, fert_data, water_data, pest_data, pest_total): # yield data yield_in_kgperha = yield_data / 10 # fert data series = fert_data.drop('Country') df_fert = pd.DataFrame({ 'Parameter': series.index, 'Value': series.values }) df_fert = df_fert.set_index(df_fert.columns[0]) df_fert['Value'] = df_fert['Value'] / yield_in_kgperha # water_data water_m3perkg = water_data.at[0, 'value'] / 1000 # pest data pest_data['value'] = pest_data['value'] / yield_in_kgperha pest_data = pest_data.set_index(pest_data.columns[0]) pest_data.index = [i.capitalize() for i in pest_data.index] # display(pest_data) # pest total pest_kgperkg = pest_total / yield_in_kgperha return yield_in_kgperha, df_fert, water_m3perkg, pest_data, pest_kgperkg
[docs] def calculate_pest_emissions(self,pest_name, amount): # Emission factors EF_pest_soil = 0.9 EF_pest_air = 0.1 # Initialize an empty list to store the results results = [] # Calculate emissions pest_em_soil = amount * EF_pest_soil pest_em_air = amount * EF_pest_air # Append the results as a dictionary results.append({ 'pesticide_name': pest_name, 'pest_em_soil': pest_em_soil, 'pest_em_air': pest_em_air }) # Convert the results list to a DataFrame self.df_emissions = pd.DataFrame(results) self.df_emissions.set_index('pesticide_name', inplace=True) return self.df_emissions
[docs] def dataframe_exchanges(self, latitude, longitude, crop, yield_ = None, n_fert_input=None, p2o5_fert_input=None, k2o_fert_input=None): self.mapping_flows_uuid = pd.read_excel("../data/Mapping_data_Calcey.xlsx", sheet_name = "Mapping_flows_ecoinvent", index_col =0) ## adjust path before "Mapping_data_Calcey.xlsx" all_inputs = list(set(self.mapping_flows_uuid.index)) all_inputs = [x for x in all_inputs if type(x) != float] self.df_output = pd.DataFrame(index = all_inputs , columns = ['Amount']) if yield_ is not None and n_fert_input is not None: print("There is yield input and nitrogen fertilizer input, thanks!") pest_data, pest_total = self.get_background_data_pest(latitude,longitude,crop) ## add self. fert_data = self.get_background_data_fert(latitude,longitude,crop) ## add self. water_data = self.get_background_data_water(latitude,longitude,crop) ## add self. generic_yield, df_fert_per_kg, water_m3_per_kg, df_pest_per_kg, total_pest_per_kg = self.create_final_dataframe(yield_,fert_data,water_data,pest_data,pest_total) ## add self. self.df_output.loc["Default_pesticide","Amount"] = total_pest_per_kg self.df_output.loc["Default_irrigation","Amount"] = water_m3_per_kg/0.8 self.df_output.loc["Default_nitrogen_fertilizer","Amount"] = n_fert_input self.df_output.loc["Default_p2o5_fertilizer","Amount"] = df_fert_per_kg.loc["Mean_P2O5_applied","Value"] self.df_output.loc["Default_k2o_fertilizer","Amount"] = df_fert_per_kg.loc["Mean_K2O_applied","Value"] list_pesticides = list(pest_data.index) list_compartments = ["pest_em_soil", "pest_em_air"] for i in list_pesticides: for compartment in list_compartments: df_pesticides_emissions = self.calculate_pest_emissions(i,pest_data.loc[i,"value"]) self.df_output.loc[str(i)+""+compartment,"Amount"] = df_pesticides_emissions.loc[i,compartment] self.df_output = self.df_output.fillna(0) if yield_ is None and n_fert_input is None: print("You don't even know your yield! But we have an answer for you!") yield_data = self.get_background_data_yield(latitude,longitude,crop) pest_data, pest_total = self.get_background_data_pest(latitude,longitude,crop) ## add self. fert_data = self.get_background_data_fert(latitude,longitude,crop) ## add self. water_data = self.get_background_data_water(latitude,longitude,crop) ## add self. generic_yield, df_fert_per_kg, water_m3_per_kg, df_pest_per_kg, total_pest_per_kg = self.create_final_dataframe(yield_data,fert_data,water_data,pest_data,pest_total) ## add self. self.df_output.loc["Default_pesticide","Amount"] = total_pest_per_kg self.df_output.loc["Default_irrigation","Amount"] = water_m3_per_kg/0.8 self.df_output.loc["Default_nitrogen_fertilizer","Amount"] = df_fert_per_kg.loc["Mean_N_applied","Value"] self.df_output.loc["Default_p2o5_fertilizer","Amount"] = df_fert_per_kg.loc["Mean_P2O5_applied","Value"] self.df_output.loc["Default_k2o_fertilizer","Amount"] = df_fert_per_kg.loc["Mean_K2O_applied","Value"] list_pesticides = list(pest_data.index) list_compartments = ["pest_em_soil", "pest_em_air"] for i in list_pesticides: for compartment in list_compartments: df_pesticides_emissions = self.calculate_pest_emissions(i,pest_data.loc[i,"value"]) ## add self. self.df_output.loc[str(i)+"_"+compartment,"Amount"] = df_pesticides_emissions.loc[i,compartment] self.df_output = self.df_output.fillna(0)
[docs] def generate_edges_to_product(self): for name in list(self.mapping_flows_uuid.index): self.create_edge(self.df_output.loc[name,"Amount"], name) ## add self.
[docs] def create_edge(self, amount, name): #mapping_flows_uuid = mapping_flows_uuid self.product.new_edge( amount=amount, type= self.type_flow_node_ei(name, self.mapping_flows_uuid), input=self.code_node_ei(name, self.mapping_flows_uuid) ).save()
[docs] def code_node_ei(self, input_name,mapping_flows_uuid): uuid = self.mapping_flows_uuid.loc[input_name,"uuid"] name_database = mapping_flows_uuid.loc[input_name,"database"] return(name_database,uuid)
[docs] def type_flow_node_ei(self, input_name,mapping_flows_uuid): type_ = self.mapping_flows_uuid.loc[input_name,"type"] return type_
[docs] def create_main_process(self, name_database,Latitude,Longitude,crop): country = self.country_from_coordinates(Latitude,Longitude) name = "Production of "+crop+" in "+country data = { "code": uuid.uuid4().hex , "name": name, "reference product":crop, "location": country, "Latitude": Latitude, "Longitude": Longitude, "database":name_database, } self.product = self.db.new_node(**data) self.product.save() return(self.product)
[docs] def brightway_checkin(self): # bi.bw2setup() self.name_database = "Calcey_crop_database" if self.name_database in list(bd.databases): del bd.databases[self.name_database] self.db = bd.Database(self.name_database) self.db.register()
[docs] def calculate_LCA_results(self): lca = bc.LCA( demand={self.product: 1}, method = ('ReCiPe 2016 v1.03, midpoint (H) no LT', 'climate change no LT', 'global warming potential (GWP1000) no LT'), ) lca.lci() lca.lcia() print(lca.score)