import datetime
import itertools
import os
from typing import Optional
import ee
import json
from contextlib import redirect_stdout
import io
import logging
import warnings
import cdsapi
import ipyfilechooser as fc
import ipywidgets as widgets
from ipywidgets import DatePicker
from ipywidgets import VBox, HBox
from mcimageprocessing import config_manager
from mcimageprocessing.programmatic.APIs.EarthEngine import EarthEngineManager
from mcimageprocessing.programmatic.shared_functions.utilities import process_and_clip_raster
from osgeo import gdal
# Set the GDAL error level so that only errors are reported and warnings are
# ignored. This is aimed mainly at the GloFAS GRIB files, which natively
# trigger warnings about missing metadata (pygrib-related) when they are read.
gdal.UseExceptions()
# Route GDAL messages to the quiet handler and switch the CPL log output off.
gdal.PushErrorHandler('CPLQuietErrorHandler')
gdal.SetConfigOption('CPL_LOG_ERRORS', 'OFF')
gdal.SetConfigOption('CPL_LOG', 'OFF')
[docs]
class GloFasAPI:
def __init__(self, ee_manager: Optional[EarthEngineManager] = None):
    """
    Set up the CDS API client and the catalogue of supported GloFAS products.

    :param ee_manager: Earth Engine manager to reuse. When it is missing (or
        falsy) a new ``EarthEngineManager`` is created.
    """
    # Helper methods report failures through ``self.logger``; it has to exist
    # before any of them can run.
    self.logger = logging.getLogger(__name__)

    # ==============================================================================
    # CONFIGURATION
    # ==============================================================================
    self.ee_instance = ee_manager if ee_manager else EarthEngineManager()
    url = config_manager.config['KEYS']['GloFas']['url']
    key = config_manager.config['KEYS']['GloFas']['key']
    self.client = cdsapi.Client(url=url, key=key)

    current_year = datetime.date.today().year
    # Zero-padded month codes "01".."12"; each product gets its own copy so
    # that editing one product's list never leaks into another.
    months = [f"{month:02d}" for month in range(1, 13)]

    self.glofas_dict = {
        "products": {
            'cems-glofas-seasonal': {
                "system_version": ['operational', 'version_3_1', 'version_2_2'],
                'hydrological_model': ['lisflood'],
                "variable": "river_discharge_in_the_last_24_hours",
                "leadtime_hour": list(range(24, 5161, 24)),
                "year": list(range(2019, current_year + 1)),
                "month": list(months),
                "format": "grib",
            },
            'cems-glofas-forecast': {
                "system_version": ['operational', 'version_3_1', 'version_2_1'],
                'hydrological_model': ['lisflood', 'htessel_lisflood'],
                'product_type': [
                    'control_forecast', 'ensemble_perturbed_forecasts',
                ],
                "variable": "river_discharge_in_the_last_24_hours",
                "leadtime_hour": list(range(24, 721, 24)),
                "year": list(range(2020, current_year + 1)),
                "month": list(months),
                "day": list(range(24, 32)),
                "format": "grib",
            },
            'cems-glofas-reforecast': {
                "system_version": ['version_4_0', 'version_3_1', 'version_2_2'],
                'hydrological_model': ['lisflood', 'htessel_lisflood'],
                'product_type': [
                    'control_forecast', 'ensemble_perturbed_forecasts',
                ],
                "leadtime_hour": list(range(24, 1105, 24)),
                "year": list(range(1999, current_year + 1)),
                "month": list(months),
                "day": list(range(24, 32)),
                "format": "grib",
            },
        }
    }

    # Keep the cdsapi logger and the "Template ..." warnings out of the output.
    logging.getLogger('cdsapi').setLevel(logging.CRITICAL)
    warnings.filterwarnings('ignore', message='.*Template .*')
# ==============================================================================
# PRIMARY FUNCTIONS
# ==============================================================================
[docs]
def download_data(self, product_name, request_parameters, file_name):
    """
    Retrieve one file through the CDS API client and return where it was saved.

    :param product_name: Dataset name handed to ``self.client.retrieve``.
    :param request_parameters: Dict holding the request fields plus
        'folder_location'. 'day' is optional and falls back to '01'.
    :param file_name: Name of the file to create inside 'folder_location'.
    :return: The path of the target file.
    """
    target_path = os.path.join(request_parameters['folder_location'], file_name)

    # Assemble the request in the same key order the API call has always used.
    request = {
        field: request_parameters[field]
        for field in ('variable', 'format', 'system_version',
                      'hydrological_model', 'product_type', 'year')
    }
    request['day'] = request_parameters.get('day', '01')
    for field in ('month', 'leadtime_hour', 'area'):
        request[field] = request_parameters[field]

    # Anything the client prints to stdout is captured and discarded.
    with redirect_stdout(io.StringIO()):
        self.client.retrieve(product_name, request, target_path)

    return target_path
[docs]
def download_glofas_data(self, bbox, params, index=None, distinct_values=None):
    """
    Build the request and the output file name for one GloFAS download, then run it.

    :param bbox: Bounding box; the first element of its 'maxy', 'minx', 'miny'
        and 'maxx' entries forms the requested area, in that order.
    :param params: The parameters for downloading GloFAS data. 'glofas_product'
        is required; 'day' defaults to '01' and 'product_type' to
        'ensemble_perturbed_forecasts' when absent.
    :param index: Index used in the file name (0 when omitted).
    :param distinct_values: Values joined with '_' into the file name;
        'userdefined' is used when omitted.
    :return: The file path of the downloaded GloFAS data.
    """
    day = params.get('day', '01')
    area = [bbox['maxy'][0], bbox['minx'][0], bbox['miny'][0], bbox['maxx'][0]]

    request_parameters = dict(
        glofas_product=params.get('glofas_product'),
        variable='river_discharge_in_the_last_24_hours',
        format='grib',
        system_version=params.get('system_version'),
        hydrological_model=params.get('hydrological_model'),
        product_type=params.get('product_type', 'ensemble_perturbed_forecasts'),
        year=params.get('year'),
        month=params.get('month'),
        day=day,
        leadtime_hour=params.get('leadtime_hour'),
        area=area,
        folder_location=params.get('folder_location'),
    )

    if index is None:
        index = 0

    # File name layout: <product>_<label>_<index>_<year>_<month>_<day>.grib
    product = params['glofas_product']
    if distinct_values is None:
        label = 'userdefined'
    else:
        label = '_'.join(str(value) for value in distinct_values)
    file_name = f"{product}_{label}_{index}_{params.get('year')}_{params.get('month')}_{day}.grib"

    return self.download_data(product, request_parameters, file_name)
# ==============================================================================
# HELPER FUNCTIONS
# ==============================================================================
[docs]
def no_data_helper_function(self, bbox, glofas_params, geometry, index, distinct_values):
    """
    Retry a download with every other system-version / model / product-type combination.

    Used for the 'no data available' scenario. The combinations come from
    ``self.glofas_dict`` for the selected product. The combination already
    present in ``glofas_params`` is skipped, and ``glofas_params`` is updated
    in place with each combination that is tried.

    :param bbox: Bounding box forwarded to ``download_glofas_data``.
    :param glofas_params: Request parameters; must contain 'glofas_product',
        'system_version' and 'hydrological_model'.
    :param geometry: Geometry forwarded to ``process_and_clip_raster``.
    :param index: Index forwarded to ``download_glofas_data``.
    :param distinct_values: Distinct values forwarded to ``download_glofas_data``.
    :return: The first truthy processed raster, or None when no combination works.
    """
    product_options = self.glofas_dict['products'][glofas_params['glofas_product']]
    last_attempted_combination = (
        glofas_params['system_version'],
        glofas_params['hydrological_model'],
        glofas_params.get('product_type') or None,
    )

    # Filter instead of list.remove(): the combination that was just tried is
    # not guaranteed to be one of the catalogue entries, and remove() would
    # raise ValueError in that case.
    remaining_combinations = [
        combination
        for combination in itertools.product(
            product_options['system_version'],
            product_options['hydrological_model'],
            product_options.get('product_type', [None]),
        )
        if combination != last_attempted_combination
    ]

    for combination in remaining_combinations:
        try:
            (glofas_params['system_version'],
             glofas_params['hydrological_model'],
             glofas_params['product_type']) = combination
            file_path = self.download_glofas_data(bbox, glofas_params, index, distinct_values)
            processed_raster = process_and_clip_raster(file_path, geometry, glofas_params, self.ee_instance)
            if processed_raster:  # Check if processing was successful
                return processed_raster
        except Exception as e:
            print(e)
            if "no data is available within your requested subset" not in str(e):
                break  # Exit the loop if a different error occurs

    print("No suitable data could be found for any combination.")
    return None
def _create_sub_folder(self, base_folder: str) -> str:
"""
Create a new subfolder within the given base folder.
:param base_folder: The path of the base folder where the subfolder will be created.
:type base_folder: str
:return: The path of the newly created subfolder.
:rtype: str
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") # More readable timestamp
folder_name = os.path.join(base_folder, f"glofas_processed_on_{timestamp}")
try:
os.mkdir(folder_name)
return folder_name
except OSError as e:
self.logger.error(f"Failed to create subfolder: {e}")
return base_folder
[docs]
def get_last_day_of_month(self, year, month):
    """
    Return the date of the final day of a month.

    The result is obtained by stepping one day back from the first day of
    the following month.

    :param year: The year
    :param month: The month
    :return: The date of the last day of the given month and year.
    """
    following_month = month % 12 + 1
    # A following month of January means the year rolls over as well.
    following_year = year + 1 if following_month == 1 else year
    return datetime.date(following_year, following_month, 1) - datetime.timedelta(days=1)
[docs]
class GloFasAPINotebookInterface(GloFasAPI):
def __init__(self, ee_manager: Optional[EarthEngineManager] = None):
    """
    Initialise the base API and the containers used by the notebook interface.

    :param ee_manager: Optional Earth Engine manager forwarded to the base class.
    """
    super().__init__(ee_manager)

    # Container whose children are swapped out when the GloFAS product changes.
    self.glofas_stack = VBox([])
    # Output area for displaying logs, errors, etc.
    self.out = widgets.Output()
[docs]
def create_glofas_dropdown(self, dropdown_options, description, default_value):
    """
    Build an enabled dropdown widget for the GloFAS interface.

    :param dropdown_options: A list of options for the dropdown.
    :param description: The description label for the dropdown.
    :param default_value: The value selected initially.
    :return: The configured ``widgets.Dropdown``.
    """
    return widgets.Dropdown(
        options=dropdown_options,
        value=default_value,
        description=description,
        disabled=False,
    )
[docs]
def get_available_dates(self, glofas_option):
    """
    Generate the list of selectable dates for the given GloFAS product.

    The list runs from 1 January of the product's first configured year up to
    31 December of its last configured year, and never includes today or any
    later date.

    :param glofas_option: Key of the product in ``self.glofas_dict['products']``.
    :return: Consecutive ``datetime.date`` objects in ascending order (empty
        when the whole range lies in the future).
    """
    years = self.glofas_dict['products'][glofas_option]['year']
    first_day = datetime.date(min(years), 1, 1)
    one_day = datetime.timedelta(days=1)

    # Evaluate "today" once and stop there, instead of scanning every
    # remaining month of the range day by day.
    end_exclusive = min(datetime.date.today(), datetime.date(max(years), 12, 31) + one_day)

    day_count = (end_exclusive - first_day).days  # range() is empty when negative
    return [first_day + datetime.timedelta(days=offset) for offset in range(day_count)]
[docs]
def update_date_dropdown(self, glofas_option):
    """
    Build the dropdown options for the dates available for a GloFAS product.

    :param glofas_option: The selected GloFAS product.
    :return: A list of ``(label, date)`` pairs, the label being the date
        formatted as YYYY-MM-DD.
    """
    date_options = []
    for available_date in self.get_available_dates(glofas_option):
        date_options.append((available_date.strftime('%Y-%m-%d'), available_date))
    return date_options
[docs]
def on_single_or_date_range_change(self, change, glofas_option: str):
    """
    Rebuild the date selector when the single-date / date-range option changes.

    'Single Date' produces one dropdown; any other value produces a start and
    an end dropdown placed side by side. The new selector is stored in
    ``self.date_picker`` and shown in ``self.glofas_date_vbox``.

    :param change: A dictionary containing information about the change event;
        its 'new' entry is the selected mode.
    :param glofas_option: The selected GloFAS option.
    :return: None
    """
    selected_mode = change['new']
    date_options = self.update_date_dropdown(glofas_option)

    def date_dropdown(label):
        # Every selector offers the same list of available dates.
        return widgets.Dropdown(options=date_options, description=label, disabled=False)

    if selected_mode == 'Single Date':
        self.date_picker = date_dropdown('Select Date:')
    else:
        start_dropdown = date_dropdown('Select Start Date:')
        end_dropdown = date_dropdown('Select End Date:')
        self.date_picker = HBox([start_dropdown, end_dropdown])

    self.glofas_date_vbox.children = [self.date_picker]
[docs]
def update_max_date(self, year, month):
    """
    Cap the date picker at the last day of the given month.

    :param year: The selected year.
    :param month: The selected month.
    """
    self.date_picker.max = self.get_last_day_of_month(year, month)
[docs]
def on_glofas_option_change(self, change):
    """
    React to a new GloFAS product selection by rebuilding the widget stack.

    :param change: A dictionary whose 'new' entry is the newly selected GloFAS option.
    :return: None
    """
    selected_product = change['new']
    # Empty the stack first, then let the container populate it again.
    self.glofas_stack.children = tuple()
    self.update_glofas_container(selected_product)
[docs]
def update_glofas_container(self, glofas_value):
    """
    Fill the GloFAS widget stack with the widgets for the selected product.

    :param glofas_value: The selected GloFAS product.
    :return: None
    """
    # The stack's children are replaced wholesale by the product-specific widgets.
    self.glofas_stack.children = tuple(self.create_widgets_for_glofas(glofas_value))
[docs]
def gather_parameters(self, glofas_product: str):
    """
    Collect the current widget values into a parameter dictionary.

    For a date range, 'year', 'month' and 'day' describe the start date.
    'product_type' is included for every product except
    'cems-glofas-seasonal'; it is None when the interface has no
    ``product_type`` widget.

    Example result for ``gather_parameters('cems-glofas-seasonal')``::

        {
            'glofas_product': 'cems-glofas-seasonal',
            'system_version': system_version,
            'hydrological_model': hydrological_model,
            'leadtime_hour': leadtime_hour,
            'year': year,
            'month': month,
            'day': day,
            'folder_location': folder_location,
            'create_sub_folder': create_sub_folder,
            'clip_to_geometry': clip_to_geometry,
            'add_image_to_map': add_image_to_map,
            'no_data_helper': no_data_helper,
        }

    :param glofas_product: The type of GloFAS product.
    :return: A dictionary containing the parameters required for the given
        GloFAS product, or None (after printing a message) when the product
        is not recognised.
    :raises ValueError: When the date selection mode is neither 'Single Date'
        nor 'Date Range'.
    """
    date_type = self.single_or_date_range.value
    system_version = self.system_version.value.replace('.', '_').lower()
    hydrological_model = self.hydrological_model.value
    try:
        product_type = self.product_type.value.replace('.', '_').lower()
    except AttributeError:
        # No product-type widget (or no string value) for this product.
        product_type = None
    leadtime_hour = self.leadtime.value

    if date_type == 'Single Date':
        selected_date = self.date_picker.value
    elif date_type == 'Date Range':
        # The first child of the range selector holds the start date.
        selected_date = self.date_picker.children[0].value
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(f"Unsupported date selection mode: {date_type!r}")

    # Year and day are passed on as strings, the month as an integer.
    year = str(selected_date.year)
    month = int(selected_date.month)
    day = str(selected_date.day)

    folder_location = self.filechooser.selected
    create_sub_folder = self.create_sub_folder.value
    clip_to_geometry = self.clip_to_geometry.value
    add_image_to_map = self.add_image_to_map.value
    no_data_helper = self.no_data_helper_checklist.value

    known_products = ('cems-glofas-seasonal', 'cems-glofas-forecast', 'cems-glofas-reforecast')
    if glofas_product not in known_products:
        print("Invalid GloFAS product.")
        return None

    parameters = {
        'glofas_product': glofas_product,
        'system_version': system_version,
        'hydrological_model': hydrological_model,
    }
    if glofas_product != 'cems-glofas-seasonal':
        # Inserted here so the key order matches the established layout.
        parameters['product_type'] = product_type
    parameters.update({
        'leadtime_hour': leadtime_hour,
        'year': year,
        'month': month,
        'day': day,
        'folder_location': folder_location,
        'create_sub_folder': create_sub_folder,
        'clip_to_geometry': clip_to_geometry,
        'add_image_to_map': add_image_to_map,
        'no_data_helper': no_data_helper,
    })
    return parameters
[docs]
def process_api(self, geometry, distinct_values, index, bbox, params, pbar=None):
    """
    Download and process GloFAS data for one geometry.

    Depending on the date mode selected in the interface, either a single
    date or every day of the selected range is downloaded and passed to
    ``process_and_clip_raster``. The request parameters are saved as
    'parameters.json' and the geometry as 'geometry.geojson' in the output
    folder.

    :param geometry: Geometry to process; Earth Engine geometries, features
        and feature collections are converted with ``getInfo()``, anything
        else is written out as is.
    :param distinct_values: Values used in the output file name.
    :param index: Index used in the output file name.
    :param bbox: Bounding box of the area to download.
    :param params: Request parameters; 'folder_location' is replaced by the
        new subfolder when 'create_sub_folder' is set, and 'year', 'month'
        and 'day' are overwritten for each day of a date range.
    :param pbar: Optional progress bar offering ``update`` and
        ``set_postfix_str``; progress reporting is skipped when it is None.
    :return: The processed raster (the last one for a date range), the result
        of the no-data helper when a 'no data' error occurs and the helper is
        enabled, or None on any other error.
    """
    def report_progress(steps, status):
        # The progress bar is optional; without it there is nothing to update.
        if pbar is not None:
            pbar.update(steps)
            pbar.set_postfix_str(status)

    def recover(error):
        # Shared error path: hand "no data" failures to the helper when enabled.
        print(error)
        if "no data is available within your requested subset" in str(error) and params['no_data_helper']:
            return self.no_data_helper_function(bbox, params, geometry, index, distinct_values)
        print("An error occurred that couldn't be handled by the no data helper function.")
        return None

    try:
        report_progress(4, "Downloading data...")

        if params['create_sub_folder']:
            # Create a sub-folder and move an existing geometry file into it.
            folder_path = params['folder_location']
            params['folder_location'] = self._create_sub_folder(folder_path)
            try:
                os.rename(os.path.join(folder_path, 'geometry.geojson'),
                          os.path.join(params['folder_location'], 'geometry.geojson'))
            except (PermissionError, FileNotFoundError):
                # Best effort only: the geometry file is written again below.
                pass

        params_file_path = os.path.join(params['folder_location'], 'parameters.json')
        with open(params_file_path, 'w') as f:
            json.dump(params, f)

        # Stays None when a date range contains no days at all.
        processed_raster = None

        if self.single_or_date_range.value == "Date Range":
            try:
                start_date = self.date_picker.children[0].value
                end_date = self.date_picker.children[1].value
                # Compare plain dates even if the widgets deliver datetimes.
                if isinstance(start_date, datetime.datetime):
                    start_date = start_date.date()
                if isinstance(end_date, datetime.datetime):
                    end_date = end_date.date()

                current_date = start_date
                while current_date <= end_date:
                    params['year'] = str(current_date.year)
                    params['month'] = current_date.month
                    params['day'] = str(current_date.day)
                    file_path = self.download_glofas_data(bbox=bbox, params=params, index=index,
                                                          distinct_values=distinct_values)
                    report_progress(4, "Processing data...")
                    processed_raster = process_and_clip_raster(file_path, geometry, params, self.ee_instance)
                    current_date += datetime.timedelta(days=1)
            except Exception as e:
                return recover(e)
        else:
            file_path = self.download_glofas_data(bbox=bbox, params=params, index=index,
                                                  distinct_values=distinct_values)
            report_progress(4, "Processing data...")
            processed_raster = process_and_clip_raster(file_path, geometry, params, self.ee_instance)

        # Serialize the geometry to GeoJSON
        if isinstance(geometry, (ee.Geometry, ee.Feature, ee.FeatureCollection)):
            geojson_geometry = geometry.getInfo()  # Earth Engine object
        else:
            geojson_geometry = geometry  # Already in GeoJSON format

        # Write the GeoJSON next to the downloaded data
        geojson_filename = os.path.join(params['folder_location'], 'geometry.geojson')
        with open(geojson_filename, 'w') as f:
            f.write(json.dumps(geojson_geometry))

        report_progress(2, "Finished!")
        return processed_raster
    except Exception as e:
        return recover(e)
[docs]
def setup_global_variables(self):
    """
    Replace ``self.glofas_dict`` with the product catalogue of this interface.

    Only 'cems-glofas-forecast' is enabled here; the seasonal and reforecast
    products are intentionally left out of this catalogue.
    """
    current_year = datetime.date.today().year

    forecast_options = {
        "system_version": ['operational', 'version_3_1', 'version_2_1'],
        'hydrological_model': ['lisflood', 'htessel_lisflood'],
        'product_type': ['control_forecast', 'ensemble_perturbed_forecasts'],
        "variable": "river_discharge_in_the_last_24_hours",
        "leadtime_hour": list(range(24, 721, 24)),
        "year": list(range(2020, current_year + 1)),
        # Zero-padded month codes "01" .. "12".
        "month": [f"{month:02d}" for month in range(1, 13)],
        "day": list(range(24, 32)),
        "format": "grib",
    }

    self.glofas_dict = {"products": {'cems-glofas-forecast': forecast_options}}