diff --git a/common/src/lib.rs b/common/src/lib.rs
index 9dcdc5a46..054378ee5 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -118,6 +118,7 @@ pub fn u64_to_f64(val: u64) -> f64 {
 ///
 /// This function assumes that the needle is rarely contained in the bytes string
 /// and offers a fast path if the needle is not present.
+#[inline]
 pub fn replace_in_place(needle: u8, replacement: u8, bytes: &mut [u8]) {
     if !bytes.contains(&needle) {
         return;
diff --git a/src/core/json_utils.rs b/src/core/json_utils.rs
index ae8db931a..6a5387cf0 100644
--- a/src/core/json_utils.rs
+++ b/src/core/json_utils.rs
@@ -1,6 +1,5 @@
 use columnar::MonotonicallyMappableToU64;
 use common::{replace_in_place, JsonPathWriter};
-use murmurhash32::murmurhash2;
 use rustc_hash::FxHashMap;
 
 use crate::fastfield::FastValue;
@@ -58,13 +57,12 @@ struct IndexingPositionsPerPath {
 }
 
 impl IndexingPositionsPerPath {
-    fn get_position(&mut self, term: &Term) -> &mut IndexingPosition {
-        self.positions_per_path
-            .entry(murmurhash2(term.serialized_term()))
-            .or_default()
+    fn get_position_from_id(&mut self, id: u32) -> &mut IndexingPosition {
+        self.positions_per_path.entry(id).or_default()
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub(crate) fn index_json_values<'a, V: Value<'a>>(
     doc: DocId,
     json_visitors: impl Iterator<Item = crate::Result<V::ObjectIter>>,
     text_analyzer: &mut TextAnalyzer,
@@ -72,9 +70,11 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
     expand_dots_enabled: bool,
     term_buffer: &mut Term,
     postings_writer: &mut dyn PostingsWriter,
+    json_path_writer: &mut JsonPathWriter,
     ctx: &mut IndexingContext,
 ) -> crate::Result<()> {
-    let mut json_term_writer = JsonTermWriter::wrap(term_buffer, expand_dots_enabled);
+    json_path_writer.clear();
+    json_path_writer.set_expand_dots(expand_dots_enabled);
     let mut positions_per_path: IndexingPositionsPerPath = Default::default();
     for json_visitor_res in json_visitors {
         let json_visitor = json_visitor_res?;
@@ -82,7 +82,8 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
             doc,
             json_visitor,
             text_analyzer,
-            &mut json_term_writer,
+            term_buffer,
+            json_path_writer,
             postings_writer,
             ctx,
             &mut positions_per_path,
@@ -91,75 +92,117 @@ pub(crate) fn index_json_values<'a, V: Value<'a>>(
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
 fn index_json_object<'a, V: Value<'a>>(
     doc: DocId,
     json_visitor: V::ObjectIter,
     text_analyzer: &mut TextAnalyzer,
-    json_term_writer: &mut JsonTermWriter,
+    term_buffer: &mut Term,
+    json_path_writer: &mut JsonPathWriter,
     postings_writer: &mut dyn PostingsWriter,
     ctx: &mut IndexingContext,
     positions_per_path: &mut IndexingPositionsPerPath,
 ) {
     for (json_path_segment, json_value_visitor) in json_visitor {
-        json_term_writer.push_path_segment(json_path_segment);
+        json_path_writer.push(json_path_segment);
         index_json_value(
             doc,
             json_value_visitor,
             text_analyzer,
-            json_term_writer,
+            term_buffer,
+            json_path_writer,
             postings_writer,
             ctx,
             positions_per_path,
         );
-        json_term_writer.pop_path_segment();
+        json_path_writer.pop();
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 fn index_json_value<'a, V: Value<'a>>(
     doc: DocId,
     json_value: V,
     text_analyzer: &mut TextAnalyzer,
-    json_term_writer: &mut JsonTermWriter,
+    term_buffer: &mut Term,
+    json_path_writer: &mut JsonPathWriter,
     postings_writer: &mut dyn PostingsWriter,
     ctx: &mut IndexingContext,
     positions_per_path: &mut IndexingPositionsPerPath,
 ) {
+    let set_path_id = |term_buffer: &mut Term, unordered_id: u32| {
+        term_buffer.truncate_value_bytes(0);
+        term_buffer.append_bytes(&unordered_id.to_be_bytes());
+    };
+    let set_type = |term_buffer: &mut Term, typ: Type| {
+        term_buffer.append_bytes(&[typ.to_code()]);
+    };
+
     match json_value.as_value() {
         ReferenceValue::Leaf(leaf) => match leaf {
             ReferenceValueLeaf::Null => {}
             ReferenceValueLeaf::Str(val) => {
                 let mut token_stream = text_analyzer.token_stream(val);
+                let unordered_id = ctx
+                    .path_to_unordered_id
+                    .get_or_allocate_unordered_id(json_path_writer.as_str());
                 // TODO: make sure the chain position works out.
-                json_term_writer.close_path_and_set_type(Type::Str);
-                let indexing_position = positions_per_path.get_position(json_term_writer.term());
+                set_path_id(term_buffer, unordered_id);
+                set_type(term_buffer, Type::Str);
+                let indexing_position = positions_per_path.get_position_from_id(unordered_id);
                 postings_writer.index_text(
                     doc,
                     &mut *token_stream,
-                    json_term_writer.term_buffer,
+                    term_buffer,
                     ctx,
                     indexing_position,
                 );
             }
             ReferenceValueLeaf::U64(val) => {
-                json_term_writer.set_fast_value(val);
-                postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
+                set_path_id(
+                    term_buffer,
+                    ctx.path_to_unordered_id
+                        .get_or_allocate_unordered_id(json_path_writer.as_str()),
+                );
+                term_buffer.append_type_and_fast_value(val);
+                postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
             }
             ReferenceValueLeaf::I64(val) => {
-                json_term_writer.set_fast_value(val);
-                postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
+                set_path_id(
+                    term_buffer,
+                    ctx.path_to_unordered_id
+                        .get_or_allocate_unordered_id(json_path_writer.as_str()),
+                );
+                term_buffer.append_type_and_fast_value(val);
+                postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
             }
             ReferenceValueLeaf::F64(val) => {
-                json_term_writer.set_fast_value(val);
-                postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
+                set_path_id(
+                    term_buffer,
+                    ctx.path_to_unordered_id
+                        .get_or_allocate_unordered_id(json_path_writer.as_str()),
+                );
+                term_buffer.append_type_and_fast_value(val);
+                postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
             }
             ReferenceValueLeaf::Bool(val) => {
-                json_term_writer.set_fast_value(val);
-                postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
+                set_path_id(
+                    term_buffer,
+                    ctx.path_to_unordered_id
+                        .get_or_allocate_unordered_id(json_path_writer.as_str()),
+                );
+                term_buffer.append_type_and_fast_value(val);
+                postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
             }
             ReferenceValueLeaf::Date(val) => {
-                json_term_writer.set_fast_value(val);
-                postings_writer.subscribe(doc, 0u32, json_term_writer.term(), ctx);
+                set_path_id(
+                    term_buffer,
+                    ctx.path_to_unordered_id
+                        .get_or_allocate_unordered_id(json_path_writer.as_str()),
+                );
+                term_buffer.append_type_and_fast_value(val);
+                postings_writer.subscribe(doc, 0u32, term_buffer, ctx);
             }
             ReferenceValueLeaf::PreTokStr(_) => {
                 unimplemented!(
@@ -182,7 +225,8 @@ fn index_json_value<'a, V: Value<'a>>(
                     doc,
                     val,
                     text_analyzer,
-                    json_term_writer,
+                    term_buffer,
+                    json_path_writer,
                     postings_writer,
                     ctx,
                     positions_per_path,
@@ -194,7 +238,8 @@ fn index_json_value<'a, V: Value<'a>>(
                 doc,
                 object,
                 text_analyzer,
-                json_term_writer,
+                term_buffer,
+                json_path_writer,
                 postings_writer,
                 ctx,
                 positions_per_path,
@@ -361,6 +406,7 @@ impl<'a> JsonTermWriter<'a> {
         self.term_buffer.append_bytes(&[typ.to_code()]);
     }
 
+    // TODO: Remove this function and use JsonPathWriter instead.
     pub fn push_path_segment(&mut self, segment: &str) {
         // the path stack should never be empty.
         self.trim_to_end_of_path();
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs
index 666909391..13731444a 100644
--- a/src/indexer/mod.rs
+++ b/src/indexer/mod.rs
@@ -5,6 +5,7 @@
 //! [`Index::writer`](crate::Index::writer).
 
 pub(crate) mod delete_queue;
+pub(crate) mod path_to_unordered_id;
 
 pub(crate) mod doc_id_mapping;
 mod doc_opstamp_mapping;
diff --git a/src/indexer/path_to_unordered_id.rs b/src/indexer/path_to_unordered_id.rs
new file mode 100644
index 000000000..054654f94
--- /dev/null
+++ b/src/indexer/path_to_unordered_id.rs
@@ -0,0 +1,92 @@
+use fnv::FnvHashMap;
+
+/// `OrderedPathId` is an unsigned 32-bit integer that identifies a JSON path
+/// by its rank in the lexicographical order of all paths seen by the writer.
+#[derive(Copy, Default, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
+pub struct OrderedPathId(u32);
+
+impl OrderedPathId {
+    /// Creates a new `OrderedPathId` from the given ordered id.
+    pub const fn from_ordered_id(path_id: u32) -> OrderedPathId {
+        OrderedPathId(path_id)
+    }
+
+    /// Returns a u32 identifying uniquely a path within a schema.
+    pub const fn path_id(self) -> u32 {
+        self.0
+    }
+}
+impl From<u32> for OrderedPathId {
+    fn from(id: u32) -> Self {
+        Self(id)
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct PathToUnorderedId {
+    map: FnvHashMap<String, u32>,
+}
+
+impl PathToUnorderedId {
+    #[inline]
+    pub(crate) fn get_or_allocate_unordered_id(&mut self, path: &str) -> u32 {
+        if let Some(id) = self.map.get(path) {
+            return *id;
+        }
+        self.insert_new_path(path)
+    }
+    #[cold]
+    fn insert_new_path(&mut self, path: &str) -> u32 {
+        let next_id = self.map.len() as u32;
+        self.map.insert(path.to_string(), next_id);
+        next_id
+    }
+
+    /// Returns ids which reflect the lexical order of the paths.
+    ///
+    /// The returned vec can be indexed with the unordered id to get the ordered id.
+    pub(crate) fn unordered_id_to_ordered_id(&self) -> Vec<OrderedPathId> {
+        let mut sorted_ids: Vec<(&str, &u32)> =
+            self.map.iter().map(|(k, v)| (k.as_str(), v)).collect();
+        sorted_ids.sort_unstable_by_key(|(path, _)| *path);
+        let mut result = vec![OrderedPathId::default(); sorted_ids.len()];
+        for (ordered, unordered) in sorted_ids.iter().map(|(_k, v)| v).enumerate() {
+            result[**unordered as usize] = OrderedPathId::from_ordered_id(ordered as u32);
+        }
+        result
+    }
+
+    /// Returns the paths so they can be queried by the ordered id (which is the index).
+    pub(crate) fn ordered_id_to_path(&self) -> Vec<&str> {
+        let mut paths = self.map.keys().map(String::as_str).collect::<Vec<_>>();
+        paths.sort_unstable();
+        paths
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn path_to_unordered_test() {
+        let mut path_to_id = PathToUnorderedId::default();
+        let terms = vec!["b", "a", "b", "c"];
+        let ids = terms
+            .iter()
+            .map(|term| path_to_id.get_or_allocate_unordered_id(term))
+            .collect::<Vec<u32>>();
+        assert_eq!(ids, vec![0, 1, 0, 2]);
+        let ordered_ids = ids
+            .iter()
+            .map(|id| path_to_id.unordered_id_to_ordered_id()[*id as usize])
+            .collect::<Vec<OrderedPathId>>();
+        assert_eq!(ordered_ids, vec![1.into(), 0.into(), 1.into(), 2.into()]);
+        // Fetch terms
+        let terms_fetched = ordered_ids
+            .iter()
+            .map(|id| path_to_id.ordered_id_to_path()[id.path_id() as usize])
+            .collect::<Vec<&str>>();
+        assert_eq!(terms_fetched, terms);
+    }
+}
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 1369f1ad3..180d5a66d 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -1,4 +1,5 @@
 use columnar::MonotonicallyMappableToU64;
+use common::JsonPathWriter;
 use itertools::Itertools;
 use tokenizer_api::BoxTokenStream;
 
@@ -66,6 +67,7 @@ pub struct SegmentWriter {
     pub(crate) segment_serializer: SegmentSerializer,
     pub(crate) fast_field_writers: FastFieldsWriter,
     pub(crate) fieldnorms_writer: FieldNormsWriter,
+    pub(crate) json_path_writer: JsonPathWriter,
     pub(crate) doc_opstamps: Vec<Opstamp>,
     per_field_text_analyzers: Vec<TextAnalyzer>,
     term_buffer: Term,
@@ -116,6 +118,7 @@ impl SegmentWriter {
             ctx: IndexingContext::new(table_size),
             per_field_postings_writers,
             fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
+            json_path_writer: JsonPathWriter::default(),
             segment_serializer,
             fast_field_writers: FastFieldsWriter::from_schema_and_tokenizer_manager(
                 &schema,
@@ -144,6 +147,7 @@ impl SegmentWriter {
             .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
             .transpose()?;
         remap_and_write(
+            self.schema,
             &self.per_field_postings_writers,
             self.ctx,
             self.fast_field_writers,
@@ -355,6 +359,7 @@ impl SegmentWriter {
                     json_options.is_expand_dots_enabled(),
                     term_buffer,
                     postings_writer,
+                    &mut self.json_path_writer,
                     ctx,
                 )?;
             }
@@ -422,6 +427,7 @@ impl SegmentWriter {
 ///
 /// `doc_id_map` is used to map to the new doc_id order.
 fn remap_and_write(
+    schema: Schema,
     per_field_postings_writers: &PerFieldPostingsWriter,
     ctx: IndexingContext,
     fast_field_writers: FastFieldsWriter,
@@ -439,6 +445,7 @@ fn remap_and_write(
     let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
     serialize_postings(
         ctx,
+        schema,
         per_field_postings_writers,
         fieldnorm_readers,
         doc_id_map,
@@ -489,11 +496,11 @@ mod tests {
     use tempfile::TempDir;
 
     use super::compute_initial_table_size;
-    use crate::collector::Count;
+    use crate::collector::{Count, TopDocs};
     use crate::core::json_utils::JsonTermWriter;
     use crate::directory::RamDirectory;
     use crate::postings::TermInfo;
-    use crate::query::PhraseQuery;
+    use crate::query::{PhraseQuery, QueryParser};
     use crate::schema::document::Value;
     use crate::schema::{
         Document, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, Type, STORED, STRING,
@@ -552,6 +559,43 @@ mod tests {
         assert_eq!(doc.field_values()[0].value().as_str(), Some("A"));
         assert_eq!(doc.field_values()[1].value().as_str(), Some("title"));
     }
+    #[test]
+    fn test_simple_json_indexing() {
+        let mut schema_builder = Schema::builder();
+        let json_field = schema_builder.add_json_field("json", STORED | STRING);
+        let schema = schema_builder.build();
+        let index = Index::create_in_ram(schema.clone());
+        let mut writer = index.writer_for_tests().unwrap();
+        writer
+            .add_document(doc!(json_field=>json!({"my_field": "b"})))
+            .unwrap();
+        writer
+            .add_document(doc!(json_field=>json!({"my_field": "a"})))
+            .unwrap();
+        writer
+            .add_document(doc!(json_field=>json!({"my_field": "b"})))
+            .unwrap();
+        writer.commit().unwrap();
+
+        let query_parser = QueryParser::for_index(&index, vec![json_field]);
+        let text_query = query_parser.parse_query("my_field:a").unwrap();
+        let score_docs: Vec<(_, DocAddress)> = index
+            .reader()
+            .unwrap()
+            .searcher()
+            .search(&text_query, &TopDocs::with_limit(4))
+            .unwrap();
+        assert_eq!(score_docs.len(), 1);
+
+        let text_query = query_parser.parse_query("my_field:b").unwrap();
+        let score_docs: Vec<(_, DocAddress)> = index
+            .reader()
+            .unwrap()
+            .searcher()
+            .search(&text_query, &TopDocs::with_limit(4))
+            .unwrap();
+        assert_eq!(score_docs.len(), 2);
+    }
 
     #[test]
     fn test_json_indexing() {
diff --git a/src/postings/indexing_context.rs b/src/postings/indexing_context.rs
index 975de71dd..2675476f3 100644
--- a/src/postings/indexing_context.rs
+++ b/src/postings/indexing_context.rs
@@ -1,5 +1,7 @@
 use stacker::{ArenaHashMap, MemoryArena};
 
+use crate::indexer::path_to_unordered_id::PathToUnorderedId;
+
 /// IndexingContext contains all of the transient memory arenas
 /// required for building the inverted index.
 pub(crate) struct IndexingContext {
@@ -8,6 +10,7 @@ pub(crate) struct IndexingContext {
     pub term_index: ArenaHashMap,
     /// Arena is a memory arena that stores posting lists / term frequencies / positions.
     pub arena: MemoryArena,
+    pub path_to_unordered_id: PathToUnorderedId,
 }
 
 impl IndexingContext {
@@ -17,6 +20,7 @@ impl IndexingContext {
         IndexingContext {
             arena: MemoryArena::default(),
             term_index,
+            path_to_unordered_id: PathToUnorderedId::default(),
         }
     }
diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs
index 0f875768c..db337f458 100644
--- a/src/postings/json_postings_writer.rs
+++ b/src/postings/json_postings_writer.rs
@@ -3,10 +3,11 @@ use std::io;
 use stacker::Addr;
 
 use crate::indexer::doc_id_mapping::DocIdMapping;
+use crate::indexer::path_to_unordered_id::OrderedPathId;
 use crate::postings::postings_writer::SpecializedPostingsWriter;
 use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
 use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter};
-use crate::schema::Type;
+use crate::schema::{Field, Type, JSON_END_OF_PATH};
 use crate::tokenizer::TokenStream;
 use crate::{DocId, Term};
 
@@ -54,18 +55,24 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
     /// The actual serialization format is handled by the `PostingsSerializer`.
     fn serialize(
         &self,
-        term_addrs: &[(Term<&[u8]>, Addr)],
+        term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
+        ordered_id_to_path: &[&str],
         doc_id_map: Option<&DocIdMapping>,
         ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
     ) -> io::Result<()> {
+        let mut term_buffer = Term::with_capacity(48);
         let mut buffer_lender = BufferLender::default();
-        for (term, addr) in term_addrs {
-            if let Some(json_value) = term.value().as_json_value_bytes() {
+        for (_field, path_id, term, addr) in term_addrs {
+            term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));
+            term_buffer.append_bytes(ordered_id_to_path[path_id.path_id() as usize].as_bytes());
+            term_buffer.append_bytes(&[JSON_END_OF_PATH]);
+            term_buffer.append_bytes(term);
+            if let Some(json_value) = term_buffer.value().as_json_value_bytes() {
                 let typ = json_value.typ();
                 if typ == Type::Str {
                     SpecializedPostingsWriter::<Rec>::serialize_one_term(
-                        term,
+                        term_buffer.serialized_value_bytes(),
                         *addr,
                         doc_id_map,
                         &mut buffer_lender,
@@ -74,7 +81,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
                     )?;
                 } else {
                     SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
-                        term,
+                        term_buffer.serialized_value_bytes(),
                         *addr,
                         doc_id_map,
                         &mut buffer_lender,
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 96952e2a7..c51d4d834 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -6,20 +6,23 @@ use stacker::Addr;
 
 use crate::fieldnorm::FieldNormReaders;
 use crate::indexer::doc_id_mapping::DocIdMapping;
+use crate::indexer::path_to_unordered_id::OrderedPathId;
 use crate::postings::recorder::{BufferLender, Recorder};
 use crate::postings::{
     FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
 };
-use crate::schema::{Field, Term};
+use crate::schema::{Field, Schema, Term, Type};
 use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
 use crate::DocId;
 
 const POSITION_GAP: u32 = 1;
 
-fn make_field_partition(term_offsets: &[(Term<&[u8]>, Addr)]) -> Vec<(Field, Range<usize>)> {
+fn make_field_partition(
+    term_offsets: &[(Field, OrderedPathId, &[u8], Addr)],
+) -> Vec<(Field, Range<usize>)> {
     let term_offsets_it = term_offsets
         .iter()
-        .map(|(term, _)| term.field())
+        .map(|(field, _, _, _)| *field)
         .enumerate();
     let mut prev_field_opt = None;
     let mut fields = vec![];
@@ -44,19 +47,36 @@ fn make_field_partition(term_offsets: &[(Term<&[u8]>, Addr)]) -> Vec<(Field, Range<usize>)> {
 /// postings serializer.
 pub(crate) fn serialize_postings(
     ctx: IndexingContext,
+    schema: Schema,
     per_field_postings_writers: &PerFieldPostingsWriter,
     fieldnorm_readers: FieldNormReaders,
     doc_id_map: Option<&DocIdMapping>,
     serializer: &mut InvertedIndexSerializer,
 ) -> crate::Result<()> {
-    let mut term_offsets: Vec<(Term<&[u8]>, Addr)> = Vec::with_capacity(ctx.term_index.len());
-    term_offsets.extend(
-        ctx.term_index
-            .iter()
-            .map(|(bytes, addr)| (Term::wrap(bytes), addr)),
-    );
-    term_offsets.sort_unstable_by_key(|(k, _)| k.clone());
+    // Replace unordered ids by ordered ids to be able to sort
+    let unordered_id_to_ordered_id: Vec<OrderedPathId> =
+        ctx.path_to_unordered_id.unordered_id_to_ordered_id();
+    let mut term_offsets: Vec<(Field, OrderedPathId, &[u8], Addr)> =
+        Vec::with_capacity(ctx.term_index.len());
+    term_offsets.extend(ctx.term_index.iter().map(|(key, addr)| {
+        let field = Term::wrap(key).field();
+        if schema.get_field_entry(field).field_type().value_type() == Type::Json {
+            let byte_range_path = 5..5 + 4;
+            let unordered_id = u32::from_be_bytes(key[byte_range_path.clone()].try_into().unwrap());
+            let path_id = unordered_id_to_ordered_id[unordered_id as usize];
+            (field, path_id, &key[byte_range_path.end..], addr)
+        } else {
+            (field, 0.into(), &key[5..], addr)
+        }
+    }));
+    // Sort by field, path, and term
+    term_offsets.sort_unstable_by(
+        |(field1, path_id1, bytes1, _), (field2, path_id2, bytes2, _)| {
+            (field1, path_id1, bytes1).cmp(&(field2, path_id2, bytes2))
+        },
+    );
+    let ordered_id_to_path = ctx.path_to_unordered_id.ordered_id_to_path();
     let field_offsets = make_field_partition(&term_offsets);
     for (field, byte_offsets) in field_offsets {
         let postings_writer = per_field_postings_writers.get_for_field(field);
@@ -65,12 +85,14 @@ pub(crate) fn serialize_postings(
         serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
         postings_writer.serialize(
             &term_offsets[byte_offsets],
+            &ordered_id_to_path,
             doc_id_map,
             &ctx,
             &mut field_serializer,
         )?;
         field_serializer.close()?;
     }
+
     Ok(())
 }
 
@@ -98,7 +120,8 @@ pub(crate) trait PostingsWriter: Send + Sync {
     /// The actual serialization format is handled by the `PostingsSerializer`.
     fn serialize(
         &self,
-        term_addrs: &[(Term<&[u8]>, Addr)],
+        term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
+        ordered_id_to_path: &[&str],
         doc_id_map: Option<&DocIdMapping>,
         ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
@@ -162,7 +185,7 @@ impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
 impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
     #[inline]
     pub(crate) fn serialize_one_term(
-        term: &Term<&[u8]>,
+        term: &[u8],
         addr: Addr,
         doc_id_map: Option<&DocIdMapping>,
         buffer_lender: &mut BufferLender,
@@ -171,7 +194,7 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
     ) -> io::Result<()> {
         let recorder: Rec = ctx.term_index.read(addr);
         let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
-        serializer.new_term(term.serialized_value_bytes(), term_doc_freq)?;
+        serializer.new_term(term, term_doc_freq)?;
         recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
         serializer.close_term()?;
         Ok(())
@@ -204,13 +227,14 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
 
     fn serialize(
         &self,
-        term_addrs: &[(Term<&[u8]>, Addr)],
+        term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
+        _ordered_id_to_path: &[&str],
         doc_id_map: Option<&DocIdMapping>,
         ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
     ) -> io::Result<()> {
         let mut buffer_lender = BufferLender::default();
-        for (term, addr) in term_addrs {
+        for (_field, _path_id, term, addr) in term_addrs {
             Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
         }
         Ok(())
diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs
index 166977b38..04e71394b 100644
--- a/src/schema/field_type.rs
+++ b/src/schema/field_type.rs
@@ -93,6 +93,7 @@ impl Type {
     }
 
     /// Returns a 1 byte code used to identify the type.
+    #[inline]
     pub fn to_code(&self) -> u8 {
         *self as u8
     }
@@ -115,6 +116,7 @@ impl Type {
 
     /// Interprets a 1byte code as a type.
     /// Returns `None` if the code is invalid.
+    #[inline]
     pub fn from_code(code: u8) -> Option<Type> {
         match code {
             b's' => Some(Type::Str),
diff --git a/src/schema/schema.rs b/src/schema/schema.rs
index 98d6de94e..ed3a60193 100644
--- a/src/schema/schema.rs
+++ b/src/schema/schema.rs
@@ -278,6 +278,7 @@ fn locate_splitting_dots(field_path: &str) -> Vec<usize> {
 
 impl Schema {
     /// Return the `FieldEntry` associated with a `Field`.
+    #[inline]
     pub fn get_field_entry(&self, field: Field) -> &FieldEntry {
         &self.0.fields[field.field_id() as usize]
     }
diff --git a/src/schema/term.rs b/src/schema/term.rs
index 995137b53..db707e294 100644
--- a/src/schema/term.rs
+++ b/src/schema/term.rs
@@ -3,7 +3,7 @@ use std::hash::{Hash, Hasher};
 use std::net::Ipv6Addr;
 use std::{fmt, str};
 
-use columnar::MonotonicallyMappableToU128;
+use columnar::{MonotonicallyMappableToU128, MonotonicallyMappableToU64};
 
 use super::date_time_options::DATE_TIME_PRECISION_INDEXED;
 use super::Field;
@@ -170,6 +170,18 @@ impl Term {
         self.set_bytes(val.to_u64().to_be_bytes().as_ref());
     }
 
+    pub(crate) fn append_type_and_fast_value<T: FastValue>(&mut self, val: T) {
+        self.0.push(T::to_type().to_code());
+        let value = if T::to_type() == Type::Date {
+            DateTime::from_u64(val.to_u64())
+                .truncate(DATE_TIME_PRECISION_INDEXED)
+                .to_u64()
+        } else {
+            val.to_u64()
+        };
+        self.0.extend(value.to_be_bytes().as_ref());
+    }
+
     /// Sets a `Ipv6Addr` value in the term.
     pub fn set_ip_addr(&mut self, val: Ipv6Addr) {
         self.set_bytes(val.to_u128().to_be_bytes().as_ref());
diff --git a/stacker/src/arena_hashmap.rs b/stacker/src/arena_hashmap.rs
index d855e7043..09999d021 100644
--- a/stacker/src/arena_hashmap.rs
+++ b/stacker/src/arena_hashmap.rs
@@ -23,7 +23,7 @@ type HashType = u64;
 /// The `value_addr` also points to an address in the memory arena.
 #[derive(Copy, Clone)]
 struct KeyValue {
-    key_value_addr: Addr,
+    pub(crate) key_value_addr: Addr,
     hash: HashType,
 }
 
@@ -58,7 +58,7 @@ impl KeyValue {
 /// or copying the key as long as there is no insert.
 pub struct ArenaHashMap {
     table: Vec<KeyValue>,
-    memory_arena: MemoryArena,
+    pub memory_arena: MemoryArena,
     mask: usize,
     len: usize,
 }
diff --git a/stacker/src/memory_arena.rs b/stacker/src/memory_arena.rs
index f3ed5d4bb..0d5de72f0 100644
--- a/stacker/src/memory_arena.rs
+++ b/stacker/src/memory_arena.rs
@@ -148,6 +148,11 @@ impl MemoryArena {
         self.get_page(addr.page_id())
             .slice_from(addr.page_local_addr())
     }
 
+    #[inline]
+    pub fn slice_from_mut(&mut self, addr: Addr) -> &mut [u8] {
+        self.get_page_mut(addr.page_id())
+            .slice_from_mut(addr.page_local_addr())
+    }
     #[inline]
     pub fn slice_mut(&mut self, addr: Addr, len: usize) -> &mut [u8] {
@@ -206,6 +211,10 @@ impl Page {
     fn slice_from(&self, local_addr: usize) -> &[u8] {
         &self.data[local_addr..]
     }
 
+    #[inline]
+    fn slice_from_mut(&mut self, local_addr: usize) -> &mut [u8] {
+        &mut self.data[local_addr..]
+    }
     #[inline]
     fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
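
A note on the new `path_to_unordered_id.rs`: during indexing each JSON path gets a cheap id in first-seen order, and only at serialization time is it remapped to an id that follows the lexicographical order of the paths. The snippet below is not tantivy code; it is a minimal standalone sketch of that two-step mapping, using `std::collections::HashMap` where the real implementation uses `FnvHashMap`.

```rust
use std::collections::HashMap;

/// Minimal stand-in for PathToUnorderedId: hands out ids in first-seen order,
/// then derives a remapping that follows the lexicographical order of the paths.
#[derive(Default)]
struct PathIds {
    map: HashMap<String, u32>,
}

impl PathIds {
    /// Cheap during indexing: one hash lookup, ids grow in first-seen order.
    fn get_or_allocate(&mut self, path: &str) -> u32 {
        if let Some(id) = self.map.get(path) {
            return *id;
        }
        let next_id = self.map.len() as u32;
        self.map.insert(path.to_string(), next_id);
        next_id
    }

    /// Index the returned vec with an unordered id to get the ordered id.
    fn unordered_to_ordered(&self) -> Vec<u32> {
        let mut sorted: Vec<(&str, u32)> = self.map.iter().map(|(k, v)| (k.as_str(), *v)).collect();
        sorted.sort_unstable_by_key(|(path, _)| *path);
        let mut result = vec![0u32; sorted.len()];
        for (ordered, (_path, unordered)) in sorted.iter().enumerate() {
            result[*unordered as usize] = ordered as u32;
        }
        result
    }
}

fn main() {
    let mut ids = PathIds::default();
    let unordered: Vec<u32> = ["b", "a", "c"].iter().map(|p| ids.get_or_allocate(p)).collect();
    assert_eq!(unordered, vec![0, 1, 2]); // first-seen order
    let remap = ids.unordered_to_ordered();
    let ordered: Vec<u32> = unordered.iter().map(|&id| remap[id as usize]).collect();
    assert_eq!(ordered, vec![1, 0, 2]); // "a" < "b" < "c"
    println!("{ordered:?}");
}
```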
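The indexing-time term key for a JSON field no longer embeds the path string. As the `serialize_postings` change shows, the key carries a 5-byte header (field id plus type code), then a 4-byte big-endian unordered path id read from `key[5..9]`, then the value bytes; `JsonPostingsWriter::serialize` later rebuilds a full term by writing the path string, a `JSON_END_OF_PATH` byte and the value into a fresh `Term`. The sketch below only mirrors those byte offsets with plain `Vec<u8>` helpers; the constant values and helper names are made up for illustration.

```rust
use std::ops::Range;

// Illustrative constants: they mirror the offsets used in serialize_postings
// (4-byte field id + 1-byte type code before the path id), not tantivy's API.
const FIELD_ID_BYTES: usize = 4;
const TYPE_CODE_BYTES: usize = 1;
const PATH_ID_RANGE: Range<usize> = 5..9; // FIELD_ID_BYTES + TYPE_CODE_BYTES .. + 4

/// Builds an indexing-time key: field id | json type code | unordered path id | value bytes.
fn indexing_key(field_id: u32, json_type_code: u8, unordered_path_id: u32, value: &[u8]) -> Vec<u8> {
    let mut key = Vec::with_capacity(FIELD_ID_BYTES + TYPE_CODE_BYTES + 4 + value.len());
    key.extend_from_slice(&field_id.to_be_bytes());
    key.push(json_type_code);
    key.extend_from_slice(&unordered_path_id.to_be_bytes());
    key.extend_from_slice(value);
    key
}

/// Splits a key the way serialize_postings does for JSON fields.
fn split_key(key: &[u8]) -> (u32, u32, &[u8]) {
    let field_id = u32::from_be_bytes(key[..FIELD_ID_BYTES].try_into().unwrap());
    let path_id = u32::from_be_bytes(key[PATH_ID_RANGE].try_into().unwrap());
    (field_id, path_id, &key[PATH_ID_RANGE.end..])
}

fn main() {
    // 7, b'j' and 3 are arbitrary example values; in the real key the value bytes
    // start with their own type code (e.g. `s` for a string).
    let key = indexing_key(7, b'j', 3, b"hello");
    let (field_id, unordered_path_id, value) = split_key(&key);
    assert_eq!((field_id, unordered_path_id, value), (7, 3, &b"hello"[..]));
    // At serialization time the unordered id is swapped for its ordered counterpart,
    // the entries are sorted by (field, ordered path id, value bytes), and the path
    // string plus an end-of-path marker is written back in front of the value.
    println!("field={field_id} path_id={unordered_path_id} value={value:?}");
}
```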
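`make_field_partition` changed only its input tuple shape; the underlying idea is unchanged: with the entries sorted by field first, each field owns one contiguous range of the slice, and that range is handed to the field's postings writer. A generic stand-in (not the tantivy signature) for that grouping step:

```rust
use std::ops::Range;

/// Given entries sorted by their field id, return one contiguous index range per field.
fn field_partition(fields_sorted: &[u32]) -> Vec<(u32, Range<usize>)> {
    let mut partitions: Vec<(u32, Range<usize>)> = Vec::new();
    for (idx, &field) in fields_sorted.iter().enumerate() {
        if let Some((last_field, range)) = partitions.last_mut() {
            if *last_field == field {
                // Extend the current field's range.
                range.end = idx + 1;
                continue;
            }
        }
        // First entry of a new field: open a fresh range.
        partitions.push((field, idx..idx + 1));
    }
    partitions
}

fn main() {
    let fields = [1, 1, 1, 4, 4, 7];
    assert_eq!(
        field_partition(&fields),
        vec![(1, 0..3), (4, 3..5), (7, 5..6)]
    );
}
```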