diff --git a/hagridctl/src/import.rs b/hagridctl/src/import.rs
index f16aa43..2b4d8e8 100644
--- a/hagridctl/src/import.rs
+++ b/hagridctl/src/import.rs
@@ -1,13 +1,8 @@
-//! Imports keyrings into Hagrids database.
-//!
-//! Usage:
-//!
-//!     cargo run --release --example import -- \
-//!         [...]
-
 use std::path::{Path,PathBuf};
 use std::fs::File;
 use std::io::Read;
+use std::thread;
+use std::cmp::min;
 
 extern crate failure;
 use failure::Fallible as Result;
@@ -21,31 +16,61 @@ use openpgp::parse::{PacketParser, PacketParserResult, Parse};
 extern crate hagrid_database as database;
 use database::{Database, KeyDatabase, ImportResult};
 
-use indicatif::{ProgressBar,ProgressStyle};
+use indicatif::{MultiProgress,ProgressBar,ProgressStyle};
 
 use HagridConfig;
 
-pub fn do_import(config: &HagridConfig, keyrings: Vec<PathBuf>) -> Result<()> {
-    let db = KeyDatabase::new(
-        config.keys_internal_dir.as_ref().unwrap(),
-        config.keys_external_dir.as_ref().unwrap(),
-        config.tmp_dir.as_ref().unwrap()
-    )?;
+// parsing TPKs takes time, so we benefit from some parallelism. however, the
+// database is locked during the entire merge operation, so we get diminishing
+// returns after the first few threads.
+const NUM_THREADS_MAX: usize = 3;
 
-    // For each input file, create a parser.
-    for input in keyrings.iter() {
-        eprintln!("Parsing {:?}...", input);
-        let errors = import_from_file(&db, &input)?;
-        for error in errors {
-            eprintln!("{}", error);
-        }
-    }
+pub fn do_import(config: &HagridConfig, input_files: Vec<PathBuf>) -> Result<()> {
+    let num_threads = min(NUM_THREADS_MAX, input_files.len());
+    let input_file_chunks = setup_chunks_with_progress(input_files, num_threads);
+
+    let threads: Vec<_> = input_file_chunks
+        .into_iter()
+        .map(move |(input_file_chunk, progress_bar)| {
+            let config = config.clone();
+            thread::spawn(move || {
+                let errors = import_from_files(&config, input_file_chunk, progress_bar).unwrap();
+                for error in errors {
+                    println!("{}", error);
+                }
+            })
+        })
+        .collect();
+
+    threads.into_iter().for_each(|t| t.join().unwrap());
 
     Ok(())
 }
 
-struct ImportStats {
-    progress: ProgressBar,
+fn setup_chunks_with_progress(
+    mut input_files: Vec<PathBuf>,
+    num_threads: usize,
+) -> Vec<(Vec<PathBuf>,ProgressBar)> {
+    let multiprogress = MultiProgress::new();
+
+    let chunk_size = (input_files.len() + (num_threads - 1)) / num_threads;
+    let input_file_chunks: Vec<(Vec<PathBuf>,ProgressBar)> = (0..num_threads)
+        .map(|_| {
+            let len = input_files.len();
+            input_files.drain(0..min(chunk_size,len)).collect()
+        })
+        .map(|chunk| (chunk, multiprogress.add(ProgressBar::new(0))))
+        .collect();
+
+    eprintln!("Importing in {:?} threads", num_threads);
+    thread::spawn(move || multiprogress.join().unwrap());
+
+    input_file_chunks
+}
+
+struct ImportStats<'a> {
+    progress: &'a ProgressBar,
+    filename: String,
     count_total: u64,
     count_err: u64,
     count_new: u64,
@@ -54,10 +79,11 @@ struct ImportStats {
     errors: Vec<String>,
 }
 
-impl ImportStats {
-    fn new(progress: ProgressBar) -> Self {
+impl <'a> ImportStats<'a> {
+    fn new(progress: &'a ProgressBar, filename: String) -> Self {
         ImportStats {
             progress,
+            filename,
             count_total: 0,
             count_err: 0,
             count_new: 0,
@@ -87,26 +113,48 @@ impl ImportStats {
             return;
         }
         self.progress.set_message(&format!(
-            "imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
-            self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
+            "{}, imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
+            &self.filename, self.count_total, self.count_new, self.count_updated,
+            self.count_unchanged, self.count_err));
     }
 
     fn progress_finish(&self) {
-        self.progress.finish_with_message(&format!(
-            "imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
-            self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
+        self.progress.set_message(&format!(
+            "{}, imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
+            &self.filename, self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
    }
 }
 
-fn import_from_file(db: &KeyDatabase, input: &Path) -> Result<Vec<String>> {
+fn import_from_files(config: &HagridConfig, input_files: Vec<PathBuf>, progress_bar: ProgressBar) -> Result<Vec<String>> {
+    let db = KeyDatabase::new(
+        config.keys_internal_dir.as_ref().unwrap(),
+        config.keys_external_dir.as_ref().unwrap(),
+        config.tmp_dir.as_ref().unwrap()
+    )?;
+
+    progress_bar
+        .set_style(ProgressStyle::default_bar()
+            .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
+            .progress_chars("##-"));
+    progress_bar.set_message("Starting…");
+
+    let result = input_files
+        .into_iter()
+        .map(|input_file| import_from_file(&db, &input_file, &progress_bar))
+        .collect::<Result<Vec<Vec<String>>>>()
+        .map(|results| results.into_iter().flatten().collect());
+
+    progress_bar.finish();
+
+    result
+}
+
+fn import_from_file(db: &KeyDatabase, input: &Path, progress_bar: &ProgressBar) -> Result<Vec<String>> {
     let input_file = File::open(input)?;
-    let pb = ProgressBar::new(input_file.metadata()?.len());
-    pb.set_style(ProgressStyle::default_bar()
-        .template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
-        .progress_chars("##-"));
-    let input_reader = &mut pb.wrap_read(input_file);
-    let mut stats = ImportStats::new(pb);
+    progress_bar.set_length(input_file.metadata()?.len());
+    let input_reader = &mut progress_bar.wrap_read(input_file);
+    let filename = input.file_name().unwrap().to_string_lossy().to_string();
+    let mut stats = ImportStats::new(progress_bar, filename);
 
     read_file_to_tpks(input_reader, &mut |acc| {
         let result = import_key(&db, acc);
diff --git a/hagridctl/src/main.rs b/hagridctl/src/main.rs
index 3fe4e82..1a2ec61 100644
--- a/hagridctl/src/main.rs
+++ b/hagridctl/src/main.rs
@@ -32,7 +32,7 @@ pub struct HagridConfigs {
 // this is not an exact match - Rocket config has more complicated semantics
 // than a plain toml file.
 // see also https://github.com/SergioBenitez/Rocket/issues/228
-#[derive(Deserialize)]
+#[derive(Deserialize,Clone)]
 pub struct HagridConfig {
     _template_dir: Option<PathBuf>,
     keys_internal_dir: Option<PathBuf>,
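
A note on the chunking in `setup_chunks_with_progress`: the ceiling division `(input_files.len() + (num_threads - 1)) / num_threads` guarantees that `num_threads` chunks cover every input file, with only the final chunk possibly coming up short, and draining from the front preserves the command-line order of the files. A minimal standalone sketch of the same strategy; the `chunk_evenly` helper is hypothetical and not part of the patch:

```rust
use std::cmp::min;

// Ceiling division: num_chunks * chunk_size >= items.len(), so every item
// lands in exactly one chunk. The drain range is clamped to the remaining
// length, so it never over-reads.
fn chunk_evenly<T>(mut items: Vec<T>, num_chunks: usize) -> Vec<Vec<T>> {
    let chunk_size = (items.len() + (num_chunks - 1)) / num_chunks;
    (0..num_chunks)
        .map(|_| {
            let len = items.len();
            items.drain(0..min(chunk_size, len)).collect()
        })
        .collect()
}

fn main() {
    // Seven files over three threads: chunk_size = ceil(7/3) = 3,
    // yielding chunks of sizes 3, 3 and 1.
    let chunks = chunk_evenly((1..=7).collect::<Vec<u32>>(), 3);
    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
}
```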
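And a note on the progress plumbing: each worker thread owns one `ProgressBar` registered with a shared `MultiProgress`, and `MultiProgress::join` blocks until every registered bar has finished, which is why the patch parks it on a background thread. A stripped-down sketch of that pattern, assuming the same pre-0.17 indicatif API the diff itself uses (`join` was removed from indicatif in later releases):

```rust
extern crate indicatif;

use indicatif::{MultiProgress, ProgressBar};
use std::thread;

fn main() {
    let multi = MultiProgress::new();

    // Register one bar per worker up front, then let each worker drive
    // its own bar to completion.
    let workers: Vec<_> = (0..3)
        .map(|_| multi.add(ProgressBar::new(100)))
        .map(|bar| {
            thread::spawn(move || {
                for _ in 0..100 {
                    bar.inc(1);
                }
                bar.finish();
            })
        })
        .collect();

    // Blocks until all bars are finished. The patch inverts this layout:
    // join() runs on a background thread so do_import can join its workers.
    multi.join().unwrap();
    workers.into_iter().for_each(|w| w.join().unwrap());
}
```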