From bb44cc84c43cd00217f6ead349c1a30fd5ed045c Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 28 Apr 2022 20:55:36 +0800 Subject: [PATCH 1/4] update dependencies --- Cargo.toml | 78 +++++++++++++++++++++++++++--------------------------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 85d309fb1..287c7f4d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,24 +13,24 @@ keywords = ["search", "information", "retrieval"] edition = "2018" [dependencies] -oneshot = "0.1" -base64 = "0.13" +oneshot = "0.1.3" +base64 = "0.13.0" byteorder = "1.4.3" -crc32fast = "1.2.1" -once_cell = "1.7.2" -regex ={ version = "1.5.4", default-features = false, features = ["std"] } -tantivy-fst = "0.3" -memmap2 = {version = "0.5", optional=true} -lz4_flex = { version = "0.9", default-features = false, features = ["checked-decode"], optional = true } -brotli = { version = "3.3", optional = true } +crc32fast = "1.3.2" +once_cell = "1.10.0" +regex ={ version = "1.5.5", default-features = false, features = ["std"] } +tantivy-fst = "0.3.0" +memmap2 = { version = "0.5.3", optional = true } +lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true } +brotli = { version = "3.3.4", optional = true } snap = { version = "1.0.5", optional = true } -tempfile = { version = "3.2", optional = true } -log = "0.4.14" -serde = { version = "1.0.126", features = ["derive"] } -serde_json = "1.0.64" -num_cpus = "1.13" +tempfile = { version = "3.3.0", optional = true } +log = "0.4.16" +serde = { version = "1.0.136", features = ["derive"] } +serde_json = "1.0.79" +num_cpus = "1.13.1" fs2={ version = "0.4.3", optional = true } -levenshtein_automata = "0.2" +levenshtein_automata = "0.2.1" uuid = { version = "1.0.0", features = ["v4", "serde"] } crossbeam = "0.8.1" tantivy-query-grammar = { version="0.15.0", path="./query-grammar" } @@ -38,43 +38,43 @@ tantivy-bitpacker = { version="0.1", path="./bitpacker" } common = { version = "0.2", path = "./common/", package = "tantivy-common" } fastfield_codecs = { version="0.1", path="./fastfield_codecs", default-features = false } ownedbytes = { version="0.2", path="./ownedbytes" } -stable_deref_trait = "1.2" -rust-stemmers = "1.2" -downcast-rs = "1.2" +stable_deref_trait = "1.2.0" +rust-stemmers = "1.2.0" +downcast-rs = "1.2.0" bitpacking = { version = "0.8.4", default-features = false, features = ["bitpacker4x"] } -census = "0.4" +census = "0.4.0" fnv = "1.0.7" -thiserror = "1.0.24" +thiserror = "1.0.30" htmlescape = "0.3.1" -fail = "0.5" -murmurhash32 = "0.2" -time = { version = "0.3.7", features = ["serde-well-known"] } -smallvec = "1.6.1" -rayon = "1.5" -lru = "0.7.0" -fastdivide = "0.4" -itertools = "0.10.0" -measure_time = "0.8.0" -pretty_assertions = "1.1.0" -serde_cbor = {version="0.11", optional=true} -async-trait = "0.1" +fail = "0.5.0" +murmurhash32 = "0.2.0" +time = { version = "0.3.9", features = ["serde-well-known"] } +smallvec = "1.8.0" +rayon = "1.5.2" +lru = "0.7.5" +fastdivide = "0.4.0" +itertools = "0.10.3" +measure_time = "0.8.2" +pretty_assertions = "1.2.1" +serde_cbor = { version = "0.11.2", optional = true } +async-trait = "0.1.53" [target.'cfg(windows)'.dependencies] winapi = "0.3.9" [dev-dependencies] -rand = "0.8.3" +rand = "0.8.5" maplit = "1.0.2" -matches = "0.1.8" -proptest = "1.0" +matches = "0.1.9" +proptest = "1.0.0" criterion = "0.3.5" -test-log = "0.2.8" +test-log = "0.2.10" env_logger = "0.9.0" -pprof = {version= "0.8", features=["flamegraph", "criterion"]} -futures = "0.3.15" +pprof = { version = "0.8.0", features = ["flamegraph", "criterion"] } +futures = "0.3.21" [dev-dependencies.fail] -version = "0.5" +version = "0.5.0" features = ["failpoints"] [profile.release] From 4db655ae82ae3a89c75d7e29c8ff336addfb3389 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 28 Apr 2022 21:52:07 +0800 Subject: [PATCH 2/4] update dependencies, update edition --- Cargo.toml | 2 +- src/core/executor.rs | 13 ++++++++----- src/indexer/segment_updater.rs | 13 +++++++------ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 287c7f4d9..a4ac6ae54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ homepage = "https://github.com/quickwit-oss/tantivy" repository = "https://github.com/quickwit-oss/tantivy" readme = "README.md" keywords = ["search", "information", "retrieval"] -edition = "2018" +edition = "2021" [dependencies] oneshot = "0.1.3" diff --git a/src/core/executor.rs b/src/core/executor.rs index 5f0b930aa..257acc961 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::AtomicUsize; + use crossbeam::channel; use rayon::{ThreadPool, ThreadPoolBuilder}; @@ -47,16 +49,17 @@ impl Executor { match self { Executor::SingleThread => args.map(f).collect::>(), Executor::ThreadPool(pool) => { - let args_with_indices: Vec<(usize, A)> = args.enumerate().collect(); - let num_fruits = args_with_indices.len(); + // let args_with_indices: Vec<(usize, A)> = args.enumerate().collect(); + let args: Vec = args.collect(); + let num_fruits = args.len(); let fruit_receiver = { let (fruit_sender, fruit_receiver) = channel::unbounded(); pool.scope(|scope| { - for arg_with_idx in args_with_indices { + for (idx, arg) in args.into_iter().enumerate() { + let idx = AtomicUsize::new(idx); scope.spawn(|_| { - let (idx, arg) = arg_with_idx; let fruit = f(arg); - if let Err(err) = fruit_sender.send((idx, fruit)) { + if let Err(err) = fruit_sender.send((idx.into_inner(), fruit)) { error!( "Failed to send search task. It probably means all search \ threads have panicked. {:?}", diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 4ca751d8d..54124c9e0 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -1,6 +1,5 @@ use std::borrow::BorrowMut; use std::collections::HashSet; -use std::io; use std::io::Write; use std::ops::Deref; use std::path::PathBuf; @@ -27,7 +26,7 @@ use crate::indexer::{ SegmentSerializer, }; use crate::schema::Schema; -use crate::{FutureResult, Opstamp, TantivyError}; +use crate::{FutureResult, Opstamp}; const NUM_MERGE_THREADS: usize = 4; @@ -73,10 +72,12 @@ fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()> let mut buffer = serde_json::to_vec_pretty(metas)?; // Just adding a new line at the end of the buffer. writeln!(&mut buffer)?; - fail_point!("save_metas", |msg| Err(TantivyError::from(io::Error::new( - io::ErrorKind::Other, - msg.unwrap_or_else(|| "Undefined".to_string()) - )))); + fail_point!("save_metas", |msg| Err(crate::TantivyError::from( + std::io::Error::new( + std::io::ErrorKind::Other, + msg.unwrap_or_else(|| "Undefined".to_string()) + ) + ))); directory.sync_directory()?; directory.atomic_write(&META_FILEPATH, &buffer[..])?; debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas)); From a1afc806002af7c3ad101cb9ea7418972d423e53 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 4 May 2022 08:39:44 +0200 Subject: [PATCH 3/4] Update src/core/executor.rs Co-authored-by: Paul Masurel --- src/core/executor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/core/executor.rs b/src/core/executor.rs index 257acc961..f5e15d2b4 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -49,7 +49,6 @@ impl Executor { match self { Executor::SingleThread => args.map(f).collect::>(), Executor::ThreadPool(pool) => { - // let args_with_indices: Vec<(usize, A)> = args.enumerate().collect(); let args: Vec = args.collect(); let num_fruits = args.len(); let fruit_receiver = { From be70804d17087206ee9896a9a0fd3141eb40f635 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 4 May 2022 16:43:04 +0900 Subject: [PATCH 4/4] Removed AtomicUsize. --- src/core/executor.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/core/executor.rs b/src/core/executor.rs index f5e15d2b4..def78eb93 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -1,8 +1,8 @@ -use std::sync::atomic::AtomicUsize; - use crossbeam::channel; use rayon::{ThreadPool, ThreadPoolBuilder}; +use crate::TantivyError; + /// Search executor whether search request are single thread or multithread. /// /// We don't expose Rayon thread pool directly here for several reasons. @@ -55,10 +55,13 @@ impl Executor { let (fruit_sender, fruit_receiver) = channel::unbounded(); pool.scope(|scope| { for (idx, arg) in args.into_iter().enumerate() { - let idx = AtomicUsize::new(idx); - scope.spawn(|_| { - let fruit = f(arg); - if let Err(err) = fruit_sender.send((idx.into_inner(), fruit)) { + // We name references for f and fruit_sender_ref because we do not + // want these two to be moved into the closure. + let f_ref = &f; + let fruit_sender_ref = &fruit_sender; + scope.spawn(move |_| { + let fruit = f_ref(arg); + if let Err(err) = fruit_sender_ref.send((idx, fruit)) { error!( "Failed to send search task. It probably means all search \ threads have panicked. {:?}", @@ -73,18 +76,19 @@ impl Executor { // This is important as it makes it possible for the fruit_receiver iteration to // terminate. }; - // This is lame, but safe. - let mut results_with_position = Vec::with_capacity(num_fruits); + let mut result_placeholders: Vec> = + std::iter::repeat_with(|| None).take(num_fruits).collect(); for (pos, fruit_res) in fruit_receiver { let fruit = fruit_res?; - results_with_position.push((pos, fruit)); + result_placeholders[pos] = Some(fruit); } - results_with_position.sort_by_key(|(pos, _)| *pos); - assert_eq!(results_with_position.len(), num_fruits); - Ok(results_with_position - .into_iter() - .map(|(_, fruit)| fruit) - .collect::>()) + let results: Vec = result_placeholders.into_iter().flatten().collect(); + if results.len() != num_fruits { + return Err(TantivyError::InternalError( + "One of the mapped execution failed.".to_string(), + )); + } + Ok(results) } } }