diff --git a/CHANGELOG.md b/CHANGELOG.md index bade1c7c0..97d92ed4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Tantivy 0.15.0 - Simplified positions index format (@fulmicoton) #1022 - Moved bitpacking to bitpacker subcrate and add BlockedBitpacker, which bitpacks blocks of 128 elements (@PSeitz) #1030 - Added support for more-like-this query in tantivy (@evanxg852000) #1011 +- Added support for sorting an index, e.g. presorting documents in an index by a timestamp field. This can heavily improve performance in certain scenarios by exploiting the sorted data (e.g. top-n optimizations). #1026 Tantivy 0.14.0 ========================= diff --git a/Cargo.toml b/Cargo.toml index 12d3a08c3..e0f058f65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ smallvec = "1" rayon = "1" lru = "0.6" fastdivide = "0.3" +itertools = "0.10.0" [target.'cfg(windows)'.dependencies] winapi = "0.3" diff --git a/bitpacker/src/bitpacker.rs b/bitpacker/src/bitpacker.rs index 7f936760b..a6bc998d2 100644 --- a/bitpacker/src/bitpacker.rs +++ b/bitpacker/src/bitpacker.rs @@ -4,7 +4,11 @@ pub struct BitPacker { mini_buffer: u64, mini_buffer_written: usize, } - +impl Default for BitPacker { + fn default() -> Self { + BitPacker::new() + } +} impl BitPacker { pub fn new() -> BitPacker { BitPacker { diff --git a/bitpacker/src/blocked_bitpacker.rs b/bitpacker/src/blocked_bitpacker.rs index 027886dfb..1fa142b21 100644 --- a/bitpacker/src/blocked_bitpacker.rs +++ b/bitpacker/src/blocked_bitpacker.rs @@ -15,6 +15,11 @@ pub struct BlockedBitpacker { buffer: Vec<u64>, offset_and_bits: Vec<BlockedBitpackerEntryMetaData>, } +impl Default for BlockedBitpacker { + fn default() -> Self { + BlockedBitpacker::new() + } +} /// `BlockedBitpackerEntryMetaData` encodes the /// offset and bit_width into a u64 bit field @@ -115,8 +120,6 @@ impl BlockedBitpacker { self.buffer.clear(); self.compressed_blocks .resize(self.compressed_blocks.len() + 8, 0); // add padding for bitpacker - } else { - return; } } pub fn get(&self, idx: usize) -> u64 { diff --git a/src/core/index.rs b/src/core/index.rs index a683fd6aa..582e3f26f 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -64,31 +64,42 @@ fn load_metas( /// /// ``` /// use tantivy::schema::*; -/// use tantivy::{Index, IndexSettings}; +/// use tantivy::{Index, IndexSettings, IndexSortByField, Order}; /// /// let mut schema_builder = Schema::builder(); /// let id_field = schema_builder.add_text_field("id", STRING); /// let title_field = schema_builder.add_text_field("title", TEXT); /// let body_field = schema_builder.add_text_field("body", TEXT); +/// let number_field = schema_builder.add_u64_field( +/// "number", +/// IntOptions::default().set_fast(Cardinality::SingleValue), +/// ); +/// /// let schema = schema_builder.build(); -/// let settings = IndexSettings::default(); +/// let settings = IndexSettings { sort_by_field: Some(IndexSortByField { field: "number".to_string(), order: Order::Asc }) }; /// let index = Index::builder().schema(schema).settings(settings).create_in_ram(); /// /// ``` pub struct IndexBuilder { schema: Option<Schema>, - index_settings: Option<IndexSettings>, + index_settings: IndexSettings, +} +impl Default for IndexBuilder { + fn default() -> Self { + IndexBuilder::new() + } } impl IndexBuilder { + /// Creates a new `IndexBuilder` pub fn new() -> Self { Self { schema: None, - index_settings: None, + index_settings: IndexSettings::default(), } } /// Set the settings pub fn settings(mut self, settings: IndexSettings) -> Self { - self.index_settings = Some(settings); + self.index_settings = settings; self } /// 
Set the schema @@ -131,15 +142,11 @@ impl IndexBuilder { let mmap_directory = MmapDirectory::create_from_tempdir()?; self.create(mmap_directory) } - fn get_settings_or_default(&self) -> Option<IndexSettings> { - self.index_settings.as_ref().cloned() - } fn get_expect_schema(&self) -> crate::Result<Schema> { - Ok(self - .schema + self.schema .as_ref() .cloned() - .ok_or_else(|| TantivyError::IndexBuilderMissingArgument("schema"))?) + .ok_or(TantivyError::IndexBuilderMissingArgument("schema")) } /// Opens or creates a new index in the provided directory pub fn open_or_create<Dir: Directory>(self, dir: Dir) -> crate::Result<Index> { @@ -162,11 +169,11 @@ impl IndexBuilder { let directory = ManagedDirectory::wrap(dir)?; save_new_metas( self.get_expect_schema()?, - self.get_settings_or_default(), + self.index_settings.clone(), &directory, )?; let mut metas = IndexMeta::with_schema(self.get_expect_schema()?); - metas.index_settings = self.get_settings_or_default(); + metas.index_settings = self.index_settings.clone(); let index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default()); Ok(index) } @@ -177,7 +184,7 @@ pub struct Index { directory: ManagedDirectory, schema: Schema, - settings: Option<IndexSettings>, + settings: IndexSettings, executor: Arc<Executor>, tokenizers: TokenizerManager, inventory: SegmentMetaInventory, @@ -265,12 +272,10 @@ impl Index { pub fn create<Dir: Directory>( dir: Dir, schema: Schema, - settings: Option<IndexSettings>, + settings: IndexSettings, ) -> crate::Result<Index> { let mut builder = IndexBuilder::new().schema(schema); - if let Some(settings) = settings { - builder = builder.settings(settings); - } + builder = builder.settings(settings); builder.create(dir) } @@ -423,7 +428,7 @@ impl Index { /// Helper to create an index writer for tests. /// - /// That index writer only simply has a single thread and a heap of 5 MB. + /// That index writer has a single thread and a heap of 10 MB. /// Using a single thread gives us a deterministic allocation of DocId.
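A short usage sketch of the settings plumbing above (hedged: the `timestamp` field name and the helper function are illustrative and not part of this diff; the builder calls are the ones shown in the doc example):

use tantivy::schema::{Cardinality, IntOptions, Schema};
use tantivy::{Index, IndexSettings, IndexSortByField, Order};

// Create an in-RAM index whose documents will be presorted by a fast field.
fn create_sorted_index() -> tantivy::Result<Index> {
    let mut schema_builder = Schema::builder();
    // The field to sort by must be a single-value fast field.
    schema_builder.add_u64_field(
        "timestamp",
        IntOptions::default().set_fast(Cardinality::SingleValue),
    );
    let schema = schema_builder.build();
    let settings = IndexSettings {
        sort_by_field: Some(IndexSortByField {
            field: "timestamp".to_string(),
            order: Order::Desc,
        }),
    };
    // The settings are persisted in meta.json and re-applied when the index is reopened.
    Index::builder().schema(schema).settings(settings).create_in_ram()
}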
#[cfg(test)] pub fn writer_for_tests(&self) -> crate::Result { @@ -452,7 +457,7 @@ impl Index { /// Accessor to the index settings /// - pub fn settings(&self) -> &Option { + pub fn settings(&self) -> &IndexSettings { &self.settings } /// Accessor to the index schema @@ -523,11 +528,14 @@ impl fmt::Debug for Index { #[cfg(test)] mod tests { - use crate::directory::{RamDirectory, WatchCallback}; use crate::schema::Field; use crate::schema::{Schema, INDEXED, TEXT}; use crate::IndexReader; use crate::ReloadPolicy; + use crate::{ + directory::{RamDirectory, WatchCallback}, + IndexSettings, + }; use crate::{Directory, Index}; #[test] @@ -548,7 +556,12 @@ mod tests { fn test_index_exists() { let directory = RamDirectory::create(); assert!(!Index::exists(&directory).unwrap()); - assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok()); + assert!(Index::create( + directory.clone(), + throw_away_schema(), + IndexSettings::default() + ) + .is_ok()); assert!(Index::exists(&directory).unwrap()); } @@ -563,7 +576,12 @@ mod tests { #[test] fn open_or_create_should_open() { let directory = RamDirectory::create(); - assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok()); + assert!(Index::create( + directory.clone(), + throw_away_schema(), + IndexSettings::default() + ) + .is_ok()); assert!(Index::exists(&directory).unwrap()); assert!(Index::open_or_create(directory, throw_away_schema()).is_ok()); } @@ -571,15 +589,30 @@ mod tests { #[test] fn create_should_wipeoff_existing() { let directory = RamDirectory::create(); - assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok()); + assert!(Index::create( + directory.clone(), + throw_away_schema(), + IndexSettings::default() + ) + .is_ok()); assert!(Index::exists(&directory).unwrap()); - assert!(Index::create(directory.clone(), Schema::builder().build(), None).is_ok()); + assert!(Index::create( + directory.clone(), + Schema::builder().build(), + IndexSettings::default() + ) + .is_ok()); } #[test] fn open_or_create_exists_but_schema_does_not_match() { let directory = RamDirectory::create(); - assert!(Index::create(directory.clone(), throw_away_schema(), None).is_ok()); + assert!(Index::create( + directory.clone(), + throw_away_schema(), + IndexSettings::default() + ) + .is_ok()); assert!(Index::exists(&directory).unwrap()); assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok()); let err = Index::open_or_create(directory, Schema::builder().build()); @@ -714,7 +747,7 @@ mod tests { let directory = RamDirectory::create(); let schema = throw_away_schema(); let field = schema.get_field("num_likes").unwrap(); - let index = Index::create(directory.clone(), schema, None).unwrap(); + let index = Index::create(directory.clone(), schema, IndexSettings::default()).unwrap(); let mut writer = index.writer_with_num_threads(8, 24_000_000).unwrap(); for i in 0u64..8_000u64 { diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index ed0f9f3ab..0a9397583 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -4,9 +4,9 @@ use crate::schema::Schema; use crate::Opstamp; use census::{Inventory, TrackedObject}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; -use std::fmt; use std::path::PathBuf; +use std::{collections::HashSet, sync::atomic::AtomicBool}; +use std::{fmt, sync::Arc}; #[derive(Clone, Debug, Serialize, Deserialize)] struct DeleteMeta { @@ -33,6 +33,7 @@ impl SegmentMetaInventory { let inner = InnerSegmentMeta { segment_id, max_doc, + 
include_temp_doc_store: Arc::new(AtomicBool::new(true)), deletes: None, }; SegmentMeta::from(self.inventory.track(inner)) } @@ -80,6 +81,15 @@ impl SegmentMeta { self.tracked.segment_id } + /// Removes the `SegmentComponent::TempStore` from the alive list and + /// therefore marks the temp docstore file to be deleted by + /// garbage collection. + pub fn untrack_temp_docstore(&self) { + self.tracked + .include_temp_doc_store + .store(false, std::sync::atomic::Ordering::Relaxed); + } + /// Returns the number of deleted documents. pub fn num_deleted_docs(&self) -> u32 { self.tracked @@ -96,9 +106,20 @@ impl SegmentMeta { /// is by removing all files that have been created by tantivy /// and are not used by any segment anymore. pub fn list_files(&self) -> HashSet<PathBuf> { - SegmentComponent::iterator() - .map(|component| self.relative_path(*component)) - .collect::<HashSet<PathBuf>>() + if self + .tracked + .include_temp_doc_store + .load(std::sync::atomic::Ordering::Relaxed) + { + SegmentComponent::iterator() + .map(|component| self.relative_path(*component)) + .collect::<HashSet<PathBuf>>() + } else { + SegmentComponent::iterator() + .filter(|comp| *comp != &SegmentComponent::TempStore) + .map(|component| self.relative_path(*component)) + .collect::<HashSet<PathBuf>>() + } } /// Returns the relative path of a component of our segment. @@ -112,6 +133,7 @@ impl SegmentMeta { SegmentComponent::Positions => ".pos".to_string(), SegmentComponent::Terms => ".term".to_string(), SegmentComponent::Store => ".store".to_string(), + SegmentComponent::TempStore => ".store.temp".to_string(), SegmentComponent::FastFields => ".fast".to_string(), SegmentComponent::FieldNorms => ".fieldnorm".to_string(), SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)), @@ -159,6 +181,7 @@ impl SegmentMeta { segment_id: inner_meta.segment_id, max_doc, deletes: None, + include_temp_doc_store: Arc::new(AtomicBool::new(true)), }); SegmentMeta { tracked } } @@ -172,6 +195,7 @@ impl SegmentMeta { let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta { segment_id: inner_meta.segment_id, max_doc: inner_meta.max_doc, + include_temp_doc_store: Arc::new(AtomicBool::new(true)), deletes: Some(delete_meta), }); SegmentMeta { tracked } @@ -183,6 +207,14 @@ struct InnerSegmentMeta { segment_id: SegmentId, max_doc: u32, deletes: Option<DeleteMeta>, + /// Set this to `true` to keep the `SegmentComponent::TempStore` file alive, + /// i.e. to prevent it from being garbage-collected and deleted. This is used during merge. + #[serde(skip)] + #[serde(default = "default_temp_store")] + pub(crate) include_temp_doc_store: Arc<AtomicBool>, +} +fn default_temp_store() -> Arc<AtomicBool> { + Arc::new(AtomicBool::new(false)) } impl InnerSegmentMeta { @@ -193,9 +225,36 @@ } } -/// Search Index Settings -#[derive(Clone, Default, Serialize)] -pub struct IndexSettings {} +/// Search Index Settings. +/// +/// Contains settings which are applied to the whole +/// index, like presorting documents. +#[derive(Clone, Default, Serialize, Deserialize, Eq, PartialEq)] +pub struct IndexSettings { + /// Sorts the documents by the information + /// provided in `IndexSortByField` + pub sort_by_field: Option<IndexSortByField>, +} +/// Settings to presort the documents in an index. +/// +/// Presorting documents can greatly improve performance +/// in some scenarios, by enabling top-n +/// optimizations.
+#[derive(Clone, Serialize, Deserialize, Eq, PartialEq)] +pub struct IndexSortByField { + /// The field to sort the documents by + pub field: String, + /// The order to sort the documents by + pub order: Order, +} +/// The order to sort by +#[derive(Clone, Serialize, Deserialize, Eq, PartialEq)] +pub enum Order { + /// Ascending Order + Asc, + /// Descending Order + Desc, +} /// Meta information about the `Index`. /// /// This object is serialized on disk in the `meta.json` file. @@ -207,8 +266,8 @@ pub struct IndexSettings {} #[derive(Clone, Serialize)] pub struct IndexMeta { /// `IndexSettings` to configure index options. - #[serde(skip_serializing_if = "Option::is_none")] - pub index_settings: Option, + #[serde(default)] + pub index_settings: IndexSettings, /// List of `SegmentMeta` informations associated to each finalized segment of the index. pub segments: Vec, /// Index `Schema` @@ -227,6 +286,8 @@ pub struct IndexMeta { #[derive(Deserialize)] struct UntrackedIndexMeta { pub segments: Vec, + #[serde(default)] + pub index_settings: IndexSettings, pub schema: Schema, pub opstamp: Opstamp, #[serde(skip_serializing_if = "Option::is_none")] @@ -236,7 +297,7 @@ struct UntrackedIndexMeta { impl UntrackedIndexMeta { pub fn track(self, inventory: &SegmentMetaInventory) -> IndexMeta { IndexMeta { - index_settings: None, + index_settings: self.index_settings, segments: self .segments .into_iter() @@ -257,7 +318,7 @@ impl IndexMeta { /// Opstamp will the value `0u64`. pub fn with_schema(schema: Schema) -> IndexMeta { IndexMeta { - index_settings: None, + index_settings: IndexSettings::default(), segments: vec![], schema, opstamp: 0u64, @@ -289,7 +350,10 @@ impl fmt::Debug for IndexMeta { mod tests { use super::IndexMeta; - use crate::schema::{Schema, TEXT}; + use crate::{ + schema::{Schema, TEXT}, + IndexSettings, IndexSortByField, Order, + }; use serde_json; #[test] @@ -300,7 +364,12 @@ mod tests { schema_builder.build() }; let index_metas = IndexMeta { - index_settings: None, + index_settings: IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "text".to_string(), + order: Order::Asc, + }), + }, segments: Vec::new(), schema, opstamp: 0u64, @@ -309,7 +378,7 @@ mod tests { let json = serde_json::ser::to_string(&index_metas).expect("serialization failed"); assert_eq!( json, - r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"# + r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"}},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"# ); } } diff --git a/src/core/mod.rs b/src/core/mod.rs index 741eb6999..aa3679ee4 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -10,7 +10,9 @@ mod segment_reader; pub use self::executor::Executor; pub use self::index::{Index, IndexBuilder}; -pub use self::index_meta::{IndexMeta, IndexSettings, SegmentMeta, SegmentMetaInventory}; +pub use self::index_meta::{ + IndexMeta, IndexSettings, IndexSortByField, Order, SegmentMeta, SegmentMetaInventory, +}; pub use self::inverted_index_reader::InvertedIndexReader; pub use self::searcher::Searcher; pub use self::segment::Segment; diff --git a/src/core/segment.rs b/src/core/segment.rs index efd1bcbb7..a4c135dbb 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -1,5 +1,4 @@ use super::SegmentComponent; -use crate::core::Index; use crate::core::SegmentId; use 
crate::core::SegmentMeta; use crate::directory::error::{OpenReadError, OpenWriteError}; @@ -8,6 +7,7 @@ use crate::directory::{FileSlice, WritePtr}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::schema::Schema; use crate::Opstamp; +use crate::{core::Index, indexer::doc_id_mapping::DocIdMapping}; use std::fmt; use std::path::PathBuf; @@ -97,5 +97,13 @@ pub trait SerializableSegment { /// /// # Returns /// The number of documents in the segment. - fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32>; + /// + /// `doc_id_map` is used when the index is created sorted, to map values to the new doc_id order. + /// It is not used by the `IndexMerger`, since doc_id mapping across segments works + /// differently. + fn write( + &self, + serializer: SegmentSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> crate::Result<u32>; } diff --git a/src/core/segment_component.rs b/src/core/segment_component.rs index e8e1a45b5..c5e07255b 100644 --- a/src/core/segment_component.rs +++ b/src/core/segment_component.rs @@ -4,7 +4,7 @@ use std::slice; /// Each component is stored in its own file, /// using the pattern `segment_uuid`.`component_extension`, /// except the delete component that takes an `segment_uuid`.`delete_opstamp`.`component_extension` -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Eq, PartialEq)] pub enum SegmentComponent { /// Postings (or inverted list). Sorted lists of document ids, associated to terms Postings, @@ -22,6 +22,8 @@ pub enum SegmentComponent { /// Accessing a document from the store is relatively slow, as it /// requires to decompress the entire block it belongs to. Store, + /// Temporary storage of the documents, before they are streamed to `Store`. + TempStore, /// Bitset describing which document of the segment is deleted. Delete, } @@ -29,13 +31,14 @@ impl SegmentComponent { /// Iterates through the components. 
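A hedged sketch of the call-site contract documented on `SerializableSegment::write` above (the helper below is illustrative and not part of this diff): a freshly built, sorted segment passes `Some(mapping)`, while the merger passes `None`.

// Illustrative helper: forwards to the trait method and returns the
// number of documents written, as documented on SerializableSegment::write.
fn write_segment<S: SerializableSegment>(
    segment: &S,
    serializer: SegmentSerializer,
    doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<u32> {
    segment.write(serializer, doc_id_map)
}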
pub fn iterator() -> slice::Iter<'static, SegmentComponent> { - static SEGMENT_COMPONENTS: [SegmentComponent; 7] = [ + static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [ SegmentComponent::Postings, SegmentComponent::Positions, SegmentComponent::FastFields, SegmentComponent::FieldNorms, SegmentComponent::Terms, SegmentComponent::Store, + SegmentComponent::TempStore, SegmentComponent::Delete, ]; SEGMENT_COMPONENTS.iter() diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 4ff419e0a..d929c524a 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -485,10 +485,13 @@ mod tests { // The following tests are specific to the MmapDirectory use super::*; - use crate::schema::{Schema, SchemaBuilder, TEXT}; use crate::Index; use crate::ReloadPolicy; use crate::{common::HasLen, indexer::LogMergePolicy}; + use crate::{ + schema::{Schema, SchemaBuilder, TEXT}, + IndexSettings, + }; #[test] fn test_open_non_existent_path() { @@ -585,7 +588,8 @@ mod tests { let schema = schema_builder.build(); { - let index = Index::create(mmap_directory.clone(), schema, None).unwrap(); + let index = + Index::create(mmap_directory.clone(), schema, IndexSettings::default()).unwrap(); let mut index_writer = index.writer_for_tests().unwrap(); let mut log_merge_policy = LogMergePolicy::default(); @@ -614,9 +618,10 @@ mod tests { reader.reload().unwrap(); let num_segments = reader.searcher().segment_readers().len(); assert!(num_segments <= 4); - let num_components_except_deletes = crate::core::SegmentComponent::iterator().len() - 1; + let num_components_except_deletes_and_tempstore = + crate::core::SegmentComponent::iterator().len() - 2; assert_eq!( - num_segments * num_components_except_deletes, + num_segments * num_components_except_deletes_and_tempstore, mmap_directory.get_cache_info().mmapped.len() ); } diff --git a/src/directory/owned_bytes.rs b/src/directory/owned_bytes.rs index dbe760e88..d8f493416 100644 --- a/src/directory/owned_bytes.rs +++ b/src/directory/owned_bytes.rs @@ -36,8 +36,8 @@ impl OwnedBytes { let bytes: &[u8] = box_stable_deref.as_ref(); let data = unsafe { mem::transmute::<_, &'static [u8]>(bytes.deref()) }; OwnedBytes { - box_stable_deref, data, + box_stable_deref, } } diff --git a/src/fastfield/bytes/reader.rs b/src/fastfield/bytes/reader.rs index 123d6a89b..c00d56ea1 100644 --- a/src/fastfield/bytes/reader.rs +++ b/src/fastfield/bytes/reader.rs @@ -1,7 +1,7 @@ -use crate::directory::FileSlice; use crate::directory::OwnedBytes; use crate::fastfield::FastFieldReader; use crate::DocId; +use crate::{directory::FileSlice, fastfield::MultiValueLength}; /// Reader for byte array fast fields /// @@ -40,8 +40,23 @@ impl BytesFastFieldReader { &self.values.as_slice()[start..stop] } + /// Returns the length of the bytes associated to the given `doc` + pub fn num_bytes(&self, doc: DocId) -> usize { + let (start, stop) = self.range(doc); + stop - start + } + /// Returns the overall number of bytes in this bytes fast field. 
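A hedged usage sketch for the length accessors (the helper is illustrative, not part of this diff): `MultiValueLength`, implemented just below for `BytesFastFieldReader` and elsewhere in this diff for `MultiValuedFastFieldReader`, lets bookkeeping code query per-doc and total lengths without knowing the concrete reader type.

// Illustrative: average number of values (or bytes) per document,
// computed through the MultiValueLength abstraction.
fn average_len_per_doc<R: MultiValueLength>(reader: &R, num_docs: u32) -> f64 {
    if num_docs == 0 {
        return 0.0;
    }
    reader.get_total_len() as f64 / f64::from(num_docs)
}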
pub fn total_num_bytes(&self) -> usize { self.values.len() } } + +impl MultiValueLength for BytesFastFieldReader { + fn get_len(&self, doc_id: DocId) -> u64 { + self.num_bytes(doc_id) as u64 + } + fn get_total_len(&self) -> u64 { + self.total_num_bytes() as u64 + } +} diff --git a/src/fastfield/bytes/writer.rs b/src/fastfield/bytes/writer.rs index 44a835a42..8322ed4c5 100644 --- a/src/fastfield/bytes/writer.rs +++ b/src/fastfield/bytes/writer.rs @@ -1,8 +1,8 @@ use std::io; -use crate::fastfield::serializer::FastFieldSerializer; use crate::schema::{Document, Field, Value}; use crate::DocId; +use crate::{fastfield::serializer::FastFieldSerializer, indexer::doc_id_mapping::DocIdMapping}; /// Writer for byte array (as in, any number of bytes per document) fast fields /// @@ -72,20 +72,62 @@ impl BytesFastFieldWriter { doc } + /// Returns an iterator over values per doc_id in ascending doc_id order. + /// + /// Normally the order is simply that of iterating `self.doc_index`. + /// With `doc_id_map` it accounts for the new mapping, returning values in the order of the + /// new doc_ids. + fn get_ordered_values<'a: 'b, 'b>( + &'a self, + doc_id_map: Option<&'b DocIdMapping>, + ) -> impl Iterator<Item = &'b [u8]> { + let doc_id_iter = if let Some(doc_id_map) = doc_id_map { + Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>> + } else { + Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32)) + as Box<dyn Iterator<Item = u32>> + }; + doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id)) + } + + /// Returns all values for a doc_id + fn get_values_for_doc_id(&self, doc_id: u32) -> &[u8] { + let start_pos = self.doc_index[doc_id as usize] as usize; + let end_pos = self + .doc_index + .get(doc_id as usize + 1) + .cloned() + .unwrap_or(self.vals.len() as u64) as usize; // special case: the last doc_id has no end-offset entry + &self.vals[start_pos..end_pos] + } + /// Serializes the fast field values by pushing them to the `FastFieldSerializer`. - pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { + pub fn serialize( + &self, + serializer: &mut FastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { // writing the offset index let mut doc_index_serializer = serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?; - for &offset in &self.doc_index { + let mut offset = 0; + for vals in self.get_ordered_values(doc_id_map) { doc_index_serializer.add_val(offset)?; + offset += vals.len() as u64; } doc_index_serializer.add_val(self.vals.len() as u64)?; doc_index_serializer.close_field()?; // writing the values themselves - serializer - .new_bytes_fast_field_with_idx(self.field, 1) - .write_all(&self.vals)?; + let mut value_serializer = serializer.new_bytes_fast_field_with_idx(self.field, 1); + // the else branch could be removed, but writing the buffer in one go is faster (difference not benchmarked) + if let Some(doc_id_map) = doc_id_map { + for vals in self.get_ordered_values(Some(doc_id_map)) { + // write the values in the new doc_id order
+ value_serializer.write_all(vals)?; + } + } else { + value_serializer.write_all(&self.vals)?; + } Ok(()) } } diff --git a/src/fastfield/delete.rs b/src/fastfield/delete.rs index e83dc2425..de23effac 100644 --- a/src/fastfield/delete.rs +++ b/src/fastfield/delete.rs @@ -41,7 +41,7 @@ pub fn write_delete_bitset( #[derive(Clone)] pub struct DeleteBitSet { data: OwnedBytes, - len: usize, + num_deleted: usize, } impl DeleteBitSet { @@ -73,7 +73,7 @@ impl DeleteBitSet { .sum(); Ok(DeleteBitSet { data: bytes, - len: num_deleted, + num_deleted, }) } @@ -99,7 +99,7 @@ impl DeleteBitSet { impl HasLen for DeleteBitSet { fn len(&self) -> usize { - self.len + self.num_deleted } } diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 70e5d0e92..385c89e60 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -33,7 +33,6 @@ pub use self::reader::FastFieldReader; pub use self::readers::FastFieldReaders; pub use self::serializer::FastFieldSerializer; pub use self::writer::{FastFieldsWriter, IntFastFieldWriter}; -use crate::common; use crate::schema::Cardinality; use crate::schema::FieldType; use crate::schema::Value; @@ -41,6 +40,7 @@ use crate::{ chrono::{NaiveDateTime, Utc}, schema::Type, }; +use crate::{common, DocId}; mod bytes; mod delete; @@ -52,6 +52,15 @@ mod readers; mod serializer; mod writer; +/// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data +/// for a doc_id +pub trait MultiValueLength { + /// Returns the number of values associated with a doc_id + fn get_len(&self, doc_id: DocId) -> u64; + /// Returns the total number of values over all doc_ids + fn get_total_len(&self) -> u64; +} + /// Trait for types that are allowed for fast fields: (u64, i64 and f64). pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static { /// Converts a value from u64 @@ -251,7 +260,7 @@ mod tests { fast_field_writers.add_document(&doc!(*FIELD=>14u64)); fast_field_writers.add_document(&doc!(*FIELD=>2u64)); fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } @@ -283,7 +292,7 @@ mod tests { fast_field_writers.add_document(&doc!(*FIELD=>1_002u64)); fast_field_writers.add_document(&doc!(*FIELD=>1_501u64)); fast_field_writers.add_document(&doc!(*FIELD=>215u64)); - fast_field_writers.serialize(&mut serializer, &HashMap::new())?; + fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?; serializer.close()?; } let file = directory.open_read(&path)?; @@ -318,7 +327,7 @@ mod tests { fast_field_writers.add_document(&doc!(*FIELD=>100_000u64)); } fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } @@ -350,7 +359,7 @@ mod tests { fast_field_writers.add_document(&doc!(*FIELD=>5_000_000_000_000_000_000u64 + i)); } fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } @@ -389,7 +398,7 @@ mod tests { fast_field_writers.add_document(&doc); } fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } @@ -429,7 +438,7 @@ mod tests { let doc = Document::default(); fast_field_writers.add_document(&doc); fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); 
serializer.close().unwrap(); } @@ -464,7 +473,7 @@ mod tests { for &x in &permutation { fast_field_writers.add_document(&doc!(*FIELD=>x)); } - fast_field_writers.serialize(&mut serializer, &HashMap::new())?; + fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?; serializer.close()?; } let file = directory.open_read(&path)?; @@ -621,7 +630,7 @@ mod bench { fast_field_writers.add_document(&doc!(*FIELD=>x)); } fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } @@ -655,7 +664,7 @@ mod bench { fast_field_writers.add_document(&doc!(*FIELD=>x)); } fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs index 6ead853e7..82993b042 100644 --- a/src/fastfield/multivalued/reader.rs +++ b/src/fastfield/multivalued/reader.rs @@ -1,6 +1,6 @@ use std::ops::Range; -use crate::fastfield::{FastFieldReader, FastValue}; +use crate::fastfield::{FastFieldReader, FastValue, MultiValueLength}; use crate::DocId; /// Reader for a multivalued `u64` fast field. @@ -56,6 +56,15 @@ impl<Item: FastValue> MultiValuedFastFieldReader<Item> { } } +impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> { + fn get_len(&self, doc_id: DocId) -> u64 { + self.num_vals(doc_id) as u64 + } + + fn get_total_len(&self) -> u64 { + self.total_num_vals() as u64 + } +} #[cfg(test)] mod tests { diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index b25888e78..5bf0faaa4 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -1,13 +1,12 @@ use crate::fastfield::serializer::FastSingleFieldSerializer; -use crate::fastfield::value_to_u64; use crate::fastfield::FastFieldSerializer; use crate::postings::UnorderedTermId; use crate::schema::{Document, Field}; use crate::termdict::TermOrdinal; use crate::DocId; +use crate::{fastfield::value_to_u64, indexer::doc_id_mapping::DocIdMapping}; use fnv::FnvHashMap; use std::io; -use std::iter::once; use tantivy_bitpacker::minmax; /// Writer for multi-valued (as in, more than one value per document) @@ -94,7 +93,34 @@ impl MultiValuedFastFieldWriter { self.vals.extend_from_slice(vals); doc } + /// Returns an iterator over values per doc_id in ascending doc_id order. + /// + /// Normally the order is simply that of iterating `self.doc_index`. + /// With `doc_id_map` it accounts for the new mapping, returning values in the order of the + /// new doc_ids. + fn get_ordered_values<'a: 'b, 'b>( + &'a self, + doc_id_map: Option<&'b DocIdMapping>, + ) -> impl Iterator<Item = &'b [u64]> { + let doc_id_iter = if let Some(doc_id_map) = doc_id_map { + Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>> + } else { + Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32)) + as Box<dyn Iterator<Item = u32>> + }; + doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id)) + } + /// Returns all values for a doc_id + fn get_values_for_doc_id(&self, doc_id: u32) -> &[u64] { + let start_pos = self.doc_index[doc_id as usize] as usize; + let end_pos = self + .doc_index + .get(doc_id as usize + 1) + .cloned() + .unwrap_or(self.vals.len() as u64) as usize; // special case: the last doc_id has no end-offset entry + &self.vals[start_pos..end_pos] + } /// Serializes fast field values by pushing them to the `FastFieldSerializer`. 
/// /// If a mapping is given, the values are remapped *and sorted* before serialization. @@ -110,15 +136,20 @@ impl MultiValuedFastFieldWriter { &self, serializer: &mut FastFieldSerializer, mapping_opt: Option<&FnvHashMap<UnorderedTermId, TermOrdinal>>, + doc_id_map: Option<&DocIdMapping>, ) -> io::Result<()> { { // writing the offset index let mut doc_index_serializer = serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?; - for &offset in &self.doc_index { + + let mut offset = 0; + for vals in self.get_ordered_values(doc_id_map) { doc_index_serializer.add_val(offset)?; + offset += vals.len() as u64; } doc_index_serializer.add_val(self.vals.len() as u64)?; + doc_index_serializer.close_field()?; } { @@ -133,18 +164,10 @@ impl MultiValuedFastFieldWriter { 1, )?; - let last_interval = - self.doc_index.last().cloned().unwrap() as usize..self.vals.len(); let mut doc_vals: Vec<u64> = Vec::with_capacity(100); - for range in self - .doc_index - .windows(2) - .map(|interval| interval[0] as usize..interval[1] as usize) - .chain(once(last_interval)) - { + for vals in self.get_ordered_values(doc_id_map) { doc_vals.clear(); - let remapped_vals = self.vals[range] + let remapped_vals = vals .iter() .map(|val| *mapping.get(val).expect("Missing term ordinal")); doc_vals.extend(remapped_vals); @@ -159,8 +182,11 @@ let (val_min, val_max) = val_min_max.unwrap_or((0u64, 0u64)); value_serializer = serializer.new_u64_fast_field_with_idx(self.field, val_min, val_max, 1)?; - for &val in &self.vals { - value_serializer.add_val(val)?; + for vals in self.get_ordered_values(doc_id_map) { + // write the values in the new doc_id order + for &val in vals { + value_serializer.add_val(val)?; + } } } } diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 18cc77dbe..a861849ac 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -131,14 +131,14 @@ impl<Item: FastValue> From<Vec<Item>> for FastFieldReader<Item> { let mut fast_field_writers = FastFieldsWriter::from_schema(&schema); { let fast_field_writer = fast_field_writers - .get_field_writer(field) + .get_field_writer_mut(field) .expect("With a RamDirectory, this should never fail."); for val in vals { fast_field_writer.add_val(val.to_u64()); } } fast_field_writers - .serialize(&mut serializer, &HashMap::new()) + .serialize(&mut serializer, &HashMap::new(), None) .unwrap(); serializer.close().unwrap(); } diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 731b36e49..81024694c 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -46,8 +46,8 @@ fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality impl FastFieldReaders { pub(crate) fn new(schema: Schema, fast_fields_composite: CompositeFile) -> FastFieldReaders { FastFieldReaders { - fast_fields_composite, schema, + fast_fields_composite, } } @@ -119,7 +119,7 @@ impl FastFieldReaders { /// Returns the `u64` fast field reader reader associated to `field`. /// - /// If `field` is not a u64 fast field, this method returns `None`. + /// If `field` is not a u64 fast field, this method returns an Error. pub fn u64(&self, field: Field) -> crate::Result<FastFieldReader<u64>> { self.check_type(field, FastType::U64, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -135,7 +135,7 @@ impl FastFieldReaders { /// Returns the `i64` fast field reader reader associated to `field`. /// - /// If `field` is not a i64 fast field, this method returns `None`. + /// If `field` is not a i64 fast field, this method returns an Error. 
pub fn i64(&self, field: Field) -> crate::Result<FastFieldReader<i64>> { self.check_type(field, FastType::I64, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -143,7 +143,7 @@ impl FastFieldReaders { /// Returns the `i64` fast field reader reader associated to `field`. /// - /// If `field` is not a i64 fast field, this method returns `None`. + /// If `field` is not a i64 fast field, this method returns an Error. pub fn date(&self, field: Field) -> crate::Result<FastFieldReader<crate::DateTime>> { self.check_type(field, FastType::Date, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -151,7 +151,7 @@ impl FastFieldReaders { /// Returns the `f64` fast field reader reader associated to `field`. /// - /// If `field` is not a f64 fast field, this method returns `None`. + /// If `field` is not a f64 fast field, this method returns an Error. pub fn f64(&self, field: Field) -> crate::Result<FastFieldReader<f64>> { self.check_type(field, FastType::F64, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -159,15 +159,23 @@ impl FastFieldReaders { /// Returns a `u64s` multi-valued fast field reader reader associated to `field`. /// - /// If `field` is not a u64 multi-valued fast field, this method returns `None`. + /// If `field` is not a u64 multi-valued fast field, this method returns an Error. pub fn u64s(&self, field: Field) -> crate::Result<MultiValuedFastFieldReader<u64>> { self.check_type(field, FastType::U64, Cardinality::MultiValues)?; self.typed_fast_field_multi_reader(field) } + /// Returns a `u64s` multi-valued fast field reader associated to `field`, regardless of whether the given + /// field is effectively of type `u64` or not. + /// + /// If `field` is not a multi-valued fast field, this method returns an Error. + pub fn u64s_lenient(&self, field: Field) -> crate::Result<MultiValuedFastFieldReader<u64>> { + self.typed_fast_field_multi_reader(field) + } + /// Returns a `i64s` multi-valued fast field reader reader associated to `field`. /// - /// If `field` is not a i64 multi-valued fast field, this method returns `None`. + /// If `field` is not a i64 multi-valued fast field, this method returns an Error. pub fn i64s(&self, field: Field) -> crate::Result<MultiValuedFastFieldReader<i64>> { self.check_type(field, FastType::I64, Cardinality::MultiValues)?; self.typed_fast_field_multi_reader(field) } @@ -175,7 +183,7 @@ impl FastFieldReaders { /// Returns a `f64s` multi-valued fast field reader reader associated to `field`. /// - /// If `field` is not a f64 multi-valued fast field, this method returns `None`. + /// If `field` is not a f64 multi-valued fast field, this method returns an Error. pub fn f64s(&self, field: Field) -> crate::Result<MultiValuedFastFieldReader<f64>> { self.check_type(field, FastType::F64, Cardinality::MultiValues)?; self.typed_fast_field_multi_reader(field) } @@ -183,7 +191,7 @@ impl FastFieldReaders { /// Returns a `crate::DateTime` multi-valued fast field reader reader associated to `field`. /// - /// If `field` is not a `crate::DateTime` multi-valued fast field, this method returns `None`. + /// If `field` is not a `crate::DateTime` multi-valued fast field, this method returns an Error. pub fn dates( &self, field: Field, @@ -194,7 +202,7 @@ impl FastFieldReaders { /// Returns the `bytes` fast field reader associated to `field`. /// - /// If `field` is not a bytes fast field, returns `None`. + /// If `field` is not a bytes fast field, returns an Error. 
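A hedged usage sketch for `u64s_lenient` above (the helper is illustrative, not part of this diff): skipping the type check makes it possible to read any multi-valued fast field as raw `u64` values, e.g. when only value counts matter.

// Illustrative: count the values a document holds in a multi-valued
// fast field, whatever its declared fast type.
fn num_vals_for_doc(
    readers: &FastFieldReaders,
    field: Field,
    doc: DocId,
) -> crate::Result<usize> {
    let multi = readers.u64s_lenient(field)?;
    let mut vals = Vec::new();
    multi.get_vals(doc, &mut vals);
    Ok(vals.len())
}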
pub fn bytes(&self, field: Field) -> crate::Result<BytesFastFieldReader> { let field_entry = self.schema.get_field_entry(field); if let FieldType::Bytes(bytes_option) = field_entry.field_type() { diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs index fc03f7d21..a3841ff06 100644 --- a/src/fastfield/serializer.rs +++ b/src/fastfield/serializer.rs @@ -107,8 +107,8 @@ impl<'a, W: Write> FastSingleFieldSerializer<'a, W> { let num_bits = compute_num_bits(amplitude); let bit_packer = BitPacker::new(); Ok(FastSingleFieldSerializer { - write, bit_packer, + write, min_value, num_bits, }) diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 0d5f31ced..c4b6c4d21 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -1,6 +1,7 @@ use super::multivalued::MultiValuedFastFieldWriter; use crate::common; use crate::fastfield::{BytesFastFieldWriter, FastFieldSerializer}; +use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema}; use crate::termdict::TermOrdinal; @@ -90,7 +91,15 @@ impl FastFieldsWriter { } /// Get the `FastFieldWriter` associated to a field. - pub fn get_field_writer(&mut self, field: Field) -> Option<&mut IntFastFieldWriter> { + pub fn get_field_writer(&self, field: Field) -> Option<&IntFastFieldWriter> { + // TODO optimize + self.single_value_writers + .iter() + .find(|field_writer| field_writer.field() == field) + } + + /// Get the `FastFieldWriter` associated to a field. + pub fn get_field_writer_mut(&mut self, field: Field) -> Option<&mut IntFastFieldWriter> { // TODO optimize self.single_value_writers .iter_mut() @@ -101,7 +110,7 @@ impl FastFieldsWriter { /// /// Returns None if the field does not exist, or is not /// configured as a multivalued fastfield in the schema. - pub fn get_multivalue_writer( + pub fn get_multivalue_writer_mut( &mut self, field: Field, ) -> Option<&mut MultiValuedFastFieldWriter> { @@ -115,7 +124,7 @@ impl FastFieldsWriter { /// /// Returns None if the field does not exist, or is not /// configured as a bytes fastfield in the schema. - pub fn get_bytes_writer(&mut self, field: Field) -> Option<&mut BytesFastFieldWriter> { + pub fn get_bytes_writer_mut(&mut self, field: Field) -> Option<&mut BytesFastFieldWriter> { // TODO optimize self.bytes_value_writers .iter_mut() @@ -141,17 +150,18 @@ impl FastFieldsWriter { &self, serializer: &mut FastFieldSerializer, mapping: &HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>, + doc_id_map: Option<&DocIdMapping>, ) -> io::Result<()> { for field_writer in &self.single_value_writers { - field_writer.serialize(serializer)?; + field_writer.serialize(serializer, doc_id_map)?; } for field_writer in &self.multi_values_writers { let field = field_writer.field(); - field_writer.serialize(serializer, mapping.get(&field))?; + field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?; } for field_writer in &self.bytes_value_writers { - field_writer.serialize(serializer)?; + field_writer.serialize(serializer, doc_id_map)?; } Ok(()) } @@ -254,19 +264,32 @@ impl IntFastFieldWriter { self.add_val(val); } + /// Extract the stored data + pub(crate) fn get_data(&self) -> Vec<u64> { + self.vals.iter().collect::<Vec<u64>>() + } + /// Push the fast fields value to the `FastFieldWriter`. 
- pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { + pub fn serialize( + &self, + serializer: &mut FastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { let (min, max) = if self.val_min > self.val_max { (0, 0) } else { (self.val_min, self.val_max) }; let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?; - - for val in self.vals.iter() { - single_field_serializer.add_val(val)?; - } + if let Some(doc_id_map) = doc_id_map { + for doc_id in doc_id_map.iter_old_doc_ids() { + single_field_serializer.add_val(self.vals.get(*doc_id as usize))?; + } + } else { + for val in self.vals.iter() { + single_field_serializer.add_val(val)?; + } + }; single_field_serializer.close_field() } diff --git a/src/fieldnorm/reader.rs b/src/fieldnorm/reader.rs index 837e55336..71535e3f9 100644 --- a/src/fieldnorm/reader.rs +++ b/src/fieldnorm/reader.rs @@ -117,8 +117,8 @@ impl FieldNormReader { /// The fieldnorm is a value approximating the number /// of tokens in a given field of the `doc_id`. /// - /// It is imprecise, and always lower than the actual - /// number of tokens. + /// It is imprecise, and less than or equal to + /// the actual number of tokens. /// /// The fieldnorm is effectively decoded from the /// `fieldnorm_id` by doing a simple table lookup. diff --git a/src/fieldnorm/writer.rs b/src/fieldnorm/writer.rs index 5b99a9a92..9c764ad4c 100644 --- a/src/fieldnorm/writer.rs +++ b/src/fieldnorm/writer.rs @@ -1,4 +1,4 @@ -use crate::DocId; +use crate::{indexer::doc_id_mapping::DocIdMapping, DocId}; use super::fieldnorm_to_id; use super::FieldNormsSerializer; @@ -87,10 +87,23 @@ impl FieldNormsWriter { } /// Serialize the seen fieldnorm values to the serializer for all fields. - pub fn serialize(&self, mut fieldnorms_serializer: FieldNormsSerializer) -> io::Result<()> { + pub fn serialize( + &self, + mut fieldnorms_serializer: FieldNormsSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { for &field in self.fields.iter() { let fieldnorm_values: &[u8] = &self.fieldnorms_buffer[field.field_id() as usize][..]; - fieldnorms_serializer.serialize_field(field, fieldnorm_values)?; + if let Some(doc_id_map) = doc_id_map { + let mut mapped_fieldnorm_values = vec![0u8; fieldnorm_values.len()]; + for (new_doc_id, old_doc_id) in doc_id_map.iter_old_doc_ids().enumerate() { + mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[*old_doc_id as usize]; + } + fieldnorms_serializer.serialize_field(field, &mapped_fieldnorm_values)?; + } else { + fieldnorms_serializer.serialize_field(field, fieldnorm_values)?; + } } fieldnorms_serializer.close()?; Ok(()) diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs new file mode 100644 index 000000000..a3760e509 --- /dev/null +++ b/src/indexer/doc_id_mapping.rs @@ -0,0 +1,423 @@ +//! This module is used when sorting the index by a property; +//! it provides mappings from old doc_id to new doc_id and vice versa, after sorting. +//! 
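A worked example of the mapping invariant described above, with hypothetical values that are not part of this diff:

// Sorting three documents by a fast field holding [30, 10, 20] in
// ascending order yields
//   new_doc_id_to_old = [1, 2, 0]  // new doc 0 was old doc 1 (value 10)
//   old_doc_id_to_new = [2, 0, 1]  // old doc 0 (value 30) becomes new doc 2
// The two vectors are inverse permutations of one another:
fn is_inverse_permutation(new_to_old: &[u32], old_to_new: &[u32]) -> bool {
    new_to_old
        .iter()
        .enumerate()
        .all(|(new_id, &old_id)| old_to_new[old_id as usize] == new_id as u32)
}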
+ +use super::SegmentWriter; +use crate::{ + schema::{Field, Schema}, + DocId, IndexSortByField, Order, TantivyError, +}; +use std::cmp::Reverse; + +/// Struct to provide mapping from old doc_id to new doc_id and vice versa +pub struct DocIdMapping { + new_doc_id_to_old: Vec<DocId>, + old_doc_id_to_new: Vec<DocId>, +} + +impl DocIdMapping { + /// returns the new doc_id for the old doc_id + pub fn get_new_doc_id(&self, doc_id: DocId) -> DocId { + self.old_doc_id_to_new[doc_id as usize] + } + /// returns the old doc_id for the new doc_id + pub fn get_old_doc_id(&self, doc_id: DocId) -> DocId { + self.new_doc_id_to_old[doc_id as usize] + } + /// iterate over old doc_ids in order of the new doc_ids + pub fn iter_old_doc_ids(&self) -> std::slice::Iter<'_, DocId> { + self.new_doc_id_to_old.iter() + } +} + +pub(crate) fn expect_field_id_for_sort_field( + schema: &Schema, + sort_by_field: &IndexSortByField, +) -> crate::Result<Field> { + schema.get_field(&sort_by_field.field).ok_or_else(|| { + TantivyError::InvalidArgument(format!( + "field to sort index by not found: {:?}", + sort_by_field.field + )) + }) +} + +// Generates a doc_id mapping in the form new_doc_id (vec index) -> old_doc_id +// TODO detect if field is already sorted and discard mapping +pub(crate) fn get_doc_id_mapping_from_field( + sort_by_field: IndexSortByField, + segment_writer: &SegmentWriter, +) -> crate::Result<DocIdMapping> { + let schema = segment_writer.segment_serializer.segment().schema(); + let field_id = expect_field_id_for_sort_field(&schema, &sort_by_field)?; // for now we expect a fast field, but this is not strictly required + let fast_field = segment_writer + .fast_field_writers + .get_field_writer(field_id) + .ok_or_else(|| { + TantivyError::InvalidArgument(format!( + "the field to sort the index by is required to be a fast field {:?}", + sort_by_field.field + )) + })?; + + // create new doc_id to old doc_id index (used in fast_field_writers) + let data = fast_field.get_data(); + let mut doc_id_and_data = data + .into_iter() + .enumerate() + .map(|el| (el.0 as DocId, el.1)) + .collect::<Vec<_>>(); + if sort_by_field.order == Order::Desc { + doc_id_and_data.sort_by_key(|k| Reverse(k.1)); + } else { + doc_id_and_data.sort_by_key(|k| k.1); + } + let new_doc_id_to_old = doc_id_and_data + .into_iter() + .map(|el| el.0) + .collect::<Vec<_>>(); + + // create old doc_id to new doc_id index (used in posting recorder) + let max_doc = new_doc_id_to_old.len(); + let mut old_doc_id_to_new = vec![0; max_doc]; + for i in 0..max_doc { + old_doc_id_to_new[new_doc_id_to_old[i] as usize] = i as DocId; + } + let doc_id_map = DocIdMapping { + new_doc_id_to_old, + old_doc_id_to_new, + }; + Ok(doc_id_map) +} + +#[cfg(test)] +mod tests_indexsorting { + use crate::{collector::TopDocs, query::QueryParser, schema::*}; + use crate::{schema::Schema, DocAddress}; + use crate::{Index, IndexSettings, IndexSortByField, Order}; + + fn create_test_index( + index_settings: Option<IndexSettings>, + text_field_options: TextOptions, + ) -> Index { + let mut schema_builder = Schema::builder(); + + let my_text_field = schema_builder.add_text_field("text_field", text_field_options); + let my_string_field = schema_builder.add_text_field("string_field", STRING | STORED); + let my_number = schema_builder.add_u64_field( + "my_number", + IntOptions::default().set_fast(Cardinality::SingleValue), + ); + + let multi_numbers = schema_builder.add_u64_field( + "multi_numbers", + IntOptions::default().set_fast(Cardinality::MultiValues), + ); + + let schema = schema_builder.build(); + let mut index_builder = Index::builder().schema(schema); + if let 
Some(settings) = index_settings { + index_builder = index_builder.settings(settings); + } + let index = index_builder.create_in_ram().unwrap(); + + let mut index_writer = index.writer_for_tests().unwrap(); + index_writer.add_document(doc!(my_number=>40_u64)); + index_writer + .add_document(doc!(my_number=>20_u64, multi_numbers => 5_u64, multi_numbers => 6_u64)); + index_writer.add_document(doc!(my_number=>100_u64)); + index_writer.add_document( + doc!(my_number=>10_u64, my_string_field=> "blublub", my_text_field => "some text"), + ); + index_writer.add_document(doc!(my_number=>30_u64, multi_numbers => 3_u64 )); + index_writer.commit().unwrap(); + index + } + fn get_text_options() -> TextOptions { + TextOptions::default().set_indexing_options( + TextFieldIndexing::default().set_index_option(IndexRecordOption::Basic), + ) + } + #[test] + fn test_sort_index_test_text_field() -> crate::Result<()> { + // there are different serializers for different settings in postings/recorder.rs + // test remapping for all of them + let options = vec![ + get_text_options(), + get_text_options().set_indexing_options( + TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs), + ), + get_text_options().set_indexing_options( + TextFieldIndexing::default() + .set_index_option(IndexRecordOption::WithFreqsAndPositions), + ), + ]; + + for option in options { + //let options = get_text_options(); + // no index_sort + let index = create_test_index(None, option.clone()); + let my_text_field = index.schema().get_field("text_field").unwrap(); + let searcher = index.reader()?.searcher(); + + let query = QueryParser::for_index(&index, vec![my_text_field]).parse_query("text")?; + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3))?; + assert_eq!( + top_docs.iter().map(|el| el.1.doc_id).collect::>(), + vec![3] + ); + + // sort by field asc + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Asc, + }), + }), + option.clone(), + ); + let my_text_field = index.schema().get_field("text_field").unwrap(); + let reader = index.reader()?; + let searcher = reader.searcher(); + + let query = QueryParser::for_index(&index, vec![my_text_field]).parse_query("text")?; + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3))?; + assert_eq!( + top_docs.iter().map(|el| el.1.doc_id).collect::>(), + vec![0] + ); + + // test new field norm mapping + { + let my_text_field = index.schema().get_field("text_field").unwrap(); + let fieldnorm_reader = searcher + .segment_reader(0) + .get_fieldnorms_reader(my_text_field)?; + assert_eq!(fieldnorm_reader.fieldnorm(0), 2); // some text + assert_eq!(fieldnorm_reader.fieldnorm(1), 0); + } + // sort by field desc + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Desc, + }), + }), + option.clone(), + ); + let my_string_field = index.schema().get_field("text_field").unwrap(); + let searcher = index.reader()?.searcher(); + + let query = + QueryParser::for_index(&index, vec![my_string_field]).parse_query("text")?; + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3))?; + assert_eq!( + top_docs.iter().map(|el| el.1.doc_id).collect::>(), + vec![4] + ); + // test new field norm mapping + { + let my_text_field = index.schema().get_field("text_field").unwrap(); + let fieldnorm_reader = searcher + 
.segment_reader(0) + .get_fieldnorms_reader(my_text_field)?; + assert_eq!(fieldnorm_reader.fieldnorm(0), 0); + assert_eq!(fieldnorm_reader.fieldnorm(1), 0); + assert_eq!(fieldnorm_reader.fieldnorm(2), 0); + assert_eq!(fieldnorm_reader.fieldnorm(3), 0); + assert_eq!(fieldnorm_reader.fieldnorm(4), 2); // some text + } + } + Ok(()) + } + #[test] + fn test_sort_index_get_documents() -> crate::Result<()> { + // default baseline + let index = create_test_index(None, get_text_options()); + let my_string_field = index.schema().get_field("string_field").unwrap(); + let searcher = index.reader()?.searcher(); + { + assert_eq!( + searcher + .doc(DocAddress::new(0, 0))? + .get_first(my_string_field), + None + ); + assert_eq!( + searcher + .doc(DocAddress::new(0, 3))? + .get_first(my_string_field) + .unwrap() + .text(), + Some("blublub") + ); + } + // sort by field asc + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Asc, + }), + }), + get_text_options(), + ); + let my_string_field = index.schema().get_field("string_field").unwrap(); + let searcher = index.reader()?.searcher(); + { + assert_eq!( + searcher + .doc(DocAddress::new(0, 0))? + .get_first(my_string_field) + .unwrap() + .text(), + Some("blublub") + ); + let doc = searcher.doc(DocAddress::new(0, 4))?; + assert_eq!(doc.get_first(my_string_field), None); + } + // sort by field desc + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Desc, + }), + }), + get_text_options(), + ); + let my_string_field = index.schema().get_field("string_field").unwrap(); + let searcher = index.reader()?.searcher(); + { + let doc = searcher.doc(DocAddress::new(0, 4))?; + assert_eq!( + doc.get_first(my_string_field).unwrap().text(), + Some("blublub") + ); + } + Ok(()) + } + + #[test] + fn test_sort_index_test_string_field() -> crate::Result<()> { + let index = create_test_index(None, get_text_options()); + let my_string_field = index.schema().get_field("string_field").unwrap(); + let searcher = index.reader()?.searcher(); + + let query = QueryParser::for_index(&index, vec![my_string_field]).parse_query("blublub")?; + let top_docs: Vec<(f32, DocAddress)> = searcher.search(&query, &TopDocs::with_limit(3))?; + assert_eq!( + top_docs.iter().map(|el| el.1.doc_id).collect::>(), + vec![3] + ); + + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Asc, + }), + }), + get_text_options(), + ); + let my_string_field = index.schema().get_field("string_field").unwrap(); + let reader = index.reader()?; + let searcher = reader.searcher(); + + let query = QueryParser::for_index(&index, vec![my_string_field]).parse_query("blublub")?; + let top_docs: Vec<(f32, DocAddress)> = searcher.search(&query, &TopDocs::with_limit(3))?; + assert_eq!( + top_docs.iter().map(|el| el.1.doc_id).collect::>(), + vec![0] + ); + + // test new field norm mapping + { + let my_text_field = index.schema().get_field("text_field").unwrap(); + let fieldnorm_reader = searcher + .segment_reader(0) + .get_fieldnorms_reader(my_text_field)?; + assert_eq!(fieldnorm_reader.fieldnorm(0), 2); // some text + assert_eq!(fieldnorm_reader.fieldnorm(1), 0); + } + // sort by field desc + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Desc, + }), + }), 
+ get_text_options(), + ); + let my_string_field = index.schema().get_field("string_field").unwrap(); + let searcher = index.reader()?.searcher(); + + let query = QueryParser::for_index(&index, vec![my_string_field]).parse_query("blublub")?; + let top_docs: Vec<(f32, DocAddress)> = searcher.search(&query, &TopDocs::with_limit(3))?; + assert_eq!( + top_docs.iter().map(|el| el.1.doc_id).collect::>(), + vec![4] + ); + // test new field norm mapping + { + let my_text_field = index.schema().get_field("text_field").unwrap(); + let fieldnorm_reader = searcher + .segment_reader(0) + .get_fieldnorms_reader(my_text_field)?; + assert_eq!(fieldnorm_reader.fieldnorm(0), 0); + assert_eq!(fieldnorm_reader.fieldnorm(1), 0); + assert_eq!(fieldnorm_reader.fieldnorm(2), 0); + assert_eq!(fieldnorm_reader.fieldnorm(3), 0); + assert_eq!(fieldnorm_reader.fieldnorm(4), 2); // some text + } + Ok(()) + } + + #[test] + fn test_sort_index_fast_field() -> crate::Result<()> { + let index = create_test_index( + Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "my_number".to_string(), + order: Order::Asc, + }), + }), + get_text_options(), + ); + assert_eq!( + index.settings().sort_by_field.as_ref().unwrap().field, + "my_number".to_string() + ); + + let searcher = index.reader()?.searcher(); + assert_eq!(searcher.segment_readers().len(), 1); + let segment_reader = searcher.segment_reader(0); + let fast_fields = segment_reader.fast_fields(); + let my_number = index.schema().get_field("my_number").unwrap(); + + let fast_field = fast_fields.u64(my_number).unwrap(); + assert_eq!(fast_field.get(0u32), 10u64); + assert_eq!(fast_field.get(1u32), 20u64); + assert_eq!(fast_field.get(2u32), 30u64); + + let multi_numbers = index.schema().get_field("multi_numbers").unwrap(); + let multifield = fast_fields.u64s(multi_numbers).unwrap(); + let mut vals = vec![]; + multifield.get_vals(0u32, &mut vals); + assert_eq!(vals, &[] as &[u64]); + let mut vals = vec![]; + multifield.get_vals(1u32, &mut vals); + assert_eq!(vals, &[5, 6]); + + let mut vals = vec![]; + multifield.get_vals(2u32, &mut vals); + assert_eq!(vals, &[3]); + Ok(()) + } +} diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index eac379ca4..dee4fefbc 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -53,7 +53,7 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; // Group of operations. // Most of the time, users will send operation one-by-one, but it can be useful to // send them as a small block to ensure that -// - all docs in the operation will happen on the same segment and continuous docids. +// - all docs in the operation will happen on the same segment and continuous doc_ids. // - all operations in the group are committed at the same time, making the group // atomic. 
type OperationGroup = SmallVec<[AddOperation; 4]>;

@@ -239,11 +239,10 @@ fn index_documents(
         last_docstamp,
     )?;

-    let segment_entry = SegmentEntry::new(
-        segment_with_max_doc.meta().clone(),
-        delete_cursor,
-        delete_bitset_opt,
-    );
+    let meta = segment_with_max_doc.meta().clone();
+    meta.untrack_temp_docstore();
+    // update the segment_updater inventory to remove the temp docstore
+    let segment_entry = SegmentEntry::new(meta, delete_cursor, delete_bitset_opt);
     block_on(segment_updater.schedule_add_segment(segment_entry))?;
     Ok(true)
 }
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 6004d2984..cf934f418 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1,11 +1,4 @@
-use tantivy_bitpacker::minmax;
-
-use crate::common::MAX_DOC_LIMIT;
-use crate::core::Segment;
-use crate::core::SegmentReader;
-use crate::core::SerializableSegment;
-use crate::docset::{DocSet, TERMINATED};
-use crate::fastfield::BytesFastFieldReader;
+use super::doc_id_mapping::DocIdMapping;
 use crate::fastfield::DeleteBitSet;
 use crate::fastfield::FastFieldReader;
 use crate::fastfield::FastFieldSerializer;
@@ -22,10 +15,21 @@ use crate::schema::{Field, Schema};
 use crate::store::StoreWriter;
 use crate::termdict::TermMerger;
 use crate::termdict::TermOrdinal;
+use crate::{common::HasLen, fastfield::MultiValueLength};
+use crate::{common::MAX_DOC_LIMIT, IndexSettings};
+use crate::{core::Segment, indexer::doc_id_mapping::expect_field_id_for_sort_field};
+use crate::{core::SegmentReader, Order};
+use crate::{core::SerializableSegment, IndexSortByField};
+use crate::{
+    docset::{DocSet, TERMINATED},
+    SegmentOrdinal,
+};
 use crate::{DocId, InvertedIndexReader, SegmentComponent};
+use itertools::Itertools;
 use std::cmp;
 use std::collections::HashMap;
 use std::sync::Arc;
+use tantivy_bitpacker::minmax;

 fn compute_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::Result<u64> {
     let mut total_tokens = 0u64;
@@ -54,7 +58,28 @@ fn compute_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::Result<u64> {
         .sum::<u64>())
 }

+/// `SegmentReaderWithOrdinal` makes it easier to associate
+/// data with a `SegmentReader`. The ordinal is meant to be
+/// used as an index into per-reader tables.
+///
+/// The ordinal identifies the position within the `Merger`'s readers.
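// Editor's sketch (illustrative, not part of the patch): the pattern the struct
// below enables. Pairing each reader with its position lets later stages carry
// a cheap `u32` ordinal instead of a reference and index back into per-reader
// tables built in the same order as `self.readers`:
//
//     let with_ordinals: Vec<SegmentReaderWithOrdinal> =
//         readers.iter().enumerate().map(Into::into).collect();
//     let accessor = &accessors[with_ordinals[0].ordinal as usize];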
+#[derive(Clone, Copy)] +pub(crate) struct SegmentReaderWithOrdinal<'a> { + pub reader: &'a SegmentReader, + pub ordinal: SegmentOrdinal, +} + +impl<'a> From<(usize, &'a SegmentReader)> for SegmentReaderWithOrdinal<'a> { + fn from(data: (usize, &'a SegmentReader)) -> Self { + SegmentReaderWithOrdinal { + reader: data.1, + ordinal: data.0 as u32, + } + } +} + pub struct IndexMerger { + index_settings: IndexSettings, schema: Schema, readers: Vec, max_doc: u32, @@ -143,7 +168,11 @@ impl DeltaComputer { } impl IndexMerger { - pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result { + pub fn open( + schema: Schema, + index_settings: IndexSettings, + segments: &[Segment], + ) -> crate::Result { let mut readers = vec![]; let mut max_doc: u32 = 0u32; for segment in segments { @@ -163,6 +192,7 @@ impl IndexMerger { } Ok(IndexMerger { schema, + index_settings, readers, max_doc, }) @@ -171,17 +201,27 @@ impl IndexMerger { fn write_fieldnorms( &self, mut fieldnorms_serializer: FieldNormsSerializer, + doc_id_mapping: &Option>, ) -> crate::Result<()> { let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema); let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize); for field in fields { fieldnorms_data.clear(); - for reader in &self.readers { - let fieldnorms_reader = reader.get_fieldnorms_reader(field)?; - for doc_id in reader.doc_ids_alive() { - let fieldnorm_id = fieldnorms_reader.fieldnorm_id(doc_id); + if let Some(doc_id_mapping) = doc_id_mapping { + for (doc_id, reader_with_ordinal) in doc_id_mapping { + let fieldnorms_reader = + reader_with_ordinal.reader.get_fieldnorms_reader(field)?; + let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id); fieldnorms_data.push(fieldnorm_id); } + } else { + for reader in &self.readers { + let fieldnorms_reader = reader.get_fieldnorms_reader(field)?; + for doc_id in reader.doc_ids_alive() { + let fieldnorm_id = fieldnorms_reader.fieldnorm_id(doc_id); + fieldnorms_data.push(fieldnorm_id); + } + } } fieldnorms_serializer.serialize_field(field, &fieldnorms_data[..])?; } @@ -193,6 +233,7 @@ impl IndexMerger { &self, fast_field_serializer: &mut FastFieldSerializer, mut term_ord_mappings: HashMap, + doc_id_mapping: &Option>, ) -> crate::Result<()> { for (field, field_entry) in self.schema.fields() { let field_type = field_entry.field_type(); @@ -206,6 +247,7 @@ impl IndexMerger { field, &term_ordinal_mapping, fast_field_serializer, + doc_id_mapping, )?; } FieldType::U64(ref options) @@ -213,10 +255,10 @@ impl IndexMerger { | FieldType::F64(ref options) | FieldType::Date(ref options) => match options.get_fastfield_cardinality() { Some(Cardinality::SingleValue) => { - self.write_single_fast_field(field, fast_field_serializer)?; + self.write_single_fast_field(field, fast_field_serializer, doc_id_mapping)?; } Some(Cardinality::MultiValues) => { - self.write_multi_fast_field(field, fast_field_serializer)?; + self.write_multi_fast_field(field, fast_field_serializer, doc_id_mapping)?; } None => {} }, @@ -227,7 +269,7 @@ impl IndexMerger { } FieldType::Bytes(byte_options) => { if byte_options.is_fast() { - self.write_bytes_fast_field(field, fast_field_serializer)?; + self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?; } } } @@ -240,110 +282,246 @@ impl IndexMerger { &self, field: Field, fast_field_serializer: &mut FastFieldSerializer, + doc_id_mapping: &Option>, ) -> crate::Result<()> { - let mut u64_readers = vec![]; - let mut min_value = u64::max_value(); - let mut max_value = u64::min_value(); - - for reader in 
&self.readers {
-            let u64_reader: FastFieldReader<u64> = reader
-                .fast_fields()
-                .typed_fast_field_reader(field)
-                .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
-            if let Some((seg_min_val, seg_max_val)) = compute_min_max_val(&u64_reader, reader.max_doc(), reader.delete_bitset())
-            {
-                // the segment has some non-deleted documents
-                min_value = cmp::min(min_value, seg_min_val);
-                max_value = cmp::max(max_value, seg_max_val);
-                u64_readers.push((reader.max_doc(), u64_reader, reader.delete_bitset()));
-            } else {
-                // all documents have been deleted.
-            }
-        }
-        if min_value > max_value {
-            // There is not a single document remaining in the index.
-            min_value = 0;
-            max_value = 0;
-        }
-
-        let mut fast_single_field_serializer =
-            fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
-        for (max_doc, u64_reader, delete_bitset_opt) in u64_readers {
-            for doc_id in 0u32..max_doc {
-                let is_deleted = delete_bitset_opt
-                    .map(|delete_bitset| delete_bitset.is_deleted(doc_id))
-                    .unwrap_or(false);
-                if !is_deleted {
-                    let val = u64_reader.get(doc_id);
-                    fast_single_field_serializer.add_val(val)?;
-                }
-            }
-        }
-        fast_single_field_serializer.close_field()?;
-        Ok(())
+        let (min_value, max_value) = self.readers.iter().map(|reader| {
+            let u64_reader: FastFieldReader<u64> = reader
+                .fast_fields()
+                .typed_fast_field_reader(field)
+                .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
+            compute_min_max_val(&u64_reader, reader.max_doc(), reader.delete_bitset())
+        })
+        .filter_map(|x| x)
+        .reduce(|a, b| {
+            (a.0.min(b.0), a.1.max(b.1))
+        }).expect("Unexpected error, empty readers in IndexMerger");
+
+        let fast_field_readers = self
+            .readers
+            .iter()
+            .map(|reader| {
+                let u64_reader: FastFieldReader<u64> = reader
+                    .fast_fields()
+                    .typed_fast_field_reader(field)
+                    .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
+                u64_reader
+            })
+            .collect::<Vec<_>>();
+        if let Some(doc_id_mapping) = doc_id_mapping {
+            let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
+                (
+                    doc_id,
+                    &fast_field_readers[reader_with_ordinal.ordinal as usize],
+                )
+            });
+            // add values in order of the new doc_ids
+            let mut fast_single_field_serializer =
+                fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
+            for (doc_id, field_reader) in sorted_doc_ids {
+                let val = field_reader.get(*doc_id);
+                fast_single_field_serializer.add_val(val)?;
+            }
+            fast_single_field_serializer.close_field()?;
+            Ok(())
+        } else {
+            let u64_readers = self.readers.iter()
+                .filter(|reader| reader.max_doc() != reader.delete_bitset().map(|bit_set| bit_set.len() as u32).unwrap_or(0))
+                .map(|reader| {
+                    let u64_reader: FastFieldReader<u64> = reader
+                        .fast_fields()
+                        .typed_fast_field_reader(field)
+                        .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
+                    (reader.max_doc(), u64_reader, reader.delete_bitset())
+                }).collect::<Vec<_>>();
+
+            let mut fast_single_field_serializer =
+                fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?;
+            for (max_doc, u64_reader, delete_bitset_opt) in u64_readers {
+                for doc_id in 0u32..max_doc {
+                    let is_deleted = delete_bitset_opt
+                        .map(|delete_bitset| delete_bitset.is_deleted(doc_id))
+                        .unwrap_or(false);
+                    if !is_deleted {
+                        let val = u64_reader.get(doc_id);
+                        fast_single_field_serializer.add_val(val)?;
+                    }
+                }
+            }
+            fast_single_field_serializer.close_field()?;
+            Ok(())
+        }
     }

-    fn write_fast_field_idx(
+    /// Generates the doc_id mapping, where the position in the vec is the new
+    /// doc_id.
+    /// `SegmentReaderWithOrdinal` carries the ordinal position of the
+    /// reader in `self.readers`.
+    pub(crate) fn generate_doc_id_mapping(
         &self,
+        sort_by_field: &IndexSortByField,
+    ) -> crate::Result<Vec<(DocId, SegmentReaderWithOrdinal)>> {
+        let reader_and_field_accessors = self
+            .readers
+            .iter()
+            .enumerate()
+            .map(|reader| {
+                let reader_with_ordinal: SegmentReaderWithOrdinal = reader.into();
+                let field_id = expect_field_id_for_sort_field(
+                    &reader_with_ordinal.reader.schema(),
+                    &sort_by_field,
+                )?; // for now we expect a fast field, but this is not strictly required
+                let value_accessor = reader_with_ordinal
+                    .reader
+                    .fast_fields()
+                    .u64_lenient(field_id)?;
+                Ok((reader_with_ordinal, value_accessor))
+            })
+            .collect::<crate::Result<Vec<_>>>()?; // Collecting binds the lifetime of each value_accessor into the vec;
+        // otherwise it could not be used as a reference.
+        // Loading the field accessor on demand causes a 15x regression.
+
+        // create an iterator of (doc_id, reader, sort_accessor) tuples per segment
+        let doc_id_reader_pair = reader_and_field_accessors
+            .iter()
+            .map(|reader_and_field_accessor| {
+                reader_and_field_accessor
+                    .0
+                    .reader
+                    .doc_ids_alive()
+                    .map(move |doc_id| {
+                        (
+                            doc_id,
+                            reader_and_field_accessor.0,
+                            &reader_and_field_accessor.1,
+                        )
+                    })
+            })
+            .collect::<Vec<_>>();
+
+        // k-merge into (old doc_id, reader) tuples, in order of the new doc_ids
+        let sorted_doc_ids: Vec<(DocId, SegmentReaderWithOrdinal)> = doc_id_reader_pair
+            .into_iter()
+            .kmerge_by(|a, b| {
+                let val1 = a.2.get(a.0);
+                let val2 = b.2.get(b.0);
+                if sort_by_field.order == Order::Asc {
+                    val1 < val2
+                } else {
+                    val1 > val2
+                }
+            })
+            .map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id))
+            .collect::<Vec<_>>();
+        Ok(sorted_doc_ids)
+    }
+
+    // Creates the index file pointing into the data; generic over
+    // `BytesFastFieldReader` and `MultiValuedFastFieldReader`.
+    //
+    // Important: `reader_and_field_accessors` needs to have the same order as
+    // `self.readers`, since `SegmentReaderWithOrdinal` is used to index into it.
+    fn write_1_n_fast_field_idx_generic(
         field: Field,
         fast_field_serializer: &mut FastFieldSerializer,
+        doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
+        reader_and_field_accessors: &[(&SegmentReader, impl MultiValueLength)],
     ) -> crate::Result<()> {
         let mut total_num_vals = 0u64;
-        let mut u64s_readers: Vec<MultiValuedFastFieldReader<u64>> = Vec::new();
-
         // In the first pass, we compute the total number of vals.
         //
         // This is required by the bitpacker, as it needs to know
         // what bit length to use for bitpacking.
-        for reader in &self.readers {
-            let u64s_reader = reader.fast_fields()
-                .typed_fast_field_multi_reader(field)
-                .expect("Failed to find index for multivalued field. This is a bug in tantivy, please report.");
+        for (reader, u64s_reader) in reader_and_field_accessors.iter() {
             if let Some(delete_bitset) = reader.delete_bitset() {
                 for doc in 0u32..reader.max_doc() {
                     if delete_bitset.is_alive(doc) {
-                        let num_vals = u64s_reader.num_vals(doc) as u64;
+                        let num_vals = u64s_reader.get_len(doc) as u64;
                         total_num_vals += num_vals;
                     }
                 }
             } else {
-                total_num_vals += u64s_reader.total_num_vals();
+                total_num_vals += u64s_reader.get_total_len();
             }
-            u64s_readers.push(u64s_reader);
         }

         // We can now create our `idx` serializer, and in a second pass,
         // can effectively push the different indexes.
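// Editor's sketch (illustrative, not part of the patch): the k-way merge
// driving `generate_doc_id_mapping` above, reduced to plain integers. Each
// segment yields its alive doc_ids in sort-key order, and
// `itertools::kmerge_by` interleaves them into one globally sorted stream:
//
//     use itertools::Itertools;
//     let seg0 = vec![1u64, 5, 9]; // per-segment sort-key values, already sorted
//     let seg1 = vec![2u64, 3, 10];
//     let merged: Vec<u64> = vec![seg0.into_iter(), seg1.into_iter()]
//         .into_iter()
//         .kmerge_by(|a, b| a < b) // use `a > b` for Order::Desc
//         .collect();
//     assert_eq!(merged, vec![1, 2, 3, 5, 9, 10]);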
- let mut serialize_idx = - fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?; - let mut idx = 0; - for (segment_reader, u64s_reader) in self.readers.iter().zip(&u64s_readers) { - for doc in segment_reader.doc_ids_alive() { - serialize_idx.add_val(idx)?; - idx += u64s_reader.num_vals(doc) as u64; + if let Some(doc_id_mapping) = doc_id_mapping { + let mut serialize_idx = + fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?; + + let mut offset = 0; + for (doc_id, reader) in doc_id_mapping { + let reader = &reader_and_field_accessors[reader.ordinal as usize].1; + serialize_idx.add_val(offset)?; + offset += reader.get_len(*doc_id) as u64; } + serialize_idx.add_val(offset as u64)?; + + serialize_idx.close_field()?; + } else { + let mut serialize_idx = + fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?; + let mut idx = 0; + for (segment_reader, u64s_reader) in reader_and_field_accessors.iter() { + for doc in segment_reader.doc_ids_alive() { + serialize_idx.add_val(idx)?; + idx += u64s_reader.get_len(doc) as u64; + } + } + serialize_idx.add_val(idx)?; + serialize_idx.close_field()?; } - serialize_idx.add_val(idx)?; - serialize_idx.close_field()?; Ok(()) } + fn write_multi_value_fast_field_idx( + &self, + field: Field, + fast_field_serializer: &mut FastFieldSerializer, + doc_id_mapping: &Option>, + ) -> crate::Result<()> { + let reader_and_field_accessors = self.readers.iter().map(|reader|{ + let u64s_reader: MultiValuedFastFieldReader = reader.fast_fields() + .typed_fast_field_multi_reader(field) + .expect("Failed to find index for multivalued field. This is a bug in tantivy, please report."); + (reader, u64s_reader) + }).collect::>(); + + Self::write_1_n_fast_field_idx_generic( + field, + fast_field_serializer, + doc_id_mapping, + &reader_and_field_accessors, + ) + } fn write_hierarchical_facet_field( &self, field: Field, term_ordinal_mappings: &TermOrdinalMapping, fast_field_serializer: &mut FastFieldSerializer, + doc_id_mapping: &Option>, ) -> crate::Result<()> { // Multifastfield consists in 2 fastfields. // The first serves as an index into the second one and is stricly increasing. // The second contains the actual values. // First we merge the idx fast field. - self.write_fast_field_idx(field, fast_field_serializer)?; + self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; + let fast_field_reader = self + .readers + .iter() + .map(|reader| { + let ff_reader: MultiValuedFastFieldReader = reader + .fast_fields() + .u64s(field) + .expect("Could not find multivalued u64 fast value reader."); + ff_reader + }) + .collect::>(); // We can now write the actual fast field values. // In the case of hierarchical facets, they are actually term ordinals. 
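// Editor's sketch (illustrative, not part of the patch): the 1:n fast field
// layout serialized here is an `idx` column of start offsets (one per doc,
// plus a final sentinel) pointing into a flat `vals` column. A hypothetical
// reader-side lookup:
//
//     // doc0 -> [], doc1 -> [5, 6], doc2 -> [3]
//     let idx = vec![0u64, 0, 2, 3]; // offsets + sentinel, monotonically increasing
//     let vals = vec![5u64, 6, 3];
//     let (start, end) = (idx[1] as usize, idx[2] as usize);
//     assert_eq!(&vals[start..end], &[5, 6]); // values of doc 1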
let max_term_ord = term_ordinal_mappings.max_term_ord(); @@ -351,21 +529,32 @@ impl IndexMerger { let mut serialize_vals = fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?; let mut vals = Vec::with_capacity(100); - for (segment_ord, segment_reader) in self.readers.iter().enumerate() { - let term_ordinal_mapping: &[TermOrdinal] = - term_ordinal_mappings.get_segment(segment_ord); - let ff_reader: MultiValuedFastFieldReader = segment_reader - .fast_fields() - .u64s(field) - .expect("Could not find multivalued u64 fast value reader."); - // TODO optimize if no deletes - for doc in segment_reader.doc_ids_alive() { - ff_reader.get_vals(doc, &mut vals); + if let Some(doc_id_mapping) = doc_id_mapping { + for (old_doc_id, reader_with_ordinal) in doc_id_mapping { + let term_ordinal_mapping: &[TermOrdinal] = + term_ordinal_mappings.get_segment(reader_with_ordinal.ordinal as usize); + + let ff_reader = &fast_field_reader[reader_with_ordinal.ordinal as usize]; + ff_reader.get_vals(*old_doc_id, &mut vals); for &prev_term_ord in &vals { let new_term_ord = term_ordinal_mapping[prev_term_ord as usize]; serialize_vals.add_val(new_term_ord)?; } } + } else { + for (segment_ord, segment_reader) in self.readers.iter().enumerate() { + let term_ordinal_mapping: &[TermOrdinal] = + term_ordinal_mappings.get_segment(segment_ord); + let ff_reader = &fast_field_reader[segment_ord as usize]; + // TODO optimize if no deletes + for doc in segment_reader.doc_ids_alive() { + ff_reader.get_vals(doc, &mut vals); + for &prev_term_ord in &vals { + let new_term_ord = term_ordinal_mapping[prev_term_ord as usize]; + serialize_vals.add_val(new_term_ord)?; + } + } + } } serialize_vals.close_field()?; } @@ -376,13 +565,14 @@ impl IndexMerger { &self, field: Field, fast_field_serializer: &mut FastFieldSerializer, + doc_id_mapping: &Option>, ) -> crate::Result<()> { // Multifastfield consists in 2 fastfields. // The first serves as an index into the second one and is stricly increasing. // The second contains the actual values. // First we merge the idx fast field. - self.write_fast_field_idx(field, fast_field_serializer)?; + self.write_multi_value_fast_field_idx(field, fast_field_serializer, doc_id_mapping)?; let mut min_value = u64::max_value(); let mut max_value = u64::min_value(); @@ -421,10 +611,29 @@ impl IndexMerger { max_value = 0; } + let fast_field_reader = self + .readers + .iter() + .map(|reader| { + let ff_reader : MultiValuedFastFieldReader = reader.fast_fields() + .typed_fast_field_multi_reader(field) + .expect("Failed to find index for multivalued field. 
This is a bug in tantivy, please report."); + ff_reader + }) + .collect::>(); + // We can now initialize our serializer, and push it the different values - { - let mut serialize_vals = fast_field_serializer - .new_u64_fast_field_with_idx(field, min_value, max_value, 1)?; + let mut serialize_vals = + fast_field_serializer.new_u64_fast_field_with_idx(field, min_value, max_value, 1)?; + if let Some(doc_id_mapping) = doc_id_mapping { + for (doc_id, reader_with_ordinal) in doc_id_mapping { + let ff_reader = &fast_field_reader[reader_with_ordinal.ordinal as usize]; + ff_reader.get_vals(*doc_id, &mut vals); + for &val in &vals { + serialize_vals.add_val(val)?; + } + } + } else { for (reader, ff_reader) in self.readers.iter().zip(ff_readers) { // TODO optimize if no deletes for doc in reader.doc_ids_alive() { @@ -434,8 +643,8 @@ impl IndexMerger { } } } - serialize_vals.close_field()?; } + serialize_vals.close_field()?; Ok(()) } @@ -443,50 +652,42 @@ impl IndexMerger { &self, field: Field, fast_field_serializer: &mut FastFieldSerializer, + doc_id_mapping: &Option>, ) -> crate::Result<()> { - let mut total_num_vals = 0u64; - let mut bytes_readers: Vec = Vec::new(); - - for reader in &self.readers { - let bytes_reader = reader.fast_fields().bytes(field)?; - if let Some(delete_bitset) = reader.delete_bitset() { - for doc in 0u32..reader.max_doc() { - if delete_bitset.is_alive(doc) { - let num_vals = bytes_reader.get_bytes(doc).len() as u64; - total_num_vals += num_vals; - } - } - } else { - total_num_vals += bytes_reader.total_num_bytes() as u64; - } - bytes_readers.push(bytes_reader); - } - - { - // We can now create our `idx` serializer, and in a second pass, - // can effectively push the different indexes. - let mut serialize_idx = - fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?; - let mut idx = 0; - for (segment_reader, bytes_reader) in self.readers.iter().zip(&bytes_readers) { - for doc in segment_reader.doc_ids_alive() { - serialize_idx.add_val(idx)?; - idx += bytes_reader.get_bytes(doc).len() as u64; - } - } - serialize_idx.add_val(idx)?; - serialize_idx.close_field()?; - } + let reader_and_field_accessors = self + .readers + .iter() + .map(|reader| { + let bytes_reader = reader.fast_fields().bytes(field) + .expect("Failed to find index for bytes field. This is a bug in tantivy, please report."); + (reader, bytes_reader) + }) + .collect::>(); + Self::write_1_n_fast_field_idx_generic( + field, + fast_field_serializer, + doc_id_mapping, + &reader_and_field_accessors, + )?; let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1); - for segment_reader in &self.readers { - let bytes_reader = segment_reader.fast_fields().bytes(field) - .expect("Failed to find bytes field in fast field reader. This is a bug in tantivy. Please report."); - // TODO: optimize if no deletes - for doc in segment_reader.doc_ids_alive() { - let val = bytes_reader.get_bytes(doc); + if let Some(doc_id_mapping) = doc_id_mapping { + for (doc_id, reader_with_ordinal) in doc_id_mapping { + let bytes_reader = + &reader_and_field_accessors[reader_with_ordinal.ordinal as usize].1; + let val = bytes_reader.get_bytes(*doc_id); serialize_vals.write_all(val)?; } + } else { + for segment_reader in &self.readers { + let bytes_reader = segment_reader.fast_fields().bytes(field) + .expect("Failed to find bytes field in fast field reader. This is a bug in tantivy. 
Please report."); + // TODO: optimize if no deletes + for doc in segment_reader.doc_ids_alive() { + let val = bytes_reader.get_bytes(doc); + serialize_vals.write_all(val)?; + } + } } serialize_vals.flush()?; Ok(()) @@ -498,6 +699,7 @@ impl IndexMerger { field_type: &FieldType, serializer: &mut InvertedIndexSerializer, fieldnorm_reader: Option, + doc_id_mapping: &Option>, ) -> crate::Result> { let mut positions_buffer: Vec = Vec::with_capacity(1_000); let mut delta_computer = DeltaComputer::new(); @@ -528,19 +730,35 @@ impl IndexMerger { // map from segment doc ids to the resulting merged segment doc id. let mut merged_doc_id_map: Vec>> = Vec::with_capacity(self.readers.len()); - for reader in &self.readers { - let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize); - for doc_id in 0..reader.max_doc() { - if reader.is_deleted(doc_id) { - segment_local_map.push(None); - } else { - segment_local_map.push(Some(max_doc)); - max_doc += 1u32; - } + if let Some(doc_id_mapping) = doc_id_mapping { + merged_doc_id_map = self + .readers + .iter() + .map(|reader| { + let mut segment_local_map = vec![]; + segment_local_map.resize(reader.max_doc() as usize, None); + segment_local_map + }) + .collect(); + for (new_doc_id, (old_doc_id, segment_and_ordinal)) in doc_id_mapping.iter().enumerate() + { + let segment_map = &mut merged_doc_id_map[segment_and_ordinal.ordinal as usize]; + segment_map[*old_doc_id as usize] = Some(new_doc_id as DocId); + } + } else { + for reader in &self.readers { + let mut segment_local_map = Vec::with_capacity(reader.max_doc() as usize); + for doc_id in 0..reader.max_doc() { + if reader.is_deleted(doc_id) { + segment_local_map.push(None); + } else { + segment_local_map.push(Some(max_doc)); + max_doc += 1u32; + } + } + merged_doc_id_map.push(segment_local_map); } - merged_doc_id_map.push(segment_local_map); } - // The total number of tokens will only be exact when there has been no deletes. // // Otherwise, we approximate by removing deleted documents proportionally. @@ -555,7 +773,9 @@ impl IndexMerger { // - Segment 1's doc ids become [seg0.max_doc, seg0.max_doc + seg.max_doc] // - Segment 2's doc ids become [seg0.max_doc + seg1.max_doc, // seg0.max_doc + seg1.max_doc + seg2.max_doc] - // ... + // + // This stacking applies only when the index is not sorted, in that case the + // doc_ids are kmerged by their sort property let mut field_serializer = serializer.new_field(indexed_field, total_num_tokens, fieldnorm_reader)?; @@ -568,6 +788,7 @@ impl IndexMerger { ); let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![]; + let mut doc_id_and_positions = vec![]; while merged_terms.advance() { segment_postings_containing_the_term.clear(); @@ -624,18 +845,39 @@ impl IndexMerger { while doc != TERMINATED { // deleted doc are skipped as they do not have a `remapped_doc_id`. if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] { - // we make sure to only write the term iff + // we make sure to only write the term if // there is at least one document. let term_freq = segment_postings.term_freq(); segment_postings.positions(&mut positions_buffer); - - let delta_positions = delta_computer.compute_delta(&positions_buffer); - field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions); + // if doc_id_mapping exists, the docids are reordered, they are + // not just stacked. The field serializer expects monotonically increasing + // docids, so we collect and sort them first, before writing. 
+ // + // I think this is not strictly necessary, it would be possible to + // avoid the loading into a vec via some form of kmerge, but then the merge + // logic would deviate much more from the stacking case (unsorted index) + if doc_id_mapping.is_some() { + doc_id_and_positions.push(( + remapped_doc_id, + term_freq, + positions_buffer.to_vec(), + )); + } else { + let delta_positions = delta_computer.compute_delta(&positions_buffer); + field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions); + } } doc = segment_postings.advance(); } } + if doc_id_mapping.is_some() { + doc_id_and_positions.sort_unstable_by_key(|&(doc_id, _, _)| doc_id); + for (doc_id, term_freq, positions) in &doc_id_and_positions { + field_serializer.write_doc(*doc_id, *term_freq, positions); + } + doc_id_and_positions.clear(); + } // closing the term. field_serializer.close_term()?; @@ -648,6 +890,7 @@ impl IndexMerger { &self, serializer: &mut InvertedIndexSerializer, fieldnorm_readers: FieldNormReaders, + doc_id_mapping: &Option>, ) -> crate::Result> { let mut term_ordinal_mappings = HashMap::new(); for (field, field_entry) in self.schema.fields() { @@ -658,6 +901,7 @@ impl IndexMerger { field_entry.field_type(), serializer, fieldnorm_reader, + doc_id_mapping, )? { term_ordinal_mappings.insert(field, term_ordinal_mapping); } @@ -666,16 +910,33 @@ impl IndexMerger { Ok(term_ordinal_mappings) } - fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> { - for reader in &self.readers { - let store_reader = reader.get_store_reader()?; - if reader.num_deleted_docs() > 0 { - for doc_id in reader.doc_ids_alive() { - let doc = store_reader.get(doc_id)?; - store_writer.store(&doc)?; + fn write_storable_fields( + &self, + store_writer: &mut StoreWriter, + doc_id_mapping: &Option>, + ) -> crate::Result<()> { + let store_readers: Vec<_> = self + .readers + .iter() + .map(|reader| reader.get_store_reader()) + .collect::>()?; + if let Some(doc_id_mapping) = doc_id_mapping { + for (old_doc_id, reader_with_ordinal) in doc_id_mapping { + let store_reader = &store_readers[reader_with_ordinal.ordinal as usize]; + let raw_doc = store_reader.get_raw(*old_doc_id)?; + store_writer.store_bytes(raw_doc.get_bytes())?; + } + } else { + for reader in &self.readers { + let store_reader = reader.get_store_reader()?; + if reader.num_deleted_docs() > 0 { + for doc_id in reader.doc_ids_alive() { + let raw_doc = store_reader.get_raw(doc_id)?; + store_writer.store_bytes(raw_doc.get_bytes())?; + } + } else { + store_writer.stack(&store_reader)?; } - } else { - store_writer.stack(&store_reader)?; } } Ok(()) @@ -683,18 +944,36 @@ impl IndexMerger { } impl SerializableSegment for IndexMerger { - fn write(&self, mut serializer: SegmentSerializer) -> crate::Result { + fn write( + &self, + mut serializer: SegmentSerializer, + _: Option<&DocIdMapping>, + ) -> crate::Result { + let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref() + { + Some(self.generate_doc_id_mapping(sort_by_field)?) 
+ } else { + None + }; + if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() { - self.write_fieldnorms(fieldnorms_serializer)?; + self.write_fieldnorms(fieldnorms_serializer, &doc_id_mapping)?; } let fieldnorm_data = serializer .segment() .open_read(SegmentComponent::FieldNorms)?; let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?; - let term_ord_mappings = - self.write_postings(serializer.get_postings_serializer(), fieldnorm_readers)?; - self.write_fast_fields(serializer.get_fast_field_serializer(), term_ord_mappings)?; - self.write_storable_fields(serializer.get_store_writer())?; + let term_ord_mappings = self.write_postings( + serializer.get_postings_serializer(), + fieldnorm_readers, + &doc_id_mapping, + )?; + self.write_fast_fields( + serializer.get_fast_field_serializer(), + term_ord_mappings, + &doc_id_mapping, + )?; + self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?; serializer.close()?; Ok(self.max_doc) } @@ -717,12 +996,14 @@ mod tests { use crate::schema::IntOptions; use crate::schema::Term; use crate::schema::TextFieldIndexing; - use crate::schema::INDEXED; use crate::schema::{Cardinality, TEXT}; use crate::DocAddress; + use crate::IndexSettings; + use crate::IndexSortByField; use crate::IndexWriter; use crate::Searcher; use crate::{schema, DocSet, SegmentId}; + use crate::{schema::INDEXED, Order}; use byteorder::{BigEndian, ReadBytesExt}; use futures::executor::block_on; use schema::FAST; @@ -1180,20 +1461,56 @@ mod tests { } Ok(()) } + #[test] + fn test_merge_facets_sort_none() { + test_merge_facets(None) + } #[test] - fn test_merge_facets() { + fn test_merge_facets_sort_asc() { + // the data is already sorted asc, so this should have no effect, but go through the docid + // mapping code + test_merge_facets(Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "intval".to_string(), + order: Order::Asc, + }), + })); + } + + #[test] + fn test_merge_facets_sort_desc() { + test_merge_facets(Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "intval".to_string(), + order: Order::Desc, + }), + })); + } + fn test_merge_facets(index_settings: Option) { let mut schema_builder = schema::Schema::builder(); let facet_field = schema_builder.add_facet_field("facet", INDEXED); - let index = Index::create_in_ram(schema_builder.build()); + let int_options = IntOptions::default() + .set_fast(Cardinality::SingleValue) + .set_indexed(); + let int_field = schema_builder.add_u64_field("intval", int_options); + let mut index_builder = Index::builder().schema(schema_builder.build()); + if let Some(settings) = index_settings { + index_builder = index_builder.settings(settings); + } + let index = index_builder.create_in_ram().unwrap(); + //let index = Index::create_in_ram(schema_builder.build()); let reader = index.reader().unwrap(); + let mut int_val = 0; { let mut index_writer = index.writer_for_tests().unwrap(); - let index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| { + let mut index_doc = |index_writer: &mut IndexWriter, doc_facets: &[&str]| { let mut doc = Document::default(); for facet in doc_facets { doc.add_facet(facet_field, Facet::from(facet)); } + doc.add_u64(int_field, int_val); + int_val += 1; index_writer.add_document(doc); }; diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs new file mode 100644 index 000000000..e71ea0fe5 --- /dev/null +++ b/src/indexer/merger_sorted_index_test.rs @@ -0,0 +1,388 @@ +#[cfg(test)] +mod tests 
{ + use crate::{ + collector::TopDocs, + schema::{Cardinality, TextFieldIndexing}, + }; + use crate::{core::Index, fastfield::MultiValuedFastFieldReader}; + use crate::{ + query::QueryParser, + schema::{IntOptions, TextOptions}, + }; + use crate::{schema::Facet, IndexSortByField}; + use crate::{schema::INDEXED, Order}; + use crate::{ + schema::{self, BytesOptions}, + DocAddress, + }; + use crate::{IndexSettings, Term}; + use futures::executor::block_on; + + fn create_test_index_posting_list_issue(index_settings: Option) -> Index { + let mut schema_builder = schema::Schema::builder(); + let int_options = IntOptions::default() + .set_fast(Cardinality::SingleValue) + .set_indexed(); + let int_field = schema_builder.add_u64_field("intval", int_options); + + let facet_field = schema_builder.add_facet_field("facet", INDEXED); + + let schema = schema_builder.build(); + + let mut index_builder = Index::builder().schema(schema); + if let Some(settings) = index_settings { + index_builder = index_builder.settings(settings); + } + let index = index_builder.create_in_ram().unwrap(); + + { + let mut index_writer = index.writer_for_tests().unwrap(); + + index_writer.add_document(doc!(int_field=>3_u64, facet_field=> Facet::from("/crime"))); + + assert!(index_writer.commit().is_ok()); + index_writer.add_document(doc!(int_field=>5_u64, facet_field=> Facet::from("/fanta"))); + + assert!(index_writer.commit().is_ok()); + } + + // Merging the segments + { + let segment_ids = index + .searchable_segment_ids() + .expect("Searchable segments failed."); + let mut index_writer = index.writer_for_tests().unwrap(); + assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.wait_merging_threads().is_ok()); + } + index + } + + fn create_test_index(index_settings: Option) -> Index { + let mut schema_builder = schema::Schema::builder(); + let int_options = IntOptions::default() + .set_fast(Cardinality::SingleValue) + .set_stored() + .set_indexed(); + let int_field = schema_builder.add_u64_field("intval", int_options); + + let bytes_options = BytesOptions::default().set_fast().set_indexed(); + let bytes_field = schema_builder.add_bytes_field("bytes", bytes_options); + let facet_field = schema_builder.add_facet_field("facet", INDEXED); + + let multi_numbers = schema_builder.add_u64_field( + "multi_numbers", + IntOptions::default().set_fast(Cardinality::MultiValues), + ); + let text_field_options = TextOptions::default() + .set_indexing_options( + TextFieldIndexing::default() + .set_index_option(schema::IndexRecordOption::WithFreqsAndPositions), + ) + .set_stored(); + let text_field = schema_builder.add_text_field("text_field", text_field_options); + let schema = schema_builder.build(); + + let mut index_builder = Index::builder().schema(schema); + if let Some(settings) = index_settings { + index_builder = index_builder.settings(settings); + } + let index = index_builder.create_in_ram().unwrap(); + + { + let mut index_writer = index.writer_for_tests().unwrap(); + + index_writer.add_document(doc!(int_field=>1_u64)); + index_writer.add_document( + doc!(int_field=>3_u64, multi_numbers => 3_u64, multi_numbers => 4_u64, bytes_field => vec![1, 2, 3], text_field => "some text", facet_field=> Facet::from("/book/crime")), + ); + index_writer.add_document(doc!(int_field=>1_u64, text_field=> "deleteme")); + index_writer.add_document( + doc!(int_field=>2_u64, multi_numbers => 2_u64, multi_numbers => 3_u64), + ); + + assert!(index_writer.commit().is_ok()); + index_writer.add_document(doc!(int_field=>20_u64, 
multi_numbers => 20_u64)); + index_writer.add_document(doc!(int_field=>1_u64, text_field=> "deleteme", facet_field=> Facet::from("/book/crime"))); + assert!(index_writer.commit().is_ok()); + index_writer.add_document( + doc!(int_field=>10_u64, multi_numbers => 10_u64, multi_numbers => 11_u64, text_field=> "blubber", facet_field=> Facet::from("/book/fantasy")), + ); + index_writer.add_document(doc!(int_field=>5_u64, text_field=> "deleteme")); + index_writer.add_document( + doc!(int_field=>1_000u64, multi_numbers => 1001_u64, multi_numbers => 1002_u64, bytes_field => vec![5, 5],text_field => "the biggest num") + ); + + index_writer.delete_term(Term::from_field_text(text_field, "deleteme")); + assert!(index_writer.commit().is_ok()); + } + + // Merging the segments + { + let segment_ids = index + .searchable_segment_ids() + .expect("Searchable segments failed."); + let mut index_writer = index.writer_for_tests().unwrap(); + assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.wait_merging_threads().is_ok()); + } + index + } + + #[test] + fn test_merge_sorted_postinglist_sort_issue() { + create_test_index_posting_list_issue(Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "intval".to_string(), + order: Order::Desc, + }), + })); + } + + #[test] + fn test_merge_sorted_index_desc() { + let index = create_test_index(Some(IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "intval".to_string(), + order: Order::Desc, + }), + })); + + let int_field = index.schema().get_field("intval").unwrap(); + let reader = index.reader().unwrap(); + + let searcher = reader.searcher(); + assert_eq!(searcher.segment_readers().len(), 1); + let segment_reader = searcher.segment_readers().last().unwrap(); + + let fast_fields = segment_reader.fast_fields(); + let fast_field = fast_fields.u64(int_field).unwrap(); + assert_eq!(fast_field.get(5u32), 1u64); + assert_eq!(fast_field.get(4u32), 2u64); + assert_eq!(fast_field.get(3u32), 3u64); + assert_eq!(fast_field.get(2u32), 10u64); + assert_eq!(fast_field.get(1u32), 20u64); + assert_eq!(fast_field.get(0u32), 1_000u64); + + // test new field norm mapping + { + let my_text_field = index.schema().get_field("text_field").unwrap(); + let fieldnorm_reader = segment_reader.get_fieldnorms_reader(my_text_field).unwrap(); + assert_eq!(fieldnorm_reader.fieldnorm(0), 3); // the biggest num + assert_eq!(fieldnorm_reader.fieldnorm(1), 0); + assert_eq!(fieldnorm_reader.fieldnorm(2), 1); // blubber + assert_eq!(fieldnorm_reader.fieldnorm(3), 2); // some text + assert_eq!(fieldnorm_reader.fieldnorm(5), 0); + } + + let my_text_field = index.schema().get_field("text_field").unwrap(); + let searcher = index.reader().unwrap().searcher(); + { + let my_text_field = index.schema().get_field("text_field").unwrap(); + + let do_search = |term: &str| { + let query = QueryParser::for_index(&index, vec![my_text_field]) + .parse_query(term) + .unwrap(); + let top_docs: Vec<(f32, DocAddress)> = + searcher.search(&query, &TopDocs::with_limit(3)).unwrap(); + + top_docs.iter().map(|el| el.1.doc_id).collect::>() + }; + + assert_eq!(do_search("some"), vec![3]); + assert_eq!(do_search("blubber"), vec![2]); + assert_eq!(do_search("biggest"), vec![0]); + } + + // access doc store + { + let doc = searcher.doc(DocAddress::new(0, 2)).unwrap(); + assert_eq!( + doc.get_first(my_text_field).unwrap().text(), + Some("blubber") + ); + let doc = searcher.doc(DocAddress::new(0, 0)).unwrap(); + assert_eq!(doc.get_first(int_field).unwrap().u64_value(), 
Some(1000));
+        }
+    }
+
+    #[test]
+    fn test_merge_sorted_index_asc() {
+        let index = create_test_index(Some(IndexSettings {
+            sort_by_field: Some(IndexSortByField {
+                field: "intval".to_string(),
+                order: Order::Asc,
+            }),
+        }));
+
+        let int_field = index.schema().get_field("intval").unwrap();
+        let multi_numbers = index.schema().get_field("multi_numbers").unwrap();
+        let bytes_field = index.schema().get_field("bytes").unwrap();
+        let reader = index.reader().unwrap();
+        let searcher = reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+        let segment_reader = searcher.segment_readers().last().unwrap();
+
+        let fast_fields = segment_reader.fast_fields();
+        let fast_field = fast_fields.u64(int_field).unwrap();
+        assert_eq!(fast_field.get(0u32), 1u64);
+        assert_eq!(fast_field.get(1u32), 2u64);
+        assert_eq!(fast_field.get(2u32), 3u64);
+        assert_eq!(fast_field.get(3u32), 10u64);
+        assert_eq!(fast_field.get(4u32), 20u64);
+        assert_eq!(fast_field.get(5u32), 1_000u64);
+
+        let get_vals = |fast_field: &MultiValuedFastFieldReader<u64>, doc_id: u32| -> Vec<u64> {
+            let mut vals = vec![];
+            fast_field.get_vals(doc_id, &mut vals);
+            vals
+        };
+        let fast_fields = segment_reader.fast_fields();
+        let fast_field = fast_fields.u64s(multi_numbers).unwrap();
+        assert_eq!(&get_vals(&fast_field, 0), &[] as &[u64]);
+        assert_eq!(&get_vals(&fast_field, 1), &[2, 3]);
+        assert_eq!(&get_vals(&fast_field, 2), &[3, 4]);
+        assert_eq!(&get_vals(&fast_field, 3), &[10, 11]);
+        assert_eq!(&get_vals(&fast_field, 4), &[20]);
+        assert_eq!(&get_vals(&fast_field, 5), &[1001, 1002]);
+
+        let fast_field = fast_fields.bytes(bytes_field).unwrap();
+        assert_eq!(fast_field.get_bytes(0), &[] as &[u8]);
+        assert_eq!(fast_field.get_bytes(2), &[1, 2, 3]);
+        assert_eq!(fast_field.get_bytes(5), &[5, 5]);
+
+        // test new field norm mapping
+        {
+            let my_text_field = index.schema().get_field("text_field").unwrap();
+            let fieldnorm_reader = segment_reader.get_fieldnorms_reader(my_text_field).unwrap();
+            assert_eq!(fieldnorm_reader.fieldnorm(0), 0);
+            assert_eq!(fieldnorm_reader.fieldnorm(1), 0);
+            assert_eq!(fieldnorm_reader.fieldnorm(2), 2); // some text
+            assert_eq!(fieldnorm_reader.fieldnorm(3), 1);
+            assert_eq!(fieldnorm_reader.fieldnorm(5), 3); // the biggest num
+        }
+
+        let searcher = index.reader().unwrap().searcher();
+        {
+            let my_text_field = index.schema().get_field("text_field").unwrap();
+
+            let do_search = |term: &str| {
+                let query = QueryParser::for_index(&index, vec![my_text_field])
+                    .parse_query(term)
+                    .unwrap();
+                let top_docs: Vec<(f32, DocAddress)> =
+                    searcher.search(&query, &TopDocs::with_limit(3)).unwrap();
+
+                top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
+            };
+
+            assert_eq!(do_search("some"), vec![2]);
+            assert_eq!(do_search("blubber"), vec![3]);
+            assert_eq!(do_search("biggest"), vec![5]);
+        }
+    }
+}
+
+#[cfg(all(test, feature = "unstable"))]
+mod bench_sorted_index_merge {
+
+    use crate::core::Index;
+    use crate::fastfield::FastFieldReader;
+    use crate::indexer::merger::IndexMerger;
+    use crate::schema::Cardinality;
+    use crate::schema::Document;
+    use crate::schema::IntOptions;
+    use crate::schema::Schema;
+    use crate::IndexSettings;
+    use crate::IndexSortByField;
+    use crate::IndexWriter;
+    use crate::Order;
+    use futures::executor::block_on;
+    use test::{self, Bencher};
+    fn create_index(sort_by_field: Option<IndexSortByField>) -> Index {
+        let mut schema_builder = Schema::builder();
+        let int_options = IntOptions::default()
.set_fast(Cardinality::SingleValue)
+            .set_indexed();
+        let int_field = schema_builder.add_u64_field("intval", int_options);
+        let schema = schema_builder.build();
+
+        let index_builder = Index::builder()
+            .schema(schema)
+            .settings(IndexSettings { sort_by_field });
+        let index = index_builder.create_in_ram().unwrap();
+
+        {
+            let mut index_writer = index.writer_for_tests().unwrap();
+            let index_doc = |index_writer: &mut IndexWriter, val: u64| {
+                let mut doc = Document::default();
+                doc.add_u64(int_field, val);
+                index_writer.add_document(doc);
+            };
+            // 3 segments with 10_000 values each in the fast field
+            for _ in 0..3 {
+                index_doc(&mut index_writer, 5000); // fix to make it unordered
+                for i in 0..10_000 {
+                    index_doc(&mut index_writer, i);
+                }
+                index_writer.commit().unwrap();
+            }
+        }
+        index
+    }
+    #[bench]
+    fn create_sorted_index_walk_over_kmerge_on_merge_fastfield(
+        b: &mut Bencher,
+    ) -> crate::Result<()> {
+        let sort_by_field = IndexSortByField {
+            field: "intval".to_string(),
+            order: Order::Desc,
+        };
+        let index = create_index(Some(sort_by_field.clone()));
+        let field = index.schema().get_field("intval").unwrap();
+        let segments = index.searchable_segments().unwrap();
+        let merger: IndexMerger =
+            IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
+        let doc_id_mapping = merger.generate_doc_id_mapping(&sort_by_field).unwrap();
+        b.iter(|| {
+            let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader)| {
+                let u64_reader: FastFieldReader<u64> = reader
+                    .reader
+                    .fast_fields()
+                    .typed_fast_field_reader(field)
+                    .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
+                (doc_id, reader, u64_reader)
+            });
+            // walk the values in order of the new doc_ids
+            let mut val = 0;
+            for (doc_id, _reader, field_reader) in sorted_doc_ids {
+                val = field_reader.get(*doc_id);
+            }
+            val
+        });
+
+        Ok(())
+    }
+    #[bench]
+    fn create_sorted_index_create_docid_mapping(b: &mut Bencher) -> crate::Result<()> {
+        let sort_by_field = IndexSortByField {
+            field: "intval".to_string(),
+            order: Order::Desc,
+        };
+        let index = create_index(Some(sort_by_field.clone()));
+        let field = index.schema().get_field("intval").unwrap();
+        let segments = index.searchable_segments().unwrap();
+        let merger: IndexMerger =
+            IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
+        b.iter(|| {
+            merger.generate_doc_id_mapping(&sort_by_field).unwrap();
+        });

+        Ok(())
+    }
+}
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index e02458d3c..27d9fee96 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -1,11 +1,13 @@
 pub mod delete_queue;
+pub mod doc_id_mapping;
 mod doc_opstamp_mapping;
 pub mod index_writer;
 mod log_merge_policy;
 mod merge_operation;
 pub mod merge_policy;
 pub mod merger;
+mod merger_sorted_index_test;
 pub mod operation;
 mod prepared_commit;
 mod segment_entry;
diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 97c0196b8..e71964088 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -9,7 +9,7 @@ use crate::store::StoreWriter;
 /// the data accumulated and sorted by the `SegmentWriter`.
pub struct SegmentSerializer { segment: Segment, - store_writer: StoreWriter, + pub(crate) store_writer: StoreWriter, fast_field_serializer: FastFieldSerializer, fieldnorms_serializer: Option, postings_serializer: InvertedIndexSerializer, @@ -17,8 +17,20 @@ pub struct SegmentSerializer { impl SegmentSerializer { /// Creates a new `SegmentSerializer`. - pub fn for_segment(mut segment: Segment) -> crate::Result { - let store_write = segment.open_write(SegmentComponent::Store)?; + pub fn for_segment( + mut segment: Segment, + is_in_merge: bool, + ) -> crate::Result { + // If the segment is going to be sorted, we stream the docs first to a temporary file. + // In the merge case this is not necessary because we can kmerge the already sorted + // segments + let remapping_required = segment.index().settings().sort_by_field.is_some() && !is_in_merge; + let store_component = if remapping_required { + SegmentComponent::TempStore + } else { + SegmentComponent::Store + }; + let store_write = segment.open_write(store_component)?; let fast_field_write = segment.open_write(SegmentComponent::FastFields)?; let fast_field_serializer = FastFieldSerializer::from_write(fast_field_write)?; @@ -45,6 +57,10 @@ impl SegmentSerializer { &self.segment } + pub fn segment_mut(&mut self) -> &mut Segment { + &mut self.segment + } + /// Accessor to the `PostingsSerializer`. pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer { &mut self.postings_serializer diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 778ae39ba..0be2f93de 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -46,7 +46,7 @@ const NUM_MERGE_THREADS: usize = 4; /// This method is not part of tantivy's public API pub fn save_new_metas( schema: Schema, - index_settings: Option, + index_settings: IndexSettings, directory: &dyn Directory, ) -> crate::Result<()> { save_metas( @@ -134,12 +134,13 @@ fn merge( .collect(); // An IndexMerger is like a "view" of our merged segments. - let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?; + let merger: IndexMerger = + IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?; - // ... we just serialize this index merger in our new segment to merge the two segments. - let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?; + // ... we just serialize this index merger in our new segment to merge the segments. 
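// Editor's sketch (illustrative, not part of the patch): the `true` passed to
// `for_segment` on the next line flags the merge case. A summary of the store
// component choice made inside `SegmentSerializer::for_segment` (shown above):
//
//     let remapping_required = settings.sort_by_field.is_some() && !is_in_merge;
//     let component = if remapping_required {
//         SegmentComponent::TempStore // fresh docs arrive unsorted, re-written at finalize
//     } else {
//         SegmentComponent::Store // merge inputs are already sorted, kmerge writes directly
//     };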
+ let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?; - let num_docs = merger.write(segment_serializer)?; + let num_docs = merger.write(segment_serializer, None)?; let merged_segment_id = merged_segment.id(); @@ -171,6 +172,7 @@ pub fn merge_segments( } let target_schema = indices[0].schema(); + let target_settings = indices[0].settings().clone(); // let's check that all of the indices have the same schema if indices @@ -182,22 +184,32 @@ pub fn merge_segments( "Attempt to merge different schema indices".to_string(), )); } + // let's check that all of the indices have the same index settings + if indices + .iter() + .skip(1) + .any(|index| index.settings() != &target_settings) + { + return Err(crate::TantivyError::InvalidArgument( + "Attempt to merge indices with different index_settings".to_string(), + )); + } let mut segments: Vec = Vec::new(); for index in indices { segments.extend(index.searchable_segments()?); } - let mut merged_index = Index::create( - output_directory, - target_schema.clone(), - indices[0].settings().clone(), - )?; + let mut merged_index = Index::create(output_directory, target_schema.clone(), target_settings)?; let merged_segment = merged_index.new_segment(); let merged_segment_id = merged_segment.id(); - let merger: IndexMerger = IndexMerger::open(merged_index.schema(), &segments[..])?; - let segment_serializer = SegmentSerializer::for_segment(merged_segment)?; - let num_docs = merger.write(segment_serializer)?; + let merger: IndexMerger = IndexMerger::open( + merged_index.schema(), + merged_index.settings().clone(), + &segments[..], + )?; + let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?; + let num_docs = merger.write(segment_serializer, None)?; let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs); @@ -234,7 +246,7 @@ pub(crate) struct InnerSegmentUpdater { // // This should be up to date as all update happen through // the unique active `SegmentUpdater`. - active_metas: RwLock>, + active_index_meta: RwLock>, pool: ThreadPool, merge_thread_pool: ThreadPool, @@ -274,7 +286,7 @@ impl SegmentUpdater { })?; let index_meta = index.load_metas()?; Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater { - active_metas: RwLock::new(Arc::new(index_meta)), + active_index_meta: RwLock::new(Arc::new(index_meta)), pool, merge_thread_pool, index, @@ -379,7 +391,7 @@ impl SegmentUpdater { // Segment 1 from disk 1, Segment 1 from disk 2, etc. 
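// Editor's sketch (illustrative, not part of the patch): the effect of the
// settings guard added to `merge_segments` above. `IndexSettings` is compared
// with `!=` in the patch, so it implements `PartialEq`, and the check reduces to:
//
//     fn settings_all_equal(indices: &[Index]) -> bool {
//         indices.windows(2).all(|pair| pair[0].settings() == pair[1].settings())
//     }
//
// Indices sorted by different fields or orders cannot be kmerged into one
// sorted segment, so such a merge is refused up front.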
commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32)); let index_meta = IndexMeta { - index_settings: None, + index_settings: index.settings().clone(), segments: commited_segment_metas, schema: index.schema(), opstamp, @@ -431,15 +443,15 @@ impl SegmentUpdater { } fn store_meta(&self, index_meta: &IndexMeta) { - *self.active_metas.write().unwrap() = Arc::new(index_meta.clone()); + *self.active_index_meta.write().unwrap() = Arc::new(index_meta.clone()); } - fn load_metas(&self) -> Arc { - self.active_metas.read().unwrap().clone() + fn load_meta(&self) -> Arc { + self.active_index_meta.read().unwrap().clone() } pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation { - let commit_opstamp = self.load_metas().opstamp; + let commit_opstamp = self.load_meta().opstamp; MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec()) } @@ -509,8 +521,11 @@ impl SegmentUpdater { } }); - Ok(merging_future_recv - .unwrap_or_else(|_| Err(crate::TantivyError::SystemError("Merge failed".to_string())))) + Ok(merging_future_recv.unwrap_or_else(|e| { + Err(crate::TantivyError::SystemError( + "Merge failed:".to_string() + &e.to_string(), + )) + })) } async fn consider_merge_options(&self) { @@ -531,7 +546,7 @@ impl SegmentUpdater { }) .collect(); - let commit_opstamp = self.load_metas().opstamp; + let commit_opstamp = self.load_meta().opstamp; let committed_merge_candidates = merge_policy .compute_merge_candidates(&committed_segments) .into_iter() @@ -562,7 +577,7 @@ impl SegmentUpdater { { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); if let Some(delete_operation) = delete_cursor.get() { - let committed_opstamp = segment_updater.load_metas().opstamp; + let committed_opstamp = segment_updater.load_meta().opstamp; if delete_operation.opstamp < committed_opstamp { let index = &segment_updater.index; let segment = index.segment(after_merge_segment_entry.meta().clone()); @@ -586,7 +601,7 @@ impl SegmentUpdater { } } } - let previous_metas = segment_updater.load_metas(); + let previous_metas = segment_updater.load_meta(); let segments_status = segment_updater .segment_manager .end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?; @@ -598,6 +613,7 @@ impl SegmentUpdater { segment_updater.consider_merge_options().await; } // we drop all possible handle to a now useless `SegmentMeta`. + let _ = garbage_collect_files(segment_updater).await; Ok(()) }); diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 586dfbd25..4ff505aa2 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -1,6 +1,7 @@ -use super::operation::AddOperation; -use crate::core::Segment; -use crate::core::SerializableSegment; +use super::{ + doc_id_mapping::{get_doc_id_mapping_from_field, DocIdMapping}, + operation::AddOperation, +}; use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::indexer::segment_serializer::SegmentSerializer; @@ -15,6 +16,8 @@ use crate::tokenizer::{BoxTokenStream, PreTokenizedStream}; use crate::tokenizer::{FacetTokenizer, TextAnalyzer}; use crate::tokenizer::{TokenStreamChain, Tokenizer}; use crate::Opstamp; +use crate::{core::Segment, store::StoreWriter}; +use crate::{core::SerializableSegment, store::StoreReader}; use crate::{DocId, SegmentComponent}; /// Computes the initial size of the hash table. 
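// Editor's sketch (illustrative, not part of the patch): the idea behind the
// `get_doc_id_mapping_from_field` call used in `finalize` below. A doc_id
// mapping is a permutation derived from the sort field's fast-field values:
// position = new doc_id, element = old doc_id. `sort_keys` stands in for the
// fast-field column of a freshly written segment.
fn doc_id_permutation(sort_keys: &[u64], descending: bool) -> Vec<u32> {
    let mut old_doc_ids: Vec<u32> = (0..sort_keys.len() as u32).collect();
    old_doc_ids.sort_by_key(|&old_doc_id| sort_keys[old_doc_id as usize]);
    if descending {
        old_doc_ids.reverse();
    }
    old_doc_ids
}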
@@ -39,12 +42,12 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
 /// They create the postings lists in anonymous memory.
 /// The segment is laid out on disk when the segment gets `finalized`.
 pub struct SegmentWriter {
-    max_doc: DocId,
-    multifield_postings: MultiFieldPostingsWriter,
-    segment_serializer: SegmentSerializer,
-    fast_field_writers: FastFieldsWriter,
-    fieldnorms_writer: FieldNormsWriter,
-    doc_opstamps: Vec<Opstamp>,
+    pub(crate) max_doc: DocId,
+    pub(crate) multifield_postings: MultiFieldPostingsWriter,
+    pub(crate) segment_serializer: SegmentSerializer,
+    pub(crate) fast_field_writers: FastFieldsWriter,
+    pub(crate) fieldnorms_writer: FieldNormsWriter,
+    pub(crate) doc_opstamps: Vec<Opstamp>,
     tokenizers: Vec<Option<TextAnalyzer>>,
     term_buffer: Term,
 }
@@ -66,7 +69,7 @@ impl SegmentWriter {
     ) -> crate::Result<SegmentWriter> {
         let tokenizer_manager = segment.index().tokenizers().clone();
         let table_num_bits = initial_table_size(memory_budget)?;
-        let segment_serializer = SegmentSerializer::for_segment(segment)?;
+        let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
         let multifield_postings = MultiFieldPostingsWriter::new(schema, table_num_bits);
         let tokenizers = schema
             .fields()
@@ -100,11 +103,21 @@ impl SegmentWriter {
     /// be used afterwards.
     pub fn finalize(mut self) -> crate::Result<Vec<Opstamp>> {
         self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
+        let mapping: Option<DocIdMapping> = self
+            .segment_serializer
+            .segment()
+            .index()
+            .settings()
+            .sort_by_field
+            .clone()
+            .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
+            .transpose()?;
         write(
             &self.multifield_postings,
             &self.fast_field_writers,
             &self.fieldnorms_writer,
             self.segment_serializer,
+            mapping.as_ref(),
         )?;
         Ok(self.doc_opstamps)
     }
@@ -168,7 +181,7 @@ impl SegmentWriter {
         });
         if let Some(unordered_term_id) = unordered_term_id_opt {
             self.fast_field_writers
-                .get_multivalue_writer(field)
+                .get_multivalue_writer_mut(field)
                 .expect("writer for facet missing")
                 .add_val(unordered_term_id);
         }
@@ -308,29 +321,63 @@ fn write(
     multifield_postings: &MultiFieldPostingsWriter,
     fast_field_writers: &FastFieldsWriter,
     fieldnorms_writer: &FieldNormsWriter,
     mut serializer: SegmentSerializer,
+    doc_id_map: Option<&DocIdMapping>,
 ) -> crate::Result<()> {
     if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
-        fieldnorms_writer.serialize(fieldnorms_serializer)?;
+        fieldnorms_writer.serialize(fieldnorms_serializer, doc_id_map)?;
     }
     let fieldnorm_data = serializer
         .segment()
         .open_read(SegmentComponent::FieldNorms)?;
     let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
-    let term_ord_map =
-        multifield_postings.serialize(serializer.get_postings_serializer(), fieldnorm_readers)?;
-    fast_field_writers.serialize(serializer.get_fast_field_serializer(), &term_ord_map)?;
+    let term_ord_map = multifield_postings.serialize(
+        serializer.get_postings_serializer(),
+        fieldnorm_readers,
+        doc_id_map,
+    )?;
+    fast_field_writers.serialize(
+        serializer.get_fast_field_serializer(),
+        &term_ord_map,
+        doc_id_map,
+    )?;
+    // finalize the temp docstore and create the final version, which reflects the doc_id_map
+    if let Some(doc_id_map) = doc_id_map {
+        let store_write = serializer
+            .segment_mut()
+            .open_write(SegmentComponent::Store)?;
+        let old_store_writer =
+            std::mem::replace(&mut serializer.store_writer, StoreWriter::new(store_write));
+        old_store_writer.close()?;
+        let store_read = StoreReader::open(
+            serializer
+                .segment()
+                .open_read(SegmentComponent::TempStore)?,
+        )?;
+        for old_doc_id in doc_id_map.iter_old_doc_ids() {
+            let raw_doc =
diff --git a/src/lib.rs b/src/lib.rs
index 2d282882b..954ae1c50 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -160,7 +160,10 @@ pub use self::docset::{DocSet, TERMINATED};
 pub use crate::common::HasLen;
 pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
 pub use crate::core::{Executor, SegmentComponent};
-pub use crate::core::{Index, IndexMeta, IndexSettings, Searcher, Segment, SegmentId, SegmentMeta};
+pub use crate::core::{
+    Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, Order, Searcher, Segment,
+    SegmentId, SegmentMeta,
+};
 pub use crate::core::{InvertedIndexReader, SegmentReader};
 pub use crate::directory::Directory;
 pub use crate::indexer::merge_segments;
@@ -255,7 +258,7 @@ pub type Opstamp = u64;
 /// the document to the search query.
 pub type Score = f32;

-/// A `SegmentOrdinal` identifies a segment, within a `Searcher`.
+/// A `SegmentOrdinal` identifies a segment within a `Searcher` or `Merger`.
 pub type SegmentOrdinal = u32;

 impl DocAddress {
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 959ec2bb2..9e0059b0f 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -1,6 +1,5 @@
 use super::stacker::{Addr, MemoryArena, TermHashMap};

-use crate::fieldnorm::FieldNormReaders;
 use crate::postings::recorder::{
     BufferLender, NothingRecorder, Recorder, TermFrequencyRecorder, TfAndPositionRecorder,
 };
@@ -12,6 +11,7 @@
 use crate::termdict::TermOrdinal;
 use crate::tokenizer::TokenStream;
 use crate::tokenizer::{Token, MAX_TOKEN_LEN};
 use crate::DocId;
+use crate::{fieldnorm::FieldNormReaders, indexer::doc_id_mapping::DocIdMapping};
 use fnv::FnvHashMap;
 use std::collections::HashMap;
 use std::io;
@@ -130,6 +130,7 @@ impl MultiFieldPostingsWriter {
         &self,
         serializer: &mut InvertedIndexSerializer,
         fieldnorm_readers: FieldNormReaders,
+        doc_id_map: Option<&DocIdMapping>,
     ) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
         let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> =
             self.term_index.iter().collect();
@@ -175,6 +176,7 @@
                 &mut field_serializer,
                 &self.term_index.heap,
                 &self.heap,
+                doc_id_map,
             )?;
             field_serializer.close()?;
         }
@@ -211,6 +213,7 @@ pub trait PostingsWriter {
         serializer: &mut FieldSerializer<'_>,
         term_heap: &MemoryArena,
         heap: &MemoryArena,
+        doc_id_map: Option<&DocIdMapping>,
     ) -> io::Result<()>;

     /// Tokenize a text and subscribe all of its tokens.
@@ -236,7 +239,7 @@
                     heap,
                 );
             } else {
-                info!(
+                warn!(
                     "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
                      MAX_TOKEN_LEN in the documentation for more information.",
                     token.text.len(),
@@ -307,13 +310,14 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec> {
         serializer: &mut FieldSerializer<'_>,
         termdict_heap: &MemoryArena,
         heap: &MemoryArena,
+        doc_id_map: Option<&DocIdMapping>,
     ) -> io::Result<()> {
         let mut buffer_lender = BufferLender::default();
         for &(term_bytes, addr, _) in term_addrs {
             let recorder: Rec = termdict_heap.read(addr);
             let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
             serializer.new_term(&term_bytes[4..], term_doc_freq)?;
-            recorder.serialize(&mut buffer_lender, serializer, heap);
+            recorder.serialize(&mut buffer_lender, serializer, heap, doc_id_map);
             serializer.close_term()?;
         }
         Ok(())
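Threading `doc_id_map` down to the posting serialization matters because each posting list holds doc ids in their original insertion order, while the inverted-index serializer expects doc ids in ascending order. A hedged sketch of the remap-then-sort step, with plain slices standing in for tantivy's vint-encoded buffers:

// Illustrative, not tantivy's serializer API: rewrite the old doc ids of a
// posting list through the mapping, then restore ascending order.
fn remap_postings(old_doc_ids: &[u32], old_to_new: &[u32]) -> Vec<u32> {
    let mut new_doc_ids: Vec<u32> = old_doc_ids
        .iter()
        .map(|&old| old_to_new[old as usize])
        .collect();
    // After remapping, the ids are no longer sorted; the serializer
    // needs them in ascending order.
    new_doc_ids.sort_unstable();
    new_doc_ids
}

fn main() {
    // Old doc ids 0..3 get shuffled to new positions 2, 0, 1.
    let old_to_new = [2u32, 0, 1];
    assert_eq!(remap_postings(&[0, 2], &old_to_new), vec![1, 2]);
}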
diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs
index 8da4322c8..d5bffa521 100644
--- a/src/postings/recorder.rs
+++ b/src/postings/recorder.rs
@@ -1,7 +1,10 @@
 use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
-use crate::common::{read_u32_vint, write_u32_vint};
 use crate::postings::FieldSerializer;
 use crate::DocId;
+use crate::{
+    common::{read_u32_vint, write_u32_vint},
+    indexer::doc_id_mapping::DocIdMapping,
+};

 const POSITION_END: u32 = 0;
@@ -73,6 +76,7 @@ pub(crate) trait Recorder: Copy + 'static {
         buffer_lender: &mut BufferLender,
         serializer: &mut FieldSerializer<'_>,
         heap: &MemoryArena,
+        doc_id_map: Option<&DocIdMapping>,
     );
     /// Returns the number of documents containing this term.
     ///
@@ -113,12 +117,25 @@ impl Recorder for NothingRecorder {
         buffer_lender: &mut BufferLender,
         serializer: &mut FieldSerializer<'_>,
         heap: &MemoryArena,
+        doc_id_map: Option<&DocIdMapping>,
     ) {
-        let buffer = buffer_lender.lend_u8();
+        let (buffer, doc_ids) = buffer_lender.lend_all();
         self.stack.read_to_end(heap, buffer);
-        // TODO avoid reading twice.
-        for doc in VInt32Reader::new(&buffer[..]) {
-            serializer.write_doc(doc as u32, 0u32, &[][..]);
+        // TODO: avoid reading twice.
+        if let Some(doc_id_map) = doc_id_map {
+            doc_ids.extend(
+                VInt32Reader::new(&buffer[..])
+                    .map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)),
+            );
+            doc_ids.sort_unstable();
+
+            for doc in doc_ids {
+                serializer.write_doc(*doc, 0u32, &[][..]);
+            }
+        } else {
+            for doc in VInt32Reader::new(&buffer[..]) {
+                serializer.write_doc(doc, 0u32, &[][..]);
+            }
         }
     }
@@ -140,7 +157,7 @@ impl Recorder for TermFrequencyRecorder {
     fn new() -> Self {
         TermFrequencyRecorder {
             stack: ExpUnrolledLinkedList::new(),
-            current_doc: u32::max_value(),
+            current_doc: 0,
             current_tf: 0u32,
             term_doc_freq: 0u32,
         }
@@ -171,13 +188,27 @@
         buffer_lender: &mut BufferLender,
         serializer: &mut FieldSerializer<'_>,
         heap: &MemoryArena,
+        doc_id_map: Option<&DocIdMapping>,
     ) {
         let buffer = buffer_lender.lend_u8();
         self.stack.read_to_end(heap, buffer);
         let mut u32_it = VInt32Reader::new(&buffer[..]);
-        while let Some(doc) = u32_it.next() {
-            let term_freq = u32_it.next().unwrap_or(self.current_tf);
-            serializer.write_doc(doc as u32, term_freq, &[][..]);
+        if let Some(doc_id_map) = doc_id_map {
+            let mut doc_id_and_tf = vec![];
+            while let Some(old_doc_id) = u32_it.next() {
+                let term_freq = u32_it.next().unwrap_or(self.current_tf);
+                doc_id_and_tf.push((doc_id_map.get_new_doc_id(old_doc_id), term_freq));
+            }
+            doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);
+
+            for (doc_id, tf) in doc_id_and_tf {
+                serializer.write_doc(doc_id, tf, &[][..]);
+            }
+        } else {
+            while let Some(doc) = u32_it.next() {
+                let term_freq = u32_it.next().unwrap_or(self.current_tf);
+                serializer.write_doc(doc, term_freq, &[][..]);
+            }
         }
     }
@@ -225,10 +256,12 @@ impl Recorder for TfAndPositionRecorder {
         buffer_lender: &mut BufferLender,
         serializer: &mut FieldSerializer<'_>,
         heap: &MemoryArena,
+        doc_id_map: Option<&DocIdMapping>,
     ) {
         let (buffer_u8, buffer_positions) = buffer_lender.lend_all();
         self.stack.read_to_end(heap, buffer_u8);
         let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
+        let mut doc_id_and_positions = vec![];
         while let Some(doc) = u32_it.next() {
             let mut prev_position_plus_one = 1u32;
             buffer_positions.clear();
@@ -244,7 +277,19 @@
                     }
                 }
             }
-            serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions);
+            if let Some(doc_id_map) = doc_id_map {
+                // this simple variant to remap may consume too much memory
+                doc_id_and_positions
+                    .push((doc_id_map.get_new_doc_id(doc), buffer_positions.to_vec()));
+            } else {
+                serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions);
+            }
+        }
+        if doc_id_map.is_some() {
+            doc_id_and_positions.sort_unstable_by_key(|&(doc_id, _)| doc_id);
+            for (doc_id, positions) in doc_id_and_positions {
+                serializer.write_doc(doc_id, positions.len() as u32, &positions);
+            }
         }
     }
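For the `TermFrequencyRecorder`, the remap has to keep each term frequency glued to its doc id through the re-sort; sorting the doc ids alone would attach tfs to the wrong documents (the `TfAndPositionRecorder` has the same constraint for position buffers). A small sketch, with plain `(doc_id, tf)` vectors standing in for the vint-encoded stack:

fn remap_doc_tf(pairs: &[(u32, u32)], old_to_new: &[u32]) -> Vec<(u32, u32)> {
    let mut doc_id_and_tf: Vec<(u32, u32)> = pairs
        .iter()
        .map(|&(old_doc_id, tf)| (old_to_new[old_doc_id as usize], tf))
        .collect();
    // Sort by the *new* doc id only; the tf travels with its doc id.
    doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);
    doc_id_and_tf
}

fn main() {
    // The sort order swaps the two documents.
    let old_to_new = [1u32, 0];
    // Old doc 0 had tf 3, old doc 1 had tf 7. After remapping, new doc 0
    // (formerly doc 1) keeps tf 7 and is written first.
    assert_eq!(
        remap_doc_tf(&[(0, 3), (1, 7)], &old_to_new),
        vec![(0, 7), (1, 3)]
    );
}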
diff --git a/src/schema/facet.rs b/src/schema/facet.rs
index 1fec07185..05243ce96 100644
--- a/src/schema/facet.rs
+++ b/src/schema/facet.rs
@@ -109,7 +109,7 @@ impl Facet {
         self.0.push_str(facet_str);
     }

-    /// Returns `true` iff other is a subfacet of `self`.
+    /// Returns `true` if other is a subfacet of `self`.
     pub fn is_prefix_of(&self, other: &Facet) -> bool {
         let self_str = self.encoded_str();
         let other_str = other.encoded_str();
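A short usage sketch of `is_prefix_of` with the hierarchical facet semantics documented above; the exact boundary behavior is my reading of the prefix check, so treat the assertions as illustrative:

use tantivy::schema::Facet;

fn main() {
    let parent = Facet::from("/electronics");
    let child = Facet::from("/electronics/tv");
    // "/electronics" is an ancestor of "/electronics/tv"...
    assert!(parent.is_prefix_of(&child));
    // ...but not the other way around.
    assert!(!child.is_prefix_of(&parent));
}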
diff --git a/src/space_usage/mod.rs b/src/space_usage/mod.rs
index 886b9bde3..2bc5cafea 100644
--- a/src/space_usage/mod.rs
+++ b/src/space_usage/mod.rs
@@ -125,6 +125,7 @@
             FieldNorms => PerField(self.fieldnorms().clone()),
             Terms => PerField(self.termdict().clone()),
             SegmentComponent::Store => ComponentSpaceUsage::Store(self.store().clone()),
+            SegmentComponent::TempStore => ComponentSpaceUsage::Store(self.store().clone()),
             Delete => Basic(self.deletes()),
         }
     }
diff --git a/src/store/reader.rs b/src/store/reader.rs
index b7f7f94a9..a7a0fafe3 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -86,23 +86,46 @@ impl StoreReader {
     /// Reads a given document.
     ///
     /// Calling `.get(doc)` is relatively costly as it requires
-    /// decompressing a compressed block.
+    /// decompressing a compressed block. The store uses an LRU cache,
+    /// so accessing documents from the same compressed block is faster.
+    /// For that reason, a store reader should be kept and reused.
     ///
     /// It should not be called to score documents
     /// for instance.
     pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
+        let raw_doc = self.get_raw(doc_id)?;
+        let mut cursor = raw_doc.get_bytes();
+        Ok(Document::deserialize(&mut cursor)?)
+    }
+
+    /// Reads the raw bytes of a given document. Returns a `RawDocument`,
+    /// which holds the decompressed block containing the document and the
+    /// document's start and end position within that block.
+    ///
+    /// Calling `.get_raw(doc)` is relatively costly as it requires
+    /// decompressing a compressed block. The store uses an LRU cache,
+    /// so accessing documents from the same compressed block is faster.
+    /// For that reason, a store reader should be kept and reused.
+    ///
+    pub fn get_raw(&self, doc_id: DocId) -> crate::Result<RawDocument> {
         let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
             crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
         })?;
-        let mut cursor = &self.read_block(&checkpoint)?[..];
+        let block = self.read_block(&checkpoint)?;
+        let mut cursor = &block[..];
+        let cursor_len_before = cursor.len();
         for _ in checkpoint.doc_range.start..doc_id {
             let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
             cursor = &cursor[doc_length..];
         }
         let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
-        cursor = &cursor[..doc_length];
-        Ok(Document::deserialize(&mut cursor)?)
+        let start_pos = cursor_len_before - cursor.len();
+        let end_pos = cursor_len_before - cursor.len() + doc_length;
+        Ok(RawDocument {
+            block,
+            start_pos,
+            end_pos,
+        })
     }

     /// Summarize total space usage of this store reader.
@@ -111,6 +134,23 @@ impl StoreReader {
     }
 }

+/// The raw bytes of a serialized `Document`, located in a decompressed block.
+pub struct RawDocument {
+    /// the block of data containing multiple documents
+    block: Arc<Vec<u8>>,
+    /// start position of the document in the block
+    start_pos: usize,
+    /// end position of the document in the block
+    end_pos: usize,
+}
+
+impl RawDocument {
+    /// Returns the bytes of the serialized `Document`.
+    pub fn get_bytes(&self) -> &[u8] {
+        &self.block[self.start_pos..self.end_pos]
+    }
+}
+
 fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
     let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
     let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
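`RawDocument` is essentially a view into a decompressed block: the block is shared via `Arc`, and `start_pos`/`end_pos` bound one length-prefixed document, so `get_bytes` copies nothing. An illustrative model of that layout, using a plain one-byte length prefix instead of tantivy's `VInt` and skipping the checkpoint lookup, to stay self-contained:

use std::sync::Arc;

struct RawDoc {
    block: Arc<Vec<u8>>,
    start_pos: usize,
    end_pos: usize,
}

impl RawDoc {
    // A zero-copy slice into the shared block.
    fn get_bytes(&self) -> &[u8] {
        &self.block[self.start_pos..self.end_pos]
    }
}

fn get_raw(block: &Arc<Vec<u8>>, doc_in_block: usize) -> RawDoc {
    // Skip over the length-prefixed documents preceding the requested one.
    let mut cursor = 0usize;
    for _ in 0..doc_in_block {
        let doc_len = block[cursor] as usize;
        cursor += 1 + doc_len;
    }
    let doc_len = block[cursor] as usize;
    RawDoc {
        block: Arc::clone(block),
        start_pos: cursor + 1,
        end_pos: cursor + 1 + doc_len,
    }
}

fn main() {
    // Two documents: [2, b'h', b'i'] and [3, b'd', b'o', b'c'].
    let block = Arc::new(vec![2, b'h', b'i', 3, b'd', b'o', b'c']);
    assert_eq!(get_raw(&block, 1).get_bytes(), b"doc");
}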
diff --git a/src/store/writer.rs b/src/store/writer.rs
index b9e36b8bf..0c261f88c 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -50,14 +50,33 @@ impl StoreWriter {
         self.intermediary_buffer.capacity() + self.current_block.capacity()
     }

+    /// Store the bytes of an already serialized document.
+    ///
+    /// The document id is implicitly the current number
+    /// of documents.
+    ///
+    pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> {
+        let doc_num_bytes = serialized_document.len();
+        VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
+        self.current_block.write_all(serialized_document)?;
+        self.doc += 1;
+        if self.current_block.len() > BLOCK_SIZE {
+            self.write_and_compress_block()?;
+        }
+        Ok(())
+    }
+
     /// Store a new document.
     ///
-    /// The document id is implicitely the number of times
-    /// this method has been called.
+    /// The document id is implicitly the current number
+    /// of documents.
     ///
     pub fn store(&mut self, stored_document: &Document) -> io::Result<()> {
         self.intermediary_buffer.clear();
         stored_document.serialize(&mut self.intermediary_buffer)?;
+        // Calling store_bytes would be preferable for code reuse, but then we
+        // couldn't reuse intermediary_buffer due to the borrow checker;
+        // a fresh buffer costs ~1% indexing performance.
         let doc_num_bytes = self.intermediary_buffer.len();
         VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
         self.current_block