mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
chore: move Tantivy fulltext search to blocking thread pool (#7919)
perf: move Tantivy fulltext search to blocking thread pool Wrap the synchronous Tantivy search (query parsing, posting list traversal, stored field reads) in spawn_blocking_global to avoid starving the tokio async runtime with CPU-bound work. Signed-off-by: lyang24 <lanqingy93@gmail.com>
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -27,15 +28,19 @@ use tantivy::{Index, IndexReader, ReloadPolicy, TantivyDocument};
|
||||
use crate::fulltext_index::Config;
|
||||
use crate::fulltext_index::create::{ROWID_FIELD_NAME, TEXT_FIELD_NAME};
|
||||
use crate::fulltext_index::error::{
|
||||
Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu,
|
||||
JoinSnafu, Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu,
|
||||
};
|
||||
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};
|
||||
|
||||
/// `TantivyFulltextIndexSearcher` is a searcher using Tantivy.
|
||||
pub struct TantivyFulltextIndexSearcher {
|
||||
/// Tanitvy index.
|
||||
inner: Arc<TantivySearcherInner>,
|
||||
}
|
||||
|
||||
struct TantivySearcherInner {
|
||||
/// Tantivy index.
|
||||
index: Index,
|
||||
/// Tanitvy index reader.
|
||||
/// Tantivy index reader.
|
||||
reader: IndexReader,
|
||||
/// The default field used to build `QueryParser`
|
||||
default_field: Field,
|
||||
@@ -66,63 +71,73 @@ impl TantivyFulltextIndexSearcher {
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
index,
|
||||
reader,
|
||||
default_field,
|
||||
inner: Arc::new(TantivySearcherInner {
|
||||
index,
|
||||
reader,
|
||||
default_field,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn search_sync(inner: &TantivySearcherInner, query: &str) -> Result<BTreeSet<RowId>> {
|
||||
let searcher = inner.reader.searcher();
|
||||
let query_parser = QueryParser::for_index(&inner.index, vec![inner.default_field]);
|
||||
let query = query_parser
|
||||
.parse_query(query)
|
||||
.context(TantivyParserSnafu)?;
|
||||
let doc_addrs = searcher
|
||||
.search(&query, &DocSetCollector)
|
||||
.context(TantivySnafu)?;
|
||||
|
||||
let seg_metas = inner
|
||||
.index
|
||||
.searchable_segment_metas()
|
||||
.context(TantivySnafu)?;
|
||||
|
||||
// FAST PATH: only one segment, the doc id is the same as the row id.
|
||||
// Also for compatibility with the old version.
|
||||
if seg_metas.len() == 1 {
|
||||
return Ok(doc_addrs.into_iter().map(|d| d.doc_id).collect());
|
||||
}
|
||||
|
||||
// SLOW PATH: multiple segments, need to calculate the row id.
|
||||
let rowid_field = searcher
|
||||
.schema()
|
||||
.get_field(ROWID_FIELD_NAME)
|
||||
.context(TantivySnafu)?;
|
||||
let mut seg_offsets = HashMap::with_capacity(seg_metas.len());
|
||||
let mut res = BTreeSet::new();
|
||||
for doc_addr in doc_addrs {
|
||||
let offset = if let Some(offset) = seg_offsets.get(&doc_addr.segment_ord) {
|
||||
*offset
|
||||
} else {
|
||||
// Calculate the offset at the first time meeting the segment and cache it since
|
||||
// the offset is the same for all rows in the same segment.
|
||||
let doc: TantivyDocument = searcher.doc(doc_addr).context(TantivySnafu)?;
|
||||
let rowid = doc
|
||||
.get_first(rowid_field)
|
||||
.and_then(|v| v.as_u64())
|
||||
.context(TantivyDocNotFoundSnafu { doc_addr })?;
|
||||
|
||||
let offset = rowid as u32 - doc_addr.doc_id;
|
||||
seg_offsets.insert(doc_addr.segment_ord, offset);
|
||||
offset
|
||||
};
|
||||
|
||||
res.insert(doc_addr.doc_id + offset);
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl FulltextIndexSearcher for TantivyFulltextIndexSearcher {
|
||||
async fn search(&self, query: &str) -> Result<BTreeSet<RowId>> {
|
||||
let searcher = self.reader.searcher();
|
||||
let query_parser = QueryParser::for_index(&self.index, vec![self.default_field]);
|
||||
let query = query_parser
|
||||
.parse_query(query)
|
||||
.context(TantivyParserSnafu)?;
|
||||
let doc_addrs = searcher
|
||||
.search(&query, &DocSetCollector)
|
||||
.context(TantivySnafu)?;
|
||||
|
||||
let seg_metas = self
|
||||
.index
|
||||
.searchable_segment_metas()
|
||||
.context(TantivySnafu)?;
|
||||
|
||||
// FAST PATH: only one segment, the doc id is the same as the row id.
|
||||
// Also for compatibility with the old version.
|
||||
if seg_metas.len() == 1 {
|
||||
return Ok(doc_addrs.into_iter().map(|d| d.doc_id).collect());
|
||||
}
|
||||
|
||||
// SLOW PATH: multiple segments, need to calculate the row id.
|
||||
let rowid_field = searcher
|
||||
.schema()
|
||||
.get_field(ROWID_FIELD_NAME)
|
||||
.context(TantivySnafu)?;
|
||||
let mut seg_offsets = HashMap::with_capacity(seg_metas.len());
|
||||
let mut res = BTreeSet::new();
|
||||
for doc_addr in doc_addrs {
|
||||
let offset = if let Some(offset) = seg_offsets.get(&doc_addr.segment_ord) {
|
||||
*offset
|
||||
} else {
|
||||
// Calculate the offset at the first time meeting the segment and cache it since
|
||||
// the offset is the same for all rows in the same segment.
|
||||
let doc: TantivyDocument = searcher.doc(doc_addr).context(TantivySnafu)?;
|
||||
let rowid = doc
|
||||
.get_first(rowid_field)
|
||||
.and_then(|v| v.as_u64())
|
||||
.context(TantivyDocNotFoundSnafu { doc_addr })?;
|
||||
|
||||
let offset = rowid as u32 - doc_addr.doc_id;
|
||||
seg_offsets.insert(doc_addr.segment_ord, offset);
|
||||
offset
|
||||
};
|
||||
|
||||
res.insert(doc_addr.doc_id + offset);
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
let inner = self.inner.clone();
|
||||
let query = query.to_string();
|
||||
common_runtime::spawn_blocking_global(move || search_sync(&inner, &query))
|
||||
.await
|
||||
.context(JoinSnafu)?
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user