Source code for farmingpy.soil.rosetta

#%%
from rosetta import rosetta, SoilData
import pandas as pd
import geopandas as gpd
import numpy as np
from functools import singledispatch
import os
try:
    import matplotlib.pyplot as plt
except:
    pass


# %%

[docs] class Rosetta(object): """ Interface to USDA Rosetta pedotransfer function model using https://pypi.org/project/rosetta-soil/. Used to estimate soil hydraulic properties. """ rosetta_avg = None
[docs] def __init__(self, soildata): """ soildata: DataFrame with columns "sand", "silt", "clay" """ if type(soildata) in [dict, pd.core.series.Series]: soildata = pd.DataFrame([soildata]) elif type(soildata) is str: soildata = [soildata] elif type(soildata) is list and type(soildata[0]) is str: rparams = [self.find_params(soil) for soil in soildata] self.rosettaparams = pd.concat(rparams).reset_index(drop=True) self.soildata = pd.DataFrame(dict(name = soildata)) return None self.soildata = soildata self.rosettaparams = self.vg_params()
@classmethod def average_soils(cls): """Return average soil data for USDA Rosetta model. Source: https://www.ars.usda.gov/pacific-west-area/riverside-ca/agricultural-water-efficiency-and-salinity-research-unit/docs/model/rosetta-class-average-hydraulic-parameters/ """ if cls.rosetta_avg is None: cls.rosetta_avg = pd.read_csv(os.path.dirname(__file__) + "/../data/rosetta_average_parameters.csv") return cls.rosetta_avg @classmethod def find_params(cls, soilname): cls.average_soils() rparams = cls.rosetta_avg[cls.rosetta_avg.texture_name.str.replace(" ", "") == soilname.lower().replace(" ", "")] return rparams.reset_index(drop=True) def vg_params(self): """ Get soil hydrological parameters using USDA Rosetta model. Uses: https://github.com/usda-ars-ussl/rosetta-soil """ sdata = self.soildata.rename(str.lower, axis=1) sarray = sdata[["sand", "silt", "clay"]].to_numpy() if sarray.max() <= 1.0: sarray *= 100.0 r_soildata = SoilData.from_array(sarray) mean, stdev, codes = rosetta(3, r_soildata) p_names = ["theta_r", "theta_s", "log10_alpha", "log10_n", "log10_ksat" ] mu_params = pd.DataFrame(mean, columns=p_names) sd_params = pd.DataFrame(stdev, columns= [f"{p}_std" for p in p_names]) rcode = pd.DataFrame(codes, columns=["rosetta_model_code"]) return pd.concat([mu_params, sd_params, rcode], axis=1) def water_retention(self, phi, rosettaparams = None): """ Function that computes volumetric water content from soil matric potential using the van Genuchten (1980) model. Used with Rosetta model predictions. phi: Matric potential kPa """ if rosettaparams is None: rosettaparams = self.rosettaparams # Rosetta units for alpha and npar are 1/cm and [-] # convert kPa to cm of H20 phi = np.array(phi) * 10.19716 alpha = 10**rosettaparams["log10_alpha"] n = 10**rosettaparams["log10_n"] theta_r = rosettaparams["theta_r"] theta_s = rosettaparams["theta_s"] theta = theta_r + (theta_s-theta_r)*(1+(alpha*phi)**n)**-(1-1/n) return theta def water_capacity(self): """ Get soil saturation capacity, field capacity at 10 and 33 kPA, wilting point (1500kPA) and available water capacity. """ # rparams = self.find_params(soil) # soildata = pd.DataFrame(dict(texture = [soil])) #else: # soildata = soil # rparams = self.rosettaparams phi_vec = [("sat", 0.1), ("fc_10", 10), ("fc_33", 33), ("wp", 1500)] data = pd.DataFrame() for pname, phi in phi_vec: data[pname] = self.water_retention(phi) data["awc_10"] = data["fc_10"] - data["wp"] data["awc_33"] = data["fc_33"] - data["wp"] data["ksat"] = 10**self.rosettaparams["log10_ksat"] #return pd.concat([soildata, data], axis=1) return data def plot_water_retention(self, fc = 10, wp=1500, sat=0.1, legend=True): """Plot water retention curve""" wpts = np.array([sat, fc, wp]) # SAT, FC, wp in kPAs phis = np.logspace(-3.5,6,2000) curves = [self.water_retention(phis, p) for i,p in self.rosettaparams.iterrows()] pts = [self.water_retention(wpts, p) for i,p in self.rosettaparams.iterrows()] N = len(curves) if "name" in self.soildata.columns: labels = self.soildata["name"].to_list() else: labels = [f"{i}" for i in range(N)] for i in range(N): _ = plt.semilogy(curves[i].T, phis, label = labels[i]) _ = plt.plot(pts[i].T, wpts, linestyle="None", marker="o") plt.xlabel("Volumetric Water Content") plt.ylabel("Suction (kPA)") if legend: plt.legend(loc=1)
# %%