# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown copyright. The Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""Provides tools for neighbourhood generation"""
from typing import Any, Tuple, Union
import numpy as np
from numpy import ndarray
[docs]def rolling_window(
input_array: ndarray, shape: Tuple[int, int], writeable: bool = False
) -> ndarray:
"""Creates a rolling window neighbourhood of the given `shape` from the
last `len(shape)` axes of the input array. Avoids creating a large output
array by constructing a non-continuous view mapped onto the input array.
args:
input_array:
An array from which rolling window neighbourhoods will be created.
shape:
The neighbourhood shape e.g. if the neighbourhood
size is 3, the shape would be (3, 3) to create a
3x3 array around each point in the input_array.
writeable:
If True the returned view will be writeable. This will modify
the input array, so use with caution.
Returns:
"views" into the data, each view represents
a neighbourhood of points.
Raises:
ValueError: If `input_array` has fewer dimensions than `shape`.
RuntimeError: If any dimension of `shape` is larger than
the corresponding dimension of `input_array`.
"""
num_window_dims = len(shape)
num_arr_dims = len(input_array.shape)
if num_arr_dims < num_window_dims:
raise ValueError(
"Number of dimensions of the input array must be greater than or "
"equal to the length of the neighbourhood shape used for "
"constructing rolling window neighbourhoods."
)
adjshp = (
*input_array.shape[:-num_window_dims],
*(
arr_dims - win_dims + 1
for arr_dims, win_dims in zip(input_array.shape[-num_window_dims:], shape)
),
*shape,
)
if any(arr_dims <= 0 for arr_dims in adjshp):
raise RuntimeError(
"The calculated shape of the output array view contains a "
"dimension that is negative or zero. Each dimension of the "
"neighbourhood shape must be less than or equal to the "
"corresponding dimension of the input array."
)
strides = input_array.strides + input_array.strides[-num_window_dims:]
return np.lib.stride_tricks.as_strided(
input_array, shape=adjshp, strides=strides, writeable=writeable
)
[docs]def pad_and_roll(
input_array: ndarray, shape: Tuple[int, int], **kwargs: Any
) -> ndarray:
"""Pads the last `len(shape)` axes of the input array for `rolling_window`
to create 'neighbourhood' views of the data of a given `shape` as the last
axes in the returned array. Collapsing over the last `len(shape)` axes
results in a shape of the original input array.
args:
input_array:
The dataset of points to pad and create rolling windows for.
shape:
Desired shape of the neighbourhood. E.g. if a neighbourhood
width of 1 around the point is desired, this shape should be (3, 3)::
X X X
X O X
X X X
Where O is our central point and X represent the neighbour points.
kwargs:
additional keyword arguments passed to `numpy.pad` function.
Returns:
Contains the views of the input_array, the final dimension of
the array will be the specified shape in the input arguments,
the leading dimensions will depend on the shape of the input array.
"""
writeable = kwargs.pop("writeable", False)
pad_extent = [(0, 0)] * (len(input_array.shape) - len(shape))
pad_extent.extend((d // 2, d // 2) for d in shape)
input_array = np.pad(input_array, pad_extent, **kwargs)
return rolling_window(input_array, shape, writeable=writeable)
[docs]def pad_boxsum(
data: ndarray, boxsize: Union[int, Tuple[int, int]], **pad_options: Any
) -> ndarray:
"""Pad an array to shape suitable for `boxsum`.
Note that padding is not symmetric: there is an extra row/column at
the top/left (as required for calculating the boxsum).
Args:
data:
The input data array.
boxsize:
The size of the neighbourhood.
pad_options:
Additional keyword arguments passed to `numpy.pad` function.
Returns:
Array padded to shape suitable for `boxsum`.
"""
boxsize = np.atleast_1d(boxsize)
ih, jh = boxsize[0] // 2, boxsize[-1] // 2
padding = [(0, 0)] * (data.ndim - 2) + [(ih + 1, ih), (jh + 1, jh)]
padded = np.pad(data, padding, **pad_options)
return padded
[docs]def boxsum(
data: ndarray,
boxsize: Union[int, Tuple[int, int]],
cumsum: bool = True,
**pad_options: Any,
) -> ndarray:
"""Fast vectorised approach to calculating neighbourhood totals.
This function makes use of the summed-area table method. An input
array is accumulated top to bottom and left to right. This accumulated
array can then be used to efficiently calculate the total within a
neighbourhood about any point. An example input data array::
| 1 | 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 1 | 1 |
| 1 | 1 | 1 | 1 | 1 |
is accumulated to become::
| 1 | 2 | 3 | 4 | 5 |
| 2 | 4 | 6 | 8 | 10 |
| 3 | 6 | 9 | 12 | 15 |
| 4 | 8 | 12 | 16 | 20 |
| 5 | 10 | 15 | 20 | 25 |
If we wish to calculate the total in a 3x3 neighbourhood about
some point (*) of our array we use the following points::
| 1 (C) | 2 | 3 | 4 (D) | 5 |
| 2 | 4 | 6 | 8 | 10 |
| 3 | 6 | 9 (*) | 12 | 15 |
| 4 (A) | 8 | 12 | 16 (B) | 20 |
| 5 | 10 | 15 | 20 | 25 |
And the calculation is::
Neighbourhood sum = C - A - D + B
= 1 - 4 - 4 + 16
= 9
This is the value we would expect for a 3x3 neighbourhood
in an array filled with ones.
Args:
data:
The input data array.
boxsize:
The size of the neighbourhood. Must be an odd number.
cumsum:
If False, assume the input data is already cumulative. If True
(default), calculate cumsum along the last two dimensions of
the input array.
pad_options:
Additional keyword arguments passed to `numpy.pad` function.
If given, the returned result will have the same shape as the input
array.
Returns:
Array containing the calculated neighbourhood total.
Raises:
ValueError: If `boxsize` has non-integer type.
ValueError: If any member of `boxsize` is not an odd number.
"""
boxsize = np.atleast_1d(boxsize)
if not issubclass(boxsize.dtype.type, np.integer):
raise ValueError("The size of the neighbourhood must be of an integer type.")
if not np.all(boxsize % 2):
raise ValueError("The size of the neighbourhood must be an odd number.")
if pad_options:
data = pad_boxsum(data, boxsize, **pad_options)
if cumsum:
data = data.cumsum(-2).cumsum(-1)
i, j = boxsize[0], boxsize[-1]
m, n = data.shape[-2] - i, data.shape[-1] - j
result = (
data[..., i : i + m, j : j + n]
- data[..., :m, j : j + n]
+ data[..., :m, :n]
- data[..., i : i + m, :n]
)
return result