diff --git a/NAMESPACE b/NAMESPACE index 50753d80..07935ca6 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,6 +1,7 @@ # Generated by roxygen2: do not edit by hand export(add_rolling_years) +export(build_parquet_raw_data) export(check_afp_geographies) export(check_afp_guid_ctry_data) export(check_cache) @@ -19,6 +20,7 @@ export(create_emergence_group_gif) export(create_npafp_export) export(create_pop_check_export) export(create_pot_comp_clust_export) +export(create_raw_data_parquet) export(create_stool_adequacy_export) export(ctry_data_errors) export(duplicate_check) @@ -36,6 +38,7 @@ export(f.stool.ad.01) export(f.timely.detection.01) export(fix_ctry_data_missing_guids) export(force_load_polio_data_cache) +export(from_wkb_to_sf) export(generate_60_day_tab) export(generate_60_day_table_data) export(generate_ad_final_col) @@ -99,6 +102,7 @@ export(generate_timeliness_maps) export(generate_timely_det_violin) export(generate_timely_ship_violin) export(get_all_polio_data) +export(get_all_polio_data_2) export(get_azure_storage_connection) export(get_cdc_childvaxview_data) export(get_constant) @@ -129,6 +133,7 @@ export(update_city_spatial_data) export(update_polio_data) export(update_road_spatial_data) export(update_vacc_cov_data) +export(upload_parquet_to_edav) export(upload_to_sharepoint) import(dplyr) import(ggplot2) diff --git a/R/dal.R b/R/dal.R index 2ec10532..daa7aa25 100644 --- a/R/dal.R +++ b/R/dal.R @@ -897,955 +897,6 @@ normalize_format <- function(fmt) { #### 2) Key data pull functions #### - -#' Retrieve all pre-processed polio data -#' -#' @description Download POLIS data from the CDC pre-processed endpoint. By default -#' this function will return a "small" or recent dataset. This is primarily for data -#' that is from the past six years. You can specify a "medium" sized dataset for data -#' that is from 2016 onwards. Finally the "large" sized dataset will provide information -#' from 2000 onwards. 
Regular pulls form the data will recreate the "small" dataset -#' when new information is available and the Data Management Team can force the -#' creation of the "medium" and "large" static datasets as necessary. -#' -#' @param size `str` Size of data to download. Defaults to `"small"`. -#' - `"small"`: Data from the last six years. -#' - `"medium"`: Data from 2016-present. -#' - `"large"`: Data from 2000-present. -#' @param data_folder `str` Location of the data folder containing pre-processed POLIS data, -#' spatial files, coverage data, and population data. Defaults to `"GID/PEB/SIR/Data"`. -#' @param polis_folder `str` Location of the POLIS folder. Defaults to `"GID/PEB/SIR/POLIS"`. -#' @param core_ready_folder `str` Which core ready folder to use. Defaults to `"Core_Ready_Files"`. -#' @param force.new.run `logical` Default `FALSE`, if `TRUE` will run recent data and cache. -#' @param recreate.static.files `logical` Default `FALSE`, if `TRUE` will run all data and cache. -#' @param attach.spatial.data `logical` Default `TRUE`, adds spatial data to downloaded object. -#' @param use_edav `logical` Build raw data list using EDAV files. Defaults to `TRUE`. -#' @param archive Logical. Whether to archive previous output directories -#' before overwriting. Default is `TRUE`. -#' @param keep_n_archives Numeric. Number of archive folders to retain. -#' Defaults to `Inf`, which keeps all archives. Set to a finite number -#' (e.g., 3) to automatically delete older archives beyond the N most recent. -#' @param output_format str: output_format to save files as. -#' Available formats include 'rds' and 'qs2'. Defaults is 'rds'. -#' @param local_caching `logical` Enable local caching so data is stored locally and -#' only downloaded when there is updated data from EDAV. -#' @param use_archived_data `logical` Allows the ability to recreate the raw data file using previous -#' preprocessed data. If -#' @returns Named `list` containing polio data that is relevant to CDC. 
-#' @examples -#' \dontrun{ -#' raw.data <- get_all_polio_data() # downloads data for last 6 years, including spatial files -#' raw.data <- get_all_polio_data(size = "small", attach.spatial.data = FALSE) # exclude spatial data -#' } -#' -#' @export -get_all_polio_data <- function( - size = "small", - data_folder = "GID/PEB/SIR/Data", - polis_folder = "GID/PEB/SIR/POLIS", - core_ready_folder = "Core_Ready_Files", - force.new.run = FALSE, - recreate.static.files = FALSE, - attach.spatial.data = TRUE, - use_edav = TRUE, - use_archived_data = FALSE, - archive = TRUE, - keep_n_archives = Inf, - output_format = "rds", - local_caching = TRUE) { - - # check to see that size parameter is appropriate - if (!size %in% c("small", "medium", "large")) { - stop("The parameter 'size' must be either 'small', 'medium', or 'large'") - } - - # Check output format - if (!output_format %in% c("rds", "qs2")) { - stop("Only rds and qs2 is supported at this time.") - } - -# normalize and validate both output formats -output_format <- normalize_format(output_format) - -# Fail safe in instances where EDAV connection fails -if (use_edav) { - verify_edav <- tryCatch( - { - invisible(capture.output(test_EDAV_connection())) - cli::cli_alert_success("Connect to EDAV successful.") - TRUE - }, - error = \(e) { - cli::cli_alert_info("Connection to EDAV unsuccessful.") - FALSE - } - ) - - if (!verify_edav) { - cli::cli_alert_info("Unable to obtain data from EDAV. Loading from local cache instead.") - cli::cli_alert_info("NOTE: Data may be stale. 
Please review the global polio dataset metadata for information on when the data was last processed.") - raw.data <- force_load_polio_data_cache(attach.spatial.data, output_format) - return(raw.data) - } -} - -# Constant variables -# Each file comes out of these folders -analytic_folder <- file.path(data_folder, "analytic") -polis_data_folder <- file.path(data_folder, "polis") -spatial_folder <- file.path(data_folder, "spatial") -coverage_folder <- file.path(data_folder, "coverage") -pop_folder <- file.path(data_folder, "pop") - -# Year cutoffs for the different datasets -current_year <- lubridate::year(Sys.Date()) -small_year <- current_year - 5 -med_year <- 2016 #hardcode to 2016 because it's an important point in time - -# Required files -raw_data_recent_name <- paste0("raw.data.recent", output_format) -raw_data_medium_name <- paste0("raw.data.", med_year, ".", small_year - 1, output_format) -raw_data_2000_name <- paste0("raw.data.2000.", med_year - 1, output_format) -spatial_data_name <- paste0("spatial.data", output_format) -global_ctry_sf_name <- "global.ctry.rds" -global_prov_sf_name <- "global.prov.rds" -global_dist_sf_name <- "global.dist.rds" - -# Perform check to build using the archived polis folder -if (use_archived_data) { - cli::cli_alert_info("Using archived data") - cli::cli_alert_info("NOTE: the metadata will be for the most recent pull") - polis_data_folder <- get_archived_polis_data( - data_folder, - use_edav, - keep_n_archives - ) - recreate.static.files <- TRUE -} - -# look to see if the recent raw data rds is in the analytic folder -prev_table <- sirfunctions_io("list", NULL, analytic_folder, - edav = use_edav -) - -if (nrow(prev_table) > 0) { - prev_table <- prev_table |> - dplyr::filter(grepl(raw_data_recent_name, name)) |> - dplyr::select("file" = "name", "size", "ctime" = "lastModified") -} else { - # if empty, make sure to recreate tibble to the right format - prev_table <- tibble( - "file" = NA, - "size" = NA, - "ctime" = NA - ) |> - 
dplyr::mutate(file = as.character(file), - size = as.double(size), - ctime = as_datetime(ctime)) |> - dplyr::filter(!is.na(file)) -} - -if (recreate.static.files | force.new.run) { - force.new.run <- T - create.cache <- T -} - - -if (!force.new.run) { - - # Check if using the local cache is sufficient - if (use_edav & size == "small" & local_caching) { - if (!recache_raw_data(analytic_folder, use_edav, output_format)) { - - raw.data <- sirfunctions_io("read", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("raw_data", output_format)), - edav = FALSE) - - cli::cli_process_start("Checking for duplicates in datasets.") - raw.data <- duplicate_check(raw.data) - cli::cli_process_done() - if (attach.spatial.data) { - if (!recache_spatial_data(analytic_folder, spatial_folder, - use_edav, output_format)) { - spatial.data <- sirfunctions_io("read", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("spatial_data", output_format)), - edav = FALSE) - raw.data$global.ctry <- spatial.data$global.ctry - raw.data$global.prov <- spatial.data$global.prov - raw.data$global.dist <- spatial.data$global.dist - raw.data$roads <- spatial.data$roads - raw.data$cities <- spatial.data$cities - - return(raw.data) - } else { - spatial.data <- sirfunctions_io("read", NULL, file.path(analytic_folder, spatial_data_name), - edav = use_edav) - sirfunctions_io("write", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("spatial_data", output_format)), - obj = spatial.data, - edav = FALSE) - edav_spatial_timestamp <- sirfunctions_io( - "read", - NULL, - file.path(analytic_folder, paste0("spatial_timestamp", output_format)), - edav = use_edav - ) - sirfunctions_io("write", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("spatial_timestamp", output_format)), - obj = edav_spatial_timestamp, - edav = FALSE) - - raw.data$global.ctry <- spatial.data$global.ctry - raw.data$global.prov <- spatial.data$global.prov - raw.data$global.dist <- 
spatial.data$global.dist - raw.data$roads <- spatial.data$roads - raw.data$cities <- spatial.data$cities - - return(raw.data) - } - } else { - return(raw.data) - } - } - } - - if (use_edav) { - cli::cli_alert_info(paste0("Downloading most recent active polio data from ", small_year," onwards")) - } else { - cli::cli_alert_info(paste0("Loading most recent active polio data from ", small_year," onwards")) - } - - raw.data.small.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) - - if (size == "small") { - raw.data <- raw.data.small.pull - } - - if (size == "medium") { - prev_table <- sirfunctions_io("list", NULL, analytic_folder, edav = use_edav) |> - dplyr::filter(grepl(raw_data_medium_name, name)) |> - dplyr::select("file" = "name", "size", "ctime" = "lastModified") - - if (use_edav) { - cli::cli_alert_info(paste0("Downloading static polio data from ", med_year, "-", small_year)) - } else { - cli::cli_alert_info(paste0("Loading static polio data from ", med_year, "-", small_year)) - } - - raw.data.medium.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) - - raw.data <- split_concat_raw_data( - action = "concat", - raw.data.small.pull = raw.data.small.pull, - raw.data.medium.pull = raw.data.medium.pull - ) - } - - if (size == "large") { - prev_table <- sirfunctions_io("list", NULL, analytic_folder, - edav = use_edav, full_names = TRUE - ) |> - dplyr::filter(grepl(raw_data_medium_name, name)) |> - dplyr::select("file" = "name", "size", "ctime" = "lastModified") - - if (use_edav) { - cli::cli_alert_info(paste0("Downloading static polio data from ", med_year, "-", small_year)) - } else { - cli::cli_alert_info(paste0("Loading static polio data from ", med_year, "-", small_year)) - } - - raw.data.medium.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) - - prev_table <- sirfunctions_io("list", NULL, analytic_folder, edav = use_edav) |> - dplyr::filter(grepl(raw_data_2000_name, name)) |> - 
dplyr::select("file" = "name", "size", "ctime" = "lastModified") - - if (use_edav) { - cli::cli_alert_info(paste0("Downloading static polio data from 2001-", med_year)) - } else { - cli::cli_alert_info(paste0("Loading static polio data from 2001-", med_year)) - } - - raw.data.large.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) - - raw.data <- split_concat_raw_data( - action = "concat", - raw.data.small.pull = raw.data.small.pull, - raw.data.medium.pull = raw.data.medium.pull, - raw.data.large.pull = raw.data.large.pull - ) - } - - # Only cache the small dataset, which we use in 90% of the case - if (use_edav & local_caching) { - raw_data_timestamp_exists <- invisible(sirfunctions_io( - "exists.file", - NULL, - file.path(analytic_folder, paste0("raw_data_timestamp", output_format)), - edav = use_edav - )) - - } else { - raw_data_timestamp_exists <- FALSE - } - if (size == "small" & raw_data_timestamp_exists & local_caching) { - cli::cli_process_start("Caching global polio data locally") - - if (!dir.exists(rappdirs::user_data_dir("sirfunctions"))) { - dir.create(rappdirs::user_data_dir("sirfunctions"), recursive = TRUE) - } - - sirfunctions_io("write", NULL, - file.path(rappdirs::user_data_dir("sirfunctions"), paste0("raw_data", output_format)), - obj = raw.data, - edav = FALSE) - # Add edav tag file to local cache dir - edav_raw_data_timestamp <- sirfunctions_io( - "read", - NULL, - file.path(analytic_folder, paste0("raw_data_timestamp", output_format)), - edav = use_edav - ) - - sirfunctions_io("write", NULL, - file.path(rappdirs::user_data_dir("sirfunctions"), paste0("raw_data_timestamp", output_format)), - obj = edav_raw_data_timestamp, - edav = FALSE) - - cli::cli_process_done() - } - - cli::cli_process_done() - - cli::cli_process_start("Checking for duplicates in datasets.") - raw.data <- duplicate_check(raw.data) - cli::cli_process_done() - - if (attach.spatial.data) { - - # Don't recache spatial if up to date - if 
(!recache_spatial_data(analytic_folder, spatial_folder, - use_edav, output_format) & local_caching) { - spatial.data <- sirfunctions_io("read", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("spatial_data", output_format)), - edav = FALSE) - raw.data$global.ctry <- spatial.data$global.ctry - raw.data$global.prov <- spatial.data$global.prov - raw.data$global.dist <- spatial.data$global.dist - raw.data$roads <- spatial.data$roads - raw.data$cities <- spatial.data$cities - - return(raw.data) - } - - if (use_edav) { - cli::cli_process_start("Downloading and attaching spatial data") - } else { - cli::cli_process_start("Loading and attaching spatial data") - } - - spatial.data <- sirfunctions_io("read", NULL, - file.path(analytic_folder, spatial_data_name), - edav = use_edav - ) - - raw.data$global.ctry <- spatial.data$global.ctry - raw.data$global.prov <- spatial.data$global.prov - raw.data$global.dist <- spatial.data$global.dist - raw.data$roads <- spatial.data$roads - raw.data$cities <- spatial.data$cities - - cli::cli_process_done() - - if (use_edav & local_caching) { - spatial_timestamp_exists <- sirfunctions_io( - "exists.file", - NULL, - file.path(analytic_folder, paste0("spatial_timestamp", output_format)), - edav = use_edav - ) - } else { - spatial_timestamp_exists <- FALSE - } - - if (recache_spatial_data(analytic_folder, spatial_folder, - use_edav, output_format) & spatial_timestamp_exists & local_caching) { - sirfunctions_io("write", - NULL, - file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("spatial_data", - output_format)), - obj = spatial.data, - edav = FALSE) - - spatial_processed_tag <- sirfunctions_io("read", - NULL, - file.path(analytic_folder, - paste0("spatial_timestamp", output_format)), - edav = use_edav) - sirfunctions_io("write", - NULL, - file.path(rappdirs::user_data_dir("sirfunctions"), - paste0("spatial_timestamp", output_format)), - obj = spatial_processed_tag, - edav = FALSE) - } - } - - return(raw.data) - -} 
else { - - # Check that the required folders have data - for (folder in c(analytic_folder, polis_data_folder, spatial_folder, - coverage_folder, pop_folder)) { - - # get_all_polio_data will recreate the analytic folder if it's missing - switch(basename(folder), - "analytic" = { - if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { - cli::cli_alert_info("No analytics folder found. Will create a new one.") - sirfunctions_io("create.dir", NULL, folder, edav = use_edav) - } - }, - "polis" = { - if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { - cli::cli_alert_info("Creating polis folder in the data folder") - sirfunctions_io("create.dir", NULL, folder, edav = use_edav) - } else { - cli::cli_alert_info("Moving updated polis data to the data folder") - } - - - create_polis_data_folder( - data_folder, - polis_folder, - core_ready_folder, - use_edav, - archive, - keep_n_archives - ) - - }, - "spatial" = { - if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { - cli::cli_abort(paste0("No spatial data found in the data folder.", - " Ensure that the output folder when running ", - " tidypolis::process_spatial() is ", - spatial_folder), - ) - } - }, - "coverage" = { - if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { - cli::cli_abort(paste0("Coverage data not found.", - "Please create and add coverage data in: ", - folder)) - } - }, - "pop" = { - if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { - cli::cli_abort(paste0("Population data not found. ", - "Preprocessing of population files may be required. 
", - "Please create a pop data folder and add data in: ", - folder)) - } - } - ) - } - - if (use_edav) { - cli::cli_h1("Testing download times") - download_metrics <- test_EDAV_connection(return_list = T) - } - - # use the truncated AFP file - afp.trunc <- T - - if (recreate.static.files) { - afp.trunc <- F - } - - dl_table <- dplyr::bind_rows( - sirfunctions_io("list", NULL, polis_data_folder, edav = use_edav), - sirfunctions_io("list", NULL, spatial_folder, edav = use_edav), - sirfunctions_io("list", NULL, coverage_folder, edav = use_edav), - sirfunctions_io("list", NULL, pop_folder, edav = use_edav), - sirfunctions_io("list", NULL, polis_folder, edav = use_edav) |> - dplyr::filter(grepl("cache", name)) - ) |> - dplyr::filter(!is.na(size)) |> - dplyr::select("file" = "name", "size") - - if (use_edav) { - dl_table <- dl_table |> - dplyr::mutate( - "dl_time_sec" = size / download_metrics$size * download_metrics$d - ) - } - - if (afp.trunc) { - dl_table <- dl_table |> - dplyr::filter(!grepl("afp_linelist_2001", file)) - } else { - dl_table <- dl_table |> - dplyr::filter(!grepl("afp_linelist_2019", file)) - } - - file_size <- dl_table$size |> sum() - - if (use_edav) { - download_time <- dl_table$dl_time_sec |> sum() - } - - if (use_edav) { - cli::cli_h1("Downloading POLIS Data") - } else { - cli::cli_h1("Loading POLIS Data") - } - - raw.data <- list() - spatial.data <- list() - - # Check if spatial data needs to be redownloaded from the analytics folder - spatial_timestamp_exists <- sirfunctions_io( - "exists.file", - NULL, - file.path(analytic_folder, paste0("spatial_timestamp", output_format)), - edav = use_edav - ) - - if (spatial_timestamp_exists) { - # Check if it's recent or needs updating - edav_spatial_timestamp <- sirfunctions_io( - "read", - NULL, - file.path(analytic_folder, paste0("spatial_timestamp", output_format)), - edav = use_edav - ) |> - dplyr::select(name, lastModifiedEDAV = lastModified) - - edav_spatial_folder_info <- sirfunctions_io( - "list", 
- NULL, - file.path(spatial_folder), - edav = use_edav - ) |> - dplyr::select(name, lastModified) - - spatial_timestamp_comparison <- dplyr::left_join(edav_spatial_timestamp, - edav_spatial_folder_info) |> - dplyr::mutate(updated = ifelse(lastModifiedEDAV == lastModified, TRUE, FALSE)) |> - dplyr::pull(updated) |> sum(na.rm = TRUE) - } else { - - spatial_timestamp_comparison <- 0 - - } - - if (spatial_timestamp_comparison == 3) { - cli::cli_alert_success("Spatial data in the analytic folder is up to date. Loading from cache...") - spatial.data <- sirfunctions_io( - "read", - NULL, - file.path(analytic_folder, spatial_data_name), - edav = use_edav - ) - } else { - if (spatial_timestamp_exists) { - cli::cli_alert_warning("Spatial data in the analytic folder is outdated. Recreating from the spatial folder") - } else { - cli::cli_alert_warning("No spatial timestamp exists. Recreating from the spatial folder") - } - - cli::cli_process_start("1) Loading country shape files") - spatial.data$global.ctry <- load_clean_ctry_sp( - fp = file.path(spatial_folder, global_ctry_sf_name), - edav = use_edav - ) - cli::cli_process_done() - - cli::cli_process_start("2) Loading province shape files") - spatial.data$global.prov <- load_clean_prov_sp( - fp = file.path(spatial_folder, global_prov_sf_name), - edav = use_edav - ) - cli::cli_process_done() - - cli::cli_process_start("3) Loading district shape files") - spatial.data$global.dist <- load_clean_dist_sp( - fp = file.path(spatial_folder, global_dist_sf_name), - edav = use_edav - ) - cli::cli_process_done() - } - - cli::cli_process_start("4) Loading AFP line list data (This file is almost 3GB and can take a while)") - raw.data$afp <- - sirfunctions_io("read", NULL, file_loc = dplyr::filter( - dl_table, - grepl("afp", file) - ) |> - dplyr::pull(file), edav = use_edav) |> - dplyr::filter(surveillancetypename == "AFP") |> - dplyr::mutate( - cdc.classification.all2 = dplyr::case_when( - final.cell.culture.result == "Not received in 
lab" & - cdc.classification.all == "PENDING" ~ "LAB PENDING", - TRUE ~ cdc.classification.all - ), - hot.case = ifelse( - paralysis.asymmetric == "Yes" & - paralysis.onset.fever == "Yes" & - paralysis.rapid.progress == "Yes", - 1, - 0 - ), - hot.case = ifelse(is.na(hot.case), 99, hot.case) - ) - - cli::cli_process_done() - - cli::cli_process_start("Processing AFP data for analysis") - - raw.data$afp.epi <- raw.data$afp |> - dplyr::mutate(epi.week = lubridate::epiweek(dateonset)) |> - dplyr::group_by(place.admin.0, epi.week, yronset, cdc.classification.all2) |> - dplyr::summarize(afp.cases = dplyr::n(), - .groups = "drop") |> - dplyr::mutate(epiweek.year = paste(yronset, epi.week, sep = "-")) |> - # manual fix of epi week - dplyr::mutate(epi.week = ifelse(epi.week == 52 & - yronset == 2022, 1, epi.week)) - - # factoring cdc classification to have an order we like in stacked bar chart - raw.data$afp.epi$cdc.classification.all2 <- - factor( - raw.data$afp.epi$cdc.classification.all2, - levels = c( - "WILD 1", - "cVDPV 2", - "VDPV 2", - "cVDPV 1", - "VDPV 1", - "COMPATIBLE", - "PENDING", - "LAB PENDING", - "NPAFP", - "NOT-AFP", - "UNKNOWN", - "aVDPV 1", - "aVDPV 3", - "cVDPV1andcVDPV2", - "CombinationWild1-cVDPV 2", - "aVDPV 2", - "VDPV 3", - "iVDPV 2", - "VDPV1andcVDPV2", - "VAPP", - "cVDPV 3", - "iVDPV 3", - "WILD 3", - "WILD1andWILD3", - "iVDPV 1", - "cVDPV2andcVDPV3" - ), - labels = c( - "WILD 1", - "cVDPV 2", - "VDPV 2", - "cVDPV 1", - "VDPV 1", - "COMPATIBLE", - "PENDING", - "LAB PENDING", - "NPAFP", - "NOT-AFP", - "UNKNOWN", - "aVDPV 1", - "aVDPV 3", - "cVDPV1andcVDPV2", - "CombinationWild1-cVDPV 2", - "aVDPV 2", - "VDPV 3", - "iVDPV 2", - "VDPV1andcVDPV2", - "VAPP", - "cVDPV 3", - "iVDPV 3", - "WILD 3", - "WILD1andWILD3", - "iVDPV 1", - "cVDPV2andcVDPV3" - ) - ) - - raw.data$para.case <- raw.data$afp |> - dplyr::filter( - stringr::str_detect(cdc.classification.all2, "VDPV|WILD|COMPATIBLE") - ) |> - dplyr::mutate(yronset = ifelse(is.na(yronset) == T, 2022, 
yronset)) # this fix was for the manually added MOZ case - cli::cli_process_done() - - - cli::cli_process_start("5) Loading population data") - raw.data$dist.pop <- - sirfunctions_io("read", NULL, - dplyr::filter(dl_table, grepl("dist.pop", file)) |> - dplyr::pull(file), - edav = use_edav - ) |> - dplyr::ungroup() - - raw.data$prov.pop <- - sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("prov.pop", file)) |> - dplyr::pull(file), edav = use_edav - ) |> - dplyr::ungroup() - - raw.data$ctry.pop <- - sirfunctions_io("read", NULL, - dplyr::filter(dl_table, grepl("ctry.pop", file)) |> - dplyr::pull(file), - edav = use_edav - ) |> - dplyr::ungroup() - cli::cli_process_done() - - - cli::cli_process_start("6) Loading coverage data") - raw.data$ctry.coverage <- sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("ctry_cov", file)) |> - dplyr::pull(file), edav = use_edav - ) - - raw.data$prov.coverage <- sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("prov_cov", file)) |> - dplyr::pull(file), edav = use_edav - ) - - raw.data$dist.coverage <- sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("dist_cov", file)) |> - dplyr::pull(file), edav = use_edav - ) - - cli::cli_process_done() - - cli::cli_process_start("7) Loading ES data") - - raw.data$es <- - sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("/es_2001", file)) |> - dplyr::pull(file), edav = use_edav - ) - cli::cli_process_done() - - cli::cli_process_start("8) Loading SIA data") - raw.data$sia <- - sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("sia", file)) |> - dplyr::pull(file), edav = use_edav - ) - - cli::cli_process_done() - - cli::cli_process_start("9) Loading all positives") - raw.data$pos <- - sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("/pos", file)) |> - dplyr::pull(file), edav = use_edav - ) - - cli::cli_process_done() - - 
cli::cli_process_start("10) Loading other surveillance linelist") - raw.data$other <- - sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("/other", file)) |> - dplyr::pull(file), edav = use_edav - ) - - cli::cli_process_done() - - cli::cli_process_start("11) Loading road network data") - spatial.data$roads <- sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("roads.rds", file)) |> - dplyr::pull(file), edav = use_edav - ) - cli::cli_process_done() - - cli::cli_process_start("12) Loading city spatial data") - spatial.data$cities <- sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("cities.rds", file)) |> - dplyr::pull(file), edav = use_edav - ) - cli::cli_process_done() - - cli::cli_process_start("13) Creating Metadata object") - - polis.cache <- sirfunctions_io("read", NULL, - file_loc = dplyr::filter(dl_table, grepl("cache.rds", file)) |> - dplyr::pull(file), edav = use_edav - ) |> - dplyr::mutate(last_sync = as.Date(last_sync)) - - raw.data$metadata$download_time <- max(polis.cache$last_sync, na.rm = TRUE) - - raw.data$metadata$processed_time <- sirfunctions_io("list", NULL, - file.path(polis_folder, "data", core_ready_folder), - edav = use_edav - ) |> - dplyr::filter(grepl("positives_2001-01-01", name)) |> - dplyr::select("ctime" = "lastModified") |> - dplyr::mutate(ctime = as.Date(ctime)) |> - dplyr::pull(ctime) - - raw.data$metadata$user <- polis.cache |> - dplyr::filter(table == "virus") |> - dplyr::pull(last_user) - - raw.data$metadata$most_recent_pos <- max(raw.data$pos$dateonset, na.rm = TRUE) - raw.data$metadata$most_recent_pos_loc <- raw.data$pos |> - dplyr::arrange(dplyr::desc(dateonset)) |> - dplyr::slice(1) |> - dplyr::pull(place.admin.0) - - - raw.data$metadata$most_recent_afp <- max(raw.data$afp$dateonset, na.rm = TRUE) - raw.data$metadata$most_recent_afp_loc <- raw.data$afp |> - dplyr::arrange(dplyr::desc(dateonset)) |> - dplyr::slice(1) |> - dplyr::pull(place.admin.0) - - - 
raw.data$metadata$most_recent_env <- max(raw.data$es$collect.date, na.rm = TRUE) - raw.data$metadata$most_recent_env_loc <- raw.data$es |> - dplyr::arrange(dplyr::desc(collect.date)) |> - dplyr::slice(1) |> - dplyr::pull(ADM0_NAME) - - - raw.data$metadata$most_recent_sia <- max(raw.data$sia$sub.activity.start.date) - raw.data$metadata$most_recent_sia_code <- raw.data$sia |> - dplyr::arrange(dplyr::desc(sub.activity.start.date)) |> - dplyr::slice(1) |> - dplyr::pull(sia.code) - raw.data$metadata$most_recent_sia_location <- raw.data$sia |> - dplyr::arrange(dplyr::desc(sub.activity.start.date)) |> - dplyr::slice(1) |> - dplyr::pull(place.admin.0) - raw.data$metadata$most_recent_sia_vax <- raw.data$sia |> - dplyr::arrange(dplyr::desc(sub.activity.start.date)) |> - dplyr::slice(1) |> - dplyr::pull(vaccine.type) - - raw.data$metadata$most_recent_vdpv_class_change_date <- raw.data$pos$vdpvclassificationchangedate |> - lubridate::as_date() |> - max(na.rm = T) - - rm(polis.cache) - - cli::cli_process_done() - - cli::cli_process_start("14) Clearing out unused memory") - gc() - cli::cli_process_done() -} - -if (create.cache) { - cli::cli_process_start("15) Caching processed data") - - out <- split_concat_raw_data(action = "split", split.years = c(2000, med_year, small_year), raw.data.all = raw.data) - - out_files <- out$split.years |> - dplyr::mutate( - file_name = ifelse(grepl(current_year, tag), "recent", stringr::str_replace_all(tag, "-", ".")), - file_name = paste0("raw.data.", file_name, output_format) - ) - - if (!recreate.static.files) { - out_files <- out_files |> dplyr::filter(grepl("recent", file_name)) - } - - if (!use_archived_data) { - for (i in 1:nrow(out_files)) { - sirfunctions_io("write", NULL, - file_loc = file.path(analytic_folder, dplyr::pull(out_files[i, ], file_name)), - obj = out[[dplyr::pull(out_files[i, ], tag)]], - edav = use_edav - )} - } - -# set up path for spatial df - sp_file_path <- file.path(analytic_folder, paste0("spatial.data", 
output_format)) - - sirfunctions_io("write", NULL, - file_loc = sp_file_path, - obj = spatial.data, edav = use_edav - ) - - # Create tags only if not using "archived" version - if (use_edav & !use_archived_data) { - # Create raw data file tag for future comparisons - sirfunctions_io("write", NULL, - file_loc = file.path(analytic_folder, paste0("raw_data_timestamp", output_format)), - obj = Sys.time()) - - # Create spatial data file tag for future comparisons - spatial_files <- sirfunctions_io("list", - NULL, - spatial_folder, - edav = use_edav, - full_names = TRUE) - - edav_spatial_timestamp <- spatial_files |> - dplyr::filter(stringr::str_detect(name, "global."), - stringr::str_ends(name, output_format)) |> - dplyr::select(name, lastModified) - - sirfunctions_io( - "write", - NULL, - file.path(analytic_folder, paste0("spatial_timestamp", output_format)), - obj = edav_spatial_timestamp, - edav = use_edav - ) - } - - cli::cli_process_done() -} - -raw_data_cut_size <- switch(size, - "small" = small_year, - "medium" = med_year, - "large" = 2000) - -raw.data <- split_concat_raw_data(action = "split", - split.years = raw_data_cut_size, - raw.data.all = raw.data)[[1]] - -cli::cli_process_start("Checking for duplicates in datasets.") -raw.data <- duplicate_check(raw.data) -cli::cli_process_done() - -if (attach.spatial.data) { - raw.data$global.ctry <- spatial.data$global.ctry - raw.data$global.prov <- spatial.data$global.prov - raw.data$global.dist <- spatial.data$global.dist - raw.data$roads <- spatial.data$roads - raw.data$cities <- spatial.data$cities -} - -if (use_archived_data) { - cli::cli_alert_success(paste0("Successfully recreated global polio data from ", - basename(polis_data_folder))) -} - -return(raw.data) - -} - #' Assess duplicates in the get_all_polio_data() output #' #' @description diff --git a/R/dal.parquet.R b/R/dal.parquet.R new file mode 100644 index 00000000..8709f434 --- /dev/null +++ b/R/dal.parquet.R @@ -0,0 +1,481 @@ +#' Convert raw data into a 
parquet hierarchal folder +#' +#' The function takes a `raw_data` object (output of [get_all_polio_data()]) and +#' saves it into a parquet directory +#' @param raw_data `list` A `raw_data` object. +#' @param path `str` Path to export the parquet folder to. +#' +#' @returns None. +#' @export +#' +#' @examples +#' \dontrun{ +#' raw_data <- get_all_polio_data() +#' create_raw_data_parquet(raw_data, "C:/Users/ABC1/Desktop/raw_data_parquet") +#' } +create_raw_data_parquet <- function(raw_data, path) { + df_names <- names(raw_data) + + old_threads <- getOption("arrow.use_threads") + options(arrow.use_threads = TRUE) + on.exit(options(arrow.use_threads = old_threads), add = TRUE) + + if (!dir.exists(path)) { + cli::cli_abort("Directory path does not exist.") + } + + cli::cli_process_start("Creating raw_data parquet folder") + iter <- 1 + for (i in df_names) { + cli::cli_alert_info(paste0("Now processing: ", i)) + + data <- + if (i %in% c("global.ctry", "global.prov", "global.dist")) { + raw_data[[i]] |> + to_wkb_drop_sf() |> + arrow::write_dataset( + path = file.path(path, i), + partitioning = get_partition_cols(i) + ) + } else if (i %in% c("cities", "roads")) { + raw_data[[i]] |> + to_wkb_drop_sf() |> + arrow::write_dataset( + path = file.path(path, i), + partitioning = get_partition_cols(i) + ) + } else if (i == "metadata") { + raw_data[[i]] |> + dplyr::as_tibble() |> + arrow::write_dataset( + path = file.path(path, i), + partitioning = get_partition_cols(i) + ) + } else { + raw_data[[i]] |> + arrow::write_dataset( + path = file.path(path, i), + partitioning = get_partition_cols(i) + ) + } + + cli::cli_alert_info(paste0(iter, "/", length(df_names), " processed.")) + iter <- iter + 1 + } + + cli::cli_process_done() +} + +#' Recreate raw data from local parquet folder +#' +#' @description +#' Recreates an output of [get_all_polio_data()] from a folder housing +#' data in parquet format. +#' +#' @param path `str` Absolute path to the parquet folder. 
+#' @param dataset `str` Dataset(s) to load. Defaults to `"all"`. Otherwise,
+#' any dataset name in the list returned by [get_all_polio_data()].
+#' @param from_edav `bool` Build using files in EDAV (`TRUE`) or local files
+#' (`FALSE`)? Defaults to `TRUE`.
+#' @param container `azcontainer` An instance of an Azure container to connect
+#' to. Pass [get_azure_storage_connection()] using defaults if not accessing
+#' using a service principal. Only used when `from_edav = TRUE`.
+#'
+#' @details
+#' For tibbles with Shapes, pass to [from_wkb_to_sf()] first before creating maps.
+#'
+#' The default `path` is an EDAV location, so pass an explicit `path` together
+#' with `from_edav = FALSE` when building from a local folder.
+#'
+#' @returns `list` A list containing connections to the folders associated with
+#' individual datasets in the original output of [get_all_polio_data()].
+#' @export
+#'
+#' @examples
+#' \dontrun{
+#' # Building raw_data locally
+#' parquet_path <- "C:/Users/ABC1/Desktop/parquet_folder"
+#' raw_data <- build_parquet_raw_data(parquet_path, from_edav = FALSE)
+#'
+#' # Build raw_data from EDAV
+#' raw_data <- build_parquet_raw_data()
+#' }
+build_parquet_raw_data <- function(
+  path = "GID/PEB/SIR/Data/analytic",
+  dataset = "all",
+  from_edav = TRUE,
+  container = get_azure_storage_connection()
+) {
+  # Neither builder can work without a path, so fail early with a clear message.
+  if (is.null(path)) {
+    cli::cli_abort("Please pass a file path to the parquet folder")
+  }
+
+  # `container` is only referenced on the EDAV branch, so its default is never
+  # evaluated for local builds.
+  if (from_edav) {
+    build_parquet_raw_data_edav(path, dataset, container)
+  } else {
+    build_parquet_raw_data_local(path, dataset)
+  }
+}
+
+#' Uploads a local parquet folder to EDAV
+#'
+#' Uploads the contents of a local folder containing parquet files into a
+#' `raw_data_parquet` sub-folder of `dest` on EDAV.
+#'
+#' @param src `str` Local path to the parquet folder.
+#' @param dest `str` EDAV folder to upload into. Must already exist.
+#' @param container `azcontainer` An instance of an Azure container.
+#'
+#' @returns None.
+#' @export
+#'
+#' @examples
+#' \dontrun{
+#' local_dir <- "C:/Users/ABC1/Desktop/parquet_folder"
+#' edav_dir <- "ABC/parquet_folder"
+#' upload_parquet_to_edav(local_dir, edav_dir)
+#' }
+upload_parquet_to_edav <- function(src, dest, container = get_azure_storage_connection()) {
+  if (!dir.exists(src)) {
+    cli::cli_abort("Local parquet folder does not exist.")
+  }
+
+  # Check the destination on the same container that will receive the upload.
+  dir_exists <- edav_io("exists.dir", NULL, file_loc = dest, azcontainer = container)
+  if (!dir_exists) {
+    cli::cli_abort("Folder doesn't exist on EDAV. Unable to upload")
+  }
+
+  cli::cli_process_start("Uploading parquet folder to EDAV")
+  AzureStor::multiupload_adls_file(
+    container,
+    paste0(src, "/*"),
+    file.path(dest, "raw_data_parquet"),
+    recursive = TRUE
+  )
+  cli::cli_process_done()
+}
+
+#' Convert WKB back to sf column
+#'
+#' @param sf_data `tibble` or `data.frame` Collected geodata (run
+#' `dplyr::collect()` first) whose `Shape` or `geometry` column holds WKB.
+#'
+#' @returns `tibble` Geodata with `sf`.
+#'
+#' @export
+#' @examples
+#' \dontrun{
+#' raw_data <- build_parquet_raw_data()
+#' kenya_ctry_sf <- raw_data$global.ctry |>
+#'   dplyr::filter(ctry == "KENYA") |>
+#'   dplyr::collect() |>
+#'   from_wkb_to_sf()
+#' }
+from_wkb_to_sf <- function(sf_data) {
+  # Ensure that global shapefiles have Shape and city/roads as geometry.
+  # Otherwise, need to modify this function.
+
+  # Lazy arrow objects (datasets, tables and un-collected dplyr queries) must
+  # be materialised before the WKB column can be decoded.
+  if (inherits(sf_data, c("ArrowObject", "arrow_dplyr_query"))) {
+    cli::cli_abort("Please run dplyr::collect() first prior to passing to the function.")
+  }
+
+  if ("Shape" %in% names(sf_data)) {
+    sf_data <- sf_data |>
+      dplyr::mutate(Shape = sf::st_as_sfc(Shape, EWKB = TRUE, crs = 4326)) |>
+      sf::st_as_sf()
+  } else if ("geometry" %in% names(sf_data)) {
+    sf_data <- sf_data |>
+      dplyr::mutate(geometry = sf::st_as_sfc(geometry, EWKB = TRUE, crs = 4326)) |>
+      sf::st_as_sf()
+  } else {
+    cli::cli_abort("Not an sf dataset.")
+  }
+
+  return(sf_data)
+}
+
+# Private functions ----
+
+#' Gets the column used to partition a dataset
+#'
+#' @param name `str` Name of the dataset (an element name of `raw_data`).
+#'
+#' @returns `chr` A character vector of columns to partition with.
+#' @keywords internal
+#'
+#' @details
+#' Names without an entry fall through the `switch()` and yield `NULL`.
+#'
+#' @examples
+#' \dontrun{
+#' get_partition_cols("afp")
+#' }
+get_partition_cols <- function(name) {
+  # Datasets sharing a partition column are grouped with switch fall-through.
+  switch(
+    name,
+    "afp" = ,
+    "afp.dupe" = ,
+    "afp.epi" = ,
+    "para.case" = ,
+    "pos" = ,
+    "pos.dupe" = ,
+    "other" = ,
+    "other.dupe" = "yronset",
+    "es" = ,
+    "es.dupe" = "collect.yr",
+    "sia" = ,
+    "sia.dupe" = "yr.sia",
+    "dist.pop" = ,
+    "prov.pop" = ,
+    "ctry.pop" = ,
+    "ctry.coverage" = ,
+    "prov.coverage" = ,
+    "dist.coverage" = "year",
+    "global.ctry" = ,
+    "global.prov" = ,
+    "global.dist" = "WHO_REGION",
+    "roads" = "continent",
+    "cities" = "POP_CLASS",
+    "metadata" = "download_time"
+  )
+}
+
+#' Build raw_data using local parquet files
+#'
+#' @param path `str` A path to the parquet directory
+#' @param dataset `str` A specific dataset. Defaults to `"all"`. Otherwise, can specify any dataset in the list returned by [get_all_polio_data()].
+#'
+#' @returns `list` A list containing connections to the folders associated with
+#' individual datasets in the original output of [get_all_polio_data()].
+#' @details
+#' With `dataset = "all"` or several dataset names, a named list is returned
+#' and folders that cannot be opened are skipped with a notice. With a single
+#' dataset name, the connection itself is returned rather than a list.
+#' `"all"` cannot be combined with other names.
+#'
+#' @keywords internal
+#'
+build_parquet_raw_data_local <- function(path = NULL, dataset = "all") {
+  # `dir.exists(NULL)` fails with an unhelpful message, so test for it first.
+  if (is.null(path) || !dir.exists(path)) {
+    cli::cli_abort("Not a valid directory.")
+  }
+
+  valid_values <- c(
+    "afp", "afp.dupe", "afp.epi", "para.case",
+    "es", "es.dupe",
+    "sia", "sia.dupe",
+    "pos", "pos.dupe",
+    "other", "other.dupe",
+    "dist.pop", "prov.pop", "ctry.pop",
+    "global.ctry", "global.prov", "global.dist",
+    "ctry.coverage", "prov.coverage", "dist.coverage",
+    "roads", "cities", "metadata"
+  )
+
+  # Opens each named dataset folder; folders arrow cannot open are reported
+  # and left out of the result.
+  open_available <- function(dataset_names) {
+    opened <- list()
+    for (i in dataset_names) {
+      connection <- tryCatch(
+        arrow::open_dataset(file.path(path, i)),
+        error = \(e) {
+          cli::cli_alert_info(paste0("Dataset not found and won't be added: ", i))
+          NULL
+        }
+      )
+      if (!is.null(connection)) {
+        opened[[i]] <- connection
+      }
+    }
+    opened
+  }
+
+  if (length(dataset) == 0) {
+    cli::cli_abort("Please pass at least one dataset name.")
+  }
+
+  if (length(dataset) == 1 && isTRUE(dataset == "all")) {
+    return(open_available(valid_values))
+  }
+
+  # "all" is only meaningful on its own. Check before filtering, because it
+  # is not a member of `valid_values` and would otherwise be dropped silently.
+  if ("all" %in% dataset) {
+    cli::cli_abort("Please pass only 'all'.")
+  }
+
+  invalid <- setdiff(dataset, valid_values)
+  if (length(invalid) > 0) {
+    cli::cli_alert_info("The following type passed are invalid and won't be loaded: ")
+    cli::cli_li(invalid)
+  }
+
+  valid <- dataset[!dataset %in% invalid]
+  if (length(valid) == 0) {
+    cli::cli_abort("All the dataset passed are invalid.")
+  }
+
+  # A single requested dataset is returned as a bare connection, not a list.
+  if (length(dataset) == 1) {
+    return(arrow::open_dataset(file.path(path, dataset)))
+  }
+
+  open_available(valid)
+}
+
+#' Build raw_data using EDAV files
+#'
+#' @param path `str` Path to EDAV folder containing parquet files. This must
+#' be the absolute file path from the Blob endpoint of the container.
+#' @param dataset `str` A specific dataset.
Defaults to `"all"`. Otherwise, can specify any dataset in the list returned by [get_all_polio_data()].
+#' @param container `azcontainer` An instance of an Azure container to connect
+#' to. Pass [get_azure_storage_connection()] using defaults if not accessing
+#' using a service principal.
+#'
+#' @details
+#' The requested folders are downloaded from the `raw_data_parquet` folder
+#' under `path` into `rappdirs::user_data_dir("sirfunctions")`, replacing any
+#' local copy, and then opened with `build_parquet_raw_data_local()`.
+#'
+#' @returns `list` A list containing connections to the folders associated with
+#' individual datasets in the original output of [get_all_polio_data()].
+#' @keywords internal
+#'
+build_parquet_raw_data_edav <- function(path = NULL, dataset = "all", container = get_azure_storage_connection()) {
+  if (is.null(path)) {
+    cli::cli_abort("Please pass a file path to the parquet folder")
+  }
+
+  valid_values <- c(
+    "afp", "afp.dupe", "afp.epi", "para.case",
+    "es", "es.dupe",
+    "sia", "sia.dupe",
+    "pos", "pos.dupe",
+    "other", "other.dupe",
+    "dist.pop", "prov.pop", "ctry.pop",
+    "global.ctry", "global.prov", "global.dist",
+    "ctry.coverage", "prov.coverage", "dist.coverage",
+    "roads", "cities", "metadata"
+  )
+
+  if (!edav_io("exists.dir", NULL, file_loc = path, azcontainer = container)) {
+    cli::cli_abort("The directory does not exist on EDAV.")
+  }
+
+  cli::cli_process_start("Building raw_data from EDAV parquet files")
+
+  local_root <- file.path(rappdirs::user_data_dir("sirfunctions"), "raw_data_parquet")
+
+  if (length(dataset) == 1 && isTRUE(dataset == "all")) {
+    # Mirror the whole remote folder.
+    source_path <- file.path(path, "raw_data_parquet/*")
+    local_pq <- local_root
+  } else {
+    if (length(dataset) == 0) {
+      cli::cli_abort("Please pass at least one dataset name.")
+    }
+
+    # "all" is only meaningful on its own. Check before filtering, because it
+    # is not a member of `valid_values` and would otherwise be dropped.
+    if ("all" %in% dataset) {
+      cli::cli_abort("Please pass only 'all'.")
+    }
+
+    invalid <- setdiff(dataset, valid_values)
+    if (length(invalid) > 0) {
+      cli::cli_alert_info(
+        "The following type passed are invalid and won't be loaded: "
+      )
+      cli::cli_li(invalid)
+    }
+
+    valid <- unique(dataset[!dataset %in% invalid])
+    if (length(valid) == 0) {
+      cli::cli_abort("All the dataset passed are invalid.")
+    }
+
+    # One remote glob and one local folder per requested dataset.
+    source_path <- paste0(file.path(path, "raw_data_parquet"), "/", valid, "/*")
+    local_pq <- paste0(local_root, "/", valid)
+  }
+
+  # Walk every source/destination pair; `seq_along()` rather than `length()`
+  # so that each dataset is downloaded, not only the last one.
+  for (i in seq_along(source_path)) {
+    # Start from an empty folder so stale partitions never survive a refresh.
+    unlink(local_pq[i], recursive = TRUE, force = TRUE)
+    dir.create(local_pq[i], recursive = TRUE)
+
+    AzureStor::storage_multidownload(
+      container,
+      src = source_path[i],
+      dest = local_pq[i],
+      recursive = TRUE,
+      overwrite = TRUE
+    )
+  }
+
+  # `dataset` is passed unchanged so the shape of the result (list versus a
+  # single connection) stays what the caller asked for.
+  raw_data <- build_parquet_raw_data_local(local_root, dataset)
+  cli::cli_process_done()
+
+  return(raw_data)
+}
+
+#' Convert the geometry column to well-known binary (WKB)
+#'
+#' @param sf_data `sf` or `data.frame` Geodata with a `Shape` or `geometry`
+#' column.
+#'
+#' @details
+#' This function was written using the CDC EDAV Chatbot using the model GPT-5.2.
+#' @returns The input data with its `Shape` or `geometry` column replaced by a
+#' plain list of raw WKB vectors.
+#'
+#' @keywords internal
+#'
+to_wkb_drop_sf <- function(sf_data) {
+  # Pick the geometry column, preferring `Shape` over `geometry`.
+  target <- intersect(c("Shape", "geometry"), names(sf_data))
+  if (length(target) == 0) {
+    cli::cli_abort("Not an sf dataset.")
+  }
+  target <- target[[1]]
+
+  # sf objects expose their geometry through st_geometry(); for a plain
+  # data.frame the sfc column is read directly.
+  if (inherits(sf_data, "sf")) {
+    shapes <- sf::st_geometry(sf_data)
+  } else {
+    shapes <- sf_data[[target]]
+  }
+
+  # st_as_binary() yields a "WKB"-classed list of raw vectors; unclass() turns
+  # it into a plain list so Arrow can infer the column type.
+  sf_data[[target]] <- unclass(sf::st_as_binary(shapes))
+
+  sf_data
+}
\ No newline at end of file
diff --git a/R/get_all_polio_data.R b/R/get_all_polio_data.R
new file mode 100644
index 00000000..9630661f
--- /dev/null
+++ b/R/get_all_polio_data.R
@@ -0,0 +1,947 @@
+#' Retrieve all pre-processed polio data
+#'
+#' @description Download POLIS data from the CDC pre-processed endpoint. By default
+#' this function will return a "small" or recent dataset. This is primarily for data
+#' that is from the past six years. You can specify a "medium" sized dataset for data
+#' that is from 2016 onwards. Finally the "large" sized dataset will provide information
+#' from 2000 onwards. Regular pulls form the data will recreate the "small" dataset
+#' when new information is available and the Data Management Team can force the
+#' creation of the "medium" and "large" static datasets as necessary.
+#'
+#' @param size `str` Size of data to download. Defaults to `"small"`.
+#' - `"small"`: Data from the last six years.
+#' - `"medium"`: Data from 2016-present.
+#' - `"large"`: Data from 2000-present.
+#' @param data_folder `str` Location of the data folder containing pre-processed POLIS data,
+#' spatial files, coverage data, and population data. Defaults to `"GID/PEB/SIR/Data"`.
+#' @param polis_folder `str` Location of the POLIS folder. Defaults to `"GID/PEB/SIR/POLIS"`.
+#' @param core_ready_folder `str` Which core ready folder to use. Defaults to `"Core_Ready_Files"`. +#' @param force.new.run `logical` Default `FALSE`, if `TRUE` will run recent data and cache. +#' @param recreate.static.files `logical` Default `FALSE`, if `TRUE` will run all data and cache. +#' @param attach.spatial.data `logical` Default `TRUE`, adds spatial data to downloaded object. +#' @param use_edav `logical` Build raw data list using EDAV files. Defaults to `TRUE`. +#' @param archive Logical. Whether to archive previous output directories +#' before overwriting. Default is `TRUE`. +#' @param keep_n_archives Numeric. Number of archive folders to retain. +#' Defaults to `Inf`, which keeps all archives. Set to a finite number +#' (e.g., 3) to automatically delete older archives beyond the N most recent. +#' @param output_format str: output_format to save files as. +#' Available formats include 'rds' and 'qs2'. Defaults is 'rds'. +#' @param local_caching `logical` Enable local caching so data is stored locally and +#' only downloaded when there is updated data from EDAV. +#' @param use_archived_data `logical` Allows the ability to recreate the raw data file using previous +#' preprocessed data. If +#' @returns Named `list` containing polio data that is relevant to CDC. 
+#' @examples +#' \dontrun{ +#' raw.data <- get_all_polio_data() # downloads data for last 6 years, including spatial files +#' raw.data <- get_all_polio_data(size = "small", attach.spatial.data = FALSE) # exclude spatial data +#' } +#' +#' @export +get_all_polio_data <- function( + size = "small", + data_folder = "GID/PEB/SIR/Data", + polis_folder = "GID/PEB/SIR/POLIS", + core_ready_folder = "Core_Ready_Files", + force.new.run = FALSE, + recreate.static.files = FALSE, + attach.spatial.data = TRUE, + use_edav = TRUE, + use_archived_data = FALSE, + archive = TRUE, + keep_n_archives = Inf, + output_format = "rds", + local_caching = TRUE) { + + # check to see that size parameter is appropriate + if (!size %in% c("small", "medium", "large")) { + stop("The parameter 'size' must be either 'small', 'medium', or 'large'") + } + + # Check output format + if (!output_format %in% c("rds", "qs2")) { + stop("Only rds and qs2 is supported at this time.") + } + +# normalize and validate both output formats +output_format <- normalize_format(output_format) + +# Fail safe in instances where EDAV connection fails +if (use_edav) { + verify_edav <- tryCatch( + { + invisible(capture.output(test_EDAV_connection())) + cli::cli_alert_success("Connect to EDAV successful.") + TRUE + }, + error = \(e) { + cli::cli_alert_info("Connection to EDAV unsuccessful.") + FALSE + } + ) + + if (!verify_edav) { + cli::cli_alert_info("Unable to obtain data from EDAV. Loading from local cache instead.") + cli::cli_alert_info("NOTE: Data may be stale. 
Please review the global polio dataset metadata for information on when the data was last processed.") + raw.data <- force_load_polio_data_cache(attach.spatial.data, output_format) + return(raw.data) + } +} + +# Constant variables +# Each file comes out of these folders +analytic_folder <- file.path(data_folder, "analytic") +polis_data_folder <- file.path(data_folder, "polis") +spatial_folder <- file.path(data_folder, "spatial") +coverage_folder <- file.path(data_folder, "coverage") +pop_folder <- file.path(data_folder, "pop") + +# Year cutoffs for the different datasets +current_year <- lubridate::year(Sys.Date()) +small_year <- current_year - 5 +med_year <- 2016 #hardcode to 2016 because it's an important point in time + +# Required files +raw_data_recent_name <- paste0("raw.data.recent", output_format) +raw_data_medium_name <- paste0("raw.data.", med_year, ".", small_year - 1, output_format) +raw_data_2000_name <- paste0("raw.data.2000.", med_year - 1, output_format) +spatial_data_name <- paste0("spatial.data", output_format) +global_ctry_sf_name <- "global.ctry.rds" +global_prov_sf_name <- "global.prov.rds" +global_dist_sf_name <- "global.dist.rds" + +# Perform check to build using the archived polis folder +if (use_archived_data) { + cli::cli_alert_info("Using archived data") + cli::cli_alert_info("NOTE: the metadata will be for the most recent pull") + polis_data_folder <- get_archived_polis_data( + data_folder, + use_edav, + keep_n_archives + ) + recreate.static.files <- TRUE +} + +# look to see if the recent raw data rds is in the analytic folder +prev_table <- sirfunctions_io("list", NULL, analytic_folder, + edav = use_edav +) + +if (nrow(prev_table) > 0) { + prev_table <- prev_table |> + dplyr::filter(grepl(raw_data_recent_name, name)) |> + dplyr::select("file" = "name", "size", "ctime" = "lastModified") +} else { + # if empty, make sure to recreate tibble to the right format + prev_table <- tibble( + "file" = NA, + "size" = NA, + "ctime" = NA + ) |> + 
dplyr::mutate(file = as.character(file), + size = as.double(size), + ctime = as_datetime(ctime)) |> + dplyr::filter(!is.na(file)) +} + +if (recreate.static.files | force.new.run) { + force.new.run <- T + create.cache <- T +} + + +if (!force.new.run) { + + # Check if using the local cache is sufficient + if (use_edav & size == "small" & local_caching) { + if (!recache_raw_data(analytic_folder, use_edav, output_format)) { + + raw.data <- sirfunctions_io("read", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("raw_data", output_format)), + edav = FALSE) + + cli::cli_process_start("Checking for duplicates in datasets.") + raw.data <- duplicate_check(raw.data) + cli::cli_process_done() + if (attach.spatial.data) { + if (!recache_spatial_data(analytic_folder, spatial_folder, + use_edav, output_format)) { + spatial.data <- sirfunctions_io("read", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("spatial_data", output_format)), + edav = FALSE) + raw.data$global.ctry <- spatial.data$global.ctry + raw.data$global.prov <- spatial.data$global.prov + raw.data$global.dist <- spatial.data$global.dist + raw.data$roads <- spatial.data$roads + raw.data$cities <- spatial.data$cities + + return(raw.data) + } else { + spatial.data <- sirfunctions_io("read", NULL, file.path(analytic_folder, spatial_data_name), + edav = use_edav) + sirfunctions_io("write", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("spatial_data", output_format)), + obj = spatial.data, + edav = FALSE) + edav_spatial_timestamp <- sirfunctions_io( + "read", + NULL, + file.path(analytic_folder, paste0("spatial_timestamp", output_format)), + edav = use_edav + ) + sirfunctions_io("write", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("spatial_timestamp", output_format)), + obj = edav_spatial_timestamp, + edav = FALSE) + + raw.data$global.ctry <- spatial.data$global.ctry + raw.data$global.prov <- spatial.data$global.prov + raw.data$global.dist <- 
spatial.data$global.dist + raw.data$roads <- spatial.data$roads + raw.data$cities <- spatial.data$cities + + return(raw.data) + } + } else { + return(raw.data) + } + } + } + + if (use_edav) { + cli::cli_alert_info(paste0("Downloading most recent active polio data from ", small_year," onwards")) + } else { + cli::cli_alert_info(paste0("Loading most recent active polio data from ", small_year," onwards")) + } + + raw.data.small.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) + + if (size == "small") { + raw.data <- raw.data.small.pull + } + + if (size == "medium") { + prev_table <- sirfunctions_io("list", NULL, analytic_folder, edav = use_edav) |> + dplyr::filter(grepl(raw_data_medium_name, name)) |> + dplyr::select("file" = "name", "size", "ctime" = "lastModified") + + if (use_edav) { + cli::cli_alert_info(paste0("Downloading static polio data from ", med_year, "-", small_year)) + } else { + cli::cli_alert_info(paste0("Loading static polio data from ", med_year, "-", small_year)) + } + + raw.data.medium.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) + + raw.data <- split_concat_raw_data( + action = "concat", + raw.data.small.pull = raw.data.small.pull, + raw.data.medium.pull = raw.data.medium.pull + ) + } + + if (size == "large") { + prev_table <- sirfunctions_io("list", NULL, analytic_folder, + edav = use_edav, full_names = TRUE + ) |> + dplyr::filter(grepl(raw_data_medium_name, name)) |> + dplyr::select("file" = "name", "size", "ctime" = "lastModified") + + if (use_edav) { + cli::cli_alert_info(paste0("Downloading static polio data from ", med_year, "-", small_year)) + } else { + cli::cli_alert_info(paste0("Loading static polio data from ", med_year, "-", small_year)) + } + + raw.data.medium.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) + + prev_table <- sirfunctions_io("list", NULL, analytic_folder, edav = use_edav) |> + dplyr::filter(grepl(raw_data_2000_name, name)) |> + 
dplyr::select("file" = "name", "size", "ctime" = "lastModified") + + if (use_edav) { + cli::cli_alert_info(paste0("Downloading static polio data from 2001-", med_year)) + } else { + cli::cli_alert_info(paste0("Loading static polio data from 2001-", med_year)) + } + + raw.data.large.pull <- sirfunctions_io("read", NULL, prev_table$file, edav = use_edav) + + raw.data <- split_concat_raw_data( + action = "concat", + raw.data.small.pull = raw.data.small.pull, + raw.data.medium.pull = raw.data.medium.pull, + raw.data.large.pull = raw.data.large.pull + ) + } + + # Only cache the small dataset, which we use in 90% of the case + if (use_edav & local_caching) { + raw_data_timestamp_exists <- invisible(sirfunctions_io( + "exists.file", + NULL, + file.path(analytic_folder, paste0("raw_data_timestamp", output_format)), + edav = use_edav + )) + + } else { + raw_data_timestamp_exists <- FALSE + } + if (size == "small" & raw_data_timestamp_exists & local_caching) { + cli::cli_process_start("Caching global polio data locally") + + if (!dir.exists(rappdirs::user_data_dir("sirfunctions"))) { + dir.create(rappdirs::user_data_dir("sirfunctions"), recursive = TRUE) + } + + sirfunctions_io("write", NULL, + file.path(rappdirs::user_data_dir("sirfunctions"), paste0("raw_data", output_format)), + obj = raw.data, + edav = FALSE) + # Add edav tag file to local cache dir + edav_raw_data_timestamp <- sirfunctions_io( + "read", + NULL, + file.path(analytic_folder, paste0("raw_data_timestamp", output_format)), + edav = use_edav + ) + + sirfunctions_io("write", NULL, + file.path(rappdirs::user_data_dir("sirfunctions"), paste0("raw_data_timestamp", output_format)), + obj = edav_raw_data_timestamp, + edav = FALSE) + + cli::cli_process_done() + } + + cli::cli_process_done() + + cli::cli_process_start("Checking for duplicates in datasets.") + raw.data <- duplicate_check(raw.data) + cli::cli_process_done() + + if (attach.spatial.data) { + + # Don't recache spatial if up to date + if 
(!recache_spatial_data(analytic_folder, spatial_folder, + use_edav, output_format) & local_caching) { + spatial.data <- sirfunctions_io("read", NULL, file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("spatial_data", output_format)), + edav = FALSE) + raw.data$global.ctry <- spatial.data$global.ctry + raw.data$global.prov <- spatial.data$global.prov + raw.data$global.dist <- spatial.data$global.dist + raw.data$roads <- spatial.data$roads + raw.data$cities <- spatial.data$cities + + return(raw.data) + } + + if (use_edav) { + cli::cli_process_start("Downloading and attaching spatial data") + } else { + cli::cli_process_start("Loading and attaching spatial data") + } + + spatial.data <- sirfunctions_io("read", NULL, + file.path(analytic_folder, spatial_data_name), + edav = use_edav + ) + + raw.data$global.ctry <- spatial.data$global.ctry + raw.data$global.prov <- spatial.data$global.prov + raw.data$global.dist <- spatial.data$global.dist + raw.data$roads <- spatial.data$roads + raw.data$cities <- spatial.data$cities + + cli::cli_process_done() + + if (use_edav & local_caching) { + spatial_timestamp_exists <- sirfunctions_io( + "exists.file", + NULL, + file.path(analytic_folder, paste0("spatial_timestamp", output_format)), + edav = use_edav + ) + } else { + spatial_timestamp_exists <- FALSE + } + + if (recache_spatial_data(analytic_folder, spatial_folder, + use_edav, output_format) & spatial_timestamp_exists & local_caching) { + sirfunctions_io("write", + NULL, + file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("spatial_data", + output_format)), + obj = spatial.data, + edav = FALSE) + + spatial_processed_tag <- sirfunctions_io("read", + NULL, + file.path(analytic_folder, + paste0("spatial_timestamp", output_format)), + edav = use_edav) + sirfunctions_io("write", + NULL, + file.path(rappdirs::user_data_dir("sirfunctions"), + paste0("spatial_timestamp", output_format)), + obj = spatial_processed_tag, + edav = FALSE) + } + } + + return(raw.data) + +} 
else { + + # Check that the required folders have data + for (folder in c(analytic_folder, polis_data_folder, spatial_folder, + coverage_folder, pop_folder)) { + + # get_all_polio_data will recreate the analytic folder if it's missing + switch(basename(folder), + "analytic" = { + if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { + cli::cli_alert_info("No analytics folder found. Will create a new one.") + sirfunctions_io("create.dir", NULL, folder, edav = use_edav) + } + }, + "polis" = { + if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { + cli::cli_alert_info("Creating polis folder in the data folder") + sirfunctions_io("create.dir", NULL, folder, edav = use_edav) + } else { + cli::cli_alert_info("Moving updated polis data to the data folder") + } + + + create_polis_data_folder( + data_folder, + polis_folder, + core_ready_folder, + use_edav, + archive, + keep_n_archives + ) + + }, + "spatial" = { + if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { + cli::cli_abort(paste0("No spatial data found in the data folder.", + " Ensure that the output folder when running ", + " tidypolis::process_spatial() is ", + spatial_folder), + ) + } + }, + "coverage" = { + if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { + cli::cli_abort(paste0("Coverage data not found.", + "Please create and add coverage data in: ", + folder)) + } + }, + "pop" = { + if (!sirfunctions_io("exists.dir", NULL, folder, edav = use_edav)) { + cli::cli_abort(paste0("Population data not found. ", + "Preprocessing of population files may be required. 
", + "Please create a pop data folder and add data in: ", + folder)) + } + } + ) + } + + if (use_edav) { + cli::cli_h1("Testing download times") + download_metrics <- test_EDAV_connection(return_list = T) + } + + # use the truncated AFP file + afp.trunc <- T + + if (recreate.static.files) { + afp.trunc <- F + } + + dl_table <- dplyr::bind_rows( + sirfunctions_io("list", NULL, polis_data_folder, edav = use_edav), + sirfunctions_io("list", NULL, spatial_folder, edav = use_edav), + sirfunctions_io("list", NULL, coverage_folder, edav = use_edav), + sirfunctions_io("list", NULL, pop_folder, edav = use_edav), + sirfunctions_io("list", NULL, polis_folder, edav = use_edav) |> + dplyr::filter(grepl("cache", name)) + ) |> + dplyr::filter(!is.na(size)) |> + dplyr::select("file" = "name", "size") + + if (use_edav) { + dl_table <- dl_table |> + dplyr::mutate( + "dl_time_sec" = size / download_metrics$size * download_metrics$d + ) + } + + if (afp.trunc) { + dl_table <- dl_table |> + dplyr::filter(!grepl("afp_linelist_2001", file)) + } else { + dl_table <- dl_table |> + dplyr::filter(!grepl("afp_linelist_2019", file)) + } + + file_size <- dl_table$size |> sum() + + if (use_edav) { + download_time <- dl_table$dl_time_sec |> sum() + } + + if (use_edav) { + cli::cli_h1("Downloading POLIS Data") + } else { + cli::cli_h1("Loading POLIS Data") + } + + raw.data <- list() + spatial.data <- list() + + # Check if spatial data needs to be redownloaded from the analytics folder + spatial_timestamp_exists <- sirfunctions_io( + "exists.file", + NULL, + file.path(analytic_folder, paste0("spatial_timestamp", output_format)), + edav = use_edav + ) + + if (spatial_timestamp_exists) { + # Check if it's recent or needs updating + edav_spatial_timestamp <- sirfunctions_io( + "read", + NULL, + file.path(analytic_folder, paste0("spatial_timestamp", output_format)), + edav = use_edav + ) |> + dplyr::select(name, lastModifiedEDAV = lastModified) + + edav_spatial_folder_info <- sirfunctions_io( + "list", 
+ NULL, + file.path(spatial_folder), + edav = use_edav + ) |> + dplyr::select(name, lastModified) + + spatial_timestamp_comparison <- dplyr::left_join(edav_spatial_timestamp, + edav_spatial_folder_info) |> + dplyr::mutate(updated = ifelse(lastModifiedEDAV == lastModified, TRUE, FALSE)) |> + dplyr::pull(updated) |> sum(na.rm = TRUE) + } else { + + spatial_timestamp_comparison <- 0 + + } + + if (spatial_timestamp_comparison == 3) { + cli::cli_alert_success("Spatial data in the analytic folder is up to date. Loading from cache...") + spatial.data <- sirfunctions_io( + "read", + NULL, + file.path(analytic_folder, spatial_data_name), + edav = use_edav + ) + } else { + if (spatial_timestamp_exists) { + cli::cli_alert_warning("Spatial data in the analytic folder is outdated. Recreating from the spatial folder") + } else { + cli::cli_alert_warning("No spatial timestamp exists. Recreating from the spatial folder") + } + + cli::cli_process_start("1) Loading country shape files") + spatial.data$global.ctry <- load_clean_ctry_sp( + fp = file.path(spatial_folder, global_ctry_sf_name), + edav = use_edav + ) + cli::cli_process_done() + + cli::cli_process_start("2) Loading province shape files") + spatial.data$global.prov <- load_clean_prov_sp( + fp = file.path(spatial_folder, global_prov_sf_name), + edav = use_edav + ) + cli::cli_process_done() + + cli::cli_process_start("3) Loading district shape files") + spatial.data$global.dist <- load_clean_dist_sp( + fp = file.path(spatial_folder, global_dist_sf_name), + edav = use_edav + ) + cli::cli_process_done() + } + + cli::cli_process_start("4) Loading AFP line list data (This file is almost 3GB and can take a while)") + raw.data$afp <- + sirfunctions_io("read", NULL, file_loc = dplyr::filter( + dl_table, + grepl("afp", file) + ) |> + dplyr::pull(file), edav = use_edav) |> + dplyr::filter(surveillancetypename == "AFP") |> + dplyr::mutate( + cdc.classification.all2 = dplyr::case_when( + final.cell.culture.result == "Not received in 
lab" & + cdc.classification.all == "PENDING" ~ "LAB PENDING", + TRUE ~ cdc.classification.all + ), + hot.case = ifelse( + paralysis.asymmetric == "Yes" & + paralysis.onset.fever == "Yes" & + paralysis.rapid.progress == "Yes", + 1, + 0 + ), + hot.case = ifelse(is.na(hot.case), 99, hot.case) + ) + + cli::cli_process_done() + + cli::cli_process_start("Processing AFP data for analysis") + + raw.data$afp.epi <- raw.data$afp |> + dplyr::mutate(epi.week = lubridate::epiweek(dateonset)) |> + dplyr::group_by(place.admin.0, epi.week, yronset, cdc.classification.all2) |> + dplyr::summarize(afp.cases = dplyr::n(), + .groups = "drop") |> + dplyr::mutate(epiweek.year = paste(yronset, epi.week, sep = "-")) |> + # manual fix of epi week + dplyr::mutate(epi.week = ifelse(epi.week == 52 & + yronset == 2022, 1, epi.week)) + + # factoring cdc classification to have an order we like in stacked bar chart + raw.data$afp.epi$cdc.classification.all2 <- + factor( + raw.data$afp.epi$cdc.classification.all2, + levels = c( + "WILD 1", + "cVDPV 2", + "VDPV 2", + "cVDPV 1", + "VDPV 1", + "COMPATIBLE", + "PENDING", + "LAB PENDING", + "NPAFP", + "NOT-AFP", + "UNKNOWN", + "aVDPV 1", + "aVDPV 3", + "cVDPV1andcVDPV2", + "CombinationWild1-cVDPV 2", + "aVDPV 2", + "VDPV 3", + "iVDPV 2", + "VDPV1andcVDPV2", + "VAPP", + "cVDPV 3", + "iVDPV 3", + "WILD 3", + "WILD1andWILD3", + "iVDPV 1", + "cVDPV2andcVDPV3" + ), + labels = c( + "WILD 1", + "cVDPV 2", + "VDPV 2", + "cVDPV 1", + "VDPV 1", + "COMPATIBLE", + "PENDING", + "LAB PENDING", + "NPAFP", + "NOT-AFP", + "UNKNOWN", + "aVDPV 1", + "aVDPV 3", + "cVDPV1andcVDPV2", + "CombinationWild1-cVDPV 2", + "aVDPV 2", + "VDPV 3", + "iVDPV 2", + "VDPV1andcVDPV2", + "VAPP", + "cVDPV 3", + "iVDPV 3", + "WILD 3", + "WILD1andWILD3", + "iVDPV 1", + "cVDPV2andcVDPV3" + ) + ) + + raw.data$para.case <- raw.data$afp |> + dplyr::filter( + stringr::str_detect(cdc.classification.all2, "VDPV|WILD|COMPATIBLE") + ) |> + dplyr::mutate(yronset = ifelse(is.na(yronset) == T, 2022, 
yronset)) # this fix was for the manually added MOZ case + cli::cli_process_done() + + + cli::cli_process_start("5) Loading population data") + raw.data$dist.pop <- + sirfunctions_io("read", NULL, + dplyr::filter(dl_table, grepl("dist.pop", file)) |> + dplyr::pull(file), + edav = use_edav + ) |> + dplyr::ungroup() + + raw.data$prov.pop <- + sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("prov.pop", file)) |> + dplyr::pull(file), edav = use_edav + ) |> + dplyr::ungroup() + + raw.data$ctry.pop <- + sirfunctions_io("read", NULL, + dplyr::filter(dl_table, grepl("ctry.pop", file)) |> + dplyr::pull(file), + edav = use_edav + ) |> + dplyr::ungroup() + cli::cli_process_done() + + + cli::cli_process_start("6) Loading coverage data") + raw.data$ctry.coverage <- sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("ctry_cov", file)) |> + dplyr::pull(file), edav = use_edav + ) + + raw.data$prov.coverage <- sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("prov_cov", file)) |> + dplyr::pull(file), edav = use_edav + ) + + raw.data$dist.coverage <- sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("dist_cov", file)) |> + dplyr::pull(file), edav = use_edav + ) + + cli::cli_process_done() + + cli::cli_process_start("7) Loading ES data") + + raw.data$es <- + sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("/es_2001", file)) |> + dplyr::pull(file), edav = use_edav + ) + cli::cli_process_done() + + cli::cli_process_start("8) Loading SIA data") + raw.data$sia <- + sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("sia", file)) |> + dplyr::pull(file), edav = use_edav + ) + + cli::cli_process_done() + + cli::cli_process_start("9) Loading all positives") + raw.data$pos <- + sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("/pos", file)) |> + dplyr::pull(file), edav = use_edav + ) + + cli::cli_process_done() + + 
cli::cli_process_start("10) Loading other surveillance linelist") + raw.data$other <- + sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("/other", file)) |> + dplyr::pull(file), edav = use_edav + ) + + cli::cli_process_done() + + cli::cli_process_start("11) Loading road network data") + spatial.data$roads <- sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("roads.rds", file)) |> + dplyr::pull(file), edav = use_edav + ) + cli::cli_process_done() + + cli::cli_process_start("12) Loading city spatial data") + spatial.data$cities <- sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("cities.rds", file)) |> + dplyr::pull(file), edav = use_edav + ) + cli::cli_process_done() + + cli::cli_process_start("13) Creating Metadata object") + + polis.cache <- sirfunctions_io("read", NULL, + file_loc = dplyr::filter(dl_table, grepl("cache.rds", file)) |> + dplyr::pull(file), edav = use_edav + ) |> + dplyr::mutate(last_sync = as.Date(last_sync)) + + raw.data$metadata$download_time <- max(polis.cache$last_sync, na.rm = TRUE) + + raw.data$metadata$processed_time <- sirfunctions_io("list", NULL, + file.path(polis_folder, "data", core_ready_folder), + edav = use_edav + ) |> + dplyr::filter(grepl("positives_2001-01-01", name)) |> + dplyr::select("ctime" = "lastModified") |> + dplyr::mutate(ctime = as.Date(ctime)) |> + dplyr::pull(ctime) + + raw.data$metadata$user <- polis.cache |> + dplyr::filter(table == "virus") |> + dplyr::pull(last_user) + + raw.data$metadata$most_recent_pos <- max(raw.data$pos$dateonset, na.rm = TRUE) + raw.data$metadata$most_recent_pos_loc <- raw.data$pos |> + dplyr::arrange(dplyr::desc(dateonset)) |> + dplyr::slice(1) |> + dplyr::pull(place.admin.0) + + + raw.data$metadata$most_recent_afp <- max(raw.data$afp$dateonset, na.rm = TRUE) + raw.data$metadata$most_recent_afp_loc <- raw.data$afp |> + dplyr::arrange(dplyr::desc(dateonset)) |> + dplyr::slice(1) |> + dplyr::pull(place.admin.0) + + + 
raw.data$metadata$most_recent_env <- max(raw.data$es$collect.date, na.rm = TRUE) + raw.data$metadata$most_recent_env_loc <- raw.data$es |> + dplyr::arrange(dplyr::desc(collect.date)) |> + dplyr::slice(1) |> + dplyr::pull(ADM0_NAME) + + + raw.data$metadata$most_recent_sia <- max(raw.data$sia$sub.activity.start.date) + raw.data$metadata$most_recent_sia_code <- raw.data$sia |> + dplyr::arrange(dplyr::desc(sub.activity.start.date)) |> + dplyr::slice(1) |> + dplyr::pull(sia.code) + raw.data$metadata$most_recent_sia_location <- raw.data$sia |> + dplyr::arrange(dplyr::desc(sub.activity.start.date)) |> + dplyr::slice(1) |> + dplyr::pull(place.admin.0) + raw.data$metadata$most_recent_sia_vax <- raw.data$sia |> + dplyr::arrange(dplyr::desc(sub.activity.start.date)) |> + dplyr::slice(1) |> + dplyr::pull(vaccine.type) + + raw.data$metadata$most_recent_vdpv_class_change_date <- raw.data$pos$vdpvclassificationchangedate |> + lubridate::as_date() |> + max(na.rm = T) + + rm(polis.cache) + + cli::cli_process_done() + + cli::cli_process_start("14) Clearing out unused memory") + gc() + cli::cli_process_done() +} + +if (create.cache) { + cli::cli_process_start("15) Caching processed data") + + out <- split_concat_raw_data(action = "split", split.years = c(2000, med_year, small_year), raw.data.all = raw.data) + + out_files <- out$split.years |> + dplyr::mutate( + file_name = ifelse(grepl(current_year, tag), "recent", stringr::str_replace_all(tag, "-", ".")), + file_name = paste0("raw.data.", file_name, output_format) + ) + + if (!recreate.static.files) { + out_files <- out_files |> dplyr::filter(grepl("recent", file_name)) + } + + if (!use_archived_data) { + for (i in 1:nrow(out_files)) { + sirfunctions_io("write", NULL, + file_loc = file.path(analytic_folder, dplyr::pull(out_files[i, ], file_name)), + obj = out[[dplyr::pull(out_files[i, ], tag)]], + edav = use_edav + )} + } + +# set up path for spatial df + sp_file_path <- file.path(analytic_folder, paste0("spatial.data", 
output_format)) + + sirfunctions_io("write", NULL, + file_loc = sp_file_path, + obj = spatial.data, edav = use_edav + ) + + # Create tags only if not using "archived" version + if (use_edav & !use_archived_data) { + # Create raw data file tag for future comparisons + sirfunctions_io("write", NULL, + file_loc = file.path(analytic_folder, paste0("raw_data_timestamp", output_format)), + obj = Sys.time()) + + # Create spatial data file tag for future comparisons + spatial_files <- sirfunctions_io("list", + NULL, + spatial_folder, + edav = use_edav, + full_names = TRUE) + + edav_spatial_timestamp <- spatial_files |> + dplyr::filter(stringr::str_detect(name, "global."), + stringr::str_ends(name, output_format)) |> + dplyr::select(name, lastModified) + + sirfunctions_io( + "write", + NULL, + file.path(analytic_folder, paste0("spatial_timestamp", output_format)), + obj = edav_spatial_timestamp, + edav = use_edav + ) + } + + cli::cli_process_done() +} + +raw_data_cut_size <- switch(size, + "small" = small_year, + "medium" = med_year, + "large" = 2000) + +raw.data <- split_concat_raw_data(action = "split", + split.years = raw_data_cut_size, + raw.data.all = raw.data)[[1]] + +cli::cli_process_start("Checking for duplicates in datasets.") +raw.data <- duplicate_check(raw.data) +cli::cli_process_done() + +if (attach.spatial.data) { + raw.data$global.ctry <- spatial.data$global.ctry + raw.data$global.prov <- spatial.data$global.prov + raw.data$global.dist <- spatial.data$global.dist + raw.data$roads <- spatial.data$roads + raw.data$cities <- spatial.data$cities +} + +if (use_archived_data) { + cli::cli_alert_success(paste0("Successfully recreated global polio data from ", + basename(polis_data_folder))) +} + +return(raw.data) + +} \ No newline at end of file diff --git a/R/get_all_polio_data_2.R b/R/get_all_polio_data_2.R new file mode 100644 index 00000000..6b27a8f3 --- /dev/null +++ b/R/get_all_polio_data_2.R @@ -0,0 +1,639 @@ +# Helper functions + +#' Checks for required 
#' subfolders in the data folder
#'
#' Looks at the `analytic`, `polis`, `spatial`, `coverage` and `pop` subfolders
#' of `data_folder`, in that order. A missing `analytic` or `polis` folder is
#' created, while a missing `spatial`, `coverage` or `pop` folder aborts.
#' `create_polis_data_folder()` is called on every run, whether or not the
#' `polis` subfolder already existed.
#'
#' @param data_folder `str` Path to the data folder.
#' @param polis_folder `str` POLIS folder with preprocessed data.
#' @param core_ready_folder `str` Name of the core ready folder. Need to be specified if preprocessing specific regions, which have their own core ready folder.
#' @param use_edav `logical` Whether to use EDAV or not.
#' @param cache `logical` Whether to cache the preprocessed data to data/polis subfolder.
#'
#' @returns `list` List of paths to the specific subfolders.
#'
#' @keywords internal
check_data_folder <- function(data_folder, polis_folder, core_ready_folder, use_edav, cache) {
  folders <- list(
    analytic_folder = file.path(data_folder, "analytic"),
    polis_data_folder = file.path(data_folder, "polis"),
    spatial_folder = file.path(data_folder, "spatial"),
    coverage_folder = file.path(data_folder, "coverage"),
    pop_folder = file.path(data_folder, "pop")
  )

  dir_exists <- function(path) {
    sirfunctions_io("exists.dir", NULL, path, edav = use_edav)
  }

  # The analytic folder is an output location, so it is created when absent.
  if (!dir_exists(folders$analytic_folder)) {
    cli::cli_alert_info("No analytics folder found. Will create a new one.")
    sirfunctions_io("create.dir", NULL, folders$analytic_folder, edav = use_edav)
  }

  # The polis subfolder is created when absent and then always refreshed.
  if (!dir_exists(folders$polis_data_folder)) {
    cli::cli_alert_info("Creating polis folder in the data folder")
    sirfunctions_io("create.dir", NULL, folders$polis_data_folder, edav = use_edav)
  } else {
    cli::cli_alert_info("Moving updated polis data to the data folder")
  }
  create_polis_data_folder(
    data_folder,
    polis_folder,
    core_ready_folder,
    use_edav,
    cache,
    Inf
  )

  # The remaining folders are inputs that this function cannot recreate.
  if (!dir_exists(folders$spatial_folder)) {
    cli::cli_abort(paste0(
      "No spatial data found in the data folder.",
      " Ensure that the output folder when running",
      " tidypolis::process_spatial() is ",
      folders$spatial_folder
    ))
  }

  if (!dir_exists(folders$coverage_folder)) {
    cli::cli_abort(paste0(
      "Coverage data not found. ",
      "Please create and add coverage data in: ",
      folders$coverage_folder
    ))
  }

  if (!dir_exists(folders$pop_folder)) {
    cli::cli_abort(paste0(
      "Population data not found. ",
      "Preprocessing of population files may be required. ",
      "Please create a pop data folder and add data in: ",
      folders$pop_folder
    ))
  }

  return(folders)
}

#' Creates the "download table", with paths to files required for recreating static files
#'
#' @param data_folders_paths `list` Output of [check_data_folder()].
#' @param polis_folder `str` POLIS folder containing preprocessed data. NOT the subfolder under the data folder.
#' @param use_edav `logical` Whether to use EDAV or not.
#'
#' @returns `tibble` Dataset containing paths to required files.
#'
#' @keywords internal
list_required_files_for_processing <- function(data_folders_paths, polis_folder, use_edav) {
  list_dir <- function(path) {
    sirfunctions_io("list", NULL, path, edav = use_edav)
  }

  # Listings of the four data subfolders, in the same order as before.
  data_listings <- lapply(
    list(
      data_folders_paths$polis_data_folder,
      data_folders_paths$spatial_folder,
      data_folders_paths$coverage_folder,
      data_folders_paths$pop_folder
    ),
    list_dir
  )

  # From the POLIS folder itself only entries whose name contains "cache" are kept.
  cache_listing <- list_dir(polis_folder) |>
    dplyr::filter(grepl("cache", name))

  dl_table <- dplyr::bind_rows(c(data_listings, list(cache_listing))) |>
    dplyr::filter(!is.na(size), !grepl("afp_linelist_2019", name)) |>
    dplyr::select("file" = "name", "size")

  return(dl_table)
}

#' Create the spatial data for processing
#'
#' Compares the timestamps stored in `analytic/spatial_timestamp.parquet` with
#' the current `lastModified` values in the spatial folder. When all three
#' layers are unchanged, the layers are loaded through
#' [build_parquet_raw_data()] from the analytic folder; otherwise they are
#' rebuilt from the `.rds` files in the spatial folder.
#'
#' @param data_folder `str` Path to the data folder.
#' @param use_edav `logical` Use EDAV or not.
#'
#' @returns `list` Contains spatial datasets.
#'
#' @keywords internal
check_spatial_data_for_processing <- function(data_folder, use_edav) {
  spatial_folder <- file.path(data_folder, "spatial")
  analytic_folder <- file.path(data_folder, "analytic")
  timestamp_path <- file.path(analytic_folder, "spatial_timestamp.parquet")
  layer_names <- c("global.ctry", "global.prov", "global.dist")

  spatial_timestamp_exists <- sirfunctions_io(
    "exists.file",
    NULL,
    timestamp_path,
    edav = use_edav
  )

  # Number of layers whose stored timestamp equals the current one.
  n_unchanged <- 0
  if (spatial_timestamp_exists) {
    stored_timestamps <- sirfunctions_io(
      "read",
      NULL,
      timestamp_path,
      edav = use_edav
    ) |>
      dplyr::select(name, lastModifiedEDAV = lastModified)

    current_timestamps <- sirfunctions_io(
      "list",
      NULL,
      spatial_folder,
      edav = use_edav
    ) |>
      dplyr::select(name, lastModified)

    # `name` is the only shared column; naming it avoids the implicit-join message.
    n_unchanged <- dplyr::left_join(
      stored_timestamps,
      current_timestamps,
      by = "name"
    ) |>
      dplyr::mutate(updated = lastModifiedEDAV == lastModified) |>
      dplyr::pull(updated) |>
      sum(na.rm = TRUE)
  }

  if (n_unchanged == length(layer_names)) {
    cli::cli_alert_success(
      "Spatial data in the analytic folder is up to date. Loading from cache..."
    )
    spatial_data <- build_parquet_raw_data(
      analytic_folder,
      dataset = layer_names,
      from_edav = use_edav
    )
    return(spatial_data)
  }

  if (spatial_timestamp_exists) {
    cli::cli_alert_warning(
      "Spatial data in the analytic folder is outdated. Recreating from the spatial folder"
    )
  } else {
    cli::cli_alert_warning(
      "No spatial timestamp exists. Recreating from the spatial folder"
    )
  }

  spatial_data <- list()

  cli::cli_process_start("1) Loading country shape files")
  spatial_data$global.ctry <- load_clean_ctry_sp(
    fp = file.path(spatial_folder, "global.ctry.rds"),
    edav = use_edav
  )
  cli::cli_process_done()

  cli::cli_process_start("2) Loading province shape files")
  spatial_data$global.prov <- load_clean_prov_sp(
    fp = file.path(spatial_folder, "global.prov.rds"),
    edav = use_edav
  )
  cli::cli_process_done()

  cli::cli_process_start("3) Loading district shape files")
  spatial_data$global.dist <- load_clean_dist_sp(
    fp = file.path(spatial_folder, "global.dist.rds"),
    edav = use_edav
  )
  cli::cli_process_done()

  return(spatial_data)
}

#' Creates the AFP dataset of raw_data
#'
#' @param dl_table `tibble` Output of [list_required_files_for_processing()].
#' @param use_edav `logical` Whether to use EDAV or not.
#'
#' @returns `tibble` AFP dataset.
#'
#' @keywords internal
process_afp_raw_data <- function(dl_table, use_edav) {
  afp_file <- dl_table |>
    dplyr::filter(grepl("afp", file)) |>
    dplyr::pull(file)

  afp <- sirfunctions_io("read", NULL, file_loc = afp_file, edav = use_edav) |>
    dplyr::filter(surveillancetypename == "AFP") |>
    dplyr::mutate(
      # Pending cases whose sample never reached the lab get their own label.
      cdc.classification.all2 = dplyr::case_when(
        final.cell.culture.result == "Not received in lab" &
          cdc.classification.all == "PENDING" ~ "LAB PENDING",
        TRUE ~ cdc.classification.all
      ),
      # 1 when all three paralysis fields are "Yes", 0 otherwise, and 99 when
      # the comparison evaluates to NA.
      hot.case = ifelse(
        paralysis.asymmetric == "Yes" &
          paralysis.onset.fever == "Yes" &
          paralysis.rapid.progress == "Yes",
        1,
        0
      ),
      hot.case = ifelse(is.na(hot.case), 99, hot.case)
    )

  return(afp)
}

#' Creates afp.epi dataset
#'
#' @param afp `tibble` Output of [process_afp_raw_data()].
#'
#' @returns `tibble` Summary of AFP cases by year/epi-week per country.
#'
#' @keywords internal
process_afp_epi_raw_data <- function(afp) {
  # Order of the classifications as wanted in the stacked bar chart.
  classification_levels <- c(
    "WILD 1",
    "cVDPV 2",
    "VDPV 2",
    "cVDPV 1",
    "VDPV 1",
    "COMPATIBLE",
    "PENDING",
    "LAB PENDING",
    "NPAFP",
    "NOT-AFP",
    "UNKNOWN",
    "aVDPV 1",
    "aVDPV 3",
    "cVDPV1andcVDPV2",
    "CombinationWild1-cVDPV 2",
    "aVDPV 2",
    "VDPV 3",
    "iVDPV 2",
    "VDPV1andcVDPV2",
    "VAPP",
    "cVDPV 3",
    "iVDPV 3",
    "WILD 3",
    "WILD1andWILD3",
    "iVDPV 1",
    "cVDPV2andcVDPV3"
  )

  afp.epi <- afp |>
    dplyr::mutate(epi.week = lubridate::epiweek(dateonset)) |>
    dplyr::group_by(place.admin.0, epi.week, yronset, cdc.classification.all2) |>
    dplyr::summarize(afp.cases = dplyr::n(), .groups = "drop") |>
    dplyr::mutate(
      epiweek.year = paste(yronset, epi.week, sep = "-"),
      # manual fix of epi week, applied after epiweek.year has been built
      epi.week = ifelse(epi.week == 52 & yronset == 2022, 1, epi.week)
    )

  # Labels default to the levels, so the vector only needs to be given once.
  afp.epi$cdc.classification.all2 <- factor(
    afp.epi$cdc.classification.all2,
    levels = classification_levels
  )

  return(afp.epi)
}

#' Creates paralytics cases dataset
#'
#' @inheritParams process_afp_epi_raw_data
#'
#' @returns `tibble` Dataset with paralytic cases only.
#'
#' @keywords internal
process_paralytic_raw_data <- function(afp) {
  para.case <- afp |>
    dplyr::filter(
      stringr::str_detect(cdc.classification.all2, "VDPV|WILD|COMPATIBLE")
    ) |>
    # this fix was for the manually added MOZ case
    dplyr::mutate(yronset = ifelse(is.na(yronset), 2022, yronset))

  return(para.case)
}

#' Pull data listed in the download table
#'
#' @param dl_table `tibble` Output of [list_required_files_for_processing()].
#' @param grepl_pattern `str` Pattern to use to filter the `dl_table`.
#' @param use_edav `logical` Whether to use EDAV or not.
#'
#' @returns `tibble` One of the datasets listed in `dl_table`.
#'
#' @keywords internal
pull_data_from_dl_table <- function(dl_table, grepl_pattern, use_edav) {
  matched_file <- dl_table |>
    dplyr::filter(grepl(grepl_pattern, file)) |>
    dplyr::pull(file)

  pulled_data <- sirfunctions_io(
    "read",
    NULL,
    file_loc = matched_file,
    edav = use_edav
  ) |>
    dplyr::ungroup()

  return(pulled_data)
}

#' Creates metadata tag
#'
#' @param dl_table `tibble` Output of [list_required_files_for_processing()].
#' @param raw_data `list` Processed data combining all polio data.
#' @param polis_folder `str` Path to POLIS folder.
#' @param core_ready_folder `str` Name of the core ready folder.
#' @param use_edav `logical` Whether to use EDAV or not.
#'
#' @returns `list` Named list of metadata fields.
#'
#' @keywords internal
process_metadata_raw_data <- function(dl_table, raw_data, polis_folder, core_ready_folder, use_edav) {
  metadata <- list()

  cache_file <- dl_table |>
    dplyr::filter(grepl("cache.rds", file)) |>
    dplyr::pull(file)

  polis.cache <- sirfunctions_io("read", NULL, file_loc = cache_file, edav = use_edav) |>
    dplyr::mutate(last_sync = as.Date(last_sync))

  metadata$download_time <- max(polis.cache$last_sync, na.rm = TRUE)

  # Processing time is the modification date of the positives core-ready file.
  metadata$processed_time <- sirfunctions_io("list", NULL,
    file.path(polis_folder, "data", core_ready_folder),
    edav = use_edav
  ) |>
    dplyr::filter(grepl("positives_2001-01-01", name)) |>
    dplyr::select("ctime" = "lastModified") |>
    dplyr::mutate(ctime = as.Date(ctime)) |>
    dplyr::pull(ctime)

  metadata$user <- polis.cache |>
    dplyr::filter(table == "virus") |>
    dplyr::pull(last_user)

  metadata$most_recent_pos <- max(raw_data$pos$dateonset, na.rm = TRUE)
  metadata$most_recent_pos_loc <- raw_data$pos |>
    dplyr::arrange(dplyr::desc(dateonset)) |>
    dplyr::slice(1) |>
    dplyr::pull(place.admin.0)

  metadata$most_recent_afp <- max(raw_data$afp$dateonset, na.rm = TRUE)
  metadata$most_recent_afp_loc <- raw_data$afp |>
    dplyr::arrange(dplyr::desc(dateonset)) |>
    dplyr::slice(1) |>
    dplyr::pull(place.admin.0)

  metadata$most_recent_env <- max(raw_data$es$collect.date, na.rm = TRUE)
  metadata$most_recent_env_loc <- raw_data$es |>
    dplyr::arrange(dplyr::desc(collect.date)) |>
    dplyr::slice(1) |>
    dplyr::pull(ADM0_NAME)

  # `na.rm = TRUE` keeps the date consistent with the row picked below:
  # arrange(desc()) sorts missing dates last, so the first row has a date
  # whenever any row does.
  metadata$most_recent_sia <- max(raw_data$sia$sub.activity.start.date, na.rm = TRUE)

  # Sort once and read the three SIA attributes from the same row.
  latest_sia <- raw_data$sia |>
    dplyr::arrange(dplyr::desc(sub.activity.start.date)) |>
    dplyr::slice(1)
  metadata$most_recent_sia_code <- dplyr::pull(latest_sia, sia.code)
  metadata$most_recent_sia_location <- dplyr::pull(latest_sia, place.admin.0)
  metadata$most_recent_sia_vax <- dplyr::pull(latest_sia, vaccine.type)

  metadata$most_recent_vdpv_class_change_date <- raw_data$pos$vdpvclassificationchangedate |>
    lubridate::as_date() |>
    max(na.rm = TRUE)

  return(metadata)
}

#' Cache the raw data
#'
#' With `use_edav = TRUE` the parquet folder is written to a temporary
#' directory and uploaded from there; otherwise it is written directly to
#' `analytic_folder_path`.
#'
#' @param raw_data `list` Processed list of all polio data.
#' @param analytic_folder_path `str` Path to analytic folder.
#' @param use_edav `logical` Whether to use EDAV or not.
#'
#' @returns `NULL`, invisibly.
#'
#' @keywords internal
cache_raw_data <- function(raw_data, analytic_folder_path, use_edav) {
  if (!use_edav) {
    create_raw_data_parquet(raw_data, analytic_folder_path)
    return(invisible(NULL))
  }

  # Inside with_tempdir() the working directory is the temporary directory.
  withr::with_tempdir({
    create_raw_data_parquet(raw_data, getwd())
    upload_parquet_to_edav(getwd(), analytic_folder_path, get_azure_storage_connection())
  })

  invisible(NULL)
}

#' Create timestamps for raw data and spatial data
#'
#' @param data_folders_paths `list` Output of [check_data_folder()].
#' @param use_edav `logical` Whether to use EDAV or not.
#'
#' @returns `NULL`, invisibly.
#'
#' @keywords internal
create_raw_data_tags <- function(data_folders_paths, use_edav) {
  # Tags are only written when working against EDAV.
  if (!use_edav) {
    return(invisible(NULL))
  }

  # Raw data file tag for future comparisons.
  sirfunctions_io("write", NULL,
    file_loc = file.path(data_folders_paths$analytic_folder, "raw_data_timestamp.rds"),
    obj = Sys.time(),
    edav = use_edav
  )

  # Spatial data file tag for future comparisons.
  spatial_files <- sirfunctions_io("list",
    NULL,
    data_folders_paths$spatial_folder,
    edav = use_edav,
    full_names = TRUE
  )

  # check_spatial_data_for_processing() rebuilds the layers from
  # global.ctry.rds, global.prov.rds and global.dist.rds, so those are the
  # files whose timestamps have to be recorded.
  edav_spatial_timestamp <- spatial_files |>
    dplyr::filter(
      stringr::str_detect(name, "global."),
      stringr::str_ends(name, "rds")
    ) |>
    dplyr::select(name, lastModified)

  # The file name must match the one check_spatial_data_for_processing()
  # looks for, otherwise the spatial cache is never reused.
  sirfunctions_io(
    "write",
    NULL,
    file.path(data_folders_paths$analytic_folder, "spatial_timestamp.parquet"),
    obj = edav_spatial_timestamp,
    edav = use_edav
  )

  invisible(NULL)
}

#' Reprocess the global polio dataset
#'
#' @param data_folder `str` Path to the data folder.
#' @param polis_folder `str` Path to the POLIS folder.
#' @param core_ready_folder `str` Name of the core ready folder.
#' @param use_edav `logical` Whether to use EDAV or not.
#' @param cache `logical` Whether to cache the preprocessed data to the data/polis subfolder.
#'
#' @returns `list` Processed raw data.
#'
#' @keywords internal
reprocess_polio_data <- function(data_folder, polis_folder, core_ready_folder, use_edav, cache) {

  raw_data <- list()

  # NOTE: we will need to add mechanism for retrieving and loading archived parquet folders
  data_folders_paths <- check_data_folder(data_folder, polis_folder, core_ready_folder, use_edav, cache)

  # Paths of every file needed to rebuild the datasets
  dl_table <- list_required_files_for_processing(data_folders_paths, polis_folder, use_edav)

  # Country / province / district layers
  spatial_data <- check_spatial_data_for_processing(data_folder, use_edav)

  # AFP line list and the two datasets derived from it
  raw_data$afp <- process_afp_raw_data(dl_table, use_edav)
  raw_data$afp.epi <- process_afp_epi_raw_data(raw_data$afp)
  raw_data$para.case <- process_paralytic_raw_data(raw_data$afp)

  # Datasets read as-is from the download table: element name -> file pattern
  tabular_patterns <- c(
    ctry.pop = "ctry.pop",
    prov.pop = "prov.pop",
    dist.pop = "dist.pop",
    ctry.coverage = "ctry_cov",
    prov.coverage = "prov_cov",
    dist.coverage = "dist_cov",
    es = "/es_2001",
    sia = "sia",
    pos = "/pos",
    other = "/other"
  )
  for (dataset_name in names(tabular_patterns)) {
    raw_data[[dataset_name]] <- pull_data_from_dl_table(
      dl_table,
      tabular_patterns[[dataset_name]],
      use_edav
    )
  }

  # Spatial layers, followed by roads and cities
  for (layer_name in c("global.ctry", "global.prov", "global.dist")) {
    raw_data[[layer_name]] <- spatial_data[[layer_name]]
  }
  raw_data$roads <- pull_data_from_dl_table(dl_table, "roads.rds", use_edav)
  raw_data$cities <- pull_data_from_dl_table(dl_table, "cities.rds", use_edav)

  # Metadata is derived from the datasets assembled above
  raw_data$metadata <- process_metadata_raw_data(
    dl_table,
    raw_data,
    polis_folder,
    core_ready_folder,
    use_edav
  )

  # Check for duplicates
  raw_data <- duplicate_check(raw_data)

  # Write the parquet cache, then the timestamp tags
  cache_raw_data(raw_data, data_folders_paths$analytic_folder, use_edav)
  create_raw_data_tags(data_folders_paths, use_edav)

  return(raw_data)

}

# Main function

#' Pull global polio dataset
#'
#' @param dataset `str` Name of the dataset. Defaults to 'all'.
#' @param data_folder `str` Path to data folder.
#' @param polis_folder `str` Path to the POLIS folder.
#' @param core_ready_folder `str` Name of the core ready folder.
#' @param recreate.static.files `logical` Whether to reprocess global polio data.
#' @param use_edav `logical` Whether to use EDAV or not.
#' @param azcontainer `azcontainer` Azure container object.
#' @param cache `logical` Whether to cache the preprocessed datasets in the `data/polis` folder.
#'
#' @returns `list` Global polio datasets.
+#' +#' @export +#' @examples +#' \dontrun{ +#' raw_data <- get_all_polio_data_2() +#' } +get_all_polio_data_2 <- function(dataset = "all", + data_folder = "GID/PEB/SIR/Data", + polis_folder = "GID/PEB/SIR/POLIS", + core_ready_folder = "Core_Ready_Files", + recreate.static.files = FALSE, + use_edav = TRUE, + azcontainer = get_azure_storage_connection(), + cache = TRUE) { + + if (recreate.static.files) { + raw_data <- reprocess_polio_data(data_folder, polis_folder, core_ready_folder, use_edav, cache) + } else { + raw_data <- build_parquet_raw_data(file.path(data_folder, "analytic"), dataset, use_edav, azcontainer) + } + + return(raw_data) + +} + diff --git a/man/build_parquet_raw_data.Rd b/man/build_parquet_raw_data.Rd new file mode 100644 index 00000000..c0d0a7a6 --- /dev/null +++ b/man/build_parquet_raw_data.Rd @@ -0,0 +1,43 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{build_parquet_raw_data} +\alias{build_parquet_raw_data} +\title{Recreate raw data from local parquet folder} +\usage{ +build_parquet_raw_data( + path = "GID/PEB/SIR/Data/analytic", + dataset = "all", + from_edav = TRUE, + container = get_azure_storage_connection() +) +} +\arguments{ +\item{path}{\code{str} Absolute path to the parquet folder.} + +\item{from_edav}{\code{bool} Build using local files or files in EDAV? Defaults to TRUE.} + +\item{container}{\code{azcontainer} An instance of an Azure container to connect. +to. Pass \code{\link[=get_azure_storage_connection]{get_azure_storage_connection()}} using defaults if not accessing +using a service principal.} +} +\value{ +\code{list} A list containing connections to the folders associated with +individual datasets in the original output of \code{\link[=get_all_polio_data]{get_all_polio_data()}}. +} +\description{ +Recreates an output of \code{\link[=get_all_polio_data]{get_all_polio_data()}} from a folder housing +data in parquet format. 
+} +\details{ +For tibbles with Shapes, pass to \code{\link[=from_wkb_to_sf]{from_wkb_to_sf()}} first before creating maps. +} +\examples{ +\dontrun{ +# Building raw_data locally +parquet_path <- "C:/Users/ABC1/Desktop/parquet_folder" +raw_data <- build_parquet_raw_data(parquet_path) + +# Build raw_data from EDAV +raw_data <- build_parquet_raw_data() +} +} diff --git a/man/build_parquet_raw_data_edav.Rd b/man/build_parquet_raw_data_edav.Rd new file mode 100644 index 00000000..3ed377ff --- /dev/null +++ b/man/build_parquet_raw_data_edav.Rd @@ -0,0 +1,30 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{build_parquet_raw_data_edav} +\alias{build_parquet_raw_data_edav} +\title{Build raw_data using EDAV files} +\usage{ +build_parquet_raw_data_edav( + path = NULL, + dataset = "all", + container = get_azure_storage_connection() +) +} +\arguments{ +\item{path}{\code{str} Path to EDAV folder containing parquet files. This must +be the absolute file path from the Blob endpoint of the container.} + +\item{dataset}{\code{str} A specific dataset. Defaults to \code{"all"}. Otherwise, can specify any dataset in the list returned by \code{\link[=get_all_polio_data]{get_all_polio_data()}}.} + +\item{container}{\code{azcontainer} An instance of an Azure container to connect +to. Pass \code{\link[=get_azure_storage_connection]{get_azure_storage_connection()}} using defaults if not accessing +using a service principal.} +} +\value{ +\code{list} A list containing connections to the folders associated with +individual datasets in the original output of \code{\link[=get_all_polio_data]{get_all_polio_data()}}. 
+} +\description{ +Build raw_data using EDAV files +} +\keyword{internal} diff --git a/man/build_parquet_raw_data_local.Rd b/man/build_parquet_raw_data_local.Rd new file mode 100644 index 00000000..f4480ca1 --- /dev/null +++ b/man/build_parquet_raw_data_local.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{build_parquet_raw_data_local} +\alias{build_parquet_raw_data_local} +\title{Build raw_data using local parquet files} +\usage{ +build_parquet_raw_data_local(path = NULL, dataset = "all") +} +\arguments{ +\item{path}{\code{str} A path to the parquet directory} + +\item{dataset}{\code{str} A specific dataset. Defaults to \code{"all"}. Otherwise, can specify any dataset in the list returned by \code{\link[=get_all_polio_data]{get_all_polio_data()}}.} +} +\value{ +\code{list} A list containing connections to the folders associated with +individual datasets in the original output of \code{\link[=get_all_polio_data]{get_all_polio_data()}}. +} +\description{ +Build raw_data using local parquet files +} +\keyword{internal} diff --git a/man/cache_raw_data.Rd b/man/cache_raw_data.Rd new file mode 100644 index 00000000..3bde69c9 --- /dev/null +++ b/man/cache_raw_data.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{cache_raw_data} +\alias{cache_raw_data} +\title{Cache the raw data} +\usage{ +cache_raw_data(raw_data, analytic_folder_path, use_edav) +} +\arguments{ +\item{raw_data}{\code{list} Processed list of all polio data.} + +\item{analytic_folder_path}{\code{str} Path to analytic folder.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} +} +\value{ +\code{NULL}, invisibly. 
+} +\description{ +Cache the raw data +} +\keyword{internal} diff --git a/man/check_data_folder.Rd b/man/check_data_folder.Rd new file mode 100644 index 00000000..6e50559e --- /dev/null +++ b/man/check_data_folder.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{check_data_folder} +\alias{check_data_folder} +\title{Checks for required subfolders in the data folder} +\usage{ +check_data_folder( + data_folder, + polis_folder, + core_ready_folder, + use_edav, + cache +) +} +\arguments{ +\item{data_folder}{\code{str} Path to the data folder.} + +\item{polis_folder}{\code{str} POLIS folder with preprocessed data.} + +\item{core_ready_folder}{\code{str} Name of the core ready folder. Need to be specified if preprocessing specific regions, which have their own core ready folder.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} + +\item{cache}{\code{logical} Whether to cache the preprocessed data to data/polis subfolder.} +} +\value{ +\code{list} List of paths to the specific subfolders. +} +\description{ +Checks for required subfolders in the data folder +} +\keyword{internal} diff --git a/man/check_spatial_data_for_processing.Rd b/man/check_spatial_data_for_processing.Rd new file mode 100644 index 00000000..b0bc6998 --- /dev/null +++ b/man/check_spatial_data_for_processing.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{check_spatial_data_for_processing} +\alias{check_spatial_data_for_processing} +\title{Create the spatial data for processing} +\usage{ +check_spatial_data_for_processing(data_folder, use_edav) +} +\arguments{ +\item{data_folder}{\code{str} Path to the data folder.} + +\item{use_edav}{\code{logical} Use EDAV or not.} +} +\value{ +\code{list} Contains spatial datasets. 
+} +\description{ +Create the spatial data for processing +} +\keyword{internal} diff --git a/man/create_raw_data_parquet.Rd b/man/create_raw_data_parquet.Rd new file mode 100644 index 00000000..e65e2f63 --- /dev/null +++ b/man/create_raw_data_parquet.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{create_raw_data_parquet} +\alias{create_raw_data_parquet} +\title{Convert raw data into a parquet hierarchical folder} +\usage{ +create_raw_data_parquet(raw_data, path) +} +\arguments{ +\item{raw_data}{\code{list} A \code{raw_data} object.} + +\item{path}{\code{str} Path to export the parquet folder to.} +} +\value{ +None. +} +\description{ +The function takes a \code{raw_data} object (output of \code{\link[=get_all_polio_data]{get_all_polio_data()}}) and +saves it into a parquet directory. +} +\examples{ +\dontrun{ +raw_data <- get_all_polio_data() +create_raw_data_parquet(raw_data, "C:/Users/ABC1/Desktop/raw_data_parquet") +} +} diff --git a/man/create_raw_data_tags.Rd b/man/create_raw_data_tags.Rd new file mode 100644 index 00000000..33f205f8 --- /dev/null +++ b/man/create_raw_data_tags.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{create_raw_data_tags} +\alias{create_raw_data_tags} +\title{Create timestamps for raw data and spatial data} +\usage{ +create_raw_data_tags(data_folders_paths, use_edav) +} +\arguments{ +\item{data_folders_paths}{\code{list} Output of \code{\link[=check_data_folder]{check_data_folder()}}.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} +} +\value{ +\code{NULL}, invisibly. 
+} +\description{ +Create timestamps for raw data and spatial data +} +\keyword{internal} diff --git a/man/from_wkb_to_sf.Rd b/man/from_wkb_to_sf.Rd new file mode 100644 index 00000000..e1623f3b --- /dev/null +++ b/man/from_wkb_to_sf.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{from_wkb_to_sf} +\alias{from_wkb_to_sf} +\title{Convert WKB back to sf column} +\usage{ +from_wkb_to_sf(sf_data) +} +\arguments{ +\item{sf_data}{\verb{arrow connection} Geodata arrow connection.} +} +\value{ +\code{tibble} Geodata with \code{sf}. +} +\description{ +Convert WKB back to sf column +} +\examples{ +\dontrun{ +raw_data <- build_parquet_raw_data() +kenya_ctry_sf <- raw_data$global.ctry |> + dplyr::filter(ctry == "KENYA") |> + dplyr::collect() |> + from_wkb_to_sf() +} +} diff --git a/man/get_all_polio_data.Rd b/man/get_all_polio_data.Rd index 93b44408..a09aa17f 100644 --- a/man/get_all_polio_data.Rd +++ b/man/get_all_polio_data.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/dal.R +% Please edit documentation in R/get_all_polio_data.R \name{get_all_polio_data} \alias{get_all_polio_data} \title{Retrieve all pre-processed polio data} diff --git a/man/get_all_polio_data_2.Rd b/man/get_all_polio_data_2.Rd new file mode 100644 index 00000000..0980df03 --- /dev/null +++ b/man/get_all_polio_data_2.Rd @@ -0,0 +1,45 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{get_all_polio_data_2} +\alias{get_all_polio_data_2} +\title{Pull global polio dataset} +\usage{ +get_all_polio_data_2( + dataset = "all", + data_folder = "GID/PEB/SIR/Data", + polis_folder = "GID/PEB/SIR/POLIS", + core_ready_folder = "Core_Ready_Files", + recreate.static.files = FALSE, + use_edav = TRUE, + azcontainer = get_azure_storage_connection(), + cache = TRUE +) +} +\arguments{ +\item{dataset}{\code{str} Name of the dataset. 
Defaults to 'all'.} + +\item{data_folder}{\code{str} Path to data folder.} + +\item{polis_folder}{\code{str} Path to the POLIS folder.} + +\item{core_ready_folder}{\code{str} Name of the core ready folder.} + +\item{recreate.static.files}{\code{logical} Whether to reprocess global polio data.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} + +\item{azcontainer}{\code{azcontainer} Azure container object.} + +\item{cache}{\code{logical} Whether to cache the preprocessed datasets in the \code{data/polis} folder.} +} +\value{ +\code{list} Global polio datasets. +} +\description{ +Pull global polio dataset +} +\examples{ +\dontrun{ +raw_data <- get_all_polio_data_2() +} +} diff --git a/man/get_partition_cols.Rd b/man/get_partition_cols.Rd new file mode 100644 index 00000000..cad472b2 --- /dev/null +++ b/man/get_partition_cols.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{get_partition_cols} +\alias{get_partition_cols} +\title{Gets the columns used to partition a dataset} +\usage{ +get_partition_cols(name) +} +\arguments{ +\item{name}{\code{str} Name of the dataset.} +} +\value{ +\code{chr} A character vector of columns to partition with. 
+} +\description{ +Gets the columns used to partition a dataset +} +\examples{ +\dontrun{ +get_partition_cols("afp") +} +} +\keyword{internal} diff --git a/man/list_required_files_for_processing.Rd b/man/list_required_files_for_processing.Rd new file mode 100644 index 00000000..9a06e5e3 --- /dev/null +++ b/man/list_required_files_for_processing.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{list_required_files_for_processing} +\alias{list_required_files_for_processing} +\title{Creates the "download table", with paths to files required for recreating static files} +\usage{ +list_required_files_for_processing(data_folders_paths, polis_folder, use_edav) +} +\arguments{ +\item{data_folders_paths}{\code{list} Output of \code{\link[=check_data_folder]{check_data_folder()}}.} + +\item{polis_folder}{\code{str} POLIS folder containing preprocessed data. NOT the subfolder under the data folder.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} +} +\value{ +\code{tibble} Dataset containing paths to required files. +} +\description{ +Creates the "download table", with paths to files required for recreating static files +} +\keyword{internal} diff --git a/man/process_afp_epi_raw_data.Rd b/man/process_afp_epi_raw_data.Rd new file mode 100644 index 00000000..8bba2c7b --- /dev/null +++ b/man/process_afp_epi_raw_data.Rd @@ -0,0 +1,18 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{process_afp_epi_raw_data} +\alias{process_afp_epi_raw_data} +\title{Creates afp.epi dataset} +\usage{ +process_afp_epi_raw_data(afp) +} +\arguments{ +\item{afp}{\code{tibble} Output of \code{\link[=process_afp_raw_data]{process_afp_raw_data()}}.} +} +\value{ +\code{tibble} Summary of AFP cases by year/epi-week per country. 
+} +\description{ +Creates afp.epi dataset +} +\keyword{internal} diff --git a/man/process_afp_raw_data.Rd b/man/process_afp_raw_data.Rd new file mode 100644 index 00000000..502e8796 --- /dev/null +++ b/man/process_afp_raw_data.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{process_afp_raw_data} +\alias{process_afp_raw_data} +\title{Creates the AFP dataset of raw_data} +\usage{ +process_afp_raw_data(dl_table, use_edav) +} +\arguments{ +\item{dl_table}{\code{tibble} Output of \code{\link[=list_required_files_for_processing]{list_required_files_for_processing()}}.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} +} +\value{ +\code{tibble} AFP dataset. +} +\description{ +Creates the AFP dataset of raw_data +} +\keyword{internal} diff --git a/man/process_metadata_raw_data.Rd b/man/process_metadata_raw_data.Rd new file mode 100644 index 00000000..ebf919fa --- /dev/null +++ b/man/process_metadata_raw_data.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{process_metadata_raw_data} +\alias{process_metadata_raw_data} +\title{Creates metadata tag} +\usage{ +process_metadata_raw_data( + dl_table, + raw_data, + polis_folder, + core_ready_folder, + use_edav +) +} +\arguments{ +\item{dl_table}{\code{tibble} Output of \code{\link[=list_required_files_for_processing]{list_required_files_for_processing()}}.} + +\item{raw_data}{\code{list} Processed data combining all polio data.} + +\item{polis_folder}{\code{str} Path to POLIS folder.} + +\item{core_ready_folder}{\code{str} Name of the core ready folder.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} +} +\value{ +\code{tibble} Metadata tibble. 
+} +\description{ +Creates metadata tag +} +\keyword{internal} diff --git a/man/process_paralytic_raw_data.Rd b/man/process_paralytic_raw_data.Rd new file mode 100644 index 00000000..b9c29de6 --- /dev/null +++ b/man/process_paralytic_raw_data.Rd @@ -0,0 +1,18 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{process_paralytic_raw_data} +\alias{process_paralytic_raw_data} +\title{Creates paralytic cases dataset} +\usage{ +process_paralytic_raw_data(afp) +} +\arguments{ +\item{afp}{\code{tibble} Output of \code{\link[=process_afp_raw_data]{process_afp_raw_data()}}.} +} +\value{ +\code{tibble} Dataset with paralytic cases only. +} +\description{ +Creates paralytic cases dataset +} +\keyword{internal} diff --git a/man/pull_data_from_dl_table.Rd b/man/pull_data_from_dl_table.Rd new file mode 100644 index 00000000..be0d0378 --- /dev/null +++ b/man/pull_data_from_dl_table.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{pull_data_from_dl_table} +\alias{pull_data_from_dl_table} +\title{Pull data listed in the download table} +\usage{ +pull_data_from_dl_table(dl_table, grepl_pattern, use_edav) +} +\arguments{ +\item{dl_table}{\code{tibble} Output of \code{\link[=list_required_files_for_processing]{list_required_files_for_processing()}}.} + +\item{grepl_pattern}{\code{str} Pattern to use to filter the \code{dl_table}.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} +} +\value{ +\code{tibble} One of the datasets listed in \code{dl_table}. 
+} +\description{ +Pull data listed in the download table +} +\keyword{internal} diff --git a/man/reprocess_polio_data.Rd b/man/reprocess_polio_data.Rd new file mode 100644 index 00000000..54564147 --- /dev/null +++ b/man/reprocess_polio_data.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/get_all_polio_data_2.R +\name{reprocess_polio_data} +\alias{reprocess_polio_data} +\title{Reprocess the global polio dataset} +\usage{ +reprocess_polio_data( + data_folder, + polis_folder, + core_ready_folder, + use_edav, + cache +) +} +\arguments{ +\item{data_folder}{\code{str} Path to the data folder.} + +\item{polis_folder}{\code{str} Path to the POLIS folder.} + +\item{core_ready_folder}{\code{str} Name of the core ready folder.} + +\item{use_edav}{\code{logical} Whether to use EDAV or not.} + +\item{cache}{\code{logical} Whether to cache the preprocessed data to the data/polis subfolder.} +} +\value{ +\code{list} Processed raw data. +} +\description{ +Reprocess the global polio dataset +} +\keyword{internal} diff --git a/man/to_wkb_drop_sf.Rd b/man/to_wkb_drop_sf.Rd new file mode 100644 index 00000000..d1f0f560 --- /dev/null +++ b/man/to_wkb_drop_sf.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{to_wkb_drop_sf} +\alias{to_wkb_drop_sf} +\title{Drop Shape column and convert to binary} +\usage{ +to_wkb_drop_sf(sf_data) +} +\arguments{ +\item{sf_data}{\code{sf} or \code{data.frame} Geodata.} +} +\value{ +\code{tibble} Data without any Shape column. +} +\description{ +Drop Shape column and convert to binary +} +\details{ +This function was written using the CDC EDAV Chatbot using the model GPT-5.2. 
+} +\keyword{internal} diff --git a/man/upload_parquet_to_edav.Rd b/man/upload_parquet_to_edav.Rd new file mode 100644 index 00000000..97c0a633 --- /dev/null +++ b/man/upload_parquet_to_edav.Rd @@ -0,0 +1,28 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dal.parquet.R +\name{upload_parquet_to_edav} +\alias{upload_parquet_to_edav} +\title{Uploads a local parquet folder to EDAV} +\usage{ +upload_parquet_to_edav(src, dest, container = get_azure_storage_connection()) +} +\arguments{ +\item{src}{\code{str} Local path to the parquet folder.} + +\item{dest}{\code{str} EDAV endpoint.} + +\item{container}{\code{azcontainer} An instance of an Azure container.} +} +\value{ +None. +} +\description{ +Uploads a folder containing parquet files to EDAV +} +\examples{ +\dontrun{ +local_dir <- "C:/Users/ABC1/Desktop/parquet_folder" +edav_dir <- "ABC/parquet_folder" +upload_parquet_to_edav(local_dir, edav_dir) +} +}