import pandas as pd
from osgeo import gdal
import netCDF4
import numpy as np
import os
import openpyxl
from geopy.geocoders import Photon
import country_converter as coco
class calcey:
    """Collect background data (yield, fertilizer, water, pesticides) for a crop at a location.

    All data files are opened through hard-coded paths relative to the current
    working directory (``../data/...``), so the methods only work when the
    process is started from a directory that has ``data`` as a sibling.
    """

    # Values above this threshold are treated as "no data" in the water rasters.
    # 1e20 is the null value in this dataset, so we have to replace it.
    _WATER_NODATA_THRESHOLD = 10000000000000

    def __init__(self):
        pass

    ####### private helpers #################

    @staticmethod
    def _first_entry(value):
        """Return the first element of a ``pd.Series``; return anything else unchanged.

        ``DataFrame.loc[row, col]`` yields a scalar when ``row`` is unique in
        the index and a Series when it is duplicated; this normalises both.
        """
        if isinstance(value, pd.Series):
            return value.iloc[0]
        return value

    @staticmethod
    def _nearest_index(axis_values, target):
        """Return the position in ``axis_values`` whose value is closest to ``target``."""
        return np.argmin(np.abs(axis_values - target))

    @staticmethod
    def _variant_from_filename(filename: str, crop_name: str):
        """Return the text between ``<crop_name>_`` and the ``.nc`` extension.

        Returns ``None`` when ``<crop_name>_`` does not occur in the file name.
        """
        # Remove the extension as a suffix. str.rstrip('.nc') would strip any
        # trailing '.', 'n' or 'c' characters and corrupt names like "green".
        stem = filename[:-len(".nc")] if filename.endswith(".nc") else filename
        parts = stem.split(crop_name + "_", 1)
        return parts[1] if len(parts) == 2 else None

    @staticmethod
    def _read_pest_estimate(ds, variable: str, lat_idx, lon_idx):
        """Read one pesticide estimate from ``ds`` at the given grid cell.

        Returns ``np.nan`` when ``variable`` is not in the dataset, and ``0``
        when the stored cell value is ``-1``.
        """
        try:
            raster = ds.variables[variable]
        except KeyError:
            return np.nan
        # Index 1 along the first axis, exactly as the original code did.
        # NOTE(review): the meaning of that axis is not visible here - confirm.
        local_value = raster[:].data[1][lat_idx, lon_idx]
        return 0 if local_value == -1 else local_value

    ####### lat lon country convert stuff #################

    def country_from_coordinates(self, Latitude: float, Longitude: float):
        """Reverse-geocode a coordinate with Photon and return the country name.

        The country is taken to be the last comma-separated component of the
        address returned by the geocoder.

        Raises:
            ValueError: if the geocoder returns no result for the coordinate.
        """
        geolocator = Photon(user_agent="measurements")
        location = geolocator.reverse(str(Latitude) + "," + str(Longitude))
        if location is None:
            # Without this guard the failure surfaces as an AttributeError on
            # `None.address`, which hides the actual cause.
            raise ValueError(
                f"reverse geocoding returned no result for ({Latitude}, {Longitude})"
            )
        data_address = location.address.split(',')
        print("data_address:", data_address)
        return data_address[-1].strip()

    def ISO3_country_from_coordinates(self, Latitude: float, Longitude: float):
        """Return the ISO3 code of the country found at the given coordinate."""
        country = self.country_from_coordinates(Latitude, Longitude)
        country_ISO3 = coco.convert(country, to='ISO3')
        print(country, country_ISO3)
        return country_ISO3

    ####### crop mapping #################

    def get_crop_mapping_file(self, sheet_name: str):
        """Load one sheet of ``Mapping_data_Calcey.xlsx``, indexed by its first column."""
        return pd.read_excel(
            "../data/Mapping_data_Calcey.xlsx", sheet_name=sheet_name, index_col=0
        )

    def crop_proxy_excelwithsheet(self, crop: str, sheet_name: str, level: int = 1):
        """Return the ``Level_<level>`` entry for ``crop`` in the given mapping sheet.

        Raises:
            KeyError: if ``crop`` or the ``Level_<level>`` column is missing.
        """
        crop_mapping_file = self.get_crop_mapping_file(sheet_name)
        return crop_mapping_file.loc[crop, f"Level_{level}"]

    ####### country mapping #################

    def country_fertilizer_proxy_location(self, Latitude: float, Longitude: float):
        """Return the ``iso3_nearest`` proxy country for the country at the coordinate.

        When the country occurs several times in the mapping sheet, the first
        listed entry is returned.
        """
        country_ISO3 = self.ISO3_country_from_coordinates(Latitude, Longitude)
        country_mapping_file = pd.read_excel(
            "../data/Fertilizer_mapping_country.xlsx",
            sheet_name="Mapping_nearest_neighbor",
            index_col=0,
        )
        nearest = country_mapping_file.loc[country_ISO3, "iso3_nearest"]
        # A plain `[0]` would return the first *character* of the code when the
        # lookup yields a single string, so both result shapes are handled.
        return self._first_entry(nearest)

    ################## get background data ###########################

    def get_background_data_fert(self, Latitude: float, Longitude: float, crop: str):
        """Return the fertilizer record for ``crop`` in the proxy country of the location.

        The exact crop is tried first. If the fertilizer table has no such
        entry, the ``Level_<n>`` proxies from the ``Mapping_Fertilizer`` sheet
        are tried in ascending order.

        Returns:
            The matching selection of the fertilizer table, or ``None`` when
            neither the crop nor any of its proxies is present.
        """
        # get fertilizer data
        df_amount_fert_crop_country = pd.read_excel(
            "../data/Ludemann2022_fertilizer_Country_clean_ISO3.xlsx"
        )
        df_amount_fert_crop_country = df_amount_fert_crop_country.set_index(
            ["Country_ISO3", "Crop"]
        )
        # get the crop mapping file
        crop_mapping_file = self.get_crop_mapping_file(sheet_name="Mapping_Fertilizer")
        # get country proxy in ISO 3 format
        country_proxy_ISO3 = self.country_fertilizer_proxy_location(Latitude, Longitude)
        print(country_proxy_ISO3)

        try:
            return df_amount_fert_crop_country.loc[(country_proxy_ISO3, crop)]
        except KeyError:
            pass

        # NOTE(review): the upper bound is kept from the original code; whether
        # it reaches every Level_<n> column depends on the sheet layout - confirm.
        for level in range(1, len(crop_mapping_file.columns) - 1):
            try:
                # Use the sheet that is already loaded rather than re-reading
                # the workbook from disk for every level.
                crop_proxy_value = crop_mapping_file.loc[crop, f"Level_{level}"]
                return df_amount_fert_crop_country.loc[
                    (country_proxy_ISO3, crop_proxy_value)
                ]
            except KeyError:
                continue
        return None

    def get_background_data_yield(self, input_lat: float, input_lon: float, crop: str,
                                  year: int = 2022):
        """Return the FAOSTAT yield value for ``crop`` in the country at the location.

        Args:
            input_lat: latitude of the location.
            input_lon: longitude of the location.
            crop: value to match against the ``Item`` column.
            year: value to match against the ``Year`` column (default 2022).

        Raises:
            IndexError: if the table has no row for this country, year and crop.
        """
        # get yield data
        df = pd.read_csv("../data/FAOSTAT_data_yield.csv")
        country = self.country_from_coordinates(Latitude=input_lat, Longitude=input_lon)
        selected = (df['Area'] == country) & (df['Year'] == year) & (df['Item'] == crop)
        matches = df.loc[selected, 'Value']
        if matches.empty:
            raise IndexError(
                f"no FAOSTAT yield record for Area={country!r}, Year={year}, Item={crop!r}"
            )
        return float(matches.iloc[0])

    def get_background_data_water(self, input_lat: float, input_lon: float, crop: str):
        """Read ``wf_unit_irrigated_blue`` at the nearest grid cell of each matching file.

        Files in ``../data/NC_background_water/`` are used when they end in
        ``.nc`` and contain the ``Mapping_water`` proxy name of ``crop``.

        Returns:
            DataFrame with columns ``water_footprint`` (the file-name part after
            ``<proxy>_``, without extension) and ``value``.

        Raises:
            ValueError: if no file matches.
        """
        path_to_folder = "../data/NC_background_water/"
        crop_water = self.crop_proxy_excelwithsheet(crop=crop, sheet_name="Mapping_water")
        df_list = []
        for filename in os.listdir(path_to_folder):
            # Only .nc files that belong to this crop are relevant
            if not (filename.endswith('.nc') and crop_water in filename):
                continue
            # The context manager closes the dataset even when a read fails
            with netCDF4.Dataset(path_to_folder + filename, 'r') as ds:
                # identify the correct grid element
                lat_idx = self._nearest_index(ds.variables['lat'][:], input_lat)
                lon_idx = self._nearest_index(ds.variables['lon'][:], input_lon)
                try:
                    value = ds.variables['wf_unit_irrigated_blue'][:].data[lat_idx, lon_idx]
                except KeyError:
                    value = np.nan
            value = 0 if value > self._WATER_NODATA_THRESHOLD else value
            df_list.append(pd.DataFrame({
                'water_footprint': [self._variant_from_filename(filename, crop_water)],
                'value': [value],
            }))
        if not df_list:
            raise ValueError(
                f"no .nc file containing {crop_water!r} found in {path_to_folder}"
            )
        return pd.concat(df_list, ignore_index=True)

    def get_background_data_pest(self, input_lat: float, input_lon: float, crop: str):
        """Average the ``apr_H`` and ``apr_L`` estimates at the nearest grid cell per file.

        Files in ``../data/NC_background_pest/`` are used when they end in
        ``.nc`` and contain the ``Mapping_pesticides`` proxy name of ``crop``.

        Returns:
            Tuple ``(df, pest_total)``: ``df`` has columns ``pesticide_name``
            (the file-name part after ``<proxy>_``, without extension) and
            ``value``; ``pest_total`` is the sum of the ``value`` column.

        Raises:
            ValueError: if no file matches.
        """
        path_to_folder = "../data/NC_background_pest/"
        crop_pest = self.crop_proxy_excelwithsheet(crop=crop, sheet_name="Mapping_pesticides")
        df_list = []
        for filename in os.listdir(path_to_folder):
            # Only .nc files that belong to this crop are relevant
            if not (filename.endswith('.nc') and crop_pest in filename):
                continue
            # The context manager closes the dataset even when a read fails
            with netCDF4.Dataset(path_to_folder + filename, 'r') as ds:
                # identify the correct grid element
                lat_idx = self._nearest_index(ds.variables['lat'][:], input_lat)
                lon_idx = self._nearest_index(ds.variables['lon'][:], input_lon)
                local_high_value = self._read_pest_estimate(ds, 'apr_H', lat_idx, lon_idx)
                local_low_value = self._read_pest_estimate(ds, 'apr_L', lat_idx, lon_idx)
            value = (local_high_value + local_low_value) / 2
            df_list.append(pd.DataFrame({
                'pesticide_name': [self._variant_from_filename(filename, crop_pest)],
                'value': [value],
            }))
        if not df_list:
            raise ValueError(
                f"no .nc file containing {crop_pest!r} found in {path_to_folder}"
            )
        df = pd.concat(df_list, ignore_index=True)
        pest_total = df['value'].sum()
        return df, pest_total

    def create_final_dataframe(self, yield_data, fert_data, water_data, pest_data, pest_total):
        """Express the fertilizer and pesticide inputs relative to the yield.

        Args:
            yield_data: yield value as returned by ``get_background_data_yield``.
            fert_data: Series that includes a ``Country`` entry, which is dropped.
            water_data: DataFrame with a ``value`` column; only row ``0`` is used.
            pest_data: DataFrame whose first column names the pesticide and
                which has a ``value`` column. It is not modified.
            pest_total: total pesticide amount.

        Returns:
            Tuple ``(yield_in_kgperha, df_fert, water_m3perkg, pest_data, pest_kgperkg)``.
        """
        # yield data
        # NOTE(review): the division by 10 presumably converts hg/ha to kg/ha - confirm.
        yield_in_kgperha = yield_data / 10

        # fert data
        series = fert_data.drop('Country')
        df_fert = pd.DataFrame({
            'Parameter': series.index,
            'Value': series.values
        })
        df_fert = df_fert.set_index(df_fert.columns[0])
        df_fert['Value'] = df_fert['Value'] / yield_in_kgperha

        # water_data
        water_m3perkg = water_data.at[0, 'value']

        # pest data: build a new frame so the caller's DataFrame stays untouched
        pest_per_yield = pest_data.assign(value=pest_data['value'] / yield_in_kgperha)
        pest_per_yield = pest_per_yield.set_index(pest_per_yield.columns[0])

        # pest total
        pest_kgperkg = pest_total / yield_in_kgperha

        return yield_in_kgperha, df_fert, water_m3perkg, pest_per_yield, pest_kgperkg