Compare commits

...

4 Commits

Author SHA1 Message Date
Paul Masurel
7453df8db3 postings writer enum 2026-01-13 15:24:03 +01:00
Paul Masurel
ba6abba20a First stab at introducing codecs 2026-01-12 19:41:06 +01:00
Paul Masurel
d128e5c2a2 first stab at codec 2026-01-12 15:59:43 +01:00
Paul Masurel
e6d062bf2d Minor refactoring in PostingsSerializer
Removes the Write generics argument in PostingsSerializer.
This removes useless generic.
Prepares the path for codecs.
Removes one useless CountingWrite layer.
etc.
2026-01-12 11:06:45 +01:00
25 changed files with 1292 additions and 266 deletions

49
src/codec/mod.rs Normal file
View File

@@ -0,0 +1,49 @@
mod postings;
mod standard;
use std::borrow::Cow;
use serde::{Deserialize, Serialize};
pub use standard::StandardCodec;
pub trait Codec: Clone + std::fmt::Debug + Send + Sync + 'static {
type PostingsCodec;
const NAME: &'static str;
fn from_json_props(json_value: &serde_json::Value) -> crate::Result<Self>;
fn to_json_props(&self) -> serde_json::Value;
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CodecConfiguration {
name: Cow<'static, str>,
#[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
props: serde_json::Value,
}
impl CodecConfiguration {
pub fn from_codec<C: Codec>(codec: &C) -> Self {
CodecConfiguration {
name: Cow::Borrowed(C::NAME),
props: codec.to_json_props(),
}
}
pub fn to_codec<C: Codec>(&self) -> crate::Result<C> {
if self.name != C::NAME {
return Err(crate::TantivyError::InvalidArgument(format!(
"Codec name mismatch: expected {}, got {}",
C::NAME,
self.name
)));
}
C::from_json_props(&self.props)
}
}
impl Default for CodecConfiguration {
fn default() -> Self {
CodecConfiguration::from_codec(&StandardCodec)
}
}

23
src/codec/postings/mod.rs Normal file
View File

@@ -0,0 +1,23 @@
use std::io;
use crate::fieldnorm::FieldNormReader;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score};
pub trait PostingsCodec {
type PostingsSerializer: PostingsSerializer;
}
pub trait PostingsSerializer {
fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> Self;
fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool);
fn write_doc(&mut self, doc_id: DocId, term_freq: u32);
fn close_term(&mut self, doc_freq: u32, wrt: &mut impl io::Write) -> io::Result<()>;
}

29
src/codec/standard/mod.rs Normal file
View File

@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
use crate::codec::standard::postings::StandardPostingsCodec;
use crate::codec::Codec;
mod postings;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StandardCodec;
impl Codec for StandardCodec {
type PostingsCodec = StandardPostingsCodec;
const NAME: &'static str = "standard";
fn from_json_props(json_value: &serde_json::Value) -> crate::Result<Self> {
if !json_value.is_null() {
return Err(crate::TantivyError::InvalidArgument(format!(
"Codec property for the StandardCodec are unexpected. expected null, got {}",
json_value.as_str().unwrap_or("null")
)));
}
Ok(StandardCodec)
}
fn to_json_props(&self) -> serde_json::Value {
serde_json::Value::Null
}
}

View File

@@ -0,0 +1,50 @@
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::DocId;
pub struct Block {
doc_ids: [DocId; COMPRESSION_BLOCK_SIZE],
term_freqs: [u32; COMPRESSION_BLOCK_SIZE],
len: usize,
}
impl Block {
pub fn new() -> Self {
Block {
doc_ids: [0u32; COMPRESSION_BLOCK_SIZE],
term_freqs: [0u32; COMPRESSION_BLOCK_SIZE],
len: 0,
}
}
pub fn doc_ids(&self) -> &[DocId] {
&self.doc_ids[..self.len]
}
pub fn term_freqs(&self) -> &[u32] {
&self.term_freqs[..self.len]
}
pub fn clear(&mut self) {
self.len = 0;
}
pub fn append_doc(&mut self, doc: DocId, term_freq: u32) {
let len = self.len;
self.doc_ids[len] = doc;
self.term_freqs[len] = term_freq;
self.len = len + 1;
}
pub fn is_full(&self) -> bool {
self.len == COMPRESSION_BLOCK_SIZE
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn last_doc(&self) -> DocId {
assert_eq!(self.len, COMPRESSION_BLOCK_SIZE);
self.doc_ids[COMPRESSION_BLOCK_SIZE - 1]
}
}

View File

@@ -0,0 +1,13 @@
use crate::codec::postings::PostingsCodec;
mod block;
mod postings_serializer;
mod skip;
pub use postings_serializer::StandardPostingsSerializer;
pub struct StandardPostingsCodec;
impl PostingsCodec for StandardPostingsCodec {
type PostingsSerializer = StandardPostingsSerializer;
}

View File

@@ -0,0 +1,187 @@
use std::cmp::Ordering;
use std::io::{self, Write as _};
use common::{BinarySerializable as _, VInt};
use crate::codec::postings::PostingsSerializer;
use crate::codec::standard::postings::block::Block;
use crate::codec::standard::postings::skip::SkipSerializer;
use crate::fieldnorm::FieldNormReader;
use crate::postings::compression::{BlockEncoder, VIntEncoder as _, COMPRESSION_BLOCK_SIZE};
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score};
pub struct StandardPostingsSerializer {
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
block: Box<Block>,
postings_write: Vec<u8>,
skip_write: SkipSerializer,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
bm25_weight: Option<Bm25Weight>,
avg_fieldnorm: Score, /* Average number of term in the field for that segment.
* this value is used to compute the block wand information. */
term_has_freq: bool,
}
impl PostingsSerializer for StandardPostingsSerializer {
fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> StandardPostingsSerializer {
Self {
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
postings_write: Vec::new(),
skip_write: SkipSerializer::new(),
last_doc_id_encoded: 0u32,
mode,
fieldnorm_reader,
bm25_weight: None,
avg_fieldnorm,
term_has_freq: false,
}
}
fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
self.bm25_weight = None;
self.term_has_freq = self.mode.has_freq() && record_term_freq;
if !self.term_has_freq {
return;
}
let num_docs_in_segment: u64 =
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
fieldnorm_reader.num_docs() as u64
} else {
return;
};
if num_docs_in_segment == 0 {
return;
}
self.bm25_weight = Some(Bm25Weight::for_one_term_without_explain(
term_doc_freq as u64,
num_docs_in_segment,
self.avg_fieldnorm,
));
}
fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block();
}
}
fn close_term(
&mut self,
doc_freq: u32,
output_write: &mut impl std::io::Write,
) -> io::Result<()> {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
// not a perfect multiple of our block size.
//
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self
.block_encoder
.compress_vint_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
}
// ... Idem for term frequencies
if self.term_has_freq {
let block_encoded = self
.block_encoder
.compress_vint_unsorted(self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
}
self.block.clear();
}
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(output_write)?;
output_write.write_all(skip_data)?;
}
output_write.write_all(&self.postings_write[..])?;
self.skip_write.clear();
self.postings_write.clear();
self.bm25_weight = None;
Ok(())
}
}
impl StandardPostingsSerializer {
fn write_block(&mut self) {
{
// encode the doc ids
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.skip_write
.write_doc(self.last_doc_id_encoded, num_bits);
// last el block 0, offset block 1,
self.postings_write.extend(block_encoded);
}
if self.term_has_freq {
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_unsorted(self.block.term_freqs(), true);
self.postings_write.extend(block_encoded);
self.skip_write.write_term_freq(num_bits);
if self.mode.has_positions() {
// We serialize the sum of term freqs within the skip information
// in order to navigate through positions.
let sum_freq = self.block.term_freqs().iter().cloned().sum();
self.skip_write.write_total_term_freq(sum_freq);
}
let mut blockwand_params = (0u8, 0u32);
if let Some(bm25_weight) = self.bm25_weight.as_ref() {
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
let docs = self.block.doc_ids().iter().cloned();
let term_freqs = self.block.term_freqs().iter().cloned();
let fieldnorms = docs.map(|doc| fieldnorm_reader.fieldnorm_id(doc));
blockwand_params = fieldnorms
.zip(term_freqs)
.max_by(
|(left_fieldnorm_id, left_term_freq),
(right_fieldnorm_id, right_term_freq)| {
let left_score =
bm25_weight.tf_factor(*left_fieldnorm_id, *left_term_freq);
let right_score =
bm25_weight.tf_factor(*right_fieldnorm_id, *right_term_freq);
left_score
.partial_cmp(&right_score)
.unwrap_or(Ordering::Equal)
},
)
.unwrap();
}
}
let (fieldnorm_id, term_freq) = blockwand_params;
self.skip_write.write_blockwand_max(fieldnorm_id, term_freq);
}
self.block.clear();
}
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;
}
}

View File

@@ -0,0 +1,448 @@
use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
// doc num bits uses the following encoding:
// given 0b a b cdefgh
// |1|2|3| 4 |
// - 1: unused
// - 2: is delta-1 encoded. 0 if not, 1, if yes
// - 3: unused
// - 4: a 5 bit number in 0..32, the actual bitwidth. Bitpacking could in theory say this is 32
// (requiring a 6th bit), but the biggest doc_id we can want to encode is TERMINATED-1, which can
// be represented on 31b without delta encoding.
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
assert!(bitwidth < 32);
bitwidth | ((delta_1 as u8) << 6)
}
fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) {
let delta_1 = ((raw_bitwidth >> 6) & 1) != 0;
let bitwidth = raw_bitwidth & 0x1f;
(bitwidth, delta_1)
}
#[inline]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
max_tf.min(u8::MAX as u32) as u8
}
#[inline]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
if max_tf_code == u8::MAX {
u32::MAX
} else {
max_tf_code as u32
}
}
#[inline]
fn read_u32(data: &[u8]) -> u32 {
u32::from_le_bytes(data[..4].try_into().unwrap())
}
#[inline]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
buf.extend_from_slice(&val.to_le_bytes());
}
pub struct SkipSerializer {
buffer: Vec<u8>,
}
impl SkipSerializer {
pub fn new() -> SkipSerializer {
SkipSerializer { buffer: Vec::new() }
}
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
write_u32(last_doc, &mut self.buffer);
self.buffer.push(encode_bitwidth(doc_num_bits, true));
}
pub fn write_term_freq(&mut self, tf_num_bits: u8) {
self.buffer.push(tf_num_bits);
}
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
write_u32(tf_sum, &mut self.buffer);
}
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
let block_wand_tf = encode_block_wand_max_tf(term_freq);
self.buffer
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
}
pub fn data(&self) -> &[u8] {
&self.buffer[..]
}
pub fn clear(&mut self) {
self.buffer.clear();
}
}
#[derive(Clone)]
pub(crate) struct SkipReader {
last_doc_in_block: DocId,
pub(crate) last_doc_in_previous_block: DocId,
owned_read: OwnedBytes,
skip_info: IndexRecordOption,
byte_offset: usize,
remaining_docs: u32, // number of docs remaining, including the
// documents in the current block.
block_info: BlockInfo,
position_offset: u64,
}
#[derive(Clone, Eq, PartialEq, Copy, Debug)]
pub(crate) enum BlockInfo {
BitPacked {
doc_num_bits: u8,
strict_delta_encoded: bool,
tf_num_bits: u8,
tf_sum: u32,
block_wand_fieldnorm_id: u8,
block_wand_term_freq: u32,
},
VInt {
num_docs: u32,
},
}
impl Default for BlockInfo {
fn default() -> Self {
BlockInfo::VInt { num_docs: 0u32 }
}
}
impl SkipReader {
pub fn new(data: OwnedBytes, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader {
let mut skip_reader = SkipReader {
last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
},
last_doc_in_previous_block: 0u32,
owned_read: data,
skip_info,
block_info: BlockInfo::VInt { num_docs: doc_freq },
byte_offset: 0,
remaining_docs: doc_freq,
position_offset: 0u64,
};
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
skip_reader.read_block_info();
}
skip_reader
}
pub fn reset(&mut self, data: OwnedBytes, doc_freq: u32) {
self.last_doc_in_block = if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
};
self.last_doc_in_previous_block = 0u32;
self.owned_read = data;
self.block_info = BlockInfo::VInt { num_docs: doc_freq };
self.byte_offset = 0;
self.remaining_docs = doc_freq;
self.position_offset = 0u64;
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
}
}
// Returns the block max score for this block if available.
//
// The block max score is available for all full bitpacked block,
// but no available for the last VInt encoded incomplete block.
pub fn block_max_score(&self, bm25_weight: &Bm25Weight) -> Option<Score> {
match self.block_info {
BlockInfo::BitPacked {
block_wand_fieldnorm_id,
block_wand_term_freq,
..
} => Some(bm25_weight.score(block_wand_fieldnorm_id, block_wand_term_freq)),
BlockInfo::VInt { .. } => None,
}
}
pub(crate) fn last_doc_in_block(&self) -> DocId {
self.last_doc_in_block
}
pub fn position_offset(&self) -> u64 {
self.position_offset
}
#[inline]
pub fn byte_offset(&self) -> usize {
self.byte_offset
}
fn read_block_info(&mut self) {
let bytes = self.owned_read.as_slice();
let advance_len: usize;
self.last_doc_in_block = read_u32(bytes);
let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]);
match self.skip_info {
IndexRecordOption::Basic => {
advance_len = 5;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits: 0,
tf_sum: 0,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0,
};
}
IndexRecordOption::WithFreqs => {
let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
advance_len = 8;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
tf_sum: 0,
block_wand_fieldnorm_id,
block_wand_term_freq,
};
}
IndexRecordOption::WithFreqsAndPositions => {
let tf_num_bits = bytes[5];
let tf_sum = read_u32(&bytes[6..10]);
let block_wand_fieldnorm_id = bytes[10];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
advance_len = 12;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
tf_sum,
block_wand_fieldnorm_id,
block_wand_term_freq,
};
}
}
self.owned_read.advance(advance_len);
}
pub fn block_info(&self) -> BlockInfo {
self.block_info
}
/// Advance the skip reader to the block that may contain the target.
///
/// If the target is larger than all documents, the skip_reader
/// then advance to the last Variable In block.
pub fn seek(&mut self, target: DocId) -> bool {
if self.last_doc_in_block() >= target {
return false;
}
loop {
self.advance();
if self.last_doc_in_block() >= target {
return true;
}
}
}
pub fn advance(&mut self) {
match self.block_info {
BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
tf_sum,
..
} => {
self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32;
self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
self.position_offset += tf_sum as u64;
}
BlockInfo::VInt { num_docs } => {
debug_assert_eq!(num_docs, self.remaining_docs);
self.remaining_docs = 0;
self.byte_offset = usize::MAX;
}
}
self.last_doc_in_previous_block = self.last_doc_in_block;
if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
} else {
self.last_doc_in_block = TERMINATED;
self.block_info = BlockInfo::VInt {
num_docs: self.remaining_docs,
};
}
}
}
#[cfg(test)]
mod tests {
use super::{
decode_bitwidth, encode_bitwidth, BlockInfo, IndexRecordOption, SkipReader, SkipSerializer,
};
use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
#[test]
fn test_skip_with_freq() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.write_term_freq(3u8);
skip_serializer.write_blockwand_max(13u8, 3u32);
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.write_term_freq(2u8);
skip_serializer.write_blockwand_max(8u8, 2u32);
skip_serializer.data().to_owned()
};
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::WithFreqs);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info,
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 3u8,
tf_sum: 0,
block_wand_fieldnorm_id: 13,
block_wand_term_freq: 3
}
);
skip_reader.advance();
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
strict_delta_encoded: true,
tf_num_bits: 2u8,
tf_sum: 0,
block_wand_fieldnorm_id: 8,
block_wand_term_freq: 2
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
#[test]
fn test_skip_no_freq() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.data().to_owned()
};
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
#[test]
fn test_skip_multiple_of_block_size() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.data().to_owned()
};
let doc_freq = COMPRESSION_BLOCK_SIZE as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
#[test]
fn test_encode_decode_bitwidth() {
for bitwidth in 0..32 {
for delta_1 in [false, true] {
assert_eq!(
(bitwidth, delta_1),
decode_bitwidth(encode_bitwidth(bitwidth, delta_1))
);
}
}
assert_eq!(0b01000010, encode_bitwidth(0b10, true));
assert_eq!(0b00000010, encode_bitwidth(0b10, false));
}
}

View File

@@ -8,6 +8,7 @@ use std::thread::available_parallelism;
use super::segment::Segment;
use super::segment_reader::merge_field_meta_data;
use super::{FieldMetadata, IndexSettings};
use crate::codec::{CodecConfiguration, StandardCodec};
use crate::core::{Executor, META_FILEPATH};
use crate::directory::error::OpenReadError;
#[cfg(feature = "mmap")]
@@ -59,6 +60,7 @@ fn save_new_metas(
schema: Schema,
index_settings: IndexSettings,
directory: &dyn Directory,
codec: CodecConfiguration,
) -> crate::Result<()> {
save_metas(
&IndexMeta {
@@ -67,6 +69,7 @@ fn save_new_metas(
schema,
opstamp: 0u64,
payload: None,
codec,
},
directory,
)?;
@@ -101,18 +104,21 @@ fn save_new_metas(
/// };
/// let index = Index::builder().schema(schema).settings(settings).create_in_ram();
/// ```
pub struct IndexBuilder {
pub struct IndexBuilder<Codec: crate::codec::Codec = StandardCodec> {
schema: Option<Schema>,
index_settings: IndexSettings,
tokenizer_manager: TokenizerManager,
fast_field_tokenizer_manager: TokenizerManager,
codec: Codec,
}
impl Default for IndexBuilder {
impl Default for IndexBuilder<StandardCodec> {
fn default() -> Self {
IndexBuilder::new()
}
}
impl IndexBuilder {
impl IndexBuilder<StandardCodec> {
/// Creates a new `IndexBuilder`
pub fn new() -> Self {
Self {
@@ -120,6 +126,21 @@ impl IndexBuilder {
index_settings: IndexSettings::default(),
tokenizer_manager: TokenizerManager::default(),
fast_field_tokenizer_manager: TokenizerManager::default(),
codec: StandardCodec,
}
}
}
impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// Set the codec
#[must_use]
pub fn codec<NewCodec: crate::codec::Codec>(self, codec: NewCodec) -> IndexBuilder<NewCodec> {
IndexBuilder {
schema: self.schema,
index_settings: self.index_settings,
tokenizer_manager: self.tokenizer_manager,
fast_field_tokenizer_manager: self.fast_field_tokenizer_manager,
codec,
}
}
@@ -154,7 +175,7 @@ impl IndexBuilder {
/// The index will be allocated in anonymous memory.
/// This is useful for indexing small set of documents
/// for instances like unit test or temporary in memory index.
pub fn create_in_ram(self) -> Result<Index, TantivyError> {
pub fn create_in_ram(self) -> Result<Index<Codec>, TantivyError> {
let ram_directory = RamDirectory::create();
self.create(ram_directory)
}
@@ -165,7 +186,7 @@ impl IndexBuilder {
/// If a previous index was in this directory, it returns an
/// [`TantivyError::IndexAlreadyExists`] error.
#[cfg(feature = "mmap")]
pub fn create_in_dir<P: AsRef<Path>>(self, directory_path: P) -> crate::Result<Index> {
pub fn create_in_dir<P: AsRef<Path>>(self, directory_path: P) -> crate::Result<Index<Codec>> {
let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::open(directory_path)?);
if Index::exists(&*mmap_directory)? {
return Err(TantivyError::IndexAlreadyExists);
@@ -186,7 +207,7 @@ impl IndexBuilder {
self,
dir: impl Into<Box<dyn Directory>>,
mem_budget: usize,
) -> crate::Result<SingleSegmentIndexWriter<D>> {
) -> crate::Result<SingleSegmentIndexWriter<Codec, D>> {
let index = self.create(dir)?;
let index_simple_writer = SingleSegmentIndexWriter::new(index, mem_budget)?;
Ok(index_simple_writer)
@@ -202,7 +223,7 @@ impl IndexBuilder {
/// For other unit tests, prefer the [`RamDirectory`], see:
/// [`IndexBuilder::create_in_ram()`].
#[cfg(feature = "mmap")]
pub fn create_from_tempdir(self) -> crate::Result<Index> {
pub fn create_from_tempdir(self) -> crate::Result<Index<Codec>> {
let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::create_from_tempdir()?);
self.create(mmap_directory)
}
@@ -215,12 +236,15 @@ impl IndexBuilder {
}
/// Opens or creates a new index in the provided directory
pub fn open_or_create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
pub fn open_or_create<T: Into<Box<dyn Directory>>>(
self,
dir: T,
) -> crate::Result<Index<Codec>> {
let dir: Box<dyn Directory> = dir.into();
if !Index::exists(&*dir)? {
return self.create(dir);
}
let mut index = Index::open(dir)?;
let mut index: Index<Codec> = Index::<Codec>::open_with_codec(dir)?;
index.set_tokenizers(self.tokenizer_manager.clone());
if index.schema() == self.get_expect_schema()? {
Ok(index)
@@ -244,18 +268,26 @@ impl IndexBuilder {
/// Creates a new index given an implementation of the trait `Directory`.
///
/// If a directory previously existed, it will be erased.
fn create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
pub fn create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index<Codec>> {
self.create_avoid_monomorphization(dir.into())
}
fn create_avoid_monomorphization(self, dir: Box<dyn Directory>) -> crate::Result<Index<Codec>> {
self.validate()?;
let dir = dir.into();
let directory = ManagedDirectory::wrap(dir)?;
let codec: CodecConfiguration = CodecConfiguration::from_codec(&self.codec);
save_new_metas(
self.get_expect_schema()?,
self.index_settings.clone(),
&directory,
codec,
)?;
let mut metas = IndexMeta::with_schema(self.get_expect_schema()?);
let schema = self.get_expect_schema()?;
let mut metas = IndexMeta::with_schema_and_codec(schema, &self.codec);
metas.index_settings = self.index_settings;
let mut index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
let mut index: Index<Codec> =
Index::<Codec>::open_from_metas(directory, &metas, SegmentMetaInventory::default())?;
index.set_tokenizers(self.tokenizer_manager);
index.set_fast_field_tokenizers(self.fast_field_tokenizer_manager);
Ok(index)
@@ -264,7 +296,7 @@ impl IndexBuilder {
/// Search Index
#[derive(Clone)]
pub struct Index {
pub struct Index<Codec: crate::codec::Codec = crate::codec::StandardCodec> {
directory: ManagedDirectory,
schema: Schema,
settings: IndexSettings,
@@ -272,6 +304,7 @@ pub struct Index {
tokenizers: TokenizerManager,
fast_field_tokenizers: TokenizerManager,
inventory: SegmentMetaInventory,
codec: Codec,
}
impl Index {
@@ -279,41 +312,6 @@ impl Index {
pub fn builder() -> IndexBuilder {
IndexBuilder::new()
}
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists(dir: &dyn Directory) -> Result<bool, OpenReadError> {
dir.exists(&META_FILEPATH)
}
/// Accessor to the search executor.
///
/// This pool is used by default when calling `searcher.search(...)`
/// to perform search on the individual segments.
///
/// By default the executor is single thread, and simply runs in the calling thread.
pub fn search_executor(&self) -> &Executor {
&self.executor
}
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?;
Ok(())
}
/// Custom thread pool by a outer thread pool.
pub fn set_executor(&mut self, executor: Executor) {
self.executor = executor;
}
/// Replace the default single thread search executor pool
/// by a thread pool with as many threads as there are CPUs on the system.
pub fn set_default_multithread_executor(&mut self) -> crate::Result<()> {
let default_num_threads = available_parallelism()?.get();
self.set_multithread_executor(default_num_threads)
}
/// Creates a new index using the [`RamDirectory`].
///
@@ -324,6 +322,13 @@ impl Index {
IndexBuilder::new().schema(schema).create_in_ram().unwrap()
}
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists(directory: &dyn Directory) -> Result<bool, OpenReadError> {
directory.exists(&META_FILEPATH)
}
/// Creates a new index in a given filepath.
/// The index will use the [`MmapDirectory`].
///
@@ -370,20 +375,107 @@ impl Index {
schema: Schema,
settings: IndexSettings,
) -> crate::Result<Index> {
let dir: Box<dyn Directory> = dir.into();
Self::create_to_avoid_monomorphization(dir.into(), schema, settings)
}
fn create_to_avoid_monomorphization(
dir: Box<dyn Directory>,
schema: Schema,
settings: IndexSettings,
) -> crate::Result<Index> {
let mut builder = IndexBuilder::new().schema(schema);
builder = builder.settings(settings);
builder.create(dir)
}
/// Opens a new directory from an index path.
#[cfg(feature = "mmap")]
pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> crate::Result<Index> {
Self::open_in_dir_to_avoid_monomorphization(directory_path.as_ref())
}
#[inline(never)]
fn open_in_dir_to_avoid_monomorphization(directory_path: &Path) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
Index::open(mmap_directory)
}
/// Open the index using the provided directory
pub fn open<T: Into<Box<dyn Directory>>>(directory: T) -> crate::Result<Index> {
Index::<StandardCodec>::open_with_codec(directory.into())
}
}
impl<Codec: crate::codec::Codec> Index<Codec> {
/// Returns a version of this index with the standard codec.
/// This is useful when you need to pass the index to APIs that
/// don't care about the codec (e.g., for reading).
pub(crate) fn with_standard_codec(&self) -> Index<StandardCodec> {
Index {
directory: self.directory.clone(),
schema: self.schema.clone(),
settings: self.settings.clone(),
executor: self.executor.clone(),
tokenizers: self.tokenizers.clone(),
fast_field_tokenizers: self.fast_field_tokenizers.clone(),
inventory: self.inventory.clone(),
codec: StandardCodec::default(),
}
}
/// Open the index using the provided directory
#[inline(never)]
pub fn open_with_codec(directory: Box<dyn Directory>) -> crate::Result<Index<Codec>> {
let directory = ManagedDirectory::wrap(directory)?;
let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?;
let index: Index<Codec> = Index::<Codec>::open_from_metas(directory, &metas, inventory)?;
Ok(index)
}
/// Accessor to the codec.
pub fn codec(&self) -> &Codec {
&self.codec
}
/// Accessor to the search executor.
///
/// This pool is used by default when calling `searcher.search(...)`
/// to perform search on the individual segments.
///
/// By default the executor is single thread, and simply runs in the calling thread.
pub fn search_executor(&self) -> &Executor {
&self.executor
}
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?;
Ok(())
}
/// Custom thread pool by a outer thread pool.
pub fn set_executor(&mut self, executor: Executor) {
self.executor = executor;
}
/// Replace the default single thread search executor pool
/// by a thread pool with as many threads as there are CPUs on the system.
pub fn set_default_multithread_executor(&mut self) -> crate::Result<()> {
let default_num_threads = available_parallelism()?.get();
self.set_multithread_executor(default_num_threads)
}
/// Creates a new index given a directory and an [`IndexMeta`].
fn open_from_metas(
fn open_from_metas<C: crate::codec::Codec>(
directory: ManagedDirectory,
metas: &IndexMeta,
inventory: SegmentMetaInventory,
) -> Index {
) -> crate::Result<Index<C>> {
let schema = metas.schema.clone();
Index {
let codec = metas.codec.to_codec::<C>()?;
Ok(Index {
settings: metas.index_settings.clone(),
directory,
schema,
@@ -391,7 +483,8 @@ impl Index {
fast_field_tokenizers: TokenizerManager::default(),
executor: Executor::single_thread(),
inventory,
}
codec,
})
}
/// Setter for the tokenizer manager.
@@ -447,7 +540,7 @@ impl Index {
/// Create a default [`IndexReader`] for the given index.
///
/// See [`Index.reader_builder()`].
pub fn reader(&self) -> crate::Result<IndexReader> {
pub fn reader(&self) -> crate::Result<IndexReader<Codec>> {
self.reader_builder().try_into()
}
@@ -455,17 +548,10 @@ impl Index {
///
/// Most project should create at most one reader for a given index.
/// This method is typically called only once per `Index` instance.
pub fn reader_builder(&self) -> IndexReaderBuilder {
pub fn reader_builder(&self) -> IndexReaderBuilder<Codec> {
IndexReaderBuilder::new(self.clone())
}
/// Opens a new directory from an index path.
#[cfg(feature = "mmap")]
pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
Index::open(mmap_directory)
}
/// Returns the list of the segment metas tracked by the index.
///
/// Such segments can of course be part of the index,
@@ -506,16 +592,6 @@ impl Index {
self.inventory.new_segment_meta(segment_id, max_doc)
}
/// Open the index using the provided directory
pub fn open<T: Into<Box<dyn Directory>>>(directory: T) -> crate::Result<Index> {
let directory = directory.into();
let directory = ManagedDirectory::wrap(directory)?;
let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?;
let index = Index::open_from_metas(directory, &metas, inventory);
Ok(index)
}
/// Reads the index meta file from the directory.
pub fn load_metas(&self) -> crate::Result<IndexMeta> {
load_metas(self.directory(), &self.inventory)
@@ -539,7 +615,7 @@ impl Index {
pub fn writer_with_options<D: Document>(
&self,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
) -> crate::Result<IndexWriter<Codec, D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
@@ -581,7 +657,7 @@ impl Index {
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
) -> crate::Result<IndexWriter<Codec, D>> {
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
@@ -595,7 +671,7 @@ impl Index {
/// That index writer only simply has a single thread and a memory budget of 15 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests<D: Document>(&self) -> crate::Result<IndexWriter<D>> {
pub fn writer_for_tests<D: Document>(&self) -> crate::Result<IndexWriter<Codec, D>> {
self.writer_with_num_threads(1, MEMORY_BUDGET_NUM_BYTES_MIN)
}
@@ -613,7 +689,7 @@ impl Index {
pub fn writer<D: Document>(
&self,
memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
) -> crate::Result<IndexWriter<Codec, D>> {
let mut num_threads = std::cmp::min(available_parallelism()?.get(), MAX_NUM_THREAD);
let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
@@ -640,7 +716,7 @@ impl Index {
}
/// Returns the list of segments that are searchable
pub fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
pub fn searchable_segments(&self) -> crate::Result<Vec<Segment<Codec>>> {
Ok(self
.searchable_segment_metas()?
.into_iter()
@@ -649,12 +725,12 @@ impl Index {
}
#[doc(hidden)]
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment<Codec> {
Segment::for_index(self.clone(), segment_meta)
}
/// Creates a new segment.
pub fn new_segment(&self) -> Segment {
pub fn new_segment(&self) -> Segment<Codec> {
let segment_meta = self
.inventory
.new_segment_meta(SegmentId::generate_random(), 0);

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use super::SegmentComponent;
use crate::codec::{Codec, CodecConfiguration};
use crate::index::SegmentId;
use crate::schema::Schema;
use crate::store::Compressor;
@@ -320,6 +321,7 @@ pub struct IndexMeta {
/// This payload is entirely unused by tantivy.
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
pub codec: CodecConfiguration,
}
#[derive(Deserialize, Debug)]
@@ -331,6 +333,8 @@ struct UntrackedIndexMeta {
pub opstamp: Opstamp,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
#[serde(default)]
pub codec: CodecConfiguration,
}
impl UntrackedIndexMeta {
@@ -345,6 +349,7 @@ impl UntrackedIndexMeta {
schema: self.schema,
opstamp: self.opstamp,
payload: self.payload,
codec: self.codec,
}
}
}
@@ -355,13 +360,14 @@ impl IndexMeta {
///
/// This new index does not contains any segments.
/// Opstamp will the value `0u64`.
pub fn with_schema(schema: Schema) -> IndexMeta {
pub fn with_schema_and_codec<C: Codec>(schema: Schema, codec: &C) -> IndexMeta {
IndexMeta {
index_settings: IndexSettings::default(),
segments: vec![],
schema,
opstamp: 0u64,
payload: None,
codec: CodecConfiguration::from_codec(codec),
}
}
@@ -412,11 +418,12 @@ mod tests {
schema,
opstamp: 0u64,
payload: None,
codec: Default::default(),
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"docstore_compression":"none","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
r#"{"index_settings":{"docstore_compression":"none","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0,"codec":{"name":"standard"}}"#
);
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();

View File

@@ -2,6 +2,7 @@ use std::fmt;
use std::path::PathBuf;
use super::SegmentComponent;
use crate::codec::StandardCodec;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::{Directory, FileSlice, WritePtr};
use crate::index::{Index, SegmentId, SegmentMeta};
@@ -10,25 +11,25 @@ use crate::Opstamp;
/// A segment is a piece of the index.
#[derive(Clone)]
pub struct Segment {
index: Index,
pub struct Segment<C: crate::codec::Codec = StandardCodec> {
index: Index<C>,
meta: SegmentMeta,
}
impl fmt::Debug for Segment {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl<C: crate::codec::Codec> fmt::Debug for Segment<C> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Segment({:?})", self.id().uuid_string())
}
}
impl Segment {
impl<C: crate::codec::Codec> Segment<C> {
/// Creates a new segment given an `Index` and a `SegmentId`
pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment {
pub(crate) fn for_index(index: Index<C>, meta: SegmentMeta) -> Segment<C> {
Segment { index, meta }
}
/// Returns the index the segment belongs to.
pub fn index(&self) -> &Index {
pub fn index(&self) -> &Index<C> {
&self.index
}
@@ -46,7 +47,7 @@ impl Segment {
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub fn with_max_doc(self, max_doc: u32) -> Segment {
pub fn with_max_doc(self, max_doc: u32) -> Segment<C> {
Segment {
index: self.index,
meta: self.meta.with_max_doc(max_doc),
@@ -55,7 +56,7 @@ impl Segment {
#[doc(hidden)]
#[must_use]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment<C> {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),

View File

@@ -140,13 +140,13 @@ impl SegmentReader {
}
/// Open a new segment for reading.
pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
pub fn open<C: crate::codec::Codec>(segment: &Segment<C>) -> crate::Result<SegmentReader> {
Self::open_with_custom_alive_set(segment, None)
}
/// Open a new segment for reading.
pub fn open_with_custom_alive_set(
segment: &Segment,
pub fn open_with_custom_alive_set<C: crate::codec::Codec>(
segment: &Segment<C>,
custom_bitset: Option<AliveBitSet>,
) -> crate::Result<SegmentReader> {
let termdict_file = segment.open_read(SegmentComponent::Terms)?;

View File

@@ -9,6 +9,7 @@ use smallvec::smallvec;
use super::operation::{AddOperation, UserOperation};
use super::segment_updater::SegmentUpdater;
use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
use crate::codec::{Codec, StandardCodec};
use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
use crate::error::TantivyError;
use crate::fastfield::write_alive_bitset;
@@ -68,12 +69,12 @@ pub struct IndexWriterOptions {
/// indexing queue.
/// Each indexing thread builds its own independent [`Segment`], via
/// a `SegmentWriter` object.
pub struct IndexWriter<D: Document = TantivyDocument> {
pub struct IndexWriter<C: Codec = StandardCodec, D: Document = TantivyDocument> {
// the lock is just used to bind the
// lifetime of the lock with that of the IndexWriter.
_directory_lock: Option<DirectoryLock>,
index: Index,
index: Index<C>,
options: IndexWriterOptions,
@@ -82,7 +83,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {
index_writer_status: IndexWriterStatus<D>,
operation_sender: AddBatchSender<D>,
segment_updater: SegmentUpdater,
segment_updater: SegmentUpdater<C>,
worker_id: usize,
@@ -128,8 +129,8 @@ fn compute_deleted_bitset(
/// is `==` target_opstamp.
/// For instance, there was no delete operation between the state of the `segment_entry` and
/// the `target_opstamp`, `segment_entry` is not updated.
pub fn advance_deletes(
mut segment: Segment,
pub fn advance_deletes<C: Codec>(
mut segment: Segment<C>,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
) -> crate::Result<()> {
@@ -179,11 +180,11 @@ pub fn advance_deletes(
Ok(())
}
fn index_documents<D: Document>(
fn index_documents<C: crate::codec::Codec, D: Document>(
memory_budget: usize,
segment: Segment,
segment: Segment<C>,
grouped_document_iterator: &mut dyn Iterator<Item = AddBatch<D>>,
segment_updater: &SegmentUpdater,
segment_updater: &SegmentUpdater<C>,
mut delete_cursor: DeleteCursor,
) -> crate::Result<()> {
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone())?;
@@ -226,8 +227,8 @@ fn index_documents<D: Document>(
}
/// `doc_opstamps` is required to be non-empty.
fn apply_deletes(
segment: &Segment,
fn apply_deletes<C: crate::codec::Codec>(
segment: &Segment<C>,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &[Opstamp],
) -> crate::Result<Option<BitSet>> {
@@ -262,7 +263,7 @@ fn apply_deletes(
})
}
impl<D: Document> IndexWriter<D> {
impl<C: Codec, D: Document> IndexWriter<C, D> {
/// Create a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
@@ -278,7 +279,7 @@ impl<D: Document> IndexWriter<D> {
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
index: &Index<C>,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
) -> crate::Result<Self> {
@@ -345,7 +346,7 @@ impl<D: Document> IndexWriter<D> {
}
/// Accessor to the index.
pub fn index(&self) -> &Index {
pub fn index(&self) -> &Index<C> {
&self.index
}
@@ -393,7 +394,7 @@ impl<D: Document> IndexWriter<D> {
/// It is safe to start writing file associated with the new `Segment`.
/// These will not be garbage collected as long as an instance object of
/// `SegmentMeta` object associated with the new `Segment` is "alive".
pub fn new_segment(&self) -> Segment {
pub fn new_segment(&self) -> Segment<C> {
self.index.new_segment()
}
@@ -615,7 +616,7 @@ impl<D: Document> IndexWriter<D> {
/// It is also possible to add a payload to the `commit`
/// using this API.
/// See [`PreparedCommit::set_payload()`].
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit<'_, D>> {
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit<'_, C, D>> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -665,7 +666,7 @@ impl<D: Document> IndexWriter<D> {
self.prepare_commit()?.commit()
}
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
pub(crate) fn segment_updater(&self) -> &SegmentUpdater<C> {
&self.segment_updater
}
@@ -804,7 +805,7 @@ impl<D: Document> IndexWriter<D> {
}
}
impl<D: Document> Drop for IndexWriter<D> {
impl<C: Codec, D: Document> Drop for IndexWriter<C, D> {
fn drop(&mut self) {
self.segment_updater.kill();
self.drop_sender();

View File

@@ -145,7 +145,10 @@ fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnTy
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result<IndexMerger> {
pub fn open<C: crate::codec::Codec>(
schema: Schema,
segments: &[Segment<C>],
) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset)
}
@@ -162,9 +165,9 @@ impl IndexMerger {
// This can be used to merge but also apply an additional filter.
// One use case is demux, which is basically taking a list of
// segments and partitions them e.g. by a value in a field.
pub fn open_with_custom_alive_set(
pub fn open_with_custom_alive_set<C: crate::codec::Codec>(
schema: Schema,
segments: &[Segment],
segments: &[Segment<C>],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
) -> crate::Result<IndexMerger> {
let mut readers = vec![];
@@ -525,7 +528,10 @@ impl IndexMerger {
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
pub fn write<C: crate::codec::Codec>(
&self,
mut serializer: SegmentSerializer<C>,
) -> crate::Result<u32> {
let doc_id_mapping = self.get_doc_id_from_concatenated_data()?;
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {

View File

@@ -1,16 +1,17 @@
use super::IndexWriter;
use crate::codec::Codec;
use crate::schema::document::Document;
use crate::{FutureResult, Opstamp, TantivyDocument};
/// A prepared commit
pub struct PreparedCommit<'a, D: Document = TantivyDocument> {
index_writer: &'a mut IndexWriter<D>,
pub struct PreparedCommit<'a, C: Codec, D: Document = TantivyDocument> {
index_writer: &'a mut IndexWriter<C, D>,
payload: Option<String>,
opstamp: Opstamp,
}
impl<'a, D: Document> PreparedCommit<'a, D> {
pub(crate) fn new(index_writer: &'a mut IndexWriter<D>, opstamp: Opstamp) -> Self {
impl<'a, C: Codec, D: Document> PreparedCommit<'a, C, D> {
pub(crate) fn new(index_writer: &'a mut IndexWriter<C, D>, opstamp: Opstamp) -> Self {
Self {
index_writer,
payload: None,

View File

@@ -8,17 +8,17 @@ use crate::store::StoreWriter;
/// Segment serializer is in charge of laying out on disk
/// the data accumulated and sorted by the `SegmentWriter`.
pub struct SegmentSerializer {
segment: Segment,
pub struct SegmentSerializer<C: crate::codec::Codec> {
segment: Segment<C>,
pub(crate) store_writer: StoreWriter,
fast_field_write: WritePtr,
fieldnorms_serializer: Option<FieldNormsSerializer>,
postings_serializer: InvertedIndexSerializer,
}
impl SegmentSerializer {
impl<C: crate::codec::Codec> SegmentSerializer<C> {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(mut segment: Segment) -> crate::Result<SegmentSerializer> {
pub fn for_segment(mut segment: Segment<C>) -> crate::Result<SegmentSerializer<C>> {
let settings = segment.index().settings().clone();
let store_writer = {
let store_write = segment.open_write(SegmentComponent::Store)?;
@@ -50,7 +50,7 @@ impl SegmentSerializer {
self.store_writer.mem_usage()
}
pub fn segment(&self) -> &Segment {
pub fn segment(&self) -> &Segment<C> {
&self.segment
}

View File

@@ -10,6 +10,7 @@ use std::sync::{Arc, RwLock};
use rayon::{ThreadPool, ThreadPoolBuilder};
use super::segment_manager::SegmentManager;
use crate::codec::{Codec, CodecConfiguration};
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::fastfield::AliveBitSet;
@@ -61,10 +62,10 @@ pub(crate) fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate:
// We voluntarily pass a merge_operation ref to guarantee that
// the merge_operation is alive during the process
#[derive(Clone)]
pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
pub(crate) struct SegmentUpdater<C: Codec>(Arc<InnerSegmentUpdater<C>>);
impl Deref for SegmentUpdater {
type Target = InnerSegmentUpdater;
impl<C: Codec> Deref for SegmentUpdater<C> {
type Target = InnerSegmentUpdater<C>;
#[inline]
fn deref(&self) -> &Self::Target {
@@ -72,8 +73,8 @@ impl Deref for SegmentUpdater {
}
}
fn garbage_collect_files(
segment_updater: SegmentUpdater,
fn garbage_collect_files<C: Codec>(
segment_updater: SegmentUpdater<C>,
) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
let mut index = segment_updater.index.clone();
@@ -84,8 +85,8 @@ fn garbage_collect_files(
/// Merges a list of segments the list of segment givens in the `segment_entries`.
/// This function happens in the calling thread and is computationally expensive.
fn merge(
index: &Index,
fn merge<Codec: crate::codec::Codec>(
index: &Index<Codec>,
mut segment_entries: Vec<SegmentEntry>,
target_opstamp: Opstamp,
) -> crate::Result<Option<SegmentEntry>> {
@@ -108,7 +109,7 @@ fn merge(
let delete_cursor = segment_entries[0].delete_cursor().clone();
let segments: Vec<Segment> = segment_entries
let segments: Vec<Segment<Codec>> = segment_entries
.iter()
.map(|segment_entry| index.segment(segment_entry.meta().clone()))
.collect();
@@ -139,10 +140,10 @@ fn merge(
/// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination `Index`.
#[doc(hidden)]
pub fn merge_indices<T: Into<Box<dyn Directory>>>(
indices: &[Index],
output_directory: T,
) -> crate::Result<Index> {
pub fn merge_indices<Codec: crate::codec::Codec>(
indices: &[Index<Codec>],
output_directory: Box<dyn Directory>,
) -> crate::Result<Index<Codec>> {
if indices.is_empty() {
// If there are no indices to merge, there is no need to do anything.
return Err(crate::TantivyError::InvalidArgument(
@@ -163,7 +164,7 @@ pub fn merge_indices<T: Into<Box<dyn Directory>>>(
));
}
let mut segments: Vec<Segment> = Vec::new();
let mut segments: Vec<Segment<Codec>> = Vec::new();
for index in indices {
segments.extend(index.searchable_segments()?);
}
@@ -185,12 +186,12 @@ pub fn merge_indices<T: Into<Box<dyn Directory>>>(
/// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination `Index`.
#[doc(hidden)]
pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
segments: &[Segment],
pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Directory>>>(
segments: &[Segment<Codec>],
target_settings: IndexSettings,
filter_doc_ids: Vec<Option<AliveBitSet>>,
output_directory: T,
) -> crate::Result<Index> {
) -> crate::Result<Index<Codec>> {
if segments.is_empty() {
// If there are no indices to merge, there is no need to do anything.
return Err(crate::TantivyError::InvalidArgument(
@@ -211,11 +212,12 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
));
}
let mut merged_index = Index::create(
output_directory,
target_schema.clone(),
target_settings.clone(),
)?;
let mut merged_index: Index<Codec> = Index::builder()
.schema(target_schema.clone())
.codec(segments[0].index().codec().clone())
.settings(target_settings.clone())
.create(output_directory.into())?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger =
@@ -235,6 +237,7 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
))
.trim_end()
);
let codec_configuration = CodecConfiguration::from_codec(segments[0].index().codec());
let index_meta = IndexMeta {
index_settings: target_settings, // index_settings of all segments should be the same
@@ -242,6 +245,7 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
schema: target_schema,
opstamp: 0u64,
payload: Some(stats),
codec: codec_configuration,
};
// save the meta.json
@@ -250,7 +254,7 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
Ok(merged_index)
}
pub(crate) struct InnerSegmentUpdater {
pub(crate) struct InnerSegmentUpdater<C: Codec> {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
// `SegmentUpdater`.
@@ -261,7 +265,7 @@ pub(crate) struct InnerSegmentUpdater {
pool: ThreadPool,
merge_thread_pool: ThreadPool,
index: Index,
index: Index<C>,
segment_manager: SegmentManager,
merge_policy: RwLock<Arc<dyn MergePolicy>>,
killed: AtomicBool,
@@ -269,13 +273,13 @@ pub(crate) struct InnerSegmentUpdater {
merge_operations: MergeOperationInventory,
}
impl SegmentUpdater {
impl<Codec: crate::codec::Codec> SegmentUpdater<Codec> {
pub fn create(
index: Index,
index: Index<Codec>,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<SegmentUpdater> {
) -> crate::Result<Self> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = ThreadPoolBuilder::new()
@@ -404,12 +408,14 @@ impl SegmentUpdater {
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
committed_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
let codec = CodecConfiguration::from_codec(index.codec());
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
segments: committed_segment_metas,
schema: index.schema(),
opstamp,
payload: commit_message,
codec,
};
// TODO add context to the error.
save_metas(&index_meta, directory.box_clone().borrow_mut())?;
@@ -443,7 +449,7 @@ impl SegmentUpdater {
opstamp: Opstamp,
payload: Option<String>,
) -> FutureResult<Opstamp> {
let segment_updater: SegmentUpdater = self.clone();
let segment_updater: SegmentUpdater<Codec> = self.clone();
self.schedule_task(move || {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
@@ -702,6 +708,7 @@ impl SegmentUpdater {
#[cfg(test)]
mod tests {
use super::merge_indices;
use crate::codec::StandardCodec;
use crate::collector::TopDocs;
use crate::directory::RamDirectory;
use crate::fastfield::AliveBitSet;
@@ -915,7 +922,7 @@ mod tests {
#[test]
fn test_merge_empty_indices_array() {
let merge_result = merge_indices(&[], RamDirectory::default());
let merge_result = merge_indices::<StandardCodec>(&[], Box::new(RamDirectory::default()));
assert!(merge_result.is_err());
}
@@ -942,7 +949,10 @@ mod tests {
};
// mismatched schema index list
let result = merge_indices(&[first_index, second_index], RamDirectory::default());
let result = merge_indices(
&[first_index, second_index],
Box::new(RamDirectory::default()),
);
assert!(result.is_err());
Ok(())

View File

@@ -4,6 +4,7 @@ use itertools::Itertools;
use tokenizer_api::BoxTokenStream;
use super::operation::AddOperation;
use crate::codec::Codec;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::index::{Segment, SegmentComponent};
@@ -45,11 +46,11 @@ fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<
///
/// They creates the postings list in anonymous memory.
/// The segment is laid on disk when the segment gets `finalized`.
pub struct SegmentWriter {
pub struct SegmentWriter<Codec: crate::codec::Codec> {
pub(crate) max_doc: DocId,
pub(crate) ctx: IndexingContext,
pub(crate) per_field_postings_writers: PerFieldPostingsWriter,
pub(crate) segment_serializer: SegmentSerializer,
pub(crate) segment_serializer: SegmentSerializer<Codec>,
pub(crate) fast_field_writers: FastFieldsWriter,
pub(crate) fieldnorms_writer: FieldNormsWriter,
pub(crate) json_path_writer: JsonPathWriter,
@@ -60,7 +61,7 @@ pub struct SegmentWriter {
schema: Schema,
}
impl SegmentWriter {
impl<Codec: crate::codec::Codec> SegmentWriter<Codec> {
/// Creates a new `SegmentWriter`
///
/// The arguments are defined as follows
@@ -70,7 +71,10 @@ impl SegmentWriter {
/// behavior as a memory limit.
/// - segment: The segment being written
/// - schema
pub fn for_segment(memory_budget_in_bytes: usize, segment: Segment) -> crate::Result<Self> {
pub fn for_segment(
memory_budget_in_bytes: usize,
segment: Segment<Codec>,
) -> crate::Result<Self> {
let schema = segment.schema();
let tokenizer_manager = segment.index().tokenizers().clone();
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
@@ -386,13 +390,13 @@ impl SegmentWriter {
/// to the `SegmentSerializer`.
///
/// `doc_id_map` is used to map to the new doc_id order.
fn remap_and_write(
fn remap_and_write<C: Codec>(
schema: Schema,
per_field_postings_writers: &PerFieldPostingsWriter,
ctx: IndexingContext,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
mut serializer: SegmentSerializer,
mut serializer: SegmentSerializer<C>,
) -> crate::Result<()> {
debug!("remap-and-write");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {

View File

@@ -1,5 +1,6 @@
use std::marker::PhantomData;
use crate::codec::CodecConfiguration;
use crate::indexer::operation::AddOperation;
use crate::indexer::segment_updater::save_metas;
use crate::indexer::SegmentWriter;
@@ -7,22 +8,22 @@ use crate::schema::document::Document;
use crate::{Directory, Index, IndexMeta, Opstamp, Segment, TantivyDocument};
#[doc(hidden)]
pub struct SingleSegmentIndexWriter<D: Document = TantivyDocument> {
segment_writer: SegmentWriter,
segment: Segment,
pub struct SingleSegmentIndexWriter<Codec: crate::codec::Codec, D: Document = TantivyDocument> {
segment_writer: SegmentWriter<Codec>,
segment: Segment<Codec>,
opstamp: Opstamp,
_phantom: PhantomData<D>,
_doc: PhantomData<D>,
}
impl<D: Document> SingleSegmentIndexWriter<D> {
pub fn new(index: Index, mem_budget: usize) -> crate::Result<Self> {
impl<Codec: crate::codec::Codec, D: Document> SingleSegmentIndexWriter<Codec, D> {
pub fn new(index: Index<Codec>, mem_budget: usize) -> crate::Result<Self> {
let segment = index.new_segment();
let segment_writer = SegmentWriter::for_segment(mem_budget, segment.clone())?;
Ok(Self {
segment_writer,
segment,
opstamp: 0,
_phantom: PhantomData,
_doc: PhantomData,
})
}
@@ -37,10 +38,10 @@ impl<D: Document> SingleSegmentIndexWriter<D> {
.add_document(AddOperation { opstamp, document })
}
pub fn finalize(self) -> crate::Result<Index> {
pub fn finalize(self) -> crate::Result<Index<Codec>> {
let max_doc = self.segment_writer.max_doc();
self.segment_writer.finalize()?;
let segment: Segment = self.segment.with_max_doc(max_doc);
let segment: Segment<Codec> = self.segment.with_max_doc(max_doc);
let index = segment.index();
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
@@ -48,6 +49,7 @@ impl<D: Document> SingleSegmentIndexWriter<D> {
schema: index.schema(),
opstamp: 0,
payload: None,
codec: CodecConfiguration::from_codec(index.codec()),
};
save_metas(&index_meta, index.directory())?;
index.directory().sync_directory()?;

View File

@@ -166,6 +166,7 @@ mod functional_test;
#[macro_use]
mod macros;
pub mod codec;
mod future_result;
// Re-exports

View File

@@ -22,12 +22,6 @@ pub(crate) struct JsonPostingsWriter<Rec: Recorder> {
non_str_posting_writer: SpecializedPostingsWriter<DocIdRecorder>,
}
impl<Rec: Recorder> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(json_postings_writer: JsonPostingsWriter<Rec>) -> Box<dyn PostingsWriter> {
Box::new(json_postings_writer)
}
}
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
#[inline]
fn subscribe(

View File

@@ -1,16 +1,15 @@
use crate::postings::json_postings_writer::JsonPostingsWriter;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::postings_writer::{PostingsWriterEnum, SpecializedPostingsWriter};
use crate::postings::recorder::{DocIdRecorder, TermFrequencyRecorder, TfAndPositionRecorder};
use crate::postings::PostingsWriter;
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
pub(crate) struct PerFieldPostingsWriter {
per_field_postings_writers: Vec<Box<dyn PostingsWriter>>,
per_field_postings_writers: Vec<PostingsWriterEnum>,
}
impl PerFieldPostingsWriter {
pub fn for_schema(schema: &Schema) -> Self {
let per_field_postings_writers = schema
let per_field_postings_writers: Vec<PostingsWriterEnum> = schema
.fields()
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry))
.collect();
@@ -19,16 +18,16 @@ impl PerFieldPostingsWriter {
}
}
pub(crate) fn get_for_field(&self, field: Field) -> &dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_ref()
pub(crate) fn get_for_field(&self, field: Field) -> &PostingsWriterEnum {
&self.per_field_postings_writers[field.field_id() as usize]
}
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_mut()
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut PostingsWriterEnum {
&mut self.per_field_postings_writers[field.field_id() as usize]
}
}
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> PostingsWriterEnum {
match *field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()
@@ -51,7 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn Postings
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::IpAddr(_)
| FieldType::Facet(_) => Box::<SpecializedPostingsWriter<DocIdRecorder>>::default(),
| FieldType::Facet(_) => <SpecializedPostingsWriter<DocIdRecorder>>::default().into(),
FieldType::JsonObject(ref json_object_options) => {
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {
match text_indexing_option.index_option() {

View File

@@ -7,7 +7,10 @@ use stacker::Addr;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::indexing_term::IndexingTerm;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::recorder::{BufferLender, Recorder};
use crate::postings::json_postings_writer::JsonPostingsWriter;
use crate::postings::recorder::{
BufferLender, DocIdRecorder, Recorder, TermFrequencyRecorder, TfAndPositionRecorder,
};
use crate::postings::{
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
};
@@ -100,6 +103,141 @@ pub(crate) struct IndexingPosition {
pub end_position: u32,
}
pub enum PostingsWriterEnum {
DocId(SpecializedPostingsWriter<DocIdRecorder>),
DocIdTf(SpecializedPostingsWriter<TermFrequencyRecorder>),
DocTfAndPosition(SpecializedPostingsWriter<TfAndPositionRecorder>),
JsonDocId(JsonPostingsWriter<DocIdRecorder>),
JsonDocIdTf(JsonPostingsWriter<TermFrequencyRecorder>),
JsonDocTfAndPosition(JsonPostingsWriter<TfAndPositionRecorder>),
}
impl From<SpecializedPostingsWriter<DocIdRecorder>> for PostingsWriterEnum {
fn from(doc_id_recorder_writer: SpecializedPostingsWriter<DocIdRecorder>) -> Self {
PostingsWriterEnum::DocId(doc_id_recorder_writer)
}
}
impl From<SpecializedPostingsWriter<TermFrequencyRecorder>> for PostingsWriterEnum {
fn from(doc_id_tf_recorder_writer: SpecializedPostingsWriter<TermFrequencyRecorder>) -> Self {
PostingsWriterEnum::DocIdTf(doc_id_tf_recorder_writer)
}
}
impl From<SpecializedPostingsWriter<TfAndPositionRecorder>> for PostingsWriterEnum {
fn from(
doc_id_tf_and_positions_recorder_writer: SpecializedPostingsWriter<TfAndPositionRecorder>,
) -> Self {
PostingsWriterEnum::DocTfAndPosition(doc_id_tf_and_positions_recorder_writer)
}
}
impl From<JsonPostingsWriter<DocIdRecorder>> for PostingsWriterEnum {
fn from(doc_id_recorder_writer: JsonPostingsWriter<DocIdRecorder>) -> Self {
PostingsWriterEnum::JsonDocId(doc_id_recorder_writer)
}
}
impl From<JsonPostingsWriter<TermFrequencyRecorder>> for PostingsWriterEnum {
fn from(doc_id_tf_recorder_writer: JsonPostingsWriter<TermFrequencyRecorder>) -> Self {
PostingsWriterEnum::JsonDocIdTf(doc_id_tf_recorder_writer)
}
}
impl From<JsonPostingsWriter<TfAndPositionRecorder>> for PostingsWriterEnum {
fn from(
doc_id_tf_and_positions_recorder_writer: JsonPostingsWriter<TfAndPositionRecorder>,
) -> Self {
PostingsWriterEnum::JsonDocTfAndPosition(doc_id_tf_and_positions_recorder_writer)
}
}
impl PostingsWriter for PostingsWriterEnum {
fn subscribe(&mut self, doc: DocId, pos: u32, term: &IndexingTerm, ctx: &mut IndexingContext) {
match self {
PostingsWriterEnum::DocId(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::DocIdTf(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::DocTfAndPosition(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::JsonDocId(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::JsonDocIdTf(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::JsonDocTfAndPosition(writer) => {
writer.subscribe(doc, pos, term, ctx)
}
}
}
fn serialize(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
match self {
PostingsWriterEnum::DocId(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::DocIdTf(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::DocTfAndPosition(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::JsonDocId(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::JsonDocIdTf(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::JsonDocTfAndPosition(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
}
}
/// Tokenize a text and subscribe all of its token.
fn index_text(
&mut self,
doc_id: DocId,
token_stream: &mut dyn TokenStream,
term_buffer: &mut IndexingTerm,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
) {
match self {
PostingsWriterEnum::DocId(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::DocIdTf(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::DocTfAndPosition(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::JsonDocId(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::JsonDocIdTf(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::JsonDocTfAndPosition(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
}
}
fn total_num_tokens(&self) -> u64 {
match self {
PostingsWriterEnum::DocId(writer) => writer.total_num_tokens(),
PostingsWriterEnum::DocIdTf(writer) => writer.total_num_tokens(),
PostingsWriterEnum::DocTfAndPosition(writer) => writer.total_num_tokens(),
PostingsWriterEnum::JsonDocId(writer) => writer.total_num_tokens(),
PostingsWriterEnum::JsonDocIdTf(writer) => writer.total_num_tokens(),
PostingsWriterEnum::JsonDocTfAndPosition(writer) => writer.total_num_tokens(),
}
}
}
/// The `PostingsWriter` is in charge of receiving documenting
/// and building a `Segment` in anonymous memory.
///
@@ -171,14 +309,6 @@ pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
_recorder_type: PhantomData<Rec>,
}
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(
specialized_postings_writer: SpecializedPostingsWriter<Rec>,
) -> Box<dyn PostingsWriter> {
Box::new(specialized_postings_writer)
}
}
impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
#[inline]
pub(crate) fn serialize_one_term(

View File

@@ -70,13 +70,13 @@ impl SegmentPostings {
let mut buffer = Vec::new();
{
let mut postings_serializer =
PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None);
PostingsSerializer::new(0.0, IndexRecordOption::Basic, None);
postings_serializer.new_term(docs.len() as u32, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32);
}
postings_serializer
.close_term(docs.len() as u32)
.close_term(docs.len() as u32, &mut buffer)
.expect("In memory Serialization should never fail.");
}
let block_segment_postings = BlockSegmentPostings::open(
@@ -115,7 +115,6 @@ impl SegmentPostings {
})
.unwrap_or(0.0);
let mut postings_serializer = PostingsSerializer::new(
&mut buffer,
average_field_norm,
IndexRecordOption::WithFreqs,
fieldnorm_reader,
@@ -125,7 +124,7 @@ impl SegmentPostings {
postings_serializer.write_doc(doc, tf);
}
postings_serializer
.close_term(doc_and_tfs.len() as u32)
.close_term(doc_and_tfs.len() as u32, &mut buffer)
.unwrap();
let block_segment_postings = BlockSegmentPostings::open(
doc_and_tfs.len() as u32,

View File

@@ -55,7 +55,9 @@ pub struct InvertedIndexSerializer {
impl InvertedIndexSerializer {
/// Open a new `InvertedIndexSerializer` for the given segment
pub fn open(segment: &mut Segment) -> crate::Result<InvertedIndexSerializer> {
pub fn open<C: crate::codec::Codec>(
segment: &mut Segment<C>,
) -> crate::Result<InvertedIndexSerializer> {
use crate::index::SegmentComponent::{Positions, Postings, Terms};
let inv_index_serializer = InvertedIndexSerializer {
terms_write: CompositeWrite::wrap(segment.open_write(Terms)?),
@@ -104,10 +106,12 @@ impl InvertedIndexSerializer {
/// the serialization of a specific field.
pub struct FieldSerializer<'a> {
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
current_term_info: TermInfo,
term_open: bool,
postings_write: &'a mut CountingWriter<WritePtr>,
postings_start_offset: u64,
}
impl<'a> FieldSerializer<'a> {
@@ -128,27 +132,30 @@ impl<'a> FieldSerializer<'a> {
.as_ref()
.map(|ff_reader| total_num_tokens as Score / ff_reader.num_docs() as Score)
.unwrap_or(0.0);
let postings_serializer = PostingsSerializer::new(
postings_write,
average_fieldnorm,
index_record_option,
fieldnorm_reader,
);
let postings_serializer =
PostingsSerializer::new(average_fieldnorm, index_record_option, fieldnorm_reader);
let positions_serializer_opt = if index_record_option.has_positions() {
Some(PositionSerializer::new(positions_write))
} else {
None
};
let postings_start_offset = postings_write.written_bytes();
Ok(FieldSerializer {
term_dictionary_builder,
postings_serializer,
positions_serializer_opt,
current_term_info: TermInfo::default(),
term_open: false,
postings_write,
postings_start_offset,
})
}
fn postings_offset(&self) -> usize {
(self.postings_write.written_bytes() - self.postings_start_offset) as usize
}
fn current_term_info(&self) -> TermInfo {
let positions_start =
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
@@ -156,7 +163,7 @@ impl<'a> FieldSerializer<'a> {
} else {
0u64
} as usize;
let addr = self.postings_serializer.written_bytes() as usize;
let addr = self.postings_offset();
TermInfo {
doc_freq: 0,
postings_range: addr..addr,
@@ -213,21 +220,22 @@ impl<'a> FieldSerializer<'a> {
crate::fail_point!("FieldSerializer::close_term", |msg: Option<String>| {
Err(io::Error::new(io::ErrorKind::Other, format!("{msg:?}")))
});
if self.term_open {
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_range.end =
self.postings_serializer.written_bytes() as usize;
if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() {
positions_serializer.close_term()?;
self.current_term_info.positions_range.end =
positions_serializer.written_bytes() as usize;
}
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
if !self.term_open {
return Ok(());
};
self.postings_serializer
.close_term(self.current_term_info.doc_freq, self.postings_write)?;
self.current_term_info.postings_range.end = self.postings_offset();
if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() {
positions_serializer.close_term()?;
self.current_term_info.positions_range.end =
positions_serializer.written_bytes() as usize;
}
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
Ok(())
}
@@ -237,7 +245,7 @@ impl<'a> FieldSerializer<'a> {
if let Some(positions_serializer) = self.positions_serializer_opt {
positions_serializer.close()?;
}
self.postings_serializer.close()?;
self.postings_write.flush()?;
self.term_dictionary_builder.finish()?;
Ok(())
}
@@ -291,8 +299,7 @@ impl Block {
}
}
pub struct PostingsSerializer<W: Write> {
output_write: CountingWriter<W>,
pub struct PostingsSerializer {
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
@@ -310,16 +317,13 @@ pub struct PostingsSerializer<W: Write> {
term_has_freq: bool,
}
impl<W: Write> PostingsSerializer<W> {
impl PostingsSerializer {
pub fn new(
write: W,
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> PostingsSerializer<W> {
) -> PostingsSerializer {
PostingsSerializer {
output_write: CountingWriter::wrap(write),
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
@@ -422,11 +426,11 @@ impl<W: Write> PostingsSerializer<W> {
}
}
fn close(mut self) -> io::Result<()> {
self.postings_write.flush()
}
pub fn close_term(&mut self, doc_freq: u32) -> io::Result<()> {
pub fn close_term(
&mut self,
doc_freq: u32,
output_write: &mut impl std::io::Write,
) -> io::Result<()> {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
@@ -451,26 +455,16 @@ impl<W: Write> PostingsSerializer<W> {
}
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(&mut self.output_write)?;
self.output_write.write_all(skip_data)?;
VInt(skip_data.len() as u64).serialize(output_write)?;
output_write.write_all(skip_data)?;
}
self.output_write.write_all(&self.postings_write[..])?;
output_write.write_all(&self.postings_write[..])?;
self.skip_write.clear();
self.postings_write.clear();
self.bm25_weight = None;
Ok(())
}
/// Returns the number of bytes written in the postings write object
/// at this point.
/// When called before writing the postings of a term, this value is used as
/// start offset.
/// When called after writing the postings of a term, this value is used as a
/// end offset.
fn written_bytes(&self) -> u64 {
self.output_write.written_bytes()
}
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;

View File

@@ -7,6 +7,7 @@ use arc_swap::ArcSwap;
pub use warming::Warmer;
use self::warming::WarmingState;
use crate::codec::Codec;
use crate::core::searcher::{SearcherGeneration, SearcherInner};
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
use crate::store::DOCSTORE_CACHE_CAPACITY;
@@ -38,17 +39,17 @@ pub enum ReloadPolicy {
/// - number of warming threads, for parallelizing warming work
/// - The cache size of the underlying doc store readers.
#[derive(Clone)]
pub struct IndexReaderBuilder {
pub struct IndexReaderBuilder<C: Codec = crate::codec::StandardCodec> {
reload_policy: ReloadPolicy,
index: Index,
index: Index<C>,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_num_blocks: usize,
}
impl IndexReaderBuilder {
impl<C: Codec> IndexReaderBuilder<C> {
#[must_use]
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
pub(crate) fn new(index: Index<C>) -> IndexReaderBuilder<C> {
IndexReaderBuilder {
reload_policy: ReloadPolicy::OnCommitWithDelay,
index,
@@ -63,7 +64,7 @@ impl IndexReaderBuilder {
/// Building the reader is a non-trivial operation that requires
/// to open different segment readers. It may take hundreds of milliseconds
/// of time and it may return an error.
pub fn try_into(self) -> crate::Result<IndexReader> {
pub fn try_into(self) -> crate::Result<IndexReader<C>> {
let searcher_generation_inventory = Inventory::default();
let warming_state = WarmingState::new(
self.num_warming_threads,
@@ -106,7 +107,7 @@ impl IndexReaderBuilder {
///
/// See [`ReloadPolicy`] for more details.
#[must_use]
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder<C> {
self.reload_policy = reload_policy;
self
}
@@ -118,14 +119,14 @@ impl IndexReaderBuilder {
pub fn doc_store_cache_num_blocks(
mut self,
doc_store_cache_num_blocks: usize,
) -> IndexReaderBuilder {
) -> IndexReaderBuilder<C> {
self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
self
}
/// Set the [`Warmer`]s that are invoked when reloading searchable segments.
#[must_use]
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder<C> {
self.warmers = warmers;
self
}
@@ -135,33 +136,33 @@ impl IndexReaderBuilder {
/// This allows parallelizing warming work when there are multiple [`Warmer`] registered with
/// the [`IndexReader`].
#[must_use]
pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder<C> {
self.num_warming_threads = num_warming_threads;
self
}
}
impl TryInto<IndexReader> for IndexReaderBuilder {
impl<C: Codec> TryInto<IndexReader<C>> for IndexReaderBuilder<C> {
type Error = crate::TantivyError;
fn try_into(self) -> crate::Result<IndexReader> {
fn try_into(self) -> crate::Result<IndexReader<C>> {
IndexReaderBuilder::try_into(self)
}
}
struct InnerIndexReader {
struct InnerIndexReader<C: Codec> {
doc_store_cache_num_blocks: usize,
index: Index,
index: Index<C>,
warming_state: WarmingState,
searcher: arc_swap::ArcSwap<SearcherInner>,
searcher_generation_counter: Arc<AtomicU64>,
searcher_generation_inventory: Inventory<SearcherGeneration>,
}
impl InnerIndexReader {
impl<C: Codec> InnerIndexReader<C> {
fn new(
doc_store_cache_num_blocks: usize,
index: Index,
index: Index<C>,
warming_state: WarmingState,
// The searcher_generation_inventory is not used as source, but as target to track the
// loaded segments.
@@ -189,7 +190,7 @@ impl InnerIndexReader {
///
/// This function acquires a lock to prevent GC from removing files
/// as we are opening our index.
fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
fn open_segment_readers(index: &Index<C>) -> crate::Result<Vec<SegmentReader>> {
// Prevents segment files from getting deleted while we are in the process of opening them
let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = index.searchable_segments()?;
@@ -212,7 +213,7 @@ impl InnerIndexReader {
}
fn create_searcher(
index: &Index,
index: &Index<C>,
doc_store_cache_num_blocks: usize,
warming_state: &WarmingState,
searcher_generation_counter: &Arc<AtomicU64>,
@@ -226,9 +227,10 @@ impl InnerIndexReader {
);
let schema = index.schema();
// SearcherInner uses Index<StandardCodec> since the codec doesn't affect reading
let searcher = Arc::new(SearcherInner::new(
schema,
index.clone(),
index.with_standard_codec(),
segment_readers,
searcher_generation,
doc_store_cache_num_blocks,
@@ -264,14 +266,14 @@ impl InnerIndexReader {
///
/// `IndexReader` just wraps an `Arc`.
#[derive(Clone)]
pub struct IndexReader {
inner: Arc<InnerIndexReader>,
pub struct IndexReader<C: Codec = crate::codec::StandardCodec> {
inner: Arc<InnerIndexReader<C>>,
_watch_handle_opt: Option<WatchHandle>,
}
impl IndexReader {
impl<C: Codec> IndexReader<C> {
#[cfg(test)]
pub(crate) fn index(&self) -> Index {
pub(crate) fn index(&self) -> Index<C> {
self.inner.index.clone()
}