Source code for farmingpy.planned_isoxml
import xml.etree.ElementTree as ET
import os
import numpy as np
import zipfile
import xarray as xr
import rioxarray
import geopandas as gpd
from shapely.geometry import Polygon
[docs]
class TaskReader(object):
"""
Class to read planned ISOBUS task files in grid format.
"""
[docs]
def __init__(self, taskfile):
"""
Parameters
----------
taskfile
Path to TASKDATA.XML or zipped task file.
Based on: ISO 11783-10:2015.
"""
self.grid = None #: Planned task grid as xarray.DataArray
if os.path.isdir(taskfile):
taskfile = os.path.join(taskfile, "TASKDATA.XML")
ext = os.path.splitext(taskfile)[1].lower()
if ext == ".xml":
self.task = ET.parse(taskfile)
self.path = os.path.split(taskfile)[0]
self.root = self.task.getroot()
elif ext == ".zip":
self.zf = zipfile.ZipFile(taskfile)
tf = [f for f in self.zf.namelist() if "TASKDATA.XML" in f][0]
self.root = ET.fromstring(self.zf.read(tf))
self.ext = ext
self.field = Polygon([(float(pnt.attrib["D"]), float(pnt.attrib["C"])) for pnt in self.root.findall("PFD/PLN/LSG/PNT")])
self.read_grid()
def read_grid(self, grid_idx = 0):
grids = self.root.findall('./TSK/GRD')
g = grids[grid_idx]
grid_info = g.attrib
self.grid_info = grid_info
grd_file = grid_info["G"] + ".bin"
grid_type = int(self.grid_info["I"])
assert grid_type == 2, "Unsupported task file, grid type 2 is required (GRD tag, I attribute)"
pdv = self.root.find("./TSK/TZN").findall("PDV")
pdt = self.root.findall("./PDT")
vpns = [p.attrib["E"] for p in pdv]
self.products = [p.attrib["B"] for p in pdt]
self.units = [self.root.find(f"./VPN[@A='{v}']").attrib["E"] for v in vpns]
self.scales = [self.root.find(f"./VPN[@A='{v}']").attrib["C"] for v in vpns]
if self.ext == ".xml":
grid = np.fromfile(os.path.join(self.path, grd_file), dtype=np.int32)
elif self.ext == ".zip":
gf = [f for f in self.zf.namelist() if grd_file in f][0]
grid = np.frombuffer(self.zf.read(gf), dtype=np.int32)
# Grid Minimum East and North positions
y0, x0 = np.asarray([grid_info["A"], grid_info["B"]]).astype("float")
# Grid cell sizes
sy, sx = np.asarray([grid_info["C"], grid_info["D"]]).astype("float")
nr = int(grid_info["F"])
nc = int(grid_info["E"])
grid = grid.reshape((nr, nc, -1))
x = np.array([x0+c*sx for c in range(nc)])
y = np.array([y0+r*sy for r in range(nr)])
gx = xr.DataArray(np.float64(grid), dims=["y", "x", "product"],
coords={"x" : x, "y" : y})
gx.attrs["products"] = self.products
gx.attrs["units"] = self.units
# Scale data according to VPN attrib C
for c in range(len(self.scales)):
gx[:,:,c] *= float(self.scales[c])
self.grid = gx.rio.write_crs("epsg:4326")
self.grid = self.grid.rio.write_nodata(np.nan)
# Try to clip based on field polygon
# sometimes the field polygon doesn't match the grid and this fails
# -> just returns the grid
try:
self.grid = self.grid.rio.clip([self.field])
except:
pass
@property
def points(self):
"""
Planned task data as geopandas.GeoDataFrame.
"""
df = self.grid.to_dataframe("rate").reset_index()
gdf = gpd.GeoDataFrame(df[["product", "rate"]],
geometry=gpd.points_from_xy(df["x"], df["y"]), crs="epsg:4326" )
return gdf.dropna()