Compare commits

...

9 Commits

Author SHA1 Message Date
Paul Masurel
82b510b88b Adding panic handler for the rayon merge thread pool 2025-02-19 17:19:28 +09:00
Remi Dettai
71cf19870b Exist queries match subpath fields (#2558)
* Exist queries match subpath fields

* Make subpath check optional

* Add async subpath listing
2025-01-06 10:17:39 +01:00
Harrison Burt
148594f0f9 Improve IndexWriter customisation via builder (#2562)
* Improve `IndexWriter` customisation via builder

* Remove change noise from PR

* Correct documentation

* Resolve comments and add test
2025-01-02 09:43:22 +01:00
dependabot[bot]
8edb439440 Update rustc-hash requirement from 1.1.0 to 2.1.0 (#2551)
---
updated-dependencies:
- dependency-name: rustc-hash
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-12-26 10:25:05 +01:00
trinity-1686a
c39d91f827 Merge pull request #2547 from quickwit-oss/trinity/count-str
add support for counting non integer in aggregation
2024-12-17 15:27:30 +01:00
trinity Pointard
32b6e9711b add tests 2024-12-13 16:06:24 +01:00
dependabot[bot]
0f99d4f420 Update measure_time requirement from 0.8.2 to 0.9.0 (#2557)
---
updated-dependencies:
- dependency-name: measure_time
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-12-09 21:39:01 +01:00
Pierre Barre
6e02c5cb25 Make NUM_MERGE_THREADS configurable (#2535)
* Make `NUM_MERGE_THREADS` configurable

* Remove unused import

* Reword comment src/index/index.rs

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>

---------

Co-authored-by: PSeitz <PSeitz@users.noreply.github.com>
2024-12-09 16:53:11 +08:00
trinity-1686a
0bac391291 add support for counting non integer in aggregation 2024-11-28 19:52:47 +01:00
15 changed files with 572 additions and 132 deletions

View File

@@ -53,8 +53,9 @@ rayon = "1.5.2"
lru = "0.12.0"
fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.8.2"
measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }

View File

@@ -1,6 +1,7 @@
use std::{fmt, io, mem};
use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};
@@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
Ok(results)
}
fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
format!("{}{}", column_name, '\0')
}
fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}
impl ColumnarReader {
/// Opens a new Columnar file.
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
@@ -144,32 +158,14 @@ impl ColumnarReader {
Ok(self.iter_columns()?.collect())
}
fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name,\0`..column_name\1)`.
// TODO can we get some more generic `prefix(..)` logic in the dictionary.
let mut start_key = column_name.to_string();
start_key.push('\0');
let mut end_key = column_name.to_string();
end_key.push(1u8 as char);
self.column_dictionary
.range()
.ge(start_key.as_bytes())
.lt(end_key.as_bytes())
}
pub async fn read_columns_async(
&self,
column_name: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self
.stream_for_column_range(column_name)
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
@@ -180,7 +176,35 @@ impl ColumnarReader {
/// There can be more than one column associated to a given column name, provided they have
/// different types.
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self.stream_for_column_range(column_name).into_stream()?;
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
pub async fn read_subpath_columns_async(
&self,
root_path: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
/// Get all inner columns for a given JSON prefix, i.e columns for which the name starts
/// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
///
/// There can be more than one column associated to each path within the JSON structure,
/// provided they have different types.
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix.as_bytes())
.into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
@@ -192,6 +216,8 @@ impl ColumnarReader {
#[cfg(test)]
mod tests {
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use crate::{ColumnType, ColumnarReader, ColumnarWriter};
#[test]
@@ -224,6 +250,64 @@ mod tests {
assert_eq!(columns[0].1.column_type(), ColumnType::U64);
}
#[test]
fn test_read_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_column_type("col", ColumnType::U64, false);
columnar_writer.record_numerical(1, "col", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_columns("col").unwrap();
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].column_type(), ColumnType::U64);
}
{
let columns = columnar.read_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
fn test_read_subpath_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_str(
0,
&format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
"hello",
);
columnar_writer.record_numerical(
0,
&format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
1i64,
);
columnar_writer.record_str(1, "col1", "hello");
columnar_writer.record_str(0, "col2", "hello");
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_subpath_columns("col1").unwrap();
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].column_type(), ColumnType::Str);
assert_eq!(columns[1].column_type(), ColumnType::I64);
}
{
let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("col2").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
#[should_panic(expected = "Input type forbidden")]
fn test_list_columns_strict_typing_panics_on_wrong_types() {

View File

@@ -285,7 +285,6 @@ impl ColumnarWriter {
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
);
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {

View File

@@ -271,10 +271,6 @@ impl AggregationWithAccessor {
field: ref field_name,
..
})
| Count(CountAggregation {
field: ref field_name,
..
})
| Max(MaxAggregation {
field: ref field_name,
..
@@ -299,6 +295,24 @@ impl AggregationWithAccessor {
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Count(CountAggregation {
field: ref field_name,
..
}) => {
let allowed_column_types = [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::Str,
ColumnType::DateTime,
ColumnType::Bool,
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported
];
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Percentiles(ref percentiles) => {
let (accessor, column_type) = get_ff_reader(
reader,

View File

@@ -220,9 +220,23 @@ impl SegmentStatsCollector {
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
}
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
if [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::DateTime,
]
.contains(&self.field_type)
{
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
} else {
for _val in agg_accessor.column_block_accessor.iter_vals() {
// we ignore the value and simply record that we got something
self.stats.collect(0.0);
}
}
}
}
@@ -435,6 +449,11 @@ mod tests {
"field": "score",
},
},
"count_str": {
"value_count": {
"field": "text",
},
},
"range": range_agg
}))
.unwrap();
@@ -500,6 +519,13 @@ mod tests {
})
);
assert_eq!(
res["count_str"],
json!({
"value": 7.0,
})
);
Ok(())
}

View File

@@ -578,7 +578,7 @@ mod tests {
.set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_fast(None)
.set_fast(Some("raw"))
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", FAST);

View File

@@ -217,7 +217,7 @@ impl FastFieldReaders {
Ok(dynamic_column.into())
}
/// Returning a `dynamic_column_handle`.
/// Returns a `dynamic_column_handle`.
pub fn dynamic_column_handle(
&self,
field_name: &str,
@@ -234,7 +234,7 @@ impl FastFieldReaders {
Ok(dynamic_column_handle_opt)
}
/// Returning all `dynamic_column_handle`.
/// Returns all `dynamic_column_handle` that match the given field name.
pub fn dynamic_column_handles(
&self,
field_name: &str,
@@ -250,6 +250,22 @@ impl FastFieldReaders {
Ok(dynamic_column_handles)
}
/// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
pub fn dynamic_subpath_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let dynamic_column_handles = self
.columnar
.read_subpath_columns(&resolved_field_name)?
.into_iter()
.collect();
Ok(dynamic_column_handles)
}
#[doc(hidden)]
pub async fn list_dynamic_column_handles(
&self,
@@ -265,6 +281,21 @@ impl FastFieldReaders {
Ok(columns)
}
#[doc(hidden)]
pub async fn list_subpath_dynamic_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let columns = self
.columnar
.read_subpath_columns_async(&resolved_field_name)
.await?;
Ok(columns)
}
/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
/// i64, u64, f64, DateTime).
///
@@ -476,6 +507,15 @@ mod tests {
.iter()
.any(|column| column.column_type() == ColumnType::Str));
println!("*** {:?}", fast_fields.columnar().list_columns());
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
assert_eq!(json_columns.len(), 0);
let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
assert_eq!(json_subcolumns.len(), 3);
let foo_subcolumns = fast_fields
.dynamic_subpath_column_handles("json.foo")
.unwrap();
assert_eq!(foo_subcolumns.len(), 0);
}
}

View File

@@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
@@ -519,6 +521,43 @@ impl Index {
load_metas(self.directory(), &self.inventory)
}
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `options` defines the writer configuration which includes things like buffer sizes,
/// indexer threads, etc...
///
/// # Errors
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_options<D: Document>(
&self,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
IndexWriter::new(self, options, directory_lock)
}
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
@@ -543,27 +582,12 @@ impl Index {
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
)
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
.build();
self.writer_with_options(options)
}
/// Helper to create an index writer for tests.

View File

@@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
))
}
#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
/// The memory budget per indexer thread.
///
/// When an indexer thread has buffered this much data in memory
/// it will flush the segment to disk (although this is not searchable until commit is called.)
memory_budget_per_thread: usize,
#[builder(default = 1)]
/// The number of indexer worker threads to use.
num_worker_threads: usize,
#[builder(default = 4)]
/// Defines the number of merger threads to use.
num_merge_threads: usize,
}
/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
index: Index,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -70,8 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
worker_id: usize,
num_threads: usize,
delete_queue: DeleteQueue,
stamper: Stamper,
@@ -265,23 +279,27 @@ impl<D: Document> IndexWriter<D> {
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
) -> crate::Result<Self> {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.num_worker_threads == 0 {
let err_msg = "At least one worker thread is required, got 0".to_string();
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
@@ -291,13 +309,17 @@ impl<D: Document> IndexWriter<D> {
let stamper = Stamper::new(current_opstamp);
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let segment_updater = SegmentUpdater::create(
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
options.num_merge_threads,
)?;
let mut index_writer = Self {
_directory_lock: Some(directory_lock),
memory_budget_in_bytes_per_thread,
options: options.clone(),
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -305,7 +327,6 @@ impl<D: Document> IndexWriter<D> {
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
@@ -398,7 +419,7 @@ impl<D: Document> IndexWriter<D> {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.memory_budget_in_bytes_per_thread;
let mem_budget = self.options.memory_budget_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -451,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
}
fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.num_threads {
for _ in 0..self.options.num_worker_threads {
self.add_indexing_worker()?;
}
Ok(())
@@ -553,12 +574,7 @@ impl<D: Document> IndexWriter<D> {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;
// the current `self` is dropped right away because of this call.
//
@@ -812,7 +828,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
@@ -2533,4 +2549,36 @@ mod tests {
index_writer.commit().unwrap();
Ok(())
}
#[test]
fn test_writer_options_validation() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bool_field("example", STORED);
let index = Index::create_in_ram(schema_builder.build());
let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
assert!(result.is_err(), "Writer should reject 0 thread count");
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(10 << 10)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too low memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(5 << 30)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too high memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
}
}

View File

@@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::index_writer::IndexWriter;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

View File

@@ -25,8 +25,6 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp};
const NUM_MERGE_THREADS: usize = 4;
/// Save the index meta file.
/// This operation is atomic:
/// Either
@@ -273,6 +271,7 @@ impl SegmentUpdater {
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -287,7 +286,18 @@ impl SegmentUpdater {
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(NUM_MERGE_THREADS)
.num_threads(num_merge_threads)
.panic_handler(move |panic| {
let message = if let Some(msg) = panic.downcast_ref::<&str>() {
*msg
} else if let Some(msg) = panic.downcast_ref::<String>() {
msg.as_str()
} else {
"UNKNOWN"
};
eprintln!("merge thread panicked with: {message}")
})
.build()
.map_err(|_| {
crate::TantivyError::SystemError(

View File

@@ -422,6 +422,7 @@ mod tests {
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use columnar::ColumnType;
use tempfile::TempDir;
use crate::collector::{Count, TopDocs};
@@ -431,15 +432,15 @@ mod tests {
use crate::query::{PhraseQuery, QueryParser};
use crate::schema::{
Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
};
use crate::store::{Compressor, StoreReader, StoreWriter};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::OffsetDateTime;
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
TERMINATED,
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
TantivyDocument, Term, TERMINATED,
};
#[test]
@@ -841,6 +842,75 @@ mod tests {
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
#[test]
fn test_json_fast() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let json_val: serde_json::Value = serde_json::from_str(
r#"{
"toto": "titi",
"float": -0.2,
"bool": true,
"unsigned": 1,
"signed": -2,
"complexobject": {
"field.with.dot": 1
},
"date": "1985-04-12T23:20:50.52Z",
"my_arr": [2, 3, {"my_key": "two tokens"}, 4]
}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val.clone());
let index = Index::create_in_ram(schema.clone());
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 1, "{}", field);
assert_eq!(cols[0].column_type(), typ, "{}", field);
}
assert_type(segment_reader, "json.toto", ColumnType::Str);
assert_type(segment_reader, "json.float", ColumnType::F64);
assert_type(segment_reader, "json.bool", ColumnType::Bool);
assert_type(segment_reader, "json.unsigned", ColumnType::I64);
assert_type(segment_reader, "json.signed", ColumnType::I64);
assert_type(
segment_reader,
"json.complexobject.field\\.with\\.dot",
ColumnType::I64,
);
assert_type(segment_reader, "json.date", ColumnType::DateTime);
assert_type(segment_reader, "json.my_arr", ColumnType::I64);
assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);
fn assert_empty(reader: &SegmentReader, field: &str) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 0);
}
assert_empty(segment_reader, "unknown");
assert_empty(segment_reader, "json");
assert_empty(segment_reader, "json.toto.titi");
let sub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json")
.unwrap();
assert_eq!(sub_columns.len(), 9);
let subsub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json.complexobject")
.unwrap();
assert_eq!(subsub_columns.len(), 1);
}
#[test]
fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
// https://github.com/quickwit-oss/tantivy/issues/2283

View File

@@ -7,14 +7,32 @@ use crate::docset::{DocSet, TERMINATED};
use crate::index::SegmentReader;
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::Type;
use crate::{DocId, Score, TantivyError};
/// Query that matches all documents with a non-null value in the specified field.
/// Query that matches all documents with a non-null value in the specified
/// field.
///
/// When querying inside a JSON field, "exists" queries can be executed strictly
/// on the field name or check all the subpaths. In that second case a document
/// will be matched if a non-null value exists in any subpath. For example,
/// assuming the following document where `myfield` is a JSON fast field:
/// ```json
/// {
/// "myfield": {
/// "mysubfield": "hello"
/// }
/// }
/// ```
/// With `json_subpaths` enabled queries on either `myfield` or
/// `myfield.mysubfield` will match the document. If it is set to false, only
/// `myfield.mysubfield` will match it.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
field_name: String,
json_subpaths: bool,
}
impl ExistsQuery {
@@ -23,8 +41,28 @@ impl ExistsQuery {
/// This query matches all documents with at least one non-null value in the specified field.
/// This constructor never fails, but executing the search with this query will return an
/// error if the specified field doesn't exists or is not a fast field.
#[deprecated]
pub fn new_exists_query(field: String) -> ExistsQuery {
ExistsQuery { field_name: field }
ExistsQuery {
field_name: field,
json_subpaths: false,
}
}
/// Creates a new `ExistQuery` from the given field.
///
/// This query matches all documents with at least one non-null value in the
/// specified field. If `json_subpaths` is set to true, documents with
/// non-null values in any JSON subpath will also be matched.
///
/// This constructor never fails, but executing the search with this query will
/// return an error if the specified field doesn't exists or is not a fast
/// field.
pub fn new(field: String, json_subpaths: bool) -> Self {
Self {
field_name: field,
json_subpaths,
}
}
}
@@ -43,6 +81,8 @@ impl Query for ExistsQuery {
}
Ok(Box::new(ExistsWeight {
field_name: self.field_name.clone(),
field_type: field_type.value_type(),
json_subpaths: self.json_subpaths,
}))
}
}
@@ -50,13 +90,20 @@ impl Query for ExistsQuery {
/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
field_name: String,
field_type: Type,
json_subpaths: bool,
}
impl Weight for ExistsWeight {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let fast_field_reader = reader.fast_fields();
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
.dynamic_column_handles(&self.field_name)?
let mut column_handles = fast_field_reader.dynamic_column_handles(&self.field_name)?;
if self.field_type == Type::Json && self.json_subpaths {
let mut sub_columns =
fast_field_reader.dynamic_subpath_column_handles(&self.field_name)?;
column_handles.append(&mut sub_columns);
}
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = column_handles
.into_iter()
.map(|handle| handle.open().map_err(|io_error| io_error.into()))
.collect();
@@ -180,11 +227,12 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "all")?, 100);
assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "even")?, 50);
assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
assert_eq!(count_existing_fields(&searcher, "never")?, 0);
assert_eq!(count_existing_fields(&searcher, "all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "multi", false)?, 10);
assert_eq!(count_existing_fields(&searcher, "multi", true)?, 10);
assert_eq!(count_existing_fields(&searcher, "never", false)?, 0);
// exercise seek
let query = BooleanQuery::intersection(vec![
@@ -192,7 +240,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 50)),
Bound::Unbounded,
)),
Box::new(ExistsQuery::new_exists_query("even".to_string())),
Box::new(ExistsQuery::new("even".to_string(), false)),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -201,7 +249,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 0)),
Bound::Included(Term::from_field_u64(all_field, 50)),
)),
Box::new(ExistsQuery::new_exists_query("odd".to_string())),
Box::new(ExistsQuery::new("odd".to_string(), false)),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -230,22 +278,18 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.even", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json", true)?, 100);
// Handling of non-existing fields:
assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists.absent'"
);
assert_eq!(count_existing_fields(&searcher, "json.absent", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json.absent", true)?, 0);
assert_does_not_exist(&searcher, "does_not_exists.absent", true);
assert_does_not_exist(&searcher, "does_not_exists.absent", false);
Ok(())
}
@@ -284,12 +328,13 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
assert_eq!(count_existing_fields(&searcher, "date")?, 50);
assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "date", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "f64", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "facet", false)?, 50);
Ok(())
}
@@ -313,31 +358,33 @@ mod tests {
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("not_fast".to_string()),
&Count
)
.search(&ExistsQuery::new("not_fast".to_string(), false), &Count)
.unwrap_err()
.to_string(),
"Schema error: 'Field not_fast is not a fast field.'"
);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists'"
);
assert_does_not_exist(&searcher, "does_not_exists", false);
Ok(())
}
fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
let query = ExistsQuery::new_exists_query(field.to_string());
fn count_existing_fields(
searcher: &Searcher,
field: &str,
json_subpaths: bool,
) -> crate::Result<usize> {
let query = ExistsQuery::new(field.to_string(), json_subpaths);
searcher.search(&query, &Count)
}
fn assert_does_not_exist(searcher: &Searcher, field: &str, json_subpaths: bool) {
assert_eq!(
searcher
.search(&ExistsQuery::new(field.to_string(), json_subpaths), &Count)
.unwrap_err()
.to_string(),
format!("The field does not exist: '{}'", field)
);
}
}

View File

@@ -521,6 +521,25 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
StreamerBuilder::new(self, AlwaysMatch)
}
/// Returns a range builder filtered with a prefix.
pub fn prefix_range<K: AsRef<[u8]>>(&self, prefix: K) -> StreamerBuilder<TSSTable> {
let lower_bound = prefix.as_ref();
let mut upper_bound = lower_bound.to_vec();
for idx in (0..upper_bound.len()).rev() {
if upper_bound[idx] == 255 {
upper_bound.pop();
} else {
upper_bound[idx] += 1;
break;
}
}
let mut builder = self.range().ge(lower_bound);
if !upper_bound.is_empty() {
builder = builder.lt(upper_bound);
}
builder
}
/// A stream of all the sorted terms.
pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
self.range().into_stream()
@@ -928,4 +947,62 @@ mod tests {
}
assert!(!stream.advance());
}
#[test]
fn test_prefix() {
let (dic, _slice) = make_test_sstable();
{
let mut stream = dic.prefix_range("1").into_stream().unwrap();
for i in 0x10000..0x20000 {
assert!(stream.advance());
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("").into_stream().unwrap();
for i in 0..0x3ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("0FF").into_stream().unwrap();
for i in 0x0ff00..=0x0ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
}
#[test]
fn test_prefix_edge() {
let dict = {
let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
builder.insert(&[0, 254], &0).unwrap();
builder.insert(&[0, 255], &1).unwrap();
builder.insert(&[0, 255, 12], &2).unwrap();
builder.insert(&[1], &2).unwrap();
builder.insert(&[1, 0], &2).unwrap();
let table = builder.finish().unwrap();
let table = Arc::new(PermissionedHandle::new(table));
let slice = common::file_slice::FileSlice::new(table.clone());
Dictionary::<MonotonicU64SSTable>::open(slice).unwrap()
};
let mut stream = dict.prefix_range(&[0, 255]).into_stream().unwrap();
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255]);
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255, 12]);
assert!(!stream.advance());
}
}

View File

@@ -26,7 +26,7 @@ path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
zipf = "7.0.0"
rustc-hash = "1.1.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }