Source code for farmingpy.eo.s2tbx_biophys

import xarray as xr
import datetime
import numpy as np
import math
import os

[docs] class BioPhysS2tbx(object): """ Implementation of Sentinel 2 Toolbox Neural Network for predicting biophysical parameters from Sentinel 2 data. Uses weights from s2tbx-biophysical: https://github.com/senbox-org/s2tbx/tree/master/s2tbx-biophysical The model does input and output range validation, but omits convexity checks. Based on: Weiss, M., Baret, F., Jay, S., 2020. S2ToolBox level 2 products LAI, FAPAR, FCOVER. EMMAH-CAPTE, INRAe Avignon. http://step.esa.int/docs/extra/ATBD_S2ToolBox_V2.0.pdf """
[docs] def __init__(self, product="LAI", resolution=20, stbx_version="3.0"): """ Args: product (str, optional): Output product, "LAI", "FAPAR", "FCOVER", "LAI_Cab" or "LAI_cw". Defaults to "LAI". resolution (int, optional): Model resolution to use 10 or 20. Defaults to 20. stbx_version (str, optional): Toolbox version for weights 2.1 or 3.0. Defaults to "3.0". """ # Load weights and parameters from SNAP files if stbx_version == "3.0": if resolution == 10: snap_path = os.path.dirname(__file__) + "/../data/s2tbx_biophysical_auxdata/3_0/S2A_10m" else: snap_path = os.path.dirname(__file__) + "/../data/s2tbx_biophysical_auxdata/3_0/S2A" elif stbx_version == "2.1": snap_path = os.path.dirname(__file__) + "/../data/s2tbx_biophysical_auxdata/2_1/" self.stbx_version = stbx_version self.resolution = resolution self.product = product self.extreme_cases = np.loadtxt(f"{snap_path}/{product}/{product}_ExtremeCases", delimiter=",") # 2.1 has negative tolerance, 3.0 positive if stbx_version == "2.1": self.extreme_cases[0] = -self.extreme_cases[0] self.normalize_minmax = np.loadtxt(f"{snap_path}/{product}/{product}_Normalisation", delimiter=",") self.denormalize_minmax = np.loadtxt(f"{snap_path}/{product}/{product}_Denormalisation", delimiter=",") self.minmax_domain = np.loadtxt(f"{snap_path}/{product}/{product}_DefinitionDomain_MinMax", delimiter=",") b1 = np.loadtxt(f"{snap_path}/{product}/{product}_Weights_Layer1_Bias", delimiter=",").T w1 = np.loadtxt(f"{snap_path}/{product}/{product}_Weights_Layer1_Neurons", delimiter=",").T self.wts = np.vstack([b1, w1]) self.b2 = np.loadtxt(f"{snap_path}/{product}/{product}_Weights_Layer2_Bias", delimiter=",") self.wts2 = np.loadtxt(f"{snap_path}/{product}/{product}_Weights_Layer2_Neurons", delimiter=",")
[docs] def __call__(self, ds, *args, **kwargs): """Run the model on dataset. Args: ds xarray.DataArray: Sentinel 2 data. The format needs to match the data retrieved using `twinyields.eo.S2SentinelHub` class. Returns: xarray.DataArray: Vegetation index """ ds = ds.transpose("y", "x", "band") ds = self.clean_input(ds) nm = self.normalize_minmax degToRad = math.pi/ 180 intercept = ds.isel(band=0) intercept.coords["band"] = "I" intercept = intercept.where(np.isnan, 1) # With 10m resolution fewer bands are used # http://step.esa.int/docs/extra/ATBD_S2ToolBox_V2.0.pdf if self.resolution == 10: bands = ds.sel(band=["B03", "B04", "B08"]) for i in range(3): bands[:,:,i] = self.normalize(bands[:,:,i], *nm[i,:]) viewZen_norm = self.normalize(np.cos(ds.sel(band="viewZenithMean") * degToRad), *nm[3,:]) sunZen_norm = self.normalize(np.cos(ds.sel(band="sunZenithAngles") * degToRad), *nm[4,:]) relAzim_norm = self.normalize(np.cos((ds.sel(band="sunAzimuthAngles") - ds.sel(band="viewAzimuthMean")) * degToRad), *nm[5,:]) relAzim_norm.coords["band"] = "relAzim_norm" X = xr.concat([intercept, bands, viewZen_norm,sunZen_norm,relAzim_norm], dim="band") else: b03_norm = self.normalize(ds.sel(band="B03"), *nm[0,:]) b04_norm = self.normalize(ds.sel(band="B04"), *nm[1,:]) b05_norm = self.normalize(ds.sel(band="B05"), *nm[2,:]) b06_norm = self.normalize(ds.sel(band="B06"), *nm[3,:]) b07_norm = self.normalize(ds.sel(band="B07"), *nm[4,:]) b8a_norm = self.normalize(ds.sel(band="B8A"), *nm[5,:]) b11_norm = self.normalize(ds.sel(band="B11"), *nm[6,:]) b12_norm = self.normalize(ds.sel(band="B12"), *nm[7,:]) viewZen_norm = self.normalize(np.cos(ds.sel(band="viewZenithMean") * degToRad), *nm[8,:]) sunZen_norm = self.normalize(np.cos(ds.sel(band="sunZenithAngles") * degToRad), *nm[9,:]) relAzim_norm = self.normalize(np.cos((ds.sel(band="sunAzimuthAngles") - ds.sel(band="viewAzimuthMean")) * degToRad), *nm[10,:]) relAzim_norm.coords["band"] = "relAzim_norm" X = xr.concat([intercept, b03_norm, b04_norm, b05_norm, b06_norm, b07_norm, b8a_norm, b11_norm, b12_norm, viewZen_norm,sunZen_norm,relAzim_norm], dim="band") X = X.transpose("y", "x", "band").to_numpy() l1 = np.tanh(np.dot(X, self.wts)) l2 = np.dot(l1, self.wts2) + self.b2 index = self.denormalize(l2, *self.denormalize_minmax) lds = ds.isel(band=0) lds.values = index lds.coords["band"] = self.product # Remove extreme values from output tolerance = self.extreme_cases[0] index_min = self.extreme_cases[1] index_max = self.extreme_cases[2] l_copy = lds.copy() lds = lds.where(l_copy >= -tolerance, np.nan) #Everything less than tolerance is nan lds = lds.where(l_copy >= index_min, index_min) #Everything below index_min = index_min lds = lds.where(l_copy <= index_max, index_max) #Everything above index_max to index_max lds = lds.where(l_copy <= (index_max + tolerance), np.nan) #Everything above index_max - tolerance to NaN return lds
def normalize(self, unnormalized, min, max): return 2 * (unnormalized - min) / (max - min) - 1 def denormalize(self, normalized, min, max): return 0.5 * (normalized + 1) * (max - min) + min # Remove values outside snap accepted range def clean_input(self, ds): if self.resolution == 10: s2_bands = ["B03", "B04", "B08"] else: s2_bands = ["B03", "B04", "B05", "B06", "B07", "B8A", "B11", "B12"] index_min = self.minmax_domain[0,:] index_max = self.minmax_domain[1,:] X = ds.sel(band=s2_bands) X = X.where(X > index_min, np.nan).where(X < index_max, np.nan) ds.loc[:,:,s2_bands] = X.loc[:,:,s2_bands] return ds