Mirror of https://github.com/quickwit-oss/tantivy.git
Synced 2026-01-03 15:52:55 +00:00

Compare commits: test_parse...issue/2411

9 Commits

| SHA1 |
|---|
| 82b510b88b |
| 71cf19870b |
| 148594f0f9 |
| 8edb439440 |
| c39d91f827 |
| 32b6e9711b |
| 0f99d4f420 |
| 6e02c5cb25 |
| 0bac391291 |
@@ -53,8 +53,9 @@ rayon = "1.5.2"
lru = "0.12.0"
fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.8.2"
measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"

columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }

@@ -1,6 +1,7 @@
use std::{fmt, io, mem};

use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};
@@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
    Ok(results)
}

fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
    // Each column is associated to a given `column_key`,
    // that starts by `column_name\0column_header`.
    //
    // Listing the columns associated to the given column name is therefore equivalent to
    // listing `column_key` with the prefix `column_name\0`.
    format!("{}{}", column_name, '\0')
}

fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
    format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}

impl ColumnarReader {
    /// Opens a new Columnar file.
    pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
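For intuition, the column dictionary keys described in the comment above follow the layout `column_name\0column_header`, so listing every column sharing a name reduces to a prefix scan over `column_name\0`. A minimal, self-contained sketch of that convention (the keys are invented for illustration; only the helper mirrors the hunk above):

```rust
fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
    // Column keys are laid out as `column_name\0column_header`, so every
    // column sharing `column_name` shares the `column_name\0` prefix.
    format!("{}{}", column_name, '\0')
}

fn main() {
    // Hypothetical dictionary keys for two columns named "price" with
    // different headers, plus an unrelated column.
    let keys = [
        "price\u{0}header_a",
        "price\u{0}header_b",
        "price_tag\u{0}header_c",
    ];
    let prefix = column_dictionary_prefix_for_column_name("price");
    let matching: Vec<&str> = keys
        .iter()
        .copied()
        .filter(|key| key.starts_with(&prefix))
        .collect();
    // The `\0` separator keeps "price_tag" out of the result set.
    assert_eq!(matching, vec!["price\u{0}header_a", "price\u{0}header_b"]);
}
```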
@@ -144,32 +158,14 @@ impl ColumnarReader {
        Ok(self.iter_columns()?.collect())
    }

    fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
        // Each column is a associated to a given `column_key`,
        // that starts by `column_name\0column_header`.
        //
        // Listing the columns associated to the given column name is therefore equivalent to
        // listing `column_key` with the prefix `column_name\0`.
        //
        // This is in turn equivalent to searching for the range
        // `[column_name,\0`..column_name\1)`.
        // TODO can we get some more generic `prefix(..)` logic in the dictionary.
        let mut start_key = column_name.to_string();
        start_key.push('\0');
        let mut end_key = column_name.to_string();
        end_key.push(1u8 as char);
        self.column_dictionary
            .range()
            .ge(start_key.as_bytes())
            .lt(end_key.as_bytes())
    }

    pub async fn read_columns_async(
        &self,
        column_name: &str,
    ) -> io::Result<Vec<DynamicColumnHandle>> {
        let prefix = column_dictionary_prefix_for_column_name(column_name);
        let stream = self
            .stream_for_column_range(column_name)
            .column_dictionary
            .prefix_range(prefix)
            .into_stream_async()
            .await?;
        read_all_columns_in_stream(stream, &self.column_data, self.format_version)
@@ -180,7 +176,35 @@ impl ColumnarReader {
    /// There can be more than one column associated to a given column name, provided they have
    /// different types.
    pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
        let stream = self.stream_for_column_range(column_name).into_stream()?;
        let prefix = column_dictionary_prefix_for_column_name(column_name);
        let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
        read_all_columns_in_stream(stream, &self.column_data, self.format_version)
    }

    pub async fn read_subpath_columns_async(
        &self,
        root_path: &str,
    ) -> io::Result<Vec<DynamicColumnHandle>> {
        let prefix = column_dictionary_prefix_for_subpath(root_path);
        let stream = self
            .column_dictionary
            .prefix_range(prefix)
            .into_stream_async()
            .await?;
        read_all_columns_in_stream(stream, &self.column_data, self.format_version)
    }

    /// Get all inner columns for a given JSON prefix, i.e. columns for which the name starts
    /// with the prefix then contains the [`JSON_PATH_SEGMENT_SEP`].
    ///
    /// There can be more than one column associated to each path within the JSON structure,
    /// provided they have different types.
    pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
        let prefix = column_dictionary_prefix_for_subpath(root_path);
        let stream = self
            .column_dictionary
            .prefix_range(prefix.as_bytes())
            .into_stream()?;
        read_all_columns_in_stream(stream, &self.column_data, self.format_version)
    }
@@ -192,6 +216,8 @@ impl ColumnarReader {

#[cfg(test)]
mod tests {
    use common::json_path_writer::JSON_PATH_SEGMENT_SEP;

    use crate::{ColumnType, ColumnarReader, ColumnarWriter};

    #[test]
@@ -224,6 +250,64 @@ mod tests {
        assert_eq!(columns[0].1.column_type(), ColumnType::U64);
    }

    #[test]
    fn test_read_columns() {
        let mut columnar_writer = ColumnarWriter::default();
        columnar_writer.record_column_type("col", ColumnType::U64, false);
        columnar_writer.record_numerical(1, "col", 1u64);
        let mut buffer = Vec::new();
        columnar_writer.serialize(2, &mut buffer).unwrap();
        let columnar = ColumnarReader::open(buffer).unwrap();
        {
            let columns = columnar.read_columns("col").unwrap();
            assert_eq!(columns.len(), 1);
            assert_eq!(columns[0].column_type(), ColumnType::U64);
        }
        {
            let columns = columnar.read_columns("other").unwrap();
            assert_eq!(columns.len(), 0);
        }
    }

    #[test]
    fn test_read_subpath_columns() {
        let mut columnar_writer = ColumnarWriter::default();
        columnar_writer.record_str(
            0,
            &format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
            "hello",
        );
        columnar_writer.record_numerical(
            0,
            &format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
            1i64,
        );
        columnar_writer.record_str(1, "col1", "hello");
        columnar_writer.record_str(0, "col2", "hello");
        let mut buffer = Vec::new();
        columnar_writer.serialize(2, &mut buffer).unwrap();

        let columnar = ColumnarReader::open(buffer).unwrap();
        {
            let columns = columnar.read_subpath_columns("col1").unwrap();
            assert_eq!(columns.len(), 2);
            assert_eq!(columns[0].column_type(), ColumnType::Str);
            assert_eq!(columns[1].column_type(), ColumnType::I64);
        }
        {
            let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
            assert_eq!(columns.len(), 0);
        }
        {
            let columns = columnar.read_subpath_columns("col2").unwrap();
            assert_eq!(columns.len(), 0);
        }
        {
            let columns = columnar.read_subpath_columns("other").unwrap();
            assert_eq!(columns.len(), 0);
        }
    }

    #[test]
    #[should_panic(expected = "Input type forbidden")]
    fn test_list_columns_strict_typing_panics_on_wrong_types() {

@@ -285,7 +285,6 @@ impl ColumnarWriter {
                .map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
        );
        columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));

        let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
        let mut symbol_byte_buffer: Vec<u8> = Vec::new();
        for (column_name, column_type, addr) in columns {
@@ -271,10 +271,6 @@ impl AggregationWithAccessor {
                field: ref field_name,
                ..
            })
            | Count(CountAggregation {
                field: ref field_name,
                ..
            })
            | Max(MaxAggregation {
                field: ref field_name,
                ..
@@ -299,6 +295,24 @@
                    get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
                add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
            }
            Count(CountAggregation {
                field: ref field_name,
                ..
            }) => {
                let allowed_column_types = [
                    ColumnType::I64,
                    ColumnType::U64,
                    ColumnType::F64,
                    ColumnType::Str,
                    ColumnType::DateTime,
                    ColumnType::Bool,
                    ColumnType::IpAddr,
                    // ColumnType::Bytes Unsupported
                ];
                let (accessor, column_type) =
                    get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
                add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
            }
            Percentiles(ref percentiles) => {
                let (accessor, column_type) = get_ff_reader(
                    reader,
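Since `Count` aggregations are no longer routed through the numeric-only column filter, a `value_count` can now target text, boolean, date, and IP fast fields as well. A hedged sketch of the request shape only, written with `serde_json::json!` in the same style as the tests below (the field names are illustrative, not taken from the source):

```rust
use serde_json::json;

fn main() {
    // Hypothetical aggregation request: count values of a text fast field
    // alongside a numeric one; both now go through the same Count arm.
    let agg_request = json!({
        "count_text_values": { "value_count": { "field": "text" } },
        "count_scores": { "value_count": { "field": "score" } },
    });
    println!("{agg_request}");
}
```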
@@ -220,9 +220,23 @@ impl SegmentStatsCollector {
                .column_block_accessor
                .fetch_block(docs, &agg_accessor.accessor);
        }
        for val in agg_accessor.column_block_accessor.iter_vals() {
            let val1 = f64_from_fastfield_u64(val, &self.field_type);
            self.stats.collect(val1);
        if [
            ColumnType::I64,
            ColumnType::U64,
            ColumnType::F64,
            ColumnType::DateTime,
        ]
        .contains(&self.field_type)
        {
            for val in agg_accessor.column_block_accessor.iter_vals() {
                let val1 = f64_from_fastfield_u64(val, &self.field_type);
                self.stats.collect(val1);
            }
        } else {
            for _val in agg_accessor.column_block_accessor.iter_vals() {
                // we ignore the value and simply record that we got something
                self.stats.collect(0.0);
            }
        }
    }
}
@@ -435,6 +449,11 @@ mod tests {
                "field": "score",
            },
        },
        "count_str": {
            "value_count": {
                "field": "text",
            },
        },
        "range": range_agg
    }))
    .unwrap();
@@ -500,6 +519,13 @@
            })
        );

        assert_eq!(
            res["count_str"],
            json!({
                "value": 7.0,
            })
        );

        Ok(())
    }
@@ -578,7 +578,7 @@ mod tests {
            .set_indexing_options(
                TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
            )
            .set_fast(None)
            .set_fast(Some("raw"))
            .set_stored();
        let text_field = schema_builder.add_text_field("text", text_fieldtype);
        let date_field = schema_builder.add_date_field("date", FAST);

@@ -217,7 +217,7 @@ impl FastFieldReaders {
        Ok(dynamic_column.into())
    }

    /// Returning a `dynamic_column_handle`.
    /// Returns a `dynamic_column_handle`.
    pub fn dynamic_column_handle(
        &self,
        field_name: &str,
@@ -234,7 +234,7 @@
        Ok(dynamic_column_handle_opt)
    }

    /// Returning all `dynamic_column_handle`.
    /// Returns all `dynamic_column_handle` that match the given field name.
    pub fn dynamic_column_handles(
        &self,
        field_name: &str,
@@ -250,6 +250,22 @@
        Ok(dynamic_column_handles)
    }

    /// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
    pub fn dynamic_subpath_column_handles(
        &self,
        root_path: &str,
    ) -> crate::Result<Vec<DynamicColumnHandle>> {
        let Some(resolved_field_name) = self.resolve_field(root_path)? else {
            return Ok(Vec::new());
        };
        let dynamic_column_handles = self
            .columnar
            .read_subpath_columns(&resolved_field_name)?
            .into_iter()
            .collect();
        Ok(dynamic_column_handles)
    }

    #[doc(hidden)]
    pub async fn list_dynamic_column_handles(
        &self,
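A small usage sketch of the new accessor (hedged: it assumes you already hold a `SegmentReader` for an index with a FAST JSON field named "json", as in the `test_json_fast` test further down; nothing here is a verbatim excerpt from the source):

```rust
use tantivy::SegmentReader;

// Compare the exact-path lookup with the new subpath lookup on the
// fast field readers of a single segment.
fn inspect_json_columns(segment_reader: &SegmentReader) -> tantivy::Result<()> {
    let fast_fields = segment_reader.fast_fields();

    // Exact path only: "json" itself carries no value, so no handle comes back.
    let exact = fast_fields.dynamic_column_handles("json")?;
    println!("columns stored exactly at `json`: {}", exact.len());

    // Subpaths: every column nested under "json" (e.g. `json.toto`, `json.bool`, ...).
    let nested = fast_fields.dynamic_subpath_column_handles("json")?;
    println!("columns nested under `json`: {}", nested.len());
    Ok(())
}
```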
@@ -265,6 +281,21 @@
        Ok(columns)
    }

    #[doc(hidden)]
    pub async fn list_subpath_dynamic_column_handles(
        &self,
        root_path: &str,
    ) -> crate::Result<Vec<DynamicColumnHandle>> {
        let Some(resolved_field_name) = self.resolve_field(root_path)? else {
            return Ok(Vec::new());
        };
        let columns = self
            .columnar
            .read_subpath_columns_async(&resolved_field_name)
            .await?;
        Ok(columns)
    }

    /// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
    /// i64, u64, f64, DateTime).
    ///
@@ -476,6 +507,15 @@ mod tests {
            .iter()
            .any(|column| column.column_type() == ColumnType::Str));

        println!("*** {:?}", fast_fields.columnar().list_columns());
        let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
        assert_eq!(json_columns.len(), 0);

        let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
        assert_eq!(json_subcolumns.len(), 3);

        let foo_subcolumns = fast_fields
            .dynamic_subpath_column_handles("json.foo")
            .unwrap();
        assert_eq!(foo_subcolumns.len(), 0);
    }
}
@@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::index_writer::{
    IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
@@ -519,6 +521,43 @@ impl Index {
        load_metas(self.directory(), &self.inventory)
    }

    /// Open a new index writer with the given options. Attempts to acquire a lockfile.
    ///
    /// The lockfile should be deleted on drop, but it is possible
    /// that due to a panic or other error, a stale lockfile will be
    /// left in the index directory. If you are sure that no other
    /// `IndexWriter` on the system is accessing the index directory,
    /// it is safe to manually delete the lockfile.
    ///
    /// - `options` defines the writer configuration which includes things like buffer sizes,
    ///   indexer threads, etc...
    ///
    /// # Errors
    /// If the lockfile already exists, returns `TantivyError::LockFailure`.
    /// If the memory arena per thread is too small or too big, returns
    /// `TantivyError::InvalidArgument`
    pub fn writer_with_options<D: Document>(
        &self,
        options: IndexWriterOptions,
    ) -> crate::Result<IndexWriter<D>> {
        let directory_lock = self
            .directory
            .acquire_lock(&INDEX_WRITER_LOCK)
            .map_err(|err| {
                TantivyError::LockFailure(
                    err,
                    Some(
                        "Failed to acquire index lock. If you are using a regular directory, this \
                         means there is already an `IndexWriter` working on this `Directory`, in \
                         this process or in a different process."
                            .to_string(),
                    ),
                )
            })?;

        IndexWriter::new(self, options, directory_lock)
    }

    /// Open a new index writer. Attempts to acquire a lockfile.
    ///
    /// The lockfile should be deleted on drop, but it is possible
@@ -543,27 +582,12 @@ impl Index {
        num_threads: usize,
        overall_memory_budget_in_bytes: usize,
    ) -> crate::Result<IndexWriter<D>> {
        let directory_lock = self
            .directory
            .acquire_lock(&INDEX_WRITER_LOCK)
            .map_err(|err| {
                TantivyError::LockFailure(
                    err,
                    Some(
                        "Failed to acquire index lock. If you are using a regular directory, this \
                         means there is already an `IndexWriter` working on this `Directory`, in \
                         this process or in a different process."
                            .to_string(),
                    ),
                )
            })?;
        let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
        IndexWriter::new(
            self,
            num_threads,
            memory_arena_in_bytes_per_thread,
            directory_lock,
        )
        let options = IndexWriterOptions::builder()
            .num_worker_threads(num_threads)
            .memory_budget_per_thread(memory_arena_in_bytes_per_thread)
            .build();
        self.writer_with_options(options)
    }

    /// Helper to create an index writer for tests.
@@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
    ))
}

#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
    #[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
    /// The memory budget per indexer thread.
    ///
    /// When an indexer thread has buffered this much data in memory
    /// it will flush the segment to disk (although this is not searchable until commit is called.)
    memory_budget_per_thread: usize,
    #[builder(default = 1)]
    /// The number of indexer worker threads to use.
    num_worker_threads: usize,
    #[builder(default = 4)]
    /// Defines the number of merger threads to use.
    num_merge_threads: usize,
}
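`IndexWriterOptions` is consumed by the new `Index::writer_with_options` entry point shown above. A hedged end-to-end sketch (the schema, field name, and the 15 MB budget are illustrative; the budget merely has to sit between `MEMORY_BUDGET_NUM_BYTES_MIN` and `MEMORY_BUDGET_NUM_BYTES_MAX`):

```rust
use tantivy::indexer::IndexWriterOptions;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, TantivyDocument};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // Budget and thread count are illustrative; `num_merge_threads` falls back
    // to its default because it is not set here.
    let options = IndexWriterOptions::builder()
        .num_worker_threads(1)
        .memory_budget_per_thread(15_000_000)
        .build();

    let mut writer = index.writer_with_options::<TantivyDocument>(options)?;
    writer.add_document(doc!(title => "hello"))?;
    writer.commit()?;
    Ok(())
}
```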

/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {

    index: Index,

    // The memory budget per thread, after which a commit is triggered.
    memory_budget_in_bytes_per_thread: usize,
    options: IndexWriterOptions,

    workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,

@@ -70,8 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {

    worker_id: usize,

    num_threads: usize,

    delete_queue: DeleteQueue,

    stamper: Stamper,
@@ -265,23 +279,27 @@ impl<D: Document> IndexWriter<D> {
    /// `TantivyError::InvalidArgument`
    pub(crate) fn new(
        index: &Index,
        num_threads: usize,
        memory_budget_in_bytes_per_thread: usize,
        options: IndexWriterOptions,
        directory_lock: DirectoryLock,
    ) -> crate::Result<Self> {
        if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
        if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
            let err_msg = format!(
                "The memory arena in bytes per thread needs to be at least \
                 {MEMORY_BUDGET_NUM_BYTES_MIN}."
            );
            return Err(TantivyError::InvalidArgument(err_msg));
        }
        if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
        if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
            let err_msg = format!(
                "The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
            );
            return Err(TantivyError::InvalidArgument(err_msg));
        }
        if options.num_worker_threads == 0 {
            let err_msg = "At least one worker thread is required, got 0".to_string();
            return Err(TantivyError::InvalidArgument(err_msg));
        }

        let (document_sender, document_receiver) =
            crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);

@@ -291,13 +309,17 @@ impl<D: Document> IndexWriter<D> {

        let stamper = Stamper::new(current_opstamp);

        let segment_updater =
            SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
        let segment_updater = SegmentUpdater::create(
            index.clone(),
            stamper.clone(),
            &delete_queue.cursor(),
            options.num_merge_threads,
        )?;

        let mut index_writer = Self {
            _directory_lock: Some(directory_lock),

            memory_budget_in_bytes_per_thread,
            options: options.clone(),
            index: index.clone(),
            index_writer_status: IndexWriterStatus::from(document_receiver),
            operation_sender: document_sender,
@@ -305,7 +327,6 @@ impl<D: Document> IndexWriter<D> {
            segment_updater,

            workers_join_handle: vec![],
            num_threads,

            delete_queue,

@@ -398,7 +419,7 @@ impl<D: Document> IndexWriter<D> {

        let mut delete_cursor = self.delete_queue.cursor();

        let mem_budget = self.memory_budget_in_bytes_per_thread;
        let mem_budget = self.options.memory_budget_per_thread;
        let index = self.index.clone();
        let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
            .name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -451,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
    }

    fn start_workers(&mut self) -> crate::Result<()> {
        for _ in 0..self.num_threads {
        for _ in 0..self.options.num_worker_threads {
            self.add_indexing_worker()?;
        }
        Ok(())
@@ -553,12 +574,7 @@ impl<D: Document> IndexWriter<D> {
            .take()
            .expect("The IndexWriter does not have any lock. This is a bug, please report.");

        let new_index_writer = IndexWriter::new(
            &self.index,
            self.num_threads,
            self.memory_budget_in_bytes_per_thread,
            directory_lock,
        )?;
        let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;

        // the current `self` is dropped right away because of this call.
        //
@@ -812,7 +828,7 @@ mod tests {
    use crate::directory::error::LockError;
    use crate::error::*;
    use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
    use crate::indexer::NoMergePolicy;
    use crate::indexer::{IndexWriterOptions, NoMergePolicy};
    use crate::query::{QueryParser, TermQuery};
    use crate::schema::{
        self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
@@ -2533,4 +2549,36 @@ mod tests {
        index_writer.commit().unwrap();
        Ok(())
    }

    #[test]
    fn test_writer_options_validation() {
        let mut schema_builder = Schema::builder();
        let field = schema_builder.add_bool_field("example", STORED);
        let index = Index::create_in_ram(schema_builder.build());

        let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
        let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
        assert!(result.is_err(), "Writer should reject 0 thread count");
        assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));

        let opt_with_low_memory = IndexWriterOptions::builder()
            .memory_budget_per_thread(10 << 10)
            .build();
        let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
        assert!(
            result.is_err(),
            "Writer should reject options with too low memory size"
        );
        assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));

        let opt_with_low_memory = IndexWriterOptions::builder()
            .memory_budget_per_thread(5 << 30)
            .build();
        let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
        assert!(
            result.is_err(),
            "Writer should reject options with too high memory size"
        );
        assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
    }
}
@@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;

pub use self::index_writer::IndexWriter;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

@@ -25,8 +25,6 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp};

const NUM_MERGE_THREADS: usize = 4;

/// Save the index meta file.
/// This operation is atomic:
/// Either
@@ -273,6 +271,7 @@ impl SegmentUpdater {
        index: Index,
        stamper: Stamper,
        delete_cursor: &DeleteCursor,
        num_merge_threads: usize,
    ) -> crate::Result<SegmentUpdater> {
        let segments = index.searchable_segment_metas()?;
        let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -287,7 +286,18 @@
            })?;
        let merge_thread_pool = ThreadPoolBuilder::new()
            .thread_name(|i| format!("merge_thread_{i}"))
            .num_threads(NUM_MERGE_THREADS)
            .num_threads(num_merge_threads)
            .panic_handler(move |panic| {
                let message = if let Some(msg) = panic.downcast_ref::<&str>() {
                    *msg
                } else if let Some(msg) = panic.downcast_ref::<String>() {
                    msg.as_str()
                } else {
                    "UNKNOWN"
                };
                eprintln!("merge thread panicked with: {message}")
            })
            .build()
            .map_err(|_| {
                crate::TantivyError::SystemError(
@@ -422,6 +422,7 @@ mod tests {
    use std::collections::BTreeMap;
    use std::path::{Path, PathBuf};

    use columnar::ColumnType;
    use tempfile::TempDir;

    use crate::collector::{Count, TopDocs};
@@ -431,15 +432,15 @@
    use crate::query::{PhraseQuery, QueryParser};
    use crate::schema::{
        Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
        DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
        DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
    };
    use crate::store::{Compressor, StoreReader, StoreWriter};
    use crate::time::format_description::well_known::Rfc3339;
    use crate::time::OffsetDateTime;
    use crate::tokenizer::{PreTokenizedString, Token};
    use crate::{
        DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
        TERMINATED,
        DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
        TantivyDocument, Term, TERMINATED,
    };

    #[test]
@@ -841,6 +842,75 @@ mod tests {
        assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
    }

    #[test]
    fn test_json_fast() {
        let mut schema_builder = Schema::builder();
        let json_field = schema_builder.add_json_field("json", FAST);
        let schema = schema_builder.build();
        let json_val: serde_json::Value = serde_json::from_str(
            r#"{
                "toto": "titi",
                "float": -0.2,
                "bool": true,
                "unsigned": 1,
                "signed": -2,
                "complexobject": {
                    "field.with.dot": 1
                },
                "date": "1985-04-12T23:20:50.52Z",
                "my_arr": [2, 3, {"my_key": "two tokens"}, 4]
            }"#,
        )
        .unwrap();
        let doc = doc!(json_field=>json_val.clone());
        let index = Index::create_in_ram(schema.clone());
        let mut writer = index.writer_for_tests().unwrap();
        writer.add_document(doc).unwrap();
        writer.commit().unwrap();
        let reader = index.reader().unwrap();
        let searcher = reader.searcher();
        let segment_reader = searcher.segment_reader(0u32);

        fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
            let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
            assert_eq!(cols.len(), 1, "{}", field);
            assert_eq!(cols[0].column_type(), typ, "{}", field);
        }
        assert_type(segment_reader, "json.toto", ColumnType::Str);
        assert_type(segment_reader, "json.float", ColumnType::F64);
        assert_type(segment_reader, "json.bool", ColumnType::Bool);
        assert_type(segment_reader, "json.unsigned", ColumnType::I64);
        assert_type(segment_reader, "json.signed", ColumnType::I64);
        assert_type(
            segment_reader,
            "json.complexobject.field\\.with\\.dot",
            ColumnType::I64,
        );
        assert_type(segment_reader, "json.date", ColumnType::DateTime);
        assert_type(segment_reader, "json.my_arr", ColumnType::I64);
        assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);

        fn assert_empty(reader: &SegmentReader, field: &str) {
            let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
            assert_eq!(cols.len(), 0);
        }
        assert_empty(segment_reader, "unknown");
        assert_empty(segment_reader, "json");
        assert_empty(segment_reader, "json.toto.titi");

        let sub_columns = segment_reader
            .fast_fields()
            .dynamic_subpath_column_handles("json")
            .unwrap();
        assert_eq!(sub_columns.len(), 9);

        let subsub_columns = segment_reader
            .fast_fields()
            .dynamic_subpath_column_handles("json.complexobject")
            .unwrap();
        assert_eq!(subsub_columns.len(), 1);
    }

    #[test]
    fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
        // https://github.com/quickwit-oss/tantivy/issues/2283
@@ -7,14 +7,32 @@ use crate::docset::{DocSet, TERMINATED};
use crate::index::SegmentReader;
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::Type;
use crate::{DocId, Score, TantivyError};

/// Query that matches all documents with a non-null value in the specified field.
/// Query that matches all documents with a non-null value in the specified
/// field.
///
/// When querying inside a JSON field, "exists" queries can be executed strictly
/// on the field name or check all the subpaths. In that second case a document
/// will be matched if a non-null value exists in any subpath. For example,
/// assuming the following document where `myfield` is a JSON fast field:
/// ```json
/// {
///   "myfield": {
///     "mysubfield": "hello"
///   }
/// }
/// ```
/// With `json_subpaths` enabled queries on either `myfield` or
/// `myfield.mysubfield` will match the document. If it is set to false, only
/// `myfield.mysubfield` will match it.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
    field_name: String,
    json_subpaths: bool,
}

impl ExistsQuery {
@@ -23,8 +41,28 @@ impl ExistsQuery {
    /// This query matches all documents with at least one non-null value in the specified field.
    /// This constructor never fails, but executing the search with this query will return an
    /// error if the specified field doesn't exists or is not a fast field.
    #[deprecated]
    pub fn new_exists_query(field: String) -> ExistsQuery {
        ExistsQuery { field_name: field }
        ExistsQuery {
            field_name: field,
            json_subpaths: false,
        }
    }

    /// Creates a new `ExistsQuery` from the given field.
    ///
    /// This query matches all documents with at least one non-null value in the
    /// specified field. If `json_subpaths` is set to true, documents with
    /// non-null values in any JSON subpath will also be matched.
    ///
    /// This constructor never fails, but executing the search with this query will
    /// return an error if the specified field doesn't exist or is not a fast
    /// field.
    pub fn new(field: String, json_subpaths: bool) -> Self {
        Self {
            field_name: field,
            json_subpaths,
        }
    }
}
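A hedged usage sketch of the new constructor and the `json_subpaths` flag (the schema, field name, and document are illustrative, and `serde_json` is assumed to be available as a dependency):

```rust
use serde_json::json;
use tantivy::collector::Count;
use tantivy::query::ExistsQuery;
use tantivy::schema::{Schema, FAST};
use tantivy::{doc, Index, TantivyDocument};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let json_field = schema_builder.add_json_field("attributes", FAST);
    let index = Index::create_in_ram(schema_builder.build());

    let mut writer = index.writer::<TantivyDocument>(50_000_000)?;
    writer.add_document(doc!(json_field => json!({"color": "red"})))?;
    writer.commit()?;

    let searcher = index.reader()?.searcher();
    // With `json_subpaths` set to true, a query on the root path matches
    // documents that only hold values in nested subpaths.
    let query = ExistsQuery::new("attributes".to_string(), true);
    assert_eq!(searcher.search(&query, &Count)?, 1);
    Ok(())
}
```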
@@ -43,6 +81,8 @@ impl Query for ExistsQuery {
        }
        Ok(Box::new(ExistsWeight {
            field_name: self.field_name.clone(),
            field_type: field_type.value_type(),
            json_subpaths: self.json_subpaths,
        }))
    }
}
@@ -50,13 +90,20 @@
/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
    field_name: String,
    field_type: Type,
    json_subpaths: bool,
}

impl Weight for ExistsWeight {
    fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
        let fast_field_reader = reader.fast_fields();
        let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
            .dynamic_column_handles(&self.field_name)?
        let mut column_handles = fast_field_reader.dynamic_column_handles(&self.field_name)?;
        if self.field_type == Type::Json && self.json_subpaths {
            let mut sub_columns =
                fast_field_reader.dynamic_subpath_column_handles(&self.field_name)?;
            column_handles.append(&mut sub_columns);
        }
        let dynamic_columns: crate::Result<Vec<DynamicColumn>> = column_handles
            .into_iter()
            .map(|handle| handle.open().map_err(|io_error| io_error.into()))
            .collect();
@@ -180,11 +227,12 @@ mod tests {
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(count_existing_fields(&searcher, "all")?, 100);
        assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
        assert_eq!(count_existing_fields(&searcher, "even")?, 50);
        assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
        assert_eq!(count_existing_fields(&searcher, "never")?, 0);
        assert_eq!(count_existing_fields(&searcher, "all", false)?, 100);
        assert_eq!(count_existing_fields(&searcher, "odd", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "even", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "multi", false)?, 10);
        assert_eq!(count_existing_fields(&searcher, "multi", true)?, 10);
        assert_eq!(count_existing_fields(&searcher, "never", false)?, 0);

        // exercise seek
        let query = BooleanQuery::intersection(vec![
@@ -192,7 +240,7 @@
                Bound::Included(Term::from_field_u64(all_field, 50)),
                Bound::Unbounded,
            )),
            Box::new(ExistsQuery::new_exists_query("even".to_string())),
            Box::new(ExistsQuery::new("even".to_string(), false)),
        ]);
        assert_eq!(searcher.search(&query, &Count)?, 25);

@@ -201,7 +249,7 @@
                Bound::Included(Term::from_field_u64(all_field, 0)),
                Bound::Included(Term::from_field_u64(all_field, 50)),
            )),
            Box::new(ExistsQuery::new_exists_query("odd".to_string())),
            Box::new(ExistsQuery::new("odd".to_string(), false)),
        ]);
        assert_eq!(searcher.search(&query, &Count)?, 25);

@@ -230,22 +278,18 @@
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
        assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
        assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
        assert_eq!(count_existing_fields(&searcher, "json.all", false)?, 100);
        assert_eq!(count_existing_fields(&searcher, "json.even", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "json.even", true)?, 50);
        assert_eq!(count_existing_fields(&searcher, "json.odd", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "json", false)?, 0);
        assert_eq!(count_existing_fields(&searcher, "json", true)?, 100);

        // Handling of non-existing fields:
        assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
        assert_eq!(
            searcher
                .search(
                    &ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
                    &Count
                )
                .unwrap_err()
                .to_string(),
            "The field does not exist: 'does_not_exists.absent'"
        );
        assert_eq!(count_existing_fields(&searcher, "json.absent", false)?, 0);
        assert_eq!(count_existing_fields(&searcher, "json.absent", true)?, 0);
        assert_does_not_exist(&searcher, "does_not_exists.absent", true);
        assert_does_not_exist(&searcher, "does_not_exists.absent", false);

        Ok(())
    }
@@ -284,12 +328,13 @@
        let reader = index.reader()?;
        let searcher = reader.searcher();

        assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
        assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
        assert_eq!(count_existing_fields(&searcher, "date")?, 50);
        assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
        assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
        assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
        assert_eq!(count_existing_fields(&searcher, "bool", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "bool", true)?, 50);
        assert_eq!(count_existing_fields(&searcher, "bytes", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "date", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "f64", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "ip_addr", false)?, 50);
        assert_eq!(count_existing_fields(&searcher, "facet", false)?, 50);

        Ok(())
    }
@@ -313,31 +358,33 @@

        assert_eq!(
            searcher
                .search(
                    &ExistsQuery::new_exists_query("not_fast".to_string()),
                    &Count
                )
                .search(&ExistsQuery::new("not_fast".to_string(), false), &Count)
                .unwrap_err()
                .to_string(),
            "Schema error: 'Field not_fast is not a fast field.'"
        );

        assert_eq!(
            searcher
                .search(
                    &ExistsQuery::new_exists_query("does_not_exists".to_string()),
                    &Count
                )
                .unwrap_err()
                .to_string(),
            "The field does not exist: 'does_not_exists'"
        );
        assert_does_not_exist(&searcher, "does_not_exists", false);

        Ok(())
    }

    fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
        let query = ExistsQuery::new_exists_query(field.to_string());
    fn count_existing_fields(
        searcher: &Searcher,
        field: &str,
        json_subpaths: bool,
    ) -> crate::Result<usize> {
        let query = ExistsQuery::new(field.to_string(), json_subpaths);
        searcher.search(&query, &Count)
    }

    fn assert_does_not_exist(searcher: &Searcher, field: &str, json_subpaths: bool) {
        assert_eq!(
            searcher
                .search(&ExistsQuery::new(field.to_string(), json_subpaths), &Count)
                .unwrap_err()
                .to_string(),
            format!("The field does not exist: '{}'", field)
        );
    }
}
@@ -521,6 +521,25 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
        StreamerBuilder::new(self, AlwaysMatch)
    }

    /// Returns a range builder filtered with a prefix.
    pub fn prefix_range<K: AsRef<[u8]>>(&self, prefix: K) -> StreamerBuilder<TSSTable> {
        let lower_bound = prefix.as_ref();
        let mut upper_bound = lower_bound.to_vec();
        for idx in (0..upper_bound.len()).rev() {
            if upper_bound[idx] == 255 {
                upper_bound.pop();
            } else {
                upper_bound[idx] += 1;
                break;
            }
        }
        let mut builder = self.range().ge(lower_bound);
        if !upper_bound.is_empty() {
            builder = builder.lt(upper_bound);
        }
        builder
    }

    /// A stream of all the sorted terms.
    pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
        self.range().into_stream()
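The loop in `prefix_range` computes the successor of the prefix: scan from the last byte, drop trailing `0xff` bytes, and increment the first byte that can still be incremented; if every byte is `0xff` the upper bound stays unbounded. The half-open range `[prefix, successor)` then covers exactly the keys starting with the prefix. A self-contained sketch of just that computation (not the sstable API itself):

```rust
/// Successor of a prefix: the smallest byte string greater than every
/// string that starts with `prefix`, or `None` if no such bound exists
/// (i.e. the prefix is empty or made only of 0xff bytes).
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut upper = prefix.to_vec();
    while let Some(&last) = upper.last() {
        if last == 255 {
            // 0xff cannot be incremented: shorten and try the previous byte.
            upper.pop();
        } else {
            *upper.last_mut().unwrap() += 1;
            return Some(upper);
        }
    }
    None
}

fn main() {
    assert_eq!(prefix_successor(b"abc"), Some(b"abd".to_vec()));
    assert_eq!(prefix_successor(&[0x61, 0xff]), Some(vec![0x62]));
    assert_eq!(prefix_successor(&[0xff, 0xff]), None);
}
```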
@@ -928,4 +947,62 @@ mod tests {
        }
        assert!(!stream.advance());
    }

    #[test]
    fn test_prefix() {
        let (dic, _slice) = make_test_sstable();
        {
            let mut stream = dic.prefix_range("1").into_stream().unwrap();
            for i in 0x10000..0x20000 {
                assert!(stream.advance());
                assert_eq!(stream.term_ord(), i);
                assert_eq!(stream.value(), &i);
                assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
            }
            assert!(!stream.advance());
        }
        {
            let mut stream = dic.prefix_range("").into_stream().unwrap();
            for i in 0..0x3ffff {
                assert!(stream.advance(), "failed at {i:05X}");
                assert_eq!(stream.term_ord(), i);
                assert_eq!(stream.value(), &i);
                assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
            }
            assert!(!stream.advance());
        }
        {
            let mut stream = dic.prefix_range("0FF").into_stream().unwrap();
            for i in 0x0ff00..=0x0ffff {
                assert!(stream.advance(), "failed at {i:05X}");
                assert_eq!(stream.term_ord(), i);
                assert_eq!(stream.value(), &i);
                assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
            }
            assert!(!stream.advance());
        }
    }

    #[test]
    fn test_prefix_edge() {
        let dict = {
            let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
            builder.insert(&[0, 254], &0).unwrap();
            builder.insert(&[0, 255], &1).unwrap();
            builder.insert(&[0, 255, 12], &2).unwrap();
            builder.insert(&[1], &2).unwrap();
            builder.insert(&[1, 0], &2).unwrap();
            let table = builder.finish().unwrap();
            let table = Arc::new(PermissionedHandle::new(table));
            let slice = common::file_slice::FileSlice::new(table.clone());
            Dictionary::<MonotonicU64SSTable>::open(slice).unwrap()
        };

        let mut stream = dict.prefix_range(&[0, 255]).into_stream().unwrap();
        assert!(stream.advance());
        assert_eq!(stream.key(), &[0, 255]);
        assert!(stream.advance());
        assert_eq!(stream.key(), &[0, 255, 12]);
        assert!(!stream.advance());
    }
}
@@ -26,7 +26,7 @@ path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
zipf = "7.0.0"
rustc-hash = "1.1.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }