diff --git a/Cargo.lock b/Cargo.lock index 15979a3211..058f99cf80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5781,6 +5781,7 @@ dependencies = [ "futures-util", "itertools 0.10.5", "lazy_static", + "pin-project", "prometheus", "protobuf", "protobuf-build", diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 7d324d81ef..e599e03349 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -26,7 +26,9 @@ common-time.workspace = true common-wal.workspace = true futures.workspace = true futures-util.workspace = true +itertools.workspace = true lazy_static.workspace = true +pin-project.workspace = true prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index a1cb2dc1b1..21c5a397c0 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -13,6 +13,9 @@ // limitations under the License. pub(crate) mod client_manager; +pub(crate) mod consumer; +#[allow(unused)] +pub(crate) mod index; pub mod log_store; pub(crate) mod producer; pub(crate) mod util; diff --git a/src/log-store/src/kafka/consumer.rs b/src/log-store/src/kafka/consumer.rs new file mode 100644 index 0000000000..70fa5e8482 --- /dev/null +++ b/src/log-store/src/kafka/consumer.rs @@ -0,0 +1,380 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; +use std::ops::Range; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use common_telemetry::debug; +use futures::future::{BoxFuture, Fuse, FusedFuture}; +use futures::{FutureExt, Stream}; +use pin_project::pin_project; +use rskafka::client::partition::PartitionClient; +use rskafka::record::RecordAndOffset; + +use super::index::{NextBatchHint, RegionWalIndexIterator}; + +#[async_trait::async_trait] +pub trait FetchClient: std::fmt::Debug + Send + Sync { + /// Fetch records. + /// + /// Arguments are identical to [`PartitionClient::fetch_records`]. + async fn fetch_records( + &self, + offset: i64, + bytes: Range, + max_wait_ms: i32, + ) -> rskafka::client::error::Result<(Vec, i64)>; +} + +#[async_trait::async_trait] +impl FetchClient for PartitionClient { + async fn fetch_records( + &self, + offset: i64, + bytes: Range, + max_wait_ms: i32, + ) -> rskafka::client::error::Result<(Vec, i64)> { + self.fetch_records(offset, bytes, max_wait_ms).await + } +} + +struct FetchResult { + records_and_offsets: Vec, + batch_size: usize, + fetch_bytes: i32, + watermark: i64, + used_offset: i64, +} + +/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from +/// a Kafka cluster. Yielding records respecting the [`RegionWalIndexIterator`]. +#[pin_project] +pub struct Consumer { + last_high_watermark: i64, + + /// The client is used to fetch records from kafka topic. + client: Arc, + + /// The max batch size in a single fetch request. + max_batch_size: usize, + + /// The max wait milliseconds. + max_wait_ms: u32, + + /// The avg record size + avg_record_size: usize, + + /// Termination flag + terminated: bool, + + /// The buffer of records. + buffer: RecordsBuffer, + + /// The fetch future. + fetch_fut: Fuse>>, +} + +struct RecordsBuffer { + buffer: VecDeque, + + index: Box, +} + +impl RecordsBuffer { + fn pop_front(&mut self) -> Option { + while let Some(index) = self.index.peek() { + if let Some(record_and_offset) = self.buffer.pop_front() { + if index == record_and_offset.offset as u64 { + self.index.next(); + return Some(record_and_offset); + } + } else { + return None; + } + } + + self.buffer.clear(); + None + } + + fn extend(&mut self, records: Vec) { + if let (Some(first), Some(index)) = (records.first(), self.index.peek()) { + // TODO(weny): throw an error? + assert!( + index <= first.offset as u64, + "index: {index}, first offset: {}", + first.offset + ); + } + self.buffer.extend(records); + } +} + +impl Stream for Consumer { + type Item = rskafka::client::error::Result<(RecordAndOffset, i64)>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + loop { + if *this.terminated { + return Poll::Ready(None); + } + + if this.buffer.index.peek().is_none() { + return Poll::Ready(None); + } + + if let Some(x) = this.buffer.pop_front() { + debug!("Yielding record with offset: {}", x.offset); + return Poll::Ready(Some(Ok((x, *this.last_high_watermark)))); + } + + if this.fetch_fut.is_terminated() { + match this.buffer.index.peek() { + Some(next_offset) => { + let client = Arc::clone(this.client); + let max_wait_ms = *this.max_wait_ms as i32; + let offset = next_offset as i64; + let NextBatchHint { bytes, len } = this + .buffer + .index + .next_batch_hint(*this.avg_record_size) + .unwrap_or(NextBatchHint { + bytes: *this.avg_record_size, + len: 1, + }); + + let fetch_range = + 1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32); + *this.fetch_fut = FutureExt::fuse(Box::pin(async move { + let (records_and_offsets, watermark) = client + .fetch_records(offset, fetch_range, max_wait_ms) + .await?; + + Ok(FetchResult { + records_and_offsets, + watermark, + used_offset: offset, + fetch_bytes: bytes as i32, + batch_size: len, + }) + })); + } + None => { + return Poll::Ready(None); + } + } + } + + let data = futures::ready!(this.fetch_fut.poll_unpin(cx)); + + match data { + Ok(FetchResult { + mut records_and_offsets, + watermark, + used_offset, + fetch_bytes, + batch_size, + }) => { + // Sort records by offset in case they aren't in order + records_and_offsets.sort_unstable_by_key(|x| x.offset); + *this.last_high_watermark = watermark; + if !records_and_offsets.is_empty() { + *this.avg_record_size = fetch_bytes as usize / records_and_offsets.len(); + debug!("set avg_record_size: {}", *this.avg_record_size); + } + + debug!( + "Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}", + records_and_offsets + .iter() + .map(|record| record.offset) + .collect::>(), + records_and_offsets + .len() + ); + this.buffer.extend(records_and_offsets); + continue; + } + Err(e) => { + *this.terminated = true; + + return Poll::Ready(Some(Err(e))); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + use std::ops::Range; + use std::sync::Arc; + + use chrono::{TimeZone, Utc}; + use futures::future::Fuse; + use futures::TryStreamExt; + use rskafka::record::{Record, RecordAndOffset}; + + use super::FetchClient; + use crate::kafka::consumer::{Consumer, RecordsBuffer}; + use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex}; + + #[derive(Debug)] + struct MockFetchClient { + record: Record, + } + + #[async_trait::async_trait] + impl FetchClient for MockFetchClient { + async fn fetch_records( + &self, + offset: i64, + bytes: Range, + _max_wait_ms: i32, + ) -> rskafka::client::error::Result<(Vec, i64)> { + let record_size = self.record.approximate_size(); + let num = (bytes.end.unsigned_abs() as usize / record_size).max(1); + + let records = (0..num) + .map(|idx| RecordAndOffset { + record: self.record.clone(), + offset: offset + idx as i64, + }) + .collect::>(); + let max_offset = offset + records.len() as i64; + Ok((records, max_offset)) + } + } + + fn test_record() -> Record { + Record { + key: Some(vec![0; 4]), + value: Some(vec![0; 6]), + headers: Default::default(), + timestamp: Utc.timestamp_millis_opt(1337).unwrap(), + } + } + + #[tokio::test] + async fn test_consumer_with_index() { + common_telemetry::init_default_ut_logging(); + let record = test_record(); + let record_size = record.approximate_size(); + let mock_client = MockFetchClient { + record: record.clone(), + }; + let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12], record_size * 3); + let consumer = Consumer { + last_high_watermark: -1, + client: Arc::new(mock_client), + max_batch_size: usize::MAX, + max_wait_ms: 500, + avg_record_size: record_size, + terminated: false, + buffer: RecordsBuffer { + buffer: VecDeque::new(), + index: Box::new(index), + }, + fetch_fut: Fuse::terminated(), + }; + + let records = consumer.try_collect::>().await.unwrap(); + assert_eq!( + records + .into_iter() + .map(|(x, _)| x.offset) + .collect::>(), + vec![1, 3, 4, 8, 10, 12] + ) + } + + #[tokio::test] + async fn test_consumer_without_index() { + common_telemetry::init_default_ut_logging(); + let record = test_record(); + let mock_client = MockFetchClient { + record: record.clone(), + }; + let index = RegionWalRange::new(0..30, 1024); + let consumer = Consumer { + last_high_watermark: -1, + client: Arc::new(mock_client), + max_batch_size: usize::MAX, + max_wait_ms: 500, + avg_record_size: record.approximate_size(), + terminated: false, + buffer: RecordsBuffer { + buffer: VecDeque::new(), + index: Box::new(index), + }, + fetch_fut: Fuse::terminated(), + }; + + let records = consumer.try_collect::>().await.unwrap(); + assert_eq!( + records + .into_iter() + .map(|(x, _)| x.offset) + .collect::>(), + (0..30).collect::>() + ) + } + + #[tokio::test] + async fn test_consumer_with_multiple_index() { + common_telemetry::init_default_ut_logging(); + let record = test_record(); + let mock_client = MockFetchClient { + record: record.clone(), + }; + + let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _; + let iter1 = Box::new(RegionWalVecIndex::new( + [0, 1, 2, 7, 8, 11], + record.approximate_size() * 4, + )) as _; + let iter2 = Box::new(RegionWalRange::new(12..12, 1024)) as _; + let iter3 = Box::new(RegionWalRange::new(1024..1028, 1024)) as _; + let iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2, iter3]); + + let consumer = Consumer { + last_high_watermark: -1, + client: Arc::new(mock_client), + max_batch_size: usize::MAX, + max_wait_ms: 500, + avg_record_size: record.approximate_size(), + terminated: false, + buffer: RecordsBuffer { + buffer: VecDeque::new(), + index: Box::new(iter), + }, + fetch_fut: Fuse::terminated(), + }; + + let records = consumer.try_collect::>().await.unwrap(); + assert_eq!( + records + .into_iter() + .map(|(x, _)| x.offset) + .collect::>(), + [0, 1, 2, 7, 8, 11, 1024, 1025, 1026, 1027] + ) + } +} diff --git a/src/log-store/src/kafka/index.rs b/src/log-store/src/kafka/index.rs new file mode 100644 index 0000000000..b0b4048516 --- /dev/null +++ b/src/log-store/src/kafka/index.rs @@ -0,0 +1,20 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod iterator; + +pub(crate) use iterator::{ + MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange, + RegionWalVecIndex, +}; diff --git a/src/log-store/src/kafka/index/iterator.rs b/src/log-store/src/kafka/index/iterator.rs new file mode 100644 index 0000000000..8a33cf1d9a --- /dev/null +++ b/src/log-store/src/kafka/index/iterator.rs @@ -0,0 +1,360 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::{max, min}; +use std::collections::VecDeque; +use std::iter::Peekable; +use std::marker::PhantomData; +use std::ops::{Add, Mul, Range, Sub}; + +use chrono::format::Item; +use itertools::Itertools; +use store_api::logstore::EntryId; + +use crate::kafka::util::range::{ConvertIndexToRange, MergeRange}; + +#[derive(Debug, PartialEq, Eq)] +pub(crate) struct NextBatchHint { + pub(crate) bytes: usize, + pub(crate) len: usize, +} + +/// An iterator over WAL (Write-Ahead Log) entries index for a region. +pub trait RegionWalIndexIterator: Send + Sync { + /// Returns next batch hint. + fn next_batch_hint(&self, avg_size: usize) -> Option; + + // Peeks the next EntryId without advancing the iterator. + fn peek(&self) -> Option; + + // Advances the iterator and returns the next EntryId. + fn next(&mut self) -> Option; +} + +/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region. +pub struct RegionWalRange { + current_entry_id: EntryId, + end_entry_id: EntryId, + max_batch_size: usize, +} + +impl RegionWalRange { + pub fn new(range: Range, max_batch_size: usize) -> Self { + Self { + current_entry_id: range.start, + end_entry_id: range.end, + max_batch_size, + } + } + + fn next_batch_size(&self) -> Option { + if self.current_entry_id < self.end_entry_id { + Some( + self.end_entry_id + .checked_sub(self.current_entry_id) + .unwrap_or_default(), + ) + } else { + None + } + } +} + +impl RegionWalIndexIterator for RegionWalRange { + fn next_batch_hint(&self, avg_size: usize) -> Option { + if let Some(size) = self.next_batch_size() { + let bytes = min(size as usize * avg_size, self.max_batch_size); + let len = bytes / avg_size; + + return Some(NextBatchHint { bytes, len }); + } + + None + } + + fn peek(&self) -> Option { + if self.current_entry_id < self.end_entry_id { + Some(self.current_entry_id) + } else { + None + } + } + + fn next(&mut self) -> Option { + if self.current_entry_id < self.end_entry_id { + let next = self.current_entry_id; + self.current_entry_id += 1; + Some(next) + } else { + None + } + } +} + +/// Represents an index of Write-Ahead Log entries for a region, +/// stored as a vector of [EntryId]s. +pub struct RegionWalVecIndex { + index: VecDeque, + min_batch_window_size: usize, +} + +impl RegionWalVecIndex { + pub fn new>(index: I, min_batch_window_size: usize) -> Self { + Self { + index: index.into_iter().collect::>(), + min_batch_window_size, + } + } +} + +impl RegionWalIndexIterator for RegionWalVecIndex { + fn next_batch_hint(&self, avg_size: usize) -> Option { + let merger = MergeRange::new( + ConvertIndexToRange::new(self.index.iter().peekable(), avg_size), + self.min_batch_window_size, + ); + + merger.merge().map(|(range, size)| NextBatchHint { + bytes: range.end - range.start - 1, + len: size, + }) + } + + fn peek(&self) -> Option { + self.index.front().cloned() + } + + fn next(&mut self) -> Option { + self.index.pop_front() + } +} + +/// Represents an iterator over multiple region WAL indexes. +/// +/// Allowing iteration through multiple WAL indexes. +pub struct MultipleRegionWalIndexIterator { + iterator: VecDeque>, +} + +impl MultipleRegionWalIndexIterator { + pub fn new>>(iterator: I) -> Self { + Self { + iterator: iterator.into_iter().collect::>(), + } + } +} + +impl RegionWalIndexIterator for MultipleRegionWalIndexIterator { + fn next_batch_hint(&self, avg_size: usize) -> Option { + for iter in &self.iterator { + if let Some(batch) = iter.next_batch_hint(avg_size) { + return Some(batch); + } + } + + None + } + + fn peek(&self) -> Option { + for iter in &self.iterator { + let peek = iter.peek(); + if peek.is_some() { + return peek; + } + } + + None + } + + fn next(&mut self) -> Option { + while !self.iterator.is_empty() { + let remove = self.iterator.front().and_then(|iter| iter.peek()).is_none(); + if remove { + self.iterator.pop_front(); + } else { + break; + } + } + + self.iterator.front_mut().and_then(|iter| iter.next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_region_wal_range() { + let mut range = RegionWalRange::new(0..1024, 1024); + assert_eq!( + range.next_batch_hint(10), + Some(NextBatchHint { + bytes: 1024, + len: 102 + }) + ); + + let mut range = RegionWalRange::new(0..1, 1024); + + assert_eq!(range.next_batch_size(), Some(1)); + assert_eq!(range.peek(), Some(0)); + + // Advance 1 step + assert_eq!(range.next(), Some(0)); + assert_eq!(range.next_batch_size(), None); + + // Advance 1 step + assert_eq!(range.next(), None); + assert_eq!(range.next_batch_size(), None); + // No effect + assert_eq!(range.next(), None); + assert_eq!(range.next_batch_size(), None); + + let mut range = RegionWalRange::new(0..0, 1024); + assert_eq!(range.next_batch_size(), None); + // No effect + assert_eq!(range.next(), None); + assert_eq!(range.next_batch_size(), None); + } + + #[test] + fn test_region_wal_vec_index() { + let mut index = RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 30); + assert_eq!( + index.next_batch_hint(10), + Some(NextBatchHint { bytes: 30, len: 3 }) + ); + assert_eq!(index.peek(), Some(0)); + // Advance 1 step + assert_eq!(index.next(), Some(0)); + assert_eq!( + index.next_batch_hint(10), + Some(NextBatchHint { bytes: 20, len: 2 }) + ); + // Advance 1 step + assert_eq!(index.next(), Some(1)); + assert_eq!( + index.next_batch_hint(10), + Some(NextBatchHint { bytes: 10, len: 1 }) + ); + // Advance 1 step + assert_eq!(index.next(), Some(2)); + assert_eq!( + index.next_batch_hint(10), + Some(NextBatchHint { bytes: 20, len: 2 }) + ); + // Advance 1 step + assert_eq!(index.next(), Some(7)); + assert_eq!( + index.next_batch_hint(10), + Some(NextBatchHint { bytes: 40, len: 2 }) + ); + // Advance 1 step + assert_eq!(index.next(), Some(8)); + assert_eq!( + index.next_batch_hint(10), + Some(NextBatchHint { bytes: 10, len: 1 }) + ); + // Advance 1 step + assert_eq!(index.next(), Some(11)); + assert_eq!(index.next_batch_hint(10), None); + + // No effect + assert_eq!(index.next(), None); + assert_eq!(index.next_batch_hint(10), None); + + let mut index = RegionWalVecIndex::new([], 1024); + assert_eq!(index.next_batch_hint(10), None); + assert_eq!(index.peek(), None); + // No effect + assert_eq!(index.peek(), None); + assert_eq!(index.next(), None); + assert_eq!(index.next_batch_hint(10), None); + } + + #[test] + fn test_multiple_region_wal_iterator() { + let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _; + let iter1 = Box::new(RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 40)) as _; + let iter2 = Box::new(RegionWalRange::new(1024..1024, 1024)) as _; + let mut iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2]); + + // The next batch is 0, 1, 2 + assert_eq!( + iter.next_batch_hint(10), + Some(NextBatchHint { bytes: 30, len: 3 }) + ); + assert_eq!(iter.peek(), Some(0)); + // Advance 1 step + assert_eq!(iter.next(), Some(0)); + + // The next batch is 1, 2 + assert_eq!( + iter.next_batch_hint(10), + Some(NextBatchHint { bytes: 20, len: 2 }) + ); + assert_eq!(iter.peek(), Some(1)); + // Advance 1 step + assert_eq!(iter.next(), Some(1)); + + // The next batch is 2 + assert_eq!( + iter.next_batch_hint(10), + Some(NextBatchHint { bytes: 10, len: 1 }) + ); + assert_eq!(iter.peek(), Some(2)); + + // Advance 1 step + assert_eq!(iter.next(), Some(2)); + // The next batch is 7, 8, 11 + assert_eq!( + iter.next_batch_hint(10), + Some(NextBatchHint { bytes: 50, len: 3 }) + ); + assert_eq!(iter.peek(), Some(7)); + + // Advance 1 step + assert_eq!(iter.next(), Some(7)); + // The next batch is 8, 11 + assert_eq!( + iter.next_batch_hint(10), + Some(NextBatchHint { bytes: 40, len: 2 }) + ); + assert_eq!(iter.peek(), Some(8)); + + // Advance 1 step + assert_eq!(iter.next(), Some(8)); + // The next batch is 11 + assert_eq!( + iter.next_batch_hint(10), + Some(NextBatchHint { bytes: 10, len: 1 }) + ); + assert_eq!(iter.peek(), Some(11)); + // Advance 1 step + assert_eq!(iter.next(), Some(11)); + + assert_eq!(iter.next_batch_hint(10), None,); + assert_eq!(iter.peek(), None); + assert!(!iter.iterator.is_empty()); + assert_eq!(iter.next(), None); + assert!(iter.iterator.is_empty()); + + // No effect + assert_eq!(iter.next(), None); + assert_eq!(iter.next_batch_hint(10), None,); + assert_eq!(iter.peek(), None); + assert_eq!(iter.next(), None); + } +} diff --git a/src/log-store/src/kafka/util.rs b/src/log-store/src/kafka/util.rs index 52d575cbce..e871feca42 100644 --- a/src/log-store/src/kafka/util.rs +++ b/src/log-store/src/kafka/util.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod record; +pub(crate) mod range; +pub(crate) mod record; diff --git a/src/log-store/src/kafka/util/range.rs b/src/log-store/src/kafka/util/range.rs new file mode 100644 index 0000000000..282d0aba18 --- /dev/null +++ b/src/log-store/src/kafka/util/range.rs @@ -0,0 +1,146 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::{max, min}; +use std::iter::Peekable; +use std::ops::Range; + +use store_api::logstore::EntryId; + +/// Convert a sequence of [`EntryId`]s into size ranges. +pub(crate) struct ConvertIndexToRange<'a, I: Iterator> { + base: Option, + iter: Peekable, + avg_size: usize, +} + +impl<'a, I: Iterator> ConvertIndexToRange<'a, I> { + pub fn new(mut iter: Peekable, avg_size: usize) -> Self { + let base = iter.peek().cloned().cloned(); + + Self { + base, + iter, + avg_size, + } + } +} + +impl<'a, I: Iterator> Iterator for ConvertIndexToRange<'a, I> { + type Item = Range; + + fn next(&mut self) -> Option { + let (base, val) = (&self.base?, self.iter.next()?); + let start = (*val - *base) as usize * self.avg_size; + let end = start + self.avg_size + 1; + Some(start..end) + } +} + +/// Merge all ranges smaller than the `window_size`. +/// +/// e.g., +/// +/// Case 1 +/// - input: range: [(0..3), (5..6), (5..8)], window_size: 6 +/// - output: range: (0..6) +/// +/// Case 2 +/// - input: range: [(0..3)], window_size: 6 +/// - output: range: (0..3) +pub(crate) struct MergeRange>> { + iter: I, + window_size: usize, +} + +impl>> MergeRange { + pub fn new(iter: I, window_size: usize) -> Self { + Self { iter, window_size } + } +} + +/// Merges ranges. +fn merge(this: &mut Range, other: &Range) { + this.start = min(this.start, other.start); + this.end = max(this.end, other.end); +} + +impl>> MergeRange { + /// Calculates the size of the next merged range. + pub(crate) fn merge(mut self) -> Option<(Range, usize)> { + let mut merged_range = self.iter.next(); + let this = merged_range.as_mut()?; + let mut merged = 1; + for next in self.iter { + let window = next.start - this.start; + if window > self.window_size { + break; + } else { + merge(this, &next); + merged += 1; + } + } + merged_range.map(|range| (range, merged)) + } +} + +#[cfg(test)] +#[allow(clippy::single_range_in_vec_init)] +mod tests { + use super::*; + + #[test] + fn test_convert_index_to_range() { + let avg_size = 1024; + let index = [1u64, 4, 10, 15]; + let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size); + + assert_eq!(converter.next(), Some(0..avg_size + 1)); + assert_eq!(converter.next(), Some(3 * avg_size..4 * avg_size + 1)); + assert_eq!(converter.next(), Some(9 * avg_size..10 * avg_size + 1)); + assert_eq!(converter.next(), Some(14 * avg_size..15 * avg_size + 1)); + assert_eq!(converter.next(), None); + + let index = []; + let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size); + assert_eq!(converter.next(), None); + } + + #[test] + fn test_merge_range() { + let size_range = [(10usize..13), (12..14), (16..18), (19..29)]; + let merger = MergeRange::new(size_range.into_iter(), 9); + assert_eq!(merger.merge(), Some((10..29, 4))); + + let size_range = [(10usize..13), (12..14), (16..18)]; + let merger = MergeRange::new(size_range.into_iter(), 5); + assert_eq!(merger.merge(), Some((10..14, 2))); + + let size_range = [(10usize..13), (15..17), (16..18)]; + let merger = MergeRange::new(size_range.into_iter(), 5); + assert_eq!(merger.merge(), Some((10..17, 2))); + + let size_range = [(10usize..13)]; + let merger = MergeRange::new(size_range.into_iter(), 4); + assert_eq!(merger.merge(), Some((10..13, 1))); + + let size_range = [(10usize..13)]; + let merger = MergeRange::new(size_range.into_iter(), 2); + assert_eq!(merger.merge(), Some((10..13, 1))); + + let size_range = []; + let merger = MergeRange::new(size_range.into_iter(), 2); + assert_eq!(merger.merge(), None); + } +}