Compare commits


2 Commits

Author         SHA1         Message                                               Date
Paul Masurel   3d04266997   added a bench to measure the perf of indexing logs    2022-02-18 18:53:09 +09:00
Paul Masurel   b98e7349f2   Simplified expull code.                               2022-02-18 18:15:02 +09:00
5 changed files with 100106 additions and 32 deletions

Cargo.toml

@@ -67,6 +67,7 @@ proptest = "1.0"
 criterion = "0.3.5"
 test-log = "0.2.8"
 env_logger = "0.9.0"
+pprof = {version= "0.6", features=["flamegraph", "criterion"]}

 [dev-dependencies.fail]
 version = "0.5"
@@ -77,6 +78,11 @@ opt-level = 3
 debug = false
 debug-assertions = false

+[profile.bench]
+opt-level = 3
+debug = true
+debug-assertions = false
+
 [profile.test]
 debug-assertions = true
 overflow-checks = true
@@ -110,3 +116,7 @@ required-features = ["fail/failpoints"]
 [[bench]]
 name = "analyzer"
 harness = false
+
+[[bench]]
+name = "index-bench"
+harness = false
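
The manifest changes above wire the new benchmark into the existing criterion setup: pprof is added as a dev-dependency with its criterion and flamegraph features so the benchmark can emit flamegraphs, and benches/index-bench.rs is registered as a bench target with harness = false so that criterion supplies its own main instead of the default libtest harness. The new [profile.bench] section keeps debug = true, presumably so that profiler samples resolve to readable symbol names.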

benches/hdfs.json (new file, 100,000 added lines)

File diff suppressed because it is too large.
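
The corpus itself is not shown. Each of its 100,000 lines appears to be one JSON document carrying the three fields declared by the benchmark schema below: a u64 timestamp, a text body, and a string severity. As a minimal sketch of the per-line parsing step the benchmark performs, here is a self-contained example using a hypothetical log line rather than one taken from the real file:

use tantivy::schema::{SchemaBuilder, INDEXED, STRING, TEXT};

fn parse_one_line_example() {
    // Same field layout as the benchmark's non-stored schema.
    let mut schema_builder = SchemaBuilder::new();
    schema_builder.add_u64_field("timestamp", INDEXED);
    schema_builder.add_text_field("body", TEXT);
    schema_builder.add_text_field("severity", STRING);
    let schema = schema_builder.build();

    // Hypothetical line: benches/hdfs.json is not part of this page, so this
    // string is illustrative only.
    let doc_json = r#"{"timestamp": 1329676573, "severity": "INFO", "body": "Receiving block blk_1234 src: /10.0.0.1:50010 dest: /10.0.0.2:50010"}"#;

    // The same call the benchmark makes once per line before add_document.
    let _doc = schema.parse_document(doc_json).unwrap();
}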

benches/index-bench.rs (new file, 80 lines)

@@ -0,0 +1,80 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tantivy::schema::{INDEXED, STORED, STRING, TEXT};
use tantivy::Index;
use pprof::criterion::{Output, PProfProfiler};

const HDFS_LOGS: &str = include_str!("hdfs.json");

pub fn hdfs_index_benchmark(c: &mut Criterion) {
    let schema = {
        let mut schema_builder = tantivy::schema::SchemaBuilder::new();
        schema_builder.add_u64_field("timestamp", INDEXED);
        schema_builder.add_text_field("body", TEXT);
        schema_builder.add_text_field("severity", STRING);
        schema_builder.build()
    };
    let schema_with_store = {
        let mut schema_builder = tantivy::schema::SchemaBuilder::new();
        schema_builder.add_u64_field("timestamp", INDEXED | STORED);
        schema_builder.add_text_field("body", TEXT | STORED);
        schema_builder.add_text_field("severity", STRING | STORED);
        schema_builder.build()
    };

    let mut group = c.benchmark_group("index-hdfs");
    group.sample_size(20);
    group.bench_function("index-hdfs-no-commit", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema.clone());
            let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for _ in 0..10 {
                for doc_json in HDFS_LOGS.trim().split("\n") {
                    let doc = schema.parse_document(doc_json).unwrap();
                    index_writer.add_document(doc).unwrap();
                }
            }
        })
    });
    group.bench_function("index-hdfs-with-commit", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema.clone());
            let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for _ in 0..10 {
                for doc_json in HDFS_LOGS.trim().split("\n") {
                    let doc = schema.parse_document(doc_json).unwrap();
                    index_writer.add_document(doc).unwrap();
                }
            }
            index_writer.commit().unwrap();
        })
    });
    group.bench_function("index-hdfs-no-commit-with-docstore", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema_with_store.clone());
            let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for doc_json in HDFS_LOGS.trim().split("\n") {
                let doc = schema.parse_document(doc_json).unwrap();
                index_writer.add_document(doc).unwrap();
            }
        })
    });
    group.bench_function("index-hdfs-with-commit-with-docstore", |b| {
        b.iter(|| {
            let index = Index::create_in_ram(schema_with_store.clone());
            let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
            for doc_json in HDFS_LOGS.trim().split("\n") {
                let doc = schema.parse_document(doc_json).unwrap();
                index_writer.add_document(doc).unwrap();
            }
            index_writer.commit().unwrap();
        })
    });
}

criterion_group! {
    name = benches;
    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = hdfs_index_benchmark
}
criterion_main!(benches);
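
A few usage notes, assuming pprof's criterion integration behaves as documented: cargo bench --bench index-bench runs the four benchmarks, which compare indexing with and without a commit, and with and without the document store (the STORED schema variant). Running with criterion's profiling option, for example cargo bench --bench index-bench -- --profile-time 10, makes the PProfProfiler configured above sample at 100 Hz and write a flamegraph under target/criterion/, which is where the debug = true added to [profile.bench] pays off.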

src/postings/recorder.rs

@@ -1,4 +1,4 @@
-use common::{read_u32_vint, write_u32_vint};
+use common::read_u32_vint;

 use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
 use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -104,7 +104,7 @@ impl Recorder for NothingRecorder {

     fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
         self.current_doc = doc;
-        let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
+        self.stack.writer(arena).write_u32_vint(doc);
     }

     fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {}
@@ -169,7 +169,7 @@ impl Recorder for TermFrequencyRecorder {
     fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
         self.term_doc_freq += 1;
         self.current_doc = doc;
-        let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
+        self.stack.writer(arena).write_u32_vint(doc);
     }

     fn record_position(&mut self, _position: u32, _arena: &mut MemoryArena) {
@@ -178,7 +178,7 @@ impl Recorder for TermFrequencyRecorder {

     fn close_doc(&mut self, arena: &mut MemoryArena) {
         debug_assert!(self.current_tf > 0);
-        let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(arena));
+        self.stack.writer(arena).write_u32_vint(self.current_tf);
         self.current_tf = 0;
     }
@@ -239,15 +239,15 @@ impl Recorder for TfAndPositionRecorder {
     fn new_doc(&mut self, doc: DocId, arena: &mut MemoryArena) {
         self.current_doc = doc;
         self.term_doc_freq += 1u32;
-        let _ = write_u32_vint(doc, &mut self.stack.writer(arena));
+        self.stack.writer(arena).write_u32_vint(doc);
     }

     fn record_position(&mut self, position: u32, arena: &mut MemoryArena) {
-        let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(arena));
+        self.stack.writer(arena).write_u32_vint(position.wrapping_add(1u32));
     }

     fn close_doc(&mut self, arena: &mut MemoryArena) {
-        let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(arena));
+        self.stack.writer(arena).write_u32_vint(POSITION_END);
     }

     fn serialize(
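
The changes above swap the free function write_u32_vint, which wrote through the io::Write implementation that the next file removes, for the new ExpUnrolledLinkedListWriter::write_u32_vint method. For context, a vint packs a u32 into one to five bytes, seven payload bits per byte. The sketch below shows the usual continuation-bit scheme; it illustrates the general idea only and is not guaranteed to be byte-compatible with serialize_vint_u32 in tantivy's common crate.

// Minimal sketch of a variable-length u32 encoder: 7 payload bits per byte,
// high bit set on every byte except the last. Illustration only; tantivy's
// serialize_vint_u32 may use a different bit convention.
fn serialize_vint_u32_sketch(mut val: u32, buf: &mut [u8; 8]) -> &[u8] {
    let mut len = 0;
    loop {
        let byte = (val & 0x7f) as u8;
        val >>= 7;
        if val == 0 {
            buf[len] = byte; // last byte: continuation bit cleared
            len += 1;
            return &buf[..len];
        }
        buf[len] = byte | 0x80; // continuation bit set: more bytes follow
        len += 1;
    }
}

fn main() {
    let mut buf = [0u8; 8];
    assert_eq!(serialize_vint_u32_sketch(5, &mut buf), &[5u8]);
    assert_eq!(serialize_vint_u32_sketch(300, &mut buf), &[0xac, 0x02]);
}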

src/postings/stacker/expull.rs

@@ -1,4 +1,5 @@
-use std::{io, mem};
+use std::mem;
+use common::serialize_vint_u32;

 use super::{Addr, MemoryArena};
 use crate::postings::stacker::memory_arena::{load, store};
@@ -97,12 +98,14 @@ fn ensure_capacity<'a>(
 }

 impl<'a> ExpUnrolledLinkedListWriter<'a> {
+    pub fn write_u32_vint(&mut self, val: u32) {
+        let mut buf = [0u8; 8];
+        let data = serialize_vint_u32(val, &mut buf);
+        self.extend_from_slice(data);
+    }
+
     pub fn extend_from_slice(&mut self, mut buf: &[u8]) {
         if buf.is_empty() {
             // we need to cut early, because `ensure_capacity`
             // allocates if there is no capacity at all right now.
             return;
         }
         while !buf.is_empty() {
             let add_len: usize;
             {
@@ -117,25 +120,6 @@ impl<'a> ExpUnrolledLinkedListWriter<'a> {
     }
 }
-
-impl<'a> io::Write for ExpUnrolledLinkedListWriter<'a> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        // There is no use case to only write the capacity.
-        // This is not IO after all, so we write the whole
-        // buffer even if the contract of `.write` is looser.
-        self.extend_from_slice(buf);
-        Ok(buf.len())
-    }
-
-    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
-        self.extend_from_slice(buf);
-        Ok(())
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        Ok(())
-    }
-}

 impl ExpUnrolledLinkedList {
     pub fn new() -> ExpUnrolledLinkedList {
         ExpUnrolledLinkedList {