Compare commits

..

1 Commits

Author SHA1 Message Date
Pascal Seitz
4a9262cd2c accept * as field name 2024-12-06 09:42:18 +01:00
16 changed files with 151 additions and 576 deletions

View File

@@ -53,9 +53,8 @@ rayon = "1.5.2"
lru = "0.12.0"
fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.9.0"
measure_time = "0.8.2"
arc-swap = "1.5.0"
bon = "3.3.1"
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }

View File

@@ -1,7 +1,6 @@
use std::{fmt, io, mem};
use common::file_slice::FileSlice;
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use common::BinarySerializable;
use sstable::{Dictionary, RangeSSTable};
@@ -77,19 +76,6 @@ fn read_all_columns_in_stream(
Ok(results)
}
fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
format!("{}{}", column_name, '\0')
}
fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}
impl ColumnarReader {
/// Opens a new Columnar file.
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
@@ -158,14 +144,32 @@ impl ColumnarReader {
Ok(self.iter_columns()?.collect())
}
fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name,\0`..column_name\1)`.
// TODO can we get some more generic `prefix(..)` logic in the dictionary.
let mut start_key = column_name.to_string();
start_key.push('\0');
let mut end_key = column_name.to_string();
end_key.push(1u8 as char);
self.column_dictionary
.range()
.ge(start_key.as_bytes())
.lt(end_key.as_bytes())
}
pub async fn read_columns_async(
&self,
column_name: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self
.column_dictionary
.prefix_range(prefix)
.stream_for_column_range(column_name)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
@@ -176,35 +180,7 @@ impl ColumnarReader {
/// There can be more than one column associated to a given column name, provided they have
/// different types.
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
pub async fn read_subpath_columns_async(
&self,
root_path: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
/// Get all inner columns for a given JSON prefix, i.e columns for which the name starts
/// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
///
/// There can be more than one column associated to each path within the JSON structure,
/// provided they have different types.
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix.as_bytes())
.into_stream()?;
let stream = self.stream_for_column_range(column_name).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}
@@ -216,8 +192,6 @@ impl ColumnarReader {
#[cfg(test)]
mod tests {
use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
use crate::{ColumnType, ColumnarReader, ColumnarWriter};
#[test]
@@ -250,64 +224,6 @@ mod tests {
assert_eq!(columns[0].1.column_type(), ColumnType::U64);
}
#[test]
fn test_read_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_column_type("col", ColumnType::U64, false);
columnar_writer.record_numerical(1, "col", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_columns("col").unwrap();
assert_eq!(columns.len(), 1);
assert_eq!(columns[0].column_type(), ColumnType::U64);
}
{
let columns = columnar.read_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
fn test_read_subpath_columns() {
let mut columnar_writer = ColumnarWriter::default();
columnar_writer.record_str(
0,
&format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
"hello",
);
columnar_writer.record_numerical(
0,
&format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
1i64,
);
columnar_writer.record_str(1, "col1", "hello");
columnar_writer.record_str(0, "col2", "hello");
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_subpath_columns("col1").unwrap();
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].column_type(), ColumnType::Str);
assert_eq!(columns[1].column_type(), ColumnType::I64);
}
{
let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("col2").unwrap();
assert_eq!(columns.len(), 0);
}
{
let columns = columnar.read_subpath_columns("other").unwrap();
assert_eq!(columns.len(), 0);
}
}
#[test]
#[should_panic(expected = "Input type forbidden")]
fn test_list_columns_strict_typing_panics_on_wrong_types() {

View File

@@ -285,6 +285,7 @@ impl ColumnarWriter {
.map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
);
columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns {

View File

@@ -6,7 +6,7 @@ use nom::bytes::complete::tag;
use nom::character::complete::{
anychar, char, digit1, multispace0, multispace1, none_of, one_of, satisfy, u32,
};
use nom::combinator::{eof, map, map_res, opt, peek, recognize, value, verify};
use nom::combinator::{eof, map, map_res, not, opt, peek, recognize, value, verify};
use nom::error::{Error, ErrorKind};
use nom::multi::{many0, many1, separated_list0};
use nom::sequence::{delimited, preceded, separated_pair, terminated, tuple};
@@ -20,7 +20,7 @@ use crate::Occur;
// Note: '-' char is only forbidden at the beginning of a field name, would be clearer to add it to
// special characters.
const SPECIAL_CHARS: &[char] = &[
'+', '^', '`', ':', '{', '}', '"', '\'', '[', ']', '(', ')', '!', '\\', '*', ' ',
'+', '^', '`', ':', '{', '}', '"', '\'', '[', ']', '(', ')', '!', '\\', ' ',
];
/// consume a field name followed by colon. Return the field name with escape sequence
@@ -679,7 +679,10 @@ fn negate(expr: UserInputAst) -> UserInputAst {
fn leaf(inp: &str) -> IResult<&str, UserInputAst> {
alt((
delimited(char('('), ast, char(')')),
map(char('*'), |_| UserInputAst::from(UserInputLeaf::All)),
preceded(
peek(not(tag("*:"))),
map(char('*'), |_| UserInputAst::from(UserInputLeaf::All)),
),
map(preceded(tuple((tag("NOT"), multispace1)), leaf), negate),
literal,
))(inp)
@@ -700,7 +703,13 @@ fn leaf_infallible(inp: &str) -> JResult<&str, Option<UserInputAst>> {
),
),
(
value((), char('*')),
value(
(),
preceded(
peek(not(tag("*:"))), // Fail if `*:` is detected
char('*'), // Match standalone `*`
),
),
map(nothing, |_| {
(Some(UserInputAst::from(UserInputLeaf::All)), Vec::new())
}),
@@ -1222,6 +1231,7 @@ mod test {
#[test]
fn test_field_name() {
assert_eq!(super::field_name("*:a"), Ok(("a", "*".to_string())));
assert_eq!(
super::field_name(".my.field.name:a"),
Ok(("a", ".my.field.name".to_string()))
@@ -1527,6 +1537,11 @@ mod test {
test_parse_query_to_ast_helper("abc:toto", "\"abc\":toto");
}
#[test]
fn all_field_star() {
test_parse_query_to_ast_helper("*:toto", "\"*\":toto");
}
#[test]
fn test_phrase_with_field() {
test_parse_query_to_ast_helper("abc:\"happy tax payer\"", "\"abc\":\"happy tax payer\"");

View File

@@ -271,6 +271,10 @@ impl AggregationWithAccessor {
field: ref field_name,
..
})
| Count(CountAggregation {
field: ref field_name,
..
})
| Max(MaxAggregation {
field: ref field_name,
..
@@ -295,24 +299,6 @@ impl AggregationWithAccessor {
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Count(CountAggregation {
field: ref field_name,
..
}) => {
let allowed_column_types = [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::Str,
ColumnType::DateTime,
ColumnType::Bool,
ColumnType::IpAddr,
// ColumnType::Bytes Unsupported
];
let (accessor, column_type) =
get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
}
Percentiles(ref percentiles) => {
let (accessor, column_type) = get_ff_reader(
reader,

View File

@@ -220,23 +220,9 @@ impl SegmentStatsCollector {
.column_block_accessor
.fetch_block(docs, &agg_accessor.accessor);
}
if [
ColumnType::I64,
ColumnType::U64,
ColumnType::F64,
ColumnType::DateTime,
]
.contains(&self.field_type)
{
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
} else {
for _val in agg_accessor.column_block_accessor.iter_vals() {
// we ignore the value and simply record that we got something
self.stats.collect(0.0);
}
for val in agg_accessor.column_block_accessor.iter_vals() {
let val1 = f64_from_fastfield_u64(val, &self.field_type);
self.stats.collect(val1);
}
}
}
@@ -449,11 +435,6 @@ mod tests {
"field": "score",
},
},
"count_str": {
"value_count": {
"field": "text",
},
},
"range": range_agg
}))
.unwrap();
@@ -519,13 +500,6 @@ mod tests {
})
);
assert_eq!(
res["count_str"],
json!({
"value": 7.0,
})
);
Ok(())
}

View File

@@ -578,7 +578,7 @@ mod tests {
.set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
)
.set_fast(Some("raw"))
.set_fast(None)
.set_stored();
let text_field = schema_builder.add_text_field("text", text_fieldtype);
let date_field = schema_builder.add_date_field("date", FAST);

View File

@@ -217,7 +217,7 @@ impl FastFieldReaders {
Ok(dynamic_column.into())
}
/// Returns a `dynamic_column_handle`.
/// Returning a `dynamic_column_handle`.
pub fn dynamic_column_handle(
&self,
field_name: &str,
@@ -234,7 +234,7 @@ impl FastFieldReaders {
Ok(dynamic_column_handle_opt)
}
/// Returns all `dynamic_column_handle` that match the given field name.
/// Returning all `dynamic_column_handle`.
pub fn dynamic_column_handles(
&self,
field_name: &str,
@@ -250,22 +250,6 @@ impl FastFieldReaders {
Ok(dynamic_column_handles)
}
/// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
pub fn dynamic_subpath_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let dynamic_column_handles = self
.columnar
.read_subpath_columns(&resolved_field_name)?
.into_iter()
.collect();
Ok(dynamic_column_handles)
}
#[doc(hidden)]
pub async fn list_dynamic_column_handles(
&self,
@@ -281,21 +265,6 @@ impl FastFieldReaders {
Ok(columns)
}
#[doc(hidden)]
pub async fn list_subpath_dynamic_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let columns = self
.columnar
.read_subpath_columns_async(&resolved_field_name)
.await?;
Ok(columns)
}
/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
/// i64, u64, f64, DateTime).
///
@@ -507,15 +476,6 @@ mod tests {
.iter()
.any(|column| column.column_type() == ColumnType::Str));
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
assert_eq!(json_columns.len(), 0);
let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
assert_eq!(json_subcolumns.len(), 3);
let foo_subcolumns = fast_fields
.dynamic_subpath_column_handles("json.foo")
.unwrap();
assert_eq!(foo_subcolumns.len(), 0);
println!("*** {:?}", fast_fields.columnar().list_columns());
}
}

View File

@@ -15,9 +15,7 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
@@ -521,43 +519,6 @@ impl Index {
load_metas(self.directory(), &self.inventory)
}
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `options` defines the writer configuration which includes things like buffer sizes,
/// indexer threads, etc...
///
/// # Errors
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_options<D: Document>(
&self,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
IndexWriter::new(self, options, directory_lock)
}
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
@@ -582,12 +543,27 @@ impl Index {
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(
err,
Some(
"Failed to acquire index lock. If you are using a regular directory, this \
means there is already an `IndexWriter` working on this `Directory`, in \
this process or in a different process."
.to_string(),
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
.build();
self.writer_with_options(options)
IndexWriter::new(
self,
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
)
}
/// Helper to create an index writer for tests.

View File

@@ -45,23 +45,6 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
))
}
#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
/// The memory budget per indexer thread.
///
/// When an indexer thread has buffered this much data in memory
/// it will flush the segment to disk (although this is not searchable until commit is called.)
memory_budget_per_thread: usize,
#[builder(default = 1)]
/// The number of indexer worker threads to use.
num_worker_threads: usize,
#[builder(default = 4)]
/// Defines the number of merger threads to use.
num_merge_threads: usize,
}
/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -75,7 +58,8 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
index: Index,
options: IndexWriterOptions,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
@@ -86,6 +70,8 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
worker_id: usize,
num_threads: usize,
delete_queue: DeleteQueue,
stamper: Stamper,
@@ -279,27 +265,23 @@ impl<D: Document> IndexWriter<D> {
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
options: IndexWriterOptions,
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> crate::Result<Self> {
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.num_worker_threads == 0 {
let err_msg = "At least one worker thread is required, got 0".to_string();
return Err(TantivyError::InvalidArgument(err_msg));
}
let (document_sender, document_receiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
@@ -309,17 +291,13 @@ impl<D: Document> IndexWriter<D> {
let stamper = Stamper::new(current_opstamp);
let segment_updater = SegmentUpdater::create(
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
options.num_merge_threads,
)?;
let segment_updater =
SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let mut index_writer = Self {
_directory_lock: Some(directory_lock),
options: options.clone(),
memory_budget_in_bytes_per_thread,
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
@@ -327,6 +305,7 @@ impl<D: Document> IndexWriter<D> {
segment_updater,
workers_join_handle: vec![],
num_threads,
delete_queue,
@@ -419,7 +398,7 @@ impl<D: Document> IndexWriter<D> {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.options.memory_budget_per_thread;
let mem_budget = self.memory_budget_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
@@ -472,7 +451,7 @@ impl<D: Document> IndexWriter<D> {
}
fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.options.num_worker_threads {
for _ in 0..self.num_threads {
self.add_indexing_worker()?;
}
Ok(())
@@ -574,7 +553,12 @@ impl<D: Document> IndexWriter<D> {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;
let new_index_writer = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;
// the current `self` is dropped right away because of this call.
//
@@ -828,7 +812,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::indexer::NoMergePolicy;
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
@@ -2549,36 +2533,4 @@ mod tests {
index_writer.commit().unwrap();
Ok(())
}
#[test]
fn test_writer_options_validation() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bool_field("example", STORED);
let index = Index::create_in_ram(schema_builder.build());
let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
assert!(result.is_err(), "Writer should reject 0 thread count");
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(10 << 10)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too low memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(5 << 30)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too high memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
}
}

View File

@@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};

View File

@@ -25,6 +25,8 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp};
const NUM_MERGE_THREADS: usize = 4;
/// Save the index meta file.
/// This operation is atomic:
/// Either
@@ -271,7 +273,6 @@ impl SegmentUpdater {
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
@@ -286,18 +287,7 @@ impl SegmentUpdater {
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(num_merge_threads)
.panic_handler(move |panic| {
let message = if let Some(msg) = panic.downcast_ref::<&str>() {
*msg
} else if let Some(msg) = panic.downcast_ref::<String>() {
msg.as_str()
} else {
"UNKNOWN"
};
eprintln!("merge thread panicked with: {message}")
})
.num_threads(NUM_MERGE_THREADS)
.build()
.map_err(|_| {
crate::TantivyError::SystemError(

View File

@@ -422,7 +422,6 @@ mod tests {
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use columnar::ColumnType;
use tempfile::TempDir;
use crate::collector::{Count, TopDocs};
@@ -432,15 +431,15 @@ mod tests {
use crate::query::{PhraseQuery, QueryParser};
use crate::schema::{
Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
};
use crate::store::{Compressor, StoreReader, StoreWriter};
use crate::time::format_description::well_known::Rfc3339;
use crate::time::OffsetDateTime;
use crate::tokenizer::{PreTokenizedString, Token};
use crate::{
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
TantivyDocument, Term, TERMINATED,
DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
TERMINATED,
};
#[test]
@@ -842,75 +841,6 @@ mod tests {
assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
}
#[test]
fn test_json_fast() {
let mut schema_builder = Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST);
let schema = schema_builder.build();
let json_val: serde_json::Value = serde_json::from_str(
r#"{
"toto": "titi",
"float": -0.2,
"bool": true,
"unsigned": 1,
"signed": -2,
"complexobject": {
"field.with.dot": 1
},
"date": "1985-04-12T23:20:50.52Z",
"my_arr": [2, 3, {"my_key": "two tokens"}, 4]
}"#,
)
.unwrap();
let doc = doc!(json_field=>json_val.clone());
let index = Index::create_in_ram(schema.clone());
let mut writer = index.writer_for_tests().unwrap();
writer.add_document(doc).unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 1, "{}", field);
assert_eq!(cols[0].column_type(), typ, "{}", field);
}
assert_type(segment_reader, "json.toto", ColumnType::Str);
assert_type(segment_reader, "json.float", ColumnType::F64);
assert_type(segment_reader, "json.bool", ColumnType::Bool);
assert_type(segment_reader, "json.unsigned", ColumnType::I64);
assert_type(segment_reader, "json.signed", ColumnType::I64);
assert_type(
segment_reader,
"json.complexobject.field\\.with\\.dot",
ColumnType::I64,
);
assert_type(segment_reader, "json.date", ColumnType::DateTime);
assert_type(segment_reader, "json.my_arr", ColumnType::I64);
assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);
fn assert_empty(reader: &SegmentReader, field: &str) {
let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
assert_eq!(cols.len(), 0);
}
assert_empty(segment_reader, "unknown");
assert_empty(segment_reader, "json");
assert_empty(segment_reader, "json.toto.titi");
let sub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json")
.unwrap();
assert_eq!(sub_columns.len(), 9);
let subsub_columns = segment_reader
.fast_fields()
.dynamic_subpath_column_handles("json.complexobject")
.unwrap();
assert_eq!(subsub_columns.len(), 1);
}
#[test]
fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
// https://github.com/quickwit-oss/tantivy/issues/2283

View File

@@ -7,32 +7,14 @@ use crate::docset::{DocSet, TERMINATED};
use crate::index::SegmentReader;
use crate::query::explanation::does_not_match;
use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::Type;
use crate::{DocId, Score, TantivyError};
/// Query that matches all documents with a non-null value in the specified
/// field.
///
/// When querying inside a JSON field, "exists" queries can be executed strictly
/// on the field name or check all the subpaths. In that second case a document
/// will be matched if a non-null value exists in any subpath. For example,
/// assuming the following document where `myfield` is a JSON fast field:
/// ```json
/// {
/// "myfield": {
/// "mysubfield": "hello"
/// }
/// }
/// ```
/// With `json_subpaths` enabled queries on either `myfield` or
/// `myfield.mysubfield` will match the document. If it is set to false, only
/// `myfield.mysubfield` will match it.
/// Query that matches all documents with a non-null value in the specified field.
///
/// All of the matched documents get the score 1.0.
#[derive(Clone, Debug)]
pub struct ExistsQuery {
field_name: String,
json_subpaths: bool,
}
impl ExistsQuery {
@@ -41,28 +23,8 @@ impl ExistsQuery {
/// This query matches all documents with at least one non-null value in the specified field.
/// This constructor never fails, but executing the search with this query will return an
/// error if the specified field doesn't exists or is not a fast field.
#[deprecated]
pub fn new_exists_query(field: String) -> ExistsQuery {
ExistsQuery {
field_name: field,
json_subpaths: false,
}
}
/// Creates a new `ExistQuery` from the given field.
///
/// This query matches all documents with at least one non-null value in the
/// specified field. If `json_subpaths` is set to true, documents with
/// non-null values in any JSON subpath will also be matched.
///
/// This constructor never fails, but executing the search with this query will
/// return an error if the specified field doesn't exists or is not a fast
/// field.
pub fn new(field: String, json_subpaths: bool) -> Self {
Self {
field_name: field,
json_subpaths,
}
ExistsQuery { field_name: field }
}
}
@@ -81,8 +43,6 @@ impl Query for ExistsQuery {
}
Ok(Box::new(ExistsWeight {
field_name: self.field_name.clone(),
field_type: field_type.value_type(),
json_subpaths: self.json_subpaths,
}))
}
}
@@ -90,20 +50,13 @@ impl Query for ExistsQuery {
/// Weight associated with the `ExistsQuery` query.
pub struct ExistsWeight {
field_name: String,
field_type: Type,
json_subpaths: bool,
}
impl Weight for ExistsWeight {
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let fast_field_reader = reader.fast_fields();
let mut column_handles = fast_field_reader.dynamic_column_handles(&self.field_name)?;
if self.field_type == Type::Json && self.json_subpaths {
let mut sub_columns =
fast_field_reader.dynamic_subpath_column_handles(&self.field_name)?;
column_handles.append(&mut sub_columns);
}
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = column_handles
let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
.dynamic_column_handles(&self.field_name)?
.into_iter()
.map(|handle| handle.open().map_err(|io_error| io_error.into()))
.collect();
@@ -227,12 +180,11 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "multi", false)?, 10);
assert_eq!(count_existing_fields(&searcher, "multi", true)?, 10);
assert_eq!(count_existing_fields(&searcher, "never", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "all")?, 100);
assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
assert_eq!(count_existing_fields(&searcher, "even")?, 50);
assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
assert_eq!(count_existing_fields(&searcher, "never")?, 0);
// exercise seek
let query = BooleanQuery::intersection(vec![
@@ -240,7 +192,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 50)),
Bound::Unbounded,
)),
Box::new(ExistsQuery::new("even".to_string(), false)),
Box::new(ExistsQuery::new_exists_query("even".to_string())),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -249,7 +201,7 @@ mod tests {
Bound::Included(Term::from_field_u64(all_field, 0)),
Bound::Included(Term::from_field_u64(all_field, 50)),
)),
Box::new(ExistsQuery::new("odd".to_string(), false)),
Box::new(ExistsQuery::new_exists_query("odd".to_string())),
]);
assert_eq!(searcher.search(&query, &Count)?, 25);
@@ -278,18 +230,22 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "json.all", false)?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.even", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "json", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json", true)?, 100);
assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
// Handling of non-existing fields:
assert_eq!(count_existing_fields(&searcher, "json.absent", false)?, 0);
assert_eq!(count_existing_fields(&searcher, "json.absent", true)?, 0);
assert_does_not_exist(&searcher, "does_not_exists.absent", true);
assert_does_not_exist(&searcher, "does_not_exists.absent", false);
assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists.absent'"
);
Ok(())
}
@@ -328,13 +284,12 @@ mod tests {
let reader = index.reader()?;
let searcher = reader.searcher();
assert_eq!(count_existing_fields(&searcher, "bool", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "bool", true)?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "date", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "f64", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "facet", false)?, 50);
assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
assert_eq!(count_existing_fields(&searcher, "date")?, 50);
assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
Ok(())
}
@@ -358,33 +313,31 @@ mod tests {
assert_eq!(
searcher
.search(&ExistsQuery::new("not_fast".to_string(), false), &Count)
.search(
&ExistsQuery::new_exists_query("not_fast".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"Schema error: 'Field not_fast is not a fast field.'"
);
assert_does_not_exist(&searcher, "does_not_exists", false);
assert_eq!(
searcher
.search(
&ExistsQuery::new_exists_query("does_not_exists".to_string()),
&Count
)
.unwrap_err()
.to_string(),
"The field does not exist: 'does_not_exists'"
);
Ok(())
}
fn count_existing_fields(
searcher: &Searcher,
field: &str,
json_subpaths: bool,
) -> crate::Result<usize> {
let query = ExistsQuery::new(field.to_string(), json_subpaths);
fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
let query = ExistsQuery::new_exists_query(field.to_string());
searcher.search(&query, &Count)
}
fn assert_does_not_exist(searcher: &Searcher, field: &str, json_subpaths: bool) {
assert_eq!(
searcher
.search(&ExistsQuery::new(field.to_string(), json_subpaths), &Count)
.unwrap_err()
.to_string(),
format!("The field does not exist: '{}'", field)
);
}
}

View File

@@ -521,25 +521,6 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
StreamerBuilder::new(self, AlwaysMatch)
}
/// Returns a range builder filtered with a prefix.
pub fn prefix_range<K: AsRef<[u8]>>(&self, prefix: K) -> StreamerBuilder<TSSTable> {
let lower_bound = prefix.as_ref();
let mut upper_bound = lower_bound.to_vec();
for idx in (0..upper_bound.len()).rev() {
if upper_bound[idx] == 255 {
upper_bound.pop();
} else {
upper_bound[idx] += 1;
break;
}
}
let mut builder = self.range().ge(lower_bound);
if !upper_bound.is_empty() {
builder = builder.lt(upper_bound);
}
builder
}
/// A stream of all the sorted terms.
pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
self.range().into_stream()
@@ -947,62 +928,4 @@ mod tests {
}
assert!(!stream.advance());
}
#[test]
fn test_prefix() {
let (dic, _slice) = make_test_sstable();
{
let mut stream = dic.prefix_range("1").into_stream().unwrap();
for i in 0x10000..0x20000 {
assert!(stream.advance());
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("").into_stream().unwrap();
for i in 0..0x3ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
{
let mut stream = dic.prefix_range("0FF").into_stream().unwrap();
for i in 0x0ff00..=0x0ffff {
assert!(stream.advance(), "failed at {i:05X}");
assert_eq!(stream.term_ord(), i);
assert_eq!(stream.value(), &i);
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
}
assert!(!stream.advance());
}
}
#[test]
fn test_prefix_edge() {
let dict = {
let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
builder.insert(&[0, 254], &0).unwrap();
builder.insert(&[0, 255], &1).unwrap();
builder.insert(&[0, 255, 12], &2).unwrap();
builder.insert(&[1], &2).unwrap();
builder.insert(&[1, 0], &2).unwrap();
let table = builder.finish().unwrap();
let table = Arc::new(PermissionedHandle::new(table));
let slice = common::file_slice::FileSlice::new(table.clone());
Dictionary::<MonotonicU64SSTable>::open(slice).unwrap()
};
let mut stream = dict.prefix_range(&[0, 255]).into_stream().unwrap();
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255]);
assert!(stream.advance());
assert_eq!(stream.key(), &[0, 255, 12]);
assert!(!stream.advance());
}
}

View File

@@ -26,7 +26,7 @@ path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
zipf = "7.0.0"
rustc-hash = "2.1.0"
rustc-hash = "1.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }