import calendar
import datetime
import json
import os
import re
from typing import ClassVar
import ee
import geemap
import ipyfilechooser as fc
import ipywidgets as widgets
import requests
from ipywidgets import Layout
from pydantic import BaseModel, Extra
from pydantic import root_validator
from shapely.geometry import shape
from mcimageprocessing import config_manager
[docs]
class EarthEngineManager(BaseModel):
year_ranges: list = []
vis_params: ClassVar[dict] = {
'NDVI': {
'min': 0, 'max': 1,
'palette': [
'FFFFFF', 'CE7E45', 'DF923D', 'F1B555', 'FCD163', '99B718',
'74A901', '66A000', '529400', '3E8601', '207401', '056201',
'004C00', '023B01', '012E01', '011D01', '011301'
]
}
}
ee_dates: list = []
# Class-level aggregation functions that do not change and are not instance-specific
aggregation_functions: ClassVar[dict] = {
'mode': lambda ic: ic.mode(),
'median': lambda ic: ic.median(),
'mean': lambda ic: ic.mean(),
'max': lambda ic: ic.max(),
'min': lambda ic: ic.min(),
'sum': lambda ic: ic.reduce(ee.Reducer.sum()),
'first': lambda ic: ic.sort('system:time_start', False).first()
}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.load_credentials()
[docs]
def load_credentials(self):
# Assuming `config_manager` is defined elsewhere and loaded appropriately
credentials = ee.ServiceAccountCredentials(
email=config_manager.config['KEYS']['GEE']['client_email'],
key_data=config_manager.config['KEYS']['GEE']['private_key']
)
ee.Initialize(credentials)
[docs]
@classmethod
def validate_aggregation_function(cls, function):
if function not in cls.aggregation_functions:
raise ValueError(
f"Invalid aggregation function: {function}. Must be one of: {', '.join(cls.aggregation_functions.keys())}")
return function
[docs]
def generate_monthly_date_ranges(self, start_date, end_date):
"""
:param start_date: The start date of the date ranges to generate. Should be a datetime.date object.
:param end_date: The end date of the date ranges to generate. Should be a datetime.date object.
:return: A list of tuples, where each tuple represents a monthly date range. Each tuple contains the start date and end date of the respective month.
"""
date_ranges = []
for year in range(start_date.year, end_date.year + 1):
for month in range(1, 13):
# Skip months before the start month in the start year
if year == start_date.year and month < start_date.month:
continue
# Stop if the month is after the end month in the end year
if year == end_date.year and month > end_date.month:
break
# Get the last day of the month
last_day = calendar.monthrange(year, month)[1]
# Format the start and end dates of the month
month_start_date = datetime.date(year, month, 1).strftime('%Y-%m-%d')
month_end_date = datetime.date(year, month, last_day).strftime('%Y-%m-%d')
# Append the tuple of the start and end date of the month to the list
date_ranges.append((month_start_date, month_end_date))
return date_ranges
[docs]
def generate_yearly_date_ranges(self, start_date, end_date):
"""
:param start_date: The start date of the date range.
:param end_date: The end date of the date range.
:return: A list of tuples, where each tuple represents the start and end dates for each year within the specified range.
"""
# Convert start and end dates from strings to date objects
# start_date_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
# end_date_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d')
# Check if end_date is supplied and valid
if not end_date or end_date < start_date:
raise ValueError("The end date must be provided and must be after the start date.")
date_ranges = []
for year in range(start_date.year, end_date.year + 1):
# Determine the start of the year
if year == start_date.year:
year_start_date = start_date
else:
year_start_date = datetime.date(year, 1, 1)
# Determine the end of the year
if year == end_date.year:
year_end_date = end_date
else:
year_end_date = datetime.date(year, 12, 31)
# Format the start and end dates of the year
year_start_date_str = year_start_date.strftime('%Y-%m-%d')
year_end_date_str = year_end_date.strftime('%Y-%m-%d')
# Append the tuple of the start and end date of the year to the list
date_ranges.append((year_start_date_str, year_end_date_str))
return date_ranges
[docs]
def get_image_collection_dates(self, collection: str, min_max_only: bool = False):
"""
:param collection: The name of the image collection to retrieve dates from. It should be in the format 'path/to/collection'.
:param min_max_only: A boolean value indicating whether to only return the minimum and maximum dates in the collection. Default is False.
:return: A list of dates in the image collection. If `min_max_only` is True, it will return a list with two dates, representing the minimum and maximum dates in the collection. Otherwise
*, it will return a list of all dates in the collection.
"""
# Regular expression to match both formats
pattern = r'^[A-Za-z0-9_-]+(?:/[A-Za-z0-9_-]+){1,2}$'
def format_dates(img):
original_date = ee.Date(img.get('system:time_start'))
current_day = original_date.format('YYYY-MM-dd')
return ee.Feature(None, {'current_date': current_day})
def get_min_max_dates(collection):
# Get the minimum date in the collection.
min_date = ee.Date(collection.aggregate_min('system:time_start')).format('YYYY-MM-dd')
# Get the maximum date in the collection.
max_date = ee.Date(collection.aggregate_max('system:time_start')).format('YYYY-MM-dd')
min_date = min_date.getInfo()
max_date = max_date.getInfo()
return [min_date, max_date]
collection = ee.ImageCollection(collection)
if min_max_only:
return get_min_max_dates(collection)
else:
formatted_dates = collection.map(format_dates)
current_date_list = formatted_dates.aggregate_array('current_date')
ee_date_list = current_date_list.getInfo()
self.ee_dates = [x for x in ee_date_list]
return self.ee_dates
[docs]
def calculate_statistics(self, img, geometry, band):
"""
:param img: The image on which to calculate the statistics.
:param geometry: The geometry within which to calculate the statistics.
:param band: The band of the image on which to calculate the statistics.
:return: The calculated statistics as a dictionary.
This method calculates various statistics for a given image within a specified geometry and band. It uses the Earth Engine reducers to calculate mean, sum, maximum, minimum, standard
* deviation, variance, and median.
The `img` parameter is the image object on which the statistics are calculated.
The `geometry` parameter is the geometry object that defines the region within which the statistics are calculated.
The `band` parameter is the name of the band on which the statistics are calculated.
The method returns the calculated statistics as a dictionary. The keys of the dictionary represent the statistic types ('mean', 'sum', 'max', 'min', 'stdDev', 'variance', 'median') and
* the values represent the computed statistics for each type.
"""
# Define the reducers for each statistic you want to calculate
reducers = ee.Reducer.mean().combine(
reducer2=ee.Reducer.sum(),
sharedInputs=True
).combine(
reducer2=ee.Reducer.max(),
sharedInputs=True
).combine(
reducer2=ee.Reducer.min(),
sharedInputs=True
).combine(
reducer2=ee.Reducer.stdDev(),
sharedInputs=True
).combine(
reducer2=ee.Reducer.variance(),
sharedInputs=True
).combine(
reducer2=ee.Reducer.median(),
sharedInputs=True
)
# Apply the reducers to the image
stats = img.reduceRegion(reducer=reducers, geometry=geometry, maxPixels=1e12)
return stats
[docs]
def get_image(self,
multi_date: bool,
aggregation_period: str=None,
aggregation_method: str=None,
start_date:str=None,
end_date:str=None,
date:str=None,
create_sub_folder=None,
scale=None,
image_collection:str=None,
add_to_map:bool=None,
band:str=None,
additional_filter=False,
filter_argument=None,
geometry=None,
statistics_only=False
):
"""
:param multi_date: Boolean indicating whether multiple dates will be used for aggregation
:param aggregation_period: String indicating the period over which to aggregate the images
:param aggregation_method: String indicating the method of aggregation to be used
:param start_date: String indicating the start date of the image collection
:param end_date: String indicating the end date of the image collection
:param date: String indicating the date for which to retrieve the image
:param create_sub_folder: Boolean indicating whether to create a subfolder for the downloaded image
:param scale: Scale at which to retrieve the image. Default is None.
:param image_collection: String indicating the Earth Engine image collection to use
:param add_to_map: Boolean indicating whether to add the retrieved image to the map
:param band: String indicating the band to retrieve from the image collection
:param additional_filter: Boolean indicating whether to apply an additional filter to the image collection
:param filter_argument: Argument to be used for additional filtering. Default is None.
:param geometry: Geometry object representing the region of interest
:param statistics_only: Boolean indicating whether to only return additional statistics about the image
:return: Tuple containing the retrieved image, the boundary of the region, and the nominal scale of the image
"""
if multi_date:
self.__class__.validate_aggregation_function(aggregation_method)
img_function = EarthEngineManager.aggregation_functions.get(aggregation_method)
if img_function is None:
raise ValueError(f"Invalid aggregation function: {aggregation_method}")
boundary = geometry
img_collection = ee.ImageCollection(image_collection).filter(
ee.Filter.date(start_date, end_date)).select(band).filter(ee.Filter.bounds(geometry))
ee_img_scale = img_collection.first().projection().nominalScale().getInfo()
mask = ee.Image.constant(1).clip(geometry)
img = self.__class__.aggregation_functions[aggregation_method](img_collection).clip(boundary)
# Step 1: Mask where value is not -9999
valid_pixels = img.neq(-9999)
# Step 2: Combine valid_pixels mask with nigeria_mask
combined_mask = valid_pixels.And(mask)
# Step 3: Use the combined mask to mask the aggregated image
img_masked = img.updateMask(combined_mask)
nodata_value = 0
img = img_masked.unmask(nodata_value)
return img, boundary, ee_img_scale
else:
if isinstance(geometry, ee.Feature):
geometry = geometry.geometry()
elif isinstance(geometry, ee.Geometry):
geometry = geometry
else:
raise ValueError("The region must be a Feature or Geometry.")
img_collection = ee.ImageCollection(image_collection).filterDate(ee.Date(date)).select(band).filter(ee.Filter.bounds(geometry))
ee_img_scale = img_collection.first().projection().nominalScale().getInfo()
mask = ee.Image.constant(1).clip(geometry)
img = img_collection.first().clip(geometry)
# Step 1: Mask where value is not -9999
valid_pixels = img.neq(-9999)
# Step 2: Combine valid_pixels mask with nigeria_mask
combined_mask = valid_pixels.And(mask)
# Step 3: Use the combined mask to mask the aggregated image
img_masked = img.updateMask(combined_mask)
nodata_value = 0
img = img_masked.unmask(nodata_value)
return img, geometry, ee_img_scale
[docs]
def split_and_sort_geometry(self, geometry, num_sections):
"""
:param geometry: The geometry to be split and sorted.
:param num_sections: The number of sections to split the geometry into.
:return: A list of sorted grid cells obtained by splitting the geometry.
This method takes a geometry and splits it into a grid of specified number of sections. The grid cells are then sorted based on their area in descending order.
Example usage:
```
geometry = ee.Geometry.Polygon(<coordinates>)
num_sections = 4
sorted_grid = split_and_sort_geometry(geometry, num_sections)
```
"""
bounds = geometry.bounds()
coords = bounds.getInfo()['coordinates'][0]
min_x, max_x = min(coords, key=lambda x: x[0])[0], max(coords, key=lambda x: x[0])[0]
min_y, max_y = min(coords, key=lambda x: x[1])[1], max(coords, key=lambda x: x[1])[1]
# Calculate dimensions for the grid
num_rows = num_cols = int(num_sections ** 0.5)
x_step = (max_x - min_x) / num_cols
y_step = (max_y - min_y) / num_rows
grid = []
for i in range(num_cols):
for j in range(num_rows):
x_start = min_x + i * x_step
y_start = min_y + j * y_step
cell = ee.Geometry.Rectangle([x_start, y_start, x_start + x_step, y_start + y_step])
intersected_cell = cell.intersection(geometry, ee.ErrorMargin(x_step))
grid.append(intersected_cell)
# Sort the grid cells by area, largest first
sorted_grid = sorted(grid, key=lambda g: g.area().getInfo(), reverse=True)
return sorted_grid
[docs]
def get_image_download_url(self, region, scale, img, format='GEO_TIFF', crs='EPSG:4326'):
"""
:param region: The region of interest for the image download. It can be either an ee.Feature or ee.Geometry object.
:param scale: The scale of the image. It can be either a numerical value or 'default' to use the default scale of the image.
:param img: The image to download.
:param format: The format of the downloaded image. Default is 'GEO_TIFF'.
:param crs: The coordinate reference system of the downloaded image. Default is 'EPSG:4326'.
:return: The download URL for the image.
This method takes in a region of interest, scale, image, format, and coordinate reference system (CRS) as parameters and returns the URL to download the image. The region can be specified
* as an ee.Feature or ee.Geometry object. The scale can be provided as a numerical value or 'default' to use the default scale of the image. The format parameter determines the format
* in which the image will be downloaded, with the default being 'GEO_TIFF'. The crs parameter specifies the CRS of the downloaded image, with the default being 'EPSG:4326'.
"""
if isinstance(region, ee.Feature):
geometry = region.geometry()
elif isinstance(region, ee.Geometry):
geometry = region
else:
raise ValueError("The region must be a Feature or Geometry.")
if scale == 'default':
scale = img.projection().nominalScale().getInfo()
else:
pass
return img.getDownloadURL({
'region': geometry,
'scale': scale,
'crs': crs,
'format': format
})
[docs]
@staticmethod
def download_file_from_url(url, destination_path):
"""Download a file from the given URL.
:param url: The URL of the file to be downloaded.
:param destination_path: The path where the downloaded file will be saved.
:return: None
"""
response = requests.get(url, stream=True)
response.raise_for_status()
with open(destination_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
[docs]
def img_min_max(self, img, scale, min_threshold=None, boundary=None, band=None):
"""
:param img: The input image
:param scale: The scale at which to compute the min and max values
:param min_threshold: An optional threshold to exclude values below
:param boundary: An optional geometry to limit the computation to a specific area
:param band: The band to compute the min and max values for
:return: The minimum and maximum values of the specified band as a tuple (min_value, max_value)
"""
max_pixels = 1e12
# If a threshold is provided, mask the image to exclude values below the threshold
if min_threshold is not None:
img = img.updateMask(img.gte(min_threshold))
stats = img.reduceRegion(
reducer=ee.Reducer.minMax(),
bestEffort=True,
scale=scale,
geometry=boundary, # Add this line
maxPixels=max_pixels
).getInfo()
min_value = stats[f"{band}_min"]
max_value = stats[f"{band}_max"]
return min_value, max_value
[docs]
def plot_image(self, img, vis_params):
"""
Plot an image on a geemap.Map object.
:param img: The image to be plotted.
:param vis_params: The visualization parameters for the image.
:return: The geemap.Map object with the image plotted.
"""
center_lat = 2
center_lon = 32
zoomlevel = 6
map = geemap.Map(center=[center_lat, center_lon], zoom=zoomlevel)
map.addLayer(img, vis_params=vis_params)
map.addLayerControl()
return map
[docs]
def process_images(self, start_date, end_date, image_collection, band, country, aggregation_type, function):
"""
Process images within a specified date range.
:param start_date: The start date of the date range.
:param end_date: The end date of the date range.
:param image_collection: The collection of images to process.
:param band: The desired band of the images.
:param country: The country for which to process the images.
:param aggregation_type: The type of aggregation for the images (either "monthly" or "yearly").
:param function: The function to apply to the images.
:return: None
"""
# Validate aggregation type
if aggregation_type.lower() not in ("monthly", "yearly"):
raise ValueError('Aggregation type should be either "monthly" or "yearly".')
# Generate date ranges
if aggregation_type.lower() == "monthly":
date_ranges = self.generate_monthly_date_ranges(int(start_date[:4]))
else:
date_ranges = self.generate_yearly_date_ranges(int(start_date[:4]))
for start, end in date_ranges:
start = datetime.datetime.strptime(start, "%Y-%m-%d").date()
end = datetime.datetime.strptime(end, "%Y-%m-%d").date()
# If the date range is within the specified start date and end date
if start >= datetime.datetime.strptime(start_date, "%Y-%m-%d").date() and end <= datetime.datetime.strptime(end_date,
"%Y-%m-%d").date():
print(f"Processing images from {start} to {end}...")
# Proceed to fetch and process image
img, boundary = self.get_image(function, str(start), str(end), image_collection, band, country=country)
# Generate download URL
download_url = self.get_image_download_url(boundary, 1000, img)
# Prompt the download URL
print("Download URL: ", download_url)
# Download the image
destination_path = f"{band}_{start}_{end}.tif"
self.download_file_from_url(download_url, destination_path)
[docs]
def get_admin_units(self, level=0):
"""
:param level: An integer representing the administrative level. Default is 0.
:return: A list of unique administrative units at the specified level.
This method retrieves administrative units from a dataset based on the specified level. The dataset used is FAO/GAUL_SIMPLIFIED_500m/2015, which contains administrative boundary information
*. The parameter 'level' specifies the desired administrative level.
If level is 0, the method returns a sorted list of unique country names found in the dataset.
If level is 1 or 2, the method creates a dictionary where each key represents a higher-level unit and its value is a list of lower-level units. The dictionary is returned as a Python
* dictionary.
Note: This method utilizes the Earth Engine API to interact with geospatial data.
Example usage:
my_object = MyClass()
result = my_object.get_admin_units(level=1)
# Result may look like:
{
'Country A': ['Region 1', 'Region 2'],
'Country B': ['Region 3', 'Region 4'],
...
}
"""
# Load the dataset
gaul_dataset = ee.FeatureCollection(f"FAO/GAUL_SIMPLIFIED_500m/2015/level{level}")
if level == 0:
unique_countries = gaul_dataset.aggregate_array('ADM0_NAME').distinct()
return sorted(unique_countries.getInfo())
elif level == 1 or level == 2:
# Get the distinct higher-level units
unique_higher_level_units = gaul_dataset.aggregate_array(f'ADM{level - 1}_NAME').distinct()
# Create a FeatureCollection where each feature is a higher-level unit and its property is the list of lower-level units
def process_higher_level_unit(current, prev):
prev = ee.Dictionary(prev)
current = ee.String(current)
filtered_1 = gaul_dataset.filter(ee.Filter.eq(f'ADM{level - 1}_NAME', current))
lower_level_units_1 = filtered_1.aggregate_array(f'ADM{level}_NAME').distinct()
return prev.set(current, lower_level_units_1)
admin_units_dict = ee.Dictionary(
ee.List(unique_higher_level_units.iterate(process_higher_level_unit, ee.Dictionary({}))))
return admin_units_dict.getInfo()
[docs]
def get_image_sum(self, img, geometry, scale, band='population'):
"""
:param img: The input image to calculate the sum.
:param geometry: The geometry to apply the calculation to.
:param scale: The scale to use for calculation.
:param band: The band to calculate the sum for. Defaults to 'population'.
:return: The sum value calculated for the specified band.
"""
# Define the reducers for each statistic you want to calculate
reducers = ee.Reducer.sum()
# Apply the reducers to the image
stats = img.reduceRegion(reducer=reducers, geometry=geometry, scale=scale, maxPixels=1e12)
sum_value = stats.get(band).getInfo() # Make sure 'band' is the correct key
return sum_value
[docs]
def ee_ensure_geometry(self, geometry):
"""
:param geometry: The input geometry to ensure that it is a valid Earth Engine Geometry.
:return: Return the valid Earth Engine Geometry.
Ensures that the input `geometry` is a valid Earth Engine Geometry. If the `geometry` is an
instance of `ee.Feature`, it extracts the geometry from the feature and returns it. If the
`geometry` is already an instance of `ee.Geometry`, it returns it directly. If the `geometry`
is an instance of `ee.FeatureCollection`, it dissolves it into its outer boundary and returns
the dissolved geometry. If the `geometry` is a dictionary, it assumes that it is a GeoJSON
geometry and converts it to an Earth Engine Geometry before ensuring its validity. If the
`geometry` is of any other type, it raises a `ValueError` with a specific error message
indicating that the geometry type must be an Earth Engine Geometry, Feature, or FeatureCollection.
"""
if isinstance(geometry, ee.Feature):
geometry = geometry.geometry()
return geometry
elif isinstance(geometry, ee.Geometry):
return geometry
elif isinstance(geometry, ee.FeatureCollection):
# Dissolve the FeatureCollection into its outer boundary
combined_geometry = geometry.geometry()
dissolved_geometry = combined_geometry.dissolve(maxError=1)
return dissolved_geometry
elif isinstance(geometry, dict):
# Assuming geojson is meant to be geometry (typo in the original code)
return self.ee_ensure_geometry(self.convert_geojson_to_ee(geometry))
else:
raise ValueError("Invalid geometry type. Must be an Earth Engine Geometry, Feature, or FeatureCollection.")
[docs]
def convert_geojson_to_ee(self, geojson_obj):
"""
Converts a GeoJSON object to Earth Engine objects.
:param geojson_obj: The GeoJSON object to be converted.
:return: The converted Earth Engine object.
Raises:
ValueError: If the GeoJSON type is not supported.
"""
if geojson_obj['type'] == 'FeatureCollection':
return ee.FeatureCollection(geojson_obj['features'])
elif geojson_obj['type'] == 'Feature':
geometry = geojson_obj['geometry']
return ee.Feature(geometry)
elif geojson_obj['type'] in ['Polygon', 'MultiPolygon', 'Point', 'LineString', 'MultiPoint', 'MultiLineString']:
return ee.Geometry(geojson_obj)
else:
raise ValueError("Unsupported GeoJSON type")
[docs]
def process_drawn_features(self, drawn_features, layer, column):
"""
:param drawn_features: A list of drawn features, each representing a geometry or a feature in Google Earth Engine.
:param layer: An Earth Engine asset layer to filter and retrieve distinct values from.
:param column: The column or property in the asset layer for which to retrieve distinct values.
:return: A list of distinct values from the specified column"""
all_distinct_values = []
for feature in drawn_features:
if isinstance(feature, ee.Feature) or isinstance(feature, ee.Geometry):
drawn_geom = feature.geometry()
bounding = drawn_geom.bounds()
filtered_layer = layer.filterBounds(bounding)
distinct_values = filtered_layer.aggregate_array(column).distinct().getInfo()
all_distinct_values.extend(distinct_values)
return list(set(all_distinct_values))
[docs]
def process_geometry_collection(self, geometry_collection, all_geometries):
"""
Process a geometry collection and extract polygon and multipolygon geometries
:param geometry_collection: The input geometry collection to be processed
:param all_geometries: List to store the extracted geometries
:return: None
"""
geometries = geometry_collection.geometries().getInfo()
for geom in geometries:
geom_type = geom['type']
if geom_type == 'Polygon':
all_geometries.append(geom['coordinates'])
elif geom_type == 'MultiPolygon':
for poly in geom['coordinates']:
all_geometries.append(poly)
[docs]
def download_feature_geometry(self, distinct_values, feature_type_prefix=None, column=None, layer=None,
dropdown_api=None, output_folder_location=None):
"""
:param distinct_values: A list of distinct values used to filter the features.
:param feature_type_prefix: Optional prefix for the feature type.
:param column: Optional column used for filtering the features.
:param layer: A layer object containing the features.
:param dropdown_api: Optional dropdown API type.
:return: The geometry of the features or None if no valid geometries are found.
"""
if not distinct_values:
print("No distinct values provided.")
return
if feature_type_prefix not in ['watersheds', 'admin']:
print("Invalid feature type.")
return
all_geometries = []
for value in distinct_values:
feature = layer.filter(ee.Filter.eq(column, value)).first()
if not feature:
print("No feature found for value:", value)
continue
geometry = feature.geometry()
if not geometry:
print("No geometry for value:", value)
continue
geometry_type = geometry.type().getInfo()
if geometry_type == 'Polygon':
all_geometries.append(geometry.coordinates().getInfo())
elif geometry_type == 'MultiPolygon':
for poly in geometry.coordinates().getInfo():
all_geometries.append(poly)
elif geometry_type == 'GeometryCollection':
self.process_geometry_collection(geometry, all_geometries)
if all_geometries:
try:
dissolved_geometry = ee.Geometry.MultiPolygon(all_geometries).dissolve()
feature = ee.Feature(dissolved_geometry)
except ee.EEException as e:
print("Error creating dissolved geometry:", e)
else:
print("No valid geometries to dissolve.")
if feature and dropdown_api in ['glofas', 'modis_nrt']:
geometry = feature.geometry().getInfo()
geom_location = 'geometry.geojson' if output_folder_location is None else os.path.join(output_folder_location, 'geometry.geojson')
with open(geom_location, "w") as file:
json.dump(geometry, file)
return geometry
else:
geometry = feature.geometry()
return geometry
[docs]
def download_and_split(self, image, original_geometry, scale, split_count=1, params=None, band=None):
"""
:param image: The image identifier.
:param original_geometry: The original geometry of the image.
:param scale: The scale of the image.
:param split_count: The number of splits to divide the image into (default is 1).
:param params: Additional parameters (default is None).
:param band: The band of the image (default is None).
:return: A list of file names and a boolean indicating if the download was successful.
"""
file_names = []
try:
geom_list = self.split_and_sort_geometry(original_geometry, split_count)
for index, geom in enumerate(geom_list):
url = self.get_image_download_url(img=image, region=geom, scale=scale)
file_name = f"{band}_{str(params['year'])}_{split_count}_{index}.tif".replace('-', '_').replace('/',
'_').replace(
' ', '_')
file_name = os.path.join(params['folder_output'], file_name)
self.download_file_from_url(url=url, destination_path=file_name)
file_names.append(file_name)
if split_count == 1 and len(file_names) == 1:
# Download successful without splitting, no need to mosaic
return file_names, True
except Exception as e:
if "Total request size" in str(e):
print(f"Splitting geometry into {split_count * 2} parts and trying again.")
# Increase split count and try again
return self.download_and_split(image, original_geometry, scale, split_count * 2, params, band=band)
else:
print(f'Unexpected error: {e}')
return file_names, False
[docs]
def ee_geometry_to_shapely(self, geometry):
"""
Converts an Earth Engine Geometry or Feature, or a GeoJSON dictionary, to a Shapely Geometry.
:param geometry: An Earth Engine Geometry or Feature, or a GeoJSON dictionary.
:return: A Shapely Geometry object.
Examples:
# Convert an Earth Engine Geometry
ee_geometry_to_shapely(ee_object)
# Convert a GeoJSON dictionary
geo_json = {
"type": "Point",
"coordinates": [0, 0]
}
ee_geometry_to_shapely(geo_json)
"""
# Check if the geometry is an Earth Engine Geometry or Feature
if isinstance(geometry, ee.Geometry) or isinstance(geometry, ee.Feature):
# Convert Earth Engine object to GeoJSON
geo_json = geometry.getInfo()
if 'geometry' in geo_json: # If it's a Feature, extract the geometry part
geo_json = geo_json['geometry']
# Convert GeoJSON to a Shapely Geometry
return shape(geo_json)
elif isinstance(geometry, dict): # Directly convert from GeoJSON if it's a dictionary
return shape(geometry)
else:
# If it's neither, assume it's already a Shapely Geometry or compatible
return geometry
[docs]
def determine_geometries_to_process(self, override_boundary_type=None, layer=None, column=None, dropdown_api=None,
boundary_type=None, draw_features=None, userlayers=None, boundary_layer=None,
output_folder_location=None):
"""
:param override_boundary_type: (optional) Type of boundary to override the default boundary type.
:type override_boundary_type: str
:param layer: (optional) Name of the layer to use for processing features.
:type layer: str
:param column: (optional) Name of the column to use for processing features.
:type column: str
:param dropdown_api: (optional) API to retrieve dropdown values.
:type dropdown_api: str
:param boundary_type: (optional) Type of boundary to process.
:type boundary_type: str
:param draw_features: (optional) List of features drawn by the user.
:type draw_features: list
:param userlayers: (optional) Dictionary of user uploaded layers.
:type userlayers: dict
:param boundary_layer: (optional) Layer name for user defined boundary.
:type boundary_layer: str
:return: List of geometries to process.
:rtype: list
"""
geometries = []
if override_boundary_type:
boundary_type = override_boundary_type
else:
boundary_type = boundary_type
if boundary_type in ['Predefined Boundaries', 'User Defined']:
for feature in draw_features:
if boundary_type == 'Predefined Boundaries':
distinct_values = self.process_drawn_features([feature], layer=layer, column=column)
feature = self.download_feature_geometry(distinct_values, feature_type_prefix=boundary_layer.split('_')[0],
column=column, layer=layer, dropdown_api=dropdown_api,
output_folder_location=output_folder_location)
else: # User Defined
distinct_values = None
# Assuming feature is the geometry itself in this case
geometries.append((feature, distinct_values))
elif boundary_type == 'User Uploaded Data' and 'User Uploaded Data' in userlayers:
feature = userlayers['User Uploaded Data'].data
geometries.append((feature, None))
return geometries
[docs]
class EarthEngineNotebookInterface(BaseModel):
[docs]
class Config:
"""
This class represents a configuration object.
Attributes:
extra (Extra): Specifies whether to allow extra fields in the configuration.
Enum:
Extra:
- allow: Allows extra fields in the configuration.
- disallow: Disallows extra fields in the configuration.
"""
extra = Extra.allow # Allow extra fields
def __init__(self, **data):
"""Initialize the object with the given parameters.
:param data: The data used to initialize the object.
"""
super().__init__(**data)
self.gee_layer_search_widget = None
self.create_widgets_gee()
[docs]
def on_gee_layer_selected(self, b):
"""
Method to handle the selection of a Google Earth Engine layer.
:param b: The event object triggered by the selection.
:type b: object
:return: None
:rtype: None
"""
selected_layer = self.gee_layer_search_results_dropdown.value
self.ee_dates_min_max = self.get_image_collection_dates(selected_layer, min_max_only=True)
self.gee_bands_search_results.options = ee.ImageCollection(selected_layer).first().bandNames().getInfo()
[docs]
def on_single_or_range_dates_change(self, change):
"""
Method to handle changes in the selection of single or range dates.
:param change: The change event triggered by the selection.
:return: None
"""
if self.single_or_range_dates.value == 'Single Date':
self.gee_single_date_selector = widgets.Dropdown(
options=[],
value=None,
description='Results:',
disabled=False,
layout=Layout(width='auto')
)
self.gee_single_date_selector.options = self.get_image_collection_dates(
self.gee_layer_search_results_dropdown.value, min_max_only=False)
self.gee_date_selection.children = [self.gee_single_date_selector]
elif self.single_or_range_dates.value == 'Date Range':
start_date = datetime.datetime.strptime(self.ee_dates_min_max[0], '%Y-%m-%d').date()
end_date = datetime.datetime.strptime(self.ee_dates_min_max[1], '%Y-%m-%d').date()
self.gee_date_picker_start = widgets.DatePicker(
description='Select Start Date:',
disabled=False,
min=start_date,
max=end_date,
value=start_date
)
self.gee_date_picker_end = widgets.DatePicker(
description='Select End Date:',
disabled=False,
min=start_date,
max=end_date,
value=end_date
)
self.gee_multi_date_aggregation_periods = widgets.ToggleButtons(
options=['Monthly', 'Yearly', 'All Images', 'One Aggregation'],
disabled=False,
value='Monthly',
tooltips=['Monthly', 'Yearly', 'All Images', 'One Aggregation'],
)
aggregation_values = {
'mode': lambda ic: ic.mode(),
'median': lambda ic: ic.median(),
'mean': lambda ic: ic.mean(),
'max': lambda ic: ic.max(),
'min': lambda ic: ic.min(),
'sum': lambda ic: ic.reduce(ee.Reducer.sum()),
'first': lambda ic: ic.sort('system:time_start', False).first(),
'last': lambda ic: ic.sort('system:time_start', False).last(),
# 'none': lambda ic: ic
}
self.gee_multi_date_aggregation_method = widgets.Dropdown(
options={x.title(): x for x in aggregation_values.keys()},
value='mean',
description='Aggregation Method:',
disabled=False,
)
self.gee_date_selection.children = [widgets.HBox([self.gee_date_picker_start, self.gee_date_picker_end]),
self.gee_multi_date_aggregation_periods,
self.gee_multi_date_aggregation_method]
[docs]
def process_api(self, geometry, distinct_values, index):
"""
:param geometry: The geometry for which to retrieve the image data.
:param distinct_values: Whether to retrieve distinct values or not.
:param index: The index of the distinct value to retrieve.
:return: None
"""
with self.out:
gee_params = self.gather_gee_parameters()
with self.out:
self.out.clear_output()
print(gee_params)
geometry = self.ee_ensure_geometry(geometry)
if gee_params['multi_date'] == False:
img, region, gee_params['scale'] = self.get_image(**gee_params, geometry=geometry)
url = self.get_image_download_url(img=img, region=region, scale=gee_params['scale'])
file_name = 'gee_image.tif'
self.download_file_from_url(url=url, destination_path=file_name)
min_val, max_val, no_data_val = self.get_raster_min_max(file_name)
if self.gee_bands_search_results.value.lower() in ['ndvi', 'evi']:
palette = ['FFFFFF', 'CE7E45', 'DF923D', 'F1B555', 'FCD163', '99B718',
'74A901', '66A000', '529400', '3E8601', '207401', '056201',
'004C00', '023B01', '012E01', '011D01', '011301']
vis_params = {
'min': 0,
'max': 10000,
'palette': palette,
'nodata': no_data_val
}
else:
vis_params = {
'min': min_val,
'max': max_val,
'palette': 'viridis',
'nodata': no_data_val
}
self.addLayer(img, vis_params)
else:
if gee_params['aggregation_period'] == 'Monthly':
monthly_date_ranges = self.generate_monthly_date_ranges(gee_params['start_date'],
gee_params['end_date'])
if gee_params['statistics_only']:
all_stats = ee.Dictionary()
for dates in monthly_date_ranges:
if gee_params['statistics_only']:
image, geometry = self.get_image(multi_date=True,
aggregation_method=gee_params[
'aggregation_method'],
geometry=geometry, start_date=dates[0],
end_date=dates[1],
band=gee_params['band'],
image_collection=gee_params[
'image_collection'])
stats = self.calculate_statistics(image, geometry, gee_params[
'band']) # This should be a server-side object
all_stats = all_stats.set(dates[0], stats)
else:
img, boundary = self.get_image(multi_date=True,
aggregation_method=gee_params[
'aggregation_method'],
geometry=geometry, start_date=dates[0],
end_date=dates[1],
band=gee_params['band'],
image_collection=gee_params[
'image_collection'])
url = self.get_image_download_url(img=img, region=boundary,
scale=gee_params['scale'])
file_name = f"{gee_params['image_collection']}_{dates[0]}_{dates[1]}_{gee_params['aggregation_method']}.tif".replace(
'-', '_').replace('/', '_').replace(' ', '_')
self.download_file_from_url(url=url, destination_path=file_name)
print(f"Downloaded {file_name}")
if gee_params['statistics_only']:
all_stats_info = all_stats.getInfo()
with self.out:
self.out.clear_output()
print(all_stats_info)
elif gee_params['aggregation_period'] == 'Yearly':
yearly_date_ranges = self.generate_yearly_date_ranges(
gee_params['start_date'], gee_params['end_date'])
if gee_params['statistics_only']:
all_stats = ee.Dictionary()
for dates in yearly_date_ranges:
if gee_params['statistics_only']:
image, geometry = self.get_image(multi_date=True,
aggregation_method=gee_params[
'aggregation_method'],
geometry=geometry, start_date=dates[0],
end_date=dates[1],
band=gee_params['band'],
image_collection=gee_params[
'image_collection'])
stats = self.calculate_statistics(image, geometry, gee_params[
'band']) # This should be a server-side object
all_stats = all_stats.set(dates[0], stats)
else:
img, boundary = self.get_image(multi_date=True,
aggregation_method=gee_params[
'aggregation_method'],
geometry=geometry, start_date=dates[0],
end_date=dates[1],
band=gee_params['band'],
image_collection=gee_params[
'image_collection'])
url = self.get_image_download_url(img=img, region=boundary,
scale=gee_params['scale'])
file_name = f"{gee_params['image_collection']}_{dates[0]}_{dates[1]}_{gee_params['aggregation_method']}.tif".replace(
'-', '_').replace('/', '_').replace(' ', '_')
self.download_file_from_url(url=url, destination_path=file_name)
print(f"Downloaded {file_name}")
if gee_params['statistics_only']:
all_stats_info = all_stats.getInfo()
with self.out:
print(all_stats_info)
elif gee_params['aggregation_period'] == 'One Aggregation':
img, boundary = self.get_image(multi_date=True,
aggregation_method=gee_params[
'aggregation_method'],
geometry=geometry,
start_date=str(gee_params['start_date']),
end_date=str(gee_params['end_date']),
band=gee_params['band'],
image_collection=gee_params[
'image_collection'])
url = self.get_image_download_url(img=img, region=boundary,
scale=gee_params['scale'])
file_name = f"{gee_params['image_collection']}_{str(gee_params['start_date'])}_{str(gee_params['end_date'])}_{gee_params['aggregation_method']}.tif".replace(
'-', '_').replace('/', '_').replace(' ', '_')
self.download_file_from_url(url=url, destination_path=file_name)
min_val, max_val, no_data_val = self.get_raster_min_max(file_name)
if self.gee_bands_search_results.value.lower() in ['ndvi', 'evi']:
palette = ['FFFFFF', 'CE7E45', 'DF923D', 'F1B555', 'FCD163', '99B718',
'74A901', '66A000', '529400', '3E8601', '207401', '056201',
'004C00', '023B01', '012E01', '011D01', '011301']
vis_params = {
'min': 0,
'max': 10000,
'palette': palette,
'nodata': no_data_val
}
else:
vis_params = {
'min': min_val,
'max': max_val,
'palette': 'viridis',
'nodata': no_data_val
}
self.addLayer(img, vis_params)
with self.out:
print(f"Downloaded {file_name}")
[docs]
def gather_parameters(self):
"""
Gathers the parameters required for processing.
:return: A dictionary containing the gathered parameters.
"""
image_collection = self.gee_layer_search_results_dropdown.value
date_type = self.single_or_range_dates.value
band = self.gee_bands_search_results.value
statistics_only = self.statistics_only_check.value
if self.scale_input.value == 'default':
scale = 'default'
else:
scale = int(self.scale_input.value)
add_image_to_map = self.add_to_map_check.value
create_sub_folder = self.create_sub_folder.value
if date_type == 'Single Date':
date = self.gee_single_date_selector.value
self.add_to_map_check.value = True
self.add_to_map_check.disabled = False
return {
'statistics_only': statistics_only,
'image_collection': image_collection,
'multi_date': False,
'band': band,
'date': date,
'scale': scale,
'create_sub_folder': create_sub_folder,
'add_to_map': add_image_to_map,
}
elif date_type == 'Date Range':
aggregation_period = self.gee_multi_date_aggregation_periods.value
aggregation_method = self.gee_multi_date_aggregation_method.value
start_date = self.gee_date_picker_start.value
end_date = self.gee_date_picker_end.value
band = self.gee_bands_search_results.value
self.add_to_map_check.value = False
self.add_to_map_check.disabled = True
return {
'statistics_only': statistics_only,
'image_collection': image_collection,
'multi_date': True,
'aggregation_period': aggregation_period,
'aggregation_method': aggregation_method,
'start_date': start_date,
'band': band,
'end_date': end_date,
'scale': scale,
'create_sub_folder': create_sub_folder,
'add_to_map': add_image_to_map,
}
else:
pass