Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 5 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ diesel = { version = "2.2", features = ["postgres", "r2d2", "serde_json"] }
diesel_migrations = "2.2"
uuid = { version = "1.18", features = ["v4"] }
base64 = "0.22"
tower = { version = "0.4", features = ["limit", "util", "buffer"] }
tower = { version = "0.5", features = ["limit", "util", "buffer"] }
97 changes: 46 additions & 51 deletions src/sync/collections.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{get_json, request};
use crate::models::{self, CollectionNew, CollectionVersionNew};
use crate::schema::collection_versions;
use crate::sync::utils::RateLimitedHttpService;
use actix_web::web;
use anyhow::{Context, Result};
use diesel::pg::upsert::excluded;
Expand All @@ -11,13 +12,10 @@ use diesel::{
};
use futures::future::try_join_all;
use log::info;
use reqwest::{Client, Request};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tower::buffer::Buffer;
use tower::limit::{ConcurrencyLimit, RateLimit};

#[allow(dead_code)]
#[derive(Debug, Clone)]
Expand All @@ -30,10 +28,7 @@ pub struct CollectionData {
pub metadata: Value,
}

pub async fn get_version(
url: String,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> Result<Value> {
pub async fn get_version(url: String, service: RateLimitedHttpService) -> Result<Value> {
let (service, resp) = request(url, service).await;
let status = resp.status().as_str().to_string();
let json_response = resp.json::<Value>().await.unwrap();
Expand Down Expand Up @@ -69,34 +64,33 @@ pub async fn get_version(
pub async fn sync_collections(
pool: web::Data<Pool<ConnectionManager<PgConnection>>>,
response: &Value,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
) -> Result<()> {
let results = response.as_object().unwrap()["data"].as_array().unwrap();
let galaxy_url = dotenv::var("GALAXY_URL").unwrap_or("https://galaxy.ansible.com/".to_string());
let collection_version_futures: Vec<_> = results
.iter()
.map(|v| {
let nspace = v["collection_version"]["namespace"]
.as_str()
.unwrap()
.to_string();
let n = v["collection_version"]["name"]
.as_str()
.unwrap()
.to_string();
let vs = v["collection_version"]["version"]
.as_str()
.unwrap()
.to_string();
get_version(
format!(
"{}api/v3/plugin/ansible/content/published/collections/index/{}/{}/versions/{}/",
galaxy_url, nspace, n, vs
),
service.clone(),
)
})
.collect();
let client = reqwest::Client::new();
let mut collection_version_futures = Vec::new();
for v in results.iter() {
let nspace = v["collection_version"]["namespace"]
.as_str()
.unwrap()
.to_string();
let n = v["collection_version"]["name"]
.as_str()
.unwrap()
.to_string();
let vs = v["collection_version"]["version"]
.as_str()
.unwrap()
.to_string();
let service = crate::sync::utils::build_service(client.clone());
collection_version_futures.push(get_version(
format!(
"{}api/v3/plugin/ansible/content/published/collections/index/{}/{}/versions/{}/",
galaxy_url, nspace, n, vs
),
service,
));
}
let cversions = try_join_all(collection_version_futures)
.await
.context("Failed to join collection versions futures")?;
Expand Down Expand Up @@ -167,7 +161,7 @@ pub async fn sync_collections(
}

pub async fn fetch_versions(
mut service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
mut service: RateLimitedHttpService,
url: &Value,
) -> Result<Vec<CollectionData>> {
let mut versions: Vec<CollectionData> = Vec::new();
Expand All @@ -186,19 +180,19 @@ pub async fn fetch_versions(
.unwrap();

// Downloading
let collection_version_futures: Vec<_> = results
.iter()
.map(|v| {
get_version(
format!(
"{}{}",
galaxy_url.strip_suffix('/').unwrap(),
v["href"].as_str().unwrap()
),
service.clone(),
)
})
.collect();
let client = reqwest::Client::new();
let mut collection_version_futures = Vec::new();
for v in results.iter() {
let service = crate::sync::utils::build_service(client.clone());
collection_version_futures.push(get_version(
format!(
"{}{}",
galaxy_url.strip_suffix('/').unwrap(),
v["href"].as_str().unwrap()
),
service,
));
}
let cversions = try_join_all(collection_version_futures)
.await
.context("Failed to join collection versions futures")?;
Expand Down Expand Up @@ -231,7 +225,6 @@ pub async fn fetch_versions(

pub async fn process_collection_data(
pool: web::Data<Pool<ConnectionManager<PgConnection>>>,
service: Buffer<ConcurrencyLimit<RateLimit<Client>>, Request>,
data: Vec<Vec<CollectionData>>,
fetch_dependencies: bool,
) -> Result<()> {
Expand Down Expand Up @@ -326,10 +319,12 @@ pub async fn process_collection_data(
info!("Fetching collection dependencies");
let dependencies: Vec<_> = deps.keys().map(|url| get_json(url)).collect();
let deps_json = try_join_all(dependencies).await.unwrap();
let to_fetch: Vec<_> = deps_json
.iter()
.map(|c| fetch_versions(service.clone(), &c["versions_url"]))
.collect();
let client = reqwest::Client::new();
let mut to_fetch = Vec::new();
for c in deps_json.iter() {
let service = crate::sync::utils::build_service(client.clone());
to_fetch.push(fetch_versions(service, &c["versions_url"]));
}
to_process = try_join_all(to_fetch).await.unwrap();
} else {
break;
Expand Down
20 changes: 11 additions & 9 deletions src/sync/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ pub async fn process_requirements(
} else {
info!("Syncing collections");
let client = reqwest::Client::new();
let service = build_service(client.clone());
let to_fetch: Vec<_> = responses
.iter()
.map(|c| fetch_versions(service.clone(), &c["versions_url"]))
.collect();

// Create separate services for each fetch operation
let mut to_fetch = Vec::new();
for c in responses.iter() {
let service = build_service(client.clone());
to_fetch.push(fetch_versions(service, &c["versions_url"]));
}
let data = try_join_all(to_fetch).await?;
process_collection_data(pool.clone(), service.clone(), data, true).await?

process_collection_data(pool.clone(), data, true).await?
};
}
}
Expand Down Expand Up @@ -141,8 +144,7 @@ pub async fn mirror_content(
} else {
panic!("Invalid content type!")
};
let client = reqwest::Client::new();
let service = build_service(client.clone());

loop {
let results = get_json(target.as_str()).await.unwrap();
if content_type == "roles" {
Expand All @@ -157,7 +159,7 @@ pub async fn mirror_content(
.context("Failed to join next_link")?
} else if content_type == "collections" {
info!("Syncing collections");
sync_collections(pool.clone(), &results, service.clone()).await?;
sync_collections(pool.clone(), &results).await?;
if results.as_object().unwrap()["links"]["next"]
.as_str()
.is_none()
Expand Down
Loading
Loading