From f7a9e94387649723bf2e385c835f6a6849672cc6 Mon Sep 17 00:00:00 2001 From: Alaa Abdel Latif Date: Fri, 23 Sep 2022 15:25:00 -0700 Subject: [PATCH 1/3] add url for PrevalenceByLocationAndTimeHandler2 --- web/handlers/genomics/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/web/handlers/genomics/__init__.py b/web/handlers/genomics/__init__.py index 881da552..b65ac42c 100644 --- a/web/handlers/genomics/__init__.py +++ b/web/handlers/genomics/__init__.py @@ -1,5 +1,5 @@ from .lineage import LineageByCountryHandler, LineageByDivisionHandler, LineageAndCountryHandler, LineageAndDivisionHandler, LineageHandler, LineageMutationsHandler, MutationDetailsHandler, MutationsByLineage -from .prevalence import GlobalPrevalenceByTimeHandler, PrevalenceByLocationAndTimeHandler, CumulativePrevalenceByLocationHandler, PrevalenceAllLineagesByLocationHandler, PrevalenceByAAPositionHandler +from .prevalence import GlobalPrevalenceByTimeHandler, PrevalenceByLocationAndTimeHandler, PrevalenceByLocationAndTimeHandler2, CumulativePrevalenceByLocationHandler, PrevalenceAllLineagesByLocationHandler, PrevalenceByAAPositionHandler from .general import LocationHandler, LocationDetailsHandler, MetadataHandler, MutationHandler, SubmissionLagHandler, SequenceCountHandler, MostRecentSubmissionDateHandler, MostRecentCollectionDateHandler, GisaidIDHandler from .gisaid_auth import GISAIDTokenHandler @@ -11,6 +11,7 @@ (r"/genomics/sequence-count", SequenceCountHandler), (r"/genomics/global-prevalence", GlobalPrevalenceByTimeHandler), (r"/genomics/prevalence-by-location", PrevalenceByLocationAndTimeHandler), + (r"/genomics/prevalence-by-location2", PrevalenceByLocationAndTimeHandler2), (r"/genomics/prevalence-by-location-all-lineages", PrevalenceAllLineagesByLocationHandler), (r"/genomics/prevalence-by-position", PrevalenceByAAPositionHandler), (r"/genomics/lineage-by-sub-admin-most-recent", CumulativePrevalenceByLocationHandler), From 966a56a2d9535887e4fa358f738f438ece37f167 Mon Sep 17 00:00:00 2001 From: Alaa Abdel Latif Date: Fri, 23 Sep 2022 15:25:54 -0700 Subject: [PATCH 2/3] implement support functions calculate_proportion2() and calculate_confidence_intervals() for handler prevalence.PrevalenceByLocationAndTimeHandler2 --- web/handlers/genomics/util.py | 52 +++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/web/handlers/genomics/util.py b/web/handlers/genomics/util.py index 56acc634..bc8b7ff7 100644 --- a/web/handlers/genomics/util.py +++ b/web/handlers/genomics/util.py @@ -2,6 +2,30 @@ from scipy.stats import beta import pandas as pd + + +def calculate_confidence_intervals(x): + """helper function for calculating confidence intervals from rolling count data""" + _x = x['lineage_count_rolling'].round() + n = x['total_count_rolling'].round() + ci_lower, ci_upper = beta.interval(1 - 0.05, _x + 0.5, n - _x + 0.5) + return ci_lower, ci_upper + + + +def calculate_proportion2(_x): + """helper function for calculating relative prevalence (proportion) from rolling count data""" + x = _x['lineage_count_rolling'].round() + n = _x['total_count_rolling'].round() + ci_low, ci_upp = beta.interval(1 - 0.05, x + 0.5, n - x + 0.5) # Jeffreys Interval + d = {} + d['proportion'] = _x['lineage_count_rolling']/_x['total_count_rolling'] + d['proportion_ci_lower'] = beta.interval(1 - 0.05, x + 0.5, n - x + 0.5)[0] + d['proportion_ci_upper'] = beta.interval(1 - 0.05, x + 0.5, n - x + 0.5)[1] + return pd.Series(d, index=['proportion', 'proportion_ci_lower', 'proportion_ci_upper']) + + + def calculate_proportion(_x, _n): x = _x.round() n = _n.round() @@ -9,10 +33,14 @@ def calculate_proportion(_x, _n): est_proportion = _x/_n return est_proportion, ci_low, ci_upp + + def compute_total_count(df, col, new_col): df.loc[:,new_col] = df[col].sum() return df + + def expand_dates(df, date_min, date_max, index_col, grp_col): idx = pd.date_range(date_min, date_max) df = ( @@ -29,6 +57,8 @@ def expand_dates(df, date_min, date_max, index_col, grp_col): ) return df + + def compute_rolling_mean_all_lineages(df, index_col, col, new_col, grp_col): idx = pd.date_range(df[index_col].min(), df[index_col].max()) df = ( @@ -48,6 +78,8 @@ def compute_rolling_mean_all_lineages(df, index_col, col, new_col, grp_col): ) return df + + def compute_rolling_mean(df, index_col, col, new_col): df = ( df @@ -57,6 +89,8 @@ def compute_rolling_mean(df, index_col, col, new_col): ) return df + + def transform_prevalence(resp, path_to_results = [], cumulative = False): buckets = resp for i in path_to_results: @@ -109,6 +143,8 @@ def transform_prevalence(resp, path_to_results = [], cumulative = False): } return dict_response + + def compute_cumulative(grp, cols): grp = grp.sort_values("date") if grp.shape[0] != 0: @@ -124,6 +160,8 @@ def compute_cumulative(grp, cols): grp.loc[:, "cum_{}".format(i)] = 0 return grp.tail(1) + + def transform_prevalence_by_location_and_tiime(flattened_response, ndays = None, query_detected = False): df_response = ( pd.DataFrame(flattened_response) @@ -151,6 +189,8 @@ def transform_prevalence_by_location_and_tiime(flattened_response, ndays = None, } return dict_response + + def create_nested_mutation_query(location_id = None, lineages = [], mutations = []): # For multiple lineages and mutations: (Lineage 1 AND mutation 1 AND mutation 2..) OR (Lineage 2 AND mutation 1 AND mutation 2..) ... query_obj = { @@ -197,6 +237,8 @@ def create_nested_mutation_query(location_id = None, lineages = [], mutations = parse_location_id_to_query(location_id, query_obj) return query_obj + + def classify_other_category(grp, keep_lineages): grp.loc[(~grp["lineage"].isin(keep_lineages)) | (grp["lineage"] == "none"), "lineage"] = "other" # Temporarily remove none. TODO: Proper fix grp = grp.groupby("lineage").agg({ @@ -205,6 +247,8 @@ def classify_other_category(grp, keep_lineages): }) return grp + + def get_major_lineage_prevalence(df, index_col, keep_lineages = [], prevalence_threshold = 0.05, nday_threshold = 10, ndays = 180): date_limit = dt.today() - timedelta(days = ndays) lineages_to_retain = df[(df["prevalence"] >= prevalence_threshold) & (df["date"] >= date_limit)]["lineage"].value_counts() @@ -218,6 +262,8 @@ def get_major_lineage_prevalence(df, index_col, keep_lineages = [], prevalence_t df.loc[:,"prevalence"] = df["lineage_count"]/df["total_count"] return df + + def parse_location_id_to_query(query_id, query_obj = None): if query_id == None: return None @@ -247,6 +293,8 @@ def parse_location_id_to_query(query_id, query_obj = None): }) return query_obj + + def create_lineage_concat_query(queries, query_tmpl): queries = queries.split(",") if len(queries) == 1: @@ -267,6 +315,8 @@ def create_lineage_concat_query(queries, query_tmpl): } } + + def create_iterator(lineages, mutations): print(lineages) print(mutations) @@ -276,5 +326,7 @@ def create_iterator(lineages, mutations): return zip([None], [mutations]) return zip([], []) + + def get_total_hits(d): # To account for difference in ES versions 7.12.0 vs 6.8.13 return d["hits"]["total"]["value"] if isinstance(d["hits"]["total"], dict) else d["hits"]["total"] \ No newline at end of file From 8dd6a8c08109d83d9f3930330e145a9010952cda Mon Sep 17 00:00:00 2001 From: Alaa Abdel Latif Date: Fri, 23 Sep 2022 15:26:30 -0700 Subject: [PATCH 3/3] implement new handler PrevalenceByLocationAndTimeHandler2 for speedup improvements --- web/handlers/genomics/prevalence.py | 170 +++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 5 deletions(-) diff --git a/web/handlers/genomics/prevalence.py b/web/handlers/genomics/prevalence.py index 5c5f1938..4f22c6f3 100644 --- a/web/handlers/genomics/prevalence.py +++ b/web/handlers/genomics/prevalence.py @@ -1,9 +1,25 @@ -from .util import transform_prevalence, transform_prevalence_by_location_and_tiime, compute_rolling_mean, create_nested_mutation_query, get_major_lineage_prevalence, compute_total_count, compute_rolling_mean_all_lineages, expand_dates, parse_location_id_to_query, create_iterator +from .util import ( + calculate_confidence_intervals, + calculate_proportion2, + transform_prevalence, + transform_prevalence_by_location_and_tiime, + compute_rolling_mean, + create_nested_mutation_query, + get_major_lineage_prevalence, + compute_total_count, + compute_rolling_mean_all_lineages, + expand_dates, + parse_location_id_to_query, + create_iterator +) from .base import BaseHandler +from collections import defaultdict from tornado import gen import pandas as pd from datetime import timedelta, datetime as dt + + # Get global prevalence of lineage by date class GlobalPrevalenceByTimeHandler(BaseHandler): @@ -41,6 +57,146 @@ def _get(self): "results": resp } +class PrevalenceByLocationAndTimeHandler2(BaseHandler): + + @gen.coroutine + def _get(self): + # collect parameters + query_location = self.get_argument("location_id", None) + query_pangolin_lineage = self.get_argument("pangolin_lineage", None) + query_pangolin_lineage = query_pangolin_lineage.split(",") if query_pangolin_lineage is not None else [] + query_mutations = self.get_argument("mutations", None) + query_mutations = query_mutations.split(" AND ") if query_mutations is not None else [] + cumulative = self.get_argument("cumulative", None) + cumulative = True if cumulative == "true" else False + results = {} + # initialize ES queries - one for the chosen lineages/mutations, another for total count + total_query = { + "size": 0, + "aggs": { + "prevalence": { + "filter": { + "bool": { + "must": [] + } + }, + "aggs": { + "count": { + "terms": { + "field": "date_collected", + "size": self.size + }, + } + } + } + } + } + lineage_query = { + "size": 0, + "aggs": { + "prevalence": { + "filter": { + "bool": { + "should": [{"term": {"pangolin_lineage": i}} for i in query_pangolin_lineage], + "must": [{"term": {"mutations.mutation": i}} for i in query_mutations] + }, + }, + "aggs": { + "count": { + "terms": { + "field": "date_collected", + "size": len(query_pangolin_lineage) + }, + "aggs": { + "lineage_count":{"terms": { + "field": "pangolin_lineage", + "size": self.size + }} + } + } + } + } + } + } + # add location information to each query + parse_location_id_to_query(query_location, total_query["aggs"]["prevalence"]["filter"]) + parse_location_id_to_query(query_location, lineage_query["aggs"]["prevalence"]["filter"]) + # run total count query + total_resp = yield self.asynchronous_fetch(total_query) + # process total count data into dataframe + total_resp_dict = defaultdict(list) + for bkt in total_resp['aggregations']['prevalence']['count']['buckets']: + total_resp_dict['date'].append(bkt['key']) + total_resp_dict['total_count'].append(bkt['doc_count']) + total_resp_df = pd.DataFrame().from_dict(total_resp_dict) + total_resp_df['date'] = pd.to_datetime(total_resp_df['date']) + total_resp_df.sort_values(by='date', ascending=True, inplace=True) + total_resp_df = ( + total_resp_df + .set_index('date') + .assign(**{'total_count_rolling': lambda x: x['total_count'].rolling("7d").mean()}) + .reset_index() + ) + # run lineage/mutation count query + resp = yield self.asynchronous_fetch(lineage_query) + # process lineage/mutation count data into dataframe + resp_dict = defaultdict(list) + for bkt in resp['aggregations']['prevalence']['count']['buckets']: + for lin in bkt['lineage_count']['buckets']: + resp_dict['date'].append(bkt['key']) + resp_dict['pangolin_lineage'].append(lin['key']) + resp_dict['lineage_count'].append(lin['doc_count']) + resp_df = pd.DataFrame().from_dict(resp_dict) + resp_df['pangolin_lineage'] = resp_df['pangolin_lineage'].str.upper() + resp_df['date'] = pd.to_datetime(resp_df['date']) + resp_df.sort_values(by='date', ascending=True, inplace=True) + # compute 7-day rolling count for each lineage + lin_count_rolling = (resp_df.set_index('date') + .groupby('pangolin_lineage')['lineage_count'] + .apply(lambda x: x.asfreq('1d').rolling('7d').mean()) + .reset_index() + .rename(columns={'lineage_count': 'lineage_count_rolling'}) + ) + lin_count_rolling['lineage_count_rolling'] = lin_count_rolling['lineage_count_rolling'].fillna(0) + resp_df = pd.merge(lin_count_rolling, resp_df, on=['date', 'pangolin_lineage'], how='left') + resp_df['lineage_count'] = resp_df['lineage_count'].fillna(0) + # fuse dataframes and compute proportion results + mrg_resp_df = pd.merge(resp_df, total_resp_df, on='date', how='inner') + first_dates = (resp_df[resp_df['lineage_count']>0] + .groupby('pangolin_lineage') + .agg(first_date=('date', 'min')) + .reset_index()) + mrg_resp_df = pd.merge(mrg_resp_df, first_dates, on='pangolin_lineage') + mrg_resp_df = mrg_resp_df.loc[mrg_resp_df['date'] >= (mrg_resp_df['first_date'] - pd.to_timedelta(6, unit='d'))] + mrg_resp_df['proportion'] = mrg_resp_df['lineage_count_rolling'] / mrg_resp_df['total_count_rolling'] + mrg_resp_df[['proportion_ci_lower', + 'proportion_ci_upper']] = (mrg_resp_df[['lineage_count_rolling', 'total_count_rolling']] + .apply(calculate_confidence_intervals, axis=1, result_type='expand')) + mrg_resp_df.loc[:,"date"] = mrg_resp_df["date"].apply(lambda x: x.strftime("%Y-%m-%d")) + mrg_resp_df['proportion'] = mrg_resp_df['proportion'].fillna(0).astype(float) + mrg_resp_df['proportion_ci_lower'] = mrg_resp_df['proportion_ci_lower'].fillna(0).astype(float) + mrg_resp_df['proportion_ci_upper'] = mrg_resp_df['proportion_ci_upper'].fillna(0).astype(float) + target_cols = ['date', 'total_count', 'lineage_count', 'total_count_rolling', 'lineage_count_rolling', + 'proportion', 'proportion_ci_lower', 'proportion_ci_upper'] + # convert into nested dictionary object + final_result = (mrg_resp_df + .sort_values(by='date') + .groupby('pangolin_lineage') + .apply(lambda x: x[target_cols].to_dict(orient='records')) + .to_dict()) + res_key = None + if len(query_pangolin_lineage) > 0: + res_key = " OR ".join(query_pangolin_lineage) + if len(query_mutations) > 0: + res_key = "({}) AND ({})".format(res_key, " AND ".join(query_mutations)) if res_key is not None else " AND ".join(query_mutations) + results[res_key] = final_result + return { + "success": True, + "results": final_result, + } + + + class PrevalenceByLocationAndTimeHandler(BaseHandler): @gen.coroutine @@ -95,9 +251,11 @@ def _get(self): results[res_key] = resp return { "success": True, - "results": results + "results": results, } + + class CumulativePrevalenceByLocationHandler(BaseHandler): country_iso3_to_iso2 = {"BGD": "BD", "BEL": "BE", "BFA": "BF", "BGR": "BG", "BIH": "BA", "BRB": "BB", "WLF": "WF", "BLM": "BL", "BMU": "BM", "BRN": "BN", "BOL": "BO", "BHR": "BH", "BDI": "BI", "BEN": "BJ", "BTN": "BT", "JAM": "JM", "BVT": "BV", "BWA": "BW", "WSM": "WS", "BES": "BQ", "BRA": "BR", "BHS": "BS", "JEY": "JE", "BLR": "BY", "BLZ": "BZ", "RUS": "RU", "RWA": "RW", "SRB": "RS", "TLS": "TL", "REU": "RE", "TKM": "TM", "TJK": "TJ", "ROU": "RO", "TKL": "TK", "GNB": "GW", "GUM": "GU", "GTM": "GT", "SGS": "GS", "GRC": "GR", "GNQ": "GQ", "GLP": "GP", "JPN": "JP", "GUY": "GY", "GGY": "GG", "GUF": "GF", "GEO": "GE", "GRD": "GD", "GBR": "GB", "GAB": "GA", "SLV": "SV", "GIN": "GN", "GMB": "GM", "GRL": "GL", "GIB": "GI", "GHA": "GH", "OMN": "OM", "TUN": "TN", "JOR": "JO", "HRV": "HR", "HTI": "HT", "HUN": "HU", "HKG": "HK", "HND": "HN", "HMD": "HM", "VEN": "VE", "PRI": "PR", "PSE": "PS", "PLW": "PW", "PRT": "PT", "SJM": "SJ", "PRY": "PY", "IRQ": "IQ", "PAN": "PA", "PYF": "PF", "PNG": "PG", "PER": "PE", "PAK": "PK", "PHL": "PH", "PCN": "PN", "POL": "PL", "SPM": "PM", "ZMB": "ZM", "ESH": "EH", "EST": "EE", "EGY": "EG", "ZAF": "ZA", "ECU": "EC", "ITA": "IT", "VNM": "VN", "SLB": "SB", "ETH": "ET", "SOM": "SO", "ZWE": "ZW", "SAU": "SA", "ESP": "ES", "ERI": "ER", "MNE": "ME", "MDA": "MD", "MDG": "MG", "MAF": "MF", "MAR": "MA", "MCO": "MC", "UZB": "UZ", "MMR": "MM", "MLI": "ML", "MAC": "MO", "MNG": "MN", "MHL": "MH", "MKD": "MK", "MUS": "MU", "MLT": "MT", "MWI": "MW", "MDV": "MV", "MTQ": "MQ", "MNP": "MP", "MSR": "MS", "MRT": "MR", "IMN": "IM", "UGA": "UG", "TZA": "TZ", "MYS": "MY", "MEX": "MX", "ISR": "IL", "FRA": "FR", "IOT": "IO", "SHN": "SH", "FIN": "FI", "FJI": "FJ", "FLK": "FK", "FSM": "FM", "FRO": "FO", "NIC": "NI", "NLD": "NL", "NOR": "NO", "NAM": "NA", "VUT": "VU", "NCL": "NC", "NER": "NE", "NFK": "NF", "NGA": "NG", "NZL": "NZ", "NPL": "NP", "NRU": "NR", "NIU": "NU", "COK": "CK", "XKX": "XK", "CIV": "CI", "CHE": "CH", "COL": "CO", "CHN": "CN", "CMR": "CM", "CHL": "CL", "CCK": "CC", "CAN": "CA", "COG": "CG", "CAF": "CF", "COD": "CD", "CZE": "CZ", "CYP": "CY", "CXR": "CX", "CRI": "CR", "CUW": "CW", "CPV": "CV", "CUB": "CU", "SWZ": "SZ", "SYR": "SY", "SXM": "SX", "KGZ": "KG", "KEN": "KE", "SSD": "SS", "SUR": "SR", "KIR": "KI", "KHM": "KH", "KNA": "KN", "COM": "KM", "STP": "ST", "SVK": "SK", "KOR": "KR", "SVN": "SI", "PRK": "KP", "KWT": "KW", "SEN": "SN", "SMR": "SM", "SLE": "SL", "SYC": "SC", "KAZ": "KZ", "CYM": "KY", "SGP": "SG", "SWE": "SE", "SDN": "SD", "DOM": "DO", "DMA": "DM", "DJI": "DJ", "DNK": "DK", "VGB": "VG", "DEU": "DE", "YEM": "YE", "DZA": "DZ", "USA": "US", "URY": "UY", "MYT": "YT", "UMI": "UM", "LBN": "LB", "LCA": "LC", "LAO": "LA", "TUV": "TV", "TWN": "TW", "TTO": "TT", "TUR": "TR", "LKA": "LK", "LIE": "LI", "LVA": "LV", "TON": "TO", "LTU": "LT", "LUX": "LU", "LBR": "LR", "LSO": "LS", "THA": "TH", "ATF": "TF", "TGO": "TG", "TCD": "TD", "TCA": "TC", "LBY": "LY", "VAT": "VA", "VCT": "VC", "ARE": "AE", "AND": "AD", "ATG": "AG", "AFG": "AF", "AIA": "AI", "VIR": "VI", "ISL": "IS", "IRN": "IR", "ARM": "AM", "ALB": "AL", "AGO": "AO", "ATA": "AQ", "ASM": "AS", "ARG": "AR", "AUS": "AU", "AUT": "AT", "ABW": "AW", "IND": "IN", "ALA": "AX", "AZE": "AZ", "IRL": "IE", "IDN": "ID", "UKR": "UA", "QAT": "QA", "MOZ": "MZ"} # TODO: Move to separate class. @@ -200,6 +358,8 @@ def _get(self): "results": results } + + class PrevalenceAllLineagesByLocationHandler(BaseHandler): @gen.coroutine @@ -281,6 +441,8 @@ def _get(self): resp = {"success": True, "results": df_response.to_dict(orient="records")} return resp + + class PrevalenceByAAPositionHandler(BaseHandler): @gen.coroutine @@ -410,6 +572,4 @@ def _get(self): df_response.loc[:,"date"] = df_response["date"].apply(lambda x: x.strftime("%Y-%m-%d")) dict_response = df_response.to_dict(orient="records") resp = {"success": True, "results": dict_response} - return resp - - + return resp \ No newline at end of file