diff --git a/itcc-omics-ingest/src/controllers/mod.rs b/itcc-omics-ingest/src/controllers/mod.rs index 26d6bdc..4559ec0 100644 --- a/itcc-omics-ingest/src/controllers/mod.rs +++ b/itcc-omics-ingest/src/controllers/mod.rs @@ -1,3 +1,4 @@ pub mod extractors; pub mod health; pub mod omics; +pub mod patient; diff --git a/itcc-omics-ingest/src/controllers/patient.rs b/itcc-omics-ingest/src/controllers/patient.rs new file mode 100644 index 0000000..927394c --- /dev/null +++ b/itcc-omics-ingest/src/controllers/patient.rs @@ -0,0 +1,171 @@ +use crate::beam; +use crate::utils::error_type::ErrorType; +use crate::AppState; +use axum::extract::{Path, Request}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::{extract::State, routing::post, Json, Router}; +use itcc_omics_lib::fhir::blaze::{ + get_all_patient_count, get_all_patient_identifiers, get_patient_by_id, +}; +use itcc_omics_lib::mainzelliste::handler::{ + create_patients, create_session, create_token, CreatePatientResp, CreateTokenResp, +}; +use itcc_omics_lib::mainzelliste::{encryption_ml, init_mainzelliste}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::sync::Arc; +use tower_http::trace::TraceLayer; +use tracing::debug; +use tracing::{error, info, info_span}; + +pub fn routers() -> Router> { + Router::new() + .route("/upload/patient/{id}", post(upload_patient_by_id_handler)) + .route("/upload/patient", post(upload_all_patients_handler)) + .layer( + TraceLayer::new_for_http() + .make_span_with(|request: &Request<_>| { + info_span!( + "http_request", + method = %request.method(), + uri = %request.uri(), + status_code = tracing::field::Empty, + ) + }) + .on_response( + |response: &axum::response::Response, + latency: std::time::Duration, + span: &tracing::Span| { + span.record("status_code", response.status().as_u16()); + info!(parent: span, latency_ms = latency.as_millis(), "response sent"); + }, + ), + ) +} + +// POST /upload/patient/{id} +#[tracing::instrument(skip(state), fields(patient_id = %id))] +async fn upload_patient_by_id_handler( + State(state): State>, + Path(id): Path, +) -> Response { + match export_patients_to_dwh(&state, Some(id.clone())).await { + Ok(result) => (StatusCode::CREATED, Json(result)).into_response(), + Err(e) => { + error!("Error exporting patient {}: {:?}", id, e); + e.into_response() + } + } +} + +// POST /upload/patient +#[tracing::instrument(skip(state))] +async fn upload_all_patients_handler(State(state): State>) -> Response { + match export_patients_to_dwh(&state, None).await { + Ok(result) => (StatusCode::CREATED, Json(result)).into_response(), + Err(e) => { + error!("Error exporting all patients: {:?}", e); + e.into_response() + } + } +} + +async fn export_patients_to_dwh( + state: &Arc, + patient_id: Option, +) -> Result { + match patient_id { + Some(id) => { + let patient_vec: HashSet = HashSet::from([id]); + export_single_patient(state, patient_vec).await + } + None => export_all_patients(state).await, + } +} + +async fn export_single_patient( + app_state: &Arc, + patient_id: HashSet, +) -> Result { + let token: CreateTokenResp = init_mainzelliste( + &app_state.http, + app_state.services.ml_api_key.as_ref(), + &app_state.services.ml_url, + patient_id.len(), + ) + .await?; + + let local_crypto_ids = encryption_ml( + &app_state.http, + app_state.services.ml_api_key.as_ref(), + &app_state.services.ml_url, + &token.id, + patient_id.clone(), + ) + .await?; + + for (patient_id, pseudo_id) in local_crypto_ids.iter() { + let mut bundle = + get_patient_by_id(&app_state.http, &app_state.services.blaze_url, patient_id).await?; + bundle.rename_patient_id_everywhere(patient_id, pseudo_id)?; + debug!("Bundle AFTER: {:#?}", bundle); + beam::send_fhir_bundle(app_state, bundle).await?; + } + + let exported_id = patient_id.iter().next().cloned().unwrap_or_default(); + + Ok(PatientExportResponse { + message: format!("Patient {} exported successfully", exported_id), + exported_patient_id: patient_id, + exported_all: false, + }) +} + +async fn export_all_patients( + app_state: &Arc, +) -> Result { + let count = get_all_patient_count(&app_state.http, &app_state.services.blaze_url).await?; + + let patient_ids = + get_all_patient_identifiers(&app_state.http, &app_state.services.blaze_url, count).await?; + + let token: CreateTokenResp = init_mainzelliste( + &app_state.http, + app_state.services.ml_api_key.as_ref(), + &app_state.services.ml_url, + patient_ids.len(), + ) + .await?; + + let local_crypto_ids = encryption_ml( + &app_state.http, + app_state.services.ml_api_key.as_ref(), + &app_state.services.ml_url, + &token.id, + patient_ids.clone(), + ) + .await?; + + for (patient_id, pseudo_id) in local_crypto_ids.iter() { + let mut bundle = + get_patient_by_id(&app_state.http, &app_state.services.blaze_url, patient_id).await?; + + bundle.rename_patient_id_everywhere(patient_id, pseudo_id)?; + debug!("Bundle AFTER: {:#?}", bundle); + beam::send_fhir_bundle(app_state, bundle).await?; + } + + Ok(PatientExportResponse { + message: format!("All {} patients exported successfully", patient_ids.len()), + exported_patient_id: patient_ids, + exported_all: true, + }) +} + +#[derive(Deserialize, Debug, Clone, Serialize)] +struct PatientExportResponse { + pub message: String, + pub exported_patient_id: HashSet, + pub exported_all: bool, +} diff --git a/itcc-omics-ingest/src/lib.rs b/itcc-omics-ingest/src/lib.rs index c1bdc2a..778a841 100644 --- a/itcc-omics-ingest/src/lib.rs +++ b/itcc-omics-ingest/src/lib.rs @@ -7,7 +7,7 @@ pub mod test; pub mod utils; use crate::controllers::extractors::api_key_check; -use crate::controllers::{health, omics}; +use crate::controllers::{health, omics, patient}; use crate::utils::config::IngestConfig; use crate::utils::config::{AppState, Services}; use axum::extract::State; @@ -41,6 +41,7 @@ pub async fn run_with_config() { pub fn create_router(app_state: Arc) -> Router { Router::new() .merge(omics::routers()) + .merge(patient::routers()) .route_layer(from_fn_with_state(app_state.clone(), api_key_check)) .merge(health::routers()) .with_state(app_state)