\(\newcommand{\B}[1]{ {\bf #1} }\) \(\newcommand{\R}[1]{ {\rm #1} }\)
no_ode_xam.py¶
View page sourceno_ode_xam: Python Source Code¶
# ----------------------------------------------------------------------------
# imports
# ----------------------------------------------------------------------------
import sys
import os
import copy
import time
import csv
import random
import numpy
import shutil
import dismod_at
import math
#
# import at_cascade with a preference current directory version
current_directory = os.getcwd()
if os.path.isfile( current_directory + '/at_cascade/__init__.py' ) :
sys.path.insert(0, current_directory)
import at_cascade
# -----------------------------------------------------------------------------
# global variables
# -----------------------------------------------------------------------------
# BEGIN fit_goal_set
fit_goal_set = { 'n1', 'n2' }
# END fit_goal_set
#
# BEGIN random_seed
random_seed = 0
if random_seed == 0 :
random_seed = int( time.time() )
random.seed(random_seed)
print('no_ode_xam: random_seed = ', random_seed)
# END random_seed
#
# BEGIN alpha_true
alpha_true = { 'iota':- 0.25, 'chi':-0.15 }
alpha_true_max_abs = 0.3
# END alpha_true
#
# BEGIN avg_income
avg_income = { 'n1':1.0, 'n2':2.0 }
avg_income['n0'] = ( avg_income['n1'] + avg_income['n2'] ) / 2.0
# END avg_income
#
# BEGIN random_effect_true
random_effect_true = { 'n0': 0.0, 'n1': 0.3, 'n2': -0.3}
# END random_effect_true
#
# BEGIN age_grid
age_grid = [0.0, 20.0, 40.0, 60.0, 80.0, 100.0 ]
# END age_grid
#
# BEGIN income_grid
random_income = False
income_grid = dict()
for node in [ 'n1', 'n2' ] :
max_income = 2.0 * avg_income[node]
n_income_grid = 3
d_income_grid = max_income / (n_income_grid - 1)
income_grid[node] = [ j * d_income_grid for j in range(n_income_grid) ]
# END income_grid
# ----------------------------------------------------------------------------
# functions
# ----------------------------------------------------------------------------
# BEGIN rate_true
def rate_true(rate, a, t, n, c) :
if rate in [ 'pini', 'rho' ] :
return 0.0
income = c[0]
R_n = random_effect_true[n]
I_n = avg_income['n0']
if rate == 'iota' :
effect = R_n + alpha_true[rate] * ( income - I_n )
return (1 + a / 100) * 1e-3 * math.exp( effect )
if rate == 'chi' :
effect = R_n + alpha_true[rate] * ( income - I_n )
# chi is constant up to second age grid point because prevalence
# cannot determine chi at age zero.
aa = max(a, age_grid[1] )
return (1 + aa / 100) * 1e-2 * math.exp( effect )
if rate == 'omega' :
# omega random effect is constrained for each node so to get an
# exact match it cannot depend on income, but it can depend on
# age and time
effect = R_n + a / 10000
return (1 + a / 100) * 1e-2 * math.exp( effect)
assert False
# END rate_true
# ----------------------------------------------------------------------------
def average_integrand(integrand_name, age, node_name, income) :
covariate_list = [ income ]
def iota(a, t) :
return rate_true('iota', a, t, node_name, covariate_list)
def chi(a, t) :
return rate_true('chi', a, t, node_name, covariate_list)
def omega(a, t) :
return rate_true('omega', a, t, node_name, covariate_list)
rate = { 'iota': iota, 'chi': chi, 'omega': omega }
grid = { 'age' : [age], 'time': [2000.0] }
abs_tol = 1e-6
avg_integrand = dismod_at.average_integrand(
rate, integrand_name, grid, abs_tol
)
return avg_integrand
# ----------------------------------------------------------------------------
def root_node_db(file_name) :
# BEGIN iota_chi_50
covariate_list = [ avg_income['n0'] ]
iota_50 = rate_true('iota', 50.0, None, 'n0', covariate_list )
chi_50 = rate_true('chi', 50.0, None, 'n0', covariate_list )
# END iota_chi_50
#
# prior_table
prior_table = list()
#
prior_table.append(
# BEGIN parent_iota_value_prior
{ 'name': 'parent_iota_value_prior',
'density': 'uniform',
'lower': iota_50 / 10.0,
'upper': iota_50 * 10.0,
'mean': iota_50,
'std' : iota_50 * 10.0,
'eta': iota_50 * 1e-3,
}
# END parent_iota_value_prior
)
prior_table.append(
# BEGIN parent_chi_value_prior
{ 'name': 'parent_chi_value_prior',
'density': 'uniform',
'lower': chi_50 / 10.0,
'upper': chi_50 * 10.0,
'mean': chi_50,
'std': chi_50 * 10.0,
'eta': chi_50 * 1e-3,
}
# END parent_chi_value_prior
)
#
prior_table.append(
# BEGIN parent_dage_prior
{ 'name': 'parent_dage_prior',
'density': 'log_gaussian',
'mean': 0.0,
'std': 4.0,
'eta': min(iota_50 , chi_50 ) * 1e-3,
}
# END parent_dage_prior
)
prior_table.append(
# BEGIN child_value_prior
{ 'name': 'child_value_prior',
'density': 'gaussian',
'mean': 0.0,
'std': 1.0,
}
# END child_value_prior
)
prior_table.append(
# BEGIN alpha_value_prior
{ 'name': 'alpha_value_prior',
'density': 'gaussian',
'lower': - 10 * alpha_true_max_abs,
'upper': + 10 * alpha_true_max_abs,
'std' : 10 * alpha_true_max_abs,
'mean': 0.0,
}
# END alpha_value_prior
)
#
# smooth_table
smooth_table = list()
#
# parent_iota_smooth
fun = lambda a, t : ('parent_iota_value_prior', 'parent_dage_prior', None)
smooth_table.append({
'name': 'parent_iota_smooth',
'age_id': range( len(age_grid) ),
'time_id': [0],
'fun': fun,
})
#
# parent_chi_smooth
fun = lambda a, t : ('parent_chi_value_prior', 'parent_dage_prior', None)
smooth_table.append({
'name': 'parent_chi_smooth',
'age_id': range(1, len(age_grid) ),
'time_id': [0],
'fun': fun,
})
#
# child_smooth
fun = lambda a, t : ('child_value_prior', None, None)
smooth_table.append({
'name': 'child_smooth',
'age_id': [0],
'time_id': [0],
'fun': fun,
})
#
# alpha_smooth
fun = lambda a, t : ('alpha_value_prior', None, None)
smooth_table.append({
'name': 'alpha_smooth',
'age_id': [0],
'time_id': [0],
'fun': fun,
})
#
# node_table
node_table = [
{ 'name':'n0', 'parent':'' },
{ 'name':'n1', 'parent':'n0' },
{ 'name':'n2', 'parent':'n0' },
]
#
# rate_table
rate_table = [ {
'name': 'iota',
'parent_smooth': 'parent_iota_smooth',
'child_smooth': 'child_smooth' ,
},{
'name': 'chi',
'parent_smooth': 'parent_chi_smooth',
'child_smooth': 'child_smooth' ,
} ]
#
# covariate_table
covariate_table = [
{ 'name':'income', 'reference':avg_income['n0'] },
]
#
# mulcov_table
mulcov_table = [
{ # alpha_iota
'covariate': 'income',
'type': 'rate_value',
'effected': 'iota',
'group': 'world',
'smooth': 'alpha_smooth',
},{ # alpha_chi
'covariate': 'income',
'type': 'rate_value',
'effected': 'chi',
'group': 'world',
'smooth': 'alpha_smooth',
},
]
#
# subgroup_table
subgroup_table = [ {'subgroup': 'world', 'group':'world'} ]
#
# integrand_table
integrand_table = [
{'name': 'Sincidence'},
{'name': 'mtexcess' },
{'name': 'prevalence'},
]
for mulcov_id in range( len(mulcov_table) ) :
integrand_table.append( { 'name': f'mulcov_{mulcov_id}' } )
#
# avgint_table
avgint_table = list()
row = {
'node': 'n0',
'subgroup': 'world',
'weight': '',
'time_lower': 2000.0,
'time_upper': 2000.0,
'income': None,
}
for age in age_grid :
row['age_lower'] = age
row['age_upper'] = age
for integrand in [ 'prevalence' ] :
row['integrand'] = integrand
avgint_table.append( copy.copy(row) )
#
# data_table
data_table = list()
leaf_set = [ 'n1', 'n2' ]
for node in leaf_set :
row = {
'subgroup': 'world',
'weight': '',
'time_lower': 2000.0,
'time_upper': 2000.0,
'density': 'log_gaussian',
'hold_out': False,
}
row_list = list()
max_meas_value = {
'mtexcess': 0.0, 'Sincidence': 0.0, 'prevalence': 0.0
}
for (age_id, age) in enumerate( age_grid ) :
for income in income_grid[node] :
row['node'] = node
row['age_lower'] = age
row['age_upper'] = age
row['income'] = income
for integrand in max_meas_value :
meas_value = average_integrand(
integrand, age, node, income
)
row['integrand'] = integrand
row['meas_value'] = meas_value
max_meas_value[integrand] = max(
meas_value, max_meas_value[integrand]
)
row_list.append( copy.copy(row) )
n_row = len(age_grid) * n_income_grid * len(max_meas_value)
assert len(row_list) == n_row
for row in row_list :
# The model for the measurement noise is small so a few
# data points act like lots of real data points.
# The actual measruement noise is zero.
for integrand in max_meas_value :
if row['integrand'] == integrand :
row['meas_std'] = max_meas_value[integrand] / 50.0
row['eta'] = 1e-4 * max_meas_value[integrand]
#
data_table += row_list
#
# time_grid
time_grid = [ 2000.0 ]
#
# weight table:
weight_table = list()
#
# nslist_table
nslist_table = dict()
#
# option_table
option_table = [
{ 'name':'parent_node_name', 'value':'n0'},
{ 'name':'rate_case', 'value':'iota_pos_rho_zero'},
{ 'name': 'zero_sum_child_rate', 'value':'iota chi'},
{ 'name':'quasi_fixed', 'value':'false'},
{ 'name':'max_num_iter_fixed', 'value':'50'},
{ 'name':'max_num_iter_random', 'value':'200'},
{ 'name':'tolerance_fixed', 'value':'1e-8'},
{ 'name':'tolerance_random', 'value':'1e-8'},
{ 'name':'hold_out_integrand', 'value':'mtexcess'},
{ 'name':'random_seed', 'value':str(random_seed)},
]
# ----------------------------------------------------------------------
# create database
dismod_at.create_database(
file_name,
age_grid,
time_grid,
integrand_table,
node_table,
subgroup_table,
weight_table,
covariate_table,
avgint_table,
data_table,
prior_table,
smooth_table,
nslist_table,
rate_table,
mulcov_table,
option_table
)
# ---------------------------------------------------------------------------
def display_no_ode_fit(root_node_dir) :
#
# database
database = root_node_dir + '/no_ode/dismod.db'
#
# pdf_file
pdf_dir = root_node_dir + '/no_ode'
#
# data.pdf
integrand_list = [ 'Sincidence', 'mtexcess', 'prevalence' ]
pdf_file = pdf_dir + '/data.pdf'
dismod_at.plot_data_fit(
database = database ,
pdf_file = pdf_file ,
integrand_list = integrand_list ,
)
#
# rate.pdf
rate_set = { 'iota', 'chi' }
pdf_file = pdf_dir + '/rate.pdf'
plot_title = 'rate'
dismod_at.plot_rate_fit(database, pdf_file, plot_title, rate_set)
#
# db2csv
dismod_at.db2csv_command( database )
# ----------------------------------------------------------------------------
def check_no_ode_fit(root_database) :
#
# root_node_dir
root_node_dir = 'n0'
index = root_database.rfind('/')
if 0 <= index :
root_node_dir = root_database[0 : index] + '/n0'
#
# no_ode_database
no_ode_database = root_node_dir + '/no_ode/dismod.db'
#
# predict
dismod_at.system_command_prc([
'dismod_at', no_ode_database, 'predict', 'fit_var'
] )
#
# table
fit_or_root = at_cascade.fit_or_root_class(
no_ode_database, root_database
)
table = dict()
for tbl_name in [
'age', 'time', 'var', 'rate', 'node', 'fit_var', 'avgint',
'integrand', 'predict'
] :
table[tbl_name] = fit_or_root.get_table(tbl_name)
fit_or_root.close()
#
# check variable values
for (var_id, var_row) in enumerate( table['var'] ) :
#
# fit_var_value
fit_var_value = table['fit_var'][var_id]['fit_var_value']
#
# var_type
var_type = var_row['var_type']
#
# rate_name
rate_id = var_row['rate_id']
rate_name = table['rate'][rate_id]['rate_name']
#
if var_type == 'rate' :
#
# node_name
node_id = var_row['node_id']
node_name = table['node'][node_id]['node_name']
#
# c_0
c_0 = [ avg_income['n0'] ]
#
# rate_true_n0
age_id = var_row['age_id']
time_id = var_row['time_id']
age = table['age'][age_id]['age']
time = table['time'][time_id]['time']
rate_true_n0 = rate_true(rate_name, age, time, 'n0', c_0)
#
if node_name == 'n0' :
# fixed effect
rel_error = (1.0 - fit_var_value / rate_true_n0 )
if rate_name == 'omega' :
assert rel_error < 1e-12
else :
assert abs( rel_error ) < 1e-5
elif rate_name in [ 'iota', 'chi'] :
# These random effects are constant w.r.t age and time
rate_true_n = rate_true(rate_name, age, time, node_name, c_0)
effect_true = math.log( rate_true_n / rate_true_n0 )
rel_error = (1.0 - fit_var_value / effect_true )
assert abs( rel_error ) < 1e-4
else :
# These random effects are constraints
c_n = [ avg_income[node_name] ]
rate_true_n = rate_true(rate_name, age, time, node_name, c_n)
effect_true = math.log( rate_true_n / rate_true_n0 )
rel_error = (1.0 - fit_var_value / effect_true )
assert abs( rel_error ) < 1e-12
else :
assert var_type == 'mulcov_rate_value'
rel_error = (1.0 - fit_var_value / alpha_true[rate_name])
assert abs( rel_error ) < 1e-4
# ----------------------------------------------------------------------------
# main
# ----------------------------------------------------------------------------
def main() :
# -------------------------------------------------------------------------
# result_dir
result_dir = 'build/example'
at_cascade.empty_directory(result_dir)
#
# Create root.db
root_database = f'{result_dir}/root.db'
root_node_db(root_database)
#
# omega_grid
omega_grid = dict()
omega_grid['age'] = range( len(age_grid) )
omega_grid['time'] = [ 0 ]
#
# mulcov_freeze_table
mulcov_freeze_table = [
{ 'fit_node_name' : 'n0', 'split_reference_id' : None, 'mulcov_id' : 0 },
{ 'fit_node_name' : 'n0', 'split_reference_id' : None, 'mulcov_id' : 1 },
]
#
# omega_all_data
omega_all_data = dict()
for node_name in [ 'n0', 'n1', 'n2' ] :
omega_list = list()
income = avg_income[node_name]
for age_id in omega_grid['age'] :
age = age_grid[age_id]
time = 2000.0
integrand_name = 'mtother'
mtother = \
average_integrand(integrand_name, age, node_name, income)
omega_list.append(mtother)
omega_all_data[node_name] = [ omega_list ]
#
# Create all_node.db
all_node_database = f'{result_dir}/all_node.db'
option_all = {
'result_dir' : result_dir,
'root_node_name' : 'n0',
'root_database' : root_database,
}
at_cascade.create_all_node_db(
all_node_database = all_node_database,
option_all = option_all,
omega_grid = omega_grid,
omega_data = omega_all_data,
mulcov_freeze_table = mulcov_freeze_table,
)
#
# root_node_dir
root_node_dir = f'{result_dir}/n0'
os.mkdir(root_node_dir)
#
# avgint_table
# This also erases the avgint table from root_database
avgint_table = at_cascade.extract_avgint( root_database )
#
# cascade starting at root node
at_cascade.cascade_root_node(
all_node_database = all_node_database ,
fit_goal_set = fit_goal_set,
no_ode_fit = True,
)
#
# check_no_ode_fit
check_no_ode_fit(root_database)
#
# display_no_ode_fit
display_no_ode_fit(root_node_dir)
#
# check results
for goal_dir in [ 'n0/n1', 'n0/n2' ] :
goal_database = f'{result_dir}/{goal_dir}/dismod.db'
at_cascade.check_cascade_node(
rate_true = rate_true,
all_node_database = all_node_database,
fit_database = goal_database,
avgint_table = avgint_table,
relative_tolerance = 1e-2,
)
#
# var_table, rate_table, smooth_grid_table
fit_database = f'{result_dir}/n0/dismod.db'
connection = dismod_at.create_connection(
fit_database, new = False, readonly = True
)
var_table = dismod_at.get_table_dict(connection, 'var')
rate_table = dismod_at.get_table_dict(connection, 'rate')
prior_table = dismod_at.get_table_dict(connection, 'prior')
smooth_grid_table = dismod_at.get_table_dict(connection, 'smooth_grid')
connection.close()
#
# smooth_id
smooth_id = None
assert rate_table[1]['rate_name'] == 'iota'
for row in var_table :
if row['var_type'] == 'mulcov_rate_value' and row['rate_id'] == 1 :
assert smooth_id == None
smooth_id = row['smooth_id']
assert smooth_id != None
#
# prior_id
value_prior_id = None
for row in smooth_grid_table :
if row['smooth_id'] == smooth_id :
assert value_prior_id == None
value_prior_id = row['value_prior_id']
#
# check that lower, upper, and std are the same and mean is different
row = prior_table[value_prior_id]
assert float(row['lower']) == -10 * alpha_true_max_abs
assert float(row['upper']) == +10 * alpha_true_max_abs
assert float(row['std']) == 10 * alpha_true_max_abs
assert float(row['mean']) != 0.0
#
if __name__ == '__main__' :
main()
print('no_ode_xam: OK')