Source code for eggshell.utils
# -*- coding: utf-8 -*-
"""Utitility functions."""
import os
import tempfile
import tarfile
import requests
import shutil
from netCDF4 import Dataset, MFDataset
from re import search
from urllib.parse import urlparse
from zipfile import ZipFile
import eggshell as eg
from eggshell.config import Paths
from eggshell.nc.nc_utils import get_variable
import logging
[docs]LOGGER = logging.getLogger("EGGSHELL")
[docs]def archive(resources, format='tar', dir_output=None, mode=None):
"""
Compresses a list of files into an archive.
:param resources: list of files to be stored in archive
:param format: archive format. Options: tar (default), zip
:param dir_output: path to output folder (default: tempory folder)
:param mode: for format='tar':
'w' or 'w:' open for writing without compression
'w:gz' open for writing with gzip compression
'w:bz2' open for writing with bzip2 compression
'w|' open an uncompressed stream for writing
'w|gz' open a gzip compressed stream for writing
'w|bz2' open a bzip2 compressed stream for writing
for foramt='zip':
read "r", write "w" or append "a"
:return str: archive path/filname.ext
"""
dir_output = dir_output or tempfile.gettempdir()
mode = mode or 'w'
if format not in ['tar', 'zip']:
raise Exception('archive format {} not supported (only zip and tar)'.format(format))
LOGGER.info('compressing files to archive, format={}'.format(format))
# convert to list if necessary
if not isinstance(resources, list):
resources = list([resources])
resources = [x for x in resources if x is not None]
_, arch = tempfile.mkstemp(dir=dir_output, suffix='.{}'.format(format))
try:
if format == 'tar':
with tarfile.open(arch, mode) as tar:
for f in resources:
tar.add(f, arcname=os.path.basename(f))
elif format == 'zip':
with ZipFile(arch, mode=mode) as zf:
for f in resources:
zf.write(f, os.path.basename(f))
except Exception as e:
raise Exception('failed to create {} archive: {}'.format(format, e))
return arch
[docs]def download_file(url, out=None, verify=False):
if out:
local_filename = out
else:
local_filename = url.split('/')[-1]
r = requests.get(url, stream=True, verify=verify)
with open(local_filename, 'wb') as fp:
shutil.copyfileobj(r.raw, fp)
return local_filename
[docs]def local_path(url):
url_parts = urlparse(url)
return url_parts.path
[docs]def download(url, cache=False):
"""
Downloads URL using the Python requests module to the current directory.
:param cache: if True then files will be downloaded to a cache directory.
:param url: url adress of the target file location
:return str: filename
"""
filename = ''
try:
if cache:
parsed_url = urlparse(url)
filename = os.path.join(paths.cache, parsed_url.netloc, parsed_url.path.strip('/'))
if os.path.exists(filename):
LOGGER.info('file already in cache: %s', os.path.basename(filename))
# if check_creationtime(filename, url):
# msg = 'file in cache older than archive file, downloading: {}'.format(os.path.basename(filename))
# LOGGER.info(msg)
# os.remove(filename)
# filename = download_file(url, out=filename)
# TODO: enable creation time check
else:
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
LOGGER.info('downloading: {}'.format(url))
filename = download_file(url, out=filename)
# make softlink to current dir
# os.symlink(filename, os.path.basename(filename))
# filename = os.path.basename(filename)
else:
filename = download_file(url)
except Exception as e:
msg = 'failed to download data: {}'.format(e)
LOGGER.exception(msg)
return filename
# def get_coordinates(resource, variable=None, unrotate=False):
# """
# reads out the coordinates of a variable
#
# :param resource: netCDF resource file
# :param variable: variable name
# :param unrotate: If True the coordinates will be returned for unrotated pole
#
# :returns list, list: latitudes , longitudes
# """
# if type(resource) != list:
# resource = [resource]
#
# if variable is None:
# variable = get_variable(resource)
#
# if unrotate is False:
# try:
# if len(resource) > 1:
# ds = MFDataset(resource)
# else:
# ds = Dataset(resource[0])
#
# var = ds.variables[variable]
# dims = list(var.dimensions)
# if 'time' in dims:
# dims.remove('time')
# # TODO: find position of lat and long in list and replace dims[0] dims[1]
# lats = ds.variables[dims[0]][:]
# lons = ds.variables[dims[1]][:]
# ds.close()
# LOGGER.info('got coordinates without pole rotation')
# except Exception as e:
# msg = 'failed to extract coordinates: {}'.format(e)
# raise Exception(msg)
# else:
# lats, lons = unrotate_pole(resource)
# LOGGER.info('got coordinates with pole rotation')
# return lats, lons
[docs]def address_append(address):
"""
Formats a URL/URI to be more easily read with libraries such as "rasterstats"
:param address: URL/URI to a potential zip or tar file
:return: URL/URI prefixed for archive type
"""
zipped = search(r'(\.zip)', address)
tarred = search(r'(\.tar)', address)
try:
if zipped:
return 'zip://{}'.format(address)
elif tarred:
return 'tar://{}'.format(address)
else:
return address
except Exception as e:
msg = 'Failed to prefix or parse URL: {}'.format(e)
raise Exception(msg)