From bb57e63522e677f9ac68a28d53ec423cad615848 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 13 Dec 2023 15:52:39 +0800 Subject: [PATCH] Store List of Fields in Segment Fields may be encoded in the columnar storage or in the inverted index for JSON fields. Add a new Segment file that contains the list of fields (schema + encoded) --- Cargo.toml | 4 +- columnar/src/columnar/column_type.rs | 2 +- columnar/src/columnar/writer/mod.rs | 16 +- common/src/bitset.rs | 2 +- src/core/index_meta.rs | 1 + src/core/inverted_index_reader.rs | 2 +- src/core/json_utils.rs | 18 +- src/core/segment_component.rs | 5 +- src/core/segment_reader.rs | 104 ++------ src/fastfield/writer.rs | 10 +- src/field_list/mod.rs | 369 +++++++++++++++++++++++++++ src/indexer/merger.rs | 20 +- src/indexer/path_to_unordered_id.rs | 40 ++- src/indexer/segment_serializer.rs | 9 + src/indexer/segment_writer.rs | 20 +- src/lib.rs | 5 +- src/postings/postings_writer.rs | 28 +- src/space_usage/mod.rs | 1 + 18 files changed, 521 insertions(+), 135 deletions(-) create mode 100644 src/field_list/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 90aeb97f4..5cf0f2d57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ aho-corasick = "1.0" tantivy-fst = "0.5" memmap2 = { version = "0.9.0", optional = true } lz4_flex = { version = "0.11", default-features = false, optional = true } -zstd = { version = "0.13", optional = true, default-features = false } +zstd = { version = "0.13", default-features = false } tempfile = { version = "3.3.0", optional = true } log = "0.4.16" serde = { version = "1.0.136", features = ["derive"] } @@ -105,7 +105,7 @@ mmap = ["fs4", "tempfile", "memmap2"] stopwords = [] lz4-compression = ["lz4_flex"] -zstd-compression = ["zstd"] +zstd-compression = [] failpoints = ["fail", "fail/failpoints"] unstable = [] # useful for benches. 
diff --git a/columnar/src/columnar/column_type.rs b/columnar/src/columnar/column_type.rs index ac61a7253..ed2f001fb 100644 --- a/columnar/src/columnar/column_type.rs +++ b/columnar/src/columnar/column_type.rs @@ -58,7 +58,7 @@ impl ColumnType { self == &ColumnType::DateTime } - pub(crate) fn try_from_code(code: u8) -> Result { + pub fn try_from_code(code: u8) -> Result { COLUMN_TYPES.get(code as usize).copied().ok_or(InvalidData) } } diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index 53f0088c8..56875c772 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -333,7 +333,7 @@ impl ColumnarWriter { num_docs: RowId, old_to_new_row_ids: Option<&[RowId]>, wrt: &mut dyn io::Write, - ) -> io::Result<()> { + ) -> io::Result> { let mut serializer = ColumnarSerializer::new(wrt); let mut columns: Vec<(&[u8], ColumnType, Addr)> = self .numerical_field_hash_map @@ -374,7 +374,9 @@ impl ColumnarWriter { let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); let mut symbol_byte_buffer: Vec = Vec::new(); - for (column_name, column_type, addr) in columns { + for (column_name, column_type, addr) in columns.iter() { + let column_type = *column_type; + let addr = *addr; match column_type { ColumnType::Bool => { let column_writer: ColumnWriter = self.bool_field_hash_map.read(addr); @@ -485,7 +487,15 @@ impl ColumnarWriter { }; } serializer.finalize(num_docs)?; - Ok(()) + Ok(columns + .into_iter() + .map(|(column_name, column_type, _)| { + ( + String::from_utf8_lossy(column_name).to_string(), + column_type, + ) + }) + .collect()) } } diff --git a/common/src/bitset.rs b/common/src/bitset.rs index 6932b0416..f6ed06b7b 100644 --- a/common/src/bitset.rs +++ b/common/src/bitset.rs @@ -6,7 +6,7 @@ use ownedbytes::OwnedBytes; use crate::ByteCount; -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Clone, Copy, Eq, PartialEq, Hash)] pub struct TinySet(u64); impl fmt::Debug for 
TinySet { diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index 0ed61e2a6..437adc37a 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -142,6 +142,7 @@ impl SegmentMeta { SegmentComponent::FastFields => ".fast".to_string(), SegmentComponent::FieldNorms => ".fieldnorm".to_string(), SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)), + SegmentComponent::FieldList => ".fieldlist".to_string(), }); PathBuf::from(path) } diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index 059ec988c..3a96205a8 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -70,7 +70,7 @@ impl InvertedIndexReader { &self.termdict } - /// Return the fields and types encoded in the dictionary in lexicographic oder. + /// Return the fields and types encoded in the dictionary in lexicographic order. /// Only valid on JSON fields. /// /// Notice: This requires a full scan and therefore **very expensive**. diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs index 09059ddbf..9656fda0b 100644 --- a/src/core/json_utils.rs +++ b/src/core/json_utils.rs @@ -1,4 +1,4 @@ -use columnar::MonotonicallyMappableToU64; +use columnar::{ColumnType, MonotonicallyMappableToU64}; use common::{replace_in_place, JsonPathWriter}; use rustc_hash::FxHashMap; @@ -153,7 +153,7 @@ fn index_json_value<'a, V: Value<'a>>( let mut token_stream = text_analyzer.token_stream(val); let unordered_id = ctx .path_to_unordered_id - .get_or_allocate_unordered_id(json_path_writer.as_str()); + .get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::Str); // TODO: make sure the chain position works out. 
set_path_id(term_buffer, unordered_id); @@ -171,7 +171,7 @@ fn index_json_value<'a, V: Value<'a>>( set_path_id( term_buffer, ctx.path_to_unordered_id - .get_or_allocate_unordered_id(json_path_writer.as_str()), + .get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::U64), ); term_buffer.append_type_and_fast_value(val); postings_writer.subscribe(doc, 0u32, term_buffer, ctx); @@ -180,7 +180,7 @@ fn index_json_value<'a, V: Value<'a>>( set_path_id( term_buffer, ctx.path_to_unordered_id - .get_or_allocate_unordered_id(json_path_writer.as_str()), + .get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::I64), ); term_buffer.append_type_and_fast_value(val); postings_writer.subscribe(doc, 0u32, term_buffer, ctx); @@ -189,7 +189,7 @@ fn index_json_value<'a, V: Value<'a>>( set_path_id( term_buffer, ctx.path_to_unordered_id - .get_or_allocate_unordered_id(json_path_writer.as_str()), + .get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::F64), ); term_buffer.append_type_and_fast_value(val); postings_writer.subscribe(doc, 0u32, term_buffer, ctx); @@ -198,7 +198,7 @@ fn index_json_value<'a, V: Value<'a>>( set_path_id( term_buffer, ctx.path_to_unordered_id - .get_or_allocate_unordered_id(json_path_writer.as_str()), + .get_or_allocate_unordered_id(json_path_writer.as_str(), ColumnType::Bool), ); term_buffer.append_type_and_fast_value(val); postings_writer.subscribe(doc, 0u32, term_buffer, ctx); @@ -206,8 +206,10 @@ fn index_json_value<'a, V: Value<'a>>( ReferenceValueLeaf::Date(val) => { set_path_id( term_buffer, - ctx.path_to_unordered_id - .get_or_allocate_unordered_id(json_path_writer.as_str()), + ctx.path_to_unordered_id.get_or_allocate_unordered_id( + json_path_writer.as_str(), + ColumnType::DateTime, + ), ); term_buffer.append_type_and_fast_value(val); postings_writer.subscribe(doc, 0u32, term_buffer, ctx); diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs index eb757a80d..79da7ba69 100644 --- 
a/src/core/segment_component.rs +++ b/src/core/segment_component.rs @@ -27,12 +27,14 @@ pub enum SegmentComponent { /// Bitset describing which document of the segment is alive. /// (It was representing deleted docs but changed to represent alive docs from v0.17) Delete, + /// Field list describing the fields in the segment. + FieldList, } impl SegmentComponent { /// Iterates through the components. pub fn iterator() -> slice::Iter<'static, SegmentComponent> { - static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [ + static SEGMENT_COMPONENTS: [SegmentComponent; 9] = [ SegmentComponent::Postings, SegmentComponent::Positions, SegmentComponent::FastFields, @@ -41,6 +43,7 @@ impl SegmentComponent { SegmentComponent::Store, SegmentComponent::TempStore, SegmentComponent::Delete, + SegmentComponent::FieldList, ]; SEGMENT_COMPONENTS.iter() } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index cae1b537d..a132e4a13 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -3,15 +3,14 @@ use std::ops::BitOrAssign; use std::sync::{Arc, RwLock}; use std::{fmt, io}; -use fnv::FnvHashMap; use itertools::Itertools; use crate::core::{InvertedIndexReader, Segment, SegmentComponent, SegmentId}; use crate::directory::{CompositeFile, FileSlice}; use crate::error::DataCorruption; use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders}; +use crate::field_list::read_split_fields; use crate::fieldnorm::{FieldNormReader, FieldNormReaders}; -use crate::json_utils::json_path_sep_to_dot; use crate::schema::{Field, IndexRecordOption, Schema, Type}; use crate::space_usage::SegmentSpaceUsage; use crate::store::StoreReader; @@ -44,6 +43,7 @@ pub struct SegmentReader { fast_fields_readers: FastFieldReaders, fieldnorm_readers: FieldNormReaders, + list_fields_file: Option, // Optional field list file for backwards compatibility store_file: FileSlice, alive_bitset_opt: Option, schema: Schema, @@ -153,6 +153,7 @@ impl 
SegmentReader { let termdict_composite = CompositeFile::open(&termdict_file)?; let store_file = segment.open_read(SegmentComponent::Store)?; + let list_fields_file = segment.open_read(SegmentComponent::FieldList).ok(); crate::fail_point!("SegmentReader::open#middle"); @@ -201,6 +202,7 @@ impl SegmentReader { segment_id: segment.id(), delete_opstamp: segment.meta().delete_opstamp(), store_file, + list_fields_file, alive_bitset_opt, positions_composite, schema, @@ -299,87 +301,25 @@ impl SegmentReader { /// field that is not indexed nor a fast field but is stored, it is possible for the field /// to not be listed. pub fn fields_metadata(&self) -> crate::Result> { - let mut indexed_fields: Vec = Vec::new(); - let mut map_to_canonical = FnvHashMap::default(); - for (field, field_entry) in self.schema().fields() { - let field_name = field_entry.name().to_string(); - let is_indexed = field_entry.is_indexed(); - - if is_indexed { - let is_json = field_entry.field_type().value_type() == Type::Json; - if is_json { - let inv_index = self.inverted_index(field)?; - let encoded_fields_in_index = inv_index.list_encoded_fields()?; - let mut build_path = |field_name: &str, mut json_path: String| { - // In this case we need to map the potential fast field to the field name - // accepted by the query parser. - let create_canonical = - !field_entry.is_expand_dots_enabled() && json_path.contains('.'); - if create_canonical { - // Without expand dots enabled dots need to be escaped. - let escaped_json_path = json_path.replace('.', "\\."); - let full_path = format!("{}.{}", field_name, escaped_json_path); - let full_path_unescaped = format!("{}.{}", field_name, &json_path); - map_to_canonical.insert(full_path_unescaped, full_path.to_string()); - full_path - } else { - // With expand dots enabled, we can use '.' instead of '\u{1}'. 
- json_path_sep_to_dot(&mut json_path); - format!("{}.{}", field_name, json_path) - } - }; - indexed_fields.extend( - encoded_fields_in_index - .into_iter() - .map(|(name, typ)| (build_path(&field_name, name), typ)) - .map(|(field_name, typ)| FieldMetadata { - indexed: true, - stored: false, - field_name, - fast: false, - typ, - }), - ); - } else { - indexed_fields.push(FieldMetadata { - indexed: true, - stored: false, - field_name: field_name.to_string(), - fast: false, - typ: field_entry.field_type().value_type(), - }); - } - } + if let Some(list_fields_file) = self.list_fields_file.as_ref() { + let file = list_fields_file.read_bytes()?; + let fields_metadata = + read_split_fields(file)?.collect::>>(); + fields_metadata.map_err(|e| e.into()) + } else { + // Schema fallback + Ok(self + .schema() + .fields() + .map(|(_field, entry)| FieldMetadata { + field_name: entry.name().to_string(), + typ: entry.field_type().value_type(), + indexed: entry.is_indexed(), + stored: entry.is_stored(), + fast: entry.is_fast(), + }) + .collect()) } - let mut fast_fields: Vec = self - .fast_fields() - .columnar() - .iter_columns()? - .map(|(mut field_name, handle)| { - json_path_sep_to_dot(&mut field_name); - // map to canonical path, to avoid similar but different entries. - // Eventually we should just accept '.' seperated for all cases. - let field_name = map_to_canonical - .get(&field_name) - .unwrap_or(&field_name) - .to_string(); - FieldMetadata { - indexed: false, - stored: false, - field_name, - fast: true, - typ: Type::from(handle.column_type()), - } - }) - .collect(); - // Since the type is encoded differently in the fast field and in the inverted index, - // the order of the fields is not guaranteed to be the same. Therefore, we sort the fields. - // If we are sure that the order is the same, we can remove this sort. 
- indexed_fields.sort_unstable(); - fast_fields.sort_unstable(); - let merged = merge_field_meta_data(vec![indexed_fields, fast_fields], &self.schema); - - Ok(merged) } /// Returns the segment id diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index ca0da8145..526246d34 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -238,13 +238,17 @@ impl FastFieldsWriter { mut self, wrt: &mut dyn io::Write, doc_id_map_opt: Option<&DocIdMapping>, - ) -> io::Result<()> { + ) -> io::Result> { let num_docs = self.num_docs; let old_to_new_row_ids = doc_id_map_opt.map(|doc_id_mapping| doc_id_mapping.old_to_new_ids()); - self.columnar_writer + let columns = self + .columnar_writer .serialize(num_docs, old_to_new_row_ids, wrt)?; - Ok(()) + Ok(columns + .into_iter() + .map(|(field_name, column)| (field_name.to_string(), column.into())) + .collect()) } } diff --git a/src/field_list/mod.rs b/src/field_list/mod.rs new file mode 100644 index 000000000..1967a346c --- /dev/null +++ b/src/field_list/mod.rs @@ -0,0 +1,369 @@ +//! The list of fields that are stored in a `tantivy` `Index`. 
+ +use std::collections::HashSet; +use std::io::{self, ErrorKind, Read}; + +use columnar::ColumnType; +use common::TinySet; +use fnv::FnvHashMap; + +use crate::indexer::path_to_unordered_id::OrderedPathId; +use crate::json_utils::json_path_sep_to_dot; +use crate::postings::IndexingContext; +use crate::schema::{Field, Schema, Type}; +use crate::{merge_field_meta_data, FieldMetadata, Term}; + +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] +pub(crate) struct FieldConfig { + pub typ: Type, + pub indexed: bool, + pub stored: bool, + pub fast: bool, +} + +impl FieldConfig { + fn serialize(&self) -> [u8; 2] { + let typ = self.typ.to_code(); + let flags = (self.indexed as u8) << 2 | (self.stored as u8) << 1 | (self.fast as u8); + [typ, flags] + } + fn deserialize_from(data: [u8; 2]) -> io::Result { + let typ = Type::from_code(data[0]).ok_or_else(|| { + io::Error::new( + ErrorKind::InvalidData, + format!("could not deserialize type {}", data[0]), + ) + })?; + + let data = data[1]; + let indexed = (data & 0b100) != 0; + let stored = (data & 0b010) != 0; + let fast = (data & 0b001) != 0; + + Ok(FieldConfig { + typ, + indexed, + stored, + fast, + }) + } +} +/// Serializes the split fields. 
+pub(crate) fn serialize_segment_fields( + ctx: IndexingContext, + wrt: &mut dyn io::Write, + schema: &Schema, + unordered_id_to_ordered_id: &[(OrderedPathId, TinySet)], + mut columns: Vec<(String, Type)>, +) -> crate::Result<()> { + let mut field_list_set: HashSet<(Field, OrderedPathId, TinySet)> = HashSet::default(); + let mut encoded_fields = Vec::new(); + let mut map_to_canonical = FnvHashMap::default(); + + // Replace unordered ids by ordered ids to be able to sort + let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path(); + + for (key, _addr) in ctx.term_index.iter() { + let field = Term::wrap(key).field(); + let field_entry = schema.get_field_entry(field); + if field_entry.field_type().value_type() == Type::Json { + let byte_range_unordered_id = 5..5 + 4; + let unordered_id = + u32::from_be_bytes(key[byte_range_unordered_id.clone()].try_into().unwrap()); + let (path_id, typ_code_bitvec) = unordered_id_to_ordered_id[unordered_id as usize]; + if !field_list_set.contains(&(field, path_id, typ_code_bitvec)) { + field_list_set.insert((field, path_id, typ_code_bitvec)); + let mut build_path = |field_name: &str, mut json_path: String| { + // In this case we need to map the potential fast field to the field name + // accepted by the query parser. + let create_canonical = + !field_entry.is_expand_dots_enabled() && json_path.contains('.'); + if create_canonical { + // Without expand dots enabled dots need to be escaped. + let escaped_json_path = json_path.replace('.', "\\."); + let full_path = format!("{}.{}", field_name, escaped_json_path); + let full_path_unescaped = format!("{}.{}", field_name, &json_path); + map_to_canonical.insert(full_path_unescaped, full_path.to_string()); + full_path + } else { + // With expand dots enabled, we can use '.' instead of '\u{1}'. 
+ json_path_sep_to_dot(&mut json_path); + format!("{}.{}", field_name, json_path) + } + }; + + let path = build_path( + field_entry.name(), + ordered_id_to_path[path_id.path_id() as usize].to_string(), /* String::from_utf8(key[5..].to_vec()).unwrap(), */ + ); + encoded_fields.push((path, typ_code_bitvec)); + } + } + } + + let mut indexed_fields: Vec = Vec::new(); + for (_field, field_entry) in schema.fields() { + let field_name = field_entry.name().to_string(); + let is_indexed = field_entry.is_indexed(); + + let is_json = field_entry.field_type().value_type() == Type::Json; + if is_indexed && !is_json { + indexed_fields.push(FieldMetadata { + indexed: true, + stored: false, + field_name: field_name.to_string(), + fast: false, + typ: field_entry.field_type().value_type(), + }); + } + } + for (field_name, field_type_set) in encoded_fields { + for field_type in field_type_set { + let column_type = ColumnType::try_from_code(field_type as u8).unwrap(); + indexed_fields.push(FieldMetadata { + indexed: true, + stored: false, + field_name: field_name.to_string(), + fast: false, + typ: Type::from(column_type), + }); + } + } + let mut fast_fields: Vec = columns + .iter_mut() + .map(|(field_name, typ)| { + json_path_sep_to_dot(field_name); + // map to canonical path, to avoid similar but different entries. + // Eventually we should just accept '.' seperated for all cases. + let field_name = map_to_canonical + .get(field_name) + .unwrap_or(field_name) + .to_string(); + FieldMetadata { + indexed: false, + stored: false, + field_name, + fast: true, + typ: *typ, + } + }) + .collect(); + // Since the type is encoded differently in the fast field and in the inverted index, + // the order of the fields is not guaranteed to be the same. Therefore, we sort the fields. + // If we are sure that the order is the same, we can remove this sort. 
+ indexed_fields.sort_unstable(); + fast_fields.sort_unstable(); + let merged = merge_field_meta_data(vec![indexed_fields, fast_fields], schema); + let out = serialize_split_fields(&merged); + wrt.write_all(&out)?; + + Ok(()) +} + +/// Serializes the Split fields. +/// +/// `fields_metadata` has to be sorted. +pub fn serialize_split_fields(fields_metadata: &[FieldMetadata]) -> Vec { + // ensure that fields_metadata is strictly sorted. + debug_assert!(fields_metadata.windows(2).all(|w| w[0] < w[1])); + let mut payload = Vec::new(); + // Write Num Fields + let length = fields_metadata.len() as u32; + payload.extend_from_slice(&length.to_le_bytes()); + + for field_metadata in fields_metadata { + write_field(field_metadata, &mut payload); + } + let compression_level = 3; + let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level) + .expect("zstd encoding failed"); + let mut out = Vec::new(); + // Write Header -- Format Version + let format_version = 1u8; + out.push(format_version); + // Write Payload + out.extend_from_slice(&payload_compressed); + out +} + +fn write_field(field_metadata: &FieldMetadata, out: &mut Vec) { + let field_config = FieldConfig { + typ: field_metadata.typ, + indexed: field_metadata.indexed, + stored: field_metadata.stored, + fast: field_metadata.fast, + }; + + // Write Config 2 bytes + out.extend_from_slice(&field_config.serialize()); + let str_length = field_metadata.field_name.len() as u16; + // Write String length 2 bytes + out.extend_from_slice(&str_length.to_le_bytes()); + out.extend_from_slice(field_metadata.field_name.as_bytes()); +} + +/// Reads a fixed number of bytes into an array and returns the array. 
+fn read_exact_array(reader: &mut R) -> io::Result<[u8; N]> { + let mut buffer = [0u8; N]; + reader.read_exact(&mut buffer)?; + Ok(buffer) +} + +/// Reads the Split fields from a zstd compressed stream of bytes +pub fn read_split_fields( + mut reader: R, +) -> io::Result>> { + let format_version = read_exact_array::<_, 1>(&mut reader)?[0]; + assert_eq!(format_version, 1); + let reader = zstd::Decoder::new(reader)?; + read_split_fields_from_zstd(reader) +} + +fn read_field(reader: &mut R) -> io::Result { + // Read FieldConfig (2 bytes) + let config_bytes = read_exact_array::<_, 2>(reader)?; + let field_config = FieldConfig::deserialize_from(config_bytes)?; // Assuming this returns a Result + + // Read field name length and the field name + let name_len = u16::from_le_bytes(read_exact_array::<_, 2>(reader)?) as usize; + + let mut data = vec![0; name_len]; + reader.read_exact(&mut data)?; + + let field_name = String::from_utf8(data).map_err(|err| { + io::Error::new( + ErrorKind::InvalidData, + format!( + "Encountered invalid utf8 when deserializing field name: {}", + err + ), + ) + })?; + Ok(FieldMetadata { + field_name, + typ: field_config.typ, + indexed: field_config.indexed, + stored: field_config.stored, + fast: field_config.fast, + }) +} + +/// Reads the Split fields from a stream of bytes +fn read_split_fields_from_zstd( + mut reader: R, +) -> io::Result>> { + let mut num_fields = u32::from_le_bytes(read_exact_array::<_, 4>(&mut reader)?); + + Ok(std::iter::from_fn(move || { + if num_fields == 0 { + return None; + } + num_fields -= 1; + + Some(read_field(&mut reader)) + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn field_config_deser_test() { + let field_config = FieldConfig { + typ: Type::Str, + indexed: true, + stored: false, + fast: true, + }; + let serialized = field_config.serialize(); + let deserialized = FieldConfig::deserialize_from(serialized).unwrap(); + assert_eq!(field_config, deserialized); + } + #[test] + fn 
write_read_field_test() { + for typ in Type::iter_values() { + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ, + indexed: true, + stored: true, + fast: true, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + } + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: false, + stored: true, + fast: true, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: false, + stored: false, + fast: true, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + + let field_metadata = FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: true, + stored: false, + fast: false, + }; + let mut out = Vec::new(); + write_field(&field_metadata, &mut out); + let deserialized = read_field(&mut &out[..]).unwrap(); + assert_eq!(field_metadata, deserialized); + } + #[test] + fn write_split_fields_test() { + let fields_metadata = vec![ + FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: true, + stored: true, + fast: true, + }, + FieldMetadata { + field_name: "test2".to_string(), + typ: Type::Str, + indexed: true, + stored: false, + fast: false, + }, + FieldMetadata { + field_name: "test3".to_string(), + typ: Type::U64, + indexed: true, + stored: false, + fast: true, + }, + ]; + + let out = serialize_split_fields(&fields_metadata); + + let deserialized: Vec = read_split_fields(&mut &out[..]) + .unwrap() + .map(|el| el.unwrap()) + .collect(); + + assert_eq!(fields_metadata, deserialized); + } +} diff 
--git a/src/indexer/merger.rs b/src/indexer/merger.rs index 87bc4c8c8..6ad9eb008 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,3 +1,4 @@ +use std::io::Write; use std::sync::Arc; use columnar::{ @@ -13,6 +14,7 @@ use crate::directory::WritePtr; use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::{AliveBitSet, FastFieldNotAvailableError}; +use crate::field_list::serialize_split_fields; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping}; use crate::indexer::SegmentSerializer; @@ -21,8 +23,8 @@ use crate::schema::{value_type_to_column_type, Field, FieldType, Schema}; use crate::store::StoreWriter; use crate::termdict::{TermMerger, TermOrdinal}; use crate::{ - DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order, - SegmentComponent, SegmentOrdinal, + merge_field_meta_data, DocAddress, DocId, FieldMetadata, IndexSettings, IndexSortByField, + InvertedIndexReader, Order, SegmentComponent, SegmentOrdinal, }; /// Segment's max doc must be `< MAX_DOC_LIMIT`. 
@@ -255,6 +257,19 @@ impl IndexMerger { Ok(()) } + fn write_field_list(&self, list_field_wrt: &mut WritePtr) -> crate::Result<()> { + let field_metadatas: Vec> = self + .readers + .iter() + .map(|reader| reader.fields_metadata()) + .collect::>>()?; + let merged = merge_field_meta_data(field_metadatas, &self.schema); + let out = serialize_split_fields(&merged); + list_field_wrt.write_all(&out)?; + + Ok(()) + } + fn write_fast_fields( &self, fast_field_wrt: &mut WritePtr, @@ -773,6 +788,7 @@ impl IndexMerger { self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?; debug!("write-fastfields"); self.write_fast_fields(serializer.get_fast_field_write(), doc_id_mapping)?; + self.write_field_list(serializer.get_field_list_write())?; debug!("close-serializer"); serializer.close()?; diff --git a/src/indexer/path_to_unordered_id.rs b/src/indexer/path_to_unordered_id.rs index 054654f94..80b372b9a 100644 --- a/src/indexer/path_to_unordered_id.rs +++ b/src/indexer/path_to_unordered_id.rs @@ -1,3 +1,5 @@ +use columnar::ColumnType; +use common::TinySet; use fnv::FnvHashMap; /// `Field` is represented by an unsigned 32-bit integer type. @@ -24,34 +26,44 @@ impl From for OrderedPathId { #[derive(Default)] pub(crate) struct PathToUnorderedId { - map: FnvHashMap, + /// TinySet contains the type codes of the columns in the path. 
+ map: FnvHashMap, } impl PathToUnorderedId { #[inline] - pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str) -> u32 { - if let Some(id) = self.map.get(path) { + pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str, typ: ColumnType) -> u32 { + let code_bit = typ.to_code(); + if let Some((id, all_codes)) = self.map.get_mut(path) { + *all_codes = all_codes.insert(code_bit as u32); return *id; } - self.insert_new_path(path) + self.insert_new_path(path, code_bit) } #[cold] - fn insert_new_path(&mut self, path: &str) -> u32 { + fn insert_new_path(&mut self, path: &str, typ_code: u8) -> u32 { let next_id = self.map.len() as u32; - self.map.insert(path.to_string(), next_id); + self.map.insert( + path.to_string(), + (next_id, TinySet::singleton(typ_code as u32)), + ); next_id } /// Retuns ids which reflect the lexical order of the paths. /// /// The returned vec can be indexed with the unordered id to get the ordered id. - pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec { - let mut sorted_ids: Vec<(&str, &u32)> = - self.map.iter().map(|(k, v)| (k.as_str(), v)).collect(); + pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec<(OrderedPathId, TinySet)> { + let mut sorted_ids: Vec<(&str, (u32, TinySet))> = self + .map + .iter() + .map(|(k, (id, typ_code))| (k.as_str(), (*id, *typ_code))) + .collect(); sorted_ids.sort_unstable_by_key(|(path, _)| *path); - let mut result = vec![OrderedPathId::default(); sorted_ids.len()]; - for (ordered, unordered) in sorted_ids.iter().map(|(_k, v)| v).enumerate() { - result[**unordered as usize] = OrderedPathId::from_ordered_id(ordered as u32); + let mut result = vec![(OrderedPathId::default(), TinySet::empty()); sorted_ids.len()]; + for (ordered, (unordered, typ_code)) in sorted_ids.iter().map(|(_k, v)| v).enumerate() { + result[*unordered as usize] = + (OrderedPathId::from_ordered_id(ordered as u32), *typ_code); } result } @@ -74,12 +86,12 @@ mod tests { let terms = vec!["b", "a", "b", "c"]; let ids = terms 
.iter() - .map(|term| path_to_id.get_or_allocate_unordered_id(term)) + .map(|term| path_to_id.get_or_allocate_unordered_id(term, ColumnType::Str)) .collect::>(); assert_eq!(ids, vec![0, 1, 0, 2]); let ordered_ids = ids .iter() - .map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize]) + .map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize].0) .collect::>(); assert_eq!(ordered_ids, vec![1.into(), 0.into(), 1.into(), 2.into()]); // Fetch terms diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 6dcb442ff..f69eee3c8 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -12,6 +12,7 @@ pub struct SegmentSerializer { segment: Segment, pub(crate) store_writer: StoreWriter, fast_field_write: WritePtr, + field_list_write: WritePtr, fieldnorms_serializer: Option, postings_serializer: InvertedIndexSerializer, } @@ -49,6 +50,7 @@ impl SegmentSerializer { }; let fast_field_write = segment.open_write(SegmentComponent::FastFields)?; + let field_list_write = segment.open_write(SegmentComponent::FieldList)?; let fieldnorms_write = segment.open_write(SegmentComponent::FieldNorms)?; let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?; @@ -58,6 +60,7 @@ impl SegmentSerializer { segment, store_writer, fast_field_write, + field_list_write, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, }) @@ -81,6 +84,11 @@ impl SegmentSerializer { &mut self.postings_serializer } + /// Accessor to the field list `WritePtr`. + pub fn get_field_list_write(&mut self) -> &mut WritePtr { + &mut self.field_list_write + } + /// Accessor to the `FastFieldSerializer`. 
pub fn get_fast_field_write(&mut self) -> &mut WritePtr { &mut self.fast_field_write @@ -104,6 +112,7 @@ impl SegmentSerializer { fieldnorms_serializer.close()?; } self.fast_field_write.terminate()?; + self.field_list_write.terminate()?; self.postings_serializer.close()?; self.store_writer.close()?; Ok(()) diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 1888f3b47..22c62eb05 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -8,6 +8,7 @@ use super::operation::AddOperation; use crate::core::json_utils::index_json_values; use crate::core::Segment; use crate::fastfield::FastFieldsWriter; +use crate::field_list::serialize_segment_fields; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::postings::{ @@ -443,16 +444,29 @@ fn remap_and_write( .segment() .open_read(SegmentComponent::FieldNorms)?; let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?; + + let unordered_id_to_ordered_id = ctx.path_to_unordered_id.unordered_id_to_ordered_id(); + serialize_postings( - ctx, - schema, + &ctx, + schema.clone(), per_field_postings_writers, fieldnorm_readers, doc_id_map, + &unordered_id_to_ordered_id, serializer.get_postings_serializer(), )?; debug!("fastfield-serialize"); - fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?; + let columns = fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?; + + let field_list_serializer = serializer.get_field_list_write(); + serialize_segment_fields( + ctx, + field_list_serializer, + &schema, + &unordered_id_to_ordered_id, + columns, + )?; // finalize temp docstore and create version, which reflects the doc_id_map if let Some(doc_id_map) = doc_id_map { diff --git a/src/lib.rs b/src/lib.rs index e14d02a6c..55dbbb776 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,6 +188,7 @@ pub mod aggregation; pub mod collector; pub mod directory; pub mod 
fastfield; +pub mod field_list; pub mod fieldnorm; pub mod positions; pub mod postings; @@ -238,7 +239,9 @@ pub use crate::schema::DatePrecision; pub use crate::schema::{DateOptions, DateTimePrecision, Document, TantivyDocument, Term}; /// Index format version. -const INDEX_FORMAT_VERSION: u32 = 6; +/// +/// Version 7: Add `.fieldlist` file containing the list of fields in a segment. +const INDEX_FORMAT_VERSION: u32 = 7; /// Oldest index format version this tantivy version can read. const INDEX_FORMAT_OLDEST_SUPPORTED_VERSION: u32 = 4; diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 264392889..f9c14d185 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -2,6 +2,7 @@ use std::io; use std::marker::PhantomData; use std::ops::Range; +use common::TinySet; use stacker::Addr; use crate::fieldnorm::FieldNormReaders; @@ -46,37 +47,38 @@ fn make_field_partition( /// It pushes all term, one field at a time, towards the /// postings serializer. 
pub(crate) fn serialize_postings( - ctx: IndexingContext, + ctx: &IndexingContext, schema: Schema, per_field_postings_writers: &PerFieldPostingsWriter, fieldnorm_readers: FieldNormReaders, doc_id_map: Option<&DocIdMapping>, + unordered_id_to_ordered_id: &[(OrderedPathId, TinySet)], serializer: &mut InvertedIndexSerializer, ) -> crate::Result<()> { // Replace unordered ids by ordered ids to be able to sort - let unordered_id_to_ordered_id: Vec = - ctx.path_to_unordered_id.unordered_id_to_ordered_id(); + let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path(); let mut term_offsets: Vec<(Field, OrderedPathId, &[u8], Addr)> = Vec::with_capacity(ctx.term_index.len()); - term_offsets.extend(ctx.term_index.iter().map(|(key, addr)| { + for (key, addr) in ctx.term_index.iter() { let field = Term::wrap(key).field(); - if schema.get_field_entry(field).field_type().value_type() == Type::Json { - let byte_range_path = 5..5 + 4; - let unordered_id = u32::from_be_bytes(key[byte_range_path.clone()].try_into().unwrap()); - let path_id = unordered_id_to_ordered_id[unordered_id as usize]; - (field, path_id, &key[byte_range_path.end..], addr) + let field_entry = schema.get_field_entry(field); + if field_entry.field_type().value_type() == Type::Json { + let byte_range_unordered_id = 5..5 + 4; + let unordered_id = + u32::from_be_bytes(key[byte_range_unordered_id.clone()].try_into().unwrap()); + let (path_id, _typ_code_bitvec) = unordered_id_to_ordered_id[unordered_id as usize]; + term_offsets.push((field, path_id, &key[byte_range_unordered_id.end..], addr)); } else { - (field, 0.into(), &key[5..], addr) + term_offsets.push((field, 0.into(), &key[5..], addr)); } - })); + } // Sort by field, path, and term term_offsets.sort_unstable_by( |(field1, path_id1, bytes1, _), (field2, path_id2, bytes2, _)| { (field1, path_id1, bytes1).cmp(&(field2, path_id2, bytes2)) }, ); - let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path(); let field_offsets = 
make_field_partition(&term_offsets); for (field, byte_offsets) in field_offsets { let postings_writer = per_field_postings_writers.get_for_field(field); @@ -87,7 +89,7 @@ pub(crate) fn serialize_postings( &term_offsets[byte_offsets], &ordered_id_to_path, doc_id_map, - &ctx, + ctx, &mut field_serializer, )?; field_serializer.close()?; diff --git a/src/space_usage/mod.rs b/src/space_usage/mod.rs index 84fb074d0..76e947c2a 100644 --- a/src/space_usage/mod.rs +++ b/src/space_usage/mod.rs @@ -117,6 +117,7 @@ impl SegmentSpaceUsage { use self::ComponentSpaceUsage::*; use crate::SegmentComponent::*; match component { + FieldList => ComponentSpaceUsage::Basic(ByteCount::from(0u64)), Postings => PerField(self.postings().clone()), Positions => PerField(self.positions().clone()), FastFields => PerField(self.fast_fields().clone()),