csv.diphtheria

View page source

Csv Example Simulating, Fitting and Predicting Diphtheria

Prior Versus Posterior

This example does a check of priors versus posteriors; see the following function at the end:

# check_sam_predict
#
# Check that the prior samples of Sincendence have larger standard deviation
# than the posterior samples.
#
# prefix
# This is 'fit' for the fit predictions; i.e., the predictions corresponding
# to the optimal value for the fit.
# This is 'sam' for the sample predictions; i.e., the predictions corresponding
# to the posterior distribution for the fit.
#
# pjob
# We use the notation pjob for a prediction job; i.e.,
# the (node, sex) tuple that we are predictong for.
#
# fjob
# We use the notation fjob for a fit job; i.e.,
# the (node, sex) tuple for the fit that is doing the prediction.
#
# at
# We use the notation at for the (age, time) tuple that
# the prediction is for.
#
def check_sam_predict(fit_dir) :

Global Variables

Global variables besides sim_file and fit_file. Variables that do not appear in a heading are temporaries.

child_prior_std_factor

The factor that multiplies the prior standard deviations:

child_prior_std_factor = 2.0

ode_step_size

The step size used to approximate the ode solution and to average integrals with respect to age and time.

ode_step_size = 5.0

integrand_list

Generate data and predict for the following integrands:

integrand_list = [ 'mtexcess', 'mtspecific', 'mtwith', 'Sincidence' ]

age_grid, time_grid

Use this age-time grid for values the covariate grid and the parent rage grid.

#
age_grid = dict()
age_grid['iota'] = [ 0.0, 100.00 ]
age_grid['chi']  = [ 0.0 ]
age_grid['rho']  = [ 0.0 ]
age_grid['all']  = sorted( set(
    [100.0] + age_grid['iota'] + age_grid['chi'] + age_grid['rho']
) )
#
time_grid = dict()
time_grid['iota'] = [ 1980.0, 2025.0 ]
time_grid['chi']  = [ 1980.0 ]
time_grid['rho']  = [ 1980.0 ]
time_grid['all']  = sorted( set(
    [2025.0] + time_grid['iota'] + time_grid['chi'] + time_grid['rho']
) )

node_dict

Keys are nodes, values are corresponding parent node:

node_dict = {
    'n0' : ''   ,
    'n1' : 'n0' ,
    'n2' : 'n0' ,
}

no_effect_rate_truth

The true values (values used during simulation) for iota, rho, and chi are constant w.r.t age and time:

no_effect_rate_truth = {
    'iota' : 1e-6  ,
    'rho'  : 13.0 ,
    'chi'  : 1e-1 ,
}

omega_truth

omega_truth      = 0.02

std_random_effects_truth

This is the true standard deviation of the random effects

std_random_effects_truth = 1.0

dtp3_multiplier_truth

This factor multiples the dtp3 covariate to get the reduction in diphtheria incidence due to the vaccine:

dtp3_multiplier_truth = -3.0

sim_file

Input CSV files that are placed in the simulate directory:

sim_file = dict()

option_sim.csv

sim_file['option_sim.csv'] =  \
    'name,value\n' + \
    'float_precision,5\n' + \
    'random_depend_sex,false\n' + \
    f'integrand_step_size,{ode_step_size}\n' + \
    f'std_random_effects_iota,{std_random_effects_truth}\n'

node.csv

sim_file['node.csv'] = \
'node_name,parent_name\n'
for node_name in node_dict :
    parent_name = node_dict[node_name]
    sim_file['node.csv'] += f'{node_name},{parent_name}\n'

covariate.csv

The covariate dtp3 is the fraction of the population that received the diphtheria-tetanus-pertussis vaccine during their first year of life.

sim_file['covariate.csv'] = 'node_name,sex,age,time,omega,dtp3\n'
for node_name in node_dict :
    for sex in [ 'female', 'male' ] :
        for age in age_grid['all' ] :
            for time in time_grid['all'] :
                r     = (time - 2020.0) / 100.0
                dtp3  = 0.9 * ( 1.0 - r * r )
                row   = f'{node_name},{sex},{age},{time},{omega_truth},{dtp3}\n'
                sim_file['covariate.csv'] += row

multiplier_sim.csv

There are is one covariate multiplier in this example.

sim_file['multiplier_sim.csv'] = \
    'multiplier_id,rate_name,covariate_or_sex,multiplier_truth\n' + \
    f'0,iota,dtp3,{dtp3_multiplier_truth}'

simulate.csv

header  = 'simulate_id,integrand_name,node_name,sex,age_lower,age_upper,'
header += 'time_lower,time_upper,meas_std_cv,meas_std_min\n'
meas_std_cv     = 1.0
std_min         = 1e-6
simulate_id     = 0
sim_file['simulate.csv'] = header
for integrand_name in integrand_list :
    for node_name in node_dict :
        for sex in [ 'female', 'male' ] :
            for age in age_grid['all'] :
                for time in time_grid['all'] :
                    row  = f'{simulate_id},{integrand_name},{node_name},{sex},'
                    row += f'{age},{age},{time},{time},'
                    row += f'{meas_std_cv},{std_min}\n'
                    sim_file['simulate.csv'] += row
                    simulate_id += 1

no_effect_rate.csv

The no effect rates are constant, w.r.t age and time, during the simulation.

sim_file['no_effect_rate.csv'] = 'rate_name,age,time,rate_truth\n'
for rate_name in no_effect_rate_truth :
    rate_truth = no_effect_rate_truth[rate_name]
    sim_file['no_effect_rate.csv'] += f'{rate_name},0.0,0.0,{rate_truth}\n'

fit_file

Input CSV files that are placed in the fit directory:

fit_file = dict()

Copies of Simulation Files

fit_file['node.csv']      = sim_file['node.csv']
fit_file['covariate.csv'] = sim_file['covariate.csv']

option_fit.csv

  1. We use censor_asymptotic to make sure we do not get negative samples for iota and prevalence, which would cause the predictions to fail.

  2. We are completely ignoring the mtexcess data. It gets set to zero just before the fit, to test ignoring it.

  3. We are using a large number of samples so that the two realizations of the no effect iota have close sample standard deviation.

fit_file['option_fit.csv']  =  \
"""name,value
max_fit,500
max_fit_parent,1000
sample_method,censor_asymptotic
number_sample,1000
max_num_iter_fixed,50
root_node_name,n0
refit_split,false
max_abs_effect,4
quasi_fixed,false
ode_method,iota_pos_rho_pos
balance_sex,false
freeze_type,mean
child_prior_std_factor_mulcov,1
tolerance_fixed,1e-10
no_ode_ignore,mtexcess
hold_out_integrand,mtexcess
"""
fit_file['option_fit.csv'] += \
    f'child_prior_std_factor,{child_prior_std_factor}\n'
fit_file['option_fit.csv'] += \
    f'ode_step_size,{ode_step_size}\n'

option_predict.csv

A predict is run using the same directory as the corresponding fit. All of its input files are also inputs for the fit except for the option_predict.csv file.

fit_file['option_predict.csv']  =  \
"""name,value
db2csv,true
plot,true
float_precision,5
"""

fit_goal.csv

An empty fit_goal.csv corresponds to fitting n0 an n1 (skipping n2):

fit_file['fit_goal.csv'] = \
"""node_name
n1
"""

prior.csv

We are using a uniform for the random effects because we have good data, know the true random effects, and are trying to reproduce it with the fit. (Often one uses a Gaussian prior on the random effects.) We are using a Gaussian for the fixed effects. If we used a uniform for the fixed effects, the child prior standard deviations would not matter. Note that the stand deviation at the root level is 1.0 (which is very large relative to the true rate values).

delta_prior_std        = 0.2
fit_file['prior.csv']  = \
    'name,density,mean,std,eta,lower,upper\n' + \
    f'delta_prior,log_gaussian,0.0,{delta_prior_std},1e-10,,\n' + \
    f'random_effects_prior,uniform,0.0,,,,,\n'
for rate_name in no_effect_rate_truth :
    rate_truth = no_effect_rate_truth[rate_name]
    if rate_name == 'rho' :
        lower = rate_truth
        upper = rate_truth
    else :
        lower      = rate_truth / 10.0
        upper      = rate_truth * 10.0
    fit_file['prior.csv'] += \
        f'prior_{rate_name},gaussian,{rate_truth},1.0,,{lower},{upper}\n'

parent_rate.csv

The rates are constant during simulation, but not during fitting.

fit_file['parent_rate.csv'] = \
    'rate_name,age,time,value_prior,dage_prior,dtime_prior,const_value\n'
for rate_name in no_effect_rate_truth :
    for age in age_grid[rate_name] :
        for time in time_grid[rate_name] :
            row  = f'{rate_name},{age},{time},prior_{rate_name},'
            row += 'delta_prior,delta_prior,\n'
            fit_file['parent_rate.csv'] += row

child_rate.csv

fit_file['child_rate.csv'] = \
"""rate_name,value_prior
iota,random_effects_prior
"""

mulcov.csv

fit_file['mulcov.csv'] = \
    'covariate,type,effected,value_prior,const_value\n' + \
    f'dtp3,rate_value,iota,,{dtp3_multiplier_truth}\n'

predict_integrand.csv

fit_file['predict_integrand.csv'] = 'integrand_name\n'
for integrand_name in integrand_list :
    fit_file['predict_integrand.csv'] += f'{integrand_name}\n'

Rest of Source Code

def get_dtp3_reference(node, sex) :
    #
    # file_name
    if node == 'n0' :
        assert sex == 'both'
        file_name = f'{fit_dir}/n0/dismod.db'
    else :
        assert sex in [ 'female', 'male' ]
        file_name = f'{fit_dir}/n0/{sex}/n1/dismod.db'
    #
    # connection
    connection = dismod_at.create_connection(
        file_name, new = False, readonly = True
    )
    #
    # covariate_table
    covariate_table = dismod_at.get_table_dict(
        connection, tbl_name = 'covariate'
    )
    #
    # connection
    connection.close()
    #
    # reference
    reference = None
    for row in covariate_table :
        if row['covariate_name'] == 'dtp3' :
            reference = row['reference']
    assert reference != None
    #
    return reference
# ---------------------------------------------------------------------------
def sim(sim_dir ) :
    #
    # write input csv files
    for name in sim_file :
        file_name = f'{sim_dir}/{name}'
        file_ptr  = open(file_name, 'w')
        file_ptr.write( sim_file[name] )
        file_ptr.close()
    #
    # csv.simulate
    at_cascade.csv.simulate(sim_dir)
    #
    # data_join.csv
    at_cascade.csv.join_file(
        left_file   = f'{sim_dir}/simulate.csv' ,
        right_file  = f'{sim_dir}/data_sim.csv' ,
        result_file = f'{sim_dir}/data_join.csv'     ,
    )
# ---------------------------------------------------------------------------
def fit(sim_dir, fit_dir) :
    #
    # csv files in fit_file
    for name in fit_file :
        file_name = f'{fit_dir}/{name}'
        file_ptr  = open(file_name, 'w')
        file_ptr.write( fit_file[name] )
        file_ptr.close()
    #
    # fit_goal_set
    fit_goal_table = at_cascade.csv.read_table(
        file_name = f'{fit_dir}/fit_goal.csv'
    )
    fit_goal_set = set()
    for row in fit_goal_table :
        node_name = row['node_name']
        for sex in [ 'female', 'male' ] :
            fit_goal_set.add( (node_name, sex) )
    #
    # data_join_table
    # This is a join of simulate.csv and dats_sim.csv
    data_join_table = at_cascade.csv.read_table(
        file_name = f'{sim_dir}/data_join.csv'
    )
    #
    # copy_row
    # columns that are just copied from data_join_table to data_in_table
    copy_column  = [ 'integrand_name', 'node_name', 'sex' ]
    copy_column += [ 'age_lower', 'age_upper', 'time_lower', 'time_upper' ]
    copy_column += [ 'meas_std']
    #
    # data_in_table
    data_in_table = list()
    for row_join in data_join_table :
        #
        # row_in
        row_in            = dict()
        for key in copy_column :
            row_in[key] = row_join[key]
        #
        # row_in
        # All the mtexcess data is ignored; see option_fit.csv table.
        # The model for mtspecific is not numerically stable near zero prevalence.
        row_in['data_id']       = row_join['simulate_id']
        row_in['density_name']  = 'gaussian'
        row_in['hold_out']      = 0
        if row_join['integrand_name'] == 'mtexcess' :
            row_in['meas_value']    = 0
        else :
            row_in['meas_value']    = row_join['meas_mean']
        if float( row_join['age_upper'] ) <= ode_step_size :
            if row_join['integrand_name'] == 'mtspecific' :
                row_in['hold_out'] = 1
        #
        # data_in_table
        data_in_table.append( row_in )
    #
    # data_in.csv
    at_cascade.csv.write_table(
        file_name = f'{fit_dir}/data_in.csv' ,
        table     = data_in_table            ,
    )
    #
    # fit
    at_cascade.csv.fit(fit_dir)
# ---------------------------------------------------------------------------
# check_variable_csv
#
# Check that the fit values for all the variables are close to the truth
# used when the data was simulated.
def check_variable_csv(fit_dir) :
    file_name      = f'{fit_dir}/n0/variable.csv'
    variable_table = at_cascade.csv.read_table(file_name)
    for row in variable_table :
        fit_value = float( row['fit_value'] )
        truth     = float( row['truth'] )
        if truth == 0.0 :
            assert fit_value == 0.0
        else :
            # This test failed before 2025-07-07 because the covariate reference
            # used for the random effects in set_truth.py were not correct.
            relerr = ( 1.0 - fit_value / truth )
            assert abs(relerr) < 1e-1

# ---------------------------------------------------------------------------
# check_fit_predict
#
# Check that the predictions corresponding to the fit are close to the
# predictions corresponding the truth used to simulate the data.
def check_fit_predict(fit_dir) :
    #
    # predict_table
    predict_table = dict()
    for prefix in [ 'fit', 'tru' ] :
        file_name = f'{fit_dir}/{prefix}_predict.csv'
        predict_table[prefix] = at_cascade.csv.read_table(file_name)
    #
    # predict_table
    key = lambda row : (
        int( row['avgint_id'] ) ,
        row['node_name'] ,
        row['sex'] ,
        row['fit_node_name'] ,
        row['fit_sex']
    )
    for prefix in [ 'fit', 'tru' ] :
        predict_table[prefix] = sorted(predict_table[prefix], key=key)
    #
    # i
    for i in range( len(predict_table['tru'] ) ) :
        #
        # tru_value, fit_value
        tru_row        = predict_table['tru'][i]
        fit_row        = predict_table['fit'][i]
        #
        assert int(tru_row['avgint_id']) == int(fit_row['avgint_id'])
        assert tru_row['integrand_name'] == fit_row['integrand_name']
        #
        tru_value      = float( tru_row['avg_integrand'] )
        fit_value      = float( fit_row['avg_integrand'] )
        #
        # check fit_value
        if tru_value == 0.0 :
            assert fit_value == 0.0
        else :
            relerr = 1.0 - fit_value / tru_value
            assert abs(relerr) < 1e-1
    #
# ---------------------------------------------------------------------------
# BEGIN_CHECK_SAM_PREDICT
# check_sam_predict
#
# Check that the prior samples of Sincendence have larger standard deviation
# than the posterior samples.
#
# prefix
# This is 'fit' for the fit predictions; i.e., the predictions corresponding
# to the optimal value for the fit.
# This is 'sam' for the sample predictions; i.e., the predictions corresponding
# to the posterior distribution for the fit.
#
# pjob
# We use the notation pjob for a prediction job; i.e.,
# the (node, sex) tuple that we are predictong for.
#
# fjob
# We use the notation fjob for a fit job; i.e.,
# the (node, sex) tuple for the fit that is doing the prediction.
#
# at
# We use the notation at for the (age, time) tuple that
# the prediction is for.
#
def check_sam_predict(fit_dir) :
# END_CHECK_SAM_PREDICT
    #
    # predict_table
    predict_table = dict()
    for prefix in [ 'fit', 'sam' ] :
        file_name             = f'{fit_dir}/{prefix}_predict.csv'
        predict_table[prefix] = at_cascade.csv.read_table(file_name)
    #
    # prefix_pjob_fjob_at
    prefix_pjob_fjob_at = dict()
    for prefix in [ 'fit', 'sam' ] :
        prefix_pjob_fjob_at[prefix] = dict()
        #
        for row in predict_table[prefix] :
            if row['integrand_name'] == 'Sincidence' :
                #
                pjob = ( row['node_name'] , row['sex'] )
                if pjob not in prefix_pjob_fjob_at[prefix] :
                        prefix_pjob_fjob_at[prefix][pjob] = dict()
                #
                fjob = ( row['fit_node_name'] , row['fit_sex'] )
                if fjob not in prefix_pjob_fjob_at[prefix][pjob] :
                    prefix_pjob_fjob_at[prefix][pjob][fjob] = dict()
                #
                at            = ( float( row['age'] ), float( row['time'] ) )
                avg_integrand = float( row['avg_integrand'] )
                #
                if prefix == 'fit' :
                    assert at not in prefix_pjob_fjob_at[prefix][pjob][fjob]
                    prefix_pjob_fjob_at[prefix][pjob][fjob][at] = avg_integrand
                else :
                    if at not in prefix_pjob_fjob_at[prefix][pjob][fjob] :
                        prefix_pjob_fjob_at[prefix][pjob][fjob][at] = list()
                    this_list = prefix_pjob_fjob_at[prefix][pjob][fjob][at]
                    this_list.append( avg_integrand )
    #
    # std_pjob_fjob_at
    std_pjob_fjob_at = dict()
    for pjob in prefix_pjob_fjob_at['sam'] :
        std_pjob_fjob_at[pjob] = dict()
        for fjob in prefix_pjob_fjob_at['sam'][pjob] :
            std_pjob_fjob_at[pjob][fjob] = dict()
            for at in prefix_pjob_fjob_at['sam'][pjob][fjob] :
                mean    = prefix_pjob_fjob_at['fit'][pjob][fjob][at]
                samples = prefix_pjob_fjob_at['sam'][pjob][fjob][at]
                std     = numpy.std(samples, mean = mean )
                #
                std_pjob_fjob_at[pjob][fjob][at] = std
    #
    # dtp3_pjob_fjob_at
    dtp3_pjob_fjob_at = dict()
    for row in predict_table['fit'] :
        if row['integrand_name'] == 'Sincidence' :
            #
            pjob = ( row['node_name'] , row['sex'] )
            if pjob not in dtp3_pjob_fjob_at :
                    dtp3_pjob_fjob_at[pjob] = dict()
            #
            fjob = ( row['fit_node_name'] , row['fit_sex'] )
            if fjob not in dtp3_pjob_fjob_at[pjob] :
                dtp3_pjob_fjob_at[pjob][fjob] = dict()
            #
            at   = ( float( row['age'] ), float( row['time'] ) )
            assert at not in dtp3_pjob_fjob_at[pjob][fjob]
            #
            dtp3_pjob_fjob_at[pjob][fjob][at] = float( row['dtp3'] )
    #
    # compare prior and posterior std
    for sex in ['female', 'male' ] :
        pjob = ( 'n1', sex )
        for at in std_pjob_fjob_at[pjob][('n0', 'both')] :
            std_posterior  = std_pjob_fjob_at[pjob][ ('n1', sex ) ][at]
            std_prior      = std_pjob_fjob_at[pjob][ ('n0', 'both' ) ][at]
            assert std_posterior < std_prior
    #
    # compare no_effect prior calculated from predictions
    # and the prior for n1 in variable.csv
    for sex in [ 'female', 'male' ] :
        file_name      = f'{fit_dir}/n0/{sex}/n1/variable.csv'
        variable_table = at_cascade.csv.read_table(file_name)
        pjob           = ( 'n1', sex )
        fjob           = ( 'n0', 'both' )
        dtp3_reference = get_dtp3_reference( 'n1', sex )
        for row in variable_table :
            if row['var_type'] == 'rate' and row['rate'] == 'iota' :
                age     = float( row['age'] )
                time    = float( row['time'] )
                mean_v  = float( row['mean_v'] )
                std_v   = float( row['std_v'] )
                #
                at         = (age, time)
                dtp3       = dtp3_pjob_fjob_at[pjob][fjob][at] - dtp3_reference
                multiplier = numpy.exp( dtp3_multiplier_truth * dtp3 )
                #
                mean    = prefix_pjob_fjob_at['fit'][pjob][fjob][at] / multiplier
                std     = std_pjob_fjob_at[pjob][fjob][at] / multiplier
                #
                # mean should be very accurate
                assert abs( 1.0 - mean_v / mean ) < 1e-4
                #
                # std should not be accurate because it uses different samples
                # from the posterior distribution
                relerr = 1.0 - std_v / (std * child_prior_std_factor)
                assert abs( relerr ) < 0.1
# ---------------------------------------------------------------------------
if __name__ == '__main__' :
    #
    # sim_dir
    sim_dir = 'build/example/csv/sim'
    at_cascade.empty_directory(sim_dir)
    #
    # sim
    sim(sim_dir)
    #
    # fit_dir
    fit_dir = 'build/example/csv/fit'
    at_cascade.empty_directory(fit_dir)
    #
    # fit
    fit(sim_dir, fit_dir)
    #
    # predict
    at_cascade.csv.predict(fit_dir, sim_dir)
    #
    # check_variable_csv
    check_variable_csv(fit_dir)
    #
    # check_fit_predict
    check_fit_predict(fit_dir)
    #
    # check_sam_predict
    check_sam_predict(fit_dir)
    #
    print('diphtheria.py: OK')