Compare commits

3 Commits

Author           SHA1        Message                                                                 Date
Pascal Seitz     9c12860b01  extend proptest to cover bytes field codec bug                          2022-02-18 10:50:46 +01:00
Pascal Seitz     886245ad21  Fix opening bytes index with dynamic codec (Fix #1278)                  2022-02-18 07:08:43 +01:00
Shikhar Bhushan  505e6a440c  Remove test assertion sensitive to background segment merging (#1274)  2022-02-17 10:59:46 +09:00
11 changed files with 46 additions and 100118 deletions
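The central fix (886245ad21) opens the index column of a bytes fast field through DynamicFastFieldReader, which dispatches on the codec the data was actually written with, instead of unconditionally assuming BitpackedFastFieldReader. As a rough sketch of that open-time dispatch pattern — the codec ids, reader types, and `open` signature below are invented for illustration and are not tantivy's actual internals:

use std::io;

// Stand-ins for per-codec readers; fields omitted.
struct BitpackedReader;
struct LinearInterpolReader;

// Open-time dispatch: inspect the codec id serialized with the data
// and construct the matching reader, rather than hard-coding one codec.
enum DynamicReader {
    Bitpacked(BitpackedReader),
    LinearInterpol(LinearInterpolReader),
}

impl DynamicReader {
    fn open(data: &[u8]) -> io::Result<DynamicReader> {
        // Assumed layout: the first byte tags the codec used at write time.
        match data.first().copied() {
            Some(1) => Ok(DynamicReader::Bitpacked(BitpackedReader)),
            Some(2) => Ok(DynamicReader::LinearInterpol(LinearInterpolReader)),
            _ => Err(io::Error::new(io::ErrorKind::InvalidData, "unknown codec id")),
        }
    }
}

fn main() {
    // In this sketch, a leading 0x01 byte selects the bitpacked reader.
    assert!(DynamicReader::open(&[1, 0, 0]).is_ok());
    assert!(DynamicReader::open(&[9]).is_err());
}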


@@ -67,7 +67,6 @@ proptest = "1.0"
 criterion = "0.3.5"
 test-log = "0.2.8"
 env_logger = "0.9.0"
-pprof = {version= "0.6", features=["flamegraph", "criterion"]}

 [dev-dependencies.fail]
 version = "0.5"
@@ -78,11 +77,6 @@ opt-level = 3
 debug = false
 debug-assertions = false

-[profile.bench]
-opt-level = 3
-debug = true
-debug-assertions = false
-
 [profile.test]
 debug-assertions = true
 overflow-checks = true
@@ -116,7 +110,3 @@ required-features = ["fail/failpoints"]
 [[bench]]
 name = "analyzer"
 harness = false
-
-[[bench]]
-name = "index-bench"
-harness = false

File diff suppressed because it is too large


@@ -1,80 +0,0 @@
-use criterion::{criterion_group, criterion_main, Criterion};
-use tantivy::schema::{INDEXED, STORED, STRING, TEXT};
-use tantivy::Index;
-use pprof::criterion::{Output, PProfProfiler};
-
-const HDFS_LOGS: &str = include_str!("hdfs.json");
-
-pub fn hdfs_index_benchmark(c: &mut Criterion) {
-    let schema = {
-        let mut schema_builder = tantivy::schema::SchemaBuilder::new();
-        schema_builder.add_u64_field("timestamp", INDEXED);
-        schema_builder.add_text_field("body", TEXT);
-        schema_builder.add_text_field("severity", STRING);
-        schema_builder.build()
-    };
-    let schema_with_store = {
-        let mut schema_builder = tantivy::schema::SchemaBuilder::new();
-        schema_builder.add_u64_field("timestamp", INDEXED | STORED);
-        schema_builder.add_text_field("body", TEXT | STORED);
-        schema_builder.add_text_field("severity", STRING | STORED);
-        schema_builder.build()
-    };
-
-    let mut group = c.benchmark_group("index-hdfs");
-    group.sample_size(20);
-    group.bench_function("index-hdfs-no-commit", |b| {
-        b.iter(|| {
-            let index = Index::create_in_ram(schema.clone());
-            let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for _ in 0..10 {
-                for doc_json in HDFS_LOGS.trim().split("\n") {
-                    let doc = schema.parse_document(doc_json).unwrap();
-                    index_writer.add_document(doc).unwrap();
-                }
-            }
-        })
-    });
-    group.bench_function("index-hdfs-with-commit", |b| {
-        b.iter(|| {
-            let index = Index::create_in_ram(schema.clone());
-            let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for _ in 0..10 {
-                for doc_json in HDFS_LOGS.trim().split("\n") {
-                    let doc = schema.parse_document(doc_json).unwrap();
-                    index_writer.add_document(doc).unwrap();
-                }
-            }
-            index_writer.commit().unwrap();
-        })
-    });
-    group.bench_function("index-hdfs-no-commit-with-docstore", |b| {
-        b.iter(|| {
-            let index = Index::create_in_ram(schema_with_store.clone());
-            let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for doc_json in HDFS_LOGS.trim().split("\n") {
-                let doc = schema.parse_document(doc_json).unwrap();
-                index_writer.add_document(doc).unwrap();
-            }
-        })
-    });
-    group.bench_function("index-hdfs-with-commit-with-docstore", |b| {
-        b.iter(|| {
-            let index = Index::create_in_ram(schema_with_store.clone());
-            let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for doc_json in HDFS_LOGS.trim().split("\n") {
-                let doc = schema.parse_document(doc_json).unwrap();
-                index_writer.add_document(doc).unwrap();
-            }
-            index_writer.commit().unwrap();
-        })
-    });
-}
-
-criterion_group! {
-    name = benches;
-    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
-    targets = hdfs_index_benchmark
-}
-
-criterion_main!(benches);


@@ -1,5 +1,5 @@
 use crate::directory::{FileSlice, OwnedBytes};
-use crate::fastfield::{BitpackedFastFieldReader, FastFieldReader, MultiValueLength};
+use crate::fastfield::{DynamicFastFieldReader, FastFieldReader, MultiValueLength};
 use crate::DocId;

 /// Reader for byte array fast fields
@@ -14,13 +14,13 @@ use crate::DocId;
 /// and the start index for the next document, and keeping the bytes in between.
 #[derive(Clone)]
 pub struct BytesFastFieldReader {
-    idx_reader: BitpackedFastFieldReader<u64>,
+    idx_reader: DynamicFastFieldReader<u64>,
     values: OwnedBytes,
 }

 impl BytesFastFieldReader {
     pub(crate) fn open(
-        idx_reader: BitpackedFastFieldReader<u64>,
+        idx_reader: DynamicFastFieldReader<u64>,
         values_file: FileSlice,
     ) -> crate::Result<BytesFastFieldReader> {
         let values = values_file.read_bytes()?;


@@ -26,7 +26,6 @@ pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
 pub use self::error::{FastFieldNotAvailableError, Result};
 pub use self::facet_reader::FacetReader;
 pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
-pub(crate) use self::reader::BitpackedFastFieldReader;
 pub use self::reader::{DynamicFastFieldReader, FastFieldReader};
 pub use self::readers::FastFieldReaders;
 pub use self::serializer::{CompositeFastFieldSerializer, FastFieldDataAccess, FastFieldStats};


@@ -248,8 +248,6 @@ impl<Item: FastValue, C: FastFieldCodecReader + Clone> FastFieldReader<Item>
     }
 }

-pub(crate) type BitpackedFastFieldReader<Item> = FastFieldReaderCodecWrapper<Item, BitpackedReader>;
-
 impl<Item: FastValue> From<Vec<Item>> for DynamicFastFieldReader<Item> {
     fn from(vals: Vec<Item>) -> DynamicFastFieldReader<Item> {
         let mut schema_builder = Schema::builder();


@@ -1,8 +1,7 @@
 use super::reader::DynamicFastFieldReader;
 use crate::directory::{CompositeFile, FileSlice};
 use crate::fastfield::{
-    BitpackedFastFieldReader, BytesFastFieldReader, FastFieldNotAvailableError, FastValue,
-    MultiValuedFastFieldReader,
+    BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader,
 };
 use crate::schema::{Cardinality, Field, FieldType, Schema};
 use crate::space_usage::PerFieldSpaceUsage;
@@ -219,7 +218,7 @@ impl FastFieldReaders {
                 )));
             }
             let fast_field_idx_file = self.fast_field_data(field, 0)?;
-            let idx_reader = BitpackedFastFieldReader::open(fast_field_idx_file)?;
+            let idx_reader = DynamicFastFieldReader::open(fast_field_idx_file)?;
             let data = self.fast_field_data(field, 1)?;
             BytesFastFieldReader::open(idx_reader, data)
         } else {


@@ -1389,6 +1389,7 @@ mod tests {
     ) -> crate::Result<()> {
         let mut schema_builder = schema::Schema::builder();
         let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED);
+        let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED);
         let text_field = schema_builder.add_text_field(
             "text_field",
             TextOptions::default()
@@ -1435,8 +1436,14 @@ mod tests {
             match op {
                 IndexingOp::AddDoc { id } => {
                     let facet = Facet::from(&("/cola/".to_string() + &id.to_string()));
-                    index_writer
-                        .add_document(doc!(id_field=>id, multi_numbers=> id, multi_numbers => id, text_field => id.to_string(), facet_field => facet, large_text_field=> LOREM))?;
+                    index_writer.add_document(doc!(id_field=>id,
+                        bytes_field => id.to_le_bytes().as_slice(),
+                        multi_numbers=> id,
+                        multi_numbers => id,
+                        text_field => id.to_string(),
+                        facet_field => facet,
+                        large_text_field=> LOREM
+                    ))?;
                 }
                 IndexingOp::DeleteDoc { id } => {
                     index_writer.delete_term(Term::from_field_u64(id_field, id));
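The extended proptest (9c12860b01) indexes each document's `id` into the new bytes fast field as its little-endian byte encoding. For reference, that encoding is a lossless round trip in plain std Rust, so the stored bytes always correlate back to the id that produced them:

fn main() {
    let id: u64 = 1278;
    // The same encoding the test feeds into the bytes field.
    let bytes: [u8; 8] = id.to_le_bytes();
    // Decoding recovers the id exactly.
    assert_eq!(u64::from_le_bytes(bytes), id);
}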


@@ -1,4 +1,4 @@
-use common::read_u32_vint;
+use common::{read_u32_vint, write_u32_vint};

 use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
 use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -104,7 +104,7 @@ impl Recorder for NothingRecorder {
     fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
         self.current_doc = doc;
-        self.stack.writer(arena).write_u32_vint(doc);
+        let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
     }

     fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {}
@@ -169,7 +169,7 @@ impl Recorder for TermFrequencyRecorder {
     fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
         self.term_doc_freq += 1;
         self.current_doc = doc;
-        self.stack.writer(arena).write_u32_vint(doc);
+        let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
     }

     fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {
@@ -178,7 +178,7 @@ impl Recorder for TermFrequencyRecorder {
     fn close_doc(&mut self, arena: &mut MemoryArena) {
         debug_assert!(self.current_tf > 0);
-        self.stack.writer(arena).write_u32_vint(self.current_tf);
+        let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(arena));
         self.current_tf = 0;
     }
@@ -239,15 +239,15 @@ impl Recorder for TfAndPositionRecorder {
     fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
         self.current_doc = doc;
         self.term_doc_freq += 1u32;
-        self.stack.writer(arena).write_u32_vint(doc);
+        let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
     }

     fn record_position(&mut self, position: u32, arena: &mut MemoryArena) {
-        self.stack.writer(arena).write_u32_vint(position.wrapping_add(1u32));
+        let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(arena));
     }

     fn close_doc(&mut self, arena: &mut MemoryArena) {
-        self.stack.writer(arena).write_u32_vint(POSITION_END);
+        let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(arena));
     }

     fn serialize(


@@ -1,5 +1,4 @@
-use std::mem;
+use std::{io, mem};

-use common::serialize_vint_u32;
 use super::{Addr, MemoryArena};
 use crate::postings::stacker::memory_arena::{load, store};
@@ -98,14 +97,12 @@ fn ensure_capacity<'a>(
 }

 impl<'a> ExpUnrolledLinkedListWriter<'a> {
-    pub fn write_u32_vint(&mut self, val: u32) {
-        let mut buf = [0u8; 8];
-        let data = serialize_vint_u32(val, &mut buf);
-        self.extend_from_slice(data);
-    }
-
     pub fn extend_from_slice(&mut self, mut buf: &[u8]) {
         if buf.is_empty() {
             // we need to cut early, because `ensure_capacity`
             // allocates if there is no capacity at all right now.
             return;
         }
         while !buf.is_empty() {
             let add_len: usize;
             {
@@ -120,6 +117,25 @@ impl<'a> ExpUnrolledLinkedListWriter<'a> {
     }
 }

+impl<'a> io::Write for ExpUnrolledLinkedListWriter<'a> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        // There is no use case to only write the capacity.
+        // This is not IO after all, so we write the whole
+        // buffer even if the contract of `.write` is looser.
+        self.extend_from_slice(buf);
+        Ok(buf.len())
+    }
+
+    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+        self.extend_from_slice(buf);
+        Ok(())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
 impl ExpUnrolledLinkedList {
     pub fn new() -> ExpUnrolledLinkedList {
         ExpUnrolledLinkedList {
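With ExpUnrolledLinkedListWriter now implementing `io::Write`, the recorders above can funnel their varints through the shared `common::write_u32_vint` helper instead of a bespoke method on the writer. As a self-contained sketch of what such a helper plausibly does — a standard 7-bit varint over any `io::Write`; the exact bit layout and signature in `common` may differ:

use std::io::{self, Write};

/// Writes `val` as a variable-length integer: 7 payload bits per byte,
/// with the high bit marking that more bytes follow. A sketch of the
/// shape of `common::write_u32_vint`, not its actual implementation.
fn write_u32_vint<W: Write>(mut val: u32, writer: &mut W) -> io::Result<()> {
    loop {
        let byte = (val & 0x7f) as u8;
        val >>= 7;
        if val == 0 {
            // Last byte: no continuation bit.
            writer.write_all(&[byte])?;
            return Ok(());
        }
        // More bytes follow: set the continuation bit.
        writer.write_all(&[byte | 0x80])?;
    }
}

fn main() -> io::Result<()> {
    // Vec<u8> implements io::Write, just like the arena-backed writer.
    let mut buf = Vec::new();
    write_u32_vint(300, &mut buf)?; // 300 = 0b10_0101100 -> [0xac, 0x02]
    assert_eq!(buf, vec![0xac, 0x02]);
    Ok(())
}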


@@ -283,7 +283,6 @@ mod tests {
         let warming_state = &reader.inner.warming_state;
         let searcher = reader.searcher();
         assert_eq!(searcher.segment_readers().len(), num_writer_threads);
-        assert!(
-            !warming_state.gc_maybe(),
-            "no GC after first searcher generation"