Compare commits

..

5 Commits

22 changed files with 352 additions and 529 deletions

View File

@@ -25,7 +25,7 @@ aho-corasick = "1.0"
tantivy-fst = "0.5"
memmap2 = { version = "0.9.0", optional = true }
lz4_flex = { version = "0.11", default-features = false, optional = true }
zstd = { version = "0.13", default-features = false }
zstd = { version = "0.13", optional = true, default-features = false }
tempfile = { version = "3.3.0", optional = true }
log = "0.4.16"
serde = { version = "1.0.136", features = ["derive"] }
@@ -105,7 +105,7 @@ mmap = ["fs4", "tempfile", "memmap2"]
stopwords = []
lz4-compression = ["lz4_flex"]
zstd-compression = []
zstd-compression = ["zstd"]
failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.

View File

@@ -58,7 +58,7 @@ impl ColumnType {
self == &ColumnType::DateTime
}
pub fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
pub(crate) fn try_from_code(code: u8) -> Result<ColumnType, InvalidData> {
COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData)
}
}

View File

@@ -333,7 +333,7 @@ impl ColumnarWriter {
num_docs: RowId,
old_to_new_row_ids: Option<&[RowId]>,
wrt: &mut dyn io::Write,
) -> io::Result<Vec<(String, ColumnType)>> {
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt);
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
.numerical_field_hash_map
@@ -374,9 +374,7 @@ impl ColumnarWriter {
let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries);
let mut symbol_byte_buffer: Vec<u8> = Vec::new();
for (column_name, column_type, addr) in columns.iter() {
let column_type = *column_type;
let addr = *addr;
for (column_name, column_type, addr) in columns {
match column_type {
ColumnType::Bool => {
let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr);
@@ -487,15 +485,7 @@ impl ColumnarWriter {
};
}
serializer.finalize(num_docs)?;
Ok(columns
.into_iter()
.map(|(column_name, column_type, _)| {
(
String::from_utf8_lossy(column_name).to_string(),
column_type,
)
})
.collect())
Ok(())
}
}

View File

@@ -6,7 +6,7 @@ use ownedbytes::OwnedBytes;
use crate::ByteCount;
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct TinySet(u64);
impl fmt::Debug for TinySet {

View File

@@ -142,7 +142,6 @@ impl SegmentMeta {
SegmentComponent::FastFields => ".fast".to_string(),
SegmentComponent::FieldNorms => ".fieldnorm".to_string(),
SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
SegmentComponent::FieldList => ".fieldlist".to_string(),
});
PathBuf::from(path)
}

View File

@@ -70,7 +70,7 @@ impl InvertedIndexReader {
&self.termdict
}
/// Return the fields and types encoded in the dictionary in lexicographic order.
/// Return the fields and types encoded in the dictionary in lexicographic oder.
/// Only valid on JSON fields.
///
/// Notice: This requires a full scan and therefore **very expensive**.

View File

@@ -1,4 +1,4 @@
use columnar::{ColumnType, MonotonicallyMappableToU64};
use columnar::MonotonicallyMappableToU64;
use common::{replace_in_place, JsonPathWriter};
use rustc_hash::FxHashMap;
@@ -153,7 +153,7 @@ fn index_json_value<'a, V: Value<'a>>(
let mut token_stream = text_analyzer.token_stream(val);
let unordered_id = ctx
.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::Str);
.get_or_allocate_unordered_id(json_path_writer.as_str());
// TODO: make sure the chain position works out.
set_path_id(term_buffer, unordered_id);
@@ -171,7 +171,7 @@ fn index_json_value<'a, V: Value<'a>>(
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::U64),
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
@@ -180,7 +180,7 @@ fn index_json_value<'a, V: Value<'a>>(
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::I64),
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
@@ -189,7 +189,7 @@ fn index_json_value<'a, V: Value<'a>>(
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::F64),
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
@@ -198,7 +198,7 @@ fn index_json_value<'a, V: Value<'a>>(
set_path_id(
term_buffer,
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::Bool),
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
@@ -206,10 +206,8 @@ fn index_json_value<'a, V: Value<'a>>(
ReferenceValueLeaf::Date(val) => {
set_path_id(
term_buffer,
ctx.path_to_unordered_id.get_or_allocate_unordered_id(
json_path_writer.as_str(),
ColumnType::DateTime,
),
ctx.path_to_unordered_id
.get_or_allocate_unordered_id(json_path_writer.as_str()),
);
term_buffer.append_type_and_fast_value(val);
postings_writer.subscribe(doc, 0u32, term_buffer, ctx);

View File

@@ -1,4 +1,6 @@
use std::collections::BTreeMap;
#[cfg(feature = "quickwit")]
use std::future::Future;
use std::sync::Arc;
use std::{fmt, io};
@@ -112,6 +114,108 @@ impl Searcher {
store_reader.get_async(doc_address.doc_id).await
}
/// Fetches multiple documents in an asynchronous manner.
///
/// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
/// as it groups overlapping requests to segments and blocks and avoids concurrent requests
/// trashing the caches of each other. However, it does so using intermediate data structures
/// and independent block caches so it will be slower if documents from very few blocks are
/// fetched which would have fit into the global block cache.
///
/// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
/// or in parallel (e.g. using `JoinSet`) as fits best with the given use case, i.e. whether
/// it is predominately I/O-bound or rather CPU-bound.
///
/// Note that any blocks brought into any of the per-segment-and-block groups will not be pulled
/// into the global block cache and hence not be available for subsequent calls.
///
/// Note that there is no synchronous variant of this method as the same degree of efficiency
/// can be had by accessing documents in address order.
///
/// # Example
///
/// ```rust,no_run
/// # use futures::executor::block_on;
/// # use futures::stream::{FuturesUnordered, StreamExt};
/// #
/// # use tantivy::schema::Schema;
/// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
/// #
/// # let index = Index::create_in_ram(Schema::builder().build());
/// # let searcher = index.reader()?.searcher();
/// #
/// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
/// #
/// let mut groups: FuturesUnordered<_> = searcher
/// .docs_async::<TantivyDocument>(doc_addresses)?
/// .collect();
///
/// let mut docs = Vec::new();
///
/// block_on(async {
/// while let Some(group) = groups.next().await {
/// docs.extend(group?);
/// }
///
/// Ok::<_, TantivyError>(())
/// })?;
/// #
/// # Ok::<_, TantivyError>(())
/// ```
#[cfg(feature = "quickwit")]
pub fn docs_async<D: DocumentDeserialize>(
&self,
doc_addresses: impl IntoIterator<Item = DocAddress>,
) -> crate::Result<
impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
> {
use rustc_hash::FxHashMap;
use crate::store::CacheKey;
use crate::{DocId, SegmentOrdinal};
let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();
for doc_address in doc_addresses {
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
let cache_key = store_reader.cache_key(doc_address.doc_id)?;
groups
.entry((doc_address.segment_ord, cache_key))
.or_default()
.push(doc_address.doc_id);
}
let futures = groups
.into_iter()
.map(|((segment_ord, cache_key), doc_ids)| {
// Each group fetches documents from exactly one block and
// therefore gets an independent block cache of size one.
let store_reader =
self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);
async move {
let mut docs = Vec::new();
for doc_id in doc_ids {
let doc = store_reader.get_async(doc_id).await?;
docs.push((
DocAddress {
segment_ord,
doc_id,
},
doc,
));
}
Ok(docs)
}
});
Ok(futures)
}
/// Access the schema associated with the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.inner.schema

View File

@@ -27,14 +27,12 @@ pub enum SegmentComponent {
/// Bitset describing which document of the segment is alive.
/// (It was representing deleted docs but changed to represent alive docs from v0.17)
Delete,
/// Field list describing the fields in the segment.
FieldList,
}
impl SegmentComponent {
/// Iterates through the components.
pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 9] = [
static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
SegmentComponent::Postings,
SegmentComponent::Positions,
SegmentComponent::FastFields,
@@ -43,7 +41,6 @@ impl SegmentComponent {
SegmentComponent::Store,
SegmentComponent::TempStore,
SegmentComponent::Delete,
SegmentComponent::FieldList,
];
SEGMENT_COMPONENTS.iter()
}

View File

@@ -3,14 +3,15 @@ use std::ops::BitOrAssign;
use std::sync::{Arc, RwLock};
use std::{fmt, io};
use fnv::FnvHashMap;
use itertools::Itertools;
use crate::core::{InvertedIndexReader, Segment, SegmentComponent, SegmentId};
use crate::directory::{CompositeFile, FileSlice};
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
use crate::field_list::read_split_fields;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
use crate::json_utils::json_path_sep_to_dot;
use crate::schema::{Field, IndexRecordOption, Schema, Type};
use crate::space_usage::SegmentSpaceUsage;
use crate::store::StoreReader;
@@ -43,7 +44,6 @@ pub struct SegmentReader {
fast_fields_readers: FastFieldReaders,
fieldnorm_readers: FieldNormReaders,
list_fields_file: Option<FileSlice>, // Optional field list file for backwards compatibility
store_file: FileSlice,
alive_bitset_opt: Option<AliveBitSet>,
schema: Schema,
@@ -153,7 +153,6 @@ impl SegmentReader {
let termdict_composite = CompositeFile::open(&termdict_file)?;
let store_file = segment.open_read(SegmentComponent::Store)?;
let list_fields_file = segment.open_read(SegmentComponent::FieldList).ok();
crate::fail_point!("SegmentReader::open#middle");
@@ -202,7 +201,6 @@ impl SegmentReader {
segment_id: segment.id(),
delete_opstamp: segment.meta().delete_opstamp(),
store_file,
list_fields_file,
alive_bitset_opt,
positions_composite,
schema,
@@ -301,25 +299,87 @@ impl SegmentReader {
/// field that is not indexed nor a fast field but is stored, it is possible for the field
/// to not be listed.
pub fn fields_metadata(&self) -> crate::Result<Vec<FieldMetadata>> {
if let Some(list_fields_file) = self.list_fields_file.as_ref() {
let file = list_fields_file.read_bytes()?;
let fields_metadata =
read_split_fields(file)?.collect::<io::Result<Vec<FieldMetadata>>>();
fields_metadata.map_err(|e| e.into())
} else {
// Schema fallback
Ok(self
.schema()
.fields()
.map(|(_field, entry)| FieldMetadata {
field_name: entry.name().to_string(),
typ: entry.field_type().value_type(),
indexed: entry.is_indexed(),
stored: entry.is_stored(),
fast: entry.is_fast(),
})
.collect())
let mut indexed_fields: Vec<FieldMetadata> = Vec::new();
let mut map_to_canonical = FnvHashMap::default();
for (field, field_entry) in self.schema().fields() {
let field_name = field_entry.name().to_string();
let is_indexed = field_entry.is_indexed();
if is_indexed {
let is_json = field_entry.field_type().value_type() == Type::Json;
if is_json {
let inv_index = self.inverted_index(field)?;
let encoded_fields_in_index = inv_index.list_encoded_fields()?;
let mut build_path = |field_name: &str, mut json_path: String| {
// In this case we need to map the potential fast field to the field name
// accepted by the query parser.
let create_canonical =
!field_entry.is_expand_dots_enabled() && json_path.contains('.');
if create_canonical {
// Without expand dots enabled dots need to be escaped.
let escaped_json_path = json_path.replace('.', "\\.");
let full_path = format!("{}.{}", field_name, escaped_json_path);
let full_path_unescaped = format!("{}.{}", field_name, &json_path);
map_to_canonical.insert(full_path_unescaped, full_path.to_string());
full_path
} else {
// With expand dots enabled, we can use '.' instead of '\u{1}'.
json_path_sep_to_dot(&mut json_path);
format!("{}.{}", field_name, json_path)
}
};
indexed_fields.extend(
encoded_fields_in_index
.into_iter()
.map(|(name, typ)| (build_path(&field_name, name), typ))
.map(|(field_name, typ)| FieldMetadata {
indexed: true,
stored: false,
field_name,
fast: false,
typ,
}),
);
} else {
indexed_fields.push(FieldMetadata {
indexed: true,
stored: false,
field_name: field_name.to_string(),
fast: false,
typ: field_entry.field_type().value_type(),
});
}
}
}
let mut fast_fields: Vec<FieldMetadata> = self
.fast_fields()
.columnar()
.iter_columns()?
.map(|(mut field_name, handle)| {
json_path_sep_to_dot(&mut field_name);
// map to canonical path, to avoid similar but different entries.
// Eventually we should just accept '.' seperated for all cases.
let field_name = map_to_canonical
.get(&field_name)
.unwrap_or(&field_name)
.to_string();
FieldMetadata {
indexed: false,
stored: false,
field_name,
fast: true,
typ: Type::from(handle.column_type()),
}
})
.collect();
// Since the type is encoded differently in the fast field and in the inverted index,
// the order of the fields is not guaranteed to be the same. Therefore, we sort the fields.
// If we are sure that the order is the same, we can remove this sort.
indexed_fields.sort_unstable();
fast_fields.sort_unstable();
let merged = merge_field_meta_data(vec![indexed_fields, fast_fields], &self.schema);
Ok(merged)
}
/// Returns the segment id

View File

@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
json_term_writer.set_fast_value(75u64);
let postings = inv_idx
.read_postings(
&json_term_writer.term(),
json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
json_term_writer.set_fast_value(75u64);
let mut postings = inv_idx
.read_postings(
&json_term_writer.term(),
json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
@@ -474,3 +474,60 @@ fn test_non_text_json_term_freq_bitpacked() {
assert_eq!(postings.term_freq(), 1u32);
}
}
#[cfg(feature = "quickwit")]
#[test]
fn test_get_many_docs() -> crate::Result<()> {
use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};
use crate::schema::{OwnedValue, STORED};
use crate::{DocAddress, TantivyError};
let mut schema_builder = Schema::builder();
let num_field = schema_builder.add_u64_field("num", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
for i in 0..10u64 {
let doc = doc!(num_field=>i);
index_writer.add_document(doc)?;
}
index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
index_writer.merge(&segment_ids).wait().unwrap();
let searcher = index.reader()?.searcher();
assert_eq!(searcher.num_docs(), 10);
let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));
let mut groups: FuturesUnordered<_> = searcher
.docs_async::<TantivyDocument>(doc_addresses)?
.collect();
let mut doc_nums = Vec::new();
block_on(async {
while let Some(group) = groups.next().await {
for (_doc_address, doc) in group? {
let num_value = doc.get_first(num_field).unwrap();
if let OwnedValue::U64(num) = num_value {
doc_nums.push(*num);
} else {
panic!("Expected u64 value");
}
}
}
Ok::<_, TantivyError>(())
})?;
doc_nums.sort();
assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());
Ok(())
}

View File

@@ -238,17 +238,13 @@ impl FastFieldsWriter {
mut self,
wrt: &mut dyn io::Write,
doc_id_map_opt: Option<&DocIdMapping>,
) -> io::Result<Vec<(String, Type)>> {
) -> io::Result<()> {
let num_docs = self.num_docs;
let old_to_new_row_ids =
doc_id_map_opt.map(|doc_id_mapping| doc_id_mapping.old_to_new_ids());
let columns = self
.columnar_writer
self.columnar_writer
.serialize(num_docs, old_to_new_row_ids, wrt)?;
Ok(columns
.into_iter()
.map(|(field_name, column)| (field_name.to_string(), column.into()))
.collect())
Ok(())
}
}

View File

@@ -1,369 +0,0 @@
//! The list of fields that are stored in a `tantivy` `Index`.
use std::collections::HashSet;
use std::io::{self, ErrorKind, Read};
use columnar::ColumnType;
use common::TinySet;
use fnv::FnvHashMap;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::json_utils::json_path_sep_to_dot;
use crate::postings::IndexingContext;
use crate::schema::{Field, Schema, Type};
use crate::{merge_field_meta_data, FieldMetadata, Term};
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct FieldConfig {
pub typ: Type,
pub indexed: bool,
pub stored: bool,
pub fast: bool,
}
impl FieldConfig {
fn serialize(&self) -> [u8; 2] {
let typ = self.typ.to_code();
let flags = (self.indexed as u8) << 2 | (self.stored as u8) << 1 | (self.fast as u8);
[typ, flags]
}
fn deserialize_from(data: [u8; 2]) -> io::Result<FieldConfig> {
let typ = Type::from_code(data[0]).ok_or_else(|| {
io::Error::new(
ErrorKind::InvalidData,
format!("could not deserialize type {}", data[0]),
)
})?;
let data = data[1];
let indexed = (data & 0b100) != 0;
let stored = (data & 0b010) != 0;
let fast = (data & 0b001) != 0;
Ok(FieldConfig {
typ,
indexed,
stored,
fast,
})
}
}
/// Serializes the split fields.
pub(crate) fn serialize_segment_fields(
ctx: IndexingContext,
wrt: &mut dyn io::Write,
schema: &Schema,
unordered_id_to_ordered_id: &[(OrderedPathId, TinySet)],
mut columns: Vec<(String, Type)>,
) -> crate::Result<()> {
let mut field_list_set: HashSet<(Field, OrderedPathId, TinySet)> = HashSet::default();
let mut encoded_fields = Vec::new();
let mut map_to_canonical = FnvHashMap::default();
// Replace unordered ids by ordered ids to be able to sort
let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path();
for (key, _addr) in ctx.term_index.iter() {
let field = Term::wrap(key).field();
let field_entry = schema.get_field_entry(field);
if field_entry.field_type().value_type() == Type::Json {
let byte_range_unordered_id = 5..5 + 4;
let unordered_id =
u32::from_be_bytes(key[byte_range_unordered_id.clone()].try_into().unwrap());
let (path_id, typ_code_bitvec) = unordered_id_to_ordered_id[unordered_id as usize];
if !field_list_set.contains(&(field, path_id, typ_code_bitvec)) {
field_list_set.insert((field, path_id, typ_code_bitvec));
let mut build_path = |field_name: &str, mut json_path: String| {
// In this case we need to map the potential fast field to the field name
// accepted by the query parser.
let create_canonical =
!field_entry.is_expand_dots_enabled() && json_path.contains('.');
if create_canonical {
// Without expand dots enabled dots need to be escaped.
let escaped_json_path = json_path.replace('.', "\\.");
let full_path = format!("{}.{}", field_name, escaped_json_path);
let full_path_unescaped = format!("{}.{}", field_name, &json_path);
map_to_canonical.insert(full_path_unescaped, full_path.to_string());
full_path
} else {
// With expand dots enabled, we can use '.' instead of '\u{1}'.
json_path_sep_to_dot(&mut json_path);
format!("{}.{}", field_name, json_path)
}
};
let path = build_path(
field_entry.name(),
ordered_id_to_path[path_id.path_id() as usize].to_string(), /* String::from_utf8(key[5..].to_vec()).unwrap(), */
);
encoded_fields.push((path, typ_code_bitvec));
}
}
}
let mut indexed_fields: Vec<FieldMetadata> = Vec::new();
for (_field, field_entry) in schema.fields() {
let field_name = field_entry.name().to_string();
let is_indexed = field_entry.is_indexed();
let is_json = field_entry.field_type().value_type() == Type::Json;
if is_indexed && !is_json {
indexed_fields.push(FieldMetadata {
indexed: true,
stored: false,
field_name: field_name.to_string(),
fast: false,
typ: field_entry.field_type().value_type(),
});
}
}
for (field_name, field_type_set) in encoded_fields {
for field_type in field_type_set {
let column_type = ColumnType::try_from_code(field_type as u8).unwrap();
indexed_fields.push(FieldMetadata {
indexed: true,
stored: false,
field_name: field_name.to_string(),
fast: false,
typ: Type::from(column_type),
});
}
}
let mut fast_fields: Vec<FieldMetadata> = columns
.iter_mut()
.map(|(field_name, typ)| {
json_path_sep_to_dot(field_name);
// map to canonical path, to avoid similar but different entries.
// Eventually we should just accept '.' seperated for all cases.
let field_name = map_to_canonical
.get(field_name)
.unwrap_or(field_name)
.to_string();
FieldMetadata {
indexed: false,
stored: false,
field_name,
fast: true,
typ: *typ,
}
})
.collect();
// Since the type is encoded differently in the fast field and in the inverted index,
// the order of the fields is not guaranteed to be the same. Therefore, we sort the fields.
// If we are sure that the order is the same, we can remove this sort.
indexed_fields.sort_unstable();
fast_fields.sort_unstable();
let merged = merge_field_meta_data(vec![indexed_fields, fast_fields], schema);
let out = serialize_split_fields(&merged);
wrt.write_all(&out)?;
Ok(())
}
/// Serializes the Split fields.
///
/// `fields_metadata` has to be sorted.
pub fn serialize_split_fields(fields_metadata: &[FieldMetadata]) -> Vec<u8> {
// ensure that fields_metadata is strictly sorted.
debug_assert!(fields_metadata.windows(2).all(|w| w[0] < w[1]));
let mut payload = Vec::new();
// Write Num Fields
let length = fields_metadata.len() as u32;
payload.extend_from_slice(&length.to_le_bytes());
for field_metadata in fields_metadata {
write_field(field_metadata, &mut payload);
}
let compression_level = 3;
let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level)
.expect("zstd encoding failed");
let mut out = Vec::new();
// Write Header -- Format Version
let format_version = 1u8;
out.push(format_version);
// Write Payload
out.extend_from_slice(&payload_compressed);
out
}
fn write_field(field_metadata: &FieldMetadata, out: &mut Vec<u8>) {
let field_config = FieldConfig {
typ: field_metadata.typ,
indexed: field_metadata.indexed,
stored: field_metadata.stored,
fast: field_metadata.fast,
};
// Write Config 2 bytes
out.extend_from_slice(&field_config.serialize());
let str_length = field_metadata.field_name.len() as u16;
// Write String length 2 bytes
out.extend_from_slice(&str_length.to_le_bytes());
out.extend_from_slice(field_metadata.field_name.as_bytes());
}
/// Reads a fixed number of bytes into an array and returns the array.
fn read_exact_array<R: Read, const N: usize>(reader: &mut R) -> io::Result<[u8; N]> {
let mut buffer = [0u8; N];
reader.read_exact(&mut buffer)?;
Ok(buffer)
}
/// Reads the Split fields from a zstd compressed stream of bytes
pub fn read_split_fields<R: Read>(
mut reader: R,
) -> io::Result<impl Iterator<Item = io::Result<FieldMetadata>>> {
let format_version = read_exact_array::<_, 1>(&mut reader)?[0];
assert_eq!(format_version, 1);
let reader = zstd::Decoder::new(reader)?;
read_split_fields_from_zstd(reader)
}
fn read_field<R: Read>(reader: &mut R) -> io::Result<FieldMetadata> {
// Read FieldConfig (2 bytes)
let config_bytes = read_exact_array::<_, 2>(reader)?;
let field_config = FieldConfig::deserialize_from(config_bytes)?; // Assuming this returns a Result
// Read field name length and the field name
let name_len = u16::from_le_bytes(read_exact_array::<_, 2>(reader)?) as usize;
let mut data = vec![0; name_len];
reader.read_exact(&mut data)?;
let field_name = String::from_utf8(data).map_err(|err| {
io::Error::new(
ErrorKind::InvalidData,
format!(
"Encountered invalid utf8 when deserializing field name: {}",
err
),
)
})?;
Ok(FieldMetadata {
field_name,
typ: field_config.typ,
indexed: field_config.indexed,
stored: field_config.stored,
fast: field_config.fast,
})
}
/// Reads the Split fields from a stream of bytes
fn read_split_fields_from_zstd<R: Read>(
mut reader: R,
) -> io::Result<impl Iterator<Item = io::Result<FieldMetadata>>> {
let mut num_fields = u32::from_le_bytes(read_exact_array::<_, 4>(&mut reader)?);
Ok(std::iter::from_fn(move || {
if num_fields == 0 {
return None;
}
num_fields -= 1;
Some(read_field(&mut reader))
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn field_config_deser_test() {
let field_config = FieldConfig {
typ: Type::Str,
indexed: true,
stored: false,
fast: true,
};
let serialized = field_config.serialize();
let deserialized = FieldConfig::deserialize_from(serialized).unwrap();
assert_eq!(field_config, deserialized);
}
#[test]
fn write_read_field_test() {
for typ in Type::iter_values() {
let field_metadata = FieldMetadata {
field_name: "test".to_string(),
typ,
indexed: true,
stored: true,
fast: true,
};
let mut out = Vec::new();
write_field(&field_metadata, &mut out);
let deserialized = read_field(&mut &out[..]).unwrap();
assert_eq!(field_metadata, deserialized);
}
let field_metadata = FieldMetadata {
field_name: "test".to_string(),
typ: Type::Str,
indexed: false,
stored: true,
fast: true,
};
let mut out = Vec::new();
write_field(&field_metadata, &mut out);
let deserialized = read_field(&mut &out[..]).unwrap();
assert_eq!(field_metadata, deserialized);
let field_metadata = FieldMetadata {
field_name: "test".to_string(),
typ: Type::Str,
indexed: false,
stored: false,
fast: true,
};
let mut out = Vec::new();
write_field(&field_metadata, &mut out);
let deserialized = read_field(&mut &out[..]).unwrap();
assert_eq!(field_metadata, deserialized);
let field_metadata = FieldMetadata {
field_name: "test".to_string(),
typ: Type::Str,
indexed: true,
stored: false,
fast: false,
};
let mut out = Vec::new();
write_field(&field_metadata, &mut out);
let deserialized = read_field(&mut &out[..]).unwrap();
assert_eq!(field_metadata, deserialized);
}
#[test]
fn write_split_fields_test() {
let fields_metadata = vec![
FieldMetadata {
field_name: "test".to_string(),
typ: Type::Str,
indexed: true,
stored: true,
fast: true,
},
FieldMetadata {
field_name: "test2".to_string(),
typ: Type::Str,
indexed: true,
stored: false,
fast: false,
},
FieldMetadata {
field_name: "test3".to_string(),
typ: Type::U64,
indexed: true,
stored: false,
fast: true,
},
];
let out = serialize_split_fields(&fields_metadata);
let deserialized: Vec<FieldMetadata> = read_split_fields(&mut &out[..])
.unwrap()
.map(|el| el.unwrap())
.collect();
assert_eq!(fields_metadata, deserialized);
}
}

View File

@@ -1,4 +1,3 @@
use std::io::Write;
use std::sync::Arc;
use columnar::{
@@ -14,7 +13,6 @@ use crate::directory::WritePtr;
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::{AliveBitSet, FastFieldNotAvailableError};
use crate::field_list::serialize_split_fields;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
use crate::indexer::SegmentSerializer;
@@ -23,8 +21,8 @@ use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
use crate::{
merge_field_meta_data, DocAddress, DocId, FieldMetadata, IndexSettings, IndexSortByField,
InvertedIndexReader, Order, SegmentComponent, SegmentOrdinal,
DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
SegmentComponent, SegmentOrdinal,
};
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
@@ -257,19 +255,6 @@ impl IndexMerger {
Ok(())
}
fn write_field_list(&self, list_field_wrt: &mut WritePtr) -> crate::Result<()> {
let field_metadatas: Vec<Vec<FieldMetadata>> = self
.readers
.iter()
.map(|reader| reader.fields_metadata())
.collect::<crate::Result<Vec<_>>>()?;
let merged = merge_field_meta_data(field_metadatas, &self.schema);
let out = serialize_split_fields(&merged);
list_field_wrt.write_all(&out)?;
Ok(())
}
fn write_fast_fields(
&self,
fast_field_wrt: &mut WritePtr,
@@ -788,7 +773,6 @@ impl IndexMerger {
self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?;
debug!("write-fastfields");
self.write_fast_fields(serializer.get_fast_field_write(), doc_id_mapping)?;
self.write_field_list(serializer.get_field_list_write())?;
debug!("close-serializer");
serializer.close()?;

View File

@@ -1,5 +1,3 @@
use columnar::ColumnType;
use common::TinySet;
use fnv::FnvHashMap;
/// `Field` is represented by an unsigned 32-bit integer type.
@@ -26,44 +24,34 @@ impl From<u32> for OrderedPathId {
#[derive(Default)]
pub(crate) struct PathToUnorderedId {
/// TinySet contains the type codes of the columns in the path.
map: FnvHashMap<String, (u32, TinySet)>,
map: FnvHashMap<String, u32>,
}
impl PathToUnorderedId {
#[inline]
pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str, typ: ColumnType) -> u32 {
let code_bit = typ.to_code();
if let Some((id, all_codes)) = self.map.get_mut(path) {
*all_codes = all_codes.insert(code_bit as u32);
pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str) -> u32 {
if let Some(id) = self.map.get(path) {
return *id;
}
self.insert_new_path(path, code_bit)
self.insert_new_path(path)
}
#[cold]
fn insert_new_path(&mut self, path: &str, typ_code: u8) -> u32 {
fn insert_new_path(&mut self, path: &str) -> u32 {
let next_id = self.map.len() as u32;
self.map.insert(
path.to_string(),
(next_id, TinySet::singleton(typ_code as u32)),
);
self.map.insert(path.to_string(), next_id);
next_id
}
/// Retuns ids which reflect the lexical order of the paths.
///
/// The returned vec can be indexed with the unordered id to get the ordered id.
pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec<(OrderedPathId, TinySet)> {
let mut sorted_ids: Vec<(&str, (u32, TinySet))> = self
.map
.iter()
.map(|(k, (id, typ_code))| (k.as_str(), (*id, *typ_code)))
.collect();
pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec<OrderedPathId> {
let mut sorted_ids: Vec<(&str, &u32)> =
self.map.iter().map(|(k, v)| (k.as_str(), v)).collect();
sorted_ids.sort_unstable_by_key(|(path, _)| *path);
let mut result = vec![(OrderedPathId::default(), TinySet::empty()); sorted_ids.len()];
for (ordered, (unordered, typ_code)) in sorted_ids.iter().map(|(_k, v)| v).enumerate() {
result[*unordered as usize] =
(OrderedPathId::from_ordered_id(ordered as u32), *typ_code);
let mut result = vec![OrderedPathId::default(); sorted_ids.len()];
for (ordered, unordered) in sorted_ids.iter().map(|(_k, v)| v).enumerate() {
result[**unordered as usize] = OrderedPathId::from_ordered_id(ordered as u32);
}
result
}
@@ -86,12 +74,12 @@ mod tests {
let terms = vec!["b", "a", "b", "c"];
let ids = terms
.iter()
.map(|term| path_to_id.get_or_allocate_unordered_id(term, ColumnType::Str))
.map(|term| path_to_id.get_or_allocate_unordered_id(term))
.collect::<Vec<u32>>();
assert_eq!(ids, vec![0, 1, 0, 2]);
let ordered_ids = ids
.iter()
.map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize].0)
.map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize])
.collect::<Vec<OrderedPathId>>();
assert_eq!(ordered_ids, vec![1.into(), 0.into(), 1.into(), 2.into()]);
// Fetch terms

View File

@@ -12,7 +12,6 @@ pub struct SegmentSerializer {
segment: Segment,
pub(crate) store_writer: StoreWriter,
fast_field_write: WritePtr,
field_list_write: WritePtr,
fieldnorms_serializer: Option<FieldNormsSerializer>,
postings_serializer: InvertedIndexSerializer,
}
@@ -50,7 +49,6 @@ impl SegmentSerializer {
};
let fast_field_write = segment.open_write(SegmentComponent::FastFields)?;
let field_list_write = segment.open_write(SegmentComponent::FieldList)?;
let fieldnorms_write = segment.open_write(SegmentComponent::FieldNorms)?;
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;
@@ -60,7 +58,6 @@ impl SegmentSerializer {
segment,
store_writer,
fast_field_write,
field_list_write,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
})
@@ -84,11 +81,6 @@ impl SegmentSerializer {
&mut self.postings_serializer
}
/// Accessor to the ``.
pub fn get_field_list_write(&mut self) -> &mut WritePtr {
&mut self.field_list_write
}
/// Accessor to the `FastFieldSerializer`.
pub fn get_fast_field_write(&mut self) -> &mut WritePtr {
&mut self.fast_field_write
@@ -112,7 +104,6 @@ impl SegmentSerializer {
fieldnorms_serializer.close()?;
}
self.fast_field_write.terminate()?;
self.field_list_write.terminate()?;
self.postings_serializer.close()?;
self.store_writer.close()?;
Ok(())

View File

@@ -8,7 +8,6 @@ use super::operation::AddOperation;
use crate::core::json_utils::index_json_values;
use crate::core::Segment;
use crate::fastfield::FastFieldsWriter;
use crate::field_list::serialize_segment_fields;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::postings::{
@@ -444,29 +443,16 @@ fn remap_and_write(
.segment()
.open_read(SegmentComponent::FieldNorms)?;
let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
let unordered_id_to_ordered_id = ctx.path_to_unordered_id.unordered_id_to_ordered_id();
serialize_postings(
&ctx,
schema.clone(),
ctx,
schema,
per_field_postings_writers,
fieldnorm_readers,
doc_id_map,
&unordered_id_to_ordered_id,
serializer.get_postings_serializer(),
)?;
debug!("fastfield-serialize");
let columns = fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?;
let field_list_serializer = serializer.get_field_list_write();
serialize_segment_fields(
ctx,
field_list_serializer,
&schema,
&unordered_id_to_ordered_id,
columns,
)?;
fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?;
// finalize temp docstore and create version, which reflects the doc_id_map
if let Some(doc_id_map) = doc_id_map {

View File

@@ -188,7 +188,6 @@ pub mod aggregation;
pub mod collector;
pub mod directory;
pub mod fastfield;
pub mod field_list;
pub mod fieldnorm;
pub mod positions;
pub mod postings;
@@ -239,9 +238,7 @@ pub use crate::schema::DatePrecision;
pub use crate::schema::{DateOptions, DateTimePrecision, Document, TantivyDocument, Term};
/// Index format version.
///
/// Version 7: Add `.fieldlist` file containing the list of fields in a segment.
const INDEX_FORMAT_VERSION: u32 = 7;
const INDEX_FORMAT_VERSION: u32 = 6;
/// Oldest index format version this tantivy version can read.
const INDEX_FORMAT_OLDEST_SUPPORTED_VERSION: u32 = 4;

View File

@@ -2,7 +2,6 @@ use std::io;
use std::marker::PhantomData;
use std::ops::Range;
use common::TinySet;
use stacker::Addr;
use crate::fieldnorm::FieldNormReaders;
@@ -47,38 +46,37 @@ fn make_field_partition(
/// It pushes all term, one field at a time, towards the
/// postings serializer.
pub(crate) fn serialize_postings(
ctx: &IndexingContext,
ctx: IndexingContext,
schema: Schema,
per_field_postings_writers: &PerFieldPostingsWriter,
fieldnorm_readers: FieldNormReaders,
doc_id_map: Option<&DocIdMapping>,
unordered_id_to_ordered_id: &[(OrderedPathId, TinySet)],
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<()> {
// Replace unordered ids by ordered ids to be able to sort
let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path();
let unordered_id_to_ordered_id: Vec<OrderedPathId> =
ctx.path_to_unordered_id.unordered_id_to_ordered_id();
let mut term_offsets: Vec<(Field, OrderedPathId, &[u8], Addr)> =
Vec::with_capacity(ctx.term_index.len());
for (key, addr) in ctx.term_index.iter() {
term_offsets.extend(ctx.term_index.iter().map(|(key, addr)| {
let field = Term::wrap(key).field();
let field_entry = schema.get_field_entry(field);
if field_entry.field_type().value_type() == Type::Json {
let byte_range_unordered_id = 5..5 + 4;
let unordered_id =
u32::from_be_bytes(key[byte_range_unordered_id.clone()].try_into().unwrap());
let (path_id, _typ_code_bitvec) = unordered_id_to_ordered_id[unordered_id as usize];
term_offsets.push((field, path_id, &key[byte_range_unordered_id.end..], addr));
if schema.get_field_entry(field).field_type().value_type() == Type::Json {
let byte_range_path = 5..5 + 4;
let unordered_id = u32::from_be_bytes(key[byte_range_path.clone()].try_into().unwrap());
let path_id = unordered_id_to_ordered_id[unordered_id as usize];
(field, path_id, &key[byte_range_path.end..], addr)
} else {
term_offsets.push((field, 0.into(), &key[5..], addr));
(field, 0.into(), &key[5..], addr)
}
}
}));
// Sort by field, path, and term
term_offsets.sort_unstable_by(
|(field1, path_id1, bytes1, _), (field2, path_id2, bytes2, _)| {
(field1, path_id1, bytes1).cmp(&(field2, path_id2, bytes2))
},
);
let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path();
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
let postings_writer = per_field_postings_writers.get_for_field(field);
@@ -89,7 +87,7 @@ pub(crate) fn serialize_postings(
&term_offsets[byte_offsets],
&ordered_id_to_path,
doc_id_map,
ctx,
&ctx,
&mut field_serializer,
)?;
field_serializer.close()?;

View File

@@ -117,7 +117,6 @@ impl SegmentSpaceUsage {
use self::ComponentSpaceUsage::*;
use crate::SegmentComponent::*;
match component {
FieldList => ComponentSpaceUsage::Basic(ByteCount::from(0u64)),
Postings => PerField(self.postings().clone()),
Positions => PerField(self.positions().clone()),
FastFields => PerField(self.fast_fields().clone()),

View File

@@ -37,6 +37,8 @@ mod reader;
mod writer;
pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
#[cfg(feature = "quickwit")]
pub(crate) use self::reader::CacheKey;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;

View File

@@ -40,6 +40,15 @@ struct BlockCache {
}
impl BlockCache {
fn new(cache_num_blocks: usize) -> Self {
Self {
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
}
}
fn get_from_cache(&self, pos: usize) -> Option<Block> {
if let Some(block) = self
.cache
@@ -81,6 +90,10 @@ impl BlockCache {
}
}
/// Opaque cache key which indicates which documents are cached together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct CacheKey(usize);
#[derive(Debug, Default)]
/// CacheStats for the `StoreReader`.
pub struct CacheStats {
@@ -128,17 +141,35 @@ impl StoreReader {
Ok(StoreReader {
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache {
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
},
cache: BlockCache::new(cache_num_blocks),
skip_index: Arc::new(skip_index),
space_usage,
})
}
/// Clones the given store reader with an independent block cache of the given size.
///
/// `cache_keys` is used to seed the forked cache from the current cache
/// if some blocks are already available.
#[cfg(feature = "quickwit")]
pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
let forked = Self {
decompressor: self.decompressor,
data: self.data.clone(),
cache: BlockCache::new(cache_num_blocks),
skip_index: Arc::clone(&self.skip_index),
space_usage: self.space_usage.clone(),
};
for &CacheKey(pos) in cache_keys {
if let Some(block) = self.cache.get_from_cache(pos) {
forked.cache.put_into_cache(pos, block);
}
}
forked
}
pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
self.skip_index.checkpoints()
}
@@ -152,6 +183,21 @@ impl StoreReader {
self.cache.stats()
}
/// Returns the cache key for a given document
///
/// These keys are opaque and are not used with the public API,
/// but having the same cache key means that the documents
/// will only require one I/O and decompression operation
/// when retrieve from the same store reader consecutively.
///
/// Note that looking up the cache key of a document
/// will not yet pull anything into the block cache.
#[cfg(feature = "quickwit")]
pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
let checkpoint = self.block_checkpoint(doc_id)?;
Ok(CacheKey(checkpoint.byte_range.start))
}
/// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
/// document.
///