## Analysis of pollution, climate and health data in Vitolo et al., Earth and Space Science (2018)

This is a short HOWTO describing the data analysis performed to learn the Bayesian network spanning the pollution, climate and health data in “Modelling Air Pollution, Climate and Health Data Using Bayesian Networks: a Case Study of the English Regions” by Vitolo, Scutari, Ghalaieny, Tucker and Russell (Earth and Space Science, 2018). The code was initially prototyped by Claudia Vitolo and then parallelised and extended by Marco Scutari; it implements a parallel version of the Structural EM algorithm for Bayesian network structure learning in the presence of missing data.

### The data

The data (codenamed MEHRA from “Multi-dimensional Environment-Health Risk Analysis”) contains almost 50 million observations to explore the interplay between environmental factors, exposure levels to outdoor air pollutants, and health outcomes in the English regions of the United Kingdom between 1981 and 2014. The CLGBN learned in the paper (yellow nodes are discrete, blue nodes are Gaussian, and green nodes are conditional linear Gaussian) is the following:

*[figure: the conditional linear Gaussian Bayesian network learned from the MEHRA data]*

It comprises 24 variables describing the concentrations of various air pollutants (O3, PM2.5, PM10, SO2, NO2, CO), geography (latitude, longitude, altitude, region and zone type), climate (wind speed and direction, temperature, rainfall, solar radiation), demography and mortality rates. Many of these variables contain large amounts of missing values, corresponding to measurements that were not available in the older years' data.

The data is available from:

Claudia Vitolo, Marco Scutari, Mohamed Ghalaieny, Allan Tucker and Andrew Russell. (2017). MEHRA data and model for the English regions [Data set]. Zenodo. http://doi.org/10.5281/zenodo.838571

### Learning the Bayesian network

First we load the data and the packages we will use to learn the model:
**bnlearn** for the Structural EM and **parallel**
to spread the computation across several CPUs.

```r
library(bnlearn)
library(parallel)

training = readRDS("trainingESS.rds")
```

Other than the data, we need to inform structure learning about which arcs are illogical given our background knowledge of the data (*e.g.* pollutants such as `co` and `o3` influencing the `Day`). This will be used as a massive blacklist to reduce the number of candidate networks we will need to score.

```r
bl = data.frame(
  "from" = c(rep("Region", 10), rep("Zone", 10), rep("Type", 10), rep("Year", 10),
        rep("Season", 10), rep("Month", 10), rep("Day", 10), rep("Hour", 10),
        rep("Latitude", 10), rep("Longitude", 10), rep("Altitude", 10),
        rep("CVD60", 23), rep("t2m", 11), rep("ws", 11), rep("wd", 11),
        rep("tp", 11), rep("blh", 11), rep("ssr", 11), rep("no2", 11),
        rep("so2", 11), rep("co", 11), rep("o3", 11), rep("pm10", 11),
        rep("pm2.5", 11)),
  "to" = c("Zone", "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
        "Longitude", "Altitude", "Region", "Type", "Year", "Season", "Month",
        "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
        "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
        "Altitude", "Region", "Zone", "Type", "Season", "Month", "Day",
        "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
        "Type", "Year", "Month", "Day", "Hour", "Latitude", "Longitude",
        "Altitude", "Region", "Zone", "Type", "Year", "Season", "Day", "Hour",
        "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type", "Year",
        "Season", "Month", "Hour", "Latitude", "Longitude", "Altitude",
        "Region", "Zone", "Type", "Year", "Season", "Month", "Day",
        "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type", "Year",
        "Season", "Month", "Day", "Hour", "Longitude", "Altitude", "Region",
        "Zone", "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
        "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
        "Day", "Hour", "Latitude", "Longitude", "Region", "Zone", "Type",
        "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
        "Altitude", "t2m", "ws", "wd", "tp", "blh", "ssr", "no2", "o3",
        "so2", "co", "pm10", "pm2.5", "Region", "Zone", "Type", "Year",
        "Season", "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
        "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
        "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type",
        "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
        "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
        "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
        "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
        "Longitude", "Altitude", "Region", "Zone", "Type", "Year", "Season",
        "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
        "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
        "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type",
        "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
        "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
        "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
        "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
        "Longitude", "Altitude", "Region", "Zone", "Type", "Year", "Season",
        "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
        "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
        "Latitude", "Longitude", "Altitude"))
```
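For reference, the same 265-arc blacklist can be built more compactly from the three variable groups it encodes (a base-R sketch; `bl2` below matches the blacklist above up to row order, and the group names are taken from the `rep()` calls):

```r
# the eleven fixed variables describing space, time and station type, which
# no other variable is allowed to point into ...
fixed = c("Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
          "Latitude", "Longitude", "Altitude")
# ... and the climate and pollution variables.
climate.pollution = c("t2m", "ws", "wd", "tp", "blh", "ssr", "no2", "so2",
                      "co", "o3", "pm10", "pm2.5")

# no arcs between any two fixed variables, in either direction (11 * 10 arcs);
bl2 = expand.grid(from = fixed, to = fixed, stringsAsFactors = FALSE)
bl2 = bl2[bl2$from != bl2$to, ]
# no arcs from climate or pollution variables back into the fixed ones
# (12 * 11 arcs);
bl2 = rbind(bl2, expand.grid(from = climate.pollution, to = fixed,
                             stringsAsFactors = FALSE))
# no arcs at all out of the mortality rates, which must be a sink node
# (23 arcs).
bl2 = rbind(bl2, data.frame(from = "CVD60", to = c(fixed, climate.pollution),
                            stringsAsFactors = FALSE))
```

This makes the logic of the blacklist explicit: the fixed variables form a root layer, climate and pollution sit below them, and `CVD60` can only be an effect, never a cause.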

Having set up the data and the blacklist, we now create the variables we will use to track the status of Structural EM:

- `incompleteColumns`: the names of the variables that contain missing data;
- `rowsCompleteObservations` and `completeObservations`: the indexes of the observations that are completely observed, and the corresponding subset of the data;
- `dagCurrent` and `bnCurrent`: the `bn` and `bn.fit` objects encoding the candidate Bayesian network from the previous iteration;
- `dagNew` and `bnNew`: the `bn` and `bn.fit` objects encoding the new candidate Bayesian network at the end of the current iteration.

```r
incompleteColumns = names(which(sapply(training, anyNA)))
rowsCompleteObservations = which(complete.cases(training))
completeObservations = training[rowsCompleteObservations, ]
dagNew = dagCurrent = empty.graph(names(completeObservations))
bnNew = bnCurrent = bn.fit(dagCurrent, completeObservations)
```
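On a small toy data frame (hypothetical, for illustration only) the bookkeeping behind `incompleteColumns` and `rowsCompleteObservations` works as follows:

```r
# a toy data frame with missing values in columns x and y, but not z.
toy = data.frame(x = c(1, NA, 3), y = c("a", "b", NA), z = 1:3,
                 stringsAsFactors = FALSE)
# the columns that contain at least one NA ...
incomplete = names(which(sapply(toy, anyNA)))
# ... and the rows that are completely observed (only the first one here).
completeRows = which(complete.cases(toy))
```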

The last thing left to do to prepare for the Structural EM is to initialise the **parallel** cluster:

- create 10 slave processes;
- load **bnlearn** in each slave;
- export the blacklist `bl` to each slave;
- split the data into 10 subsets and export one to each slave.

```r
cl = makeCluster(10)
invisible(clusterEvalQ(cl, library(bnlearn)))
clusterExport(cl, "bl")

# split the data into as many parts as there are slave processes, and export
# one part to each slave.
splitted = split(sample(nrow(training)), seq(length(cl)))

for (i in seq_along(splitted)) {

  data_split = training[splitted[[i]], , drop = FALSE]
  clusterExport(cl[i], "data_split")

}#FOR
```
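The `split()` call deals the shuffled row indexes round-robin into as many folds as there are slaves, because the fold labels in `seq(length(cl))` are recycled along the permuted indexes. A toy sketch with 9 rows and 3 folds (hypothetical sizes, not the MEHRA data):

```r
set.seed(1)
# recycle the fold labels 1, 2, 3 along a random permutation of 9 row indexes.
folds = split(sample(9), seq(3))
# each fold gets exactly 3 of the 9 rows, and together the folds cover all of
# them exactly once.
lengths(folds)
```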

Considering the size of the data, we want to perform as much computation as possible in parallel; in order to do so we export the function `impute_split()` below to perform imputation on each subset of data in the slave processes. (Note that this function predates the `impute()` function currently implemented in **bnlearn**, but does more or less the same thing.)

```r
impute_split = function(n = 50) {

  nodes = nodes(bnCurrent)
  # variables corresponding to isolated nodes can be quickly imputed by their
  # expectations.
  for (var in nodes) {

    missing = is.na(data_split[, var])
    if ((length(nbr(bnCurrent, var)) == 0) && (any(missing)))
      data_split[missing, var] =
        rnorm(length(which(missing)), mean = bnCurrent[[var]]$coef,
          sd = bnCurrent[[var]]$sd / sqrt(n))

  }#FOR

  # reassess which observations have missing data.
  missing = !complete.cases(data_split)

  for (i in which(missing)) {

    from = nodes[which(!is.na(data_split[i, ]))]
    to = setdiff(nodes, from)
    # use the observed part of the observation as the evidence.
    if (length(from) == 0)
      evidence = TRUE
    else
      evidence = lapply(data_split[i, from],
                   function(x) if (is.factor(x)) as.character(x) else x)
    # simulate the particles and the weights using likelihood weighting.
    particles =
      cpdist(bnCurrent, nodes = to, evidence = evidence, method = "lw", n = n)
    # impute by posterior mode (discrete variables) or posterior expectation
    # (continuous variables).
    particles = sapply(particles, function(x, w) {
      if (is.factor(x))
        names(which.max(by(w, INDICES = x, FUN = sum)))
      else if (is.numeric(x))
        weighted.mean(x = x, w = w)
    }, w = attr(particles, "weights"))
    data_split[i, to] = particles

  }#FOR

  return(data_split)

}#IMPUTE_SPLIT

clusterExport(cl, "impute_split")
```
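The final `sapply()` inside `impute_split()` collapses the weighted particles produced by likelihood weighting into a single imputed value per variable; here is the same computation on a handful of hypothetical particles, in base R:

```r
# five particles for a discrete variable, with their likelihood weights.
x = factor(c("a", "a", "b", "b", "b"))
w = c(0.5, 0.4, 0.1, 0.1, 0.1)
# posterior mode: the level with the largest total weight ("a": 0.9 vs "b": 0.3).
mode = names(which.max(by(w, INDICES = x, FUN = sum)))
# posterior expectation for a continuous variable: a weighted mean of the
# particles, here 0.25 * 1 + 0.25 * 2 + 0.5 * 4 = 2.75.
y = c(1.0, 2.0, 4.0)
expectation = weighted.mean(y, w = c(0.25, 0.25, 0.5))
```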

This is the main loop that iterates the *expectation* and *maximisation* steps of Structural EM, performing up to 10 iterations before stopping. The two steps are as follows:

- *Expectation step:* missing values are imputed with `impute_split()` on each slave.
- *Maximisation step:* the structure of the network is learned with `hc()` in parallel on the data in each slave, and the resulting networks are averaged with `averaged.network()` to produce a consensus network for the whole data set. The parameters of this network are then learned with `bn.fit()`.
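The averaging in the maximisation step can be tried in isolation on `learning.test`, a small discrete data set shipped with **bnlearn**; this is a sketch of the same `custom.strength()` + `averaged.network()` pattern without the cluster, with the two halves of the data standing in for the 10 splits:

```r
library(bnlearn)

data(learning.test)
# learn one network from each half of the data ...
halves = split(seq(nrow(learning.test)),
               rep(1:2, length.out = nrow(learning.test)))
models = lapply(halves, function(rows) hc(learning.test[rows, ]))
# ... compute arc strengths as the frequency of each arc across the models ...
strength = custom.strength(models, nodes = names(learning.test))
# ... and keep the arcs that appear often enough, extending the result to a
# completely directed consensus network as in the main loop below.
consensus = cextend(averaged.network(strength))
```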

The loop stops when the new candidate network `dagNew` is identical to the current network `dagCurrent`.

```r
for (iteration in seq(10)) {

  # export the current network.
  dagCurrent = dagNew
  bnCurrent = bnNew
  clusterExport(cl, c("dagCurrent", "bnCurrent"))
  # expectation step: impute the missing data points on the data splits.
  current = training
  invisible(clusterEvalQ(cl, {
    complete = impute_split()
    NULL
  }))
  # maximization step: learn one network structure from each split and build
  # a consensus network using model averaging.
  models = parLapply(cl, seq(length(cl)), function(...) {
    # starting from the previous network assumes that the new network will be
    # similar, with few arc changes to evaluate.
    hc(complete, blacklist = bl, start = dagCurrent)
  })
  # average the networks (and make sure the result is completely directed).
  strengthNew = custom.strength(models, nodes = nodes(dagCurrent))
  dagNew = averaged.network(strengthNew)
  dagNew = cextend(dagNew)
  # if there was no change, the network from the previous iteration is final.
  if (isTRUE(all.equal(dagCurrent, dagNew)))
    break
  # retrieve the imputed values.
  for (i in incompleteColumns) {

    imputed = clusterCall(cl, function(col) complete[, col], col = i)
    current[unlist(splitted), i] = unlist(imputed)

  }#FOR
  # fit the parameters.
  bnNew = bn.fit(dagNew, data = current, keep.fitted = FALSE)
  # save the DAG, the arc strengths and the fitted BN.
  save(strengthNew, file = paste("strengthNew-", iteration, ".rda", sep = ""))
  save(dagNew, file = paste("dagNew-", iteration, ".rda", sep = ""))
  save(bnNew, file = paste("bnNew-", iteration, ".rda", sep = ""))
  save(imputed, file = paste("imputed-", iteration, ".rda", sep = ""))

}#FOR

stopCluster(cl)
```

`Tue Jan 30 12:08:08 2018` with **bnlearn** `4.3` and `R version 3.0.2 (2013-09-25)`.