From 21a71ccaf834952935ad1e4c458931e67aa9f888 Mon Sep 17 00:00:00 2001 From: Paul Natsuo Kishimoto Date: Mon, 19 Aug 2024 11:28:08 +0200 Subject: [PATCH] Handle non-int age in sales_fraction_annual() --- message_ix_models/model/transport/operator.py | 65 ++++++++++++++++--- .../tests/model/transport/test_operator.py | 30 ++++++++- 2 files changed, 83 insertions(+), 12 deletions(-) diff --git a/message_ix_models/model/transport/operator.py b/message_ix_models/model/transport/operator.py index 6db8068779..9168f4c174 100644 --- a/message_ix_models/model/transport/operator.py +++ b/message_ix_models/model/transport/operator.py @@ -3,7 +3,7 @@ import logging import re from functools import partial, reduce -from itertools import product +from itertools import pairwise, product from operator import gt, le, lt from typing import ( TYPE_CHECKING, @@ -26,6 +26,7 @@ from genno import Computer, KeySeq, Operator, quote from genno.operator import apply_units, rename_dims from genno.testing import assert_qty_allclose, assert_units +from scipy import integrate from sdmx.model.v21 import Code from message_ix_models import ScenarioInfo @@ -813,20 +814,66 @@ def relabel2(qty: "AnyQuantity", new_dims: dict): return result +def uniform_in_dim(value: "AnyQuantity", dim: str = "y") -> "AnyQuantity": + """Construct a uniform distribution from `value` along its :math:`y`-dimension. + + `value` must have a dimension `dim` with length 1 and a single value, :math:`k`. The + sole `dim`-coordinate is taken as :math:`d_{max}`: the upper end of a uniform + distribution of which the mean is :math:`d_{max} - k`. + + Returns + ------- + genno.Quantity + with dimension `dim`, and `dim` coordinates including all integer values up to + and including :math:`d_{max}`. Values are the piecewise integral of the uniform + distribution in the interval ending at the respective coordinate. + """ + d_max = value.coords[dim].item() # Upper end of the distribution + width = 2 * value.item() # Width of a uniform distribution + height = 1.0 / width # Height of the distribution + d_min = d_max - width # Lower end of the distribution + + def _uniform(x: float) -> float: + """Uniform distribution between `d_min` and `d_max`.""" + return height if d_min < x < d_max else 0.0 + + # Points for piecewise integration of `_uniform`: integers from the first <= d_min + # to d_max inclusive + points = np.arange(np.floor(d_min), d_max + 1).astype(int) + + # - Group `points` pairwise: (d0, d1), (d1, d2) + # - On each interval, compute the integral of _uniform() by quadrature. + # - Convert to Quantity with y-dimension, labelled with interval upper bounds. + return genno.Quantity( + pd.Series( + {b: integrate.quad(_uniform, a, b)[0] for a, b in pairwise(points)} + ).rename_axis(dim), + units="", + ) + + def sales_fraction_annual(age: "AnyQuantity") -> "AnyQuantity": - """Return fractions of current stock that should be sold in prior years.""" + """Return fractions of current vehicle stock that should be added in prior years. - def _(value: int): - """Produce the sales fractions for a scalar `value`.""" - N = 2 * value.item() # Number of periods - y0 = value.coords["y"].item() # Initial period - coords = dict(y=range(y0 + 1 - N, y0 + 1)) # Periods included - return genno.Quantity([1.0 / N] * N, coords=coords, units="") + Parameters + --- + age : genno.Quantity + Mean age of vehicle stock. Must have dimension "y" and at least 1 other + dimension. For every unique combination of those other dimensions, there must be + only one value/|y|-coordinate. This is taken as the *rightmost* end of a uniform + distribution with mean age given by the respective value. + Returns + ------- + genno.Quantity + Same dimensionality as `age`, with sufficient |y| coordinates to cover all years + in which. Every integer year is included, i.e. the result is **not** aggregated + to multi-year periods (called ``year`` in MESSAGE). + """ # - Group by all dims other than `y`. # - Apply the function to each scalar value. dims = list(filter(lambda d: d != "y", age.dims)) - return age.groupby(dims).apply(_) + return age.groupby(dims).apply(uniform_in_dim) def share_weight( diff --git a/message_ix_models/tests/model/transport/test_operator.py b/message_ix_models/tests/model/transport/test_operator.py index df07c7b4ca..aaa04966aa 100644 --- a/message_ix_models/tests/model/transport/test_operator.py +++ b/message_ix_models/tests/model/transport/test_operator.py @@ -1,5 +1,6 @@ +import genno +import numpy.testing as npt import pytest -from genno import Quantity from genno.testing import assert_qty_equal from message_ix import Scenario from numpy.testing import assert_allclose @@ -11,6 +12,7 @@ distance_nonldv, factor_input, factor_ssp, + sales_fraction_annual, transport_check, ) from message_ix_models.model.transport.structure import get_technology_groups @@ -69,7 +71,7 @@ def test_distance_nonldv(test_context, regions): # Check a computed value assert_qty_equal( - Quantity(32.7633, units="Mm / vehicle / year", name="non-ldv distance"), + genno.Quantity(32.7633, units="Mm / vehicle / year", name="non-ldv distance"), result.sel(nl=f"{regions}_EEU", t="BUS", drop=True), ) @@ -131,8 +133,30 @@ def test_factor_ssp(test_context, ssp: SSP_2024) -> None: assert {"n", "y"} == set(result.dims) +def test_sales_fraction_annual(): + q = genno.Quantity( + [[12.4, 6.1]], coords={"y": [2020], "n": list("AB")}, units="year" + ) + + result = sales_fraction_annual(q) + + # Result has same dimensions as input + assert set(q.dims) == set(result.dims) + # Results are bounded in y at the last period given in input + assert 2020 == max(q.coords["y"]) + # Results sum to 1.0 + npt.assert_allclose([1.0, 1.0], result.sum(dim="y")) + + # Values for earlier periods are uniform… + assert result.sel(n="A", y=1997).item() == result.sel(n="A", y=2020).item() + assert result.sel(n="B", y=2009).item() == result.sel(n="B", y=2020).item() + # Except the earliest period, containing the remainder of the distribution + assert result.sel(n="A", y=1996).item() <= result.sel(n="A", y=2020).item() + assert result.sel(n="B", y=2008).item() <= result.sel(n="B", y=2020).item() + + @pytest.mark.xfail(reason="Incomplete test") def test_transport_check(test_context): s = Scenario(test_context.get_platform(), model="m", scenario="s", version="new") - transport_check(s, Quantity()) + transport_check(s, genno.Quantity())