"""Trajectory metrics manager.
Contains all metric methods which take a Trajectory as input.
"""
__author__ = "Henry Robbins (henryrobbins)"
import numpy as np
from .trajectory import Trajectory
from typing import List
[docs]def get_bucket(trajectory: Trajectory, bucket: str,
meta_groups: List[str] = None, aggregate: bool = True,
cumulative: bool = False, normalize: bool = False):
"""Return the given bucket vector.
Args:
trajectory (Trajectory): Trajectory to compute metric for,.
bucket (str): One of the simulation buckets: {S, I, R, D, H}.
meta_groups (List[str]): Meta-groups to aggregate over. Default to all.
aggregate (bool): Aggregate over the meta-groups if True.
cumulative (bool): Return cumulative metric over time if True.
normalize (bool): Normalize relative to total population.
"""
sim = trajectory.sim
population = trajectory.scenario.population
A = {"S": sim.S, "I": sim.I, "R": sim.R, "D": sim.D, "H": sim.H}[bucket]
# aggregate across meta-groups
A_ = []
if meta_groups is None:
meta_groups = population.meta_group_names
for meta_group in meta_groups:
idx = population.meta_group_ids(meta_group)
A_.append(np.sum(A[:, idx], axis=1))
A = np.array(A_).T
if aggregate:
x = np.sum(A, axis=1)
else:
x = A
if cumulative:
x = np.cumsum(x, axis=0)
if normalize:
total_pop = np.sum(sim.S + sim.I + sim.R, axis=1)[0]
x /= total_pop
return x
[docs]def get_hospitalizations(trajectory: Trajectory, meta_groups: List[str] = None,
aggregate: bool = True, cumulative: bool = False,
normalize: bool = False):
"""Return hospitalizations as computed from hospitalization rates.
Args:
trajectory (Trajectory): Trajectory to compute metric for,.
meta_groups (List[str]): Meta-groups to aggregate over. Default to all.
aggregate (bool): Aggregate over the meta-groups if True.
cumulative (bool): Return cumulative metric over time if True.
normalize (bool): Normalize relative to total population.
"""
scenario = trajectory.scenario
population = trajectory.scenario.population
I = get_bucket(trajectory, bucket="I", meta_groups=meta_groups,
aggregate=False, cumulative=cumulative,
normalize=normalize)
if meta_groups is None:
hospitalization_rates = scenario.hospitalization_rates
else:
hospitalization_rates = []
for i, meta_group in enumerate(population.meta_group_names):
if meta_group in meta_groups:
hospitalization_rates.append(scenario.hospitalization_rates[i])
hospitalizations = hospitalization_rates * I
if aggregate:
hospitalizations = np.sum(hospitalizations, axis=1)
return hospitalizations
def _get_isolated(discovered: np.ndarray, generation_time: float,
iso_lengths: List[int], iso_props: List[int]):
"""Return the isolated count (not including the arrival positives).
As an intermediate step, this function calculates isolation_frac, which
tells us what fraction of discovered positives need isolation.
isolation_frac[i] is the fraction of people discovered i generations ago
that require isolation in the current generation. For example, suppose 80%
of people require isolation for 2 generations and 20% for only 1. Then
isolation_frac = [1, .2] is appropriate because all of the people
discovered in the current generation require isolation and only 20% of
those discovered in the previous generation still require isolation.
As another example:
If 80% of people require isolation for 5 days and 20% for 10 days, then set
iso_lengths = [5,10] and iso_props = [0.8, 0.2] so that
isolation_frac[0] = 1,
isolation_frac[1] = 0.2*1 + 0.8*(5-4)/4 = 0.4
isolation_frac[2] = 0.2*(10-8)/4 = 0.1
Args:
discovered (np.ndarray): Number of people discovered.
generation_time (float): The number of days per generation.
iso_lengths (List[int]): List of isolation lengths (in days).
iso_props (List[int])): List of probability of isolation lengths.
"""
max_isolation = int(np.ceil(iso_lengths[-1] / generation_time))
# Compute isolation_frac as described above
isolation_frac = np.ones(max_isolation)
for t in range(1,max_isolation):
isolation_frac[t] = 0
for i, duration in enumerate(iso_lengths):
isolation_frac[t] += \
iso_props[i] * \
np.clip((duration - generation_time*t) / generation_time, 0, 1)
isolated = np.zeros(discovered.shape)
for t in range(len(discovered)):
for i in range(max_isolation):
if t-i >= 0:
isolated[t] = isolated[t] + isolation_frac[i] * discovered[t-i]
return isolated
[docs]def get_total_discovered(trajectory: Trajectory, meta_groups: List[str] = None,
aggregate: bool = True, cumulative: bool = False,
normalize: bool = False):
"""Return the number of discovered individuals at each iteration.
This includes those discovered in arrival spread across the arrival period.
Args:
trajectory (Trajectory): Trajectory object.
meta_groups (List[str]): Limit to isolated in these metagroups.
aggregate (bool): Aggregate over the meta-groups if True.
cumulative (bool): Return cumulative metric over time if True.
normalize (bool): Normalize relative to total population.
"""
D = get_bucket(trajectory, bucket="D", meta_groups=meta_groups,
aggregate=aggregate, cumulative=cumulative,
normalize=normalize)
# TODO: think through this--I think this is double counting
arrival_discovered = _get_arrival_discovered(trajectory, meta_groups,
aggregate=aggregate,
normalize=normalize)
return D + arrival_discovered
[docs]def get_total_isolated(trajectory: Trajectory, meta_groups: List[str] = None,
aggregate: bool = True, cumulative: bool = False,
normalize: bool = False):
"""Return the number of isolated individuals at each generation.
This includes those isolating as a result of being discovered upon arrival.
Args:
trajectory (Trajectory): Trajectory object.
meta_groups (List[str]): Limit to isolated in these metagroups.
aggregate (bool): Aggregate over the meta-groups if True.
cumulative (bool): Return cumulative metric over time if True.
normalize (bool): Normalize relative to total population.
"""
scenario = trajectory.scenario
generation_time = scenario.generation_time
iso_lengths = scenario.isolation_lengths
iso_props = scenario.isolation_fracs
total_discovered = get_total_discovered(trajectory, meta_groups, aggregate,
cumulative, normalize)
total_isolated = _get_isolated(total_discovered, generation_time,
iso_lengths, iso_props)
return total_isolated
def _get_arrival_discovered(trajectory: Trajectory,
meta_groups: List[str] = None,
aggregate: bool = False,
cumulative: bool = False,
normalize: bool = False):
"""Return arrival discovered (spread across the arrival period)."""
sim = trajectory.sim
scenario = trajectory.scenario
population = scenario.population
strategy = trajectory.strategy
# Get total arrival discovered
arrival_discovered_sum = \
strategy.get_arrival_discovered(
scenario.init_recovered, scenario.init_infections,
scenario.pct_recovered_discovered_arrival)
if meta_groups is not None:
names = population.meta_group_names
idx = [i for i, mg in enumerate(meta_groups) if mg in names]
arrival_discovered_sum = arrival_discovered_sum[idx]
# Spread arrival discoverd over the arrival period
if meta_groups is None:
K = len(population.meta_group_list)
else:
K = len(meta_groups)
arrival_discovered = np.zeros((sim.max_T, K))
for i in range(scenario.arrival_period):
arrival_discovered[i] += \
arrival_discovered_sum / scenario.arrival_period
if aggregate:
x = np.sum(arrival_discovered, axis=1)
else:
x = arrival_discovered
if cumulative:
x = np.cumsum(x, axis=0)
if normalize:
total_pop = np.sum(sim.S + sim.I + sim.R, axis=1)[0]
x /= total_pop
return x