Source code for improver.wind_calculations.wind_components

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown copyright. The Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""Module containing plugin to resolve wind components."""

from typing import Tuple

import numpy as np
from iris.analysis import Linear
from iris.analysis.cartography import rotate_winds
from iris.coord_systems import GeogCS
from iris.coords import DimCoord
from iris.cube import Cube
from numpy import ndarray

from improver import BasePlugin
from improver.utilities.cube_manipulation import compare_coords

# Global coordinate reference system used in StaGE (GRS80)
GLOBAL_CRS = GeogCS(semi_major_axis=6378137.0, inverse_flattening=298.257222101)


[docs]class ResolveWindComponents(BasePlugin): """ Plugin to resolve wind components along an input cube's projection axes, given directions with respect to true North """ def __repr__(self) -> str: """Represent the plugin instance as a string""" return "<ResolveWindComponents>"
[docs] @staticmethod def calc_true_north_offset(reference_cube: Cube) -> ndarray: """ Calculate the angles between grid North and true North, as a matrix of values on the grid of the input reference cube. Args: reference_cube: 2D cube on grid for which "north" is required. Provides both coordinate system (reference_cube.coord_system()) and template spatial grid on which the angle adjustments should be provided. Returns: Angle in radians by which wind direction wrt true North at each point must be rotated to be relative to grid North. """ reference_x_coord = reference_cube.coord(axis="x") reference_y_coord = reference_cube.coord(axis="y") # find corners of reference_cube grid in lat / lon coordinates latlon = [ GLOBAL_CRS.as_cartopy_crs().transform_point( reference_x_coord.points[i], reference_y_coord.points[j], reference_cube.coord_system().as_cartopy_crs(), ) for i in [0, -1] for j in [0, -1] ] latlon = np.array(latlon).T.tolist() # define lat / lon coordinates to cover the reference_cube grid at an # equivalent resolution lat_points = np.linspace( np.floor(min(latlon[1])), np.ceil(max(latlon[1])), len(reference_y_coord.points), ) lon_points = np.linspace( np.floor(min(latlon[0])), np.ceil(max(latlon[0])), len(reference_x_coord.points), ) lat_coord = DimCoord( lat_points, "latitude", units="degrees", coord_system=GLOBAL_CRS ) lon_coord = DimCoord( lon_points, "longitude", units="degrees", coord_system=GLOBAL_CRS ) # define a unit vector wind towards true North over the lat / lon grid udata = np.zeros(reference_cube.shape, dtype=np.float32) vdata = np.ones(reference_cube.shape, dtype=np.float32) ucube_truenorth = Cube( udata, "grid_eastward_wind", dim_coords_and_dims=[(lat_coord, 0), (lon_coord, 1)], ) vcube_truenorth = Cube( vdata, "grid_northward_wind", dim_coords_and_dims=[(lat_coord, 0), (lon_coord, 1)], ) # rotate unit vector onto reference_cube coordinate system ucube, vcube = rotate_winds( ucube_truenorth, vcube_truenorth, reference_cube.coord_system() ) # unmask and regrid rotated winds onto reference_cube grid ucube.data = ucube.data.data ucube = ucube.regrid(reference_cube, Linear()) vcube.data = vcube.data.data vcube = vcube.regrid(reference_cube, Linear()) # ratio of u to v winds is the tangent of the angle which is the # true North to grid North rotation angle_adjustment = np.arctan2(ucube.data, vcube.data) return angle_adjustment
[docs] @staticmethod def resolve_wind_components( speed: Cube, angle: Cube, adj: ndarray ) -> Tuple[Cube, Cube]: """ Perform trigonometric reprojection onto x and y axes Args: speed: Cube containing wind speed data angle: Cube containing wind directions as angles from true North adj: 2D array of wind direction angle adjustments in radians, to convert zero reference from true North to grid North. Broadcast automatically if speed and angle cubes have extra dimensions. Returns: - Cube containing wind vector component in the positive x-direction u_speed - Cube containing wind vector component in the positive y-direction v_speed """ angle.convert_units("radians") angle.data += adj # output vectors should be pointing "to" not "from" if "wind_from_direction" in angle.name(): angle.data += np.pi sin_angle = np.sin(angle.data) cos_angle = np.cos(angle.data) uspeed = np.multiply(speed.data, sin_angle) vspeed = np.multiply(speed.data, cos_angle) return [speed.copy(data=uspeed), speed.copy(data=vspeed)]
[docs] def process(self, wind_speed: Cube, wind_dir: Cube) -> Tuple[Cube, Cube]: """ Convert wind speed and direction into u,v components along input cube projection axes. Args: wind_speed: Cube containing wind speed values wind_dir: Cube containing wind direction values relative to true North Returns: - Cube containing wind speeds in the positive projection x-axis direction, with units and projection matching wind_speed cube. - Cube containing wind speeds in the positive projection y-axis direction, with units and projection matching wind_speed cube. """ # check cubes contain the correct data (assuming CF standard names) if "wind_speed" not in wind_speed.name(): msg = "{} cube does not contain wind speeds" raise ValueError("{} {}".format(wind_speed.name(), msg)) if "wind" not in wind_dir.name() or "direction" not in wind_dir.name(): msg = "{} cube does not contain wind directions" raise ValueError("{} {}".format(wind_dir.name(), msg)) # check input cube coordinates match ignored_coords = ["wind_from_direction status_flag", "wind_speed status_flag"] unmatched_coords = compare_coords( [wind_speed, wind_dir], ignored_coords=ignored_coords ) if unmatched_coords != [{}, {}]: msg = "Wind speed and direction cubes have unmatched coordinates" raise ValueError("{} {}".format(msg, unmatched_coords)) # calculate angle adjustments for wind direction wind_dir_slice = next( wind_dir.slices( [wind_dir.coord(axis="y").name(), wind_dir.coord(axis="x").name()] ) ) adj = self.calc_true_north_offset(wind_dir_slice) # calculate grid eastward and northward speeds ucube, vcube = self.resolve_wind_components(wind_speed, wind_dir, adj) # relabel final cubes with CF compliant data names corresponding to # positive wind speeds along the x and y axes ucube.rename("grid_eastward_wind") vcube.rename("grid_northward_wind") return ucube, vcube