feat: introduce new kafka topic consumer respecting WAL index (#4424)

* feat: introduce new kafka topic consumer respecting WAL index

* chore: fmt

* chore: fmt toml

* chore: add comments

* feat: merge close ranges

* fix: fix unit tests

* chore: fix typos

* chore: use loop

* chore: use unstable sort

* chore: use gt instead of gte

* chore: add comments

* chore: rename to `current_entry_id`

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: minor refactor

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-08-05 14:56:25 +08:00
committed by GitHub
parent 4c93fe6c2d
commit 3937e67694
8 changed files with 914 additions and 1 deletions

1
Cargo.lock generated
View File

@@ -5781,6 +5781,7 @@ dependencies = [
"futures-util",
"itertools 0.10.5",
"lazy_static",
"pin-project",
"prometheus",
"protobuf",
"protobuf-build",

View File

@@ -26,7 +26,9 @@ common-time.workspace = true
common-wal.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
lazy_static.workspace = true
pin-project.workspace = true
prometheus.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true

View File

@@ -13,6 +13,9 @@
// limitations under the License.
pub(crate) mod client_manager;
pub(crate) mod consumer;
#[allow(unused)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;

View File

@@ -0,0 +1,380 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
use rskafka::client::partition::PartitionClient;
use rskafka::record::RecordAndOffset;
use super::index::{NextBatchHint, RegionWalIndexIterator};
#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
/// Fetch records.
///
/// Arguments are identical to [`PartitionClient::fetch_records`].
async fn fetch_records(
&self,
offset: i64,
bytes: Range<i32>,
max_wait_ms: i32,
) -> rskafka::client::error::Result<(Vec<RecordAndOffset>, i64)>;
}
#[async_trait::async_trait]
impl FetchClient for PartitionClient {
async fn fetch_records(
&self,
offset: i64,
bytes: Range<i32>,
max_wait_ms: i32,
) -> rskafka::client::error::Result<(Vec<RecordAndOffset>, i64)> {
self.fetch_records(offset, bytes, max_wait_ms).await
}
}
struct FetchResult {
records_and_offsets: Vec<RecordAndOffset>,
batch_size: usize,
fetch_bytes: i32,
watermark: i64,
used_offset: i64,
}
/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster. Yielding records respecting the [`RegionWalIndexIterator`].
#[pin_project]
pub struct Consumer {
last_high_watermark: i64,
/// The client is used to fetch records from kafka topic.
client: Arc<dyn FetchClient>,
/// The max batch size in a single fetch request.
max_batch_size: usize,
/// The max wait milliseconds.
max_wait_ms: u32,
/// The avg record size
avg_record_size: usize,
/// Termination flag
terminated: bool,
/// The buffer of records.
buffer: RecordsBuffer,
/// The fetch future.
fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
}
struct RecordsBuffer {
buffer: VecDeque<RecordAndOffset>,
index: Box<dyn RegionWalIndexIterator>,
}
impl RecordsBuffer {
fn pop_front(&mut self) -> Option<RecordAndOffset> {
while let Some(index) = self.index.peek() {
if let Some(record_and_offset) = self.buffer.pop_front() {
if index == record_and_offset.offset as u64 {
self.index.next();
return Some(record_and_offset);
}
} else {
return None;
}
}
self.buffer.clear();
None
}
fn extend(&mut self, records: Vec<RecordAndOffset>) {
if let (Some(first), Some(index)) = (records.first(), self.index.peek()) {
// TODO(weny): throw an error?
assert!(
index <= first.offset as u64,
"index: {index}, first offset: {}",
first.offset
);
}
self.buffer.extend(records);
}
}
impl Stream for Consumer {
type Item = rskafka::client::error::Result<(RecordAndOffset, i64)>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
loop {
if *this.terminated {
return Poll::Ready(None);
}
if this.buffer.index.peek().is_none() {
return Poll::Ready(None);
}
if let Some(x) = this.buffer.pop_front() {
debug!("Yielding record with offset: {}", x.offset);
return Poll::Ready(Some(Ok((x, *this.last_high_watermark))));
}
if this.fetch_fut.is_terminated() {
match this.buffer.index.peek() {
Some(next_offset) => {
let client = Arc::clone(this.client);
let max_wait_ms = *this.max_wait_ms as i32;
let offset = next_offset as i64;
let NextBatchHint { bytes, len } = this
.buffer
.index
.next_batch_hint(*this.avg_record_size)
.unwrap_or(NextBatchHint {
bytes: *this.avg_record_size,
len: 1,
});
let fetch_range =
1i32..(bytes.saturating_add(1).min(*this.max_batch_size) as i32);
*this.fetch_fut = FutureExt::fuse(Box::pin(async move {
let (records_and_offsets, watermark) = client
.fetch_records(offset, fetch_range, max_wait_ms)
.await?;
Ok(FetchResult {
records_and_offsets,
watermark,
used_offset: offset,
fetch_bytes: bytes as i32,
batch_size: len,
})
}));
}
None => {
return Poll::Ready(None);
}
}
}
let data = futures::ready!(this.fetch_fut.poll_unpin(cx));
match data {
Ok(FetchResult {
mut records_and_offsets,
watermark,
used_offset,
fetch_bytes,
batch_size,
}) => {
// Sort records by offset in case they aren't in order
records_and_offsets.sort_unstable_by_key(|x| x.offset);
*this.last_high_watermark = watermark;
if !records_and_offsets.is_empty() {
*this.avg_record_size = fetch_bytes as usize / records_and_offsets.len();
debug!("set avg_record_size: {}", *this.avg_record_size);
}
debug!(
"Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_bytes}, expected batch_num: {batch_size}, actual batch_num: {}",
records_and_offsets
.iter()
.map(|record| record.offset)
.collect::<Vec<_>>(),
records_and_offsets
.len()
);
this.buffer.extend(records_and_offsets);
continue;
}
Err(e) => {
*this.terminated = true;
return Poll::Ready(Some(Err(e)));
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use chrono::{TimeZone, Utc};
use futures::future::Fuse;
use futures::TryStreamExt;
use rskafka::record::{Record, RecordAndOffset};
use super::FetchClient;
use crate::kafka::consumer::{Consumer, RecordsBuffer};
use crate::kafka::index::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};
#[derive(Debug)]
struct MockFetchClient {
record: Record,
}
#[async_trait::async_trait]
impl FetchClient for MockFetchClient {
async fn fetch_records(
&self,
offset: i64,
bytes: Range<i32>,
_max_wait_ms: i32,
) -> rskafka::client::error::Result<(Vec<RecordAndOffset>, i64)> {
let record_size = self.record.approximate_size();
let num = (bytes.end.unsigned_abs() as usize / record_size).max(1);
let records = (0..num)
.map(|idx| RecordAndOffset {
record: self.record.clone(),
offset: offset + idx as i64,
})
.collect::<Vec<_>>();
let max_offset = offset + records.len() as i64;
Ok((records, max_offset))
}
}
fn test_record() -> Record {
Record {
key: Some(vec![0; 4]),
value: Some(vec![0; 6]),
headers: Default::default(),
timestamp: Utc.timestamp_millis_opt(1337).unwrap(),
}
}
#[tokio::test]
async fn test_consumer_with_index() {
common_telemetry::init_default_ut_logging();
let record = test_record();
let record_size = record.approximate_size();
let mock_client = MockFetchClient {
record: record.clone(),
};
let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12], record_size * 3);
let consumer = Consumer {
last_high_watermark: -1,
client: Arc::new(mock_client),
max_batch_size: usize::MAX,
max_wait_ms: 500,
avg_record_size: record_size,
terminated: false,
buffer: RecordsBuffer {
buffer: VecDeque::new(),
index: Box::new(index),
},
fetch_fut: Fuse::terminated(),
};
let records = consumer.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(
records
.into_iter()
.map(|(x, _)| x.offset)
.collect::<Vec<_>>(),
vec![1, 3, 4, 8, 10, 12]
)
}
#[tokio::test]
async fn test_consumer_without_index() {
common_telemetry::init_default_ut_logging();
let record = test_record();
let mock_client = MockFetchClient {
record: record.clone(),
};
let index = RegionWalRange::new(0..30, 1024);
let consumer = Consumer {
last_high_watermark: -1,
client: Arc::new(mock_client),
max_batch_size: usize::MAX,
max_wait_ms: 500,
avg_record_size: record.approximate_size(),
terminated: false,
buffer: RecordsBuffer {
buffer: VecDeque::new(),
index: Box::new(index),
},
fetch_fut: Fuse::terminated(),
};
let records = consumer.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(
records
.into_iter()
.map(|(x, _)| x.offset)
.collect::<Vec<_>>(),
(0..30).collect::<Vec<_>>()
)
}
#[tokio::test]
async fn test_consumer_with_multiple_index() {
common_telemetry::init_default_ut_logging();
let record = test_record();
let mock_client = MockFetchClient {
record: record.clone(),
};
let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
let iter1 = Box::new(RegionWalVecIndex::new(
[0, 1, 2, 7, 8, 11],
record.approximate_size() * 4,
)) as _;
let iter2 = Box::new(RegionWalRange::new(12..12, 1024)) as _;
let iter3 = Box::new(RegionWalRange::new(1024..1028, 1024)) as _;
let iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2, iter3]);
let consumer = Consumer {
last_high_watermark: -1,
client: Arc::new(mock_client),
max_batch_size: usize::MAX,
max_wait_ms: 500,
avg_record_size: record.approximate_size(),
terminated: false,
buffer: RecordsBuffer {
buffer: VecDeque::new(),
index: Box::new(iter),
},
fetch_fut: Fuse::terminated(),
};
let records = consumer.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(
records
.into_iter()
.map(|(x, _)| x.offset)
.collect::<Vec<_>>(),
[0, 1, 2, 7, 8, 11, 1024, 1025, 1026, 1027]
)
}
}

View File

@@ -0,0 +1,20 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod iterator;
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
};

View File

@@ -0,0 +1,360 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::iter::Peekable;
use std::marker::PhantomData;
use std::ops::{Add, Mul, Range, Sub};
use chrono::format::Item;
use itertools::Itertools;
use store_api::logstore::EntryId;
use crate::kafka::util::range::{ConvertIndexToRange, MergeRange};
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct NextBatchHint {
pub(crate) bytes: usize,
pub(crate) len: usize,
}
/// An iterator over WAL (Write-Ahead Log) entries index for a region.
pub trait RegionWalIndexIterator: Send + Sync {
/// Returns next batch hint.
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
// Peeks the next EntryId without advancing the iterator.
fn peek(&self) -> Option<EntryId>;
// Advances the iterator and returns the next EntryId.
fn next(&mut self) -> Option<EntryId>;
}
/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
pub struct RegionWalRange {
current_entry_id: EntryId,
end_entry_id: EntryId,
max_batch_size: usize,
}
impl RegionWalRange {
pub fn new(range: Range<EntryId>, max_batch_size: usize) -> Self {
Self {
current_entry_id: range.start,
end_entry_id: range.end,
max_batch_size,
}
}
fn next_batch_size(&self) -> Option<u64> {
if self.current_entry_id < self.end_entry_id {
Some(
self.end_entry_id
.checked_sub(self.current_entry_id)
.unwrap_or_default(),
)
} else {
None
}
}
}
impl RegionWalIndexIterator for RegionWalRange {
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
if let Some(size) = self.next_batch_size() {
let bytes = min(size as usize * avg_size, self.max_batch_size);
let len = bytes / avg_size;
return Some(NextBatchHint { bytes, len });
}
None
}
fn peek(&self) -> Option<EntryId> {
if self.current_entry_id < self.end_entry_id {
Some(self.current_entry_id)
} else {
None
}
}
fn next(&mut self) -> Option<EntryId> {
if self.current_entry_id < self.end_entry_id {
let next = self.current_entry_id;
self.current_entry_id += 1;
Some(next)
} else {
None
}
}
}
/// Represents an index of Write-Ahead Log entries for a region,
/// stored as a vector of [EntryId]s.
pub struct RegionWalVecIndex {
index: VecDeque<EntryId>,
min_batch_window_size: usize,
}
impl RegionWalVecIndex {
pub fn new<I: IntoIterator<Item = EntryId>>(index: I, min_batch_window_size: usize) -> Self {
Self {
index: index.into_iter().collect::<VecDeque<_>>(),
min_batch_window_size,
}
}
}
impl RegionWalIndexIterator for RegionWalVecIndex {
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
let merger = MergeRange::new(
ConvertIndexToRange::new(self.index.iter().peekable(), avg_size),
self.min_batch_window_size,
);
merger.merge().map(|(range, size)| NextBatchHint {
bytes: range.end - range.start - 1,
len: size,
})
}
fn peek(&self) -> Option<EntryId> {
self.index.front().cloned()
}
fn next(&mut self) -> Option<EntryId> {
self.index.pop_front()
}
}
/// Represents an iterator over multiple region WAL indexes.
///
/// Allowing iteration through multiple WAL indexes.
pub struct MultipleRegionWalIndexIterator {
iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
}
impl MultipleRegionWalIndexIterator {
pub fn new<I: IntoIterator<Item = Box<dyn RegionWalIndexIterator>>>(iterator: I) -> Self {
Self {
iterator: iterator.into_iter().collect::<VecDeque<_>>(),
}
}
}
impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint> {
for iter in &self.iterator {
if let Some(batch) = iter.next_batch_hint(avg_size) {
return Some(batch);
}
}
None
}
fn peek(&self) -> Option<EntryId> {
for iter in &self.iterator {
let peek = iter.peek();
if peek.is_some() {
return peek;
}
}
None
}
fn next(&mut self) -> Option<EntryId> {
while !self.iterator.is_empty() {
let remove = self.iterator.front().and_then(|iter| iter.peek()).is_none();
if remove {
self.iterator.pop_front();
} else {
break;
}
}
self.iterator.front_mut().and_then(|iter| iter.next())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_region_wal_range() {
let mut range = RegionWalRange::new(0..1024, 1024);
assert_eq!(
range.next_batch_hint(10),
Some(NextBatchHint {
bytes: 1024,
len: 102
})
);
let mut range = RegionWalRange::new(0..1, 1024);
assert_eq!(range.next_batch_size(), Some(1));
assert_eq!(range.peek(), Some(0));
// Advance 1 step
assert_eq!(range.next(), Some(0));
assert_eq!(range.next_batch_size(), None);
// Advance 1 step
assert_eq!(range.next(), None);
assert_eq!(range.next_batch_size(), None);
// No effect
assert_eq!(range.next(), None);
assert_eq!(range.next_batch_size(), None);
let mut range = RegionWalRange::new(0..0, 1024);
assert_eq!(range.next_batch_size(), None);
// No effect
assert_eq!(range.next(), None);
assert_eq!(range.next_batch_size(), None);
}
#[test]
fn test_region_wal_vec_index() {
let mut index = RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 30);
assert_eq!(
index.next_batch_hint(10),
Some(NextBatchHint { bytes: 30, len: 3 })
);
assert_eq!(index.peek(), Some(0));
// Advance 1 step
assert_eq!(index.next(), Some(0));
assert_eq!(
index.next_batch_hint(10),
Some(NextBatchHint { bytes: 20, len: 2 })
);
// Advance 1 step
assert_eq!(index.next(), Some(1));
assert_eq!(
index.next_batch_hint(10),
Some(NextBatchHint { bytes: 10, len: 1 })
);
// Advance 1 step
assert_eq!(index.next(), Some(2));
assert_eq!(
index.next_batch_hint(10),
Some(NextBatchHint { bytes: 20, len: 2 })
);
// Advance 1 step
assert_eq!(index.next(), Some(7));
assert_eq!(
index.next_batch_hint(10),
Some(NextBatchHint { bytes: 40, len: 2 })
);
// Advance 1 step
assert_eq!(index.next(), Some(8));
assert_eq!(
index.next_batch_hint(10),
Some(NextBatchHint { bytes: 10, len: 1 })
);
// Advance 1 step
assert_eq!(index.next(), Some(11));
assert_eq!(index.next_batch_hint(10), None);
// No effect
assert_eq!(index.next(), None);
assert_eq!(index.next_batch_hint(10), None);
let mut index = RegionWalVecIndex::new([], 1024);
assert_eq!(index.next_batch_hint(10), None);
assert_eq!(index.peek(), None);
// No effect
assert_eq!(index.peek(), None);
assert_eq!(index.next(), None);
assert_eq!(index.next_batch_hint(10), None);
}
#[test]
fn test_multiple_region_wal_iterator() {
let iter0 = Box::new(RegionWalRange::new(0..0, 1024)) as _;
let iter1 = Box::new(RegionWalVecIndex::new([0, 1, 2, 7, 8, 11], 40)) as _;
let iter2 = Box::new(RegionWalRange::new(1024..1024, 1024)) as _;
let mut iter = MultipleRegionWalIndexIterator::new([iter0, iter1, iter2]);
// The next batch is 0, 1, 2
assert_eq!(
iter.next_batch_hint(10),
Some(NextBatchHint { bytes: 30, len: 3 })
);
assert_eq!(iter.peek(), Some(0));
// Advance 1 step
assert_eq!(iter.next(), Some(0));
// The next batch is 1, 2
assert_eq!(
iter.next_batch_hint(10),
Some(NextBatchHint { bytes: 20, len: 2 })
);
assert_eq!(iter.peek(), Some(1));
// Advance 1 step
assert_eq!(iter.next(), Some(1));
// The next batch is 2
assert_eq!(
iter.next_batch_hint(10),
Some(NextBatchHint { bytes: 10, len: 1 })
);
assert_eq!(iter.peek(), Some(2));
// Advance 1 step
assert_eq!(iter.next(), Some(2));
// The next batch is 7, 8, 11
assert_eq!(
iter.next_batch_hint(10),
Some(NextBatchHint { bytes: 50, len: 3 })
);
assert_eq!(iter.peek(), Some(7));
// Advance 1 step
assert_eq!(iter.next(), Some(7));
// The next batch is 8, 11
assert_eq!(
iter.next_batch_hint(10),
Some(NextBatchHint { bytes: 40, len: 2 })
);
assert_eq!(iter.peek(), Some(8));
// Advance 1 step
assert_eq!(iter.next(), Some(8));
// The next batch is 11
assert_eq!(
iter.next_batch_hint(10),
Some(NextBatchHint { bytes: 10, len: 1 })
);
assert_eq!(iter.peek(), Some(11));
// Advance 1 step
assert_eq!(iter.next(), Some(11));
assert_eq!(iter.next_batch_hint(10), None,);
assert_eq!(iter.peek(), None);
assert!(!iter.iterator.is_empty());
assert_eq!(iter.next(), None);
assert!(iter.iterator.is_empty());
// No effect
assert_eq!(iter.next(), None);
assert_eq!(iter.next_batch_hint(10), None,);
assert_eq!(iter.peek(), None);
assert_eq!(iter.next(), None);
}
}

View File

@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod record;
pub(crate) mod range;
pub(crate) mod record;

View File

@@ -0,0 +1,146 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::{max, min};
use std::iter::Peekable;
use std::ops::Range;
use store_api::logstore::EntryId;
/// Convert a sequence of [`EntryId`]s into size ranges.
pub(crate) struct ConvertIndexToRange<'a, I: Iterator<Item = &'a EntryId>> {
base: Option<EntryId>,
iter: Peekable<I>,
avg_size: usize,
}
impl<'a, I: Iterator<Item = &'a EntryId>> ConvertIndexToRange<'a, I> {
pub fn new(mut iter: Peekable<I>, avg_size: usize) -> Self {
let base = iter.peek().cloned().cloned();
Self {
base,
iter,
avg_size,
}
}
}
impl<'a, I: Iterator<Item = &'a EntryId>> Iterator for ConvertIndexToRange<'a, I> {
type Item = Range<usize>;
fn next(&mut self) -> Option<Self::Item> {
let (base, val) = (&self.base?, self.iter.next()?);
let start = (*val - *base) as usize * self.avg_size;
let end = start + self.avg_size + 1;
Some(start..end)
}
}
/// Merge all ranges smaller than the `window_size`.
///
/// e.g.,
///
/// Case 1
/// - input: range: [(0..3), (5..6), (5..8)], window_size: 6
/// - output: range: (0..6)
///
/// Case 2
/// - input: range: [(0..3)], window_size: 6
/// - output: range: (0..3)
pub(crate) struct MergeRange<I: Iterator<Item = Range<usize>>> {
iter: I,
window_size: usize,
}
impl<I: Iterator<Item = Range<usize>>> MergeRange<I> {
pub fn new(iter: I, window_size: usize) -> Self {
Self { iter, window_size }
}
}
/// Merges ranges.
fn merge(this: &mut Range<usize>, other: &Range<usize>) {
this.start = min(this.start, other.start);
this.end = max(this.end, other.end);
}
impl<I: Iterator<Item = Range<usize>>> MergeRange<I> {
/// Calculates the size of the next merged range.
pub(crate) fn merge(mut self) -> Option<(Range<usize>, usize)> {
let mut merged_range = self.iter.next();
let this = merged_range.as_mut()?;
let mut merged = 1;
for next in self.iter {
let window = next.start - this.start;
if window > self.window_size {
break;
} else {
merge(this, &next);
merged += 1;
}
}
merged_range.map(|range| (range, merged))
}
}
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
use super::*;
#[test]
fn test_convert_index_to_range() {
let avg_size = 1024;
let index = [1u64, 4, 10, 15];
let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size);
assert_eq!(converter.next(), Some(0..avg_size + 1));
assert_eq!(converter.next(), Some(3 * avg_size..4 * avg_size + 1));
assert_eq!(converter.next(), Some(9 * avg_size..10 * avg_size + 1));
assert_eq!(converter.next(), Some(14 * avg_size..15 * avg_size + 1));
assert_eq!(converter.next(), None);
let index = [];
let mut converter = ConvertIndexToRange::new(index.iter().peekable(), avg_size);
assert_eq!(converter.next(), None);
}
#[test]
fn test_merge_range() {
let size_range = [(10usize..13), (12..14), (16..18), (19..29)];
let merger = MergeRange::new(size_range.into_iter(), 9);
assert_eq!(merger.merge(), Some((10..29, 4)));
let size_range = [(10usize..13), (12..14), (16..18)];
let merger = MergeRange::new(size_range.into_iter(), 5);
assert_eq!(merger.merge(), Some((10..14, 2)));
let size_range = [(10usize..13), (15..17), (16..18)];
let merger = MergeRange::new(size_range.into_iter(), 5);
assert_eq!(merger.merge(), Some((10..17, 2)));
let size_range = [(10usize..13)];
let merger = MergeRange::new(size_range.into_iter(), 4);
assert_eq!(merger.merge(), Some((10..13, 1)));
let size_range = [(10usize..13)];
let merger = MergeRange::new(size_range.into_iter(), 2);
assert_eq!(merger.merge(), Some((10..13, 1)));
let size_range = [];
let merger = MergeRange::new(size_range.into_iter(), 2);
assert_eq!(merger.merge(), None);
}
}