Compare commits

..

4 Commits

Author SHA1 Message Date
trinity Pointard
5ad731aaf0 upgrade some dependancies
including rand, which had a few breaking changes
2026-01-13 23:08:07 +01:00
trinity-1686a
0c94eb94c3 Merge pull request #2799 from jollygreenlaser/lru 2026-01-13 22:47:35 +01:00
Paul Masurel
c92e831dde Minor refactoring in PostingsSerializer (#2801)
Removes the Write generics argument in PostingsSerializer.
This removes useless generic.
Prepares the path for codecs.
Removes one useless CountingWrite layer.
etc.

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
2026-01-12 13:53:43 +01:00
Alex Lazar
947c0d5f40 Bump lru to 0.16.3 per dependabot 2026-01-09 23:25:51 -08:00
61 changed files with 306 additions and 1341 deletions

View File

@@ -27,7 +27,7 @@ regex = { version = "1.5.5", default-features = false, features = [
aho-corasick = "1.0"
tantivy-fst = "0.5"
memmap2 = { version = "0.9.0", optional = true }
lz4_flex = { version = "0.11", default-features = false, optional = true }
lz4_flex = { version = "0.12", default-features = false, optional = true }
zstd = { version = "0.13", optional = true, default-features = false }
tempfile = { version = "3.12.0", optional = true }
log = "0.4.16"
@@ -50,7 +50,7 @@ fail = { version = "0.5.0", optional = true }
time = { version = "0.3.35", features = ["serde-well-known"] }
smallvec = "1.8.0"
rayon = "1.5.2"
lru = "0.12.0"
lru = "0.16.3"
fastdivide = "0.4.0"
itertools = "0.14.0"
measure_time = "0.9.0"
@@ -76,7 +76,7 @@ winapi = "0.3.9"
[dev-dependencies]
binggan = "0.14.2"
rand = "0.8.5"
rand = "0.9"
maplit = "1.0.2"
matches = "0.1.9"
pretty_assertions = "1.2.1"
@@ -85,7 +85,7 @@ test-log = "0.2.10"
futures = "0.3.21"
paste = "1.0.11"
more-asserts = "0.3.1"
rand_distr = "0.4.3"
rand_distr = "0.5"
time = { version = "0.3.10", features = ["serde-well-known", "macros"] }
postcard = { version = "1.0.4", features = [
"use-std",

View File

@@ -1,8 +1,8 @@
use binggan::plugins::PeakMemAllocPlugin;
use binggan::{black_box, InputGroup, PeakMemAlloc, INSTRUMENTED_SYSTEM};
use rand::distributions::WeightedIndex;
use rand::prelude::SliceRandom;
use rand::distr::weighted::WeightedIndex;
use rand::rngs::StdRng;
use rand::seq::IndexedRandom;
use rand::{Rng, SeedableRng};
use rand_distr::Distribution;
use serde_json::json;
@@ -532,7 +532,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
// Prepare 1000 unique terms sampled using a Zipf distribution.
// Exponent ~1.1 approximates top-20 terms covering around ~20%.
let terms_1000: Vec<String> = (1..=1000).map(|i| format!("term_{i}")).collect();
let zipf_1000 = rand_distr::Zipf::new(1000, 1.1f64).unwrap();
let zipf_1000 = rand_distr::Zipf::new(1000.0, 1.1f64).unwrap();
{
let mut rng = StdRng::from_seed([1u8; 32]);
@@ -576,8 +576,8 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
}
let _val_max = 1_000_000.0;
for _ in 0..doc_with_value {
let val: f64 = rng.gen_range(0.0..1_000_000.0);
let json = if rng.gen_bool(0.1) {
let val: f64 = rng.random_range(0.0..1_000_000.0);
let json = if rng.random_bool(0.1) {
// 10% are numeric values
json!({ "mixed_type": val })
} else {
@@ -586,7 +586,7 @@ fn get_test_index_bench(cardinality: Cardinality) -> tantivy::Result<Index> {
index_writer.add_document(doc!(
text_field => "cool",
json_field => json,
text_field_all_unique_terms => format!("unique_term_{}", rng.gen::<u64>()),
text_field_all_unique_terms => format!("unique_term_{}", rng.random::<u64>()),
text_field_many_terms => many_terms_data.choose(&mut rng).unwrap().to_string(),
text_field_few_terms_status => status_field_data[log_level_distribution.sample(&mut rng)].0,
text_field_1000_terms_zipf => terms_1000[zipf_1000.sample(&mut rng) as usize - 1].as_str(),

View File

@@ -55,29 +55,29 @@ fn build_shared_indices(num_docs: usize, p_a: f32, p_b: f32, p_c: f32) -> (Bench
{
let mut writer = index.writer_with_num_threads(1, 500_000_000).unwrap();
for _ in 0..num_docs {
let has_a = rng.gen_bool(p_a as f64);
let has_b = rng.gen_bool(p_b as f64);
let has_c = rng.gen_bool(p_c as f64);
let score = rng.gen_range(0u64..100u64);
let score2 = rng.gen_range(0u64..100_000u64);
let has_a = rng.random_bool(p_a as f64);
let has_b = rng.random_bool(p_b as f64);
let has_c = rng.random_bool(p_c as f64);
let score = rng.random_range(0u64..100u64);
let score2 = rng.random_range(0u64..100_000u64);
let mut title_tokens: Vec<&str> = Vec::new();
let mut body_tokens: Vec<&str> = Vec::new();
if has_a {
if rng.gen_bool(0.1) {
if rng.random_bool(0.1) {
title_tokens.push("a");
} else {
body_tokens.push("a");
}
}
if has_b {
if rng.gen_bool(0.1) {
if rng.random_bool(0.1) {
title_tokens.push("b");
} else {
body_tokens.push("b");
}
}
if has_c {
if rng.gen_bool(0.1) {
if rng.random_bool(0.1) {
title_tokens.push("c");
} else {
body_tokens.push("c");

View File

@@ -36,13 +36,13 @@ fn build_shared_indices(num_docs: usize, p_title_a: f32, distribution: &str) ->
"dense" => {
for doc_id in 0..num_docs {
// Always add title to avoid empty documents
let title_token = if rng.gen_bool(p_title_a as f64) {
let title_token = if rng.random_bool(p_title_a as f64) {
"a"
} else {
"b"
};
let num_rand = rng.gen_range(0u64..1000u64);
let num_rand = rng.random_range(0u64..1000u64);
let num_asc = (doc_id / 10000) as u64;
@@ -60,13 +60,13 @@ fn build_shared_indices(num_docs: usize, p_title_a: f32, distribution: &str) ->
"sparse" => {
for doc_id in 0..num_docs {
// Always add title to avoid empty documents
let title_token = if rng.gen_bool(p_title_a as f64) {
let title_token = if rng.random_bool(p_title_a as f64) {
"a"
} else {
"b"
};
let num_rand = rng.gen_range(0u64..10000000u64);
let num_rand = rng.random_range(0u64..10000000u64);
let num_asc = doc_id as u64;

View File

@@ -33,7 +33,7 @@ fn build_shared_indices(num_docs: usize, distribution: &str) -> BenchIndex {
match distribution {
"dense" => {
for doc_id in 0..num_docs {
let num_rand = rng.gen_range(0u64..1000u64);
let num_rand = rng.random_range(0u64..1000u64);
let num_asc = (doc_id / 10000) as u64;
writer
@@ -46,7 +46,7 @@ fn build_shared_indices(num_docs: usize, distribution: &str) -> BenchIndex {
}
"sparse" => {
for doc_id in 0..num_docs {
let num_rand = rng.gen_range(0u64..10000000u64);
let num_rand = rng.random_range(0u64..10000000u64);
let num_asc = doc_id as u64;
writer

View File

@@ -97,20 +97,20 @@ fn get_index_0_to_100() -> Index {
let num_vals = 100_000;
let docs: Vec<_> = (0..num_vals)
.map(|_i| {
let id_name = if rng.gen_bool(0.01) {
let id_name = if rng.random_bool(0.01) {
"veryfew".to_string() // 1%
} else if rng.gen_bool(0.1) {
} else if rng.random_bool(0.1) {
"few".to_string() // 9%
} else {
"most".to_string() // 90%
};
Doc {
id_name,
id: rng.gen_range(0..100),
id: rng.random_range(0..100),
// Multiply by 1000, so that we create most buckets in the compact space
// The benches depend on this range to select n-percent of elements with the
// methods below.
ip: Ipv6Addr::from_u128(rng.gen_range(0..100) * 1000),
ip: Ipv6Addr::from_u128(rng.random_range(0..100) * 1000),
}
})
.collect();

View File

@@ -18,5 +18,5 @@ homepage = "https://github.com/quickwit-oss/tantivy"
bitpacking = { version = "0.9.2", default-features = false, features = ["bitpacker1x"] }
[dev-dependencies]
rand = "0.8"
rand = "0.9"
proptest = "1"

View File

@@ -4,8 +4,8 @@ extern crate test;
#[cfg(test)]
mod tests {
use rand::rng;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tantivy_bitpacker::{BitPacker, BitUnpacker, BlockedBitpacker};
use test::Bencher;
@@ -27,7 +27,7 @@ mod tests {
let num_els = 1_000_000u32;
let bit_unpacker = BitUnpacker::new(bit_width);
let data = create_bitpacked_data(bit_width, num_els);
let idxs: Vec<u32> = (0..num_els).choose_multiple(&mut thread_rng(), 100_000);
let idxs: Vec<u32> = (0..num_els).choose_multiple(&mut rng(), 100_000);
b.iter(|| {
let mut out = 0u64;
for &idx in &idxs {

View File

@@ -22,7 +22,7 @@ downcast-rs = "2.0.1"
[dev-dependencies]
proptest = "1"
more-asserts = "0.3.1"
rand = "0.8"
rand = "0.9"
binggan = "0.14.0"
[[bench]]

View File

@@ -9,7 +9,7 @@ use tantivy_columnar::column_values::{CodecType, serialize_and_load_u64_based_co
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55_000_u64)
.map(|num| num + rng.r#gen::<u8>() as u64)
.map(|num| num + rng.random::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);

View File

@@ -6,7 +6,7 @@ use tantivy_columnar::column_values::{CodecType, serialize_u64_based_column_valu
fn get_data() -> Vec<u64> {
let mut rng = StdRng::seed_from_u64(2u64);
let mut data: Vec<_> = (100..55_000_u64)
.map(|num| num + rng.r#gen::<u8>() as u64)
.map(|num| num + rng.random::<u8>() as u64)
.collect();
data.push(99_000);
data.insert(1000, 2000);

View File

@@ -8,7 +8,7 @@ const TOTAL_NUM_VALUES: u32 = 1_000_000;
fn gen_optional_index(fill_ratio: f64) -> OptionalIndex {
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let vals: Vec<u32> = (0..TOTAL_NUM_VALUES)
.map(|_| rng.gen_bool(fill_ratio))
.map(|_| rng.random_bool(fill_ratio))
.enumerate()
.filter(|(_pos, val)| *val)
.map(|(pos, _)| pos as u32)
@@ -25,7 +25,7 @@ fn random_range_iterator(
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let mut current = start;
std::iter::from_fn(move || {
current += rng.gen_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
current += rng.random_range(avg_step_size - avg_deviation..=avg_step_size + avg_deviation);
if current >= end { None } else { Some(current) }
})
}

View File

@@ -39,7 +39,7 @@ fn get_data_50percent_item() -> Vec<u128> {
let mut data = vec![];
for _ in 0..300_000 {
let val = rng.gen_range(1..=100);
let val = rng.random_range(1..=100);
data.push(val);
}
data.push(SINGLE_ITEM);

View File

@@ -34,7 +34,7 @@ fn get_data_50percent_item() -> Vec<u128> {
let mut data = vec![];
for _ in 0..300_000 {
let val = rng.gen_range(1..=100);
let val = rng.random_range(1..=100);
data.push(val);
}
data.push(SINGLE_ITEM);

View File

@@ -268,7 +268,7 @@ mod tests {
#[test]
fn linear_interpol_fast_field_rand() {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
for _ in 0..50 {
let mut data = (0..10_000).map(|_| rng.next_u64()).collect::<Vec<_>>();
create_and_validate::<LinearCodec>(&data, "random");

View File

@@ -122,7 +122,7 @@ pub(crate) fn create_and_validate<TColumnCodec: ColumnCodec>(
assert_eq!(vals, buffer);
if !vals.is_empty() {
let test_rand_idx = rand::thread_rng().gen_range(0..=vals.len() - 1);
let test_rand_idx = rand::rng().random_range(0..=vals.len() - 1);
let expected_positions: Vec<u32> = vals
.iter()
.enumerate()

View File

@@ -21,5 +21,5 @@ serde = { version = "1.0.136", features = ["derive"] }
[dev-dependencies]
binggan = "0.14.0"
proptest = "1.0.0"
rand = "0.8.4"
rand = "0.9"

View File

@@ -1,6 +1,6 @@
use binggan::{BenchRunner, black_box};
use rand::rng;
use rand::seq::IteratorRandom;
use rand::thread_rng;
use tantivy_common::{BitSet, TinySet, serialize_vint_u32};
fn bench_vint() {
@@ -17,7 +17,7 @@ fn bench_vint() {
black_box(out);
});
let vals: Vec<u32> = (0..20_000).choose_multiple(&mut thread_rng(), 100_000);
let vals: Vec<u32> = (0..20_000).choose_multiple(&mut rng(), 100_000);
runner.bench_function("bench_vint_rand", move |_| {
let mut out = 0u64;
for val in vals.iter().cloned() {

View File

@@ -416,7 +416,7 @@ mod tests {
use std::collections::HashSet;
use ownedbytes::OwnedBytes;
use rand::distributions::Bernoulli;
use rand::distr::Bernoulli;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

View File

@@ -1,49 +0,0 @@
mod postings;
mod standard;
use std::borrow::Cow;
use serde::{Deserialize, Serialize};
pub use standard::StandardCodec;
pub trait Codec: Clone + std::fmt::Debug + Send + Sync + 'static {
type PostingsCodec;
const NAME: &'static str;
fn from_json_props(json_value: &serde_json::Value) -> crate::Result<Self>;
fn to_json_props(&self) -> serde_json::Value;
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct CodecConfiguration {
name: Cow<'static, str>,
#[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
props: serde_json::Value,
}
impl CodecConfiguration {
pub fn from_codec<C: Codec>(codec: &C) -> Self {
CodecConfiguration {
name: Cow::Borrowed(C::NAME),
props: codec.to_json_props(),
}
}
pub fn to_codec<C: Codec>(&self) -> crate::Result<C> {
if self.name != C::NAME {
return Err(crate::TantivyError::InvalidArgument(format!(
"Codec name mismatch: expected {}, got {}",
C::NAME,
self.name
)));
}
C::from_json_props(&self.props)
}
}
impl Default for CodecConfiguration {
fn default() -> Self {
CodecConfiguration::from_codec(&StandardCodec)
}
}

View File

@@ -1,23 +0,0 @@
use std::io;
use crate::fieldnorm::FieldNormReader;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score};
pub trait PostingsCodec {
type PostingsSerializer: PostingsSerializer;
}
pub trait PostingsSerializer {
fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> Self;
fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool);
fn write_doc(&mut self, doc_id: DocId, term_freq: u32);
fn close_term(&mut self, doc_freq: u32, wrt: &mut impl io::Write) -> io::Result<()>;
}

View File

@@ -1,29 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::codec::standard::postings::StandardPostingsCodec;
use crate::codec::Codec;
mod postings;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StandardCodec;
impl Codec for StandardCodec {
type PostingsCodec = StandardPostingsCodec;
const NAME: &'static str = "standard";
fn from_json_props(json_value: &serde_json::Value) -> crate::Result<Self> {
if !json_value.is_null() {
return Err(crate::TantivyError::InvalidArgument(format!(
"Codec property for the StandardCodec are unexpected. expected null, got {}",
json_value.as_str().unwrap_or("null")
)));
}
Ok(StandardCodec)
}
fn to_json_props(&self) -> serde_json::Value {
serde_json::Value::Null
}
}

View File

@@ -1,50 +0,0 @@
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::DocId;
pub struct Block {
doc_ids: [DocId; COMPRESSION_BLOCK_SIZE],
term_freqs: [u32; COMPRESSION_BLOCK_SIZE],
len: usize,
}
impl Block {
pub fn new() -> Self {
Block {
doc_ids: [0u32; COMPRESSION_BLOCK_SIZE],
term_freqs: [0u32; COMPRESSION_BLOCK_SIZE],
len: 0,
}
}
pub fn doc_ids(&self) -> &[DocId] {
&self.doc_ids[..self.len]
}
pub fn term_freqs(&self) -> &[u32] {
&self.term_freqs[..self.len]
}
pub fn clear(&mut self) {
self.len = 0;
}
pub fn append_doc(&mut self, doc: DocId, term_freq: u32) {
let len = self.len;
self.doc_ids[len] = doc;
self.term_freqs[len] = term_freq;
self.len = len + 1;
}
pub fn is_full(&self) -> bool {
self.len == COMPRESSION_BLOCK_SIZE
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn last_doc(&self) -> DocId {
assert_eq!(self.len, COMPRESSION_BLOCK_SIZE);
self.doc_ids[COMPRESSION_BLOCK_SIZE - 1]
}
}

View File

@@ -1,13 +0,0 @@
use crate::codec::postings::PostingsCodec;
mod block;
mod postings_serializer;
mod skip;
pub use postings_serializer::StandardPostingsSerializer;
pub struct StandardPostingsCodec;
impl PostingsCodec for StandardPostingsCodec {
type PostingsSerializer = StandardPostingsSerializer;
}

View File

@@ -1,187 +0,0 @@
use std::cmp::Ordering;
use std::io::{self, Write as _};
use common::{BinarySerializable as _, VInt};
use crate::codec::postings::PostingsSerializer;
use crate::codec::standard::postings::block::Block;
use crate::codec::standard::postings::skip::SkipSerializer;
use crate::fieldnorm::FieldNormReader;
use crate::postings::compression::{BlockEncoder, VIntEncoder as _, COMPRESSION_BLOCK_SIZE};
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score};
pub struct StandardPostingsSerializer {
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
block: Box<Block>,
postings_write: Vec<u8>,
skip_write: SkipSerializer,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
bm25_weight: Option<Bm25Weight>,
avg_fieldnorm: Score, /* Average number of term in the field for that segment.
* this value is used to compute the block wand information. */
term_has_freq: bool,
}
impl PostingsSerializer for StandardPostingsSerializer {
fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> StandardPostingsSerializer {
Self {
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
postings_write: Vec::new(),
skip_write: SkipSerializer::new(),
last_doc_id_encoded: 0u32,
mode,
fieldnorm_reader,
bm25_weight: None,
avg_fieldnorm,
term_has_freq: false,
}
}
fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
self.bm25_weight = None;
self.term_has_freq = self.mode.has_freq() && record_term_freq;
if !self.term_has_freq {
return;
}
let num_docs_in_segment: u64 =
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
fieldnorm_reader.num_docs() as u64
} else {
return;
};
if num_docs_in_segment == 0 {
return;
}
self.bm25_weight = Some(Bm25Weight::for_one_term_without_explain(
term_doc_freq as u64,
num_docs_in_segment,
self.avg_fieldnorm,
));
}
fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block();
}
}
fn close_term(
&mut self,
doc_freq: u32,
output_write: &mut impl std::io::Write,
) -> io::Result<()> {
if !self.block.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
// not a perfect multiple of our block size.
//
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self
.block_encoder
.compress_vint_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
}
// ... Idem for term frequencies
if self.term_has_freq {
let block_encoded = self
.block_encoder
.compress_vint_unsorted(self.block.term_freqs());
self.postings_write.write_all(block_encoded)?;
}
self.block.clear();
}
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
let skip_data = self.skip_write.data();
VInt(skip_data.len() as u64).serialize(output_write)?;
output_write.write_all(skip_data)?;
}
output_write.write_all(&self.postings_write[..])?;
self.skip_write.clear();
self.postings_write.clear();
self.bm25_weight = None;
Ok(())
}
}
impl StandardPostingsSerializer {
fn write_block(&mut self) {
{
// encode the doc ids
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.skip_write
.write_doc(self.last_doc_id_encoded, num_bits);
// last el block 0, offset block 1,
self.postings_write.extend(block_encoded);
}
if self.term_has_freq {
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_unsorted(self.block.term_freqs(), true);
self.postings_write.extend(block_encoded);
self.skip_write.write_term_freq(num_bits);
if self.mode.has_positions() {
// We serialize the sum of term freqs within the skip information
// in order to navigate through positions.
let sum_freq = self.block.term_freqs().iter().cloned().sum();
self.skip_write.write_total_term_freq(sum_freq);
}
let mut blockwand_params = (0u8, 0u32);
if let Some(bm25_weight) = self.bm25_weight.as_ref() {
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
let docs = self.block.doc_ids().iter().cloned();
let term_freqs = self.block.term_freqs().iter().cloned();
let fieldnorms = docs.map(|doc| fieldnorm_reader.fieldnorm_id(doc));
blockwand_params = fieldnorms
.zip(term_freqs)
.max_by(
|(left_fieldnorm_id, left_term_freq),
(right_fieldnorm_id, right_term_freq)| {
let left_score =
bm25_weight.tf_factor(*left_fieldnorm_id, *left_term_freq);
let right_score =
bm25_weight.tf_factor(*right_fieldnorm_id, *right_term_freq);
left_score
.partial_cmp(&right_score)
.unwrap_or(Ordering::Equal)
},
)
.unwrap();
}
}
let (fieldnorm_id, term_freq) = blockwand_params;
self.skip_write.write_blockwand_max(fieldnorm_id, term_freq);
}
self.block.clear();
}
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;
}
}

View File

@@ -1,448 +0,0 @@
use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
// doc num bits uses the following encoding:
// given 0b a b cdefgh
// |1|2|3| 4 |
// - 1: unused
// - 2: is delta-1 encoded. 0 if not, 1, if yes
// - 3: unused
// - 4: a 5 bit number in 0..32, the actual bitwidth. Bitpacking could in theory say this is 32
// (requiring a 6th bit), but the biggest doc_id we can want to encode is TERMINATED-1, which can
// be represented on 31b without delta encoding.
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
assert!(bitwidth < 32);
bitwidth | ((delta_1 as u8) << 6)
}
fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) {
let delta_1 = ((raw_bitwidth >> 6) & 1) != 0;
let bitwidth = raw_bitwidth & 0x1f;
(bitwidth, delta_1)
}
#[inline]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
max_tf.min(u8::MAX as u32) as u8
}
#[inline]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
if max_tf_code == u8::MAX {
u32::MAX
} else {
max_tf_code as u32
}
}
#[inline]
fn read_u32(data: &[u8]) -> u32 {
u32::from_le_bytes(data[..4].try_into().unwrap())
}
#[inline]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
buf.extend_from_slice(&val.to_le_bytes());
}
pub struct SkipSerializer {
buffer: Vec<u8>,
}
impl SkipSerializer {
pub fn new() -> SkipSerializer {
SkipSerializer { buffer: Vec::new() }
}
pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
write_u32(last_doc, &mut self.buffer);
self.buffer.push(encode_bitwidth(doc_num_bits, true));
}
pub fn write_term_freq(&mut self, tf_num_bits: u8) {
self.buffer.push(tf_num_bits);
}
pub fn write_total_term_freq(&mut self, tf_sum: u32) {
write_u32(tf_sum, &mut self.buffer);
}
pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
let block_wand_tf = encode_block_wand_max_tf(term_freq);
self.buffer
.extend_from_slice(&[fieldnorm_id, block_wand_tf]);
}
pub fn data(&self) -> &[u8] {
&self.buffer[..]
}
pub fn clear(&mut self) {
self.buffer.clear();
}
}
#[derive(Clone)]
pub(crate) struct SkipReader {
last_doc_in_block: DocId,
pub(crate) last_doc_in_previous_block: DocId,
owned_read: OwnedBytes,
skip_info: IndexRecordOption,
byte_offset: usize,
remaining_docs: u32, // number of docs remaining, including the
// documents in the current block.
block_info: BlockInfo,
position_offset: u64,
}
#[derive(Clone, Eq, PartialEq, Copy, Debug)]
pub(crate) enum BlockInfo {
BitPacked {
doc_num_bits: u8,
strict_delta_encoded: bool,
tf_num_bits: u8,
tf_sum: u32,
block_wand_fieldnorm_id: u8,
block_wand_term_freq: u32,
},
VInt {
num_docs: u32,
},
}
impl Default for BlockInfo {
fn default() -> Self {
BlockInfo::VInt { num_docs: 0u32 }
}
}
impl SkipReader {
pub fn new(data: OwnedBytes, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader {
let mut skip_reader = SkipReader {
last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
},
last_doc_in_previous_block: 0u32,
owned_read: data,
skip_info,
block_info: BlockInfo::VInt { num_docs: doc_freq },
byte_offset: 0,
remaining_docs: doc_freq,
position_offset: 0u64,
};
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
skip_reader.read_block_info();
}
skip_reader
}
pub fn reset(&mut self, data: OwnedBytes, doc_freq: u32) {
self.last_doc_in_block = if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
};
self.last_doc_in_previous_block = 0u32;
self.owned_read = data;
self.block_info = BlockInfo::VInt { num_docs: doc_freq };
self.byte_offset = 0;
self.remaining_docs = doc_freq;
self.position_offset = 0u64;
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
}
}
// Returns the block max score for this block if available.
//
// The block max score is available for all full bitpacked block,
// but no available for the last VInt encoded incomplete block.
pub fn block_max_score(&self, bm25_weight: &Bm25Weight) -> Option<Score> {
match self.block_info {
BlockInfo::BitPacked {
block_wand_fieldnorm_id,
block_wand_term_freq,
..
} => Some(bm25_weight.score(block_wand_fieldnorm_id, block_wand_term_freq)),
BlockInfo::VInt { .. } => None,
}
}
pub(crate) fn last_doc_in_block(&self) -> DocId {
self.last_doc_in_block
}
pub fn position_offset(&self) -> u64 {
self.position_offset
}
#[inline]
pub fn byte_offset(&self) -> usize {
self.byte_offset
}
fn read_block_info(&mut self) {
let bytes = self.owned_read.as_slice();
let advance_len: usize;
self.last_doc_in_block = read_u32(bytes);
let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]);
match self.skip_info {
IndexRecordOption::Basic => {
advance_len = 5;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits: 0,
tf_sum: 0,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0,
};
}
IndexRecordOption::WithFreqs => {
let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
advance_len = 8;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
tf_sum: 0,
block_wand_fieldnorm_id,
block_wand_term_freq,
};
}
IndexRecordOption::WithFreqsAndPositions => {
let tf_num_bits = bytes[5];
let tf_sum = read_u32(&bytes[6..10]);
let block_wand_fieldnorm_id = bytes[10];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
advance_len = 12;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
tf_sum,
block_wand_fieldnorm_id,
block_wand_term_freq,
};
}
}
self.owned_read.advance(advance_len);
}
pub fn block_info(&self) -> BlockInfo {
self.block_info
}
/// Advance the skip reader to the block that may contain the target.
///
/// If the target is larger than all documents, the skip_reader
/// then advance to the last Variable In block.
pub fn seek(&mut self, target: DocId) -> bool {
if self.last_doc_in_block() >= target {
return false;
}
loop {
self.advance();
if self.last_doc_in_block() >= target {
return true;
}
}
}
pub fn advance(&mut self) {
match self.block_info {
BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
tf_sum,
..
} => {
self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32;
self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
self.position_offset += tf_sum as u64;
}
BlockInfo::VInt { num_docs } => {
debug_assert_eq!(num_docs, self.remaining_docs);
self.remaining_docs = 0;
self.byte_offset = usize::MAX;
}
}
self.last_doc_in_previous_block = self.last_doc_in_block;
if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
} else {
self.last_doc_in_block = TERMINATED;
self.block_info = BlockInfo::VInt {
num_docs: self.remaining_docs,
};
}
}
}
#[cfg(test)]
mod tests {
use super::{
decode_bitwidth, encode_bitwidth, BlockInfo, IndexRecordOption, SkipReader, SkipSerializer,
};
use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
#[test]
fn test_skip_with_freq() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.write_term_freq(3u8);
skip_serializer.write_blockwand_max(13u8, 3u32);
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.write_term_freq(2u8);
skip_serializer.write_blockwand_max(8u8, 2u32);
skip_serializer.data().to_owned()
};
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::WithFreqs);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info,
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 3u8,
tf_sum: 0,
block_wand_fieldnorm_id: 13,
block_wand_term_freq: 3
}
);
skip_reader.advance();
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
strict_delta_encoded: true,
tf_num_bits: 2u8,
tf_sum: 0,
block_wand_fieldnorm_id: 8,
block_wand_term_freq: 2
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
#[test]
fn test_skip_no_freq() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.data().to_owned()
};
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
#[test]
fn test_skip_multiple_of_block_size() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.data().to_owned()
};
let doc_freq = COMPRESSION_BLOCK_SIZE as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
#[test]
fn test_encode_decode_bitwidth() {
for bitwidth in 0..32 {
for delta_1 in [false, true] {
assert_eq!(
(bitwidth, delta_1),
decode_bitwidth(encode_bitwidth(bitwidth, delta_1))
);
}
}
assert_eq!(0b01000010, encode_bitwidth(0b10, true));
assert_eq!(0b00000010, encode_bitwidth(0b10, false));
}
}

View File

@@ -486,9 +486,9 @@ mod tests {
use std::collections::BTreeSet;
use columnar::Dictionary;
use rand::distributions::Uniform;
use rand::distr::Uniform;
use rand::prelude::SliceRandom;
use rand::{thread_rng, Rng};
use rand::{rng, Rng};
use super::{FacetCollector, FacetCounts};
use crate::collector::facet_collector::compress_mapping;
@@ -731,7 +731,7 @@ mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let uniform = Uniform::new_inclusive(1, 100_000);
let uniform = Uniform::new_inclusive(1, 100_000).unwrap();
let mut docs: Vec<TantivyDocument> =
vec![("a", 10), ("b", 100), ("c", 7), ("d", 12), ("e", 21)]
.into_iter()
@@ -741,14 +741,11 @@ mod tests {
std::iter::repeat_n(doc, count)
})
.map(|mut doc| {
doc.add_facet(
facet_field,
&format!("/facet/{}", thread_rng().sample(uniform)),
);
doc.add_facet(facet_field, &format!("/facet/{}", rng().sample(uniform)));
doc
})
.collect();
docs[..].shuffle(&mut thread_rng());
docs[..].shuffle(&mut rng());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for doc in docs {
@@ -822,8 +819,8 @@ mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use rand::rng;
use rand::seq::SliceRandom;
use rand::thread_rng;
use test::Bencher;
use crate::collector::FacetCollector;
@@ -846,7 +843,7 @@ mod bench {
}
}
// 40425 docs
docs[..].shuffle(&mut thread_rng());
docs[..].shuffle(&mut rng());
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for doc in docs {

View File

@@ -160,7 +160,7 @@ mod tests {
expected: &[(crate::Score, usize)],
) {
let mut vals: Vec<(crate::Score, usize)> = (0..10).map(|val| (val as f32, val)).collect();
vals.shuffle(&mut rand::thread_rng());
vals.shuffle(&mut rand::rng());
let vals_merged = merge_top_k(vals.into_iter(), doc_range, ComparatorEnum::from(order));
assert_eq!(&vals_merged, expected);
}

View File

@@ -162,7 +162,7 @@ mod tests {
mod bench {
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use rand::rng;
use test::Bencher;
use super::AliveBitSet;
@@ -176,7 +176,7 @@ mod bench {
}
fn remove_rand(raw: &mut Vec<u32>) {
let i = (0..raw.len()).choose(&mut thread_rng()).unwrap();
let i = (0..raw.len()).choose(&mut rng()).unwrap();
raw.remove(i);
}

View File

@@ -879,7 +879,7 @@ mod tests {
const ONE_HOUR_IN_MICROSECS: i64 = 3_600 * 1_000_000;
let times: Vec<DateTime> = std::iter::repeat_with(|| {
// +- One hour.
let t = T0 + rng.gen_range(-ONE_HOUR_IN_MICROSECS..ONE_HOUR_IN_MICROSECS);
let t = T0 + rng.random_range(-ONE_HOUR_IN_MICROSECS..ONE_HOUR_IN_MICROSECS);
DateTime::from_timestamp_micros(t)
})
.take(1_000)

View File

@@ -1,6 +1,6 @@
use std::collections::HashSet;
use rand::{thread_rng, Rng};
use rand::{rng, Rng};
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::*;
@@ -29,7 +29,7 @@ fn test_functional_store() -> crate::Result<()> {
let index = Index::create_in_ram(schema);
let reader = index.reader()?;
let mut rng = thread_rng();
let mut rng = rng();
let mut index_writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
@@ -38,9 +38,9 @@ fn test_functional_store() -> crate::Result<()> {
let mut doc_id = 0u64;
for _iteration in 0..get_num_iterations() {
let num_docs: usize = rng.gen_range(0..4);
let num_docs: usize = rng.random_range(0..4);
if !doc_set.is_empty() {
let doc_to_remove_id = rng.gen_range(0..doc_set.len());
let doc_to_remove_id = rng.random_range(0..doc_set.len());
let removed_doc_id = doc_set.swap_remove(doc_to_remove_id);
index_writer.delete_term(Term::from_field_u64(id_field, removed_doc_id));
}
@@ -70,10 +70,10 @@ const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat \
non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
fn get_text() -> String {
use rand::seq::SliceRandom;
let mut rng = thread_rng();
use rand::seq::IndexedRandom;
let mut rng = rng();
let tokens: Vec<_> = LOREM.split(' ').collect();
let random_val = rng.gen_range(0..20);
let random_val = rng.random_range(0..20);
(0..random_val)
.map(|_| tokens.choose(&mut rng).unwrap())
@@ -101,7 +101,7 @@ fn test_functional_indexing_unsorted() -> crate::Result<()> {
let index = Index::create_from_tempdir(schema)?;
let reader = index.reader()?;
let mut rng = thread_rng();
let mut rng = rng();
let mut index_writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
@@ -110,7 +110,7 @@ fn test_functional_indexing_unsorted() -> crate::Result<()> {
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..get_num_iterations() {
let random_val = rng.gen_range(0..20);
let random_val = rng.random_range(0..20);
if random_val == 0 {
index_writer.commit()?;
committed_docs.extend(&uncommitted_docs);

View File

@@ -8,7 +8,6 @@ use std::thread::available_parallelism;
use super::segment::Segment;
use super::segment_reader::merge_field_meta_data;
use super::{FieldMetadata, IndexSettings};
use crate::codec::{CodecConfiguration, StandardCodec};
use crate::core::{Executor, META_FILEPATH};
use crate::directory::error::OpenReadError;
#[cfg(feature = "mmap")]
@@ -60,7 +59,6 @@ fn save_new_metas(
schema: Schema,
index_settings: IndexSettings,
directory: &dyn Directory,
codec: CodecConfiguration,
) -> crate::Result<()> {
save_metas(
&IndexMeta {
@@ -69,7 +67,6 @@ fn save_new_metas(
schema,
opstamp: 0u64,
payload: None,
codec,
},
directory,
)?;
@@ -104,21 +101,18 @@ fn save_new_metas(
/// };
/// let index = Index::builder().schema(schema).settings(settings).create_in_ram();
/// ```
pub struct IndexBuilder<Codec: crate::codec::Codec = StandardCodec> {
pub struct IndexBuilder {
schema: Option<Schema>,
index_settings: IndexSettings,
tokenizer_manager: TokenizerManager,
fast_field_tokenizer_manager: TokenizerManager,
codec: Codec,
}
impl Default for IndexBuilder<StandardCodec> {
impl Default for IndexBuilder {
fn default() -> Self {
IndexBuilder::new()
}
}
impl IndexBuilder<StandardCodec> {
impl IndexBuilder {
/// Creates a new `IndexBuilder`
pub fn new() -> Self {
Self {
@@ -126,21 +120,6 @@ impl IndexBuilder<StandardCodec> {
index_settings: IndexSettings::default(),
tokenizer_manager: TokenizerManager::default(),
fast_field_tokenizer_manager: TokenizerManager::default(),
codec: StandardCodec,
}
}
}
impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// Set the codec
#[must_use]
pub fn codec<NewCodec: crate::codec::Codec>(self, codec: NewCodec) -> IndexBuilder<NewCodec> {
IndexBuilder {
schema: self.schema,
index_settings: self.index_settings,
tokenizer_manager: self.tokenizer_manager,
fast_field_tokenizer_manager: self.fast_field_tokenizer_manager,
codec,
}
}
@@ -175,7 +154,7 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// The index will be allocated in anonymous memory.
/// This is useful for indexing small set of documents
/// for instances like unit test or temporary in memory index.
pub fn create_in_ram(self) -> Result<Index<Codec>, TantivyError> {
pub fn create_in_ram(self) -> Result<Index, TantivyError> {
let ram_directory = RamDirectory::create();
self.create(ram_directory)
}
@@ -186,7 +165,7 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// If a previous index was in this directory, it returns an
/// [`TantivyError::IndexAlreadyExists`] error.
#[cfg(feature = "mmap")]
pub fn create_in_dir<P: AsRef<Path>>(self, directory_path: P) -> crate::Result<Index<Codec>> {
pub fn create_in_dir<P: AsRef<Path>>(self, directory_path: P) -> crate::Result<Index> {
let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::open(directory_path)?);
if Index::exists(&*mmap_directory)? {
return Err(TantivyError::IndexAlreadyExists);
@@ -207,7 +186,7 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
self,
dir: impl Into<Box<dyn Directory>>,
mem_budget: usize,
) -> crate::Result<SingleSegmentIndexWriter<Codec, D>> {
) -> crate::Result<SingleSegmentIndexWriter<D>> {
let index = self.create(dir)?;
let index_simple_writer = SingleSegmentIndexWriter::new(index, mem_budget)?;
Ok(index_simple_writer)
@@ -223,7 +202,7 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// For other unit tests, prefer the [`RamDirectory`], see:
/// [`IndexBuilder::create_in_ram()`].
#[cfg(feature = "mmap")]
pub fn create_from_tempdir(self) -> crate::Result<Index<Codec>> {
pub fn create_from_tempdir(self) -> crate::Result<Index> {
let mmap_directory: Box<dyn Directory> = Box::new(MmapDirectory::create_from_tempdir()?);
self.create(mmap_directory)
}
@@ -236,15 +215,12 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
}
/// Opens or creates a new index in the provided directory
pub fn open_or_create<T: Into<Box<dyn Directory>>>(
self,
dir: T,
) -> crate::Result<Index<Codec>> {
pub fn open_or_create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
let dir: Box<dyn Directory> = dir.into();
if !Index::exists(&*dir)? {
return self.create(dir);
}
let mut index: Index<Codec> = Index::<Codec>::open_with_codec(dir)?;
let mut index = Index::open(dir)?;
index.set_tokenizers(self.tokenizer_manager.clone());
if index.schema() == self.get_expect_schema()? {
Ok(index)
@@ -268,26 +244,18 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// Creates a new index given an implementation of the trait `Directory`.
///
/// If a directory previously existed, it will be erased.
pub fn create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index<Codec>> {
self.create_avoid_monomorphization(dir.into())
}
fn create_avoid_monomorphization(self, dir: Box<dyn Directory>) -> crate::Result<Index<Codec>> {
fn create<T: Into<Box<dyn Directory>>>(self, dir: T) -> crate::Result<Index> {
self.validate()?;
let dir = dir.into();
let directory = ManagedDirectory::wrap(dir)?;
let codec: CodecConfiguration = CodecConfiguration::from_codec(&self.codec);
save_new_metas(
self.get_expect_schema()?,
self.index_settings.clone(),
&directory,
codec,
)?;
let schema = self.get_expect_schema()?;
let mut metas = IndexMeta::with_schema_and_codec(schema, &self.codec);
let mut metas = IndexMeta::with_schema(self.get_expect_schema()?);
metas.index_settings = self.index_settings;
let mut index: Index<Codec> =
Index::<Codec>::open_from_metas(directory, &metas, SegmentMetaInventory::default())?;
let mut index = Index::open_from_metas(directory, &metas, SegmentMetaInventory::default());
index.set_tokenizers(self.tokenizer_manager);
index.set_fast_field_tokenizers(self.fast_field_tokenizer_manager);
Ok(index)
@@ -296,7 +264,7 @@ impl<Codec: crate::codec::Codec> IndexBuilder<Codec> {
/// Search Index
#[derive(Clone)]
pub struct Index<Codec: crate::codec::Codec = crate::codec::StandardCodec> {
pub struct Index {
directory: ManagedDirectory,
schema: Schema,
settings: IndexSettings,
@@ -304,7 +272,6 @@ pub struct Index<Codec: crate::codec::Codec = crate::codec::StandardCodec> {
tokenizers: TokenizerManager,
fast_field_tokenizers: TokenizerManager,
inventory: SegmentMetaInventory,
codec: Codec,
}
impl Index {
@@ -312,6 +279,41 @@ impl Index {
pub fn builder() -> IndexBuilder {
IndexBuilder::new()
}
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists(dir: &dyn Directory) -> Result<bool, OpenReadError> {
dir.exists(&META_FILEPATH)
}
/// Accessor to the search executor.
///
/// This pool is used by default when calling `searcher.search(...)`
/// to perform search on the individual segments.
///
/// By default the executor is single thread, and simply runs in the calling thread.
pub fn search_executor(&self) -> &Executor {
&self.executor
}
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?;
Ok(())
}
/// Custom thread pool by a outer thread pool.
pub fn set_executor(&mut self, executor: Executor) {
self.executor = executor;
}
/// Replace the default single thread search executor pool
/// by a thread pool with as many threads as there are CPUs on the system.
pub fn set_default_multithread_executor(&mut self) -> crate::Result<()> {
let default_num_threads = available_parallelism()?.get();
self.set_multithread_executor(default_num_threads)
}
/// Creates a new index using the [`RamDirectory`].
///
@@ -322,13 +324,6 @@ impl Index {
IndexBuilder::new().schema(schema).create_in_ram().unwrap()
}
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists(directory: &dyn Directory) -> Result<bool, OpenReadError> {
directory.exists(&META_FILEPATH)
}
/// Creates a new index in a given filepath.
/// The index will use the [`MmapDirectory`].
///
@@ -375,107 +370,20 @@ impl Index {
schema: Schema,
settings: IndexSettings,
) -> crate::Result<Index> {
Self::create_to_avoid_monomorphization(dir.into(), schema, settings)
}
fn create_to_avoid_monomorphization(
dir: Box<dyn Directory>,
schema: Schema,
settings: IndexSettings,
) -> crate::Result<Index> {
let dir: Box<dyn Directory> = dir.into();
let mut builder = IndexBuilder::new().schema(schema);
builder = builder.settings(settings);
builder.create(dir)
}
/// Opens a new directory from an index path.
#[cfg(feature = "mmap")]
pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> crate::Result<Index> {
Self::open_in_dir_to_avoid_monomorphization(directory_path.as_ref())
}
#[inline(never)]
fn open_in_dir_to_avoid_monomorphization(directory_path: &Path) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
Index::open(mmap_directory)
}
/// Open the index using the provided directory
pub fn open<T: Into<Box<dyn Directory>>>(directory: T) -> crate::Result<Index> {
Index::<StandardCodec>::open_with_codec(directory.into())
}
}
impl<Codec: crate::codec::Codec> Index<Codec> {
/// Returns a version of this index with the standard codec.
/// This is useful when you need to pass the index to APIs that
/// don't care about the codec (e.g., for reading).
pub(crate) fn with_standard_codec(&self) -> Index<StandardCodec> {
Index {
directory: self.directory.clone(),
schema: self.schema.clone(),
settings: self.settings.clone(),
executor: self.executor.clone(),
tokenizers: self.tokenizers.clone(),
fast_field_tokenizers: self.fast_field_tokenizers.clone(),
inventory: self.inventory.clone(),
codec: StandardCodec::default(),
}
}
/// Open the index using the provided directory
#[inline(never)]
pub fn open_with_codec(directory: Box<dyn Directory>) -> crate::Result<Index<Codec>> {
let directory = ManagedDirectory::wrap(directory)?;
let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?;
let index: Index<Codec> = Index::<Codec>::open_from_metas(directory, &metas, inventory)?;
Ok(index)
}
/// Accessor to the codec.
pub fn codec(&self) -> &Codec {
&self.codec
}
/// Accessor to the search executor.
///
/// This pool is used by default when calling `searcher.search(...)`
/// to perform search on the individual segments.
///
/// By default the executor is single thread, and simply runs in the calling thread.
pub fn search_executor(&self) -> &Executor {
&self.executor
}
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?;
Ok(())
}
/// Custom thread pool by a outer thread pool.
pub fn set_executor(&mut self, executor: Executor) {
self.executor = executor;
}
/// Replace the default single thread search executor pool
/// by a thread pool with as many threads as there are CPUs on the system.
pub fn set_default_multithread_executor(&mut self) -> crate::Result<()> {
let default_num_threads = available_parallelism()?.get();
self.set_multithread_executor(default_num_threads)
}
/// Creates a new index given a directory and an [`IndexMeta`].
fn open_from_metas<C: crate::codec::Codec>(
fn open_from_metas(
directory: ManagedDirectory,
metas: &IndexMeta,
inventory: SegmentMetaInventory,
) -> crate::Result<Index<C>> {
) -> Index {
let schema = metas.schema.clone();
let codec = metas.codec.to_codec::<C>()?;
Ok(Index {
Index {
settings: metas.index_settings.clone(),
directory,
schema,
@@ -483,8 +391,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
fast_field_tokenizers: TokenizerManager::default(),
executor: Executor::single_thread(),
inventory,
codec,
})
}
}
/// Setter for the tokenizer manager.
@@ -540,7 +447,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
/// Create a default [`IndexReader`] for the given index.
///
/// See [`Index.reader_builder()`].
pub fn reader(&self) -> crate::Result<IndexReader<Codec>> {
pub fn reader(&self) -> crate::Result<IndexReader> {
self.reader_builder().try_into()
}
@@ -548,10 +455,17 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
///
/// Most project should create at most one reader for a given index.
/// This method is typically called only once per `Index` instance.
pub fn reader_builder(&self) -> IndexReaderBuilder<Codec> {
pub fn reader_builder(&self) -> IndexReaderBuilder {
IndexReaderBuilder::new(self.clone())
}
/// Opens a new directory from an index path.
#[cfg(feature = "mmap")]
pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
Index::open(mmap_directory)
}
/// Returns the list of the segment metas tracked by the index.
///
/// Such segments can of course be part of the index,
@@ -592,6 +506,16 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
self.inventory.new_segment_meta(segment_id, max_doc)
}
/// Open the index using the provided directory
pub fn open<T: Into<Box<dyn Directory>>>(directory: T) -> crate::Result<Index> {
let directory = directory.into();
let directory = ManagedDirectory::wrap(directory)?;
let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?;
let index = Index::open_from_metas(directory, &metas, inventory);
Ok(index)
}
/// Reads the index meta file from the directory.
pub fn load_metas(&self) -> crate::Result<IndexMeta> {
load_metas(self.directory(), &self.inventory)
@@ -615,7 +539,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
pub fn writer_with_options<D: Document>(
&self,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<Codec, D>> {
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
.acquire_lock(&INDEX_WRITER_LOCK)
@@ -657,7 +581,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<Codec, D>> {
) -> crate::Result<IndexWriter<D>> {
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
@@ -671,7 +595,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
/// That index writer only simply has a single thread and a memory budget of 15 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests<D: Document>(&self) -> crate::Result<IndexWriter<Codec, D>> {
pub fn writer_for_tests<D: Document>(&self) -> crate::Result<IndexWriter<D>> {
self.writer_with_num_threads(1, MEMORY_BUDGET_NUM_BYTES_MIN)
}
@@ -689,7 +613,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
pub fn writer<D: Document>(
&self,
memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<Codec, D>> {
) -> crate::Result<IndexWriter<D>> {
let mut num_threads = std::cmp::min(available_parallelism()?.get(), MAX_NUM_THREAD);
let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
@@ -716,7 +640,7 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
}
/// Returns the list of segments that are searchable
pub fn searchable_segments(&self) -> crate::Result<Vec<Segment<Codec>>> {
pub fn searchable_segments(&self) -> crate::Result<Vec<Segment>> {
Ok(self
.searchable_segment_metas()?
.into_iter()
@@ -725,12 +649,12 @@ impl<Codec: crate::codec::Codec> Index<Codec> {
}
#[doc(hidden)]
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment<Codec> {
pub fn segment(&self, segment_meta: SegmentMeta) -> Segment {
Segment::for_index(self.clone(), segment_meta)
}
/// Creates a new segment.
pub fn new_segment(&self) -> Segment<Codec> {
pub fn new_segment(&self) -> Segment {
let segment_meta = self
.inventory
.new_segment_meta(SegmentId::generate_random(), 0);

View File

@@ -7,7 +7,6 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use super::SegmentComponent;
use crate::codec::{Codec, CodecConfiguration};
use crate::index::SegmentId;
use crate::schema::Schema;
use crate::store::Compressor;
@@ -321,7 +320,6 @@ pub struct IndexMeta {
/// This payload is entirely unused by tantivy.
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
pub codec: CodecConfiguration,
}
#[derive(Deserialize, Debug)]
@@ -333,8 +331,6 @@ struct UntrackedIndexMeta {
pub opstamp: Opstamp,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
#[serde(default)]
pub codec: CodecConfiguration,
}
impl UntrackedIndexMeta {
@@ -349,7 +345,6 @@ impl UntrackedIndexMeta {
schema: self.schema,
opstamp: self.opstamp,
payload: self.payload,
codec: self.codec,
}
}
}
@@ -360,14 +355,13 @@ impl IndexMeta {
///
/// This new index does not contains any segments.
/// Opstamp will the value `0u64`.
pub fn with_schema_and_codec<C: Codec>(schema: Schema, codec: &C) -> IndexMeta {
pub fn with_schema(schema: Schema) -> IndexMeta {
IndexMeta {
index_settings: IndexSettings::default(),
segments: vec![],
schema,
opstamp: 0u64,
payload: None,
codec: CodecConfiguration::from_codec(codec),
}
}
@@ -418,12 +412,11 @@ mod tests {
schema,
opstamp: 0u64,
payload: None,
codec: Default::default(),
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"docstore_compression":"none","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0,"codec":{"name":"standard"}}"#
r#"{"index_settings":{"docstore_compression":"none","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();

View File

@@ -2,7 +2,6 @@ use std::fmt;
use std::path::PathBuf;
use super::SegmentComponent;
use crate::codec::StandardCodec;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::{Directory, FileSlice, WritePtr};
use crate::index::{Index, SegmentId, SegmentMeta};
@@ -11,25 +10,25 @@ use crate::Opstamp;
/// A segment is a piece of the index.
#[derive(Clone)]
pub struct Segment<C: crate::codec::Codec = StandardCodec> {
index: Index<C>,
pub struct Segment {
index: Index,
meta: SegmentMeta,
}
impl<C: crate::codec::Codec> fmt::Debug for Segment<C> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
impl fmt::Debug for Segment {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Segment({:?})", self.id().uuid_string())
}
}
impl<C: crate::codec::Codec> Segment<C> {
impl Segment {
/// Creates a new segment given an `Index` and a `SegmentId`
pub(crate) fn for_index(index: Index<C>, meta: SegmentMeta) -> Segment<C> {
pub(crate) fn for_index(index: Index, meta: SegmentMeta) -> Segment {
Segment { index, meta }
}
/// Returns the index the segment belongs to.
pub fn index(&self) -> &Index<C> {
pub fn index(&self) -> &Index {
&self.index
}
@@ -47,7 +46,7 @@ impl<C: crate::codec::Codec> Segment<C> {
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub fn with_max_doc(self, max_doc: u32) -> Segment<C> {
pub fn with_max_doc(self, max_doc: u32) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_max_doc(max_doc),
@@ -56,7 +55,7 @@ impl<C: crate::codec::Codec> Segment<C> {
#[doc(hidden)]
#[must_use]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment<C> {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),

View File

@@ -140,13 +140,13 @@ impl SegmentReader {
}
/// Open a new segment for reading.
pub fn open<C: crate::codec::Codec>(segment: &Segment<C>) -> crate::Result<SegmentReader> {
pub fn open(segment: &Segment) -> crate::Result<SegmentReader> {
Self::open_with_custom_alive_set(segment, None)
}
/// Open a new segment for reading.
pub fn open_with_custom_alive_set<C: crate::codec::Codec>(
segment: &Segment<C>,
pub fn open_with_custom_alive_set(
segment: &Segment,
custom_bitset: Option<AliveBitSet>,
) -> crate::Result<SegmentReader> {
let termdict_file = segment.open_read(SegmentComponent::Terms)?;

View File

@@ -9,7 +9,6 @@ use smallvec::smallvec;
use super::operation::{AddOperation, UserOperation};
use super::segment_updater::SegmentUpdater;
use super::{AddBatch, AddBatchReceiver, AddBatchSender, PreparedCommit};
use crate::codec::{Codec, StandardCodec};
use crate::directory::{DirectoryLock, GarbageCollectionResult, TerminatingWrite};
use crate::error::TantivyError;
use crate::fastfield::write_alive_bitset;
@@ -69,12 +68,12 @@ pub struct IndexWriterOptions {
/// indexing queue.
/// Each indexing thread builds its own independent [`Segment`], via
/// a `SegmentWriter` object.
pub struct IndexWriter<C: Codec = StandardCodec, D: Document = TantivyDocument> {
pub struct IndexWriter<D: Document = TantivyDocument> {
// the lock is just used to bind the
// lifetime of the lock with that of the IndexWriter.
_directory_lock: Option<DirectoryLock>,
index: Index<C>,
index: Index,
options: IndexWriterOptions,
@@ -83,7 +82,7 @@ pub struct IndexWriter<C: Codec = StandardCodec, D: Document = TantivyDocument>
index_writer_status: IndexWriterStatus<D>,
operation_sender: AddBatchSender<D>,
segment_updater: SegmentUpdater<C>,
segment_updater: SegmentUpdater,
worker_id: usize,
@@ -129,8 +128,8 @@ fn compute_deleted_bitset(
/// is `==` target_opstamp.
/// For instance, there was no delete operation between the state of the `segment_entry` and
/// the `target_opstamp`, `segment_entry` is not updated.
pub fn advance_deletes<C: Codec>(
mut segment: Segment<C>,
pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
) -> crate::Result<()> {
@@ -180,11 +179,11 @@ pub fn advance_deletes<C: Codec>(
Ok(())
}
fn index_documents<C: crate::codec::Codec, D: Document>(
fn index_documents<D: Document>(
memory_budget: usize,
segment: Segment<C>,
segment: Segment,
grouped_document_iterator: &mut dyn Iterator<Item = AddBatch<D>>,
segment_updater: &SegmentUpdater<C>,
segment_updater: &SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> crate::Result<()> {
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone())?;
@@ -227,8 +226,8 @@ fn index_documents<C: crate::codec::Codec, D: Document>(
}
/// `doc_opstamps` is required to be non-empty.
fn apply_deletes<C: crate::codec::Codec>(
segment: &Segment<C>,
fn apply_deletes(
segment: &Segment,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &[Opstamp],
) -> crate::Result<Option<BitSet>> {
@@ -263,7 +262,7 @@ fn apply_deletes<C: crate::codec::Codec>(
})
}
impl<C: Codec, D: Document> IndexWriter<C, D> {
impl<D: Document> IndexWriter<D> {
/// Create a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
@@ -279,7 +278,7 @@ impl<C: Codec, D: Document> IndexWriter<C, D> {
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index<C>,
index: &Index,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
) -> crate::Result<Self> {
@@ -346,7 +345,7 @@ impl<C: Codec, D: Document> IndexWriter<C, D> {
}
/// Accessor to the index.
pub fn index(&self) -> &Index<C> {
pub fn index(&self) -> &Index {
&self.index
}
@@ -394,7 +393,7 @@ impl<C: Codec, D: Document> IndexWriter<C, D> {
/// It is safe to start writing file associated with the new `Segment`.
/// These will not be garbage collected as long as an instance object of
/// `SegmentMeta` object associated with the new `Segment` is "alive".
pub fn new_segment(&self) -> Segment<C> {
pub fn new_segment(&self) -> Segment {
self.index.new_segment()
}
@@ -616,7 +615,7 @@ impl<C: Codec, D: Document> IndexWriter<C, D> {
/// It is also possible to add a payload to the `commit`
/// using this API.
/// See [`PreparedCommit::set_payload()`].
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit<'_, C, D>> {
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit<'_, D>> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -666,7 +665,7 @@ impl<C: Codec, D: Document> IndexWriter<C, D> {
self.prepare_commit()?.commit()
}
pub(crate) fn segment_updater(&self) -> &SegmentUpdater<C> {
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
&self.segment_updater
}
@@ -805,7 +804,7 @@ impl<C: Codec, D: Document> IndexWriter<C, D> {
}
}
impl<C: Codec, D: Document> Drop for IndexWriter<C, D> {
impl<D: Document> Drop for IndexWriter<D> {
fn drop(&mut self) {
self.segment_updater.kill();
self.drop_sender();

View File

@@ -145,10 +145,7 @@ fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnTy
}
impl IndexMerger {
pub fn open<C: crate::codec::Codec>(
schema: Schema,
segments: &[Segment<C>],
) -> crate::Result<IndexMerger> {
pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset)
}
@@ -165,9 +162,9 @@ impl IndexMerger {
// This can be used to merge but also apply an additional filter.
// One use case is demux, which is basically taking a list of
// segments and partitions them e.g. by a value in a field.
pub fn open_with_custom_alive_set<C: crate::codec::Codec>(
pub fn open_with_custom_alive_set(
schema: Schema,
segments: &[Segment<C>],
segments: &[Segment],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
) -> crate::Result<IndexMerger> {
let mut readers = vec![];
@@ -528,10 +525,7 @@ impl IndexMerger {
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write<C: crate::codec::Codec>(
&self,
mut serializer: SegmentSerializer<C>,
) -> crate::Result<u32> {
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
let doc_id_mapping = self.get_doc_id_from_concatenated_data()?;
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {

View File

@@ -1,17 +1,16 @@
use super::IndexWriter;
use crate::codec::Codec;
use crate::schema::document::Document;
use crate::{FutureResult, Opstamp, TantivyDocument};
/// A prepared commit
pub struct PreparedCommit<'a, C: Codec, D: Document = TantivyDocument> {
index_writer: &'a mut IndexWriter<C, D>,
pub struct PreparedCommit<'a, D: Document = TantivyDocument> {
index_writer: &'a mut IndexWriter<D>,
payload: Option<String>,
opstamp: Opstamp,
}
impl<'a, C: Codec, D: Document> PreparedCommit<'a, C, D> {
pub(crate) fn new(index_writer: &'a mut IndexWriter<C, D>, opstamp: Opstamp) -> Self {
impl<'a, D: Document> PreparedCommit<'a, D> {
pub(crate) fn new(index_writer: &'a mut IndexWriter<D>, opstamp: Opstamp) -> Self {
Self {
index_writer,
payload: None,

View File

@@ -8,17 +8,17 @@ use crate::store::StoreWriter;
/// Segment serializer is in charge of laying out on disk
/// the data accumulated and sorted by the `SegmentWriter`.
pub struct SegmentSerializer<C: crate::codec::Codec> {
segment: Segment<C>,
pub struct SegmentSerializer {
segment: Segment,
pub(crate) store_writer: StoreWriter,
fast_field_write: WritePtr,
fieldnorms_serializer: Option<FieldNormsSerializer>,
postings_serializer: InvertedIndexSerializer,
}
impl<C: crate::codec::Codec> SegmentSerializer<C> {
impl SegmentSerializer {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(mut segment: Segment<C>) -> crate::Result<SegmentSerializer<C>> {
pub fn for_segment(mut segment: Segment) -> crate::Result<SegmentSerializer> {
let settings = segment.index().settings().clone();
let store_writer = {
let store_write = segment.open_write(SegmentComponent::Store)?;
@@ -50,7 +50,7 @@ impl<C: crate::codec::Codec> SegmentSerializer<C> {
self.store_writer.mem_usage()
}
pub fn segment(&self) -> &Segment<C> {
pub fn segment(&self) -> &Segment {
&self.segment
}

View File

@@ -10,7 +10,6 @@ use std::sync::{Arc, RwLock};
use rayon::{ThreadPool, ThreadPoolBuilder};
use super::segment_manager::SegmentManager;
use crate::codec::{Codec, CodecConfiguration};
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::fastfield::AliveBitSet;
@@ -62,10 +61,10 @@ pub(crate) fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate:
// We voluntarily pass a merge_operation ref to guarantee that
// the merge_operation is alive during the process
#[derive(Clone)]
pub(crate) struct SegmentUpdater<C: Codec>(Arc<InnerSegmentUpdater<C>>);
pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
impl<C: Codec> Deref for SegmentUpdater<C> {
type Target = InnerSegmentUpdater<C>;
impl Deref for SegmentUpdater {
type Target = InnerSegmentUpdater;
#[inline]
fn deref(&self) -> &Self::Target {
@@ -73,8 +72,8 @@ impl<C: Codec> Deref for SegmentUpdater<C> {
}
}
fn garbage_collect_files<C: Codec>(
segment_updater: SegmentUpdater<C>,
fn garbage_collect_files(
segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
let mut index = segment_updater.index.clone();
@@ -85,8 +84,8 @@ fn garbage_collect_files<C: Codec>(
/// Merges a list of segments the list of segment givens in the `segment_entries`.
/// This function happens in the calling thread and is computationally expensive.
fn merge<Codec: crate::codec::Codec>(
index: &Index<Codec>,
fn merge(
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
target_opstamp: Opstamp,
) -> crate::Result<Option<SegmentEntry>> {
@@ -109,7 +108,7 @@ fn merge<Codec: crate::codec::Codec>(
let delete_cursor = segment_entries[0].delete_cursor().clone();
let segments: Vec<Segment<Codec>> = segment_entries
let segments: Vec<Segment> = segment_entries
.iter()
.map(|segment_entry| index.segment(segment_entry.meta().clone()))
.collect();
@@ -140,10 +139,10 @@ fn merge<Codec: crate::codec::Codec>(
/// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination `Index`.
#[doc(hidden)]
pub fn merge_indices<Codec: crate::codec::Codec>(
indices: &[Index<Codec>],
output_directory: Box<dyn Directory>,
) -> crate::Result<Index<Codec>> {
pub fn merge_indices<T: Into<Box<dyn Directory>>>(
indices: &[Index],
output_directory: T,
) -> crate::Result<Index> {
if indices.is_empty() {
// If there are no indices to merge, there is no need to do anything.
return Err(crate::TantivyError::InvalidArgument(
@@ -164,7 +163,7 @@ pub fn merge_indices<Codec: crate::codec::Codec>(
));
}
let mut segments: Vec<Segment<Codec>> = Vec::new();
let mut segments: Vec<Segment> = Vec::new();
for index in indices {
segments.extend(index.searchable_segments()?);
}
@@ -186,12 +185,12 @@ pub fn merge_indices<Codec: crate::codec::Codec>(
/// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination `Index`.
#[doc(hidden)]
pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Directory>>>(
segments: &[Segment<Codec>],
pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
segments: &[Segment],
target_settings: IndexSettings,
filter_doc_ids: Vec<Option<AliveBitSet>>,
output_directory: T,
) -> crate::Result<Index<Codec>> {
) -> crate::Result<Index> {
if segments.is_empty() {
// If there are no indices to merge, there is no need to do anything.
return Err(crate::TantivyError::InvalidArgument(
@@ -212,12 +211,11 @@ pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Direc
));
}
let mut merged_index: Index<Codec> = Index::builder()
.schema(target_schema.clone())
.codec(segments[0].index().codec().clone())
.settings(target_settings.clone())
.create(output_directory.into())?;
let mut merged_index = Index::create(
output_directory,
target_schema.clone(),
target_settings.clone(),
)?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger =
@@ -237,7 +235,6 @@ pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Direc
))
.trim_end()
);
let codec_configuration = CodecConfiguration::from_codec(segments[0].index().codec());
let index_meta = IndexMeta {
index_settings: target_settings, // index_settings of all segments should be the same
@@ -245,7 +242,6 @@ pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Direc
schema: target_schema,
opstamp: 0u64,
payload: Some(stats),
codec: codec_configuration,
};
// save the meta.json
@@ -254,7 +250,7 @@ pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Direc
Ok(merged_index)
}
pub(crate) struct InnerSegmentUpdater<C: Codec> {
pub(crate) struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
// `SegmentUpdater`.
@@ -265,7 +261,7 @@ pub(crate) struct InnerSegmentUpdater<C: Codec> {
pool: ThreadPool,
merge_thread_pool: ThreadPool,
index: Index<C>,
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Arc<dyn MergePolicy>>,
killed: AtomicBool,
@@ -273,13 +269,13 @@ pub(crate) struct InnerSegmentUpdater<C: Codec> {
merge_operations: MergeOperationInventory,
}
impl<Codec: crate::codec::Codec> SegmentUpdater<Codec> {
impl SegmentUpdater {
pub fn create(
index: Index<Codec>,
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
num_merge_threads: usize,
) -> crate::Result<Self> {
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = ThreadPoolBuilder::new()
@@ -408,14 +404,12 @@ impl<Codec: crate::codec::Codec> SegmentUpdater<Codec> {
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
committed_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
let codec = CodecConfiguration::from_codec(index.codec());
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
segments: committed_segment_metas,
schema: index.schema(),
opstamp,
payload: commit_message,
codec,
};
// TODO add context to the error.
save_metas(&index_meta, directory.box_clone().borrow_mut())?;
@@ -449,7 +443,7 @@ impl<Codec: crate::codec::Codec> SegmentUpdater<Codec> {
opstamp: Opstamp,
payload: Option<String>,
) -> FutureResult<Opstamp> {
let segment_updater: SegmentUpdater<Codec> = self.clone();
let segment_updater: SegmentUpdater = self.clone();
self.schedule_task(move || {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
@@ -708,7 +702,6 @@ impl<Codec: crate::codec::Codec> SegmentUpdater<Codec> {
#[cfg(test)]
mod tests {
use super::merge_indices;
use crate::codec::StandardCodec;
use crate::collector::TopDocs;
use crate::directory::RamDirectory;
use crate::fastfield::AliveBitSet;
@@ -922,7 +915,7 @@ mod tests {
#[test]
fn test_merge_empty_indices_array() {
let merge_result = merge_indices::<StandardCodec>(&[], Box::new(RamDirectory::default()));
let merge_result = merge_indices(&[], RamDirectory::default());
assert!(merge_result.is_err());
}
@@ -949,10 +942,7 @@ mod tests {
};
// mismatched schema index list
let result = merge_indices(
&[first_index, second_index],
Box::new(RamDirectory::default()),
);
let result = merge_indices(&[first_index, second_index], RamDirectory::default());
assert!(result.is_err());
Ok(())

View File

@@ -4,7 +4,6 @@ use itertools::Itertools;
use tokenizer_api::BoxTokenStream;
use super::operation::AddOperation;
use crate::codec::Codec;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
use crate::index::{Segment, SegmentComponent};
@@ -46,11 +45,11 @@ fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<
///
/// They creates the postings list in anonymous memory.
/// The segment is laid on disk when the segment gets `finalized`.
pub struct SegmentWriter<Codec: crate::codec::Codec> {
pub struct SegmentWriter {
pub(crate) max_doc: DocId,
pub(crate) ctx: IndexingContext,
pub(crate) per_field_postings_writers: PerFieldPostingsWriter,
pub(crate) segment_serializer: SegmentSerializer<Codec>,
pub(crate) segment_serializer: SegmentSerializer,
pub(crate) fast_field_writers: FastFieldsWriter,
pub(crate) fieldnorms_writer: FieldNormsWriter,
pub(crate) json_path_writer: JsonPathWriter,
@@ -61,7 +60,7 @@ pub struct SegmentWriter<Codec: crate::codec::Codec> {
schema: Schema,
}
impl<Codec: crate::codec::Codec> SegmentWriter<Codec> {
impl SegmentWriter {
/// Creates a new `SegmentWriter`
///
/// The arguments are defined as follows
@@ -71,10 +70,7 @@ impl<Codec: crate::codec::Codec> SegmentWriter<Codec> {
/// behavior as a memory limit.
/// - segment: The segment being written
/// - schema
pub fn for_segment(
memory_budget_in_bytes: usize,
segment: Segment<Codec>,
) -> crate::Result<Self> {
pub fn for_segment(memory_budget_in_bytes: usize, segment: Segment) -> crate::Result<Self> {
let schema = segment.schema();
let tokenizer_manager = segment.index().tokenizers().clone();
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
@@ -390,13 +386,13 @@ impl<Codec: crate::codec::Codec> SegmentWriter<Codec> {
/// to the `SegmentSerializer`.
///
/// `doc_id_map` is used to map to the new doc_id order.
fn remap_and_write<C: Codec>(
fn remap_and_write(
schema: Schema,
per_field_postings_writers: &PerFieldPostingsWriter,
ctx: IndexingContext,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
mut serializer: SegmentSerializer<C>,
mut serializer: SegmentSerializer,
) -> crate::Result<()> {
debug!("remap-and-write");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {

View File

@@ -1,6 +1,5 @@
use std::marker::PhantomData;
use crate::codec::CodecConfiguration;
use crate::indexer::operation::AddOperation;
use crate::indexer::segment_updater::save_metas;
use crate::indexer::SegmentWriter;
@@ -8,22 +7,22 @@ use crate::schema::document::Document;
use crate::{Directory, Index, IndexMeta, Opstamp, Segment, TantivyDocument};
#[doc(hidden)]
pub struct SingleSegmentIndexWriter<Codec: crate::codec::Codec, D: Document = TantivyDocument> {
segment_writer: SegmentWriter<Codec>,
segment: Segment<Codec>,
pub struct SingleSegmentIndexWriter<D: Document = TantivyDocument> {
segment_writer: SegmentWriter,
segment: Segment,
opstamp: Opstamp,
_doc: PhantomData<D>,
_phantom: PhantomData<D>,
}
impl<Codec: crate::codec::Codec, D: Document> SingleSegmentIndexWriter<Codec, D> {
pub fn new(index: Index<Codec>, mem_budget: usize) -> crate::Result<Self> {
impl<D: Document> SingleSegmentIndexWriter<D> {
pub fn new(index: Index, mem_budget: usize) -> crate::Result<Self> {
let segment = index.new_segment();
let segment_writer = SegmentWriter::for_segment(mem_budget, segment.clone())?;
Ok(Self {
segment_writer,
segment,
opstamp: 0,
_doc: PhantomData,
_phantom: PhantomData,
})
}
@@ -38,10 +37,10 @@ impl<Codec: crate::codec::Codec, D: Document> SingleSegmentIndexWriter<Codec, D>
.add_document(AddOperation { opstamp, document })
}
pub fn finalize(self) -> crate::Result<Index<Codec>> {
pub fn finalize(self) -> crate::Result<Index> {
let max_doc = self.segment_writer.max_doc();
self.segment_writer.finalize()?;
let segment: Segment<Codec> = self.segment.with_max_doc(max_doc);
let segment: Segment = self.segment.with_max_doc(max_doc);
let index = segment.index();
let index_meta = IndexMeta {
index_settings: index.settings().clone(),
@@ -49,7 +48,6 @@ impl<Codec: crate::codec::Codec, D: Document> SingleSegmentIndexWriter<Codec, D>
schema: index.schema(),
opstamp: 0,
payload: None,
codec: CodecConfiguration::from_codec(index.codec()),
};
save_metas(&index_meta, index.directory())?;
index.directory().sync_directory()?;

View File

@@ -166,7 +166,6 @@ mod functional_test;
#[macro_use]
mod macros;
pub mod codec;
mod future_result;
// Re-exports
@@ -378,7 +377,7 @@ pub mod tests {
use common::{BinarySerializable, FixedSize};
use query_grammar::{UserInputAst, UserInputLeaf, UserInputLiteral};
use rand::distributions::{Bernoulli, Uniform};
use rand::distr::{Bernoulli, Uniform};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use time::OffsetDateTime;
@@ -429,7 +428,7 @@ pub mod tests {
pub fn generate_nonunique_unsorted(max_value: u32, n_elems: usize) -> Vec<u32> {
let seed: [u8; 32] = [1; 32];
StdRng::from_seed(seed)
.sample_iter(&Uniform::new(0u32, max_value))
.sample_iter(&Uniform::new(0u32, max_value).unwrap())
.take(n_elems)
.collect::<Vec<u32>>()
}

View File

@@ -397,7 +397,10 @@ mod bench {
let mut seed: [u8; 32] = [0; 32];
seed[31] = seed_val;
let mut rng = StdRng::from_seed(seed);
(0u32..).filter(|_| rng.gen_bool(ratio)).take(n).collect()
(0u32..)
.filter(|_| rng.random_bool(ratio))
.take(n)
.collect()
}
pub fn generate_array(n: usize, ratio: f64) -> Vec<u32> {

View File

@@ -22,6 +22,12 @@ pub(crate) struct JsonPostingsWriter<Rec: Recorder> {
non_str_posting_writer: SpecializedPostingsWriter<DocIdRecorder>,
}
impl<Rec: Recorder> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(json_postings_writer: JsonPostingsWriter<Rec>) -> Box<dyn PostingsWriter> {
Box::new(json_postings_writer)
}
}
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
#[inline]
fn subscribe(

View File

@@ -604,13 +604,13 @@ mod bench {
let mut index_writer: IndexWriter = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = TantivyDocument::default();
if rng.gen_bool(1f64 / 15f64) {
if rng.random_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
if rng.random_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
if rng.random_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");

View File

@@ -1,15 +1,16 @@
use crate::postings::json_postings_writer::JsonPostingsWriter;
use crate::postings::postings_writer::{PostingsWriterEnum, SpecializedPostingsWriter};
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{DocIdRecorder, TermFrequencyRecorder, TfAndPositionRecorder};
use crate::postings::PostingsWriter;
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
pub(crate) struct PerFieldPostingsWriter {
per_field_postings_writers: Vec<PostingsWriterEnum>,
per_field_postings_writers: Vec<Box<dyn PostingsWriter>>,
}
impl PerFieldPostingsWriter {
pub fn for_schema(schema: &Schema) -> Self {
let per_field_postings_writers: Vec<PostingsWriterEnum> = schema
let per_field_postings_writers = schema
.fields()
.map(|(_, field_entry)| posting_writer_from_field_entry(field_entry))
.collect();
@@ -18,16 +19,16 @@ impl PerFieldPostingsWriter {
}
}
pub(crate) fn get_for_field(&self, field: Field) -> &PostingsWriterEnum {
&self.per_field_postings_writers[field.field_id() as usize]
pub(crate) fn get_for_field(&self, field: Field) -> &dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_ref()
}
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut PostingsWriterEnum {
&mut self.per_field_postings_writers[field.field_id() as usize]
pub(crate) fn get_for_field_mut(&mut self, field: Field) -> &mut dyn PostingsWriter {
self.per_field_postings_writers[field.field_id() as usize].as_mut()
}
}
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> PostingsWriterEnum {
fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box<dyn PostingsWriter> {
match *field_entry.field_type() {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()
@@ -50,7 +51,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> PostingsWriterEn
| FieldType::Date(_)
| FieldType::Bytes(_)
| FieldType::IpAddr(_)
| FieldType::Facet(_) => <SpecializedPostingsWriter<DocIdRecorder>>::default().into(),
| FieldType::Facet(_) => Box::<SpecializedPostingsWriter<DocIdRecorder>>::default(),
FieldType::JsonObject(ref json_object_options) => {
if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() {
match text_indexing_option.index_option() {

View File

@@ -7,10 +7,7 @@ use stacker::Addr;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::indexing_term::IndexingTerm;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::json_postings_writer::JsonPostingsWriter;
use crate::postings::recorder::{
BufferLender, DocIdRecorder, Recorder, TermFrequencyRecorder, TfAndPositionRecorder,
};
use crate::postings::recorder::{BufferLender, Recorder};
use crate::postings::{
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
};
@@ -103,141 +100,6 @@ pub(crate) struct IndexingPosition {
pub end_position: u32,
}
pub enum PostingsWriterEnum {
DocId(SpecializedPostingsWriter<DocIdRecorder>),
DocIdTf(SpecializedPostingsWriter<TermFrequencyRecorder>),
DocTfAndPosition(SpecializedPostingsWriter<TfAndPositionRecorder>),
JsonDocId(JsonPostingsWriter<DocIdRecorder>),
JsonDocIdTf(JsonPostingsWriter<TermFrequencyRecorder>),
JsonDocTfAndPosition(JsonPostingsWriter<TfAndPositionRecorder>),
}
impl From<SpecializedPostingsWriter<DocIdRecorder>> for PostingsWriterEnum {
fn from(doc_id_recorder_writer: SpecializedPostingsWriter<DocIdRecorder>) -> Self {
PostingsWriterEnum::DocId(doc_id_recorder_writer)
}
}
impl From<SpecializedPostingsWriter<TermFrequencyRecorder>> for PostingsWriterEnum {
fn from(doc_id_tf_recorder_writer: SpecializedPostingsWriter<TermFrequencyRecorder>) -> Self {
PostingsWriterEnum::DocIdTf(doc_id_tf_recorder_writer)
}
}
impl From<SpecializedPostingsWriter<TfAndPositionRecorder>> for PostingsWriterEnum {
fn from(
doc_id_tf_and_positions_recorder_writer: SpecializedPostingsWriter<TfAndPositionRecorder>,
) -> Self {
PostingsWriterEnum::DocTfAndPosition(doc_id_tf_and_positions_recorder_writer)
}
}
impl From<JsonPostingsWriter<DocIdRecorder>> for PostingsWriterEnum {
fn from(doc_id_recorder_writer: JsonPostingsWriter<DocIdRecorder>) -> Self {
PostingsWriterEnum::JsonDocId(doc_id_recorder_writer)
}
}
impl From<JsonPostingsWriter<TermFrequencyRecorder>> for PostingsWriterEnum {
fn from(doc_id_tf_recorder_writer: JsonPostingsWriter<TermFrequencyRecorder>) -> Self {
PostingsWriterEnum::JsonDocIdTf(doc_id_tf_recorder_writer)
}
}
impl From<JsonPostingsWriter<TfAndPositionRecorder>> for PostingsWriterEnum {
fn from(
doc_id_tf_and_positions_recorder_writer: JsonPostingsWriter<TfAndPositionRecorder>,
) -> Self {
PostingsWriterEnum::JsonDocTfAndPosition(doc_id_tf_and_positions_recorder_writer)
}
}
impl PostingsWriter for PostingsWriterEnum {
fn subscribe(&mut self, doc: DocId, pos: u32, term: &IndexingTerm, ctx: &mut IndexingContext) {
match self {
PostingsWriterEnum::DocId(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::DocIdTf(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::DocTfAndPosition(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::JsonDocId(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::JsonDocIdTf(writer) => writer.subscribe(doc, pos, term, ctx),
PostingsWriterEnum::JsonDocTfAndPosition(writer) => {
writer.subscribe(doc, pos, term, ctx)
}
}
}
fn serialize(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
match self {
PostingsWriterEnum::DocId(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::DocIdTf(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::DocTfAndPosition(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::JsonDocId(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::JsonDocIdTf(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
PostingsWriterEnum::JsonDocTfAndPosition(writer) => {
writer.serialize(term_addrs, ordered_id_to_path, ctx, serializer)
}
}
}
/// Tokenize a text and subscribe all of its token.
fn index_text(
&mut self,
doc_id: DocId,
token_stream: &mut dyn TokenStream,
term_buffer: &mut IndexingTerm,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
) {
match self {
PostingsWriterEnum::DocId(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::DocIdTf(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::DocTfAndPosition(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::JsonDocId(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::JsonDocIdTf(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
PostingsWriterEnum::JsonDocTfAndPosition(writer) => {
writer.index_text(doc_id, token_stream, term_buffer, ctx, indexing_position)
}
}
}
fn total_num_tokens(&self) -> u64 {
match self {
PostingsWriterEnum::DocId(writer) => writer.total_num_tokens(),
PostingsWriterEnum::DocIdTf(writer) => writer.total_num_tokens(),
PostingsWriterEnum::DocTfAndPosition(writer) => writer.total_num_tokens(),
PostingsWriterEnum::JsonDocId(writer) => writer.total_num_tokens(),
PostingsWriterEnum::JsonDocIdTf(writer) => writer.total_num_tokens(),
PostingsWriterEnum::JsonDocTfAndPosition(writer) => writer.total_num_tokens(),
}
}
}
/// The `PostingsWriter` is in charge of receiving documenting
/// and building a `Segment` in anonymous memory.
///
@@ -309,6 +171,14 @@ pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
_recorder_type: PhantomData<Rec>,
}
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(
specialized_postings_writer: SpecializedPostingsWriter<Rec>,
) -> Box<dyn PostingsWriter> {
Box::new(specialized_postings_writer)
}
}
impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
#[inline]
pub(crate) fn serialize_one_term(

View File

@@ -55,9 +55,7 @@ pub struct InvertedIndexSerializer {
impl InvertedIndexSerializer {
/// Open a new `InvertedIndexSerializer` for the given segment
pub fn open<C: crate::codec::Codec>(
segment: &mut Segment<C>,
) -> crate::Result<InvertedIndexSerializer> {
pub fn open(segment: &mut Segment) -> crate::Result<InvertedIndexSerializer> {
use crate::index::SegmentComponent::{Positions, Postings, Terms};
let inv_index_serializer = InvertedIndexSerializer {
terms_write: CompositeWrite::wrap(segment.open_write(Terms)?),

View File

@@ -311,7 +311,7 @@ mod tests {
#![proptest_config(ProptestConfig::with_cases(50))]
#[test]
fn test_phrase_regex_with_random_strings(mut random_strings in proptest::collection::vec("[c-z ]{0,10}", 1..100), num_occurrences in 1..150_usize) {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
// Insert "aaa ccc" the specified number of times into the list
for _ in 0..num_occurrences {

View File

@@ -429,7 +429,7 @@ mod tests {
docs.push(doc);
}
docs.shuffle(&mut rand::thread_rng());
docs.shuffle(&mut rand::rng());
let mut docs_it = docs.into_iter();
for doc in (&mut docs_it).take(50) {
index_writer.add_document(doc)?;

View File

@@ -491,7 +491,7 @@ mod tests {
use common::DateTime;
use proptest::prelude::*;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::seq::IndexedRandom;
use rand::SeedableRng;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

View File

@@ -304,10 +304,10 @@ mod tests {
let mut writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
use rand::Rng;
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
writer.set_merge_policy(Box::new(NoMergePolicy));
for _ in 0..3_000 {
let term_freq = rng.gen_range(1..10000);
let term_freq = rng.random_range(1..10000);
let words: Vec<&str> = std::iter::repeat_n("bbbb", term_freq).collect();
let text = words.join(" ");
writer.add_document(doc!(text_field=>text))?;

View File

@@ -7,7 +7,6 @@ use arc_swap::ArcSwap;
pub use warming::Warmer;
use self::warming::WarmingState;
use crate::codec::Codec;
use crate::core::searcher::{SearcherGeneration, SearcherInner};
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
use crate::store::DOCSTORE_CACHE_CAPACITY;
@@ -39,17 +38,17 @@ pub enum ReloadPolicy {
/// - number of warming threads, for parallelizing warming work
/// - The cache size of the underlying doc store readers.
#[derive(Clone)]
pub struct IndexReaderBuilder<C: Codec = crate::codec::StandardCodec> {
pub struct IndexReaderBuilder {
reload_policy: ReloadPolicy,
index: Index<C>,
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_num_blocks: usize,
}
impl<C: Codec> IndexReaderBuilder<C> {
impl IndexReaderBuilder {
#[must_use]
pub(crate) fn new(index: Index<C>) -> IndexReaderBuilder<C> {
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
IndexReaderBuilder {
reload_policy: ReloadPolicy::OnCommitWithDelay,
index,
@@ -64,7 +63,7 @@ impl<C: Codec> IndexReaderBuilder<C> {
/// Building the reader is a non-trivial operation that requires
/// to open different segment readers. It may take hundreds of milliseconds
/// of time and it may return an error.
pub fn try_into(self) -> crate::Result<IndexReader<C>> {
pub fn try_into(self) -> crate::Result<IndexReader> {
let searcher_generation_inventory = Inventory::default();
let warming_state = WarmingState::new(
self.num_warming_threads,
@@ -107,7 +106,7 @@ impl<C: Codec> IndexReaderBuilder<C> {
///
/// See [`ReloadPolicy`] for more details.
#[must_use]
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder<C> {
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
self.reload_policy = reload_policy;
self
}
@@ -119,14 +118,14 @@ impl<C: Codec> IndexReaderBuilder<C> {
pub fn doc_store_cache_num_blocks(
mut self,
doc_store_cache_num_blocks: usize,
) -> IndexReaderBuilder<C> {
) -> IndexReaderBuilder {
self.doc_store_cache_num_blocks = doc_store_cache_num_blocks;
self
}
/// Set the [`Warmer`]s that are invoked when reloading searchable segments.
#[must_use]
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder<C> {
pub fn warmers(mut self, warmers: Vec<Weak<dyn Warmer>>) -> IndexReaderBuilder {
self.warmers = warmers;
self
}
@@ -136,33 +135,33 @@ impl<C: Codec> IndexReaderBuilder<C> {
/// This allows parallelizing warming work when there are multiple [`Warmer`] registered with
/// the [`IndexReader`].
#[must_use]
pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder<C> {
pub fn num_warming_threads(mut self, num_warming_threads: usize) -> IndexReaderBuilder {
self.num_warming_threads = num_warming_threads;
self
}
}
impl<C: Codec> TryInto<IndexReader<C>> for IndexReaderBuilder<C> {
impl TryInto<IndexReader> for IndexReaderBuilder {
type Error = crate::TantivyError;
fn try_into(self) -> crate::Result<IndexReader<C>> {
fn try_into(self) -> crate::Result<IndexReader> {
IndexReaderBuilder::try_into(self)
}
}
struct InnerIndexReader<C: Codec> {
struct InnerIndexReader {
doc_store_cache_num_blocks: usize,
index: Index<C>,
index: Index,
warming_state: WarmingState,
searcher: arc_swap::ArcSwap<SearcherInner>,
searcher_generation_counter: Arc<AtomicU64>,
searcher_generation_inventory: Inventory<SearcherGeneration>,
}
impl<C: Codec> InnerIndexReader<C> {
impl InnerIndexReader {
fn new(
doc_store_cache_num_blocks: usize,
index: Index<C>,
index: Index,
warming_state: WarmingState,
// The searcher_generation_inventory is not used as source, but as target to track the
// loaded segments.
@@ -190,7 +189,7 @@ impl<C: Codec> InnerIndexReader<C> {
///
/// This function acquires a lock to prevent GC from removing files
/// as we are opening our index.
fn open_segment_readers(index: &Index<C>) -> crate::Result<Vec<SegmentReader>> {
fn open_segment_readers(index: &Index) -> crate::Result<Vec<SegmentReader>> {
// Prevents segment files from getting deleted while we are in the process of opening them
let _meta_lock = index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = index.searchable_segments()?;
@@ -213,7 +212,7 @@ impl<C: Codec> InnerIndexReader<C> {
}
fn create_searcher(
index: &Index<C>,
index: &Index,
doc_store_cache_num_blocks: usize,
warming_state: &WarmingState,
searcher_generation_counter: &Arc<AtomicU64>,
@@ -227,10 +226,9 @@ impl<C: Codec> InnerIndexReader<C> {
);
let schema = index.schema();
// SearcherInner uses Index<StandardCodec> since the codec doesn't affect reading
let searcher = Arc::new(SearcherInner::new(
schema,
index.with_standard_codec(),
index.clone(),
segment_readers,
searcher_generation,
doc_store_cache_num_blocks,
@@ -266,14 +264,14 @@ impl<C: Codec> InnerIndexReader<C> {
///
/// `IndexReader` just wraps an `Arc`.
#[derive(Clone)]
pub struct IndexReader<C: Codec = crate::codec::StandardCodec> {
inner: Arc<InnerIndexReader<C>>,
pub struct IndexReader {
inner: Arc<InnerIndexReader>,
_watch_handle_opt: Option<WatchHandle>,
}
impl<C: Codec> IndexReader<C> {
impl IndexReader {
#[cfg(test)]
pub(crate) fn index(&self) -> Index<C> {
pub(crate) fn index(&self) -> Index {
self.inner.index.clone()
}

View File

@@ -95,7 +95,7 @@ impl<'a> TermMerger<'a> {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use rand::{rng, Rng};
use test::{self, Bencher};
use super::TermMerger;
@@ -117,9 +117,9 @@ mod bench {
let buffer: Vec<u8> = {
let mut terms = vec![];
for _i in 0..num_terms {
let rand_string: String = thread_rng()
let rand_string: String = rng()
.sample_iter(&Alphanumeric)
.take(thread_rng().gen_range(30..42))
.take(rng().random_range(30..42))
.map(char::from)
.collect();
terms.push(rand_string);

View File

@@ -25,7 +25,7 @@ zstd-compression = ["zstd"]
proptest = "1"
criterion = { version = "0.5", default-features = false }
names = "0.14"
rand = "0.8"
rand = "0.9"
[[bench]]
name = "stream_bench"

View File

@@ -10,9 +10,9 @@ use tantivy_sstable::{Dictionary, MonotonicU64SSTable};
const CHARSET: &[u8] = b"abcdefghij";
fn generate_key(rng: &mut impl Rng) -> String {
let len = rng.gen_range(3..12);
let len = rng.random_range(3..12);
std::iter::from_fn(|| {
let idx = rng.gen_range(0..CHARSET.len());
let idx = rng.random_range(0..CHARSET.len());
Some(CHARSET[idx] as char)
})
.take(len)

View File

@@ -23,12 +23,12 @@ name = "hashmap"
path = "example/hashmap.rs"
[dev-dependencies]
rand = "0.8.5"
rand = "0.9"
zipf = "7.0.0"
rustc-hash = "2.1.0"
proptest = "1.2.0"
binggan = { version = "0.14.0" }
rand_distr = "0.4.3"
rand_distr = "0.5"
[features]
compare_hash_only = ["ahash"] # Compare hash only, not the key in the Hashmap

View File

@@ -90,10 +90,10 @@ fn bench_vint() {
}
// benchmark zipfs distribution numbers
{
use rand::distributions::Distribution;
use rand::distr::Distribution;
use rand::rngs::StdRng;
let mut rng = StdRng::from_seed([3u8; 32]);
let zipf = zipf::ZipfDistribution::new(10_000, 1.03).unwrap();
let zipf = rand_distr::Zipf::new(10_000.0f64, 1.03).unwrap();
let numbers: Vec<[u8; 8]> = (0..num_numbers)
.map(|_| zipf.sample(&mut rng).to_le_bytes())
.collect();

View File

@@ -7,8 +7,8 @@ edition = "2021"
[dependencies]
ahash = "0.8.7"
rand = "0.8.5"
rand_distr = "0.4.3"
rand = "0.9"
rand_distr = "0.5"
tantivy-stacker = { version = "0.2.0", path = ".." }
[workspace]

View File

@@ -14,7 +14,7 @@ fn test_with_seed(seed: u64) {
let mut hash_map = AHashMap::new();
let mut arena_hashmap = ArenaHashMap::default();
let mut rng = StdRng::seed_from_u64(seed);
let key_count = rng.gen_range(1_000..=1_000_000);
let key_count = rng.random_range(1_000..=1_000_000);
let exp = Exp::new(0.05).unwrap();
for _ in 0..key_count {