mirror of
https://gitlab.com/hagrid-keyserver/hagrid.git
synced 2023-02-13 20:55:02 -05:00
import: add multi-thread capabilities
This commit is contained in:
parent
38292526db
commit
99e564f1f7
2 changed files with 87 additions and 39 deletions
|
@ -1,13 +1,8 @@
|
|||
//! Imports keyrings into Hagrids database.
|
||||
//!
|
||||
//! Usage:
|
||||
//!
|
||||
//! cargo run --release --example import -- \
|
||||
//! <state-dir> <keyring> [<keyring>...]
|
||||
|
||||
use std::path::{Path,PathBuf};
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::thread;
|
||||
use std::cmp::min;
|
||||
|
||||
extern crate failure;
|
||||
use failure::Fallible as Result;
|
||||
|
@ -21,31 +16,61 @@ use openpgp::parse::{PacketParser, PacketParserResult, Parse};
|
|||
extern crate hagrid_database as database;
|
||||
use database::{Database, KeyDatabase, ImportResult};
|
||||
|
||||
use indicatif::{ProgressBar,ProgressStyle};
|
||||
use indicatif::{MultiProgress,ProgressBar,ProgressStyle};
|
||||
|
||||
use HagridConfig;
|
||||
|
||||
pub fn do_import(config: &HagridConfig, keyrings: Vec<PathBuf>) -> Result<()> {
|
||||
let db = KeyDatabase::new(
|
||||
config.keys_internal_dir.as_ref().unwrap(),
|
||||
config.keys_external_dir.as_ref().unwrap(),
|
||||
config.tmp_dir.as_ref().unwrap()
|
||||
)?;
|
||||
// parsing TPKs takes time, so we benefit from some parallelism. however, the
|
||||
// database is locked during the entire merge operation, so we get diminishing
|
||||
// returns after the first few threads.
|
||||
const NUM_THREADS_MAX: usize = 3;
|
||||
|
||||
// For each input file, create a parser.
|
||||
for input in keyrings.iter() {
|
||||
eprintln!("Parsing {:?}...", input);
|
||||
let errors = import_from_file(&db, &input)?;
|
||||
for error in errors {
|
||||
eprintln!("{}", error);
|
||||
}
|
||||
}
|
||||
pub fn do_import(config: &HagridConfig, input_files: Vec<PathBuf>) -> Result<()> {
|
||||
let num_threads = min(NUM_THREADS_MAX, input_files.len());
|
||||
let input_file_chunks = setup_chunks_with_progress(input_files, num_threads);
|
||||
|
||||
let threads: Vec<_> = input_file_chunks
|
||||
.into_iter()
|
||||
.map(move |(input_file_chunk, progress_bar)| {
|
||||
let config = config.clone();
|
||||
thread::spawn(move || {
|
||||
let errors = import_from_files(&config, input_file_chunk, progress_bar).unwrap();
|
||||
for error in errors {
|
||||
println!("{}", error);
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
threads.into_iter().for_each(|t| t.join().unwrap());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct ImportStats {
|
||||
progress: ProgressBar,
|
||||
fn setup_chunks_with_progress(
|
||||
mut input_files: Vec<PathBuf>,
|
||||
num_threads: usize,
|
||||
) -> Vec<(Vec<PathBuf>,ProgressBar)> {
|
||||
let multiprogress = MultiProgress::new();
|
||||
|
||||
let chunk_size = (input_files.len() + (num_threads - 1)) / num_threads;
|
||||
let input_file_chunks: Vec<(Vec<PathBuf>,ProgressBar)> = (0..num_threads)
|
||||
.map(|_| {
|
||||
let len = input_files.len();
|
||||
input_files.drain(0..min(chunk_size,len)).collect()
|
||||
})
|
||||
.map(|chunk| (chunk, multiprogress.add(ProgressBar::new(0))))
|
||||
.collect();
|
||||
|
||||
eprintln!("Importing in {:?} threads", num_threads);
|
||||
thread::spawn(move || multiprogress.join().unwrap());
|
||||
|
||||
input_file_chunks
|
||||
}
|
||||
|
||||
struct ImportStats<'a> {
|
||||
progress: &'a ProgressBar,
|
||||
filename: String,
|
||||
count_total: u64,
|
||||
count_err: u64,
|
||||
count_new: u64,
|
||||
|
@ -54,10 +79,11 @@ struct ImportStats {
|
|||
errors: Vec<failure::Error>,
|
||||
}
|
||||
|
||||
impl ImportStats {
|
||||
fn new(progress: ProgressBar) -> Self {
|
||||
impl <'a> ImportStats<'a> {
|
||||
fn new(progress: &'a ProgressBar, filename: String) -> Self {
|
||||
ImportStats {
|
||||
progress,
|
||||
filename,
|
||||
count_total: 0,
|
||||
count_err: 0,
|
||||
count_new: 0,
|
||||
|
@ -87,26 +113,48 @@ impl ImportStats {
|
|||
return;
|
||||
}
|
||||
self.progress.set_message(&format!(
|
||||
"imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
|
||||
self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
|
||||
"{}, imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
|
||||
&self.filename, self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
|
||||
|
||||
}
|
||||
fn progress_finish(&self) {
|
||||
self.progress.finish_with_message(&format!(
|
||||
"imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
|
||||
self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
|
||||
self.progress.set_message(&format!(
|
||||
"{}, imported {:5} keys, {:4} New {:4} Updated {:4} Unchanged {:4} Errors",
|
||||
&self.filename, self.count_total, self.count_new, self.count_updated, self.count_unchanged, self.count_err));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
fn import_from_file(db: &KeyDatabase, input: &Path) -> Result<Vec<failure::Error>> {
|
||||
fn import_from_files(config: &HagridConfig, input_files: Vec<PathBuf>, progress_bar: ProgressBar) -> Result<Vec<failure::Error>> {
|
||||
let db = KeyDatabase::new(
|
||||
config.keys_internal_dir.as_ref().unwrap(),
|
||||
config.keys_external_dir.as_ref().unwrap(),
|
||||
config.tmp_dir.as_ref().unwrap()
|
||||
)?;
|
||||
|
||||
progress_bar
|
||||
.set_style(ProgressStyle::default_bar()
|
||||
.template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
|
||||
.progress_chars("##-"));
|
||||
progress_bar.set_message("Starting…");
|
||||
|
||||
let result = input_files
|
||||
.into_iter()
|
||||
.map(|input_file| import_from_file(&db, &input_file, &progress_bar))
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map(|results| results.into_iter().flatten().collect());
|
||||
|
||||
progress_bar.finish();
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn import_from_file(db: &KeyDatabase, input: &Path, progress_bar: &ProgressBar) -> Result<Vec<failure::Error>> {
|
||||
let input_file = File::open(input)?;
|
||||
let pb = ProgressBar::new(input_file.metadata()?.len());
|
||||
pb.set_style(ProgressStyle::default_bar()
|
||||
.template("[{elapsed_precise}] {bar:40.cyan/blue} {msg}")
|
||||
.progress_chars("##-"));
|
||||
let input_reader = &mut pb.wrap_read(input_file);
|
||||
let mut stats = ImportStats::new(pb);
|
||||
progress_bar.set_length(input_file.metadata()?.len());
|
||||
let input_reader = &mut progress_bar.wrap_read(input_file);
|
||||
let filename = input.file_name().unwrap().to_string_lossy().to_string();
|
||||
let mut stats = ImportStats::new(progress_bar, filename);
|
||||
|
||||
read_file_to_tpks(input_reader, &mut |acc| {
|
||||
let result = import_key(&db, acc);
|
||||
|
|
|
@ -32,7 +32,7 @@ pub struct HagridConfigs {
|
|||
// this is not an exact match - Rocket config has more complicated semantics
|
||||
// than a plain toml file.
|
||||
// see also https://github.com/SergioBenitez/Rocket/issues/228
|
||||
#[derive(Deserialize)]
|
||||
#[derive(Deserialize,Clone)]
|
||||
pub struct HagridConfig {
|
||||
_template_dir: Option<PathBuf>,
|
||||
keys_internal_dir: Option<PathBuf>,
|
||||
|
|
Loading…
Reference in a new issue