from netCDF4 import Dataset, MFDataset, num2date
from ocgis import RequestDataset
from datetime import datetime as dt
# TODO: change to nc_utils guess_main_variables
# from eggshell.nc.ocg_utils import get_variable
import logging
import os
from os import path, rename
import requests
LOGGER = logging.getLogger("PYWPS")
# from esgf_utils import ATTRIBUTE_TO_FACETS_MAP
class CookieNetCDFTransfer:
    def __init__(self, request, opendap_hostnames=None):
        self.request = request
        self.cookie = None
        self.daprc_fn = '.daprc'
        self.auth_cookie_fn = 'auth_cookie'
        # avoid a mutable default argument; use a fresh list per instance
        self.opendap_hostnames = opendap_hostnames or []
    def __enter__(self):
self.cookie = get_auth_cookie(self.request)
if self.cookie:
with open(self.daprc_fn, 'w') as f:
f.write('HTTP.COOKIEJAR = auth_cookie')
with open(self.auth_cookie_fn, 'w') as f:
for opendap_hostname in self.opendap_hostnames:
for key, value in self.cookie.items():
f.write('{domain}\t{access_flag}\t{path}\t{secure}\t{expiration}\t{name}\t{value}\n'.format(
domain=opendap_hostname,
access_flag='FALSE',
path='/',
secure='FALSE',
expiration=0,
name=key,
value=value))
    def __exit__(self, exc_type, exc_val, exc_tb):
if self.cookie:
os.remove(self.daprc_fn)
os.remove(self.auth_cookie_fn)
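# Minimal usage sketch for CookieNetCDFTransfer (hostname and OPeNDAP URL are
# hypothetical; `request` is assumed to be the PyWPS request carrying an
# `auth_tkt` cookie):
#
#     with CookieNetCDFTransfer(request, opendap_hostnames=['esgf-data.example.org']):
#         ds = Dataset('https://esgf-data.example.org/thredds/dodsC/tas_day.nc')
#         ds.close()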
def aggregations(resource):
    """
    Aggregates netCDF files by experiment. Aggregation examples:
    CORDEX: EUR-11_ICHEC-EC-EARTH_historical_r3i1p1_DMI-HIRHAM5_v1_day
    CMIP5:
    For each experiment, all files along the time axis are collected:
    200101-200512, 200601-201012, ...
    The time axis is sorted by time.
    :param resource: list of netcdf files
    :return: dictionary with key=experiment
    """
from .nc_utils import drs_filename, sort_by_time
aggregations = {}
for nc in resource:
key = drs_filename(nc, skip_timestamp=True, skip_format=True)
# collect files of each aggregation (time axis)
if key in aggregations:
aggregations[key]['files'].append(nc)
else:
aggregations[key] = dict(files=[nc])
# collect aggregation metadata
for key in aggregations.keys():
# sort files by time
aggregations[key]['files'] = sort_by_time(aggregations[key]['files'])
# start timestamp of first file
start, _ = get_timerange(aggregations[key]['files'][0])
# end timestamp of last file
_, end = get_timerange(aggregations[key]['files'][-1])
aggregations[key]['from_timestamp'] = start
aggregations[key]['to_timestamp'] = end
aggregations[key]['start_year'] = int(start[0:4])
aggregations[key]['end_year'] = int(end[0:4])
aggregations[key]['variable'] = get_variable(aggregations[key]['files'][0])
aggregations[key]['filename'] = "%s_%s-%s.nc" % (key, start, end)
return aggregations
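# Example usage of aggregations() (a sketch; the CORDEX file names are
# hypothetical files of one experiment split along the time axis):
#
#     files = ['tas_EUR-11_..._day_20010101-20051231.nc',
#              'tas_EUR-11_..._day_20060101-20101231.nc']
#     aggs = aggregations(files)
#     for key, agg in aggs.items():
#         print(key, agg['start_year'], agg['end_year'], agg['filename'])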
def get_auth_cookie(pywps_request):
try:
return dict(auth_tkt=pywps_request.http_request.cookies['auth_tkt'])
except KeyError:
# No token... will be anonymous
return None
def opendap_or_download(resource, auth_tkt_cookie={}, output_path=None,
                        max_nbytes=10000000000):
    """Check for OPeNDAP support; if not available, download the resource.
    :param resource: url of a NetCDF resource
    :param auth_tkt_cookie: optional authentication cookie dict
    :param output_path: where to save the non-OPeNDAP resource
    :param max_nbytes: maximum file size for download, default: 10 GB
    :return str: the original url if OPeNDAP is supported or path of saved file
    """
try:
nc = Dataset(resource, 'r')
nc.close()
except Exception:
response = requests.get(resource, cookies=auth_tkt_cookie, stream=True)
if response.status_code == 401:
raise Exception("Not Authorized")
if 'Content-Length' in response.headers.keys():
if int(response.headers['Content-Length']) > max_nbytes:
raise IOError("File too large to download.")
chunk_size = 16 * 1024
if not output_path:
output_path = os.getcwd()
output_file = os.path.join(output_path, os.path.basename(resource))
with open(output_file, 'wb') as f:
for chunk in response.iter_content(chunk_size):
if chunk:
f.write(chunk)
try:
nc = Dataset(output_file, 'r')
nc.close()
except Exception:
raise IOError("This does not appear to be a valid NetCDF file.")
return output_file
return resource
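# Example usage of opendap_or_download() (a sketch; the URL is hypothetical):
#
#     url = 'https://example.org/thredds/dodsC/tas_day.nc'
#     resource = opendap_or_download(url, output_path='/tmp')
#     ds = Dataset(resource)  # works for both the OPeNDAP url and a local copy
#     ds.close()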
def get_coordinates(resource, variable=None, unrotate=False):
    """
    Reads out the coordinates of a variable.
    :param resource: netCDF resource file
    :param variable: variable name
    :param unrotate: if True the coordinates will be returned for the unrotated pole
    :returns list, list: latitudes, longitudes
    """
    # initialize so a failed extraction does not raise a NameError on return
    lats = lons = None
    if not isinstance(resource, list):
        resource = [resource]
    if variable is None:
        variable = get_variable(resource)
if unrotate is False:
try:
if len(resource) > 1:
ds = MFDataset(resource)
else:
ds = Dataset(resource[0])
var = ds.variables[variable]
dims = list(var.dimensions)
if 'time' in dims:
dims.remove('time')
# TODO: find position of lat and long in list and replace dims[0] dims[1]
lats = ds.variables[dims[0]][:]
lons = ds.variables[dims[1]][:]
ds.close()
LOGGER.info('got coordinates without pole rotation')
except Exception:
msg = 'failed to extract coordinates'
LOGGER.exception(msg)
else:
# lats, lons = unrotate_pole(resource)
# LOGGER.info('got coordinates with pole rotation')
        LOGGER.error('getting coordinates with pole rotation is currently switched off')
return lats, lons
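# Example usage of get_coordinates() (a sketch; the file path is hypothetical):
#
#     lats, lons = get_coordinates('tas_day.nc', variable='tas')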
def get_index_lat(resource, variable=None):
    """
    Returns the dimension index of the latitude values.
    :param resource: list of path(s) to netCDF file(s) of one dataset
    :param variable: variable name
    :return int: index
    """
if variable is None:
variable = get_variable(resource)
    if not isinstance(resource, list):
        resource = [resource]
if len(resource) == 1:
ds = Dataset(resource[0])
else:
ds = MFDataset(resource)
    var = ds.variables[variable]
    dims = list(var.dimensions)
    # later matches take precedence; None is returned if no latitude dimension is found
    index = None
    if 'rlat' in dims:
        index = dims.index('rlat')
    if 'lat' in dims:
        index = dims.index('lat')
    if 'latitude' in dims:
        index = dims.index('latitude')
    if 'y' in dims:
        index = dims.index('y')
    ds.close()
    return index
def get_variable(resources):
    """Guess the main variable in a NetCDF file.
    (compare nc.ocg_utils.get_variable)
    :param resources: netCDF4.Dataset, file path or list of file paths
    :return str: name of the main variable
    Notes
    -----
    The main variable is assumed to be 3D or 4D; the first variable with at
    least three dimensions is returned. Variables like time, lon, lat and
    bounds have fewer dimensions and are thereby skipped.
    """
    if isinstance(resources, str):
        ncdataset = Dataset(resources)
    elif isinstance(resources, list):
        ncdataset = MFDataset(resources)
    else:
        ncdataset = resources
    # dims = [key for key in ncdataset.dimensions.keys()]
    vars = list(ncdataset.variables.keys())
    # assume that main variables are 3D or 4D
    main_vars = []
    for var in vars:
        if len(ncdataset.variables[var].dimensions) >= 3:
            main_vars.append(var)
    if len(main_vars) > 1:
        LOGGER.warning('more than one 3D or 4D variable in file; returning the first')
    if len(main_vars) == 0:
        raise Exception('No 3D or 4D variable detected')
    return main_vars[0]
# var_candidates = []
# bnds_variables = []
# for var_name in ncdataset.variables:
# if var_name in ['time', 'lon', 'lat']:
# continue
# ncvar = ncdataset.variables[var_name]
# if hasattr(ncvar, 'bounds'):
# bnds_variables.append(ncvar.bounds)
# var_candidates.append(var_name)
# var_candidates = list(set(var_candidates) - set(bnds_variables))
#
# # Find main variables among the candidates
# nd = -1
# size = -1
# main_variables = []
# for var_name in var_candidates:
# ncvar = ncdataset.variables[var_name]
# if len(ncvar.shape) > nd:
# main_variables = [var_name]
# nd = len(ncvar.shape)
# size = ncvar.size
# elif (len(ncvar.shape) == nd) and (ncvar.size > size):
# main_variables = [var_name]
# size = ncvar.size
# elif (len(ncvar.shape) == nd) and (ncvar.size == size):
# main_variables.append(var_name)
# return main_variables
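# Example usage of get_variable() (a sketch; the file path is hypothetical):
#
#     varname = get_variable('tas_EUR-11_..._day.nc')  # e.g. 'tas'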
def sort_by_filename(resource, historical_concatination=False):
    """
    Sort a list of files with CORDEX-conformant file names.
    :param resource: netCDF file(s)
    :param historical_concatination: if True (default: False), appropriate historical
                                     runs will be sorted into the RCP datasets
    :return dictionary: {'drs_filename': [list of netCDF files]}
    """
    if isinstance(resource, str):
        resource = [resource]
ndic = {}
tmp_dic = {}
try:
if len(resource) > 1:
LOGGER.debug('sort_by_filename module start sorting %s files' % len(resource))
# LOGGER.debug('resource is list with %s files' % len(resource))
try: # if len(resource) > 1:
# collect the different experiment names
for nc in resource:
# LOGGER.info('file: %s' % nc)
p, f = path.split(path.abspath(nc))
n = f.split('_')
bn = '_'.join(n[0:-1]) # skipping the date information in the filename
ndic[bn] = [] # dictionary containing all datasets names
LOGGER.info('found %s datasets', len(ndic.keys()))
except Exception:
LOGGER.exception('failed to find names of datasets!')
LOGGER.info('check for historical/RCP datasets')
try:
if historical_concatination is True:
# select only necessary names
if any("_rcp" in s for s in ndic.keys()):
for key in ndic.keys():
if 'historical' in key:
ndic.pop(key)
LOGGER.info('historical data set names removed from dictionary')
else:
LOGGER.info('no RCP dataset names found in dictionary')
except Exception:
LOGGER.exception('failed to pop historical data set names!')
LOGGER.info('start sorting the files')
try:
for key in ndic:
try:
if historical_concatination is False:
for n in resource:
if '%s_' % key in n:
ndic[key].append(path.abspath(n)) # path.join(p, n))
elif historical_concatination is True:
                        # note: the CMIP5 mid-range scenario is named 'rcp60'
                        key_hist = key.replace('rcp26', 'historical').\
                            replace('rcp45', 'historical').\
                            replace('rcp60', 'historical').\
                            replace('rcp85', 'historical')
for n in resource:
if '%s_' % key in n or '%s_' % key_hist in n:
ndic[key].append(path.abspath(n)) # path.join(p, n))
else:
LOGGER.error('append file paths to dictionary for key %s failed' % key)
ndic[key].sort()
except Exception:
LOGGER.exception('failed for %s ' % key)
except Exception:
LOGGER.exception('failed to populate the dictionary with appropriate files')
for key in ndic.keys():
try:
ndic[key].sort()
start, end = get_timerange(ndic[key])
newkey = key + '_' + start + '-' + end
tmp_dic[newkey] = ndic[key]
except Exception:
msg = 'failed to sort the list of resources and add dates to keyname: %s' % key
LOGGER.exception(msg)
tmp_dic[key] = ndic[key]
# raise Exception(msg)
        elif len(resource) == 1:
            p, f = path.split(path.abspath(resource[0]))
            # keep the value a list, consistent with the multi-file case
            tmp_dic[f.replace('.nc', '')] = [path.abspath(resource[0])]
            LOGGER.debug('only one file! Nothing to sort, resource is passed into dictionary')
else:
LOGGER.debug('sort_by_filename module failed: resource is not 1 or >1')
        LOGGER.info('sort_by_filename module done: %s datasets found' % len(tmp_dic))
except Exception:
msg = 'failed to sort files by filename'
LOGGER.exception(msg)
raise Exception(msg)
return tmp_dic
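# Example usage of sort_by_filename() (a sketch; file names are hypothetical):
#
#     files = ['tas_..._historical_..._19710101-20001231.nc',
#              'tas_..._rcp45_..._20060101-21001231.nc']
#     datasets = sort_by_filename(files, historical_concatination=True)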
# def get_calendar(resource, variable=None):
# """
# returns the calendar and units in which the timestamps are stored
#
# :param resource: netCDF file or files of one Dataset
#
# :return str: calendar, unit
# """
#
# if type(resource) != list:
# resource = [resource]
#
# try:
# if len(resource) > 1:
# ds = MFDataset(resource)
# else:
# ds = Dataset(resource[0])
# time = ds.variables['time']
# except:
# msg = 'failed to get time'
# LOGGER.exception(msg)
# raise Exception(msg)
#
# if hasattr(time, 'units') is True:
# unit = time.units
# else:
# unit = None
#
# if hasattr(time, 'calendar') is True:
# calendar = time.calendar
# else:
# calendar = None
# return str(calendar), str(unit)
#
# def get_domain(resource):
# """
# returns the domain of a netCDF file
#
# :param resource: netCDF file (metadata quality checked!)
#
# :return str: domain
# """
# try:
# ds = Dataset(resource)
# if 'CMIP' in ds.project_id or 'EUCLEIA' in ds.project_id:
# domain = None
# LOGGER.debug('resource belongs to a global experiment project')
# elif 'CORDEX' in ds.project_id:
# domain = ds.CORDEX_domain
# LOGGER.info('resource belongs to CORDEX')
# else:
# LOGGER.debug('No known project_id found in meta data')
# ds.close()
# except Exception as e:
# LOGGER.debug('Could not specify domain for %s: %s' % (resource, e))
# return domain
#
#
def get_frequency(resource):
    """
    Returns the frequency as set in the metadata (see also metadata.get_frequency).
    :param resource: NetCDF file
    :return str: frequency
    """
ds = Dataset(resource)
try:
frequency = ds.frequency
LOGGER.info('frequency written in the meta data: %s', frequency)
except Exception as ex:
msg = "Could not specify frequency for %s : %s" % (resource, ex)
LOGGER.exception(msg)
raise Exception(msg)
else:
ds.close()
return frequency
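# Example usage of get_frequency() (a sketch; the file path is hypothetical):
#
#     frq = get_frequency('tas_day.nc')  # e.g. 'day'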
def get_timerange(resource):
    """
    Returns the from/to timestamps of the given netCDF file(s).
    :param resource: list of path(s) to netCDF file(s)
    :returns str, str: start and end timestamps formatted as 'YYYYMMDD'
    """
    start = end = None
    if not isinstance(resource, list):
        resource = [resource]
    LOGGER.debug('length of resources: %s files' % len(resource))
try:
if len(resource) > 1:
ds = MFDataset(resource)
LOGGER.debug('MFDataset loaded for %s of files in resource:' % len(resource))
else:
ds = Dataset(resource[0])
LOGGER.debug('Dataset loaded for %s file in resource:' % len(resource))
time = ds.variables['time']
        if hasattr(time, 'units') and hasattr(time, 'calendar'):
s = num2date(time[0], time.units, time.calendar)
e = num2date(time[-1], time.units, time.calendar)
elif hasattr(time, 'units'):
s = num2date(time[0], time.units)
e = num2date(time[-1], time.units)
else:
s = num2date(time[0])
e = num2date(time[-1])
# TODO: include frequency
start = '%s%s%s' % (s.year, str(s.month).zfill(2), str(s.day).zfill(2))
end = '%s%s%s' % (e.year, str(e.month).zfill(2), str(e.day).zfill(2))
ds.close()
    except Exception:
        msg = 'failed to get time range'
        LOGGER.exception(msg)
        # ds may be unbound if opening the dataset itself failed
        if 'ds' in locals():
            ds.close()
        raise Exception(msg)
return start, end
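# Example usage of get_timerange() (a sketch; the file path is hypothetical):
#
#     start, end = get_timerange('tas_day_20010101-20051231.nc')
#     # start == '20010101', end == '20051231'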
def get_time(resource):
    """
    Returns all timestamps of the given netCDF file(s) as a datetime list.
    :param resource: NetCDF file(s)
    :return: list of timesteps
    """
    # :param format: if a format is provided (e.g. format='%Y%d%m'), values will be converted to string
    if not isinstance(resource, list):
        resource = [resource]
try:
if len(resource) > 1:
ds = MFDataset(resource)
else:
ds = Dataset(resource[0])
time = ds.variables['time']
except Exception as ex:
msg = 'failed to get time {}'.format(ex)
LOGGER.exception(msg)
raise Exception(msg)
    ts = []  # initialize so a failed conversion does not raise a NameError on return
    try:
        if hasattr(time, 'units') and hasattr(time, 'calendar'):
timestamps = num2date(time[:], time.units, time.calendar)
elif hasattr(time, 'units'):
timestamps = num2date(time[:], time.units)
else:
timestamps = num2date(time[:])
ds.close()
try:
ts = [dt.strptime(str(i), '%Y-%m-%d %H:%M:%S') for i in timestamps]
# if date_format is not None:
# ts = [t.strftime(format=date_format) for t in timestamps]
# else:
# ts = [dt.strptime(str(i), '%Y-%m-%d %H:%M:%S') for i in timestamps]
# TODO give out dateformat by frequency
# ERROR: ValueError: unconverted data remains: 12:00:00
# from flyingpigeon.metadata import get_frequency
# frq = get_frequency(resource)
# if frq is 'day':
# ts = [dt.strptime(str(i), '%Y-%m-%d') for i in timestamps]
# elif frq is 'mon':
# ts = [dt.strptime(str(i), '%Y-%m') for i in timestamps]
# elif frq is 'sem':
# ts = [dt.strptime(str(i), '%Y-%m') for i in timestamps]
# elif frq is 'yr':
# ts = [dt.strptime(str(i), '%Y') for i in timestamps]
# else:
# ts = [dt.strptime(str(i), '%Y-%m-%d %H:%M:%S') for i in timestamps]
except Exception as e:
msg = 'failed to convert times to string: {}'.format(e)
LOGGER.exception(msg)
except Exception as e:
msg = 'failed to convert time: {}'.format(e)
LOGGER.exception(msg)
return ts
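# Example usage of get_time() (a sketch; the file path is hypothetical):
#
#     timesteps = get_time('tas_day.nc')
#     print(timesteps[0])  # e.g. 2001-01-01 12:00:00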
def get_values(resource, variable=None):
    """
    Returns the values for a list of files belonging to one dataset.
    :param resource: list of files
    :param variable: variable to be picked from the files (if not set, the variable will be detected)
    :returns numpy.array: values
    """
    from numpy import squeeze
    if variable is None:
        variable = get_variable(resource)
    if isinstance(resource, str):
        ds = Dataset(resource)
    elif len(resource) == 1:
        # unpack the single file path from the list
        ds = Dataset(resource[0])
    else:
        ds = MFDataset(resource)
    vals = squeeze(ds.variables[variable][:])
    ds.close()
    return vals
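# Example usage of get_values() (a sketch; the file path is hypothetical):
#
#     vals = get_values('tas_day.nc', variable='tas')
#     print(vals.shape)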
# def rename_variable(resource, oldname=None, newname='newname'):
# """
# Change the variable name of a netCDF variable
#
# :param resource: path to netCDF input file
# :param oldname: variable name to be changed
# :param newname: variable name to be given
#
# :returns str: path to resource
# """
# try:
# if oldname is None:
# oldname = get_variable(resource)
# if oldname != newname:
# from netCDF4 import Dataset
# ds = Dataset(resource, mode='a')
# ds.renameVariable(oldname, newname)
# ds.close()
# LOGGER.info('varname %s in netCDF renamed to %s' % (oldname, newname))
# except Exception as e:
# msg = 'failed to rename variable in target files %s ' % e
# LOGGER.debug(msg)
# raise Exception(msg)
#
#
# #
# def unrotate_pole(resource, write_to_file=False):
# """
# Calculates the unrotated coordinates for a rotated pole grid
#
# :param resource: netCDF file or list of files of one dataset
# :param write_to_file: calculated values will be written to file if True (default=False)
#
# :return list: lats, lons
# """
# from numpy import reshape, repeat
# from iris.analysis import cartography as ct
#
# if len(resource) == 1:
# ds = Dataset(resource[0])
# else:
# ds = MFDataset(resource)
#
# # ds = MFDataset(resource)
#
# if 'lat' in ds.variables.keys():
# LOGGER.info('file include unrotated coordinate values')
# lats = ds.variables['lat'][:]
# lons = ds.variables['lon'][:]
# else:
# try:
# if 'rotated_latitude_longitude' in ds.variables:
# rp = ds.variables['rotated_latitude_longitude']
# elif 'rotated_pole' in ds.variables:
# rp = ds.variables['rotated_pole']
# else:
# LOGGER.debug('rotated pole variable not found')
# pole_lat = rp.grid_north_pole_latitude
# pole_lon = rp.grid_north_pole_longitude
# except:
# LOGGER.exception('failed to find rotated_pole coordinates')
# try:
# if 'rlat' in ds.variables:
# rlats = ds.variables['rlat']
# rlons = ds.variables['rlon']
#
# if 'x' in ds.variables:
# rlats = ds.variables['y']
# rlons = ds.variables['x']
# except:
# LOGGER.exception('failed to read in rotated coordinates')
#
# try:
# rlons_i = reshape(rlons, (1, len(rlons)))
# rlats_i = reshape(rlats, (len(rlats), 1))
# grid_rlats = repeat(rlats_i, (len(rlons)), axis=1)
# grid_rlons = repeat(rlons_i, (len(rlats)), axis=0)
# except:
# LOGGER.exception('failed to repeat coordinates')
#
# lons, lats = ct.unrotate_pole(grid_rlons, grid_rlats, pole_lon, pole_lat)
#
# if write_to_file is True:
# lat = ds.createVariable('lat', 'f8', ('rlat', 'rlon'))
# lon = ds.createVariable('lon', 'f8', ('rlat', 'rlon'))
#
# lon.standard_name = "longitude"
# lon.long_name = "longitude coordinate"
# lon.units = 'degrees_east'
# lat.standard_name = "latitude"
# lat.long_name = "latitude coordinate"
# lat.units = 'degrees_north'
#
# lat[:] = lats
# lon[:] = lons
#
# ds.close()
#
# return lats, lons
def drs_filename(resource, skip_timestamp=False, skip_format=False,
                 variable=None, rename_file=False, add_file_path=False):
    """
    Generates a filename according to the data reference syntax (DRS),
    based on the metadata in the resource.
    http://cmip-pcmdi.llnl.gov/cmip5/docs/cmip5_data_reference_syntax.pdf
    https://pypi.python.org/pypi/drslib
    :param add_file_path: if True, the path to the file will be added (default: False)
    :param resource: netcdf file
    :param skip_timestamp: if True, the from/to timestamp is not added to the filename
                           (default: False)
    :param variable: appropriate variable for the filename; if not set (default: None),
                     the variable will be determined. For files with more than one
                     data variable, the variable parameter has to be defined,
                     example: variable='tas'
    :param rename_file: rename the file (default: False)
    :returns str: DRS filename
    """
    # default to the resource path so a name is always returned
    filename = resource
    try:
        ds = Dataset(resource)
if variable is None:
variable = get_variable(resource)
# CORDEX example: EUR-11_ICHEC-EC-EARTH_historical_r3i1p1_DMI-HIRHAM5_v1_day
cordex_pattern = "{variable}_{domain}_{driving_model}_{experiment}_{ensemble}_{model}_{version}_{frequency}"
# CMIP5 example: tas_MPI-ESM-LR_historical_r1i1p1
cmip5_pattern = "{variable}_{model}_{experiment}_{ensemble}"
filename = resource
if ds.project_id == 'CORDEX' or ds.project_id == 'EOBS':
filename = cordex_pattern.format(
variable=variable,
domain=ds.CORDEX_domain,
driving_model=ds.driving_model_id,
experiment=ds.experiment_id,
ensemble=ds.driving_model_ensemble_member,
model=ds.model_id,
version=ds.rcm_version_id,
frequency=ds.frequency)
elif ds.project_id == 'CMIP5':
# TODO: attributes missing in netcdf file for name generation?
filename = cmip5_pattern.format(
variable=variable,
model=ds.model_id,
experiment=ds.experiment,
ensemble=ds.parent_experiment_rip
)
else:
raise Exception('unknown project %s' % ds.project_id)
ds.close()
except Exception:
LOGGER.exception('Could not read metadata %s', resource)
try:
# add from/to timestamp if not skipped
if skip_timestamp is False:
LOGGER.debug("add timestamp")
from_timestamp, to_timestamp = get_timerange(resource)
LOGGER.debug("from_timestamp %s", from_timestamp)
filename = "%s_%s-%s" % (filename, int(from_timestamp), int(to_timestamp))
# add format extension
if skip_format is False:
filename = filename + '.nc'
pf = path.dirname(resource)
# add file path
if add_file_path is True:
filename = path.join(pf, filename)
# rename the file
if rename_file is True:
if path.exists(path.join(resource)):
rename(resource, path.join(pf, filename))
except Exception:
LOGGER.exception('Could not generate DRS filename for %s', resource)
return filename
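# Example usage of drs_filename() (a sketch; the file path is hypothetical):
#
#     name = drs_filename('some_cordex_file.nc', skip_timestamp=True, skip_format=True)
#     # e.g. 'tas_EUR-11_ICHEC-EC-EARTH_historical_r3i1p1_DMI-HIRHAM5_v1_day'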
def sort_by_time(resource):
    """
    Sort a list of files by their time variable.
    :param resource: file path or list of file paths
    :return: sorted file list
    """
    from ocgis.util.helpers import get_sorted_uris_by_time_dimension
    if isinstance(resource, list) and len(resource) > 1:
        sorted_list = get_sorted_uris_by_time_dimension(resource)
    elif isinstance(resource, str):
sorted_list = [resource]
else:
sorted_list = resource
return sorted_list
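# Example usage of sort_by_time() (a sketch; file names are hypothetical):
#
#     ordered = sort_by_time(['tas_20060101-20101231.nc',
#                             'tas_20010101-20051231.nc'])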