From 5e159c26bf14cbdda1b69ea16d7aeae1679db7ab Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 25 Oct 2022 15:52:32 +0800 Subject: [PATCH] add ip range query benchmark, add seek behaviour --- src/query/range_query_ip_fastfield.rs | 282 ++++++++++++++++++++++++-- 1 file changed, 261 insertions(+), 21 deletions(-) diff --git a/src/query/range_query_ip_fastfield.rs b/src/query/range_query_ip_fastfield.rs index 8a9170b6c..40c7b43ad 100644 --- a/src/query/range_query_ip_fastfield.rs +++ b/src/query/range_query_ip_fastfield.rs @@ -123,11 +123,22 @@ struct IpRangeDocSet { /// The range filter on the values. value_range: RangeInclusive, ip_addr_fast_field: Arc>, - /// The last docid (exclusive) that has been fetched. - fetched_until_doc: u32, + /// The next docid start range to fetch (inclusive). + next_fetch_start: u32, + /// Number of docs range checked in a batch. + /// + /// There are two patterns. + /// - We do a full scan. => We can load large chunks. We don't know in advance if seek call + /// will come, so we start with small chunks + /// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we + /// should load small chunks. + fetch_horizon: u32, /// Current batch of loaded docs. loaded_docs: VecCursor, + last_seek_pos_opt: Option, } + +const DEFALT_FETCH_HORIZON: u32 = 128; impl IpRangeDocSet { fn new( value_range: RangeInclusive, @@ -137,32 +148,47 @@ impl IpRangeDocSet { value_range, ip_addr_fast_field, loaded_docs: VecCursor::new(), - fetched_until_doc: 0, + next_fetch_start: 0, + fetch_horizon: DEFALT_FETCH_HORIZON, + last_seek_pos_opt: None, }; + ip_range_docset.reset_fetch_range(); ip_range_docset.fetch_block(); ip_range_docset } + fn reset_fetch_range(&mut self) { + self.fetch_horizon = DEFALT_FETCH_HORIZON; + } + /// Returns true if more data could be fetched fn fetch_block(&mut self) { - let mut horizon: u32 = 1; const MAX_HORIZON: u32 = 100_000; while self.loaded_docs.is_empty() { - let finished_to_end = self.fetch_horizon(horizon); + let finished_to_end = self.fetch_horizon(self.fetch_horizon); if finished_to_end { break; } - // Fetch more data, increase horizon - horizon = (horizon * 2).min(MAX_HORIZON); + // Fetch more data, increase horizon. Horizon only gets reset when doing a seek. + self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON); } } - /// Fetches a block for docid range [fetched_until_doc .. fetched_until_doc + HORIZON] + /// check if the distance between the seek calls is large + fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool { + if let Some(last_seek_pos) = self.last_seek_pos_opt { + (new_seek - last_seek_pos) >= 128 + } else { + true + } + } + + /// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON] fn fetch_horizon(&mut self, horizon: u32) -> bool { - let mut end = self.fetched_until_doc + horizon; let mut finished_to_end = false; let limit = self.ip_addr_fast_field.num_vals(); + let mut end = self.next_fetch_start + horizon; if end >= limit { end = limit; finished_to_end = true; @@ -170,19 +196,20 @@ impl IpRangeDocSet { let data = self .ip_addr_fast_field - .get_positions_for_value_range(self.value_range.clone(), self.fetched_until_doc..end); + .get_positions_for_value_range(self.value_range.clone(), self.next_fetch_start..end); self.loaded_docs.set_data(data); - self.fetched_until_doc = end; + self.next_fetch_start = end; finished_to_end } } impl DocSet for IpRangeDocSet { + #[inline] fn advance(&mut self) -> DocId { if let Some(docid) = self.loaded_docs.next() { docid as u32 } else { - if self.fetched_until_doc >= self.ip_addr_fast_field.num_vals() as u32 { + if self.next_fetch_start >= self.ip_addr_fast_field.num_vals() as u32 { return TERMINATED; } self.fetch_block(); @@ -190,6 +217,7 @@ impl DocSet for IpRangeDocSet { } } + #[inline] fn doc(&self) -> DocId { self.loaded_docs .current() @@ -197,6 +225,31 @@ impl DocSet for IpRangeDocSet { .unwrap_or(TERMINATED) } + /// Advances the `DocSet` forward until reaching the target, or going to the + /// lowest [`DocId`] greater than the target. + /// + /// If the end of the `DocSet` is reached, [`TERMINATED`] is returned. + /// + /// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation + /// of `DocSet` should support it. + /// + /// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`. + fn seek(&mut self, target: DocId) -> DocId { + if self.is_last_seek_distance_large(target) { + self.reset_fetch_range(); + } + if target > self.next_fetch_start { + self.next_fetch_start = target; + } + let mut doc = self.doc(); + debug_assert!(doc <= target); + while doc < target { + doc = self.advance(); + } + self.last_seek_pos_opt = Some(target); + doc + } + fn size_hint(&self) -> u32 { 0 // heuristic possible by checking number of hits when fetching a block } @@ -215,9 +268,9 @@ mod tests { use crate::Index; #[derive(Clone, Debug)] - struct Doc { - id: String, - ip: Ipv6Addr, + pub struct Doc { + pub id: String, + pub ip: Ipv6Addr, } fn operation_strategy() -> impl Strategy { @@ -227,7 +280,7 @@ mod tests { ] } - fn doc_from_id_1(id: u64) -> Doc { + pub fn doc_from_id_1(id: u64) -> Doc { Doc { // ip != id id: id.to_string(), @@ -243,9 +296,9 @@ mod tests { } proptest! { - #![proptest_config(ProptestConfig::with_cases(20))] + #![proptest_config(ProptestConfig::with_cases(10))] #[test] - fn test_ip_range_for_docs_prop(ops in proptest::collection::vec(operation_strategy(), 1..100)) { + fn test_ip_range_for_docs_prop(ops in proptest::collection::vec(operation_strategy(), 1..1000)) { assert!(test_ip_range_for_docs(ops).is_ok()); } } @@ -268,7 +321,7 @@ mod tests { assert!(test_ip_range_for_docs(ops).is_ok()); } - fn test_ip_range_for_docs(docs: Vec) -> crate::Result<()> { + pub fn create_index_from_docs(docs: &[Doc]) -> Index { let mut schema_builder = Schema::builder(); let ip_field = schema_builder.add_ip_addr_field("ip", INDEXED | STORED | FAST); let text_field = schema_builder.add_text_field("id", TEXT | STORED); @@ -277,17 +330,22 @@ mod tests { { let mut index_writer = index.writer(3_000_000).unwrap(); - for doc in &docs { + for doc in docs.iter() { index_writer .add_document(doc!( ip_field => doc.ip, - text_field => doc.id.to_string() + text_field => doc.id.to_string(), )) .unwrap(); } index_writer.commit().unwrap(); } + index + } + + fn test_ip_range_for_docs(docs: Vec) -> crate::Result<()> { + let index = create_index_from_docs(&docs); let reader = index.reader().unwrap(); let searcher = reader.searcher(); @@ -335,3 +393,185 @@ mod tests { Ok(()) } } + +#[cfg(all(test, feature = "unstable"))] +mod bench { + + use rand::{thread_rng, Rng}; + use test::Bencher; + + use super::tests::*; + use super::*; + use crate::collector::Count; + use crate::query::QueryParser; + use crate::Index; + + fn get_index_0_to_100() -> Index { + let mut rng = thread_rng(); + let num_vals = 100_000; + let docs: Vec<_> = (0..num_vals) + .map(|_i| { + let id = if rng.gen_bool(0.01) { + "veryfew".to_string() // 1% + } else if rng.gen_bool(0.1) { + "few".to_string() // 9% + } else { + "many".to_string() // 90% + }; + Doc { + id: id, + ip: Ipv6Addr::from_u128(rng.gen_range(0..100)), + } + }) + .collect(); + + let index = create_index_from_docs(&docs); + index + } + fn excute_query( + start_inclusive: Ipv6Addr, + end_inclusive: Ipv6Addr, + suffix: &str, + index: &Index, + ) -> usize { + let gen_query_inclusive = |from: Ipv6Addr, to: Ipv6Addr| { + format!( + "ip:[{} TO {}] {}", + &from.to_string(), + &to.to_string(), + suffix + ) + }; + + let query = gen_query_inclusive(start_inclusive, end_inclusive); + let query_from_text = |text: &str| { + QueryParser::for_index(&index, vec![]) + .parse_query(text) + .unwrap() + }; + let query = query_from_text(&query); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + searcher.search(&query, &(Count)).unwrap() + } + + #[bench] + fn bench_ip_range_hit_90_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(90); + + excute_query(start, end, "", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_10_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(10); + + excute_query(start, end, "", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_1_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(0); + + excute_query(start, end, "", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_10_percent_intersect_with_10_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(10); + + excute_query(start, end, "AND id:few", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_1_percent_intersect_with_10_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(0); + + excute_query(start, end, "AND id:few", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_1_percent_intersect_with_90_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(0); + + excute_query(start, end, "AND id:many", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_1_percent_intersect_with_1_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(0); + + excute_query(start, end, "AND id:veryfew", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_10_percent_intersect_with_90_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(10); + + excute_query(start, end, "AND id:many", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_90_percent_intersect_with_90_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(90); + + excute_query(start, end, "AND id:many", &index) + }); + } + + #[bench] + fn bench_ip_range_hit_90_percent_intersect_with_10_percent(bench: &mut Bencher) { + let index = get_index_0_to_100(); + + bench.iter(|| { + let start = Ipv6Addr::from_u128(0); + let end = Ipv6Addr::from_u128(90); + + excute_query(start, end, "AND id:few", &index) + }); + } +}