# Libraries
# The code was initially inspired by the following sources:
# https://www.kaggle.com/pstarszyk/seir-model-with-extensions/comments#Define-Extended-Model
# https://www.kaggle.com/anjum48/seir-hcd-model/data#SEIR-HCD-Model
# http://gabgoh.github.io/COVID/index.html
# https://neherlab.org/covid19/about
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import os
from scipy.integrate import solve_ivp
from scipy.optimize import minimize
from sklearn.metrics import mean_squared_log_error, mean_squared_error
from collections import namedtuple
from collections import defaultdict as ddict
from copy import deepcopy
import warnings
from datetime import datetime, timedelta
warnings.filterwarnings("ignore")
import seaborn as sns
import matplotlib.pylab as plt
from seaborn import heatmap
from pyDOE import *
# This is a function used to control the font size in generating the figures
def update_font_size(font_size = 30,rcParams = mpl.rcParams):
rcParams.update({'font.size': font_size})
plt.rc('font', size=font_size) # controls default text sizes
plt.rc('axes', titlesize=font_size) # fontsize of the axes title
plt.rc('axes', labelsize=font_size*1.1) # fontsize of the x and y labels
plt.rc('xtick', labelsize=font_size) # fontsize of the tick labels
plt.rc('ytick', labelsize=font_size) # fontsize of the tick labels
plt.rc('legend', fontsize=font_size) # legend fontsize
plt.rc('figure', titlesize=font_size)
# Make model labels a global so we can access anywhere
model_labels = {'Cashier' :'Public Facing',
'USPS' :'Non-Public Facing',
'Healthcare' :'Healthcare',
'No Structure':'No EWs',
'Healthcare_Sigmoid' :'Healthcare'}
# Susceptible equation
def dS_dt(y_i, p, N_i, I, Beta_t, H, Beta_hosp, groupID):
if groupID == 1 and p.model == 'Healthcare_Sigmoid':
return - y_i.S * (sum(Beta_t * I) * (1./N_i) + Beta_hosp)
else:
return -(1./N_i) * y_i.S * ( sum(Beta_t * I) + Beta_hosp * H)
# Exposed equation
def dE_dt(y_i, p, N_i, I, Beta_t, H, Beta_hosp, groupID):
if groupID == 1 and p.model == 'Healthcare_Sigmoid':
incoming = y_i.S * (sum(Beta_t * I) * (1./N_i) + Beta_hosp)
else:
incoming = (1./N_i) * y_i.S * ( sum(Beta_t * I) + Beta_hosp * H )
return incoming - (y_i.E / p.t_inc)
# Infected, asymptomatic, and will recover equation
def dIA_dt(y_i, p,groupID):
if groupID == 1:
p_IA = p.p_IA * p.change_in_p_IA
else:
p_IA = p.p_IA
return (y_i.E / p.t_inc) * p_IA - y_i.IA / p.t_AR
# Infected, symptomatic, and will recover equation
def dIR_dt(y_i, p,groupID):
if groupID == 1:
p_IA = p.p_IA * p.change_in_p_IA
p_IH = p.p_IH * p.change_in_p_IH
else:
p_IA = p.p_IA
p_IH = p.p_IH
return (y_i.E / p.t_inc)*(1-p_IA)*(1-p_IH) - y_i.IR / p.t_IR
# Infected and will require hospitalization equation
def dIH_dt(y_i, p,groupID):
if groupID == 1:
p_IA = p.p_IA*p.change_in_p_IA
p_IH = p.p_IH*p.change_in_p_IH
else:
p_IA = p.p_IA
p_IH = p.p_IH
return (y_i.E / p.t_inc)*(1-p_IA)*p_IH - y_i.IH / p.t_IH
# Recovered equation
def dR_dt(y_i, p,groupID):
return y_i.IR / p.t_IR + y_i.IA / p.t_AR + y_i.HR / p.t_HR + y_i.L / p.t_LR
# Hospitalized and will recover equation
def dHR_dt(y_i, p,groupID):
return y_i.IH / p.t_IH * (1 - p.p_HC) - y_i.HR / p.t_HR
# Hospitalized and will require critical care equation
def dHC_dt(y_i, p,groupID):
return y_i.IH / p.t_IH * p.p_HC - y_i.HC / p.t_HC
# Critical care and will recover equation
def dCR_dt(y_i, p,groupID):
return y_i.HC / p.t_HC * (1 - p.p_CD) - (y_i.CR / p.t_CL)
# Critical and will die equation
def dCD_dt(y_i, p, groupID):
return y_i.HC / p.t_HC * p.p_CD - y_i.CD / p.t_CD
# Post-Critical care but still hospitalized equation
def dL_dt(y_i, p,groupID):
return y_i.CR / p.t_CL - y_i.L / p.t_LR
# Dead equation
def dD_dt(y_i, p,groupID):
return y_i.CD / p.t_CD
# Parameters: Using a namedtuple allows you to access parameters by name instead of index (e.g. args.N)
parameters = namedtuple('parameters',["R_0","N","C",
"social_distancing_time","theta","model","proportion_essential","rho",
"max_days","num_groups","compartment_names",
"t_inc","t_AR","t_IR","t_IH","t_HR","t_HC","t_CD","t_CL","t_LR",
"p_IA","p_IH","p_HC","p_CD",
"change_in_p_IH","change_in_p_IA","adjust_beta_hosp"])
# Define the compartments and link names with formulas
compartment_names = ["S","E","IA","IR","IH","HR","HC","CR","CD","L","R","D"]
compartment_formulas = [dS_dt,dE_dt,dIA_dt,dIR_dt,dIH_dt,dHR_dt,dHC_dt,dCR_dt,dCD_dt,dL_dt,dR_dt,dD_dt]
compartments = namedtuple("compartments",compartment_names)
# This function returns an array of differential equations that is used for the ODE solver in the following section
def SEIR_HCD_model(t, y_vals, parameters):
# Convert to matrix
# Each row is a group
# Each column corresponds to a given class within each group
# Columns indexed according to position in name array: compartment_names = ["S", "E", "I", "R", "H", "C", "D"]
y_matrix = np.reshape(y_vals,[parameters.num_groups,len(compartment_names)])
output = np.zeros(np.shape(y_matrix))
# Iterate over each group
# Nonessential = 0
# Essential = 1
for group_idx in range(parameters.num_groups):
Beta_t_i = get_betas(t,parameters,group_idx)
# Components for group i
compartments_group_i = compartments(*y_matrix[group_idx,:])
# Total population within group i
N_i = sum(y_matrix[group_idx,:])
# Get the infected population for each group for all 3 compartments
I = sum(np.transpose(y_matrix[:,2:5]))
# Get the hospitalized population for each group
H = sum(sum(np.transpose(y_matrix[:,5:10])))
# If using a healthcare model and considering the healthcare group,
# we need to consider infections stemming from hospitals
if 'Healthcare' in parameters.model and group_idx == 1:
# Determine the beta or fraction of workers infected
beta_hosp = get_hosp_beta(t,parameters,H)
else:
beta_hosp = 0
# Susceptibles and Exposed DiffEq use special parameters I and Beta_t_i)
S_out_i = dS_dt(y_i = compartments_group_i,
p = parameters,
N_i = N_i,
I = I,
Beta_t = Beta_t_i,
H = H,
Beta_hosp = beta_hosp,
groupID = group_idx)
output[group_idx,0] = S_out_i
E_out_i = dE_dt(compartments_group_i, parameters, N_i, I, Beta_t_i, H, beta_hosp, group_idx)
output[group_idx,1] = E_out_i
# The remainder of the differentiail equations
for index in np.arange(2,len(compartment_names)):
output[group_idx,index] = compartment_formulas[index](compartments_group_i, parameters,group_idx)
# Convert the matrix back into a linear array for the ODE solver
output_array = np.reshape(output,[1,parameters.num_groups*len(compartment_names)])[0]
return output_array
# Return the matrix of thetas that determine how to adjust betas given lockdown
def get_effect_of_social_distancing(p,t=0):
if p.model == 'No Structure' or (p.rho > 0 and (p.model == 'USPS' or p.model == 'Healthcare' or p.model == 'Healthcare_Sigmoid')):
social_distancing_effect = np.array([p.theta]*4)
elif p.model == 'Cashier':
social_distancing_effect = np.array([p.theta]+[1]*3)
elif p.model == 'USPS' or p.model == 'Healthcare' or p.model == 'Healthcare_Sigmoid':
social_distancing_effect = np.array([p.theta]*3 + [1])
else:
raise ValueError('Model should be either Cashier,USPS,or No Structure. '
+ p.model + ' is not an implemented model')
social_distancing_effect = np.reshape(social_distancing_effect,[p.num_groups,p.num_groups])
return social_distancing_effect
# Return the beta for within hospital interactions
def get_hosp_beta(t,p,H):
if p.model == 'Healthcare_Sigmoid':
return get_sigmoid_beta(t,p,H)
else:
# Default: assume the within beta hospital interaction is the same as the beta between I compartments and S
return get_infectious_beta(p)*p.adjust_beta_hosp
def get_sigmoid_beta(t,p,H):
exponent_parameter = 1
halfway_point = 1/4
max_infection_rate = get_infectious_beta(p)*p.adjust_beta_hosp*1.5*H_max/p.N
halfway_exponent = halfway_point**exponent_parameter
h_exponent = (H/H_max)**exponent_parameter
return ( max_infection_rate * ( h_exponent ) / ( h_exponent+halfway_exponent ) )
def get_infectious_beta(p,model = ''):
# llow user to set model, but otherwise use parameter model
if not model:
model = p.model
# Fraction of essential workers
f = p.proportion_essential
# Average time spent in I compartments
avg_time_infectious = (p.p_IA * p.t_AR + (1 - p.p_IA) * ((1-p.p_IH) * p.t_IR + p.p_IH * p.t_IH))
# Average time spent in the hospital
if model == 'Healthcare':
avg_time_critical = (1 - p.p_CD) * (p.t_CL + p.t_LR) + p.p_CD * (p.t_CD)
avg_time_hospital = (1-p.p_HC) * p.t_HR + p.p_HC * (p.t_HC +avg_time_critical)
avg_time_healthcare = (1 - p.p_IA) * p.p_IH * avg_time_hospital
else:
# ignore if not a healthcare model
avg_time_healthcare = 0
# Get the mixing matrix or set to proportional mixing if not provided
C = p.C
if C == []:
C = np.array([[1-f,1-f],[f,f]])
# Vector to describe the distribution of people
people_array = np.array([1-f,f])
# The weighted sum across mixings
beta_coefficient = avg_time_infectious*(sum(np.matmul(C,people_array)) + p.rho * f)
hosp_coefficient = avg_time_healthcare
beta = p.R_0/(beta_coefficient + hosp_coefficient * p.adjust_beta_hosp * keep_R0_constant)
return beta
# This allows for greater within-group mixing than under proportional mixing
def get_extra_rho_effect(t,p,groupID):
if groupID != 1:
return 0
if t > p.social_distancing_time and p.model == 'No Structure':
return p.rho * (p.theta)
return p.rho
def get_betas(t,p,groupID,model = ''):
if not model:
model = p.model
social_distancing_effect = get_effect_of_social_distancing(p,t)
if t>p.social_distancing_time:
social_distancing_effect = social_distancing_effect[:,groupID]
else:
social_distancing_effect = social_distancing_effect[:,groupID] * 0 + 1
# beta is R_0 * mean infectious time
# This is the same beta across all infectious compartments and groups
nointervention_beta = get_infectious_beta(p,model)
# This gives you the betas including the mixing between groups (specific to group i)
betas = list(nointervention_beta*social_distancing_effect*p.C[groupID,:])
extra_rho = get_extra_rho_effect(t,p,groupID)
betas[1] += extra_rho * nointervention_beta
return betas
#These are the parameters used in the models. These are listed in Table 1 of the main text.
#By default, R0 is 2.5, however this is changed to 3.2 for most of the subsequent plots
keep_R0_constant = False
args_default = parameters(R_0 = 2.5, # R_0: used to calculate betas
N = 8e6, # total population size
C = [], # Mixing matrix
social_distancing_time = 50, # When does social distancing start
theta = 1, # How effective is social distancing
model = 'Cashier', # Who social distances?
proportion_essential = 0.1, # What fraction of the population is essential workers?
rho = 1,
max_days = 1000, # bookkeeping of how many days simulated
num_groups = 2, # bookkeeping of how many groups
compartment_names = compartment_names, # bookkeeping of how many compartments
t_inc = 4.6, # !time from E to I: exposed to infected
t_AR = 5, # !time from IA to R: infected and asymptomatic to recovered
t_IR = 5, # !time from IR to R: infected, symptomatic and will recover to recovered
t_IH = 5, # !time from IH to H: infected and will need hospitalization to hospitalization
t_HR = 8, # !time from HR to R: hospitalized and will recover to recovered
t_HC = 6, # !time from HC to C: hospitalization to critical care
t_CD = 10,# time from CD to D: critical care and will die to death
t_CL = 7, # time from CR to L: critical care and will recover to post-critical care hospitalization
t_LR = 3, # time from L to R: post-critical care hospitalization to recovered
p_IA = 1/3, # Proportions of infections that are asymptomatic
p_IH = 0.066, # Proportion of symptomatic infections that require hospitalizations (this corresponds to 4.4% of all infections that require hospitalization)
p_HC = 0.3, # Proportion of hospitalizations that require critical care
p_CD = 0.5, # Proportion of critical cases that are fatal
change_in_p_IH = 1, # How much is the probability of hopsitalization increased in OLD
change_in_p_IA = 1, # How much is the probability of asymptomatic increased in OLD
adjust_beta_hosp = 1) # change the beta for within hospital interactions relative to beta between I and S
# Returns number of cases, hospitalizations, and deaths given model solution, number of groups, and compartment names
def get_compartment_counts(solution, num_groups,
compartment_names = ["S","E","IA","IR","IH","HR","HC","CR","CD","L","R","D"]):
# convert solution into a matrix
max_days = len(solution.y[0])
solution_matrix = np.reshape(solution.y,[num_groups,len(compartment_names),max_days])
# sum across groups
total_solution = np.zeros([len(compartment_names),max_days])
for i in range(num_groups):
total_solution += solution_matrix[i]
# sum across relevant compartments
cases = sum(total_solution[2:])
deaths = total_solution[-1]
hospitilized = sum(total_solution[5:10])
return cases, hospitilized, deaths
# get R_0 from the doubling time
def R0_from_doubling_time(p,doubling_time,model= None,adjust_beta_hosp = np.nan,verbose = False):
# allow user to specify the model and override the default model
if not model:
model = p.model
if np.isnan(adjust_beta_hosp):
adjust_beta_hosp = p.adjust_beta_hosp
# Get average infectious time
avg_time_infectious = (p.p_IA * p.t_AR + (1 - p.p_IA) * ((1-p.p_IH) * p.t_IR + p.p_IH * p.t_IH))
# if healthcare model, add average healthcare time to the infectious time
if 'Healthcare' in model:
avg_time_critical = (1 - p.p_CD) * (p.t_CL + p.t_LR) + p.p_CD * (p.t_CD)
avg_time_hospital = (1 - p.p_HC) * p.t_HR + p.p_HC * (p.t_HC + avg_time_critical)
# only affects essential workers and include the relative deviation in the hospital beta
avg_time_infectious += keep_R0_constant * (1 - p.p_IA) * p.p_IH * avg_time_hospital * adjust_beta_hosp
if verbose:
print(avg_time_infectious,(p.p_IA * p.t_AR + (1 - p.p_IA) * ((1-p.p_IH) * p.t_IR + p.p_IH * p.t_IH)),avg_time_critical,avg_time_hospital)
R0 = 1 + (p.t_inc + avg_time_infectious) * np.log(2) / doubling_time
return R0
#This function will run the model above, and return the solution and parameters used in a tuple
def run_model(theta,R_0,t_lockdown,model,n_infected=50,n_exposed=50,prop_essential=0.1,max_days = 1000,
rho = 1, change_in_p_IH = 1, change_in_p_IA = 1,num_groups = 2,doubling_time = None,
adjust_beta_hosp = 1):
# Define the initial conditions & population size
N = 8e6
# Number of groups
num_groups = 2
# Social interaction effects: C
# C[i][j] = proportion of group I's contacts that are with group
# rho allows deviations from proportionate mixing
C = np.array([ (1 - prop_essential) + rho * prop_essential,
(1 - rho) * (1 - prop_essential),
(1 - rho) * (prop_essential),
1 - (1 - rho) * (1 - prop_essential) - rho])
C = np.reshape(C,[num_groups,num_groups])
if doubling_time:
args = parameters(R_0 = R_0, # R_0: used to calculate betas
N = N, # total population size
C = C, # Mixing matrix
social_distancing_time = t_lockdown, # when does social distancing start
theta = theta, # How effective is social distancing
model = model, # Who social distances?
proportion_essential = prop_essential, #
rho = rho, # record changes from proportionate mixing
max_days = max_days, # bookkeeping of how many days simulated
num_groups = num_groups, # bookkeeping of how many groups
compartment_names = compartment_names, # bookkeeping of how many compartments
t_inc = 4.6, # !time from E to I: exposed to infected
t_AR = 5, # !time from IA to R: infected and asymptomatic to recovered
t_IR = 5, # !time from IR to R: infected, symptomatic and will recover to recovered
t_IH = 5, # !time from IH to H: infected and will need hospitalization to hospitalization
t_HR = 8, # !time from HR to R: hospitalized and will recover to recovered
t_HC = 6, # !time from HC to C: hospitalization to critical care
t_CD = 10,# time from CD to D: critical care and will die to death
t_CL = 7, # time from CR to L: critical care and will recover to post-critical care hospitalization
t_LR = 3, # time from L to R: post-critical care hospitalization to recovered
p_IA = 1/3, # Proportions of infections that are asymptomatic
p_IH = 0.066, # Proportion of symptomatic infections that require hospitalizations, conditional on not being asymptomatic
p_HC = 0.3, # Proportion of hospitalizations that require critical care
p_CD = 0.5, # Proportion of critical cases that are fatal
change_in_p_IH = 1, # How much is the probability of hopsitalization increased in OLD
change_in_p_IA = 1, # How much is the probability of asymptomatic increased in OLD
adjust_beta_hosp = adjust_beta_hosp) # change the beta for within hospital interactions relative to beta between I and S
if doubling_time: R_0 = R0_from_doubling_time(args,doubling_time)
args = parameters(R_0 = R_0, # R_0: used to calculate betas
N = N, # total population size
C = C, # Mixing matrix
social_distancing_time = t_lockdown, # when does social distancing start
theta = theta, # How effective is social distancing
model = model, # Who social distances?
proportion_essential = prop_essential, #
rho = rho, # record changes from proportionate mixing
max_days = max_days, # bookkeeping of how many days simulated
num_groups = num_groups, # bookkeeping of how many groups
compartment_names = compartment_names, # bookkeeping of how many compartments
t_inc = 4.6, # !time from E to I: exposed to infected
t_AR = 5, # !time from IA to R: infected and asymptomatic to recovered
t_IR = 5, # !time from IR to R: infected, symptomatic and will recover to recovered
t_IH = 5, # !time from IH to H: infected and will need hospitalization to hospitalization
t_HR = 8, # !time from HR to R: hospitalized and will recover to recovered
t_HC = 6, # !time from HC to C: hospitalization to critical care
t_CD = 10,# time from CD to D: critical care and will die to death
t_CL = 7, # time from CR to L: critical care and will recover to post-critical care hospitalization
t_LR = 3, # time from L to R: post-critical care hospitalization to recovered
p_IA = 1/3, # Proportions of infections that are asymptomatic
p_IH = 0.066, # Proportion of symptomatic infections that require hospitalizations
p_HC = 0.3, # Proportion of hospitalizations that require critical care
p_CD = 0.5, # Proportion of critical cases that are fatal
change_in_p_IH = 1, # How much is the probability of hopsitalization increased in OLD
change_in_p_IA = 1, # How much is the probability of asymptomatic increased in OLD
adjust_beta_hosp = adjust_beta_hosp) # change the beta for within hospital interactions relative to beta between I and S
single_population = np.zeros(len(compartment_names))
single_population[1] = n_exposed
single_population[2] = n_infected*args.p_IA
single_population[3] = n_infected*(1-args.p_IA)*(1-args.p_IH)
single_population[4] = n_infected*(1-args.p_IA)*(args.p_IH)
single_population[0] = N - sum(single_population)
initial_state = [(1 - prop_essential) * single_population,
prop_essential * single_population]
# Convert initial state into 1-dimensional array
initial_state = [params for group in initial_state for params in group]
# Evaluate the model
# Here, a stiff solver (Radau) not RK45 (default) is used
sol = solve_ivp(lambda t,y: SEIR_HCD_model(t,y,args), [0, max_days], initial_state, t_eval=np.arange(max_days),method = 'Radau')
return (sol,args)
# Get the cumulative infection rate at some time
def get_infection_rate(solution, p, time = 365):
# Turn the solution array into a matrix
solution_matrix = np.reshape(solution.y,[p.num_groups,len(p.compartment_names),p.max_days])
# If no time specified, get cumulative infection at end of time
if not time or time == -1:
time = p.max_days-1
# Get the number of infections, return it as a fraction of the populations
infections = sum(sum(solution_matrix[:,1:,time]))
return infections/p.N
# Return the metrics to show the added disease risk within the essential workers
def added_disease_risk(solution, p,method = 'g(t)',groupID = 1):
solution_matrix = np.reshape(solution.y,[p.num_groups,len(p.compartment_names),p.max_days])
f = p.proportion_essential
if method == 'Fraction Infected':
if groupID == 1:
N = (p.N*(p.proportion_essential))
else:
N = (p.N*(1-p.proportion_essential))
return sum(solution_matrix[groupID,1:,-1])/N
elif method == 'Nonessential Comparison':
L = sum(solution_matrix[1,1:,-1])/sum(solution_matrix[0,1:,-1])*((1-f)/f)
return L
elif method == 'g(t) additive':
return sum(solution_matrix[1,1:,-1])/(sum(solution_matrix[1,1:,-1])+sum(solution_matrix[0,1:,-1]))
elif method == 'g(t) multiplicative':
return sum(solution_matrix[1,1:,-1])/(sum(solution_matrix[1,1:,-1])+sum(solution_matrix[0,1:,-1]))/f
elif method == 'EisenA':
return sum(solution_matrix[1,1:,-1])/(p.N*f)-sum(solution_matrix[0,1:,-1])/(p.N*(1-f))
elif method == 'No Structure Comparison':
NS_sol,NS_p = run_model(theta=p.theta,
R_0=p.R_0,
t_lockdown=p.social_distancing_time,
model='No Structure',
n_infected=20,
n_exposed=0,
prop_essential=p.proportion_essential,
max_days = p.max_days,
rho = 0,
change_in_p_IH = p.change_in_p_IH,
change_in_p_IA = p.change_in_p_IA,
doubling_time = 3,
adjust_beta_hosp = 1)
NS_solution_matrix = np.reshape(NS_sol.y,[p.num_groups,len(p.compartment_names),p.max_days])
# Turn the solution array into a matrix
return sum(solution_matrix[1,1:,-1])/sum(NS_solution_matrix[1,1:,-1])
else:
raise ValueError("Method not understood")
# Given a solution to the model and the corresponding parameters, get several different metrics of model outcomes
def get_metrics(result_dict):
R0 = R0_from_doubling_time(args_default,3)
# Create a nested dict, where the structure is {model:{theta:{f:([],[])}}}
reversed_solved_dict = ddict(dict)
for model,model_dict in result_dict.items():
reversed_solved_dict[model] = ddict(dict)
for f,f_dict in model_dict.items():
for theta,(sol,args) in f_dict.items():
reversed_solved_dict[model][theta][f] = (get_infection_rate(solution=sol,p=args, time = 365),
added_disease_risk(solution=sol,p = args,method = 'Fraction Infected',groupID = 0),
added_disease_risk(solution=sol,p = args,method = 'Fraction Infected',groupID = 1))
return reversed_solved_dict
# Assign row and column labels for the plots of the effective R over time
def assign_row_label_grid(ax,f, pad = 5, xy = (0,0.5),font_size = 30):
ax.annotate('f = {}'.format(f)+'\n', xy=xy, xytext=(-ax.yaxis.labelpad - pad, 0),
xycoords=ax.yaxis.label, textcoords='offset points',
size=font_size, ha='right', va='center',rotation=90)
# Label is the value of R_effective
def assign_column_label_grid(ax,value, pad = 25, xy = (0.5,1),font_size = 30):
text = r'
{}'.format(round(value,2))
ax.annotate(text, xy=xy, xytext=(0, pad),
xycoords='axes fraction', textcoords='offset points',
size=font_size, ha='center', va='baseline')
def get_R_effective(solution_matrix,p):
# Set up an empty array for both effective R and the betas over time
R_eff = np.zeros(p.max_days+1)
betas_eff = np.zeros([2,2,p.max_days+1])
# Get the time values
t = np.arange(0,p.max_days)
# Fraction of population an essential worker
f = p.proportion_essential
# Get average time an infectious person is infectious
avg_time_infectious = (p.p_IA * p.t_AR + (1 - p.p_IA) * ((1-p.p_IH) * p.t_IR + p.p_IH * p.t_IH))
# Find the effects of social distancing. Second variable is from a different implementation
social_distancing_effect = get_effect_of_social_distancing(p,None)
# beta is R_0 * mean infectious time
# This is the same beta across all infectious compartments and groups
nointervention_beta = get_infectious_beta(p)
# Get fraction of infected population that is essential over time
I_t_0 = sum(solution_matrix[0,2:5,:])
I_t_1 = sum(solution_matrix[1,2:5,:])
S_t_0 = solution_matrix[0,0,:]/(p.N*(1-f))
S_t_1 = solution_matrix[1,0,:]/(p.N*f)
f_t_all = (I_t_1*f)/(I_t_0*(1-f)+I_t_1*f)
for j in t:
# Get fraction of infected that are essential at this time, j
f_t = f_t_all[j]
# Calculate betas. This is elementwise multiplication not matrix multiplication
betas = nointervention_beta*(p.C*(1-social_distancing_effect*(j>p.social_distancing_time)))
betas_eff[:,:,j] = betas
# Include extra effect from within group mixing
betas_eff[1,1,j] += get_extra_rho_effect(j,p,1) * nointervention_beta
# Calculate R effective by weighting betas by f(t) and the average infectious time
R_eff[j] = sum(np.matmul(betas_eff[:,:,j]*np.array([S_t_0[j],S_t_1[j]]),np.array([1-f_t,f_t])))
R_eff[j] = R_eff[j]*avg_time_infectious
# If a healthcare model, include the within hospital infections
if 'Healthcare' in p.model:
H = sum(np.transpose(solution_matrix[:,5:10,j]))
f_th = (H[1]*f)/(H[0]*(1-f)+H[1]*f)
hosp_beta = get_hosp_beta(j,p,sum(H))
avg_time_healthcare = (1 - p.p_IA) * p.p_IH * ((1 - p.p_HC) * p.t_HR + p.p_HC * (p.t_HC + (1 - p.p_CD) * (p.t_CL + p.t_LR) + p.p_CD * p.t_CD))
R_eff[j] += avg_time_healthcare*hosp_beta
return R_eff,betas_eff,f_t_all
def create_legend(fig,models: list,ls_styles: dict,linewidth = 2,legend_entries = {},legend_keywords = {}):
legend_entries['EWs'] = mpl.lines.Line2D([],[],color = 'k',ls = ':', linewidth = linewidth)
legend_entries['nEWs'] = mpl.lines.Line2D([],[],color = 'k',ls = '-', linewidth = linewidth)
for model in models:
ls,color = ls_styles[model]
legend_entries[model_labels[model]] = mpl.lines.Line2D([],[],color = color,ls = ls, linewidth = linewidth)
legend = fig.legend(legend_entries.values(),legend_entries.keys(),**legend_keywords)
return legend
def plot_grid(theta_values, f_values, metric, models, t_lockdown, max_days = 500,R_0=2.5,I0=20,adjust_beta_hosp = 1,
figsize = (30,10),model_dependent_rho = None, linewidth = 2,font_size = 30):
update_font_size(font_size)
fig, axes = plt.subplots(len(f_values), len(theta_values),figsize=(14*len(theta_values),12*len(f_values)))
# Plotting LS and color for each model
ls_styles = {'No Structure':('-',[0.5,0.5,0.5,0.5]),
'Cashier':('-',[0,0,1,1]),
'Healthcare':('-','r'),
'USPS':('-','g'),
'Healthcare_Sigmoid':('-','purple')}
for i,f in enumerate(f_values):
for j,theta in enumerate(theta_values):
for model_index,model in enumerate(models):
rho = model_dependent_rho[model]
# run model and reshape output
solution,p = run_model(theta = theta,
R_0 = R_0,
t_lockdown = t_lockdown,
model = model,
n_infected = I0,
n_exposed = I0,
prop_essential = f,
max_days = max_days,
rho = rho,
adjust_beta_hosp = adjust_beta_hosp*f)
solution_matrix = np.reshape(solution.y,[p.num_groups,len(p.compartment_names),p.max_days])
# Get average cumulative infection rate across population
avg_infection_rate = sum(sum(solution_matrix[:,1:,:]))/8e6
# Get proportion of population in each compartment relative to group size
# Note, here N is hardcoded as 8e6 (approx. NYC)
solution_matrix[0,:,:] = solution_matrix[0,:,:]/(8e6*(1-f))
solution_matrix[1,:,:] = solution_matrix[1,:,:]/(8e6*f)
# Get the effective R, betas, and g(t)
R_effective, Betas, G = get_R_effective(solution_matrix,p)
R_effective = R_effective[:-1]
R_effective[:t_lockdown+1] = np.nan
# Get color for each model but don't use LS
ls, color = ls_styles[model]
ls = '-'
if len(f_values) > 1:
if len(theta_values) > 1:
ax = axes[i][j]
else:
ax = axes[i]
else:
if len(theta_values) > 1:
ax = axes[j]
else:
ax = axes
plt.sca(ax)
if metric == 'Average_Infection':
# Plot cumulative infection rate in population
plt.plot(np.arange(0,max_days),avg_infection_rate,ls = '-',color=color,label=model,linewidth = linewidth)
plt.ylabel("Cumulative Infections")
elif metric == 'Essential_Infection':
plt.plot(np.arange(0,max_days),sum(solution_matrix[1,1:,:]),ls = ':',color=color,linewidth = linewidth,alpha = 1)
plt.ylabel("Cumulative Infections\nin Essential Workers")
elif metric == 'Nonessential_Infection':
plt.plot(np.arange(0,max_days),sum(solution_matrix[0,1:,:]),ls = ':',color=color,linewidth = linewidth,alpha = 1)
plt.ylabel("Cumulative Infections\nin Non-Essential Workers")
elif metric == 'All_Infection':
plt.plot(np.arange(0,max_days),sum(solution_matrix[1,1:,:]),ls = ':',color=color,linewidth = linewidth,alpha = 1)
plt.plot(np.arange(0,max_days),sum(solution_matrix[0,1:,:]),ls = '-',color=color,linewidth = linewidth,alpha = 1)
plt.ylabel("Cumulative Infections")
elif metric == 'R(t)':
plt.plot(np.arange(0,max_days),R_effective,color = color,linewidth = linewidth,ls=ls,label=model,rho = 1)
plt.ylabel("R(t)")
else: raise ValueError('Metric "{}" not understood'.format(metric))
plt.xlabel("Days")
plt.xlim([0,200])
if metric != 'R(t)':
plt.fill_betweenx(y=[1e-7,1],x1=0,x2=p.social_distancing_time,color=[0.8,0.8,0.8,0.5])
# Assign a column label
if i == 0 and len(theta_values) > 1:
assign_column_label_grid(ax,R_0*(theta),font_size = font_size)
# Assign a row column
if j == 0 and len(f_values) > 1:
assign_row_label_grid(ax,f,font_size = font_size)
legend = create_legend(fig = fig,
ls_styles = ls_styles,
models = models,
linewidth = linewidth,
legend_keywords = {'loc':'upper left',
'bbox_to_anchor':(0.82, 0.55),
'framealpha': 1,
'edgecolor':'k'})
for ax in axes.flat:
ax.tick_params(width = 5, length = 10)
ax.tick_params(width = 2.5, length = 5,which='minor')
plt.subplots_adjust(wspace=0.3)
plt.subplots_adjust(hspace=0.3)
plt.savefig('Figure2.png',bbox_extra_artists = legend)
plt.show()
# Make plot for Figure 2
desired_Reff = np.array([0.5,0.9,1.5])
R_0 = R0_from_doubling_time(args_default,3)
print('t',(1-desired_Reff/R_0))
plt.rc('legend', fontsize=30)
plot_grid(theta_values = desired_Reff/R_0,
f_values = [0.05],
max_days = 400,
metric = 'All_Infection',
R_0 = R_0,
I0 = 20,
t_lockdown = 59,
models = ['Cashier','USPS','Healthcare','No Structure'],
model_dependent_rho = {'No Structure': 0,
'Cashier': 0,
'Healthcare': 0.5,
'USPS': 0.5,
'Healthcare_Sigmoid': 0},
linewidth = 4,
adjust_beta_hosp = 9.5649565,
font_size=35)
# Make a similar plot to Figure 2, but change f
plt.rc('legend', fontsize=30)
plot_grid(theta_values = desired_Reff/R_0,
f_values = [0.10],
max_days = 400,
metric = 'All_Infection',
R_0 = R_0,
I0 = 20,
t_lockdown = 59,
models = ['Cashier','USPS','Healthcare','No Structure'],
model_dependent_rho = {'No Structure': 0,
'Cashier': 0,
'Healthcare': 0.5,
'USPS': 0.5,
'Healthcare_Sigmoid': 0},
linewidth = 4,
adjust_beta_hosp = 9.5649565,
font_size=35)
# Make a similar plot to Figure 2, but change t_lockdown
plt.rc('legend', fontsize=30)
plot_grid(theta_values = desired_Reff/R_0,
f_values = [0.05],
max_days = 400,
metric = 'All_Infection',
R_0 = R_0,
I0 = 20,
t_lockdown = 59 - 10,
models = ['Cashier','USPS','Healthcare','No Structure'],
model_dependent_rho = {'No Structure': 0,
'Cashier': 0,
'Healthcare': 0.5,
'USPS': 0.5,
'Healthcare_Sigmoid': 0},
linewidth = 4,
adjust_beta_hosp = 9.5649565,
font_size=35)
# Run several models over different values of theta and f
# Here, we look at theta between 0 and 1, and f between 0 and 0.25
def run_iterations(model_names, theta_values = np.linspace(0,1,21),f_values = np.linspace(0,0.25,21),
R_0 = 2.5, t_lockdown = 59, n_infected = 50, n_exposed = 50, max_days = 1000,
change_in_p_IH = 1, change_in_p_IA = 1, doubling_time = None, rho = 9.5649565):
results_dict = {}
# For each of the specified models
for model in model_names:
print(model)
results_dict[model] = {}
if model == 'USPS' or model == 'Healthcare':
rho = 0.5
else:
rho = 0
# For each of the given f values (f = fraction essential)
for f in f_values:
# f can't be 0
if f == 0: continue
#print('\t',f)
results_dict[model][f] = {}
# for each of the given theta values (theta = effectiveness of social distancing)
for theta in theta_values:
# run the model and record the results
results_dict[model][f][theta] = run_model(theta = theta,
R_0 = R_0,
t_lockdown=t_lockdown,
model=model,
n_infected=n_infected,
n_exposed=n_exposed,
prop_essential=f,
max_days = max_days,
rho = rho,
change_in_p_IH = change_in_p_IH,
change_in_p_IA = change_in_p_IA,
adjust_beta_hosp = f*rho)
return results_dict
def assign_row_label_figure3(ax,model, pad = 25, xy = (0,0.5), model_labels = model_labels,font_size = 12):
# Adds the text to left of the subplot
ax.annotate(model_labels[model], xy=xy, xytext=(-ax.yaxis.labelpad - pad, 0),
xycoords=ax.yaxis.label, textcoords='offset points',
size=font_size, ha='right', va='center',rotation=90)
def assign_column_label_figure3(ax,text,index, pad = 5, xy = (0.5,1),font_size=12):
letter_maps = {0: 'A',
1: 'B',
2: 'C'}
label = letter_maps[index] + ': ' + text
pad = 5 # in points
ax.annotate(label+'\n', xy=xy, xytext=(0, pad),
xycoords='axes fraction', textcoords='offset points',
size=font_size, ha='center', va='baseline')
def plot_heatmap(sol_dict,model_names = ['No Structure','Cashier','USPS'],
Xname = '',Yname = '',font_size = 30,center_point = None):
update_font_size(font_size)
# The models we are plotting heatmaps for
model_keys = list(sol_dict.keys())
#
variable_1_keys = np.sort([i for i in sol_dict[model_keys[0]].keys()])
variable_2_keys = np.sort([i for i in sol_dict[model_keys[0]][variable_1_keys[0]].keys()])
fig, axes_all = plt.subplots(len(model_names),3,figsize=[45,10*len(model_names)])
cbar_ax = fig.add_axes([0.85, 0.15, 0.02, 0.7])
# Only considering 2 metrics
for metric in range(3):
# Name of the metrics and levels to plot contour curves for
metric_names = {0:('Total infections after one year',[center_point]),
1:('Infections among non-essentials',[]),
2:('Infections among essential workers',[])}
metric_name,levels = metric_names[metric]
# Get the axis for this model
axes = [row_axes[metric] for row_axes in axes_all]
# Create a matrix to build the heatmaps from
sol_array = np.zeros([len(model_keys),len(variable_1_keys),len(variable_2_keys)])
for i,model in enumerate(model_names):
# Reformat keys to look nice
axis_1_keys = []
axis_2_keys = []
# Get R0 to convert from theta to r0(theta)
R0 = R0_from_doubling_time(args_default,3)
for index,key1 in enumerate(variable_1_keys):
axis_1_keys.append(round(R0*(key1),2))
for index,key2 in enumerate(variable_2_keys):
axis_2_keys.append(key2)
# Fill in the matrix
model_dict = sol_dict[model]
for j,(var1,var1_dict) in enumerate(model_dict.items()):
for k,(var2,solutions) in enumerate(var1_dict.items()):
sol_array[i,j,k] = solutions[metric]
plt.sca(axes[i])
# Get the relevant part of the matrix and orientate the matrix correctly (tranpose then flip vertically)
data = np.flip(np.transpose(sol_array[i,:,:]))
data = pd.DataFrame(data)
data.index = np.flip(axis_2_keys)
data.columns = axis_1_keys
# Make a heatmap from it
HM = heatmap(data=data,
yticklabels=5,xticklabels=5,
cmap = 'Spectral',
center = center_point,
vmin = 0,
vmax = 1,
cbar_ax=cbar_ax)
CS = plt.contour(data,levels=levels,colors='r',linewidths=6)
plt.yticks(rotation=0)
axes[i].tick_params(width = 5,length = 10)
# Label heatmaps with model names
model_labels = {'Cashier' :'Public Facing',
'USPS' :'Non-Public Facing',
'Healthcare' :'Healthcare',
'No Structure' :'No essential workers',
'Healthcare_Sigmoid' :'Healthcare'}
if i == 0:
assign_column_label_figure3(ax=axes[i],text = metric_name, index = metric,font_size=font_size)
if metric == 0:
assign_row_label_figure3(ax=axes[i],model = model,font_size=font_size)
# Set variable names on the axis
if Yname:
axes[i].set_ylabel(Yname)
if Xname:
axes[i].set_xlabel(Xname)
cbar_ax.tick_params(width = 5,length = 10)
fig.subplots_adjust(hspace=0.3,wspace=0.3,right = 0.825)
plt.show()
keep_R0_constant = False
# Get R_0 assuming a doubling time of 3 days
R_0 = R0_from_doubling_time(args_default,3)
R_0_values = np.linspace(2,0,21)
# Get the values of theta and f to iterate over
# Only use values of theta such that R0(theta) < 2
theta_values=np.sort(R_0_values/R_0)
f_values = np.linspace(0,0.25,26)
all_results_phaseDiagram2 = run_iterations(theta_values=theta_values,
f_values = f_values,
R_0 = R_0,
n_exposed = 0,
n_infected = 20,
model_names = ['Cashier','USPS','Healthcare'])
# Make the plot for Figure 3
# This takes the resulting model solutions and converts them to the metrics we want
solved_results = get_metrics(all_results_phaseDiagram2)
# This plots the metrics we want as a heatmap.
sol, pp = run_model(t_lockdown=59,theta = (1/R_0),R_0 = R_0, n_exposed = 0, n_infected = 20, rho = 0,model = 'No Structure',prop_essential=0.05)
center_point = get_infection_rate(solution=sol,p=pp, time = 365)
plot_heatmap(solved_results,Yname=r'
',Xname=r'
',model_names = ['Cashier','USPS','Healthcare'],font_size = 42,center_point = center_point)
# Get the values of theta and f to iterate over
# Only use values of theta such that R0(theta) < 2
theta_values=np.sort(R_0_values/R_0)
f_values = np.linspace(0,0.25,26)
all_results_phaseDiagram2 = run_iterations(theta_values=theta_values,
f_values = f_values,
R_0 = R_0,
n_exposed = 0,
n_infected = 20,
t_lockdown=49, # Changing the lockdown date
model_names = ['Cashier','USPS','Healthcare'])
# Make the plot for Figure 3, but with the t_lockdown moved 10 days earlier
# This takes the resulting model solutions and converts them to the metrics we want
solved_results = get_metrics(all_results_phaseDiagram2)
# This plots the metrics we want as a heatmap.
sol, pp = run_model(t_lockdown=49,theta = (1/R_0),R_0 = R_0, n_exposed = 0, n_infected = 20, rho = 0,model = 'No Structure',prop_essential=0.05)
center_point = get_infection_rate(solution=sol,p=pp, time = 365)
plot_heatmap(solved_results,Yname=r'
',Xname=r'
',model_names = ['Cashier','USPS','Healthcare'],font_size = 42,center_point = center_point)
# Assign row and column labels for the plots of the effective R over time
def assign_row_label_figure4(ax,model,letter,pad = 5, xy = (0,0.5),letter_pad = 0):
if not letter_pad:
letter_pad = pad*20
ax.annotate(letter, xy=xy, xytext=(-ax.yaxis.labelpad - letter_pad, 0),
xycoords=ax.yaxis.label, textcoords='offset points',
size='large', ha='center', va='center',rotation=0)
ax.annotate(model+'\n', xy=xy, xytext=(-ax.yaxis.labelpad - pad, 0),
xycoords=ax.yaxis.label, textcoords='offset points',
size='large', ha='center', va='center',rotation=90)
# Label is the value of R_effective
def assign_column_label_figure4(ax,model, pad = 5, xy = (0.5,1)):
text = model_labels[model]
pad = 5 # in points
ax.annotate(text+'\n', xy=xy, xytext=(0, pad),
xycoords='axes fraction', textcoords='offset points',
size='large', ha='center', va=