# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown copyright. The Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
""" Provides support utilities for cube manipulation."""
import warnings
from typing import Any, Dict, List, Optional, Union
import iris
import numpy as np
from iris.coords import DimCoord
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError
from improver import BasePlugin
from improver.metadata.constants import FLOAT_DTYPE, FLOAT_TYPES
from improver.metadata.probabilistic import find_threshold_coordinate
from improver.utilities.cube_checker import check_cube_coordinates
[docs]def collapsed(cube: Cube, *args: Any, **kwargs: Any) -> Cube:
"""Collapses the cube with given arguments.
The cell methods of the output cube will match the cell methods
from the input cube. Any cell methods generated by the iris
collapsed method will not be retained.
Args:
cube:
A Cube to be collapsed.
Returns:
A collapsed cube where the cell methods match the input cube.
"""
original_methods = cube.cell_methods
new_cube = cube.collapsed(*args, **kwargs)
new_cube.cell_methods = original_methods
# demote escalated datatypes as required
if new_cube.dtype in FLOAT_TYPES:
new_cube.data = new_cube.data.astype(FLOAT_DTYPE)
collapsed_coords = args[0] if isinstance(args[0], list) else [args[0]]
for coord in collapsed_coords:
if new_cube.coord(coord).points.dtype in FLOAT_TYPES:
new_cube.coord(coord).points = new_cube.coord(coord).points.astype(
FLOAT_DTYPE
)
if new_cube.coord(coord).bounds is not None:
new_cube.coord(coord).bounds = new_cube.coord(coord).bounds.astype(
FLOAT_DTYPE
)
return new_cube
[docs]def collapse_realizations(cube: Cube) -> Cube:
"""Collapses the realization coord of a cube and strips the coord from the cube.
Args:
cube:
Input cube
Returns:
Cube with realization coord collapsed and removed.
"""
returned_cube = collapsed(cube, "realization", iris.analysis.MEAN)
returned_cube.remove_coord("realization")
return returned_cube
[docs]def get_dim_coord_names(cube: Cube) -> List[str]:
"""
Returns an ordered list of dimension coordinate names on the cube
Args:
cube
Returns:
List of dimension coordinate names
"""
return [coord.name() for coord in cube.coords(dim_coords=True)]
[docs]def get_coord_names(cube: Cube) -> List[str]:
"""
Returns a list of all coordinate names on the cube
Args:
cube
Returns:
List of all coordinate names
"""
return [coord.name() for coord in cube.coords()]
[docs]def strip_var_names(cubes: Union[Cube, CubeList]) -> CubeList:
"""
Strips var_name from the cube and from all coordinates except where
required to support probabilistic metadata. Inputs are modified in place.
Args:
cubes
Returns:
cubes with stripped var_name
"""
if isinstance(cubes, iris.cube.Cube):
cubes = iris.cube.CubeList([cubes])
for cube in cubes:
cube.var_name = None
for coord in cube.coords():
# retain var name required for threshold coordinate
if coord.var_name != "threshold":
coord.var_name = None
return cubes
[docs]class MergeCubes(BasePlugin):
"""
Class adding functionality to iris.merge_cubes()
Accounts for differences in attributes, cell methods and bounds ranges to
avoid merge failures and anonymous dimensions.
"""
[docs] def __init__(self) -> None:
"""Initialise constants"""
# List of attributes to remove silently if unmatched
self.silent_attributes = ["history", "title", "mosg__grid_version"]
[docs] @staticmethod
def _equalise_cell_methods(cubelist: CubeList) -> None:
"""
Function to equalise cell methods that do not match. Modifies cubes
in place.
Args:
cubelist:
List of cubes to check the cell methods and revise.
"""
cell_methods = cubelist[0].cell_methods
for cube in cubelist[1:]:
cell_methods = list(set(cell_methods) & set(cube.cell_methods))
for cube in cubelist:
cube.cell_methods = tuple(cell_methods)
[docs] @staticmethod
def _check_time_bounds_ranges(cube: Cube) -> None:
"""
Check the bounds on any dimensional time coordinates after merging.
For example, to check time and forecast period ranges for accumulations
to avoid blending 1 hr with 3 hr accumulations. If points on the
coordinate are not compatible, raise an error.
Args:
cube:
Merged cube
"""
for name in ["time", "forecast_period"]:
try:
coord = cube.coord(name)
except CoordinateNotFoundError:
continue
if coord.bounds is None:
continue
if len(coord.points) == 1:
continue
bounds_ranges = np.abs(np.diff(coord.bounds))
reference_range = bounds_ranges[0]
if not np.all(np.isclose(bounds_ranges, reference_range)):
msg = (
"Cube with mismatching {} bounds ranges "
"cannot be blended".format(name)
)
raise ValueError(msg)
[docs] def process(
self,
cubes_in: Union[List[Cube], CubeList],
check_time_bounds_ranges: bool = False,
slice_over_realization: bool = False,
copy: bool = True,
) -> Cube:
"""
Function to merge cubes, accounting for differences in attributes,
coordinates and cell methods. Note that cubes with different sets
of coordinates (as opposed to cubes with the same coordinates with
different values) cannot be merged.
If the input is a single Cube, this is returned unmodified. A
CubeList of length 1 is checked for mismatched time bounds before
returning the single Cube (since a CubeList of this form may be the
result of premature iris merging on load).
Args:
cubes_in:
Cubes to be merged.
check_time_bounds_ranges:
Flag to check whether scalar time bounds ranges match.
This is for when we are expecting to create a new "time" axis
through merging for eg precipitation accumulations, where we
want to make sure that the bounds match so that we are not eg
combining 1 hour with 3 hour accumulations.
slice_over_realization:
Options to combine cubes with different realization dimensions.
These cannot always be concatenated directly as this can create a
non-monotonic realization coordinate.
copy:
If True, this will copy the cubes, thus not having any impact on
the original objects.
Returns:
Merged cube.
"""
# if input is already a single cube, return unchanged
if isinstance(cubes_in, iris.cube.Cube):
return cubes_in
if len(cubes_in) == 1:
# iris merges cubelist into shortest list possible on load
# - may already have collapsed across invalid time bounds
if check_time_bounds_ranges:
self._check_time_bounds_ranges(cubes_in[0])
return cubes_in[0]
if copy:
# create copies of input cubes so as not to modify in place
cube_return = lambda cube: cube.copy()
else:
cube_return = lambda cube: cube
cubelist = iris.cube.CubeList([])
for cube in cubes_in:
if slice_over_realization:
for real_slice in cube.slices_over("realization"):
cubelist.append(cube_return(real_slice))
else:
cubelist.append(cube_return(cube))
# equalise cube attributes, cell methods and coordinate names
iris.util.equalise_attributes(cubelist)
strip_var_names(cubelist)
self._equalise_cell_methods(cubelist)
# merge resulting cubelist
result = cubelist.merge_cube()
# check time bounds if required
if check_time_bounds_ranges:
self._check_time_bounds_ranges(result)
return result
[docs]def get_filtered_attributes(cube: Cube, attribute_filter: Optional[str] = None) -> Dict:
"""
Build dictionary of attributes that match the attribute_filter. If the
attribute_filter is None, return all attributes.
Args:
cube:
A cube from which attributes partially matching the
attribute_filter will be returned.
attribute_filter:
A string to match, or partially match, against attributes to build
a filtered attribute dictionary. If None, all attributes are
returned.
Returns:
A dictionary of attributes partially matching the attribute_filter
that were found on the input cube.
"""
attributes = cube.attributes
if attribute_filter is not None:
attributes = {k: v for (k, v) in attributes.items() if attribute_filter in k}
return attributes
[docs]def compare_attributes(
cubes: CubeList, attribute_filter: Optional[str] = None
) -> List[Dict]:
"""
Function to compare attributes of cubes
Args:
cubes:
List of cubes to compare (must be more than 1)
attribute_filter:
A string to filter which attributes are actually compared. If None
all attributes are compared.
Returns:
List of dictionaries of unmatching attributes
Warns:
Warning: If only a single cube is supplied
"""
unmatching_attributes = []
if len(cubes) == 1:
msg = "Only a single cube so no differences will be found "
warnings.warn(msg)
else:
reference_attributes = get_filtered_attributes(
cubes[0], attribute_filter=attribute_filter
)
common_keys = reference_attributes.keys()
for cube in cubes[1:]:
cube_attributes = get_filtered_attributes(
cube, attribute_filter=attribute_filter
)
common_keys = {
key
for key in cube_attributes.keys()
if key in common_keys
and np.all(cube_attributes[key] == reference_attributes[key])
}
for cube in cubes:
cube_attributes = get_filtered_attributes(
cube, attribute_filter=attribute_filter
)
unique_attributes = {
key: value
for (key, value) in cube_attributes.items()
if key not in common_keys
}
unmatching_attributes.append(unique_attributes)
return unmatching_attributes
[docs]def compare_coords(
cubes: CubeList, ignored_coords: Optional[List[str]] = None
) -> List[Dict]:
"""
Function to compare the coordinates of the cubes
Args:
cubes:
List of cubes to compare (must be more than 1)
ignored_coords:
List of coordinate names that identify coordinates to exclude from
the comparison.
Returns:
List of dictionaries of unmatching coordinates
Number of dictionaries equals number of cubes
unless cubes is a single cube in which case
unmatching_coords returns an empty list.
Warns:
Warning: If only a single cube is supplied
"""
if ignored_coords is None:
ignored_coords = []
unmatching_coords = []
if len(cubes) == 1:
msg = "Only a single cube so no differences will be found "
warnings.warn(msg)
else:
common_coords = cubes[0].coords()
for cube in cubes[1:]:
cube_coords = cube.coords()
common_coords = [
coord
for coord in common_coords
if (
coord in cube_coords
and np.all(cube.coords(coord) == cubes[0].coords(coord))
)
]
for i, cube in enumerate(cubes):
unmatching_coords.append({})
for coord in cube.coords():
if coord not in common_coords and coord.name() not in ignored_coords:
dim_coords = cube.dim_coords
if coord in dim_coords:
dim_val = dim_coords.index(coord)
else:
dim_val = None
aux_val = None
if dim_val is None and len(cube.coord_dims(coord)) > 0:
aux_val = cube.coord_dims(coord)[0]
unmatching_coords[i].update(
{
coord.name(): {
"data_dims": dim_val,
"aux_dims": aux_val,
"coord": coord,
}
}
)
return unmatching_coords
[docs]def sort_coord_in_cube(cube: Cube, coord: str, descending: bool = False) -> Cube:
"""Sort a cube based on the ordering within the chosen coordinate.
Sorting can either be in ascending or descending order.
This code is based upon https://gist.github.com/pelson/9763057.
Args:
cube:
The input cube to be sorted.
coord:
Name of the coordinate to be sorted.
descending:
If True it will be sorted in descending order.
Returns:
Cube where the chosen coordinate has been sorted into either
ascending or descending order.
Warns:
Warning if the coordinate being processed is a circular coordinate.
"""
coord_to_sort = cube.coord(coord)
if isinstance(coord_to_sort, DimCoord):
if coord_to_sort.circular:
msg = (
"The {} coordinate is circular. If the values in the "
"coordinate span a boundary then the sorting may return "
"an undesirable result.".format(coord_to_sort.name())
)
warnings.warn(msg)
(dim,) = cube.coord_dims(coord_to_sort)
index = [slice(None)] * cube.ndim
index[dim] = np.argsort(coord_to_sort.points)
if descending:
index[dim] = index[dim][::-1]
return cube[tuple(index)]
[docs]def enforce_coordinate_ordering(
cube: Cube, coord_names: Union[List[str], str], anchor_start: bool = True
) -> None:
"""
Function to reorder dimensions within a cube.
Note that the input cube is modified in place.
Args:
cube:
Cube where the ordering will be enforced to match the order within
the coord_names. This input cube will be modified as part of this
function.
coord_names:
List of the names of the coordinates to order. If a string is
passed in, only the single specified coordinate is reordered.
anchor_start:
Define whether the specified coordinates should be moved to the
start (True) or end (False) of the list of dimensions. If True, the
coordinates are inserted as the first dimensions in the order in
which they are provided. If False, the coordinates are moved to the
end. For example, if the specified coordinate names are
["time", "realization"] then "realization" will be the last
coordinate within the cube, whilst "time" will be the last but one.
"""
if isinstance(coord_names, str):
coord_names = [coord_names]
# construct a list of dimensions on the cube to be reordered
dim_coord_names = get_dim_coord_names(cube)
coords_to_reorder = []
for coord in coord_names:
if coord == "threshold":
try:
coord = find_threshold_coordinate(cube).name()
except CoordinateNotFoundError:
continue
if coord in dim_coord_names:
coords_to_reorder.append(coord)
original_coords = cube.coords(dim_coords=True)
coord_dims = cube.coord_dims
# construct list of reordered dimensions assuming start anchor
new_dims = [coord_dims(coord)[0] for coord in coords_to_reorder]
new_dims.extend(
[
coord_dims(coord)[0]
for coord in original_coords
if coord_dims(coord)[0] not in new_dims
]
)
# if anchor is end, reshuffle the list
if not anchor_start:
new_dims_end = new_dims[len(coords_to_reorder) :]
new_dims_end.extend(new_dims[: len(coords_to_reorder)])
new_dims = new_dims_end
# transpose cube using new coordinate order
if new_dims != sorted(new_dims):
cube.transpose(new_dims)
[docs]def clip_cube_data(cube: Cube, minimum_value: float, maximum_value: float) -> Cube:
"""Apply np.clip to data in a cube to ensure that the limits do not go
beyond the provided minimum and maximum values.
Args:
cube:
The cube that has been processed and contains data that is to be
clipped.
minimum_value:
The minimum value, with data in the cube that falls below this
threshold set to it.
maximum_value:
The maximum value, with data in the cube that falls above this
threshold set to it.
Returns:
The processed cube with the data clipped to the limits of the
original preprocessed cube.
"""
original_attributes = cube.attributes
original_methods = cube.cell_methods
result = iris.cube.CubeList()
for cube_slice in cube.slices([cube.coord(axis="y"), cube.coord(axis="x")]):
cube_slice.data = np.clip(cube_slice.data, minimum_value, maximum_value)
result.append(cube_slice)
result = result.merge_cube()
result.cell_methods = original_methods
result.attributes = original_attributes
result = check_cube_coordinates(cube, result)
return result
[docs]def expand_bounds(
result_cube: Cube, cubelist: Union[List[Cube], CubeList], coord_names: List[str],
) -> Cube:
"""Alter a coordinate on result_cube such that bounds are expanded to cover
the entire range of the input cubes (cubelist). The input result_cube is
modified in place and returned.
For example, in the case of time cubes if the input cubes have
bounds of [0000Z, 0100Z] & [0100Z, 0200Z] then the output cube will
have bounds of [0000Z,0200Z]. The returned coordinate point will be
equal to the upper bound.
Args:
result_cube:
Cube with coords requiring expansion
cubelist:
List of input cubes with source coords
coord_names:
Coordinates which should be expanded
Returns:
Cube with coords expanded.
"""
for coord in coord_names:
if len(result_cube.coord(coord).points) != 1:
emsg = (
"the expand bounds function should only be used on a"
'coordinate with a single point. The coordinate "{}" '
"has {} points."
)
raise ValueError(emsg.format(coord, len(result_cube.coord(coord).points)))
bounds = [cube.coord(coord).bounds for cube in cubelist]
if any(b is None for b in bounds):
if not all(b is None for b in bounds):
raise ValueError(
"cannot expand bounds for a mixture of "
"bounded / unbounded coordinates"
)
points = [cube.coord(coord).points for cube in cubelist]
new_low_bound = np.min(points)
new_top_bound = np.max(points)
else:
new_low_bound = np.min(bounds)
new_top_bound = np.max(bounds)
result_coord = result_cube.coord(coord)
result_coord.bounds = np.array([[new_low_bound, new_top_bound]])
if result_coord.bounds.dtype in FLOAT_TYPES:
result_coord.bounds = result_coord.bounds.astype(FLOAT_DTYPE)
result_coord.points = [new_top_bound]
if result_coord.points.dtype in FLOAT_TYPES:
result_coord.points = result_coord.points.astype(FLOAT_DTYPE)
return result_cube
[docs]def filter_realizations(cubes: CubeList) -> Cube:
"""For a given list of cubes, identifies the set of times, filters out any realizations
that are not present at all times and returns a merged cube of the result.
Args:
cubes:
List of cubes to be filtered
Returns:
Cube:
Filtered and merged cube
"""
times = set()
realizations = set()
for cube in cubes:
times.update([c.point for c in cube.coord("time").cells()])
realizations.update(cube.coord("realization").points)
filtered_cubes = CubeList()
for realization in realizations:
realization_cube = MergeCubes()(
cubes.extract(iris.Constraint(realization=realization))
)
if set([c.point for c in realization_cube.coord("time").cells()]) == times:
filtered_cubes.append(realization_cube)
return MergeCubes()(filtered_cubes)
[docs]def add_coordinate_to_cube(
cube: Cube,
new_coord: DimCoord,
new_dim_location: int = 0,
copy_metadata: bool = True,
) -> Cube:
"""Create a copy of input cube with an additional dimension coordinate
added to the cube at the specified axis. The data from input cube is broadcast
over this new dimension.
Args:
cube:
cube to add realization dimension to.
new_coord:
new coordinate to add to input cube.
new_dim_location:
position in cube.data to position the new dimension coord. Default is
to add the new coordinate as the leading dimension.
copy_metadata:
flag as to whether to carry metadata over to output cube.
Returns:
A copy of cube broadcast over the new dimension coordinate.
"""
input_dim_count = len(cube.dim_coords)
if (new_dim_location > input_dim_count) or (new_dim_location < 0):
raise ValueError(
f"New dimension location: {new_dim_location} incompatible \
with cube containing {input_dim_count}."
)
new_dim_coords = list(cube.dim_coords) + [new_coord]
new_dims = list(range(input_dim_count + 1))
new_dim_coords_and_dims = list(zip(new_dim_coords, new_dims))
aux_coords = cube.aux_coords
aux_coord_dims = [cube.coord_dims(coord.name()) for coord in aux_coords]
new_aux_coords_and_dims = list(zip(aux_coords, aux_coord_dims))
new_coord_size = len(new_coord.points)
new_data = np.broadcast_to(
cube.data[..., np.newaxis], shape=cube.shape + (new_coord_size,)
).astype(cube.data.dtype)
output_cube = Cube(
new_data,
dim_coords_and_dims=new_dim_coords_and_dims,
aux_coords_and_dims=new_aux_coords_and_dims,
)
if copy_metadata:
output_cube.metadata = cube.metadata
final_dim_order = np.insert(
np.arange(input_dim_count), new_dim_location, values=input_dim_count
)
output_cube.transpose(final_dim_order)
return output_cube