Source code for pyrams.data_tools

"""Contains a collections of functions for working with RAMS data in Python"""
import numpy as np
from netCDF4 import Dataset as ncfile
from matplotlib import pyplot as plt
import xarray as xr
import warnings
from tqdm import tqdm
import pandas as pd
from datetime import datetime
from metpy.interpolate import log_interpolate_1d
import re

class DataInfo():
    """
    Deprecated. Please use `DataVar()`

    A class to handle model data

    Attributes
    ----------
    variable : str
        The name of the variable as found in the data files (e.g. "RTP")
    longname : str
        The long name of the variable (e.g. "Total Water Mixing Ratio")
    unit : str
        The unit of the variable (e.g. "kg/kg")
    data : numpy.ndarray
        The data array for the variable
    """

    def __init__(self, variable, longname, unit):
        warnings.warn('Note: data_tools.DataInfo is deprecated. '
                      'Please move to data_tools.DataVar')
        self.variable = variable
        self.longname = longname
        self.unit = unit
        self.data = None

    def get_data(self, datadir, simulation):
        """
        Parameters
        ----------
        datadir : str
            The path of the data files
        simulation : str
            Name of the subfolder that the data is found in
            (e.g. "feb2014_control")

        Returns
        -------
        data : numpy.ndarray
            The data for the desired variable
        """
        file = xr.open_mfdataset(datadir + simulation + "*g2.h5",
                                 combine='nested', concat_dim='TIME')
        data = file[self.variable]
        return data

class DataVar():
    """
    A new class created to manage variables, their names, and units
    (replaces `DataInfo`)

    Parameters
    ----------
    varname : str
        The variable name as found in the data files (e.g. "RTP")
    longname : str
        Optional. The long name of the variable (e.g. "Total Water Mixing Ratio")
    unit : str
        Optional. The unit of the variable (e.g. "kg/kg")

    Attributes
    ----------
    varname : str
        The variable name as found in the data files (e.g. "RTP")
    longname : str
        The long name of the variable (e.g. "Total Water Mixing Ratio")
    unit : str
        The unit of the variable (e.g. "kg/kg")
    data : numpy.ndarray
        The data for the variable
    """

    def __init__(self, varname, longname=None, unit=None):
        self.varname = varname
        self.unit = unit
        self.longname = longname

    def get_data(self, flist):
        """
        Pulls data from a list of files (flist) and puts it into a single array

        Arguments
        ---------
        flist : list
            A list of sorted file paths
        """
        print(f'Opening {self.varname}')

        # Get dimensions of one time step and prepend the time dimension
        dims = list(xr.open_dataset(flist[0])[self.varname].shape)
        dims.insert(0, len(flist))

        # Create empty array
        self.data = np.zeros(dims)

        # Get data, one file (time step) at a time
        for i in tqdm(range(len(flist))):
            ds = xr.open_dataset(flist[i])
            self.data[i, :] = ds[self.varname]

    def purge_data(self):
        """Release the stored data array to free memory."""
        self.data = None
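
# Example usage (sketch; the glob pattern and variable name are illustrative):
#
#     from glob import glob
#     flist = sorted(glob('/path/to/output/*g1.h5'))
#     rtp = DataVar('RTP', longname='Total Water Mixing Ratio', unit='kg/kg')
#     rtp.get_data(flist)    # fills rtp.data, shape (len(flist), *var_shape)
#     rtp.purge_data()       # free the array when finished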

def domain_mean_netcdf(ds_with_metadata, outfile, vars=None):
    """
    Writes x/y domain averages from an xarray dataset to `outfile` as NetCDF.

    Arguments
    ---------
    ds_with_metadata : xr.Dataset
        An xarray dataset created with `pyrams.data_tools.create_xr_metadata()`
    outfile : str
        Name of output file
    vars : list (optional)
        List of variable names to write. Default is to process and write all
        variables
    """
    from os.path import exists
    from tqdm import tqdm

    ds = ds_with_metadata

    if vars is None:
        vars = [i for i in ds.data_vars]

    if exists(outfile):
        raise Exception(f"Error: {outfile} already exists")

    for v in tqdm(vars):
        try:
            a = ds[v].mean(dim=('x', 'y'))
            # Write the first variable, then append the rest
            mode = 'a' if exists(outfile) else 'w'
            a.to_netcdf(outfile, mode=mode)
        except ValueError:
            print(f"Variable {v} could not be averaged over (x, y) - skipping")
            continue
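
# Example (sketch; assumes `ds` has 'x' and 'y' dimensions, e.g. from
# create_xr_metadata(), that 'means.nc' does not already exist, and that the
# variable names are illustrative):
#
#     domain_mean_netcdf(ds, 'means.nc', vars=['RTP', 'THETA'])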

def rewrite_to_netcdf(flist, output_path, duped_dims, phony_dim,
                      prefix='dimfix', single_file=False,
                      compression_level=None):
    """
    Rewrites RAMS standard output files as netCDF4 with fixed dimension data,
    using ``data_tools.fix_duplicate_dims()``

    Arguments
    ---------
    flist : list of str
        A list of file paths (recommend using
        ``sorted(glob.glob('/path/to/files/*g1.h5'))`` or similar)
    output_path : str
        Path where new files will be written
    duped_dims : list of str
        List of dimensions that are duplicated, in order (e.g. ``['y', 'x']``)
    phony_dim : str
        Name of the duplicate dimension in `ds`, often ``phony_dim_0``
    prefix : str
        Prefix for output files, defaults to ``dimfix``
    single_file : bool (optional)
        If `True`, will combine all files into a single file with name
        ``<prefix>.nc``. Defaults to `False`.
    compression_level : int
        If specified, data will be compressed at the given level
        (0-9 are valid). Defaults to `None`
    """
    dslist = []
    for f in tqdm(flist):
        # Parse the "YYYY-mm-dd-HHMMSS" timestamp out of the file name
        str_date = '-'.join(f.split('/')[-1].split("-")[2:6])
        date = np.datetime64(pd.to_datetime(str_date))

        if duped_dims:
            ds = fix_duplicate_dims(xr.open_dataset(f), duped_dims, phony_dim)
        else:
            ds = xr.open_dataset(f)

        ds = ds.expand_dims({'time': [date]})

        if not single_file:
            ds.to_netcdf(f'{output_path}/{prefix}_{str_date}.nc',
                         unlimited_dims=['time'])
        else:
            dslist.append(ds)
        ds.close()

    if single_file:
        ds = xr.concat(dslist, dim='time')
        encoding = None

        # If compression_level is defined, compress the output
        if compression_level:
            # Catch invalid values
            if (type(compression_level) is not int or
                    compression_level < 0 or compression_level > 9):
                raise ValueError(
                    "compression_level must be an integer between 0 and 9 "
                    "(inclusive)")
            comp = dict(zlib=True, complevel=compression_level)
            encoding = {var: comp for var in ds.data_vars}

        ds.to_netcdf(f"{output_path}/{prefix}.nc", encoding=encoding)

    return
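
# Example (sketch; the paths are hypothetical):
#
#     from glob import glob
#     flist = sorted(glob('/path/to/files/*g1.h5'))
#     rewrite_to_netcdf(flist, '/path/to/fixed', ['y', 'x'], 'phony_dim_0',
#                       single_file=True, compression_level=4)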

def fix_duplicate_dims(ds, duped_dims, phony_dim):
    """
    Fixes duplicate dimensions (often with the same amount of `x` and `y`
    gridpoints), for use with xarray.open_mfdataset and xarray.combine_nested.

    Arguments
    ---------
    ds : xarray.Dataset
        The dataset to be fixed
    duped_dims : list of str
        List of dimensions that are duplicated, in order (e.g. `['y', 'x']`)
    phony_dim : str
        Name of the duplicate dimension in `ds`, often `'phony_dim_0'`

    Returns
    -------
    ds_new : xarray.Dataset
        New dataset with fixed dimension names
    """
    dims = dict(ds.dims)
    try:
        dupe_dim = dims[phony_dim]
    except KeyError:
        print(f"Error: dimension '{phony_dim}' not found in the dataset")
        return

    # Replace the single duplicated dimension with one entry per new name
    dims.pop(phony_dim)
    for d in duped_dims:
        dims[d] = dupe_dim

    ds_new = xr.Dataset()
    for v in ds.variables:
        dvar = ds[v]
        # Check if phony_dim exists in this variable's dimensions
        if phony_dim in dvar.dims:
            vardims = list(dvar.dims)
            indices = [i for i, x in enumerate(vardims) if x == phony_dim]
            for i, ind in enumerate(indices):
                vardims[ind] = duped_dims[i]
            ds_new[v] = (tuple(vardims), ds[v])
        else:
            ds_new[v] = (dvar.dims, ds[v])

    ds.close()
    return ds_new
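
# Example (sketch; the file name is hypothetical):
#
#     ds = xr.open_dataset('a-A-2014-02-01-000000-g1.h5')
#     ds = fix_duplicate_dims(ds, ['y', 'x'], 'phony_dim_0')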

def flist_to_times(flist):
    """
    Creates a list of datetimes from a list of RAMS output file paths.
    Uses regex to find the pattern "YYYY-mm-dd-HHMMSS" in each file path
    and converts it to a np.datetime64 object.

    Parameters
    ----------
    flist : list
        A list of file paths

    Returns
    -------
    times : list
        A list of times in np.datetime64 format.
    """
    dtregex = r"[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{6}"

    times = np.zeros(len(flist), dtype=datetime)
    for i, f in enumerate(flist):
        tarr = re.findall(dtregex, f)
        if not tarr:
            raise SyntaxError(
                f"No datetimes of form \"YYYY-MM-DD-HHMMSS\" were found in {f}")
        if len(tarr) > 1:
            raise SyntaxError(f"More than one date found in path {f}")
        times[i] = datetime.strptime(tarr[0], "%Y-%m-%d-%H%M%S")

    return times.astype(np.datetime64)
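
# Example (sketch; the file names are hypothetical but contain the
# "YYYY-mm-dd-HHMMSS" pattern the regex expects):
#
#     flist = ['a-A-2014-02-01-000000-g1.h5', 'a-A-2014-02-01-003000-g1.h5']
#     times = flist_to_times(flist)   # -> np.datetime64 values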

def create_xr_metadata(
    ds,
    flist=None,
    dims={
        'phony_dim_0': 'x',
        'phony_dim_1': 'y',
        'phony_dim_2': 'z'
    },
    dx=None,
    dz=None,
    z=None,
    dt=None,
):
    """
    Adds metadata to an ``xr.Dataset()``.

    Parameters
    ----------
    ds : ``xarray.Dataset``
        Dataset
    flist : list of file paths, optional
        List of file paths, used to add datetimes to the time dimension
    dt : str, optional
        One of ``['second', 'minute', 'hour', 'day']``. Changes the ``time``
        coordinate to be a timedelta of unit ``dt``; ``flist`` must be
        specified.
    dims : dict, optional
        Dict of dims to rename. Defaults to::

            dims = {
                'phony_dim_0' : 'x',
                'phony_dim_1' : 'y',
                'phony_dim_2' : 'z'
            }

    dx : float, optional
        dx to add values to ``(x, y)`` dimensions
    dz : float, optional
        dz to add values to the ``z`` dimension
    z : list, optional
        List of explicit ``z`` values to add to the dimension

    Returns
    -------
    ds : ``xr.Dataset()``
    """
    try:
        ds = ds.rename(dims)
    except ValueError:
        print("Dimensions have already been renamed - skipping and "
              "continuing with other metadata")

    if flist:
        if type(flist) is str:
            flist = [flist]
        try:
            ds['time'] = flist_to_times(flist)
        except KeyError:
            ds = ds.assign(time=flist_to_times(flist))

    dt_options = {
        'second': np.timedelta64(1, 's'),
        'minute': np.timedelta64(1, 'm'),
        'hour': np.timedelta64(1, 'h'),
        'day': np.timedelta64(1, 'D'),
    }

    if dt in dt_options:
        # If we're getting timestamps from `flist`, convert to timedelta
        # and divide by dt to get the specified units
        if flist:
            t = (ds['time'] - ds['time'][0]) / dt_options[dt]
            ds['time'] = t
        else:
            raise TypeError('`flist` must be defined to use `dt`.')

        # Add unit
        ds['time'].attrs['unit'] = dt

    # If not in dt_options or None, raise an error
    elif dt is not None:
        raise TypeError(f"`dt` must be one of {list(dt_options.keys())}")

    if dx:
        ds['x'] = np.arange(0, len(ds.x)) * dx
        ds['y'] = np.arange(0, len(ds.y)) * dx
        ds['x'].attrs = {'units': 'm'}
        ds['y'].attrs = {'units': 'm'}

    if dz:
        ds['z'] = np.arange(0, len(ds.z)) * dz
        ds['z'].attrs = {'units': 'm'}

    if z:
        ds['z'] = z
        ds['z'].attrs = {'units': 'm'}

    return ds
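
# Example (sketch; the path and dx/dz values are hypothetical):
#
#     from glob import glob
#     flist = sorted(glob('/path/to/output/*.h5'))
#     ds = xr.open_mfdataset(flist, combine='nested', concat_dim='time')
#     ds = create_xr_metadata(ds, flist=flist, dx=100., dz=50., dt='minute')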

def habit_count(habits, tmax):
    """
    Takes habit data (with a time dimension) and tmax (number of time steps)
    and returns the number of gridboxes with each habit at each time step.
    """
    # Initialize an array with one column per habit class (0-11)
    count = np.zeros((tmax, 12))

    for time in range(0, tmax):
        unique, counts = np.unique(habits[time, :, :, :], return_counts=True)
        for i in range(0, len(unique)):
            if not np.ma.is_masked(unique[i]):
                count[time, int(unique[i])] = counts[i]

    return count
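
# Example (sketch; assumes `habits` holds integer habit codes 0-11 with
# dimensions (time, z, y, x)):
#
#     counts = habit_count(habits, habits.shape[0])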

def press_level(pressure, heights, plevels, no_time=False):
    """
    Calculates geopotential heights at a given pressure level

    Parameters
    ----------
    pressure : numpy.ndarray
        The 3-D pressure field (assumes a time dimension; turn off with
        `no_time=True`)
    heights : numpy.ndarray
        The 3-D array of gridbox heights
    plevels : list
        List of pressure levels to interpolate to
    no_time : bool
        Optional, set to `True` to indicate lack of a time dimension.

    Returns
    -------
    press_height : numpy.ndarray
        The geopotential heights at the specified pressure levels
    """
    if no_time is False:
        try:
            tlen, zlen, ylen, xlen = pressure.shape
            press_height = np.zeros((tlen, ylen, xlen))
            for t in range(0, tlen):
                for x in range(0, xlen):
                    for y in range(0, ylen):
                        press_height[t, y, x] = \
                            log_interpolate_1d(plevels,
                                               pressure[t, :, y, x],
                                               heights[:, y, x])
        except ValueError:
            print("Error in dimensions, trying with no_time=True")
            no_time = True

    if no_time is True:
        try:
            zlen, ylen, xlen = pressure.shape
            press_height = np.zeros((ylen, xlen))
            for x in range(0, xlen):
                for y in range(0, ylen):
                    press_height[y, x] = \
                        log_interpolate_1d(plevels,
                                           pressure[:, y, x],
                                           heights[:, y, x])
        except ValueError:
            print("Error in dimensions")

    return press_height
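
# Example (sketch; `press` is a (t, z, y, x) pressure field and `z3d` is a
# (z, y, x) array of gridbox heights, e.g. from z_levels_3d()):
#
#     h500 = press_level(press, z3d, [500.])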

def calc_height(topt, ztn):
    """
    Calculates the height of each grid box

    Parameters
    ----------
    topt : numpy.ndarray
        The 2-D topographic height information
    ztn : numpy.ndarray
        The ztn variable from the *head.txt files output from RAMS

    Returns
    -------
    z : numpy.ndarray
        A 3-D array of the heights of each gridbox
    """
    ylen, xlen = topt.shape
    zlen = len(ztn)
    z = np.zeros((zlen, ylen, xlen))

    # Model-top height (the last ztn level)
    ztop = ztn[-1]

    for x in range(0, xlen):
        for y in range(0, ylen):
            z[:, y, x] = ztn * (1 - (topt[y, x] / ztop)) + topt[y, x]

    return z
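
# Example (sketch; `topt` is the 2-D topography field and `ztn` the level
# heights read from the RAMS *head.txt file):
#
#     z = calc_height(topt, ztn)   # (z, y, x) gridbox heights in meters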

def z_levels_3d(ztn, topt):
    """
    Calculates the gridbox heights for a 3-D grid

    Parameters
    ----------
    ztn : list
        List of ztn values from RAMS *head.txt output
    topt : numpy.ndarray
        2-D array of topography height values

    Returns
    -------
    zheight : numpy.ndarray
        3-D array of gridbox heights
    """
    ylen, xlen = topt.shape
    zlen = len(ztn)
    zheight = np.zeros((zlen, ylen, xlen))

    for x in range(0, xlen):
        for y in range(0, ylen):
            for z in range(0, zlen):
                zheight[z, y, x] = ztn[z] * \
                    (1 - topt[y, x] / ztn[zlen - 1]) + topt[y, x]

    return zheight

def z_levels_2d(ztn, topt):
    """
    Calculates the gridbox heights for a 2-D grid

    Parameters
    ----------
    ztn : list
        List of ztn values from RAMS *head.txt output
    topt : numpy.ndarray
        1-D array of topography height values

    Returns
    -------
    zheight : numpy.ndarray
        2-D array of gridbox heights
    """
    xlen = topt.shape[0]
    zlen = len(ztn)
    zheight = np.zeros((zlen, xlen))

    for x in range(0, xlen):
        for z in range(0, zlen):
            zheight[z, x] = ztn[z] * \
                (1 - topt[x] / ztn[zlen - 1]) + topt[x]

    return zheight
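
# Example (sketch; `ztn` and `topt` as in calc_height() above):
#
#     z3d = z_levels_3d(ztn, topt)         # 2-D topography -> (z, y, x)
#     z2d = z_levels_2d(ztn, topt[0, :])   # 1-D topography slice -> (z, x)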

def build_mfdataset(path, **kwargs):
    """
    Build an xarray dataset with a time dimension

    Parameters
    ----------
    path : str
        Path to the folder containing files
    **kwargs
        Additional arguments to pass to xarray.open_mfdataset

    Returns
    -------
    ds : xarray.Dataset
    """
    from glob import glob

    flist = sorted(glob(path + '/*.h5'))
    print(f'Building dataset with {len(flist)} files')
    return xr.open_mfdataset(flist, combine='nested', concat_dim='time',
                             **kwargs)
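
# Example (sketch; the path is hypothetical):
#
#     ds = build_mfdataset('/path/to/output', parallel=True)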