feat: replay WAL entries respect index (#4565)

* feat(log_store): use new `Consumer`

* feat: add `from_peer_id`

* feat: read WAL entries respect index

* test: add test for `build_region_wal_index_iterator`

* fix: keep the handle

* fix: incorrect last index

* fix: replay last entry id may be greater than expected

* chore: remove unused code

* chore: apply suggestions from CR

* chore: rename `datanode_id` to `location_id`

* chore: rename `from_peer_id` to `location_id`

* chore: rename `from_peer_id` to `location_id`

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-08-28 19:37:18 +08:00
committed by GitHub
parent 64ae32def0
commit 47657ebbc8
23 changed files with 459 additions and 72 deletions

1
Cargo.lock generated
View File

@@ -5826,6 +5826,7 @@ dependencies = [
"common-time",
"common-wal",
"delta-encoding",
"derive_builder 0.12.0",
"futures",
"futures-util",
"itertools 0.10.5",

View File

@@ -153,6 +153,9 @@ pub struct UpgradeRegion {
/// it's helpful to verify whether the leader region is ready.
#[serde(with = "humantime_serde")]
pub wait_for_replay_timeout: Option<Duration>,
/// The hint for replaying memtable.
#[serde(default)]
pub location_id: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -206,6 +206,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))

View File

@@ -27,6 +27,7 @@ impl HandlerContext {
region_id,
last_entry_id,
wait_for_replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
@@ -62,6 +63,7 @@ impl HandlerContext {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id,
}),
)
.await?;
@@ -151,6 +153,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -191,6 +194,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -232,6 +236,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -274,8 +279,9 @@ mod tests {
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
last_entry_id: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -293,6 +299,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(500)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -337,6 +344,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -354,6 +362,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(200)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));

View File

@@ -26,6 +26,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
derive_builder.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true

View File

@@ -304,6 +304,15 @@ pub enum Error {
error: object_store::Error,
},
#[snafu(display("Failed to read index, path: {path}"))]
ReadIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
path: String,
},
#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,

View File

@@ -14,15 +14,10 @@
pub(crate) mod client_manager;
pub(crate) mod consumer;
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;
pub use index::{default_index_file, GlobalIndexCollector};

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
@@ -60,40 +61,61 @@ struct FetchResult {
used_offset: i64,
}
/// Builder default for [`Consumer`]'s `max_batch_size`: 50 MiB.
const MAX_BATCH_SIZE: usize = 50 * 1024 * 1024;
/// Builder default for [`Consumer`]'s `avg_record_size`: 256 KiB.
const AVG_RECORD_SIZE: usize = 1024 * 256;
/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster, yielding records respecting the [`RegionWalIndexIterator`].
///
/// Instances are created through the derived `ConsumerBuilder` (owned pattern);
/// `client` and `buffer` have no builder default and must be provided.
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
/// The builder initializes this to `-1`.
// NOTE(review): presumably the high watermark observed by the latest fetch,
// with `-1` meaning "nothing fetched yet" — confirm against the fetch logic.
#[builder(default = "-1")]
last_high_watermark: i64,
/// The client used to fetch records from the Kafka topic.
client: Arc<dyn FetchClient>,
/// The max batch size in a single fetch request (defaults to `MAX_BATCH_SIZE`).
#[builder(default = "MAX_BATCH_SIZE")]
max_batch_size: usize,
/// The max wait time in milliseconds (defaults to `500`).
#[builder(default = "500")]
max_wait_ms: u32,
/// The average record size (defaults to `AVG_RECORD_SIZE`).
#[builder(default = "AVG_RECORD_SIZE")]
avg_record_size: usize,
/// Termination flag (defaults to `false`).
#[builder(default = "false")]
terminated: bool,
/// The buffer of records.
buffer: RecordsBuffer,
/// The in-flight fetch future; the builder starts it as `Fuse::terminated()`.
#[builder(default = "Fuse::terminated()")]
fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
}
struct RecordsBuffer {
pub(crate) struct RecordsBuffer {
buffer: VecDeque<RecordAndOffset>,
index: Box<dyn RegionWalIndexIterator>,
}
impl RecordsBuffer {
    /// Builds a [`RecordsBuffer`] that holds no records yet and is driven by `index`.
    pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
        let buffer = VecDeque::new();
        Self { buffer, index }
    }
}
impl RecordsBuffer {
fn pop_front(&mut self) -> Option<RecordAndOffset> {
while let Some(index) = self.index.peek() {

View File

@@ -20,10 +20,11 @@ pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
build_region_wal_index_iterator, NextBatchHint, RegionWalIndexIterator, MIN_BATCH_WINDOW_SIZE,
};
#[cfg(test)]
pub(crate) use iterator::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};
pub fn default_index_file(datanode_id: u64) -> String {
format!("__datanode/{datanode_id}/index.json")
pub fn default_index_file(location_id: u64) -> String {
format!("__wal/{location_id}/index.json")
}

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
@@ -28,8 +29,9 @@ use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;
use super::default_index_file;
use crate::error::{self, Result};
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
@@ -52,10 +54,11 @@ pub trait IndexCollector: Send + Sync {
/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct GlobalIndexCollector {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
task: CollectionTask,
operator: object_store::ObjectStore,
_handle: CollectionTaskHandle,
}
#[derive(Debug, Clone)]
@@ -103,7 +106,7 @@ impl CollectionTask {
/// The background task performs two main operations:
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
fn run(&self) {
fn run(self) -> CollectionTaskHandle {
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
let running = self.running.clone();
let moved_self = self.clone();
@@ -122,15 +125,23 @@ impl CollectionTask {
}
}
});
CollectionTaskHandle {
running: self.running.clone(),
}
}
}
impl Drop for CollectionTask {
impl Drop for CollectionTaskHandle {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
/// Handle of a running collection task.
///
/// It shares the task's `running` flag; the flag is set to `false` when the
/// handle is dropped.
#[derive(Debug, Default)]
struct CollectionTaskHandle {
running: Arc<AtomicBool>,
}
impl GlobalIndexCollector {
/// Constructs a [`GlobalIndexCollector`].
///
@@ -148,16 +159,65 @@ impl GlobalIndexCollector {
let task = CollectionTask {
providers: providers.clone(),
dump_index_interval,
operator,
operator: operator.clone(),
path,
running: Arc::new(AtomicBool::new(true)),
};
task.run();
Self { providers, task }
let handle = task.run();
Self {
providers,
operator,
_handle: handle,
}
}
/// Builds a [`GlobalIndexCollector`] for tests on top of `operator`.
///
/// No providers are registered and the handle is a default one; no collection
/// task is started by this constructor.
#[cfg(test)]
pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
    let providers = Default::default();
    let _handle = CollectionTaskHandle::default();

    Self {
        providers,
        operator,
        _handle,
    }
}
}
impl GlobalIndexCollector {
/// Retrieve [`EntryId`]s for a specified `region_id` in `datanode_id`
/// that are greater than or equal to a given `entry_id`.
pub(crate) async fn read_remote_region_index(
&self,
location_id: u64,
provider: &KafkaProvider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
let path = default_index_file(location_id);
let bytes = match self.operator.read(&path).await {
Ok(bytes) => bytes.to_vec(),
Err(err) => {
if err.kind() == ErrorKind::NotFound {
return Ok(None);
} else {
return Err(err).context(error::ReadIndexSnafu { path });
}
}
};
match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
Some(indexes) => {
let last_index = indexes.last_index();
let indexes = indexes
.region(region_id)
.unwrap_or_default()
.split_off(&entry_id);
Ok(Some((indexes, last_index)))
}
None => Ok(None),
}
}
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub(crate) async fn provider_level_index_collector(
&self,
@@ -266,3 +326,92 @@ impl IndexCollector for NoopCollector {
fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use crate::kafka::index::collector::RegionIndexes;
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::{default_index_file, GlobalIndexCollector};
/// Exercises `read_remote_region_index` against an in-memory object store that
/// holds a single index file (location id `0`).
#[tokio::test]
async fn test_read_remote_region_index() {
let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
.unwrap()
.finish();
// The fixture is written to the index file of location id 0.
let path = default_index_file(0);
let encoder = JsonIndexEncoder::default();
// Topic `my_topic_0`: region (1, 1) has entries {1, 5, 15}; latest entry id is 20.
encoder.encode(
&KafkaProvider::new("my_topic_0".to_string()),
&RegionIndexes {
regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
latest_entry_id: 20,
},
);
let bytes = encoder.finish().unwrap();
let mut writer = operator.writer(&path).await.unwrap();
writer.write(bytes).await.unwrap();
writer.close().await.unwrap();
let collector = GlobalIndexCollector::new_for_test(operator.clone());
// Index file doesn't exist (location id 1 was never written).
let result = collector
.read_remote_region_index(
1,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
1,
)
.await
.unwrap();
assert!(result.is_none());
// RegionId doesn't exist: an empty set is returned along with the last index.
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 2),
5,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::new());
assert_eq!(last_index, 20);
// RegionId(1, 1), Start EntryId: 5 (entry 1 is filtered out, 5 is kept).
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
5,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::from([5, 15]));
assert_eq!(last_index, 20);
// RegionId(1, 1), Start EntryId: 20 (beyond every indexed entry).
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
20,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::new());
assert_eq!(last_index, 20);
}
}

View File

@@ -50,7 +50,7 @@ pub struct DeltaEncodedRegionIndexes {
impl DeltaEncodedRegionIndexes {
/// Retrieves the original (decoded) index values for a given region.
fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
let decoded = self
.regions
.get(&region_id)
@@ -60,7 +60,7 @@ impl DeltaEncodedRegionIndexes {
}
/// Retrieves the last index.
fn last_index(&self) -> u64 {
pub(crate) fn last_index(&self) -> u64 {
self.last_index
}
}
@@ -86,7 +86,7 @@ impl DatanodeWalIndexes {
value
}
fn decode(byte: &[u8]) -> Result<Self> {
pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
}
@@ -118,7 +118,7 @@ impl IndexEncoder for JsonIndexEncoder {
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashMap};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::min;
use std::collections::VecDeque;
use std::cmp::{max, min};
use std::collections::{BTreeSet, VecDeque};
use std::fmt::Debug;
use std::ops::Range;
use store_api::logstore::EntryId;
@@ -27,7 +28,7 @@ pub(crate) struct NextBatchHint {
}
/// An iterator over WAL (Write-Ahead Log) entries index for a region.
pub trait RegionWalIndexIterator: Send + Sync {
pub trait RegionWalIndexIterator: Send + Sync + Debug {
/// Returns next batch hint.
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
@@ -36,9 +37,13 @@ pub trait RegionWalIndexIterator: Send + Sync {
// Advances the iterator and returns the next EntryId.
fn next(&mut self) -> Option<EntryId>;
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any;
}
/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
#[derive(Debug)]
pub struct RegionWalRange {
current_entry_id: EntryId,
end_entry_id: EntryId,
@@ -96,10 +101,18 @@ impl RegionWalIndexIterator for RegionWalRange {
None
}
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// The default minimum batch window size: `4 * 1024 * 1024` (4 Mi).
// NOTE(review): presumably measured in bytes — confirm against `RegionWalVecIndex`.
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 * 1024 * 1024;
/// Represents an index of Write-Ahead Log entries for a region,
/// stored as a vector of [EntryId]s.
#[derive(Debug)]
pub struct RegionWalVecIndex {
index: VecDeque<EntryId>,
min_batch_window_size: usize,
@@ -134,11 +147,17 @@ impl RegionWalIndexIterator for RegionWalVecIndex {
fn next(&mut self) -> Option<EntryId> {
self.index.pop_front()
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Represents an iterator over multiple region WAL indexes.
///
/// Allowing iteration through multiple WAL indexes.
#[derive(Debug)]
pub struct MultipleRegionWalIndexIterator {
iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
}
@@ -185,6 +204,53 @@ impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
self.iterator.front_mut().and_then(|iter| iter.next())
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Builds the [`RegionWalIndexIterator`] used to replay entries in
/// `[start_entry_id, end_entry_id)`.
///
/// - Without `region_indexes`, the whole range is covered by a [`RegionWalRange`].
/// - With `region_indexes` (`(entry ids, last index)`), the known entry ids are
///   served by a [`RegionWalVecIndex`], followed by a [`RegionWalRange`] starting at
///   `max(last index, start_entry_id)` when that is still below `end_entry_id`;
///   both are wrapped in a [`MultipleRegionWalIndexIterator`].
///
/// Returns `None` when there are no entries to replay.
pub fn build_region_wal_index_iterator(
    start_entry_id: EntryId,
    end_entry_id: EntryId,
    region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
    max_batch_bytes: usize,
    min_window_size: usize,
) -> Option<Box<dyn RegionWalIndexIterator>> {
    // An empty (or inverted) range has nothing to replay.
    if start_entry_id >= end_entry_id {
        return None;
    }

    let Some((entry_ids, last_index)) = region_indexes else {
        // No index available: fall back to scanning the whole range.
        let whole_range = RegionWalRange::new(start_entry_id..end_entry_id, max_batch_bytes);
        return Some(Box::new(whole_range));
    };

    let has_indexed_entries = !entry_ids.is_empty();
    // The index already reaches the end and holds nothing for this region.
    if !has_indexed_entries && last_index >= end_entry_id {
        return None;
    }

    let mut parts: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
    if has_indexed_entries {
        parts.push(Box::new(RegionWalVecIndex::new(entry_ids, min_window_size)));
    }

    // Entries past the last index are not covered by the index; read them as a range.
    let tail_start = max(last_index, start_entry_id);
    if tail_start < end_entry_id {
        let tail = RegionWalRange::new(tail_start..end_entry_id, max_batch_bytes);
        parts.push(Box::new(tail));
    }

    Some(Box::new(MultipleRegionWalIndexIterator::new(parts)))
}
#[cfg(test)]
@@ -353,4 +419,69 @@ mod tests {
assert_eq!(iter.peek(), None);
assert_eq!(iter.next(), None);
}
/// Covers the branches of `build_region_wal_index_iterator`: empty ranges,
/// indexes that leave nothing to replay, index-only and index-plus-range results.
#[test]
fn test_build_region_wal_index_iterator() {
// Empty range, no index.
let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5);
assert!(iterator.is_none());
// Inverted range, no index.
let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5);
assert!(iterator.is_none());
// Empty range, with index.
let iterator =
build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
assert!(iterator.is_none());
// No indexed entries and the last index equals the end entry id.
let iterator =
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
assert!(iterator.is_none());
// No indexed entries and the last index is past the end entry id.
let iterator =
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5);
assert!(iterator.is_none());
// Indexed entries and the last index equals the end: only the vec index is built.
let iterator = build_region_wal_index_iterator(
1,
1024,
Some((BTreeSet::from([512, 756]), 1024)),
5,
5,
)
.unwrap();
let iter = iterator
.as_any()
.downcast_ref::<MultipleRegionWalIndexIterator>()
.unwrap();
assert_eq!(iter.iterator.len(), 1);
let vec_index = iter.iterator[0]
.as_any()
.downcast_ref::<RegionWalVecIndex>()
.unwrap();
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
// Last index is below the end: the vec index is followed by the range [1023, 1024).
let iterator = build_region_wal_index_iterator(
1,
1024,
Some((BTreeSet::from([512, 756]), 1023)),
5,
5,
)
.unwrap();
let iter = iterator
.as_any()
.downcast_ref::<MultipleRegionWalIndexIterator>()
.unwrap();
assert_eq!(iter.iterator.len(), 2);
let vec_index = iter.iterator[0]
.as_any()
.downcast_ref::<RegionWalVecIndex>()
.unwrap();
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
let wal_range = iter.iterator[1]
.as_any()
.downcast_ref::<RegionWalRange>()
.unwrap();
assert_eq!(wal_range.current_entry_id, 1023);
assert_eq!(wal_range.end_entry_id, 1024);
}
}

View File

@@ -20,19 +20,20 @@ use common_telemetry::{debug, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{
Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;
use super::index::build_region_wal_index_iterator;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::index::GlobalIndexCollector;
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::index::{GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE};
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
@@ -205,6 +206,7 @@ impl LogStore for KafkaLogStore {
&self,
provider: &Provider,
entry_id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let provider = provider
.as_kafka_provider()
@@ -232,35 +234,41 @@ impl LogStore for KafkaLogStore {
.await
.context(GetOffsetSnafu {
topic: &provider.topic,
})?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = entry_id as i64;
})?;
debug!(
"Start reading entries in range [{}, {}] for ns {}",
start_offset, end_offset, provider
);
let region_indexes = if let (Some(index), Some(collector)) =
(index, self.client_manager.global_index_collector())
{
collector
.read_remote_region_index(index.location_id, provider, index.region_id, entry_id)
.await?
} else {
None
};
// Abort if there're no new entries.
// FIXME(niebayes): how come this case happens?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {}]",
provider, start_offset, end_offset
);
let Some(iterator) = build_region_wal_index_iterator(
entry_id,
end_offset as u64,
region_indexes,
self.max_batch_bytes,
MIN_BATCH_WINDOW_SIZE,
) else {
let range = entry_id..end_offset as u64;
warn!("No new entries in range {:?} of ns {}", range, provider);
return Ok(futures_util::stream::empty().boxed());
}
};
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.max_batch_bytes as i32)
.with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32)
.build();
debug!("Reading entries with {:?} of ns {}", iterator, provider);
debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
provider, start_offset, end_offset
);
// Safety: Must be ok.
let mut stream_consumer = ConsumerBuilder::default()
.client(client)
// Safety: checked before.
.buffer(RecordsBuffer::new(iterator))
.max_batch_size(self.max_batch_bytes)
.max_wait_ms(self.consumer_wait_timeout.as_millis() as u32)
.build()
.unwrap();
// A buffer is used to collect records to construct a complete entry.
let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
@@ -511,7 +519,7 @@ mod tests {
// 5 region
assert_eq!(response.last_entry_ids.len(), 5);
let got_entries = logstore
.read(&provider, 0)
.read(&provider, 0, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
@@ -584,7 +592,7 @@ mod tests {
// 5 region
assert_eq!(response.last_entry_ids.len(), 5);
let got_entries = logstore
.read(&provider, 0)
.read(&provider, 0, None)
.await
.unwrap()
.try_collect::<Vec<_>>()

View File

@@ -25,7 +25,7 @@ use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMo
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry};
use store_api::logstore::provider::{Provider, RaftEngineProvider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;
use crate::error::{
@@ -252,6 +252,7 @@ impl LogStore for RaftEngineLogStore {
&self,
provider: &Provider,
entry_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let ns = provider
.as_raft_engine_provider()
@@ -545,7 +546,7 @@ mod tests {
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore
.read(&Provider::raft_engine_provider(1), 0)
.read(&Provider::raft_engine_provider(1), 0, None)
.await
.unwrap();
while let Some(r) = s.next().await {
@@ -578,7 +579,7 @@ mod tests {
.await
.is_ok());
let entries = logstore
.read(&Provider::raft_engine_provider(1), 1)
.read(&Provider::raft_engine_provider(1), 1, None)
.await
.unwrap()
.collect::<Vec<_>>()
@@ -596,7 +597,7 @@ mod tests {
let entries = collect_entries(
logstore
.read(&Provider::raft_engine_provider(1), 1)
.read(&Provider::raft_engine_provider(1), 1, None)
.await
.unwrap(),
)
@@ -682,7 +683,7 @@ mod tests {
logstore.obsolete(&namespace, region_id, 100).await.unwrap();
assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
let res = logstore.read(&namespace, 100).await.unwrap();
let res = logstore.read(&namespace, 100, None).await.unwrap();
let mut vec = collect_entries(res).await;
vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap());
assert_eq!(101, vec.first().unwrap().entry_id());

View File

@@ -90,6 +90,7 @@ impl UpgradeCandidateRegion {
region_id,
last_entry_id,
wait_for_replay_timeout: Some(self.replay_timeout),
location_id: Some(ctx.persistent_ctx.from_peer.id),
})
}

View File

@@ -41,6 +41,7 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: None,
location_id: req.location_id,
}),
)
.await
@@ -52,6 +53,7 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
location_id: req.location_id,
}),
)
.await

View File

@@ -94,6 +94,7 @@ async fn test_catchup_with_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: last_entry_id,
location_id: None,
}),
)
.await;
@@ -125,6 +126,7 @@ async fn test_catchup_with_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id: None,
}),
)
.await;
@@ -191,6 +193,7 @@ async fn test_catchup_with_incorrect_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
location_id: None,
}),
)
.await
@@ -207,6 +210,7 @@ async fn test_catchup_with_incorrect_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
location_id: None,
}),
)
.await;
@@ -255,6 +259,7 @@ async fn test_catchup_without_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -285,6 +290,7 @@ async fn test_catchup_without_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -354,6 +360,7 @@ async fn test_catchup_with_manifest_update() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -390,6 +397,7 @@ async fn test_catchup_with_manifest_update() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -411,6 +419,7 @@ async fn test_catchup_not_exist() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await

View File

@@ -313,7 +313,7 @@ impl RegionOpener {
let wal_entry_reader = self
.wal_entry_reader
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id));
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();

View File

@@ -30,7 +30,7 @@ use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
use store_api::storage::RegionId;
use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
@@ -102,15 +102,24 @@ impl<S: LogStore> Wal<S> {
&self,
provider: &Provider,
region_id: RegionId,
location_id: Option<u64>,
) -> Box<dyn WalEntryReader> {
match provider {
Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
)),
Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
region_id,
))),
Provider::Kafka(_) => {
let reader = if let Some(location_id) = location_id {
LogStoreRawEntryReader::new(self.store.clone())
.with_wal_index(WalIndex::new(region_id, location_id))
} else {
LogStoreRawEntryReader::new(self.store.clone())
};
Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
reader, region_id,
)))
}
}
}

View File

@@ -20,7 +20,7 @@ use futures::stream::BoxStream;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::logstore::{LogStore, WalIndex};
use store_api::storage::RegionId;
use tokio_stream::StreamExt;
@@ -38,11 +38,20 @@ pub(crate) trait RawEntryReader: Send + Sync {
/// Implement the [RawEntryReader] for the [LogStore].
pub struct LogStoreRawEntryReader<S> {
store: Arc<S>,
wal_index: Option<WalIndex>,
}
impl<S> LogStoreRawEntryReader<S> {
pub fn new(store: Arc<S>) -> Self {
Self { store }
Self {
store,
wal_index: None,
}
}
/// Sets the [`WalIndex`] of this reader and returns the reader (builder style).
pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
self.wal_index = Some(wal_index);
self
}
}
@@ -50,9 +59,10 @@ impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
let store = self.store.clone();
let provider = provider.clone();
let wal_index = self.wal_index;
let stream = try_stream!({
let mut stream = store
.read(&provider, start_id)
.read(&provider, start_id, wal_index)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ReadWalSnafu {
@@ -119,7 +129,9 @@ mod tests {
use futures::{stream, TryStreamExt};
use store_api::logstore::entry::{Entry, NaiveEntry};
use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream};
use store_api::logstore::{
AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex,
};
use store_api::storage::RegionId;
use super::*;
@@ -149,6 +161,7 @@ mod tests {
&self,
_provider: &Provider,
_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())])))
}

View File

@@ -74,7 +74,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
let timer = Instant::now();
let wal_entry_reader = self.wal.wal_entry_reader(&region.provider, region_id);
let wal_entry_reader =
self.wal
.wal_entry_reader(&region.provider, region_id, request.location_id);
let on_region_opened = self.wal.on_region_opened();
let last_entry_id = replay_memtable(
&region.provider,
@@ -93,7 +95,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
if let Some(expected_last_entry_id) = request.entry_id {
ensure!(
expected_last_entry_id == last_entry_id,
// The replayed last entry id may be greater than the `expected_last_entry_id`.
last_entry_id >= expected_last_entry_id,
error::UnexpectedReplaySnafu {
region_id,
expected_last_entry_id,

View File

@@ -30,6 +30,22 @@ pub use crate::logstore::entry::Id as EntryId;
use crate::logstore::provider::Provider;
use crate::storage::RegionId;
/// The information used to locate the WAL index for the specified region.
#[derive(Debug, Clone, Copy)]
pub struct WalIndex {
/// The region whose WAL index is looked up.
pub region_id: RegionId,
/// The id of the location the index belongs to.
// NOTE(review): presumably the id of the peer that wrote the index — confirm with callers.
pub location_id: u64,
}
impl WalIndex {
/// Creates a [`WalIndex`] from the given `region_id` and `location_id`.
pub fn new(region_id: RegionId, location_id: u64) -> Self {
Self {
region_id,
location_id,
}
}
}
/// `LogStore` serves as a Write-Ahead-Log for storage engine.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
@@ -48,6 +64,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug {
&self,
provider: &Provider,
id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error>;
/// Creates a new `Namespace` from the given ref.

View File

@@ -673,6 +673,8 @@ pub struct RegionCatchupRequest {
/// The `entry_id` that was expected to reply to.
/// `None` stands replaying to latest.
pub entry_id: Option<entry::Id>,
/// The hint for replaying memtable.
pub location_id: Option<u64>,
}
impl fmt::Display for RegionRequest {