\(\newcommand{\B}[1]{ {\bf #1} }\) \(\newcommand{\R}[1]{ {\rm #1} }\)
mulcov_freeze.py¶
View page sourcemulcov_freeze: Python Source Code¶
# ----------------------------------------------------------------------------
# imports
# ----------------------------------------------------------------------------
import math
import sys
import os
import copy
import time
import csv
import random
import shutil
import dismod_at
from math import exp
#
# import at_cascade with a preference current directory version
current_directory = os.getcwd()
if os.path.isfile( current_directory + '/at_cascade/__init__.py' ) :
sys.path.insert(0, current_directory)
import at_cascade
# -----------------------------------------------------------------------------
# global variables
# -----------------------------------------------------------------------------
# BEGIN fit_goal_set
fit_goal_set = { 'n3', 'n4', 'n5', 'n6' }
# END fit_goal_set
#
# BEGIN random_seed
random_seed = 0
if random_seed == 0 :
random_seed = int( time.time() )
random.seed(random_seed)
print('mulcov_freeze: random_seed = ', random_seed)
# END random_seed
#
# BEGIN alpha_true
alpha_true = - 0.2
# END alpha_true
#
# BEGIN mulcov_freeze_table
mulcov_freeze_table = [
{ 'fit_node_id' : 1, 'split_reference_id' : None, 'mulcov_id' : 0 },
]
# END mulcov_freeze_table
#
# BEGIN avg_income
avg_income = { 'n3':1.0, 'n4':2.0, 'n5':3.0, 'n6':4.0 }
avg_income['n2'] = ( avg_income['n5'] + avg_income['n6'] ) / 2.0
avg_income['n1'] = ( avg_income['n3'] + avg_income['n4'] ) / 2.0
avg_income['n0'] = ( avg_income['n1'] + avg_income['n2'] ) / 2.0
# END avg_income
#
# BEGIN sum_random_effect
size_level1 = 0.2
size_level2 = 0.2
sum_random = { 'n0': 0.0, 'n1': size_level1, 'n2': -size_level1 }
sum_random['n3'] = sum_random['n1'] + size_level2;
sum_random['n4'] = sum_random['n1'] - size_level2;
sum_random['n5'] = sum_random['n2'] + size_level2;
sum_random['n6'] = sum_random['n2'] - size_level2;
# END sum_random_effect
#
# BEGIN age_grid
age_grid = [0.0, 20.0, 40.0, 60.0, 80.0, 100.0 ]
# END age_grid
#
# BEGIN income_grid
random_income = False
income_grid = dict()
for node in [ 'n3', 'n4', 'n5', 'n6' ] :
max_income = 2.0 * avg_income[node]
if random_income :
n_income_grid = 10
income_grid[node] = \
[ random.uniform(0.0, max_income) for j in range(n_income_grid) ]
income_grid[node] = sorted( income_grid[node] )
else :
n_income_grid = 3
d_income_grid = max_income / (n_income_grid - 1)
income_grid[node] = [ j * d_income_grid for j in range(n_income_grid) ]
# END income_grid
# ----------------------------------------------------------------------------
# functions
# ----------------------------------------------------------------------------
# BEGIN rate_true
def rate_true(rate, a, t, n, c) :
income = c[0]
s_n = sum_random[n]
r_0 = avg_income['n0']
effect = s_n + alpha_true * ( income - r_0 )
if rate == 'iota' :
return (1 + a / 100) * 1e-2 * exp(effect)
return 0.0
# END rate_true
# ----------------------------------------------------------------------------
def root_node_db(file_name) :
#
# BEGIN iota_50
covariate_list = [ avg_income['n0'] ]
iota_50 = rate_true('iota', 50.0, None, 'n0', covariate_list)
# END iota_50
#
# prior_table
prior_table = list()
prior_table.append(
# BEGIN parent_value_prior
{ 'name': 'parent_value_prior',
'density': 'gaussian',
'lower': iota_50 / 10.0,
'upper': iota_50 * 10.0,
'mean': iota_50,
'std': iota_50 * 10.0,
'eta': iota_50 * 1e-3
}
# END parent_value_prior
)
prior_table.append(
# BEGIN parent_dage_prior
{ 'name': 'parent_dage_prior',
'density': 'log_gaussian',
'mean': 0.0,
'std': 3.0,
'eta': iota_50 * 1e-3,
}
# END parent_dage_prior
)
prior_table.append(
# BEGIN child_value_prior
{ 'name': 'child_value_prior',
'density': 'gaussian',
'mean': 0.0,
'std': 10.0,
}
# END child_value_prior
)
prior_table.append(
# BEGIN alpha_value_prior
{ 'name': 'alpha_value_prior',
'density': 'gaussian',
'lower': - 10 * abs(alpha_true),
'upper': + 10 * abs(alpha_true),
'std': + 10 * abs(alpha_true),
'mean': 0.0,
}
# END alpha_value_prior
)
#
# smooth_table
smooth_table = list()
#
# parent_smooth
fun = lambda a, t : ('parent_value_prior', 'parent_dage_prior', None)
smooth_table.append({
'name': 'parent_smooth',
'age_id': range( len(age_grid) ),
'time_id': [0],
'fun': fun,
})
#
# child_smooth
fun = lambda a, t : ('child_value_prior', None, None)
smooth_table.append({
'name': 'child_smooth',
'age_id': [0],
'time_id': [0],
'fun': fun,
})
#
# alpha_smooth
fun = lambda a, t : ('alpha_value_prior', None, None)
smooth_table.append({
'name': 'alpha_smooth',
'age_id': [0],
'time_id': [0],
'fun': fun,
})
#
# node_table
node_table = [
{ 'name':'n0', 'parent':'' },
{ 'name':'n1', 'parent':'n0' },
{ 'name':'n2', 'parent':'n0' },
{ 'name':'n3', 'parent':'n1' },
{ 'name':'n4', 'parent':'n1' },
{ 'name':'n5', 'parent':'n2' },
{ 'name':'n6', 'parent':'n2' },
]
#
# rate_table
rate_table = [ {
'name': 'iota',
'parent_smooth': 'parent_smooth',
'child_smooth': 'child_smooth' ,
} ]
#
# covariate_table
covariate_table = [ { 'name':'income', 'reference':avg_income['n0'] } ]
#
# mulcov_table
mulcov_table = [ {
# alpha
'covariate': 'income',
'type': 'rate_value',
'effected': 'iota',
'group': 'world',
'smooth': 'alpha_smooth',
} ]
#
# subgroup_table
subgroup_table = [ {'subgroup': 'world', 'group':'world'} ]
#
# integrand_table
integrand_table = [ {'name':'Sincidence'} ]
for mulcov_id in range( len(mulcov_table) ) :
integrand_table.append( { 'name': f'mulcov_{mulcov_id}' } )
#
# avgint_table
avgint_table = list()
row = {
'node': 'n0',
'subgroup': 'world',
'weight': '',
'time_lower': 2000.0,
'time_upper': 2000.0,
'income': None,
'integrand': 'Sincidence',
}
for age in age_grid :
row['age_lower'] = age
row['age_upper'] = age
avgint_table.append( copy.copy(row) )
#
# data_table
data_table = list()
leaf_set = { 'n3', 'n4', 'n5', 'n6' }
for (age_id, age) in enumerate( age_grid ) :
row = {
'subgroup': 'world',
'weight': '',
'time_lower': 2000.0,
'time_upper': 2000.0,
'integrand': 'Sincidence',
'density': 'gaussian',
'hold_out': False,
}
for node in leaf_set :
for income in income_grid[node] :
meas_value = rate_true(
'iota', age, None, node, [ income ]
)
row['node'] = node
row['meas_value'] = meas_value
row['age_lower'] = age
row['age_upper'] = age
row['income'] = income
# The model for the measurement noise is small so a few
# data points act like lots of real data points.
# The actual measruement noise is zero.
row['meas_std'] = meas_value / 10.0
data_table.append( copy.copy(row) )
#
# time_grid
time_grid = [ 2000.0 ]
#
# weight table:
weight_table = list()
#
# nslist_table
nslist_table = dict()
#
# option_table
option_table = [
{ 'name':'parent_node_name', 'value':'n0'},
{ 'name':'rate_case', 'value':'iota_pos_rho_zero'},
{ 'name': 'zero_sum_child_rate', 'value':'iota'},
{ 'name':'quasi_fixed', 'value':'false'},
{ 'name':'max_num_iter_fixed', 'value':'50'},
{ 'name':'tolerance_fixed', 'value':'1e-8'},
{ 'name':'random_seed', 'value':str(random_seed)},
]
# ----------------------------------------------------------------------
# create database
dismod_at.create_database(
file_name,
age_grid,
time_grid,
integrand_table,
node_table,
subgroup_table,
weight_table,
covariate_table,
avgint_table,
data_table,
prior_table,
smooth_table,
nslist_table,
rate_table,
mulcov_table,
option_table
)
# ----------------------------------------------------------------------------
# main
# ----------------------------------------------------------------------------
def main() :
# -------------------------------------------------------------------------
# result_dir
result_dir = 'build/example'
at_cascade.empty_directory(result_dir)
#
# Create root.db
root_database = f'{result_dir}/root.db'
root_node_db(root_database)
#
# Create all_node.db
all_node_database = f'{result_dir}/all_node.db'
option_all = {
'result_dir': result_dir,
'root_node_name': 'n0',
'root_database': root_database,
}
at_cascade.create_all_node_db(
all_node_database = all_node_database,
option_all = option_all,
mulcov_freeze_table = mulcov_freeze_table,
)
#
# root_node_dir
root_node_dir = f'{result_dir}/n0'
os.mkdir(root_node_dir)
#
# avgint_table
# This also erases the avgint table from root_database
avgint_table = at_cascade.extract_avgint( root_database )
#
# cascade starting at root node
at_cascade.cascade_root_node(
all_node_database = all_node_database ,
fit_goal_set = fit_goal_set ,
)
#
# check results
for goal_dir in [ 'n0/n1/n3', 'n0/n1/n4', 'n0/n2/n5', 'n0/n2/n6' ] :
goal_database = f'{result_dir}/{goal_dir}/dismod.db'
at_cascade.check_cascade_node(
rate_true = rate_true,
all_node_database = all_node_database,
fit_database = goal_database,
avgint_table = avgint_table,
relative_tolerance = 2e-3,
)
#
# alpha_n1
database = f'{result_dir}/n0/n1/dismod.db'
connection = dismod_at.create_connection(
database, new = False, readonly = True
)
var_table = dismod_at.get_table_dict(connection, 'var')
fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
connection.close()
for (var_id, var_row) in enumerate(var_table) :
if var_row['var_type'] == 'mulcov_rate_value' :
alpha_n1 = fit_var_table[var_id]['fit_var_value']
#
# fit_dir
for fit_dir in [
'n0', 'n0/n2', 'n0/n1/n3', 'n0/n1/n4', 'n0/n2/n5', 'n0/n2/n6'
] :
#
# alpha
database = f'{result_dir}/{fit_dir}/dismod.db'
connection = dismod_at.create_connection(
database, new = False, readonly = True
)
var_table = dismod_at.get_table_dict(connection, 'var')
fit_var_table = dismod_at.get_table_dict(connection, 'fit_var')
connection.close()
for (var_id, var_row) in enumerate(var_table) :
if var_row['var_type'] == 'mulcov_rate_value' :
alpha = fit_var_table[var_id]['fit_var_value']
#
eps = sys.float_info.epsilon
if '/n1/' in fit_dir :
assert abs( 1.0 - alpha / alpha_n1 ) < 20.0 * eps
else :
assert abs( 1.0 - alpha / alpha_n1 ) > 20.0 * eps
#
if __name__ == '__main__' :
main()
print('mulcov_freeze: OK')