Source code for improver.ensemble_copula_coupling.utilities

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown copyright. The Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
This module defines the utilities required for Ensemble Copula Coupling
plugins.

"""
import warnings
from typing import List, Optional, Union

import cf_units as unit
import iris
import numpy as np
from cf_units import Unit
from iris.cube import Cube
from numpy import ndarray

from improver.ensemble_copula_coupling.constants import BOUNDS_FOR_ECDF


[docs]def concatenate_2d_array_with_2d_array_endpoints( array_2d: ndarray, low_endpoint: float, high_endpoint: float, ) -> ndarray: """ For a 2d array, add a 2d array as the lower and upper endpoints. The concatenation to add the lower and upper endpoints to the 2d array are performed along the second (index 1) dimension. Args: array_2d: 2d array of values low_endpoint: Number used to create a 2d array of a constant value as the lower endpoint. high_endpoint: Number of used to create a 2d array of a constant value as the upper endpoint. Returns: 2d array of values after padding with the low_endpoint and high_endpoint. """ if array_2d.ndim != 2: raise ValueError("Expected 2D input, got {}D input".format(array_2d.ndim)) lower_array = np.full((array_2d.shape[0], 1), low_endpoint, dtype=array_2d.dtype) upper_array = np.full((array_2d.shape[0], 1), high_endpoint, dtype=array_2d.dtype) array_2d = np.concatenate((lower_array, array_2d, upper_array), axis=1) return array_2d
[docs]def choose_set_of_percentiles( no_of_percentiles: int, sampling: str = "quantile" ) -> List[float]: """ Function to create percentiles. Args: no_of_percentiles: Number of percentiles. sampling: Type of sampling of the distribution to produce a set of percentiles e.g. quantile or random. Accepted options for sampling are: * Quantile: A regular set of equally-spaced percentiles aimed at dividing a Cumulative Distribution Function into blocks of equal probability. * Random: A random set of ordered percentiles. Returns: Percentiles calculated using the sampling technique specified. Raises: ValueError: if the sampling option is not one of the accepted options. References: For further details, Flowerdew, J., 2014. Calibrating ensemble reliability whilst preserving spatial structure. Tellus, Series A: Dynamic Meteorology and Oceanography, 66(1), pp.1-20. Schefzik, R., Thorarinsdottir, T.L. & Gneiting, T., 2013. Uncertainty Quantification in Complex Simulation Models Using Ensemble Copula Coupling. Statistical Science, 28(4), pp.616-640. """ if sampling in ["quantile"]: # Generate percentiles from 1/N+1 to N/N+1. percentiles = np.linspace( 1 / float(1 + no_of_percentiles), no_of_percentiles / float(1 + no_of_percentiles), no_of_percentiles, ).tolist() elif sampling in ["random"]: # Generate percentiles from 1/N+1 to N/N+1. # Random sampling doesn't currently sample the ends of the # distribution i.e. 0 to 1/N+1 and N/N+1 to 1. percentiles = np.random.uniform( 1 / float(1 + no_of_percentiles), no_of_percentiles / float(1 + no_of_percentiles), no_of_percentiles, ) percentiles = sorted(list(percentiles)) else: msg = "Unrecognised sampling option '{}'".format(sampling) raise ValueError(msg) return [item * 100 for item in percentiles]
[docs]def create_cube_with_percentiles( percentiles: Union[List[float], ndarray], template_cube: Cube, cube_data: ndarray, cube_unit: Optional[Union[Unit, str]] = None, ) -> Cube: """ Create a cube with a percentile coordinate based on a template cube. The resulting cube will have an extra percentile coordinate compared with the template cube. The shape of the cube_data should be the shape of the desired output cube. Args: percentiles: Ensemble percentiles. There should be the same number of percentiles as the first dimension of cube_data. template_cube: Cube to copy metadata from. cube_data: Data to insert into the template cube. The shape of the cube_data, excluding the dimension associated with the percentile coordinate, should be the same as the shape of template_cube. For example, template_cube shape is (3, 3, 3), whilst the cube_data is (10, 3, 3, 3), where there are 10 percentiles. cube_unit: The units of the data within the cube, if different from those of the template_cube. Returns: Cube containing a percentile coordinate as the leading dimension (or scalar percentile coordinate if single-valued) """ # create cube with new percentile dimension cubes = iris.cube.CubeList([]) for point in percentiles: cube = template_cube.copy() cube.add_aux_coord( iris.coords.AuxCoord( np.float32(point), long_name="percentile", units=unit.Unit("%") ) ) cubes.append(cube) result = cubes.merge_cube() # replace data and units result.data = cube_data if cube_unit is not None: result.units = cube_unit return result
[docs]def get_bounds_of_distribution(bounds_pairing_key: str, desired_units: Unit) -> ndarray: """ Gets the bounds of the distribution and converts the units of the bounds_pairing to the desired_units. This method gets the bounds values and units from the imported dictionaries: BOUNDS_FOR_ECDF and units_of_BOUNDS_FOR_ECDF. The units of the bounds are converted to be the desired units. Args: bounds_pairing_key: Name of key to be used for the BOUNDS_FOR_ECDF dictionary, in order to get the desired bounds_pairing. desired_units: Units to which the bounds_pairing will be converted. Returns: Lower and upper bound to be used as the ends of the empirical cumulative distribution function, converted to have the desired units. Raises: KeyError: If the bounds_pairing_key is not within the BOUNDS_FOR_ECDF dictionary. """ # Extract bounds from dictionary of constants. try: bounds_pairing = BOUNDS_FOR_ECDF[bounds_pairing_key].value bounds_pairing_units = BOUNDS_FOR_ECDF[bounds_pairing_key].units except KeyError as err: msg = ( "The bounds_pairing_key: {} is not recognised " "within BOUNDS_FOR_ECDF {}. \n" "Error: {}".format(bounds_pairing_key, BOUNDS_FOR_ECDF, err) ) raise KeyError(msg) bounds_pairing_units = unit.Unit(bounds_pairing_units) bounds_pairing = bounds_pairing_units.convert( np.array(bounds_pairing), desired_units ) return bounds_pairing
[docs]def insert_lower_and_upper_endpoint_to_1d_array( array_1d: ndarray, low_endpoint: float, high_endpoint: float ) -> ndarray: """ For a 1d array, add a lower and upper endpoint. Args: array_1d: 1d array of values low_endpoint: Number of use as the lower endpoint. high_endpoint: Number of use as the upper endpoint. Returns: 1d array of values padded with the low_endpoint and high_endpoint. """ if array_1d.ndim != 1: raise ValueError("Expected 1D input, got {}D input".format(array_1d.ndim)) lower_array = np.array([low_endpoint]) upper_array = np.array([high_endpoint]) array_1d = np.concatenate((lower_array, array_1d, upper_array)) if array_1d.dtype == np.float64: array_1d = array_1d.astype(np.float32) return array_1d
[docs]def restore_non_percentile_dimensions( array_to_reshape: ndarray, original_cube: Cube, n_percentiles: int ) -> ndarray: """ Reshape a 2d array, so that it has the dimensions of the original cube, whilst ensuring that the probabilistic dimension is the first dimension. Args: array_to_reshape: The array that requires reshaping. This has dimensions "percentiles" by "points", where "points" is a flattened array of all the other original dimensions that needs reshaping. original_cube: Cube slice containing the desired shape to be reshaped to, apart from the probabilistic dimension. This would typically be expected to be either [time, y, x] or [y, x]. n_percentiles: Length of the required probabilistic dimension ("percentiles"). Returns: The array after reshaping. Raises: ValueError: If the probabilistic dimension is not the first on the original_cube. CoordinateNotFoundError: If the input_probabilistic_dimension_name is not a coordinate on the original_cube. """ shape_to_reshape_to = list(original_cube.shape) if n_percentiles > 1: shape_to_reshape_to = [n_percentiles] + shape_to_reshape_to return array_to_reshape.reshape(shape_to_reshape_to)
[docs]def slow_interp_same_x(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray: """For each row i of fp, calculate np.interp(x, xp, fp[i, :]). Args: x: 1-D array xp: 1-D array, sorted in non-decreasing order fp: 2-D array with len(xp) columns Returns: 2-D array with shape (len(fp), len(x)), with each row i equal to np.interp(x, xp, fp[i, :]) """ result = np.empty((fp.shape[0], len(x)), np.float32) for i in range(fp.shape[0]): result[i, :] = np.interp(x, xp, fp[i, :]) return result
[docs]def interpolate_multiple_rows_same_x(*args): """For each row i of fp, do the equivalent of np.interp(x, xp, fp[i, :]). Calls a fast numba implementation where numba is available (see `improver.ensemble_copula_coupling.numba_utilities.fast_interp_same_y`) and calls a the native python implementation otherwise (see :func:`slow_interp_same_y`). Args: x: 1-D array xp: 1-D array, sorted in non-decreasing order fp: 2-D array with len(xp) columns Returns: 2-D array with shape (len(fp), len(x)), with each row i equal to np.interp(x, xp, fp[i, :]) """ try: import numba # noqa: F401 from improver.ensemble_copula_coupling.numba_utilities import fast_interp_same_x return fast_interp_same_x(*args) except ImportError: warnings.warn("Module numba unavailable. ResamplePercentiles will be slower.") return slow_interp_same_x(*args)
[docs]def slow_interp_same_y(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray: """For each row i of xp, do the equivalent of np.interp(x, xp[i], fp). Args: x: 1-d array xp: n * m array, each row must be in non-decreasing order fp: 1-d array with length m Returns: n * len(x) array where each row i is equal to np.interp(x, xp[i], fp) """ result = np.empty((xp.shape[0], len(x)), dtype=np.float32) for i in range(xp.shape[0]): result[i] = np.interp(x, xp[i, :], fp) return result
[docs]def interpolate_multiple_rows_same_y(*args): """For each row i of xp, do the equivalent of np.interp(x, xp[i], fp). Calls a fast numba implementation where numba is available (see `improver.ensemble_copula_coupling.numba_utilities.fast_interp_same_y`) and calls a the native python implementation otherwise (see :func:`slow_interp_same_y`). Args: x: 1-d array xp: n * m array, each row must be in non-decreasing order fp: 1-d array with length m Returns: n * len(x) array where each row i is equal to np.interp(x, xp[i], fp) """ try: import numba # noqa: F401 from improver.ensemble_copula_coupling.numba_utilities import fast_interp_same_y return fast_interp_same_y(*args) except ImportError: warnings.warn( "Module numba unavailable. ConvertProbabilitiesToPercentiles will be slower." ) return slow_interp_same_y(*args)