From 1ee5f90761f8890ddc94282b8397240e9eded350 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Thu, 9 May 2024 09:01:13 +0200 Subject: [PATCH 1/2] Give allocation control to the caller instead of forcing a clone (#2389) This is achieved by moving the boxes out of the temporary reference wrappers, which are themselves cloneable: the caller can clone the wrappers if required, or consume them to reuse the existing allocations. --- src/schema/document/value.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/schema/document/value.rs b/src/schema/document/value.rs index e05649ad7..81edeef3f 100644 --- a/src/schema/document/value.rs +++ b/src/schema/document/value.rs @@ -27,7 +27,7 @@ pub trait Value<'a>: Send + Sync + Debug { } #[inline] - /// If the Value is a String, returns the associated str. Returns None otherwise. + /// If the Value is a leaf, returns the associated leaf. Returns None otherwise. fn as_leaf(&self) -> Option> { if let ReferenceValue::Leaf(val) = self.as_value() { Some(val) @@ -82,7 +82,8 @@ pub trait Value<'a>: Send + Sync + Debug { /// If the Value is a pre-tokenized string, returns the associated string. Returns None /// otherwise. fn as_pre_tokenized_text(&self) -> Option> { - self.as_leaf().and_then(|leaf| leaf.as_pre_tokenized_text()) + self.as_leaf() + .and_then(|leaf| leaf.into_pre_tokenized_text()) } #[inline] @@ -259,11 +260,11 @@ impl<'a> ReferenceValueLeaf<'a> { } #[inline] - /// If the Value is a pre-tokenized string, returns the associated string. Returns None - /// otherwise. - pub fn as_pre_tokenized_text(&self) -> Option> { + /// If the Value is a pre-tokenized string, consumes it and returns the string. + /// Returns None otherwise. + pub fn into_pre_tokenized_text(self) -> Option> { if let Self::PreTokStr(val) = self { - Some(val.clone()) + Some(val) } else { None } @@ -322,6 +323,16 @@ where V: Value<'a> } } + #[inline] + /// If the Value is a leaf, consume it and return the leaf. 
Returns None otherwise. + pub fn into_leaf(self) -> Option> { + if let Self::Leaf(val) = self { + Some(val) + } else { + None + } + } + #[inline] /// If the Value is a String, returns the associated str. Returns None otherwise. pub fn as_str(&self) -> Option<&'a str> { @@ -365,10 +376,11 @@ where V: Value<'a> } #[inline] - /// If the Value is a pre-tokenized string, returns the associated string. Returns None - /// otherwise. - pub fn as_pre_tokenized_text(&self) -> Option> { - self.as_leaf().and_then(|leaf| leaf.as_pre_tokenized_text()) + /// If the Value is a pre-tokenized string, consumes it and returns the string. + /// Returns None otherwise. + pub fn into_pre_tokenized_text(self) -> Option> { + self.into_leaf() + .and_then(|leaf| leaf.into_pre_tokenized_text()) } #[inline] From 6181c1eb5e2e0126ec16ba352b219825a9640a9d Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 10 May 2024 17:19:12 +0900 Subject: [PATCH 2/2] Small changes in the Executor API. (#2391) Warning: this change is not fully backward compatible, so I bumped tantivy's version. 
--- Cargo.toml | 44 ++++++++++++++++--------- src/core/executor.rs | 76 ++++++++++---------------------------------- src/index/index.rs | 17 ++++------ src/lib.rs | 1 - 4 files changed, 51 insertions(+), 87 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8a8b84f65..6ae152c5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tantivy" -version = "0.22.0" +version = "0.23.0" authors = ["Paul Masurel "] license = "MIT" categories = ["database-implementations", "data-structures"] @@ -15,12 +15,16 @@ rust-version = "1.63" exclude = ["benches/*.json", "benches/*.txt"] [dependencies] -oneshot = "0.1.5" +# Switch back to the non-forked oneshot crate once https://github.com/faern/oneshot/pull/35 is merged +oneshot = { git = "https://github.com/fulmicoton/oneshot.git", rev = "c10a3ba" } base64 = "0.22.0" byteorder = "1.4.3" crc32fast = "1.3.2" once_cell = "1.10.0" -regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] } +regex = { version = "1.5.5", default-features = false, features = [ + "std", + "unicode", +] } aho-corasick = "1.0" tantivy-fst = "0.5" memmap2 = { version = "0.9.0", optional = true } @@ -36,7 +40,9 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] } crossbeam-channel = "0.5.4" rust-stemmers = "1.2.0" downcast-rs = "1.2.0" -bitpacking = { version = "0.9.2", default-features = false, features = ["bitpacker4x"] } +bitpacking = { version = "0.9.2", default-features = false, features = [ + "bitpacker4x", +] } census = "0.4.2" rustc-hash = "1.1.0" thiserror = "1.0.30" @@ -51,13 +57,13 @@ itertools = "0.12.0" measure_time = "0.8.2" arc-swap = "1.5.0" -columnar = { version= "0.3", path="./columnar", package ="tantivy-columnar" } -sstable = { version= "0.3", path="./sstable", package ="tantivy-sstable", optional = true } -stacker = { version= "0.3", path="./stacker", package ="tantivy-stacker" } -query-grammar = { version= "0.22.0", path="./query-grammar", package = "tantivy-query-grammar" } 
-tantivy-bitpacker = { version= "0.6", path="./bitpacker" } -common = { version= "0.7", path = "./common/", package = "tantivy-common" } -tokenizer-api = { version= "0.3", path="./tokenizer-api", package="tantivy-tokenizer-api" } +columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" } +sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true } +stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" } +query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" } +tantivy-bitpacker = { version = "0.6", path = "./bitpacker" } +common = { version = "0.7", path = "./common/", package = "tantivy-common" } +tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" } sketches-ddsketch = { version = "0.2.1", features = ["use_serde"] } futures-util = { version = "0.3.28", optional = true } fnv = "1.0.7" @@ -66,7 +72,7 @@ fnv = "1.0.7" winapi = "0.3.9" [dev-dependencies] -binggan = "0.5.1" +binggan = "0.5.2" rand = "0.8.5" maplit = "1.0.2" matches = "0.1.9" @@ -112,7 +118,7 @@ lz4-compression = ["lz4_flex"] zstd-compression = ["zstd"] failpoints = ["fail", "fail/failpoints"] -unstable = [] # useful for benches. +unstable = [] # useful for benches. 
quickwit = ["sstable", "futures-util"] @@ -122,7 +128,16 @@ quickwit = ["sstable", "futures-util"] compare_hash_only = ["stacker/compare_hash_only"] [workspace] -members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"] +members = [ + "query-grammar", + "bitpacker", + "common", + "ownedbytes", + "stacker", + "sstable", + "tokenizer-api", + "columnar", +] # Following the "fail" crate best practises, we isolate # tests that define specific behavior in fail check points @@ -147,4 +162,3 @@ harness = false [[bench]] name = "agg_bench" harness = false - diff --git a/src/core/executor.rs b/src/core/executor.rs index 915534009..43c3d2687 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -1,21 +1,25 @@ +use std::sync::Arc; + #[cfg(feature = "quickwit")] use futures_util::{future::Either, FutureExt}; -use rayon::{ThreadPool, ThreadPoolBuilder}; use crate::TantivyError; -/// Search executor whether search request are single thread or multithread. -/// -/// We don't expose Rayon thread pool directly here for several reasons. -/// -/// First dependency hell. It is not a good idea to expose the -/// API of a dependency, knowing it might conflict with a different version -/// used by the client. Second, we may stop using rayon in the future. +/// Executor makes it possible to run tasks in single thread or +/// in a thread pool. +#[derive(Clone)] pub enum Executor { /// Single thread variant of an Executor SingleThread, /// Thread pool variant of an Executor - ThreadPool(ThreadPool), + ThreadPool(Arc), +} + +#[cfg(feature = "quickwit")] +impl From> for Executor { + fn from(thread_pool: Arc) -> Self { + Executor::ThreadPool(thread_pool) + } } impl Executor { @@ -26,11 +30,11 @@ impl Executor { /// Creates an Executor that dispatches the tasks in a thread pool. 
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result { - let pool = ThreadPoolBuilder::new() + let pool = rayon::ThreadPoolBuilder::new() .num_threads(num_threads) .thread_name(move |num| format!("{prefix}{num}")) .build()?; - Ok(Executor::ThreadPool(pool)) + Ok(Executor::ThreadPool(Arc::new(pool))) } /// Perform a map in the thread pool. @@ -105,7 +109,7 @@ impl Executor { match self { Executor::SingleThread => Either::Left(std::future::ready(Ok(cpu_intensive_task()))), Executor::ThreadPool(pool) => { - let (sender, receiver) = oneshot_with_sentinel::channel(); + let (sender, receiver) = oneshot::channel(); pool.spawn(|| { if sender.is_closed() { return; @@ -121,54 +125,6 @@ impl Executor { } } -#[cfg(feature = "quickwit")] -mod oneshot_with_sentinel { - use std::pin::Pin; - use std::sync::Arc; - use std::task::{Context, Poll}; - // TODO get ride of this if oneshot ever gains a is_closed() - - pub struct SenderWithSentinel { - tx: oneshot::Sender, - guard: Arc<()>, - } - - pub struct ReceiverWithSentinel { - rx: oneshot::Receiver, - _guard: Arc<()>, - } - - pub fn channel() -> (SenderWithSentinel, ReceiverWithSentinel) { - let (tx, rx) = oneshot::channel(); - let guard = Arc::new(()); - ( - SenderWithSentinel { - tx, - guard: guard.clone(), - }, - ReceiverWithSentinel { rx, _guard: guard }, - ) - } - - impl SenderWithSentinel { - pub fn send(self, message: T) -> Result<(), oneshot::SendError> { - self.tx.send(message) - } - - pub fn is_closed(&self) -> bool { - Arc::strong_count(&self.guard) == 1 - } - } - - impl std::future::Future for ReceiverWithSentinel { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.rx).poll(ctx) - } - } -} - #[cfg(test)] mod tests { use super::Executor; diff --git a/src/index/index.rs b/src/index/index.rs index 0baea3345..89b5933fb 100644 --- a/src/index/index.rs +++ b/src/index/index.rs @@ -3,7 +3,6 @@ use std::fmt; #[cfg(feature = "mmap")] use 
std::path::Path; use std::path::PathBuf; -use std::sync::Arc; use std::thread::available_parallelism; use super::segment::Segment; @@ -294,7 +293,7 @@ pub struct Index { directory: ManagedDirectory, schema: Schema, settings: IndexSettings, - executor: Arc, + executor: Executor, tokenizers: TokenizerManager, fast_field_tokenizers: TokenizerManager, inventory: SegmentMetaInventory, @@ -319,23 +318,19 @@ impl Index { /// /// By default the executor is single thread, and simply runs in the calling thread. pub fn search_executor(&self) -> &Executor { - self.executor.as_ref() + &self.executor } /// Replace the default single thread search executor pool /// by a thread pool with a given number of threads. pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> { - self.executor = Arc::new(Executor::multi_thread(num_threads, "tantivy-search-")?); + self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?; Ok(()) } /// Custom thread pool by a outer thread pool. - pub fn set_shared_multithread_executor( - &mut self, - shared_thread_pool: Arc, - ) -> crate::Result<()> { - self.executor = shared_thread_pool.clone(); - Ok(()) + pub fn set_executor(&mut self, executor: Executor) { + self.executor = executor; } /// Replace the default single thread search executor pool @@ -419,7 +414,7 @@ impl Index { schema, tokenizers: TokenizerManager::default(), fast_field_tokenizers: TokenizerManager::default(), - executor: Arc::new(Executor::single_thread()), + executor: Executor::single_thread(), inventory, } } diff --git a/src/lib.rs b/src/lib.rs index 2978f4178..85b71b3c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -436,7 +436,6 @@ pub mod tests { } #[test] - #[cfg(not(feature = "lz4"))] fn test_version_string() { use regex::Regex; let regex_ptn = Regex::new(