Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-07 17:42:55 +00:00)

Compare commits: `test_parse...issue/2411` (9 commits)
Commits:

- 82b510b88b
- 71cf19870b
- 148594f0f9
- 8edb439440
- c39d91f827
- 32b6e9711b
- 0f99d4f420
- 6e02c5cb25
- 0bac391291
```diff
@@ -53,8 +53,9 @@ rayon = "1.5.2"
 lru = "0.12.0"
 fastdivide = "0.4.0"
 itertools = "0.13.0"
-measure_time = "0.8.2"
+measure_time = "0.9.0"
 arc-swap = "1.5.0"
+bon = "3.3.1"
 
 columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
 sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
```
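The new `bon` dependency supplies the derive-based builder that the `IndexWriterOptions` struct introduced further down relies on. As a minimal sketch of the pattern (the struct and field names here are illustrative, not part of the diff):

```rust
use bon::Builder;

// Illustrative struct mirroring how `IndexWriterOptions` uses bon below.
#[derive(Builder)]
struct Options {
    // `#[builder(default = ...)]` makes the setter optional on the
    // generated builder; unset fields fall back to the default expression.
    #[builder(default = 1)]
    num_worker_threads: usize,
    #[builder(default = 4)]
    num_merge_threads: usize,
}

fn main() {
    let opts = Options::builder().num_worker_threads(8).build();
    assert_eq!(opts.num_worker_threads, 8);
    assert_eq!(opts.num_merge_threads, 4); // default applied
}
```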
```diff
@@ -1,6 +1,7 @@
 use std::{fmt, io, mem};
 
 use common::file_slice::FileSlice;
+use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
 use common::BinarySerializable;
 use sstable::{Dictionary, RangeSSTable};
 
```
```diff
@@ -76,6 +77,19 @@ fn read_all_columns_in_stream(
     Ok(results)
 }
 
+fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
+    // Each column is associated with a given `column_key`,
+    // which starts with `column_name\0column_header`.
+    //
+    // Listing the columns associated with the given column name is therefore equivalent to
+    // listing `column_key` with the prefix `column_name\0`.
+    format!("{}{}", column_name, '\0')
+}
+
+fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
+    format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
+}
+
 impl ColumnarReader {
     /// Opens a new Columnar file.
     pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
```
```diff
@@ -144,32 +158,14 @@ impl ColumnarReader {
         Ok(self.iter_columns()?.collect())
     }
 
-    fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
-        // Each column is a associated to a given `column_key`,
-        // that starts by `column_name\0column_header`.
-        //
-        // Listing the columns associated to the given column name is therefore equivalent to
-        // listing `column_key` with the prefix `column_name\0`.
-        //
-        // This is in turn equivalent to searching for the range
-        // `[column_name,\0`..column_name\1)`.
-        // TODO can we get some more generic `prefix(..)` logic in the dictionary.
-        let mut start_key = column_name.to_string();
-        start_key.push('\0');
-        let mut end_key = column_name.to_string();
-        end_key.push(1u8 as char);
-        self.column_dictionary
-            .range()
-            .ge(start_key.as_bytes())
-            .lt(end_key.as_bytes())
-    }
-
     pub async fn read_columns_async(
         &self,
         column_name: &str,
     ) -> io::Result<Vec<DynamicColumnHandle>> {
+        let prefix = column_dictionary_prefix_for_column_name(column_name);
         let stream = self
-            .stream_for_column_range(column_name)
+            .column_dictionary
+            .prefix_range(prefix)
             .into_stream_async()
             .await?;
         read_all_columns_in_stream(stream, &self.column_data, self.format_version)
```
```diff
@@ -180,7 +176,35 @@ impl ColumnarReader {
     /// There can be more than one column associated to a given column name, provided they have
     /// different types.
     pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
-        let stream = self.stream_for_column_range(column_name).into_stream()?;
+        let prefix = column_dictionary_prefix_for_column_name(column_name);
+        let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
+        read_all_columns_in_stream(stream, &self.column_data, self.format_version)
+    }
+
+    pub async fn read_subpath_columns_async(
+        &self,
+        root_path: &str,
+    ) -> io::Result<Vec<DynamicColumnHandle>> {
+        let prefix = column_dictionary_prefix_for_subpath(root_path);
+        let stream = self
+            .column_dictionary
+            .prefix_range(prefix)
+            .into_stream_async()
+            .await?;
+        read_all_columns_in_stream(stream, &self.column_data, self.format_version)
+    }
+
+    /// Get all inner columns for a given JSON prefix, i.e. columns for which the name starts
+    /// with the prefix then contain the [`JSON_PATH_SEGMENT_SEP`].
+    ///
+    /// There can be more than one column associated to each path within the JSON structure,
+    /// provided they have different types.
+    pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
+        let prefix = column_dictionary_prefix_for_subpath(root_path);
+        let stream = self
+            .column_dictionary
+            .prefix_range(prefix.as_bytes())
+            .into_stream()?;
         read_all_columns_in_stream(stream, &self.column_data, self.format_version)
     }
 
```
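Both helpers rely on the dictionary's key layout: a column named `c` is stored under a key starting with `c\0`, while a JSON subcolumn `c.sub` is stored under `c<SEP>sub\0...`, where `<SEP>` is `JSON_PATH_SEGMENT_SEP`. A small sketch of how the two prefixes discriminate these keys (it assumes, as in tantivy's `common` crate, that the separator is the byte `0x01`):

```rust
// Assumed value of the separator, matching tantivy's `common` crate.
const JSON_PATH_SEGMENT_SEP: u8 = 1;

fn prefix_for_column_name(column_name: &str) -> String {
    format!("{}{}", column_name, '\0')
}

fn prefix_for_subpath(root_path: &str) -> String {
    format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}

fn main() {
    // The column "col1" itself lives under `col1\0<header>`:
    assert!("col1\0<header>".starts_with(&prefix_for_column_name("col1")));
    // A subcolumn "col1.sub" lives under `col1\x01sub\0<header>`:
    assert!("col1\u{1}sub\0<header>".starts_with(&prefix_for_subpath("col1")));
    // The subpath prefix deliberately does not match the root column key:
    assert!(!"col1\0<header>".starts_with(&prefix_for_subpath("col1")));
}
```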
```diff
@@ -192,6 +216,8 @@ impl ColumnarReader {
 
 #[cfg(test)]
 mod tests {
+    use common::json_path_writer::JSON_PATH_SEGMENT_SEP;
+
     use crate::{ColumnType, ColumnarReader, ColumnarWriter};
 
     #[test]
```
```diff
@@ -224,6 +250,64 @@ mod tests {
         assert_eq!(columns[0].1.column_type(), ColumnType::U64);
     }
 
+    #[test]
+    fn test_read_columns() {
+        let mut columnar_writer = ColumnarWriter::default();
+        columnar_writer.record_column_type("col", ColumnType::U64, false);
+        columnar_writer.record_numerical(1, "col", 1u64);
+        let mut buffer = Vec::new();
+        columnar_writer.serialize(2, &mut buffer).unwrap();
+        let columnar = ColumnarReader::open(buffer).unwrap();
+        {
+            let columns = columnar.read_columns("col").unwrap();
+            assert_eq!(columns.len(), 1);
+            assert_eq!(columns[0].column_type(), ColumnType::U64);
+        }
+        {
+            let columns = columnar.read_columns("other").unwrap();
+            assert_eq!(columns.len(), 0);
+        }
+    }
+
+    #[test]
+    fn test_read_subpath_columns() {
+        let mut columnar_writer = ColumnarWriter::default();
+        columnar_writer.record_str(
+            0,
+            &format!("col1{}subcol1", JSON_PATH_SEGMENT_SEP as char),
+            "hello",
+        );
+        columnar_writer.record_numerical(
+            0,
+            &format!("col1{}subcol2", JSON_PATH_SEGMENT_SEP as char),
+            1i64,
+        );
+        columnar_writer.record_str(1, "col1", "hello");
+        columnar_writer.record_str(0, "col2", "hello");
+        let mut buffer = Vec::new();
+        columnar_writer.serialize(2, &mut buffer).unwrap();
+
+        let columnar = ColumnarReader::open(buffer).unwrap();
+        {
+            let columns = columnar.read_subpath_columns("col1").unwrap();
+            assert_eq!(columns.len(), 2);
+            assert_eq!(columns[0].column_type(), ColumnType::Str);
+            assert_eq!(columns[1].column_type(), ColumnType::I64);
+        }
+        {
+            let columns = columnar.read_subpath_columns("col1.subcol1").unwrap();
+            assert_eq!(columns.len(), 0);
+        }
+        {
+            let columns = columnar.read_subpath_columns("col2").unwrap();
+            assert_eq!(columns.len(), 0);
+        }
+        {
+            let columns = columnar.read_subpath_columns("other").unwrap();
+            assert_eq!(columns.len(), 0);
+        }
+    }
+
     #[test]
     #[should_panic(expected = "Input type forbidden")]
     fn test_list_columns_strict_typing_panics_on_wrong_types() {
```
```diff
@@ -285,7 +285,6 @@ impl ColumnarWriter {
                 .map(|(column_name, addr)| (column_name, ColumnType::DateTime, addr)),
         );
         columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type));
-
         let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
         let mut symbol_byte_buffer: Vec<u8> = Vec::new();
         for (column_name, column_type, addr) in columns {
```
```diff
@@ -271,10 +271,6 @@ impl AggregationWithAccessor {
                 field: ref field_name,
                 ..
             })
-            | Count(CountAggregation {
-                field: ref field_name,
-                ..
-            })
             | Max(MaxAggregation {
                 field: ref field_name,
                 ..
@@ -299,6 +295,24 @@ impl AggregationWithAccessor {
                     get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;
                 add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
             }
+            Count(CountAggregation {
+                field: ref field_name,
+                ..
+            }) => {
+                let allowed_column_types = [
+                    ColumnType::I64,
+                    ColumnType::U64,
+                    ColumnType::F64,
+                    ColumnType::Str,
+                    ColumnType::DateTime,
+                    ColumnType::Bool,
+                    ColumnType::IpAddr,
+                    // ColumnType::Bytes Unsupported
+                ];
+                let (accessor, column_type) =
+                    get_ff_reader(reader, field_name, Some(&allowed_column_types))?;
+                add_agg_with_accessor(&agg, accessor, column_type, &mut res)?;
+            }
             Percentiles(ref percentiles) => {
                 let (accessor, column_type) = get_ff_reader(
                     reader,
```
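The explicit allow-list is what lets `value_count` target string, boolean, date, and IP fast fields, not only numeric ones. A sketch of an aggregation request that this enables, shaped like the `count_str` test added further down (the field name comes from that test):

```rust
// Sketch: a value_count aggregation over the text fast field "text".
fn count_str_aggregation() -> serde_json::Value {
    serde_json::json!({
        "count_str": {
            "value_count": { "field": "text" }
        }
    })
}
```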
```diff
@@ -220,9 +220,23 @@ impl SegmentStatsCollector {
                 .column_block_accessor
                 .fetch_block(docs, &agg_accessor.accessor);
         }
-        for val in agg_accessor.column_block_accessor.iter_vals() {
-            let val1 = f64_from_fastfield_u64(val, &self.field_type);
-            self.stats.collect(val1);
+        if [
+            ColumnType::I64,
+            ColumnType::U64,
+            ColumnType::F64,
+            ColumnType::DateTime,
+        ]
+        .contains(&self.field_type)
+        {
+            for val in agg_accessor.column_block_accessor.iter_vals() {
+                let val1 = f64_from_fastfield_u64(val, &self.field_type);
+                self.stats.collect(val1);
+            }
+        } else {
+            for _val in agg_accessor.column_block_accessor.iter_vals() {
+                // we ignore the value and simply record that we got something
+                self.stats.collect(0.0);
+            }
         }
     }
 }
```
```diff
@@ -435,6 +449,11 @@ mod tests {
                 "field": "score",
             },
         },
+        "count_str": {
+            "value_count": {
+                "field": "text",
+            },
+        },
         "range": range_agg
     }))
     .unwrap();
```
```diff
@@ -500,6 +519,13 @@ mod tests {
             })
         );
 
+        assert_eq!(
+            res["count_str"],
+            json!({
+                "value": 7.0,
+            })
+        );
+
         Ok(())
     }
 
```
```diff
@@ -578,7 +578,7 @@ mod tests {
             .set_indexing_options(
                 TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
             )
-            .set_fast(None)
+            .set_fast(Some("raw"))
             .set_stored();
         let text_field = schema_builder.add_text_field("text", text_fieldtype);
         let date_field = schema_builder.add_date_field("date", FAST);
```
```diff
@@ -217,7 +217,7 @@ impl FastFieldReaders {
         Ok(dynamic_column.into())
     }
 
-    /// Returning a `dynamic_column_handle`.
+    /// Returns a `dynamic_column_handle`.
     pub fn dynamic_column_handle(
         &self,
         field_name: &str,
@@ -234,7 +234,7 @@ impl FastFieldReaders {
         Ok(dynamic_column_handle_opt)
     }
 
-    /// Returning all `dynamic_column_handle`.
+    /// Returns all `dynamic_column_handle` that match the given field name.
    pub fn dynamic_column_handles(
         &self,
         field_name: &str,
```
```diff
@@ -250,6 +250,22 @@ impl FastFieldReaders {
         Ok(dynamic_column_handles)
     }
 
+    /// Returns all `dynamic_column_handle` that are inner fields of the provided JSON path.
+    pub fn dynamic_subpath_column_handles(
+        &self,
+        root_path: &str,
+    ) -> crate::Result<Vec<DynamicColumnHandle>> {
+        let Some(resolved_field_name) = self.resolve_field(root_path)? else {
+            return Ok(Vec::new());
+        };
+        let dynamic_column_handles = self
+            .columnar
+            .read_subpath_columns(&resolved_field_name)?
+            .into_iter()
+            .collect();
+        Ok(dynamic_column_handles)
+    }
+
     #[doc(hidden)]
     pub async fn list_dynamic_column_handles(
         &self,
```
```diff
@@ -265,6 +281,21 @@ impl FastFieldReaders {
         Ok(columns)
     }
 
+    #[doc(hidden)]
+    pub async fn list_subpath_dynamic_column_handles(
+        &self,
+        root_path: &str,
+    ) -> crate::Result<Vec<DynamicColumnHandle>> {
+        let Some(resolved_field_name) = self.resolve_field(root_path)? else {
+            return Ok(Vec::new());
+        };
+        let columns = self
+            .columnar
+            .read_subpath_columns_async(&resolved_field_name)
+            .await?;
+        Ok(columns)
+    }
+
     /// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
     /// i64, u64, f64, DateTime).
     ///
```
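A hedged usage sketch of the two listing APIs side by side, assuming a segment whose schema has a JSON fast field named "attributes" (the field name is illustrative):

```rust
use tantivy::SegmentReader;

// Sketch: list exact-name columns vs. columns nested under a JSON path.
fn inspect_json_columns(reader: &SegmentReader) -> tantivy::Result<()> {
    let fast_fields = reader.fast_fields();
    // Columns registered exactly under "attributes" (empty for a pure object):
    let exact = fast_fields.dynamic_column_handles("attributes")?;
    // Columns for every path nested below "attributes":
    let nested = fast_fields.dynamic_subpath_column_handles("attributes")?;
    println!("{} exact column(s), {} nested column(s)", exact.len(), nested.len());
    Ok(())
}
```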
```diff
@@ -476,6 +507,15 @@ mod tests {
             .iter()
             .any(|column| column.column_type() == ColumnType::Str));
 
-        println!("*** {:?}", fast_fields.columnar().list_columns());
+        let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
+        assert_eq!(json_columns.len(), 0);
+
+        let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
+        assert_eq!(json_subcolumns.len(), 3);
+
+        let foo_subcolumns = fast_fields
+            .dynamic_subpath_column_handles("json.foo")
+            .unwrap();
+        assert_eq!(foo_subcolumns.len(), 0);
     }
 }
```
```diff
@@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
 use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
 use crate::error::{DataCorruption, TantivyError};
 use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
-use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
+use crate::indexer::index_writer::{
+    IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
+};
 use crate::indexer::segment_updater::save_metas;
 use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
 use crate::reader::{IndexReader, IndexReaderBuilder};
```
```diff
@@ -519,6 +521,43 @@ impl Index {
         load_metas(self.directory(), &self.inventory)
     }
 
+    /// Open a new index writer with the given options. Attempts to acquire a lockfile.
+    ///
+    /// The lockfile should be deleted on drop, but it is possible
+    /// that due to a panic or other error, a stale lockfile will be
+    /// left in the index directory. If you are sure that no other
+    /// `IndexWriter` on the system is accessing the index directory,
+    /// it is safe to manually delete the lockfile.
+    ///
+    /// - `options` defines the writer configuration which includes things like buffer sizes,
+    ///   indexer threads, etc...
+    ///
+    /// # Errors
+    /// If the lockfile already exists, returns `TantivyError::LockFailure`.
+    /// If the memory arena per thread is too small or too big, returns
+    /// `TantivyError::InvalidArgument`
+    pub fn writer_with_options<D: Document>(
+        &self,
+        options: IndexWriterOptions,
+    ) -> crate::Result<IndexWriter<D>> {
+        let directory_lock = self
+            .directory
+            .acquire_lock(&INDEX_WRITER_LOCK)
+            .map_err(|err| {
+                TantivyError::LockFailure(
+                    err,
+                    Some(
+                        "Failed to acquire index lock. If you are using a regular directory, this \
+                         means there is already an `IndexWriter` working on this `Directory`, in \
+                         this process or in a different process."
+                            .to_string(),
+                    ),
+                )
+            })?;
+
+        IndexWriter::new(self, options, directory_lock)
+    }
+
     /// Open a new index writer. Attempts to acquire a lockfile.
     ///
     /// The lockfile should be deleted on drop, but it is possible
```
```diff
@@ -543,27 +582,12 @@ impl Index {
         num_threads: usize,
         overall_memory_budget_in_bytes: usize,
     ) -> crate::Result<IndexWriter<D>> {
-        let directory_lock = self
-            .directory
-            .acquire_lock(&INDEX_WRITER_LOCK)
-            .map_err(|err| {
-                TantivyError::LockFailure(
-                    err,
-                    Some(
-                        "Failed to acquire index lock. If you are using a regular directory, this \
-                         means there is already an `IndexWriter` working on this `Directory`, in \
-                         this process or in a different process."
-                            .to_string(),
-                    ),
-                )
-            })?;
         let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
-        IndexWriter::new(
-            self,
-            num_threads,
-            memory_arena_in_bytes_per_thread,
-            directory_lock,
-        )
+        let options = IndexWriterOptions::builder()
+            .num_worker_threads(num_threads)
+            .memory_budget_per_thread(memory_arena_in_bytes_per_thread)
+            .build();
+        self.writer_with_options(options)
     }
 
     /// Helper to create an index writer for tests.
```
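Taken together, these two hunks replace the positional `(num_threads, memory_budget)` arguments with an options struct built once and passed through. A sketch of the new entry point in use (the budget and thread counts are illustrative):

```rust
use tantivy::indexer::IndexWriterOptions;
use tantivy::{Index, IndexWriter, TantivyDocument};

fn open_writer(index: &Index) -> tantivy::Result<IndexWriter> {
    let options = IndexWriterOptions::builder()
        .num_worker_threads(4)
        .memory_budget_per_thread(64 * 1024 * 1024) // 64 MiB per indexing thread
        .num_merge_threads(2)
        .build();
    index.writer_with_options::<TantivyDocument>(options)
}
```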
```diff
@@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
     ))
 }
 
+#[derive(Clone, bon::Builder)]
+/// A builder for creating a new [IndexWriter] for an index.
+pub struct IndexWriterOptions {
+    #[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
+    /// The memory budget per indexer thread.
+    ///
+    /// When an indexer thread has buffered this much data in memory
+    /// it will flush the segment to disk (although this is not searchable until commit is called.)
+    memory_budget_per_thread: usize,
+    #[builder(default = 1)]
+    /// The number of indexer worker threads to use.
+    num_worker_threads: usize,
+    #[builder(default = 4)]
+    /// Defines the number of merger threads to use.
+    num_merge_threads: usize,
+}
+
 /// `IndexWriter` is the user entry-point to add document to an index.
 ///
 /// It manages a small number of indexing thread, as well as a shared
```
```diff
@@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
 
     index: Index,
 
-    // The memory budget per thread, after which a commit is triggered.
-    memory_budget_in_bytes_per_thread: usize,
+    options: IndexWriterOptions,
 
     workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
 
@@ -70,8 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
 
     worker_id: usize,
 
-    num_threads: usize,
-
     delete_queue: DeleteQueue,
 
     stamper: Stamper,
```
```diff
@@ -265,23 +279,27 @@ impl<D: Document> IndexWriter<D> {
     /// `TantivyError::InvalidArgument`
     pub(crate) fn new(
         index: &Index,
-        num_threads: usize,
-        memory_budget_in_bytes_per_thread: usize,
+        options: IndexWriterOptions,
         directory_lock: DirectoryLock,
     ) -> crate::Result<Self> {
-        if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
+        if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
             let err_msg = format!(
                 "The memory arena in bytes per thread needs to be at least \
                  {MEMORY_BUDGET_NUM_BYTES_MIN}."
             );
             return Err(TantivyError::InvalidArgument(err_msg));
         }
-        if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
+        if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
             let err_msg = format!(
                 "The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
             );
             return Err(TantivyError::InvalidArgument(err_msg));
         }
+        if options.num_worker_threads == 0 {
+            let err_msg = "At least one worker thread is required, got 0".to_string();
+            return Err(TantivyError::InvalidArgument(err_msg));
+        }
+
         let (document_sender, document_receiver) =
             crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
 
```
```diff
@@ -291,13 +309,17 @@ impl<D: Document> IndexWriter<D> {
 
         let stamper = Stamper::new(current_opstamp);
 
-        let segment_updater =
-            SegmentUpdater::create(index.clone(), stamper.clone(), &delete_queue.cursor())?;
+        let segment_updater = SegmentUpdater::create(
+            index.clone(),
+            stamper.clone(),
+            &delete_queue.cursor(),
+            options.num_merge_threads,
+        )?;
 
         let mut index_writer = Self {
             _directory_lock: Some(directory_lock),
 
-            memory_budget_in_bytes_per_thread,
+            options: options.clone(),
             index: index.clone(),
             index_writer_status: IndexWriterStatus::from(document_receiver),
             operation_sender: document_sender,
```
```diff
@@ -305,7 +327,6 @@ impl<D: Document> IndexWriter<D> {
             segment_updater,
 
             workers_join_handle: vec![],
-            num_threads,
 
             delete_queue,
 
```
```diff
@@ -398,7 +419,7 @@ impl<D: Document> IndexWriter<D> {
 
         let mut delete_cursor = self.delete_queue.cursor();
 
-        let mem_budget = self.memory_budget_in_bytes_per_thread;
+        let mem_budget = self.options.memory_budget_per_thread;
         let index = self.index.clone();
         let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
             .name(format!("thrd-tantivy-index{}", self.worker_id))
```
```diff
@@ -451,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
     }
 
     fn start_workers(&mut self) -> crate::Result<()> {
-        for _ in 0..self.num_threads {
+        for _ in 0..self.options.num_worker_threads {
             self.add_indexing_worker()?;
         }
         Ok(())
```
```diff
@@ -553,12 +574,7 @@ impl<D: Document> IndexWriter<D> {
             .take()
             .expect("The IndexWriter does not have any lock. This is a bug, please report.");
 
-        let new_index_writer = IndexWriter::new(
-            &self.index,
-            self.num_threads,
-            self.memory_budget_in_bytes_per_thread,
-            directory_lock,
-        )?;
+        let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;
 
         // the current `self` is dropped right away because of this call.
         //
```
```diff
@@ -812,7 +828,7 @@ mod tests {
     use crate::directory::error::LockError;
     use crate::error::*;
     use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
-    use crate::indexer::NoMergePolicy;
+    use crate::indexer::{IndexWriterOptions, NoMergePolicy};
     use crate::query::{QueryParser, TermQuery};
     use crate::schema::{
         self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
```
```diff
@@ -2533,4 +2549,36 @@ mod tests {
         index_writer.commit().unwrap();
         Ok(())
     }
+
+    #[test]
+    fn test_writer_options_validation() {
+        let mut schema_builder = Schema::builder();
+        let field = schema_builder.add_bool_field("example", STORED);
+        let index = Index::create_in_ram(schema_builder.build());
+
+        let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
+        let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
+        assert!(result.is_err(), "Writer should reject 0 thread count");
+        assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
+
+        let opt_with_low_memory = IndexWriterOptions::builder()
+            .memory_budget_per_thread(10 << 10)
+            .build();
+        let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
+        assert!(
+            result.is_err(),
+            "Writer should reject options with too low memory size"
+        );
+        assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
+
+        let opt_with_low_memory = IndexWriterOptions::builder()
+            .memory_budget_per_thread(5 << 30)
+            .build();
+        let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
+        assert!(
+            result.is_err(),
+            "Writer should reject options with too high memory size"
+        );
+        assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
+    }
 }
```
```diff
@@ -31,7 +31,7 @@ mod stamper;
 use crossbeam_channel as channel;
 use smallvec::SmallVec;
 
-pub use self::index_writer::IndexWriter;
+pub use self::index_writer::{IndexWriter, IndexWriterOptions};
 pub use self::log_merge_policy::LogMergePolicy;
 pub use self::merge_operation::MergeOperation;
 pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
```
```diff
@@ -25,8 +25,6 @@ use crate::indexer::{
 };
 use crate::{FutureResult, Opstamp};
 
-const NUM_MERGE_THREADS: usize = 4;
-
 /// Save the index meta file.
 /// This operation is atomic:
 /// Either
```
```diff
@@ -273,6 +271,7 @@ impl SegmentUpdater {
         index: Index,
         stamper: Stamper,
         delete_cursor: &DeleteCursor,
+        num_merge_threads: usize,
     ) -> crate::Result<SegmentUpdater> {
         let segments = index.searchable_segment_metas()?;
         let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
```
|
|||||||
})?;
|
})?;
|
||||||
let merge_thread_pool = ThreadPoolBuilder::new()
|
let merge_thread_pool = ThreadPoolBuilder::new()
|
||||||
.thread_name(|i| format!("merge_thread_{i}"))
|
.thread_name(|i| format!("merge_thread_{i}"))
|
||||||
.num_threads(NUM_MERGE_THREADS)
|
.num_threads(num_merge_threads)
|
||||||
|
.panic_handler(move |panic| {
|
||||||
|
let message = if let Some(msg) = panic.downcast_ref::<&str>() {
|
||||||
|
*msg
|
||||||
|
} else if let Some(msg) = panic.downcast_ref::<String>() {
|
||||||
|
msg.as_str()
|
||||||
|
} else {
|
||||||
|
"UNKNOWN"
|
||||||
|
};
|
||||||
|
eprintln!("merge thread panicked with: {message}")
|
||||||
|
|
||||||
|
})
|
||||||
.build()
|
.build()
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
crate::TantivyError::SystemError(
|
crate::TantivyError::SystemError(
|
||||||
|
|||||||
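The `panic_handler` uses the standard technique for pulling a message out of a panic payload, which is a `Box<dyn Any + Send>`: literal messages arrive as `&str`, formatted ones as `String`. A standalone sketch of the same downcast chain:

```rust
use std::panic;

fn main() {
    // Capture a panic payload the same way a thread-pool panic handler
    // receives it: as a `Box<dyn Any + Send>`.
    let result: Result<(), _> = panic::catch_unwind(|| panic!("boom: {}", 42));
    let payload = result.unwrap_err();
    let message = if let Some(msg) = payload.downcast_ref::<&str>() {
        *msg
    } else if let Some(msg) = payload.downcast_ref::<String>() {
        msg.as_str()
    } else {
        "UNKNOWN"
    };
    assert_eq!(message, "boom: 42"); // format arguments produce a `String` payload
}
```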
```diff
@@ -422,6 +422,7 @@ mod tests {
     use std::collections::BTreeMap;
     use std::path::{Path, PathBuf};
 
+    use columnar::ColumnType;
     use tempfile::TempDir;
 
     use crate::collector::{Count, TopDocs};
@@ -431,15 +432,15 @@ mod tests {
     use crate::query::{PhraseQuery, QueryParser};
     use crate::schema::{
         Document, IndexRecordOption, OwnedValue, Schema, TextFieldIndexing, TextOptions, Value,
-        DATE_TIME_PRECISION_INDEXED, STORED, STRING, TEXT,
+        DATE_TIME_PRECISION_INDEXED, FAST, STORED, STRING, TEXT,
     };
     use crate::store::{Compressor, StoreReader, StoreWriter};
     use crate::time::format_description::well_known::Rfc3339;
     use crate::time::OffsetDateTime;
     use crate::tokenizer::{PreTokenizedString, Token};
     use crate::{
-        DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, TantivyDocument, Term,
-        TERMINATED,
+        DateTime, Directory, DocAddress, DocSet, Index, IndexWriter, SegmentReader,
+        TantivyDocument, Term, TERMINATED,
     };
 
     #[test]
```
```diff
@@ -841,6 +842,75 @@ mod tests {
         assert_eq!(searcher.search(&phrase_query, &Count).unwrap(), 0);
     }
 
+    #[test]
+    fn test_json_fast() {
+        let mut schema_builder = Schema::builder();
+        let json_field = schema_builder.add_json_field("json", FAST);
+        let schema = schema_builder.build();
+        let json_val: serde_json::Value = serde_json::from_str(
+            r#"{
+            "toto": "titi",
+            "float": -0.2,
+            "bool": true,
+            "unsigned": 1,
+            "signed": -2,
+            "complexobject": {
+                "field.with.dot": 1
+            },
+            "date": "1985-04-12T23:20:50.52Z",
+            "my_arr": [2, 3, {"my_key": "two tokens"}, 4]
+        }"#,
+        )
+        .unwrap();
+        let doc = doc!(json_field=>json_val.clone());
+        let index = Index::create_in_ram(schema.clone());
+        let mut writer = index.writer_for_tests().unwrap();
+        writer.add_document(doc).unwrap();
+        writer.commit().unwrap();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        let segment_reader = searcher.segment_reader(0u32);
+
+        fn assert_type(reader: &SegmentReader, field: &str, typ: ColumnType) {
+            let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
+            assert_eq!(cols.len(), 1, "{}", field);
+            assert_eq!(cols[0].column_type(), typ, "{}", field);
+        }
+        assert_type(segment_reader, "json.toto", ColumnType::Str);
+        assert_type(segment_reader, "json.float", ColumnType::F64);
+        assert_type(segment_reader, "json.bool", ColumnType::Bool);
+        assert_type(segment_reader, "json.unsigned", ColumnType::I64);
+        assert_type(segment_reader, "json.signed", ColumnType::I64);
+        assert_type(
+            segment_reader,
+            "json.complexobject.field\\.with\\.dot",
+            ColumnType::I64,
+        );
+        assert_type(segment_reader, "json.date", ColumnType::DateTime);
+        assert_type(segment_reader, "json.my_arr", ColumnType::I64);
+        assert_type(segment_reader, "json.my_arr.my_key", ColumnType::Str);
+
+        fn assert_empty(reader: &SegmentReader, field: &str) {
+            let cols = reader.fast_fields().dynamic_column_handles(field).unwrap();
+            assert_eq!(cols.len(), 0);
+        }
+        assert_empty(segment_reader, "unknown");
+        assert_empty(segment_reader, "json");
+        assert_empty(segment_reader, "json.toto.titi");
+
+        let sub_columns = segment_reader
+            .fast_fields()
+            .dynamic_subpath_column_handles("json")
+            .unwrap();
+        assert_eq!(sub_columns.len(), 9);
+
+        let subsub_columns = segment_reader
+            .fast_fields()
+            .dynamic_subpath_column_handles("json.complexobject")
+            .unwrap();
+        assert_eq!(subsub_columns.len(), 1);
+    }
+
     #[test]
     fn test_json_term_with_numeric_merge_panic_regression_bug_2283() {
         // https://github.com/quickwit-oss/tantivy/issues/2283
```
```diff
@@ -7,14 +7,32 @@ use crate::docset::{DocSet, TERMINATED};
 use crate::index::SegmentReader;
 use crate::query::explanation::does_not_match;
 use crate::query::{EnableScoring, Explanation, Query, Scorer, Weight};
+use crate::schema::Type;
 use crate::{DocId, Score, TantivyError};
 
-/// Query that matches all documents with a non-null value in the specified field.
+/// Query that matches all documents with a non-null value in the specified
+/// field.
+///
+/// When querying inside a JSON field, "exists" queries can be executed strictly
+/// on the field name or check all the subpaths. In that second case a document
+/// will be matched if a non-null value exists in any subpath. For example,
+/// assuming the following document where `myfield` is a JSON fast field:
+/// ```json
+/// {
+///   "myfield": {
+///     "mysubfield": "hello"
+///   }
+/// }
+/// ```
+/// With `json_subpaths` enabled queries on either `myfield` or
+/// `myfield.mysubfield` will match the document. If it is set to false, only
+/// `myfield.mysubfield` will match it.
 ///
 /// All of the matched documents get the score 1.0.
 #[derive(Clone, Debug)]
 pub struct ExistsQuery {
     field_name: String,
+    json_subpaths: bool,
 }
 
 impl ExistsQuery {
```
```diff
@@ -23,8 +41,28 @@ impl ExistsQuery {
     /// This query matches all documents with at least one non-null value in the specified field.
     /// This constructor never fails, but executing the search with this query will return an
     /// error if the specified field doesn't exists or is not a fast field.
+    #[deprecated]
     pub fn new_exists_query(field: String) -> ExistsQuery {
-        ExistsQuery { field_name: field }
+        ExistsQuery {
+            field_name: field,
+            json_subpaths: false,
+        }
+    }
+
+    /// Creates a new `ExistsQuery` from the given field.
+    ///
+    /// This query matches all documents with at least one non-null value in the
+    /// specified field. If `json_subpaths` is set to true, documents with
+    /// non-null values in any JSON subpath will also be matched.
+    ///
+    /// This constructor never fails, but executing the search with this query will
+    /// return an error if the specified field doesn't exist or is not a fast
+    /// field.
+    pub fn new(field: String, json_subpaths: bool) -> Self {
+        Self {
+            field_name: field,
+            json_subpaths,
+        }
     }
 }
```
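A usage sketch of the new constructor (the field name is illustrative): counting documents holding any value under a JSON fast field, including all of its subpaths:

```rust
use tantivy::collector::Count;
use tantivy::query::ExistsQuery;
use tantivy::Searcher;

// Sketch: with `json_subpaths = true`, a document matches if any path
// nested under "attributes" carries a non-null value.
fn count_docs_with_attributes(searcher: &Searcher) -> tantivy::Result<usize> {
    let query = ExistsQuery::new("attributes".to_string(), true);
    searcher.search(&query, &Count)
}
```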
```diff
@@ -43,6 +81,8 @@ impl Query for ExistsQuery {
         }
         Ok(Box::new(ExistsWeight {
             field_name: self.field_name.clone(),
+            field_type: field_type.value_type(),
+            json_subpaths: self.json_subpaths,
         }))
     }
 }
```
```diff
@@ -50,13 +90,20 @@ impl Query for ExistsQuery {
 /// Weight associated with the `ExistsQuery` query.
 pub struct ExistsWeight {
     field_name: String,
+    field_type: Type,
+    json_subpaths: bool,
 }
 
 impl Weight for ExistsWeight {
     fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
         let fast_field_reader = reader.fast_fields();
-        let dynamic_columns: crate::Result<Vec<DynamicColumn>> = fast_field_reader
-            .dynamic_column_handles(&self.field_name)?
+        let mut column_handles = fast_field_reader.dynamic_column_handles(&self.field_name)?;
+        if self.field_type == Type::Json && self.json_subpaths {
+            let mut sub_columns =
+                fast_field_reader.dynamic_subpath_column_handles(&self.field_name)?;
+            column_handles.append(&mut sub_columns);
+        }
+        let dynamic_columns: crate::Result<Vec<DynamicColumn>> = column_handles
             .into_iter()
             .map(|handle| handle.open().map_err(|io_error| io_error.into()))
             .collect();
```
```diff
@@ -180,11 +227,12 @@ mod tests {
         let reader = index.reader()?;
         let searcher = reader.searcher();
 
-        assert_eq!(count_existing_fields(&searcher, "all")?, 100);
-        assert_eq!(count_existing_fields(&searcher, "odd")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "even")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "multi")?, 10);
-        assert_eq!(count_existing_fields(&searcher, "never")?, 0);
+        assert_eq!(count_existing_fields(&searcher, "all", false)?, 100);
+        assert_eq!(count_existing_fields(&searcher, "odd", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "even", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "multi", false)?, 10);
+        assert_eq!(count_existing_fields(&searcher, "multi", true)?, 10);
+        assert_eq!(count_existing_fields(&searcher, "never", false)?, 0);
 
         // exercise seek
         let query = BooleanQuery::intersection(vec![
@@ -192,7 +240,7 @@ mod tests {
                 Bound::Included(Term::from_field_u64(all_field, 50)),
                 Bound::Unbounded,
             )),
-            Box::new(ExistsQuery::new_exists_query("even".to_string())),
+            Box::new(ExistsQuery::new("even".to_string(), false)),
         ]);
         assert_eq!(searcher.search(&query, &Count)?, 25);
 
@@ -201,7 +249,7 @@ mod tests {
                 Bound::Included(Term::from_field_u64(all_field, 0)),
                 Bound::Included(Term::from_field_u64(all_field, 50)),
             )),
-            Box::new(ExistsQuery::new_exists_query("odd".to_string())),
+            Box::new(ExistsQuery::new("odd".to_string(), false)),
         ]);
         assert_eq!(searcher.search(&query, &Count)?, 25);
 
```
```diff
@@ -230,22 +278,18 @@ mod tests {
         let reader = index.reader()?;
         let searcher = reader.searcher();
 
-        assert_eq!(count_existing_fields(&searcher, "json.all")?, 100);
-        assert_eq!(count_existing_fields(&searcher, "json.even")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "json.odd")?, 50);
+        assert_eq!(count_existing_fields(&searcher, "json.all", false)?, 100);
+        assert_eq!(count_existing_fields(&searcher, "json.even", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "json.even", true)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "json.odd", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "json", false)?, 0);
+        assert_eq!(count_existing_fields(&searcher, "json", true)?, 100);
 
         // Handling of non-existing fields:
-        assert_eq!(count_existing_fields(&searcher, "json.absent")?, 0);
-        assert_eq!(
-            searcher
-                .search(
-                    &ExistsQuery::new_exists_query("does_not_exists.absent".to_string()),
-                    &Count
-                )
-                .unwrap_err()
-                .to_string(),
-            "The field does not exist: 'does_not_exists.absent'"
-        );
+        assert_eq!(count_existing_fields(&searcher, "json.absent", false)?, 0);
+        assert_eq!(count_existing_fields(&searcher, "json.absent", true)?, 0);
+        assert_does_not_exist(&searcher, "does_not_exists.absent", true);
+        assert_does_not_exist(&searcher, "does_not_exists.absent", false);
 
         Ok(())
     }
```
```diff
@@ -284,12 +328,13 @@ mod tests {
         let reader = index.reader()?;
         let searcher = reader.searcher();
 
-        assert_eq!(count_existing_fields(&searcher, "bool")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "bytes")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "date")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "f64")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "ip_addr")?, 50);
-        assert_eq!(count_existing_fields(&searcher, "facet")?, 50);
+        assert_eq!(count_existing_fields(&searcher, "bool", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "bool", true)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "bytes", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "date", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "f64", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "ip_addr", false)?, 50);
+        assert_eq!(count_existing_fields(&searcher, "facet", false)?, 50);
 
         Ok(())
     }
```
```diff
@@ -313,31 +358,33 @@ mod tests {
 
         assert_eq!(
             searcher
-                .search(
-                    &ExistsQuery::new_exists_query("not_fast".to_string()),
-                    &Count
-                )
+                .search(&ExistsQuery::new("not_fast".to_string(), false), &Count)
                 .unwrap_err()
                 .to_string(),
             "Schema error: 'Field not_fast is not a fast field.'"
         );
 
-        assert_eq!(
-            searcher
-                .search(
-                    &ExistsQuery::new_exists_query("does_not_exists".to_string()),
-                    &Count
-                )
-                .unwrap_err()
-                .to_string(),
-            "The field does not exist: 'does_not_exists'"
-        );
+        assert_does_not_exist(&searcher, "does_not_exists", false);
 
         Ok(())
     }
 
-    fn count_existing_fields(searcher: &Searcher, field: &str) -> crate::Result<usize> {
-        let query = ExistsQuery::new_exists_query(field.to_string());
+    fn count_existing_fields(
+        searcher: &Searcher,
+        field: &str,
+        json_subpaths: bool,
+    ) -> crate::Result<usize> {
+        let query = ExistsQuery::new(field.to_string(), json_subpaths);
         searcher.search(&query, &Count)
     }
+
+    fn assert_does_not_exist(searcher: &Searcher, field: &str, json_subpaths: bool) {
+        assert_eq!(
+            searcher
+                .search(&ExistsQuery::new(field.to_string(), json_subpaths), &Count)
+                .unwrap_err()
+                .to_string(),
+            format!("The field does not exist: '{}'", field)
+        );
+    }
 }
```
```diff
@@ -521,6 +521,25 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
         StreamerBuilder::new(self, AlwaysMatch)
     }
 
+    /// Returns a range builder filtered with a prefix.
+    pub fn prefix_range<K: AsRef<[u8]>>(&self, prefix: K) -> StreamerBuilder<TSSTable> {
+        let lower_bound = prefix.as_ref();
+        let mut upper_bound = lower_bound.to_vec();
+        for idx in (0..upper_bound.len()).rev() {
+            if upper_bound[idx] == 255 {
+                upper_bound.pop();
+            } else {
+                upper_bound[idx] += 1;
+                break;
+            }
+        }
+        let mut builder = self.range().ge(lower_bound);
+        if !upper_bound.is_empty() {
+            builder = builder.lt(upper_bound);
+        }
+        builder
+    }
+
     /// A stream of all the sorted terms.
     pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
         self.range().into_stream()
```
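The upper bound computed here is the shortest byte string strictly greater than every key that carries the prefix: scan the prefix from its last byte, drop `0xFF` bytes, and increment the first byte that can be incremented. When the prefix is empty or all `0xFF`, no finite upper bound exists and the range stays open above, which is exactly the `is_empty()` case. A standalone sketch of that successor computation:

```rust
// Successor of a prefix for `[prefix, successor)` scans: strip trailing 0xFF
// bytes, then increment the last remaining byte. `None` means "no finite
// upper bound", so the scan must stay unbounded above.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut upper = prefix.to_vec();
    while let Some(&last) = upper.last() {
        if last == 0xFF {
            upper.pop();
        } else {
            *upper.last_mut().unwrap() = last + 1;
            return Some(upper);
        }
    }
    None
}

fn main() {
    assert_eq!(prefix_successor(b"0FF"), Some(b"0FG".to_vec()));
    assert_eq!(prefix_successor(&[0, 0xFF]), Some(vec![1]));
    assert_eq!(prefix_successor(&[0xFF, 0xFF]), None);
}
```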
```diff
@@ -928,4 +947,62 @@ mod tests {
         }
         assert!(!stream.advance());
     }
+
+    #[test]
+    fn test_prefix() {
+        let (dic, _slice) = make_test_sstable();
+        {
+            let mut stream = dic.prefix_range("1").into_stream().unwrap();
+            for i in 0x10000..0x20000 {
+                assert!(stream.advance());
+                assert_eq!(stream.term_ord(), i);
+                assert_eq!(stream.value(), &i);
+                assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
+            }
+            assert!(!stream.advance());
+        }
+        {
+            let mut stream = dic.prefix_range("").into_stream().unwrap();
+            for i in 0..0x3ffff {
+                assert!(stream.advance(), "failed at {i:05X}");
+                assert_eq!(stream.term_ord(), i);
+                assert_eq!(stream.value(), &i);
+                assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
+            }
+            assert!(!stream.advance());
+        }
+        {
+            let mut stream = dic.prefix_range("0FF").into_stream().unwrap();
+            for i in 0x0ff00..=0x0ffff {
+                assert!(stream.advance(), "failed at {i:05X}");
+                assert_eq!(stream.term_ord(), i);
+                assert_eq!(stream.value(), &i);
+                assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
+            }
+            assert!(!stream.advance());
+        }
+    }
+
+    #[test]
+    fn test_prefix_edge() {
+        let dict = {
+            let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
+            builder.insert(&[0, 254], &0).unwrap();
+            builder.insert(&[0, 255], &1).unwrap();
+            builder.insert(&[0, 255, 12], &2).unwrap();
+            builder.insert(&[1], &2).unwrap();
+            builder.insert(&[1, 0], &2).unwrap();
+            let table = builder.finish().unwrap();
+            let table = Arc::new(PermissionedHandle::new(table));
+            let slice = common::file_slice::FileSlice::new(table.clone());
+            Dictionary::<MonotonicU64SSTable>::open(slice).unwrap()
+        };
+
+        let mut stream = dict.prefix_range(&[0, 255]).into_stream().unwrap();
+        assert!(stream.advance());
+        assert_eq!(stream.key(), &[0, 255]);
+        assert!(stream.advance());
+        assert_eq!(stream.key(), &[0, 255, 12]);
+        assert!(!stream.advance());
+    }
 }
```
```diff
@@ -26,7 +26,7 @@ path = "example/hashmap.rs"
 [dev-dependencies]
 rand = "0.8.5"
 zipf = "7.0.0"
-rustc-hash = "1.1.0"
+rustc-hash = "2.1.0"
 proptest = "1.2.0"
 binggan = { version = "0.14.0" }
 
```