add ip range query benchmark, add seek behaviour

This commit is contained in:
Pascal Seitz
2022-10-25 15:52:32 +08:00
parent a5e59ab598
commit 5e159c26bf

View File

@@ -123,11 +123,22 @@ struct IpRangeDocSet {
/// The range filter on the values.
value_range: RangeInclusive<Ipv6Addr>,
ip_addr_fast_field: Arc<dyn Column<Ipv6Addr>>,
/// The last docid (exclusive) that has been fetched.
fetched_until_doc: u32,
/// The next docid start range to fetch (inclusive).
next_fetch_start: u32,
/// Number of docs range checked in a batch.
///
/// There are two patterns.
/// - We do a full scan. => We can load large chunks. We don't know in advance if seek call
/// will come, so we start with small chunks
/// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we
/// should load small chunks.
fetch_horizon: u32,
/// Current batch of loaded docs.
loaded_docs: VecCursor,
last_seek_pos_opt: Option<u32>,
}
const DEFALT_FETCH_HORIZON: u32 = 128;
impl IpRangeDocSet {
fn new(
value_range: RangeInclusive<Ipv6Addr>,
@@ -137,32 +148,47 @@ impl IpRangeDocSet {
value_range,
ip_addr_fast_field,
loaded_docs: VecCursor::new(),
fetched_until_doc: 0,
next_fetch_start: 0,
fetch_horizon: DEFALT_FETCH_HORIZON,
last_seek_pos_opt: None,
};
ip_range_docset.reset_fetch_range();
ip_range_docset.fetch_block();
ip_range_docset
}
fn reset_fetch_range(&mut self) {
self.fetch_horizon = DEFALT_FETCH_HORIZON;
}
/// Returns true if more data could be fetched
fn fetch_block(&mut self) {
let mut horizon: u32 = 1;
const MAX_HORIZON: u32 = 100_000;
while self.loaded_docs.is_empty() {
let finished_to_end = self.fetch_horizon(horizon);
let finished_to_end = self.fetch_horizon(self.fetch_horizon);
if finished_to_end {
break;
}
// Fetch more data, increase horizon
horizon = (horizon * 2).min(MAX_HORIZON);
// Fetch more data, increase horizon. Horizon only gets reset when doing a seek.
self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON);
}
}
/// Fetches a block for docid range [fetched_until_doc .. fetched_until_doc + HORIZON]
/// check if the distance between the seek calls is large
fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool {
if let Some(last_seek_pos) = self.last_seek_pos_opt {
(new_seek - last_seek_pos) >= 128
} else {
true
}
}
/// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON]
fn fetch_horizon(&mut self, horizon: u32) -> bool {
let mut end = self.fetched_until_doc + horizon;
let mut finished_to_end = false;
let limit = self.ip_addr_fast_field.num_vals();
let mut end = self.next_fetch_start + horizon;
if end >= limit {
end = limit;
finished_to_end = true;
@@ -170,19 +196,20 @@ impl IpRangeDocSet {
let data = self
.ip_addr_fast_field
.get_positions_for_value_range(self.value_range.clone(), self.fetched_until_doc..end);
.get_positions_for_value_range(self.value_range.clone(), self.next_fetch_start..end);
self.loaded_docs.set_data(data);
self.fetched_until_doc = end;
self.next_fetch_start = end;
finished_to_end
}
}
impl DocSet for IpRangeDocSet {
#[inline]
fn advance(&mut self) -> DocId {
if let Some(docid) = self.loaded_docs.next() {
docid as u32
} else {
if self.fetched_until_doc >= self.ip_addr_fast_field.num_vals() as u32 {
if self.next_fetch_start >= self.ip_addr_fast_field.num_vals() as u32 {
return TERMINATED;
}
self.fetch_block();
@@ -190,6 +217,7 @@ impl DocSet for IpRangeDocSet {
}
}
#[inline]
fn doc(&self) -> DocId {
self.loaded_docs
.current()
@@ -197,6 +225,31 @@ impl DocSet for IpRangeDocSet {
.unwrap_or(TERMINATED)
}
/// Advances the `DocSet` forward until reaching the target, or going to the
/// lowest [`DocId`] greater than the target.
///
/// If the end of the `DocSet` is reached, [`TERMINATED`] is returned.
///
/// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation
/// of `DocSet` should support it.
///
/// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`.
fn seek(&mut self, target: DocId) -> DocId {
if self.is_last_seek_distance_large(target) {
self.reset_fetch_range();
}
if target > self.next_fetch_start {
self.next_fetch_start = target;
}
let mut doc = self.doc();
debug_assert!(doc <= target);
while doc < target {
doc = self.advance();
}
self.last_seek_pos_opt = Some(target);
doc
}
fn size_hint(&self) -> u32 {
0 // heuristic possible by checking number of hits when fetching a block
}
@@ -215,9 +268,9 @@ mod tests {
use crate::Index;
#[derive(Clone, Debug)]
struct Doc {
id: String,
ip: Ipv6Addr,
pub struct Doc {
pub id: String,
pub ip: Ipv6Addr,
}
fn operation_strategy() -> impl Strategy<Value = Doc> {
@@ -227,7 +280,7 @@ mod tests {
]
}
fn doc_from_id_1(id: u64) -> Doc {
pub fn doc_from_id_1(id: u64) -> Doc {
Doc {
// ip != id
id: id.to_string(),
@@ -243,9 +296,9 @@ mod tests {
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#![proptest_config(ProptestConfig::with_cases(10))]
#[test]
fn test_ip_range_for_docs_prop(ops in proptest::collection::vec(operation_strategy(), 1..100)) {
fn test_ip_range_for_docs_prop(ops in proptest::collection::vec(operation_strategy(), 1..1000)) {
assert!(test_ip_range_for_docs(ops).is_ok());
}
}
@@ -268,7 +321,7 @@ mod tests {
assert!(test_ip_range_for_docs(ops).is_ok());
}
fn test_ip_range_for_docs(docs: Vec<Doc>) -> crate::Result<()> {
pub fn create_index_from_docs(docs: &[Doc]) -> Index {
let mut schema_builder = Schema::builder();
let ip_field = schema_builder.add_ip_addr_field("ip", INDEXED | STORED | FAST);
let text_field = schema_builder.add_text_field("id", TEXT | STORED);
@@ -277,17 +330,22 @@ mod tests {
{
let mut index_writer = index.writer(3_000_000).unwrap();
for doc in &docs {
for doc in docs.iter() {
index_writer
.add_document(doc!(
ip_field => doc.ip,
text_field => doc.id.to_string()
text_field => doc.id.to_string(),
))
.unwrap();
}
index_writer.commit().unwrap();
}
index
}
fn test_ip_range_for_docs(docs: Vec<Doc>) -> crate::Result<()> {
let index = create_index_from_docs(&docs);
let reader = index.reader().unwrap();
let searcher = reader.searcher();
@@ -335,3 +393,185 @@ mod tests {
Ok(())
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use rand::{thread_rng, Rng};
use test::Bencher;
use super::tests::*;
use super::*;
use crate::collector::Count;
use crate::query::QueryParser;
use crate::Index;
fn get_index_0_to_100() -> Index {
let mut rng = thread_rng();
let num_vals = 100_000;
let docs: Vec<_> = (0..num_vals)
.map(|_i| {
let id = if rng.gen_bool(0.01) {
"veryfew".to_string() // 1%
} else if rng.gen_bool(0.1) {
"few".to_string() // 9%
} else {
"many".to_string() // 90%
};
Doc {
id: id,
ip: Ipv6Addr::from_u128(rng.gen_range(0..100)),
}
})
.collect();
let index = create_index_from_docs(&docs);
index
}
fn excute_query(
start_inclusive: Ipv6Addr,
end_inclusive: Ipv6Addr,
suffix: &str,
index: &Index,
) -> usize {
let gen_query_inclusive = |from: Ipv6Addr, to: Ipv6Addr| {
format!(
"ip:[{} TO {}] {}",
&from.to_string(),
&to.to_string(),
suffix
)
};
let query = gen_query_inclusive(start_inclusive, end_inclusive);
let query_from_text = |text: &str| {
QueryParser::for_index(&index, vec![])
.parse_query(text)
.unwrap()
};
let query = query_from_text(&query);
let reader = index.reader().unwrap();
let searcher = reader.searcher();
searcher.search(&query, &(Count)).unwrap()
}
#[bench]
fn bench_ip_range_hit_90_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(90);
excute_query(start, end, "", &index)
});
}
#[bench]
fn bench_ip_range_hit_10_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(10);
excute_query(start, end, "", &index)
});
}
#[bench]
fn bench_ip_range_hit_1_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(0);
excute_query(start, end, "", &index)
});
}
#[bench]
fn bench_ip_range_hit_10_percent_intersect_with_10_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(10);
excute_query(start, end, "AND id:few", &index)
});
}
#[bench]
fn bench_ip_range_hit_1_percent_intersect_with_10_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(0);
excute_query(start, end, "AND id:few", &index)
});
}
#[bench]
fn bench_ip_range_hit_1_percent_intersect_with_90_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(0);
excute_query(start, end, "AND id:many", &index)
});
}
#[bench]
fn bench_ip_range_hit_1_percent_intersect_with_1_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(0);
excute_query(start, end, "AND id:veryfew", &index)
});
}
#[bench]
fn bench_ip_range_hit_10_percent_intersect_with_90_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(10);
excute_query(start, end, "AND id:many", &index)
});
}
#[bench]
fn bench_ip_range_hit_90_percent_intersect_with_90_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(90);
excute_query(start, end, "AND id:many", &index)
});
}
#[bench]
fn bench_ip_range_hit_90_percent_intersect_with_10_percent(bench: &mut Bencher) {
let index = get_index_0_to_100();
bench.iter(|| {
let start = Ipv6Addr::from_u128(0);
let end = Ipv6Addr::from_u128(90);
excute_query(start, end, "AND id:few", &index)
});
}
}