Source code for ctdproc.proc

#!/usr/bin/env python
# coding: utf-8
import gsw
import matplotlib as mpl
import numpy as np
import xarray as xr
from scipy import fft, optimize, signal, stats

from . import calcs, helpers

__all__ = [
    "run_all",
    "cleanup",
    "cleanup_ud",
    "split_updn",
    "phase_correct",
    "rmloops",
    "bincast",
    "wsink",
    "despike",
    "remove_out_of_bounds",
    "preen_ctd",
    "add_tcfit_default",
]

# We are running into trouble if the default backend is not installed
# in the current python environment. Try to import pyplot the default
# way first. If this fails, set backend to agg which should always work.
try:
    import matplotlib.pyplot as plt
except ImportError:
    mpl.use("agg")
    import matplotlib.pyplot as plt


[docs] def run_all(ds): """ Run all standard processing steps on raw CTD time series. Applies, in order: :func:`cleanup`, :func:`split_updn`, :func:`add_tcfit_default`, :func:`phase_correct`, :func:`ctdproc.calcs.swcalcs`, :func:`rmloops`, and :func:`cleanup_ud` to each of the down- and up-cast. Parameters ---------- ds : xarray.Dataset Raw CTD time series with default processing parameters in ``ds.attrs`` (see :func:`ctdproc.io.add_default_proc_params`). Returns ------- dict of xarray.Dataset Mapping with keys ``"down"`` and ``"up"`` containing the processed downcast and upcast time series. See Also -------- bincast : depth-bin the result of this function. """ ds = cleanup(ds) ds_updown = split_updn(ds) for v, d in ds_updown.items(): add_tcfit_default(d) d = d.dropna("time", how="any") d = phase_correct(d) d = calcs.swcalcs(d) d = rmloops(d) d = cleanup_ud(d) ds_updown[v] = d return ds_updown
[docs] def add_tcfit_default(ds): """ Set the default pressure range used for the t/c phase fit. The range is chosen by maximum cast pressure: 500 dbar to max for casts deeper than 1000 dbar, 200 dbar to max for casts deeper than 300 dbar, 50 dbar to max otherwise. The result is stored in ``ds.attrs["tcfit"]``; nothing is returned. Parameters ---------- ds : xarray.Dataset CTD time series. Must contain pressure variable ``p``. Modified in place. """ if ds.p.max() > 1000: tcfit = [500, ds.p.max().data] elif ds.p.max() > 300: tcfit = [200, ds.p.max().data] else: tcfit = [50, ds.p.max().data] ds.attrs["tcfit"] = tcfit
[docs] def cleanup(ds): """ Clean up CTD raw time series. Despikes pressure, trims near-surface data, removes spikes in temperature exceeding ``spike_thresh_t``, removes out-of-range values (see :func:`preen_ctd`), and drops a leading run of NaN conductivity. Parameters ---------- ds : xarray.Dataset Raw CTD time series. Required attrs: ``diff_p``, ``prod_p``, ``spike_thresh_t``, plus the per-variable bounds used by :func:`preen_ctd`. Returns ------- ds : xarray.Dataset Cleaned time series. """ # despike pressure ds["p"].data = helpers.glitchcorrect( ds.p.data, ds.attrs["diff_p"], ds.attrs["prod_p"] ) ipmax = np.argmax(ds.p.data) # eliminate near-surface data ptop = 1 fdeep = np.squeeze(np.where(ds.p.data > ptop)) ideepstart, ideepstop, ideeplen = helpers.findsegments(fdeep) ii = np.max(ideepstart[ideepstart < ipmax]) jj = np.min(ideepstop[ideepstop >= ipmax]) ds = ds.isel(time=range(ii, jj + 1)) # remove spikes in temperature ib = np.squeeze( np.where(np.absolute(np.diff(ds.t1.data)) > ds.attrs["spike_thresh_t"]) ) ds.t1[ib] = np.nan ib = np.squeeze( np.where(np.absolute(np.diff(ds.t2.data)) > ds.attrs["spike_thresh_t"]) ) ds.t2[ib] = np.nan # remove out of range values ds = preen_ctd(ds) # no trans, fl *** # remove nans at start fnan1 = np.squeeze(np.where(np.isnan(ds.c1.data))) fnan2 = np.squeeze(np.where(np.isnan(ds.c2.data))) if np.array_equiv(fnan1, fnan2) is False: print("warning: NaNs index different in data.c1 and data.c2") if fnan1.size > 0: istart, istop, ilen = helpers.findsegments(fnan1) if istart[0] != 0 | istart.size != 1: print("warning: more NaNs") ds = ds.isel(time=range(istop[0] + 1, ds.time.size)) return ds
[docs] def cleanup_ud(ds): """ Clean a single down- or up-cast and calculate derived variables. Despikes ``t1``/``t2``, glitch-corrects ``c1``/``c2``/``t1``/``t2``, computes salinity (practical and absolute) via :func:`ctdproc.calcs.calc_sal`, despikes/glitch-corrects the salinity fields, and finally calls :func:`ctdproc.calcs.calc_temp` and :func:`ctdproc.calcs.calc_sigma` to add conservative/potential temperature and potential density anomaly. Parameters ---------- ds : xarray.Dataset CTD time series for a single cast (down or up). Required attrs: ``spike_thresh_t``, ``spike_thresh_s``, ``diff_t``/``prod_t``, ``diff_c``/``prod_c``, ``diff_s``/``prod_s``, ``bounds_s``. Returns ------- ds : xarray.Dataset Cleaned cast with derived salinity, temperature and density fields. """ # remove spikes in temperature for v in ["t1", "t2"]: ds[v] = despike(ds[v], ds.attrs["spike_thresh_t"]) # despike T, C ibefore = 1 iafter = 1 for v in ["c1", "c2", "t1", "t2"]: ds[v].data = helpers.glitchcorrect( ds[v], ds.attrs[f"diff_{v[0]}"], ds.attrs[f"prod_{v[0]}"], ibefore, iafter ) # Calculate salinity (both absolute and practical) ds = calcs.calc_sal(ds) # despike s ibefore = 2 iafter = 2 for v in ["s1", "s2", "SA1", "SA2"]: # remove spikes ds[v] = despike(ds[v], ds.attrs["spike_thresh_s"]) # remove out of bounds data ds[v] = remove_out_of_bounds( ds[v], bmin=ds.attrs["bounds_s"][0], bmax=ds.attrs["bounds_s"][1] ) ds[v].data = helpers.glitchcorrect( ds[v], ds.attrs["diff_s"], ds.attrs["prod_s"], ibefore, iafter ) # calculate potential/conservative temperature, potential density anomaly ds = calcs.calc_temp(ds) ds = calcs.calc_sigma(ds) return ds
[docs] def despike(da, spike_threshold): """ Set spikes to NaN where the absolute first difference exceeds a threshold. Parameters ---------- da : xarray.DataArray Input data. Modified in place. spike_threshold : float Maximum absolute difference between consecutive samples; samples whose preceding difference exceeds this value are set to NaN. Returns ------- da : xarray.DataArray The input data with spikes replaced by NaN. """ absdiff = np.absolute(np.diff(da.data)) # Using np.greater instead of the > operator as we can use the where option # and avoid the warning when nans are compared to a number. It broadcasts # to the original array size. ib = np.squeeze( np.where(np.greater(absdiff, spike_threshold, where=np.isfinite(absdiff))) ) da[ib] = np.nan return da
[docs] def remove_out_of_bounds(da, bmin, bmax): """ Replace out-of-bounds values with NaN. Parameters ---------- da : xarray.DataArray Input data. bmin : float Lower bound (inclusive). bmax : float Upper bound (inclusive). Returns ------- xarray.DataArray Input with values outside ``[bmin, bmax]`` replaced by NaN. """ da = da.where((da <= bmax) & (da >= bmin)) return da
[docs] def preen_ctd(ds): """ Replace out-of-range values in ``p``, ``t1``, ``t2``, ``c1``, ``c2``. Each variable is filtered against the per-channel bounds stored in ``ds.attrs["bounds_p"]``, ``ds.attrs["bounds_t"]``, and ``ds.attrs["bounds_c"]``; out-of-range samples are replaced by linear interpolation between adjacent in-range samples. Parameters ---------- ds : xarray.Dataset Dataset with CTD time series. Returns ------- ds : xarray.Dataset Dataset with each channel cleaned and reinterpolated. """ for v in ["p", "t1", "t2", "c1", "c2"]: ds[v] = ds[v].where( xr.zeros_like(ds[v]), helpers.preen( ds[v].data, ds.attrs[f"bounds_{v[0]}"][0], ds.attrs[f"bounds_{v[0]}"][1] ), ) # TODO: remove spikes in oxygen, trans, fl in volts. return ds
[docs] def phase_correct(ds): """ Bring temperature and conductivity into phase via FFT-based fit. Estimates the thermistor time constant ``tau`` and the t/c sensor lag ``L`` for each (temperature, conductivity) pair by fitting an arctan model to the cross-spectral phase, then applies the correction in the frequency domain. Operates over the pressure range stored in ``ds.attrs["tcfit"]`` (see :func:`add_tcfit_default`). This is the most expensive step in :func:`run_all` and is critical for accurate salinity. Parameters ---------- ds : xarray.Dataset CTD time series for a single cast (down or up). Required attrs: ``tcfit``; optional ``plot_spectra`` and ``plot_path`` for diagnostic output. Returns ------- ds : xarray.Dataset Time series with ``t1``, ``t2``, ``c1``, ``c2`` replaced by their phase-corrected, low-pass-filtered counterparts. """ # remove spikes # TODO: bring this back in. however, the function fails later on if there # are nan's present. Could interpolate over anything that is just a few data points # for field in ["t1", "t2", "c1", "c2"]: # ib = np.squeeze(np.where(np.absolute(np.diff(data[field].data)) > 0.5)) # data[field][ib] = np.nan # ---Spectral Analysis of Raw Data--- # 24Hz data dt = 1 / 24 # number of points per segment N = 2**9 # only data within tcfit range. ii = np.squeeze( np.argwhere( (ds.p.data > ds.attrs["tcfit"][0]) & (ds.p.data < ds.attrs["tcfit"][1]) ) ) i1 = ii[0] i2 = ii[-1] n = i2 - i1 + 1 n = (np.floor(n / N) * N).astype("int64") # Truncate to be multiple of N elements long i2 = (i1 + n).astype("int64") # number of segments = dof/2 m = (n / N).astype("int64") # Number of degrees of freedom (power of 2) dof = 2 * m # Frequency resolution at dof degrees of freedom. df = 1 / (N * dt) # fft of each segment (row). Data are detrended, then windowed. window = signal.windows.triang(N) * np.ones((m, N)) At1 = fft.fft(signal.detrend(np.reshape(ds.t1.data[i1:i2], shape=(m, N))) * window) At2 = fft.fft(signal.detrend(np.reshape(ds.t2.data[i1:i2], shape=(m, N))) * window) Ac1 = fft.fft(signal.detrend(np.reshape(ds.c1.data[i1:i2], shape=(m, N))) * window) Ac2 = fft.fft(signal.detrend(np.reshape(ds.c2.data[i1:i2], shape=(m, N))) * window) # Positive frequencies only At1 = At1[:, 0 : int(N / 2)] At2 = At2[:, 0 : int(N / 2)] Ac1 = Ac1[:, 0 : int(N / 2)] Ac2 = Ac2[:, 0 : int(N / 2)] # Frequency f = fft.ifftshift(np.linspace(-N / 2, N / 2 - 1, N) / N / dt) f = f[: int(N / 2)] fold = f # Spectral Estimates. Note: In Matlab, At1*conj(At1) is not complex anymore. # Here, it is still a complex number but the imaginary part is zero. # We keep only the real part to stay consistent. Et1 = 2 * np.real(np.nanmean(At1 * np.conj(At1) / df / N**2, axis=0)) Et2 = 2 * np.real(np.nanmean(At2 * np.conj(At2) / df / N**2, axis=0)) Ec1 = 2 * np.real(np.nanmean(Ac1 * np.conj(Ac1) / df / N**2, axis=0)) Ec2 = 2 * np.real(np.nanmean(Ac2 * np.conj(Ac2) / df / N**2, axis=0)) # Cross Spectral Estimates Ct1c1 = 2 * np.nanmean(At1 * np.conj(Ac1) / df / N**2, axis=0) Ct2c2 = 2 * np.nanmean(At2 * np.conj(Ac2) / df / N**2, axis=0) # Squared Coherence Estimates Coht1c1 = np.real(Ct1c1 * np.conj(Ct1c1) / (Et1 * Ec1)) Coht2c2 = np.real(Ct2c2 * np.conj(Ct2c2) / (Et2 * Ec2)) # Cross-spectral Phase Estimates Phit1c1 = np.arctan2(np.imag(Ct1c1), np.real(Ct1c1)) Phit2c2 = np.arctan2(np.imag(Ct2c2), np.real(Ct2c2)) # ---Determine tau and L--- # tau is the thermistor time constant (sec) # L is the lag of t behind c due to sensor separation (sec) # Matrix of weights based on squared coherence. W1 = np.diag(Coht1c1) W2 = np.diag(Coht2c2) x1 = optimize.fmin(func=_atanfit, x0=[0, 0], args=(f, Phit1c1, W1), disp=False) x2 = optimize.fmin(func=_atanfit, x0=[0, 0], args=(f, Phit2c2, W2), disp=False) tau1 = x1[0] tau2 = x2[0] L1 = x1[1] L2 = x2[1] print("1: tau = {:1.4f}s, lag = {:1.4f}s".format(tau1, L1)) print("2: tau = {:1.4f}s, lag = {:1.4f}s".format(tau2, L2)) # ---Apply Phase Correction and LP Filter--- ii = np.squeeze(np.argwhere(ds.p.data > 1)) i1 = ii[0] i2 = ii[-1] n = i2 - i1 + 1 n = (np.floor(n / N) * N).astype("int64") # Truncate to be multiple of N elements long i2 = (i1 + n).astype("int64") # number of segments = dof/2 m = (n / N).astype("int64") # Number of degrees of freedom (power of 2) dof = 2 * m # Frequency resolution at dof degrees of freedom. df = 1 / (N * dt) # Transfer function f = fft.ifftshift(np.linspace(-N / 2, N / 2 - 1, N) / N / dt) H1 = (1 + 1j * 2 * np.pi * f * tau1) * np.exp(1j * 2 * np.pi * f * L1) H2 = (1 + 1j * 2 * np.pi * f * tau2) * np.exp(1j * 2 * np.pi * f * L2) # Low Pass Filter f0 = 6 # Cutoff frequency LP = 1 / (1 + (f / f0) ** 6) # Restructure data with overlapping segments. # Staggered segments vars = [ "t1", "t2", "c1", "c2", "p", "trans", "fl", "par", "alt", "oxygen1", "oxygen2", "ph", ] vard = {} for v in vars: if v in ds: vard[v] = np.zeros((2 * m - 1, N)) vard[v][: 2 * m - 1 : 2, :] = np.reshape(ds[v].data[i1:i2], shape=(m, N)) vard[v][1::2, :] = np.reshape( ds[v].data[i1 + int(N / 2) : i2 - int(N / 2)], shape=(m - 1, N), ) time = ds.time[i1:i2] lon = ds.lon[i1:i2] lat = ds.lat[i1:i2] # FFTs of staggered segments (each row) Ad = {} for v in vars: if v in ds: Ad[v] = fft.fft(vard[v]) # Corrected Fourier transforms of temperature. Ad["t1"] = Ad["t1"] * ((H1 * LP) * np.ones((2 * m - 1, 1))) Ad["t2"] = Ad["t2"] * ((H2 * LP) * np.ones((2 * m - 1, 1))) # LP filter remaining variables vars2 = [ "c1", "c2", "p", "trans", "fl", "par", "alt", "oxygen1", "oxygen2", "ph", ] for v in vars2: if v in ds: Ad[v] = Ad[v] * (LP * np.ones((2 * m - 1, 1))) # Inverse transforms of corrected temperature # and low passed other variables Adi = {} for v in vars: if v in ds: Adi[v] = np.real(fft.ifft(Ad[v])) Adi[v] = np.squeeze( np.reshape(Adi[v][:, int(N / 4) : (3 * int(N / 4))], shape=(1, -1)) ) time = time[int(N / 4) : -int(N / 4)] lon = lon[int(N / 4) : -int(N / 4)] lat = lat[int(N / 4) : -int(N / 4)] # Generate output structure. Copy attributes over. out = xr.Dataset(coords={"time": time}) out.attrs = ds.attrs out["lon"] = lon out["lat"] = lat for v in vars: if v in ds: out[v] = xr.DataArray(Adi[v], coords=(out.time,)) out[v].attrs = ds[v].attrs out.assign_attrs(dict(tau1=tau1, tau2=tau2, L1=L1, L2=L2)) # ---Recalculate and replot spectra, coherence and phase--- t1 = Adi["t1"][int(N / 4) : -int(N / 4)] # Now N elements shorter t2 = Adi["t2"][int(N / 4) : -int(N / 4)] c1 = Adi["c1"][int(N / 4) : -int(N / 4)] c2 = Adi["c2"][int(N / 4) : -int(N / 4)] # p = Adi["p"][int(N / 4) : -int(N / 4)] m = (i2 - N) / N # number of segments = dof/2 m = np.floor(m).astype("int64") dof = 2 * m # Number of degrees of freedom (power of 2) df = 1 / (N * dt) # Frequency resolution at dof degrees of freedom. window = signal.windows.triang(N) * np.ones((m, N)) At1 = fft.fft(signal.detrend(np.reshape(t1, shape=(m, N))) * window) At2 = fft.fft(signal.detrend(np.reshape(t2, shape=(m, N))) * window) Ac1 = fft.fft(signal.detrend(np.reshape(c1, shape=(m, N))) * window) Ac2 = fft.fft(signal.detrend(np.reshape(c2, shape=(m, N))) * window) # Positive frequencies only At1 = At1[:, 0 : int(N / 2)] At2 = At2[:, 0 : int(N / 2)] Ac1 = Ac1[:, 0 : int(N / 2)] Ac2 = Ac2[:, 0 : int(N / 2)] fn = f[0 : int(N / 2)] Et1n = 2 * np.nanmean(np.absolute(At1[:, : int(N / 2)]) ** 2, 0) / df / N**2 Et2n = 2 * np.nanmean(np.absolute(At2[:, : int(N / 2)]) ** 2, 0) / df / N**2 Ec1n = 2 * np.nanmean(np.absolute(Ac1[:, : int(N / 2)]) ** 2, 0) / df / N**2 Ec2n = 2 * np.nanmean(np.absolute(Ac2[:, : int(N / 2)]) ** 2, 0) / df / N**2 # Cross Spectral Estimates Ct1c1n = 2 * np.nanmean(At1 * np.conj(Ac1) / df / N**2, axis=0) Ct2c2n = 2 * np.nanmean(At2 * np.conj(Ac2) / df / N**2, axis=0) # Squared Coherence Estimates Coht1c1n = np.real(Ct1c1n * np.conj(Ct1c1n) / (Et1n * Ec1n)) Coht2c2n = np.real(Ct2c2n * np.conj(Ct2c2n) / (Et2n * Ec2n)) # 95% confidence bound # epsCoht1c1n = np.sqrt(2) * (1 - Coht1c1n) / np.sqrt(Coht1c1n) / np.sqrt(m) # epsCoht2c2n = np.sqrt(2) * (1 - Coht2c2n) / np.sqrt(Coht2c2n) / np.sqrt(m) # 95% significance level for coherence from Gille notes betan = 1 - 0.05 ** (1 / (m - 1)) # Cross-spectral Phase Estimates Phit1c1n = np.arctan2(np.imag(Ct1c1n), np.real(Ct1c1n)) Phit2c2n = np.arctan2(np.imag(Ct2c2n), np.real(Ct2c2n)) # 95% error bound # epsPhit1c1n = np.arcsin( # stats.t.ppf(0.05, dof) * np.sqrt((1 - Coht1c1n) / (dof * Coht1c1n)) # ) # epsPhit1c2n = np.arcsin( # stats.t.ppf(0.05, dof) * np.sqrt((1 - Coht2c2n) / (dof * Coht2c2n)) # ) if ds.attrs["plot_spectra"]: fig, ax = plt.subplots( nrows=2, ncols=2, figsize=(9, 7), constrained_layout=True ) ax0, ax1, ax2, ax3 = ax.flatten() ax0.plot(fold, Et1, label="1 uncorrected", color="0.5") ax0.plot(fold, Et2, label="2 uncorrected", color="0.8") ax0.plot(fn, Et1n, label="sensor 1") ax0.plot(fn, Et2n, label="sensor 2") ax0.set( yscale="log", xscale="log", xlabel="frequency [Hz]", ylabel=r"spectral density [$^{\circ}$C$^2$/Hz]", title="temperature spectra", ) ax0.plot( [fn[50], fn[50]], [ dof * Et1n[100] / stats.distributions.chi2.ppf(0.05 / 2, dof), dof * Et1n[100] / stats.distributions.chi2.ppf(1 - 0.05 / 2, dof), ], "k", ) ax0.legend() ax1.plot(fold, Ec1, label="1 uncorrected", color="0.5") ax1.plot(fold, Ec2, label="2 uncorrected", color="0.8") ax1.plot(fn, Ec1n, label="1") ax1.plot(fn, Ec2n, label="2") ax1.set( yscale="log", xscale="log", xlabel="frequency [Hz]", ylabel=r"spectral density [mmho$^2$/cm$^2$/Hz]", title="conductivity spectra", ) ax1.plot( [fn[50], fn[50]], [ dof * Ec1n[100] / stats.distributions.chi2.ppf(0.05 / 2, dof), dof * Ec1n[100] / stats.distributions.chi2.ppf(1 - 0.05 / 2, dof), ], "k", ) # Coherence between Temperature and Conductivity ax2.plot(fold, Coht1c1, color="0.5") ax2.plot(fold, Coht2c2, color="0.8") ax2.plot(fn, Coht1c1n) ax2.plot(fn, Coht2c2n) # ax.plot(fn, Coht1c1 / (1 + 2 * epsCoht1c1), color="b", linewidth=0.5, alpha=0.2) # ax.plot(fn, Coht1c1 / (1 - 2 * epsCoht1c1), color="b", linewidth=0.5, alpha=0.2) ax2.plot(fn, betan * np.ones(fn.size), "k--") ax2.set( xlabel="frequency [Hz]", ylabel="squared coherence", ylim=(-0.1, 1.1), title="t/c coherence", ) # Phase between Temperature and Conductivity ax3.plot(fold, Phit1c1, color="0.5") ax3.plot(fold, Phit2c2, color="0.8") ax3.plot(fn, Phit1c1n) ax3.plot(fn, Phit2c2n) ax3.set( xlabel="frequency [Hz]", ylabel="phase [rad]", ylim=[-4, 4], title="t/c phase", # xscale="log", ) ax3.plot( fold, -np.arctan(2 * np.pi * fold * x1[0]) - 2 * np.pi * fold * x1[1], "k--", ) ax3.plot( fold, -np.arctan(2 * np.pi * fold * x2[0]) - 2 * np.pi * fold * x2[1], "k--", ) if ds.attrs["plot_path"] != "": plt.savefig(ds.attrs["plot_path"], dpi=200) return out
[docs] def rmloops(ds): """ Eliminate depth loops in CTD data based on sinking velocity. All data variables are set to NaN within depth loops. Parameters ---------- ds : xarray.Dataset CTD time series dataset Returns ------- ds : xarray.Dataset CTD time series dataset """ assert "wthresh" in ds.attrs tsmooth = 0.25 # seconds fs = 24 # Hz w = wsink(ds.p.data, tsmooth, fs) # down/up +/-ve iloop = np.array([]) # up or downcast? pol = np.polyfit(np.array(range(ds.p.size)), ds.p, 1) dn = 1 if pol[0] > 0 else 0 if dn: # downcast flp = np.squeeze(np.argwhere(w < ds.attrs["wthresh"])) if flp.size >= 1: ia, ib, ilen = helpers.findsegments(flp) if ia[0] == ib[0] == 0: ia = np.delete(ia, 0) ib = np.delete(ib, 0) for start, stop in zip(ia, ib): pmi = np.argmax(ds.p[:stop].data) pm = ds.p[pmi].data tmp = np.squeeze(np.where(ds.p[pmi:] < pm)) iloop = np.append(iloop, pmi + 1) iloop = np.append(iloop, pmi + tmp - 1) iloop = np.unique(iloop) iloop = iloop.astype("int64") else: iloop = np.array([]) else: # upcast flp = np.squeeze(np.argwhere(w > -ds.attrs["wthresh"])) if flp.size >= 1: ia, ib, ilen = helpers.findsegments(flp) if ia[0] == ib[0] == 0: ia = np.delete(ia, 0) ib = np.delete(ib, 0) for start, stop in zip(ia, ib): pmi = np.argmax(ds.p[:stop].data) pm = ds.p[pmi].data tmp = np.squeeze(np.where(ds.p[:pmi] < pm)) iloop = np.append(iloop, pmi) iloop = np.append(iloop, pmi - tmp) iloop = np.unique(iloop) iloop = iloop.astype("int64") else: iloop = np.array([]) # get data variable names varnames = [k for k, v in ds.data_vars.items()] for rmitem in ["lon", "lat", "z", "depth"]: if rmitem in varnames: varnames.remove(rmitem) # set loops to NaN for varn in varnames: ds[varn][iloop] = np.nan return ds
[docs] def bincast(ds, dz, zmin, zmax): """ Depth-bin CTD time series. Parameters ---------- ds : xr.Dataset CTD time series dz : float Bin size [m] zmin : float Minimum bin depth center [m] zmax : float Maximum bin depth center [m] Returns ------- ds : xr.Dataset Depth-binned CTD profile """ dz2 = dz / 2 zbin = np.arange(zmin - dz2, zmax + dz + dz2, dz) zbinlabel = np.arange(zmin, zmax + dz, dz) # prepare dataset tmp = ds.swap_dims({"time": "depth"}) tmp = tmp.reset_coords() # need to bin time separately, not sure why btime = tmp.time.groupby_bins( "depth", bins=zbin, labels=zbinlabel, right=True, include_lowest=True ).mean() # bin all variables out = tmp.groupby_bins( "depth", bins=zbin, labels=zbinlabel, right=True, include_lowest=True ).mean() # organize out.coords["time"] = btime out = out.set_coords(["lon", "lat"]) out = out.rename({"depth_bins": "depth"}) # copy attributes # get data variable names varnames = [k for k, v in ds.data_vars.items()] for vari in varnames: out[vari].attrs = ds[vari].attrs out["depth"].attrs = { "long_name": "depth", "units": "m", "standard_name": "depth", "positive": "down", } out.attrs = ds.attrs # recalculate pressure from depth bins out["p"] = (("depth",), gsw.p_from_z(-1 * out.depth.data, out.lat.data)) out.p.attrs = { "long_name": "pressure", "units": "dbar", "standard_name": "sea_water_pressure", "positive": "down", } return out
[docs] def split_updn(ds): """ Split a cast into downcast and upcast at the pressure maximum. Parameters ---------- ds : xarray.Dataset CTD time series with pressure variable ``p``. Returns ------- dict of xarray.Dataset Mapping with keys ``"down"`` and ``"up"`` containing the downcast and upcast time series. """ n = ds.p.size ipmax = np.argmax(ds.p.data) datad = ds.isel(time=range(0, ipmax)) datau = ds.isel(time=range(ipmax, n)) return {"down": datad, "up": datau}
[docs] def wsink(p, Ts, Fs): """ Compute sinking velocity from pressure record. Computes the sinking (or rising) velocity from the pressure signal p by first differencing. The pressure signal is smoothed with a low-pass filter for differentiation. If the input signal is shorter than the smoothing time scale, w is taken as the slope of the linear regression of p. Adapted from wsink.m - Fabian Wolk, Rockland Oceanographic Services Inc. Parameters ---------- p : array-like Pressure [dbar] Ts : float Smoothing time scale [s] Fs : float Sampling frequency [Hz] Returns ------- w : array-like Sinking velocity [dbar/s] """ FORDER = 1 # low pass filter coefficients [b, a] = signal.butter(FORDER, 1 / Ts * 2 / Fs) N = p.size if N <= Fs * Ts * FORDER: pol = np.polyfit(np.array(range(N)), p, 1) w = pol[0] * Fs * np.ones(N) else: # pad the pressure vector left and right nPad = int(FORDER * Ts * Fs) if nPad > N: print( "warning: length of pressure vector is smaller than padding length.\n", "Filter transients may occur.", ) p = _pad_lr(p, nPad) w = np.gradient(Fs * signal.filtfilt(b, a, p)) w = w[nPad:-nPad] return w
def _atanfit(x, f, Phi, W): f = np.arctan(2 * np.pi * f * x[0]) + 2 * np.pi * f * x[1] + Phi f = np.matmul(np.matmul(f.transpose(), W**4), f) return f def _pad_lr(p, nPad): """Pad array left and right.""" p0 = p[0] p = p - p0 p = p0 + np.insert(p, 0, -p[nPad - 1 :: -1]) p0 = p[-1] p = p - p0 p = p0 + np.insert(p, -1, -p[: -nPad - 1 : -1]) return p