feat: implement the handle_batch_open_requests (#4075)

* feat: implement the `handle_batch_open_requests`

* refactor: refactor `handle_batch_open_requests` method signature

* test: add tests for `handle_batch_open_requests`

* chore: fmt code

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-06-05 17:22:34 +08:00
committed by GitHub
parent e0a2c5a581
commit c23f8ad113
12 changed files with 532 additions and 47 deletions

View File

@@ -21,6 +21,8 @@ mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
@@ -50,6 +52,7 @@ mod set_readonly_test;
mod truncate_test;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
@@ -58,22 +61,33 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Semaphore};
use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result};
use crate::error::{
InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;
pub const MITO_ENGINE_NAME: &str = "mito";
@@ -211,6 +225,41 @@ struct EngineInner {
workers: WorkerGroup,
/// Config of the engine.
config: Arc<MitoConfig>,
/// The Wal raw entry reader.
wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
/// Returns requests([TopicGroupedRegionOpenRequests]) grouped by topic and remaining requests.
fn prepare_batch_open_requests(
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
TopicGroupedRegionOpenRequests,
Vec<(RegionId, RegionOpenRequest)>,
)> {
let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
for (region_id, request) in requests {
let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
serde_json::from_str(options).context(SerdeJsonSnafu)?
} else {
WalOptions::RaftEngine
};
match options {
WalOptions::Kafka(options) => {
topic_to_regions
.entry(options.topic)
.or_default()
.push((region_id, request));
}
WalOptions::RaftEngine => {
remaining_regions.push((region_id, request));
}
}
}
Ok((topic_to_regions, remaining_regions))
}
impl EngineInner {
@@ -221,9 +270,11 @@ impl EngineInner {
object_store_manager: ObjectStoreManagerRef,
) -> Result<EngineInner> {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
config,
wal_raw_entry_reader,
})
}
@@ -244,6 +295,93 @@ impl EngineInner {
Ok(region.metadata())
}
/// Opens all regions whose WAL entries live in the same Kafka `topic`.
///
/// Builds one WAL entry distributor for the topic plus one receiver per
/// region, submits each open request to its worker, then drives the
/// distributor so entries are fanned out to the replaying regions.
/// Returns one `(RegionId, result)` pair per request, in request order.
async fn open_topic_regions(
    &self,
    topic: String,
    region_requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
    let region_ids = region_requests
        .iter()
        .map(|(region_id, _)| *region_id)
        .collect::<Vec<_>>();
    let provider = Provider::kafka_provider(topic);
    // One receiver per region id, in the same order as `region_ids`.
    let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
        provider,
        self.wal_raw_entry_reader.clone(),
        &region_ids,
        DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
    );
    let mut responses = Vec::with_capacity(region_requests.len());
    for ((region_id, request), entry_receiver) in
        region_requests.into_iter().zip(entry_receivers)
    {
        let (request, receiver) =
            WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
        // Submit first; the worker replays from its entry receiver while the
        // distributor (spawned below) feeds it.
        self.workers.submit_to_worker(region_id, request).await?;
        responses.push(async move { receiver.await.context(RecvSnafu)? });
    }
    // Waits for entries distribution.
    // NOTE(review): the distributor must run concurrently with the worker
    // futures; presumably awaiting it before `join_all` would stall on full
    // receiver buffers — confirm.
    let distribution =
        common_runtime::spawn_read(async move { distributor.distribute().await });
    // Waits for worker returns.
    let responses = join_all(responses).await;
    distribution.await.context(JoinSnafu)??;
    Ok(region_ids.into_iter().zip(responses).collect())
}
/// Opens multiple regions with at most `parallelism` concurrent open
/// operations (bounded by a semaphore).
///
/// Requests whose WAL options reference a Kafka topic are grouped per topic
/// (see [prepare_batch_open_requests]) so each topic's WAL is read once and
/// distributed to its regions; remaining raft-engine requests are opened
/// individually. Returns one `(RegionId, result)` pair per request.
async fn handle_batch_open_requests(
    &self,
    parallelism: usize,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
    let semaphore = Arc::new(Semaphore::new(parallelism));
    let (topic_to_region_requests, remaining_region_requests) =
        prepare_batch_open_requests(requests)?;
    // NOTE(review): capacity counts topics + raft regions, not total regions,
    // so this may still reallocate for multi-region topics — harmless, but
    // confirm intent.
    let mut responses =
        Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
    if !topic_to_region_requests.is_empty() {
        let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
        for (topic, region_requests) in topic_to_region_requests {
            let semaphore_moved = semaphore.clone();
            tasks.push(async move {
                // Safety: semaphore must exist
                let _permit = semaphore_moved.acquire().await.unwrap();
                self.open_topic_regions(topic, region_requests).await
            })
        }
        // A topic-level failure short-circuits and fails the whole batch,
        // unlike the per-region results collected below.
        let r = try_join_all(tasks).await?;
        responses.extend(r.into_iter().flatten());
    }
    if !remaining_region_requests.is_empty() {
        let mut tasks = Vec::with_capacity(remaining_region_requests.len());
        let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
        for (region_id, request) in remaining_region_requests {
            let semaphore_moved = semaphore.clone();
            region_ids.push(region_id);
            tasks.push(async move {
                // Safety: semaphore must exist
                let _permit = semaphore_moved.acquire().await.unwrap();
                // No entry receiver: the worker reads the raft-engine WAL itself.
                let (request, receiver) =
                    WorkerRequest::new_open_region_request(region_id, request, None);
                self.workers.submit_to_worker(region_id, request).await?;
                receiver.await.context(RecvSnafu)?
            })
        }
        // Per-region failures are reported individually instead of failing
        // the whole call.
        let results = join_all(tasks).await;
        responses.extend(region_ids.into_iter().zip(results));
    }
    Ok(responses)
}
/// Handles [RegionRequest] and return its executed result.
async fn handle_request(
&self,
@@ -323,6 +461,30 @@ impl RegionEngine for MitoEngine {
MITO_ENGINE_NAME
}
/// Opens a batch of regions via the engine's grouped batch-open path and
/// converts the internal results into the public [BatchResponses] type.
#[tracing::instrument(skip_all)]
async fn handle_batch_open_requests(
    &self,
    parallelism: usize,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
    // TODO(weny): add metrics.
    let results = self
        .inner
        .handle_batch_open_requests(parallelism, requests)
        .await
        .map_err(BoxedError::new)?;
    let mut converted = Vec::with_capacity(results.len());
    for (region_id, result) in results {
        converted.push((
            region_id,
            result.map(RegionResponse::new).map_err(BoxedError::new),
        ));
    }
    Ok(converted)
}
#[tracing::instrument(skip_all)]
async fn handle_request(
&self,
@@ -421,6 +583,7 @@ impl MitoEngine {
config.sanitize(data_home)?;
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
Ok(MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
@@ -433,6 +596,7 @@ impl MitoEngine {
)
.await?,
config,
wal_raw_entry_reader,
}),
})
}

View File

@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use rstest::rstest;
use rstest_reuse::apply;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use super::MitoEngine;
use crate::config::MitoConfig;
use crate::test_util::{
build_rows, kafka_log_store_factory, multiple_log_store_factories,
prepare_test_for_kafka_log_store, put_rows, raft_engine_log_store_factory, rows_schema,
CreateRequestBuilder, LogStoreFactory, TestEnv,
};
#[apply(multiple_log_store_factories)]
async fn test_batch_open(factory: Option<LogStoreFactory>) {
    common_telemetry::init_default_ut_logging();
    let Some(factory) = factory else {
        return;
    };
    let mut env =
        TestEnv::with_prefix("open-batch-regions").with_log_store_factory(factory.clone());
    let engine = env.create_engine(MitoConfig::default()).await;
    let topic = prepare_test_for_kafka_log_store(&factory).await;
    let num_regions = 3u32;
    let region_dir = |region_id| format!("test/{region_id}");
    // Step 1: create `num_regions` regions, remembering each region's schema
    // for the writes below.
    let mut region_schema = HashMap::new();
    for id in 1..=num_regions {
        let engine = engine.clone();
        let topic = topic.clone();
        let region_id = RegionId::new(1, id);
        let request = CreateRequestBuilder::new()
            .region_dir(&region_dir(region_id))
            .kafka_topic(topic.clone())
            .build();
        let column_schemas = rows_schema(&request);
        region_schema.insert(region_id, column_schemas);
        engine
            .handle_request(region_id, RegionRequest::Create(request))
            .await
            .unwrap();
    }
    // Step 2: write 10 rows per region so there is WAL data to replay after
    // the engine is reopened.
    for i in 0..10 {
        for region_number in 1..=num_regions {
            let region_id = RegionId::new(1, region_number);
            let rows = Rows {
                schema: region_schema[&region_id].clone(),
                rows: build_rows(
                    (region_number as usize) * 120 + i as usize,
                    (region_number as usize) * 120 + i as usize + 1,
                ),
            };
            put_rows(&engine, region_id, rows).await;
        }
    }
    // Scans each region and compares against the expected pretty-printed
    // batches; reused before and after reopening the engine.
    // NOTE(review): `1..num_regions` never verifies region `num_regions`
    // itself — looks like it should be `1..=num_regions`; confirm.
    let assert_result = |engine: MitoEngine| async move {
        for i in 1..num_regions {
            let region_id = RegionId::new(1, i);
            let request = ScanRequest::default();
            let stream = engine.scan_to_stream(region_id, request).await.unwrap();
            let batches = RecordBatches::try_collect(stream).await.unwrap();
            let mut expected = String::new();
            expected.push_str(
                "+-------+---------+---------------------+\n| tag_0 | field_0 | ts |\n+-------+---------+---------------------+\n",
            );
            for row in 0..10 {
                expected.push_str(&format!(
                    "| {} | {}.0 | 1970-01-01T00:{:02}:{:02} |\n",
                    i * 120 + row,
                    i * 120 + row,
                    2 * i,
                    row
                ));
            }
            expected.push_str("+-------+---------+---------------------+");
            assert_eq!(expected, batches.pretty_print().unwrap());
        }
    };
    assert_result(engine.clone()).await;
    // Step 3: build open requests carrying the WAL options (a Kafka topic
    // when the factory is Kafka; empty options mean raft engine).
    let mut options = HashMap::new();
    if let Some(topic) = &topic {
        options.insert(
            WAL_OPTIONS_KEY.to_string(),
            serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
                topic: topic.to_string(),
            }))
            .unwrap(),
        );
    };
    let mut requests = (1..=num_regions)
        .map(|id| {
            let region_id = RegionId::new(1, id);
            (
                region_id,
                RegionOpenRequest {
                    engine: String::new(),
                    region_dir: region_dir(region_id),
                    options: options.clone(),
                    skip_wal_replay: false,
                },
            )
        })
        .collect::<Vec<_>>();
    // One extra request pointing at a missing region dir: it must fail with
    // RegionNotFound while the other opens still succeed.
    requests.push((
        RegionId::new(1, 4),
        RegionOpenRequest {
            engine: String::new(),
            region_dir: "no-exists".to_string(),
            options: options.clone(),
            skip_wal_replay: false,
        },
    ));
    // Reopen engine.
    let engine = env.reopen_engine(engine, MitoConfig::default()).await;
    let mut results = engine
        .handle_batch_open_requests(4, requests)
        .await
        .unwrap();
    // The last pushed request is the invalid one.
    let (_, result) = results.pop().unwrap();
    assert_eq!(
        result.unwrap_err().status_code(),
        StatusCode::RegionNotFound
    );
    for (_, result) in results {
        assert!(result.is_ok());
    }
    // Data must survive the reopen (replayed from the WAL).
    assert_result(engine.clone()).await;
}
#[apply(multiple_log_store_factories)]
async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
    common_telemetry::init_default_ut_logging();
    let Some(factory) = factory else {
        return;
    };
    let mut env =
        TestEnv::with_prefix("open-batch-regions-err").with_log_store_factory(factory.clone());
    let engine = env.create_engine(MitoConfig::default()).await;
    let topic = prepare_test_for_kafka_log_store(&factory).await;
    let mut options = HashMap::new();
    if let Some(topic) = &topic {
        options.insert(
            WAL_OPTIONS_KEY.to_string(),
            serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
                topic: topic.to_string(),
            }))
            .unwrap(),
        );
    };
    let num_regions = 3u32;
    let region_dir = "test".to_string();
    // The regions were never created, so every open request targets a
    // non-existent region directory.
    let requests = (1..=num_regions)
        .map(|id| {
            (
                RegionId::new(1, id),
                RegionOpenRequest {
                    engine: String::new(),
                    region_dir: region_dir.to_string(),
                    options: options.clone(),
                    skip_wal_replay: false,
                },
            )
        })
        .collect::<Vec<_>>();
    let results = engine
        .handle_batch_open_requests(3, requests)
        .await
        .unwrap();
    // Each region must report RegionNotFound individually instead of the
    // whole batch call failing.
    for (_, result) in results {
        assert_eq!(
            result.unwrap_err().status_code(),
            StatusCode::RegionNotFound
        );
    }
}

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
@@ -48,6 +49,7 @@ use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
/// Builder to create a new [MitoRegion] or open an existing one.
@@ -64,6 +66,7 @@ pub(crate) struct RegionOpener {
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
stats: ManifestStats,
wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}
impl RegionOpener {
@@ -89,6 +92,7 @@ impl RegionOpener {
intermediate_manager,
time_provider: None,
stats: Default::default(),
wal_entry_reader: None,
}
}
@@ -104,6 +108,16 @@ impl RegionOpener {
Ok(self)
}
/// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
/// constructing a new one from scratch.
pub(crate) fn wal_entry_reader(
    mut self,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
) -> Self {
    // `None` keeps the default behavior of building a reader from the WAL
    // during `maybe_open`.
    self.wal_entry_reader = wal_entry_reader;
    self
}
/// Sets options for the region.
pub(crate) fn options(mut self, options: RegionOptions) -> Self {
self.options = Some(options);
@@ -165,8 +179,8 @@ impl RegionOpener {
}
}
let options = self.options.take().unwrap();
let provider = self.provider(&options.wal_options);
let object_store = self.object_store(&options.storage)?.clone();
let provider = self.provider(&options.wal_options);
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
@@ -231,7 +245,7 @@ impl RegionOpener {
///
/// Returns error if the region doesn't exist.
pub(crate) async fn open<S: LogStore>(
self,
mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<MitoRegion> {
@@ -267,7 +281,7 @@ impl RegionOpener {
/// Tries to open the region and returns `None` if the region directory is empty.
async fn maybe_open<S: LogStore>(
&self,
&mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<Option<MitoRegion>> {
@@ -288,6 +302,11 @@ impl RegionOpener {
let region_id = self.region_id;
let provider = self.provider(&region_options.wal_options);
let wal_entry_reader = self
.wal_entry_reader
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();
debug!("Open region {} with options: {:?}", region_id, self.options);
@@ -331,12 +350,13 @@ impl RegionOpener {
region_id
);
replay_memtable(
wal,
&provider,
wal_entry_reader,
region_id,
flushed_entry_id,
&version_control,
config.allow_stale_entries,
on_region_opened,
)
.await?;
} else {
@@ -357,7 +377,7 @@ impl RegionOpener {
RegionState::ReadOnly,
)),
file_purger,
provider,
provider: provider.clone(),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
time_provider,
memtable_builder,
@@ -448,21 +468,25 @@ pub(crate) fn check_recovered_region(
}
/// Replays the mutations from WAL and inserts mutations to memtable of given region.
pub(crate) async fn replay_memtable<S: LogStore>(
wal: &Wal<S>,
pub(crate) async fn replay_memtable<F>(
provider: &Provider,
mut wal_entry_reader: Box<dyn WalEntryReader>,
region_id: RegionId,
flushed_entry_id: EntryId,
version_control: &VersionControlRef,
allow_stale_entries: bool,
) -> Result<EntryId> {
on_region_opened: F,
) -> Result<EntryId>
where
F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
let mut rows_replayed = 0;
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
let mut last_entry_id = flushed_entry_id;
let replay_from_entry_id = flushed_entry_id + 1;
let mut wal_stream = wal.scan(region_id, replay_from_entry_id, provider)?;
let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
while let Some(res) = wal_stream.next().await {
let (entry_id, entry) = res?;
if entry_id <= flushed_entry_id {
@@ -496,7 +520,7 @@ pub(crate) async fn replay_memtable<S: LogStore>(
// TODO(weny): We need to update `flushed_entry_id` in the region manifest
// to avoid reading potentially incomplete entries in the future.
wal.obsolete(region_id, flushed_entry_id, provider).await?;
(on_region_opened)(region_id, flushed_entry_id, provider).await?;
info!(
"Replay WAL for region: {}, rows recovered: {}, last entry id: {}",

View File

@@ -44,6 +44,7 @@ use crate::error::{
};
use crate::manifest::action::RegionEdit;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::wal::entry_distributor::WalEntryReceiver;
use crate::wal::EntryId;
/// Request to write a region.
@@ -497,6 +498,22 @@ pub(crate) enum WorkerRequest {
}
impl WorkerRequest {
/// Builds a [WorkerRequest::Ddl] that opens `region_id`, together with the
/// oneshot receiver resolving to the open result.
///
/// `entry_receiver` optionally supplies WAL entries from a distributor (used
/// by batch opens); `None` lets the worker read the WAL itself.
pub(crate) fn new_open_region_request(
    region_id: RegionId,
    request: RegionOpenRequest,
    entry_receiver: Option<WalEntryReceiver>,
) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
    let (sender, receiver) = oneshot::channel();
    let ddl = SenderDdlRequest {
        region_id,
        sender: sender.into(),
        request: DdlRequest::Open((request, entry_receiver)),
    };
    (WorkerRequest::Ddl(ddl), receiver)
}
/// Converts request from a [RegionRequest].
pub(crate) fn try_from_region_request(
region_id: RegionId,
@@ -531,7 +548,7 @@ impl WorkerRequest {
RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
request: DdlRequest::Open(v),
request: DdlRequest::Open((v, None)),
}),
RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
@@ -585,7 +602,7 @@ impl WorkerRequest {
pub(crate) enum DdlRequest {
Create(RegionCreateRequest),
Drop(RegionDropRequest),
Open(RegionOpenRequest),
Open((RegionOpenRequest, Option<WalEntryReceiver>)),
Close(RegionCloseRequest),
Alter(RegionAlterRequest),
Flush(RegionFlushRequest),

View File

@@ -30,6 +30,7 @@ use std::sync::Arc;
use api::v1::WalEntry;
use common_error::ext::BoxedError;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use prost::Message;
use snafu::ResultExt;
@@ -86,6 +87,39 @@ impl<S: LogStore> Wal<S> {
}
}
/// Returns a [OnRegionOpened] function.
///
/// The returned closure marks WAL entries up to `last_entry_id` obsolete in
/// the underlying log store; it is invoked after a region finishes replay.
pub(crate) fn on_region_opened(
    &self,
) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
    // Clone the store so the closure owns it and stays independent of `self`.
    let store = self.store.clone();
    move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            store
                .obsolete(provider, last_entry_id)
                .await
                .map_err(BoxedError::new)
                .context(DeleteWalSnafu { region_id })
        })
    }
}
/// Returns a [WalEntryReader]
///
/// For Kafka providers the raw reader is wrapped in a [RegionRawEntryReader]
/// scoped to `region_id`; raft-engine providers read the raw entries directly.
pub(crate) fn wal_entry_reader(
    &self,
    provider: &Provider,
    region_id: RegionId,
) -> Box<dyn WalEntryReader> {
    let raw_reader = LogStoreRawEntryReader::new(self.store.clone());
    match provider {
        Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(raw_reader)),
        Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
            raw_reader, region_id,
        ))),
    }
}
/// Scan entries of specific region starting from `start_id` (inclusive).
pub fn scan<'a>(
&'a self,

View File

@@ -20,7 +20,7 @@ use api::v1::WalEntry;
use async_stream::stream;
use common_telemetry::{debug, error};
use futures::future::join_all;
use snafu::ensure;
use snafu::{ensure, OptionExt};
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::storage::RegionId;
@@ -101,9 +101,9 @@ impl WalEntryDistributor {
pub(crate) struct WalEntryReceiver {
region_id: RegionId,
/// Receives the [Entry] from the [WalEntryDistributor].
entry_receiver: Receiver<Entry>,
entry_receiver: Option<Receiver<Entry>>,
/// Sends the `start_id` to the [WalEntryDistributor].
arg_sender: oneshot::Sender<EntryId>,
arg_sender: Option<oneshot::Sender<EntryId>>,
}
impl WalEntryReceiver {
@@ -114,19 +114,22 @@ impl WalEntryReceiver {
) -> Self {
Self {
region_id,
entry_receiver,
arg_sender,
entry_receiver: Some(entry_receiver),
arg_sender: Some(arg_sender),
}
}
}
impl WalEntryReader for WalEntryReceiver {
fn read(self, provider: &Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
let WalEntryReceiver {
region_id: expected_region_id,
mut entry_receiver,
arg_sender,
} = self;
fn read(&mut self, provider: &Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
let mut arg_sender =
self.arg_sender
.take()
.with_context(|| error::InvalidWalReadRequestSnafu {
reason: format!("Call WalEntryReceiver multiple time, start_id: {start_id}"),
})?;
// Safety: check via arg_sender
let mut entry_receiver = self.entry_receiver.take().unwrap();
if arg_sender.send(start_id).is_err() {
return error::InvalidWalReadRequestSnafu {
@@ -167,6 +170,9 @@ struct EntryReceiver {
sender: Sender<Entry>,
}
/// The default buffer size of the [Entry] receiver.
pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2048;
/// Returns [WalEntryDistributor] and batch [WalEntryReceiver]s.
///
/// ### Note:
@@ -186,14 +192,14 @@ struct EntryReceiver {
pub fn build_wal_entry_distributor_and_receivers(
provider: Provider,
raw_wal_reader: Arc<dyn RawEntryReader>,
region_ids: Vec<RegionId>,
region_ids: &[RegionId],
buffer_size: usize,
) -> (WalEntryDistributor, Vec<WalEntryReceiver>) {
let mut senders = HashMap::with_capacity(region_ids.len());
let mut readers = Vec::with_capacity(region_ids.len());
let mut arg_receivers = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
for &region_id in region_ids {
let (entry_sender, entry_receiver) = mpsc::channel(buffer_size);
let (arg_sender, arg_receiver) = oneshot::channel();
@@ -257,7 +263,7 @@ mod tests {
let (distributor, receivers) = build_wal_entry_distributor_and_receivers(
provider,
reader,
vec![RegionId::new(1024, 1), RegionId::new(1025, 1)],
&[RegionId::new(1024, 1), RegionId::new(1025, 1)],
128,
);
@@ -317,7 +323,7 @@ mod tests {
let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
reader,
vec![
&[
RegionId::new(1024, 1),
RegionId::new(1024, 2),
RegionId::new(1024, 3),
@@ -331,7 +337,7 @@ mod tests {
drop(last);
let mut streams = receivers
.into_iter()
.iter_mut()
.map(|receiver| receiver.read(&provider, 0).unwrap())
.collect::<Vec<_>>();
distributor.distribute().await.unwrap();
@@ -427,12 +433,12 @@ mod tests {
let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
Arc::new(corrupted_stream),
vec![region1, region2, region3],
&[region1, region2, region3],
128,
);
assert_eq!(receivers.len(), 3);
let mut streams = receivers
.into_iter()
.iter_mut()
.map(|receiver| receiver.read(&provider, 0).unwrap())
.collect::<Vec<_>>();
distributor.distribute().await.unwrap();
@@ -510,12 +516,12 @@ mod tests {
let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
Arc::new(corrupted_stream),
vec![region1, region2],
&[region1, region2],
128,
);
assert_eq!(receivers.len(), 2);
let mut streams = receivers
.into_iter()
.iter_mut()
.map(|receiver| receiver.read(&provider, 0).unwrap())
.collect::<Vec<_>>();
distributor.distribute().await.unwrap();
@@ -602,12 +608,12 @@ mod tests {
let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
provider.clone(),
reader,
vec![RegionId::new(1024, 1), RegionId::new(1024, 2)],
&[RegionId::new(1024, 1), RegionId::new(1024, 2)],
128,
);
assert_eq!(receivers.len(), 2);
let mut streams = receivers
.into_iter()
.iter_mut()
.map(|receiver| receiver.read(&provider, 4).unwrap())
.collect::<Vec<_>>();
distributor.distribute().await.unwrap();

View File

@@ -38,8 +38,10 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)>
}
/// [WalEntryReader] provides the ability to read and decode entries from the underlying store.
///
/// Notes: It will consume the inner stream and only allow invoking the `read` at once.
pub(crate) trait WalEntryReader: Send + Sync {
fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>>;
fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>>;
}
/// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry].
@@ -54,7 +56,7 @@ impl<R> LogStoreEntryReader<R> {
}
impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
let LogStoreEntryReader { reader } = self;
let mut stream = reader.read(ns, start_id)?;
@@ -136,7 +138,7 @@ mod tests {
],
};
let reader = LogStoreEntryReader::new(raw_entry_stream);
let mut reader = LogStoreEntryReader::new(raw_entry_stream);
let entries = reader
.read(&provider, 0)
.unwrap()
@@ -172,7 +174,7 @@ mod tests {
],
};
let reader = LogStoreEntryReader::new(raw_entry_stream);
let mut reader = LogStoreEntryReader::new(raw_entry_stream);
let err = reader
.read(&provider, 0)
.unwrap()

View File

@@ -721,8 +721,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let res = match ddl.request {
DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await,
DdlRequest::Open(req) => {
self.handle_open_request(ddl.region_id, req, ddl.sender)
DdlRequest::Open((req, wal_entry_receiver)) => {
self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
.await;
continue;
}

View File

@@ -73,13 +73,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
let timer = Instant::now();
let wal_entry_reader = self.wal.wal_entry_reader(&region.provider, region_id);
let on_region_opened = self.wal.on_region_opened();
let last_entry_id = replay_memtable(
&self.wal,
&region.provider,
wal_entry_reader,
region_id,
flushed_entry_id,
&region.version_control,
self.config.allow_stale_entries,
on_region_opened,
)
.await?;
info!(

View File

@@ -29,6 +29,7 @@ use crate::error::{
use crate::metrics::REGION_COUNT;
use crate::region::opener::RegionOpener;
use crate::request::OptionOutputTx;
use crate::wal::entry_distributor::WalEntryReceiver;
use crate::worker::handle_drop::remove_region_dir_once;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
@@ -66,6 +67,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionOpenRequest,
wal_entry_receiver: Option<WalEntryReceiver>,
sender: OptionOutputTx,
) {
if self.regions.is_region_exists(region_id) {
@@ -95,6 +97,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))
.wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
.parse_options(request.options)
{
Ok(opener) => opener,

View File

@@ -26,8 +26,8 @@ serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
strum.workspace = true
tokio.workspace = true
[dev-dependencies]
async-stream.workspace = true
serde_json.workspace = true
tokio.workspace = true

View File

@@ -26,12 +26,14 @@ use common_query::error::ExecuteRepeatedlySnafu;
use common_recordbatch::SendableRecordBatchStream;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::RegionRequest;
use crate::region_request::{RegionOpenRequest, RegionRequest};
use crate::storage::{RegionId, ScanRequest};
/// The result of setting readonly for the region.
@@ -177,11 +179,38 @@ pub trait RegionScanner: Debug + DisplayAs + Send + Sync {
pub type RegionScannerRef = Arc<dyn RegionScanner>;
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
#[async_trait]
pub trait RegionEngine: Send + Sync {
/// Name of this engine
fn name(&self) -> &str;
/// Handles batch open region requests.
///
/// Default implementation: opens every region through [Self::handle_request]
/// with at most `parallelism` opens in flight, and returns each region's
/// result individually.
async fn handle_batch_open_requests(
    &self,
    parallelism: usize,
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
    let permits = Arc::new(Semaphore::new(parallelism));
    let mut open_tasks = Vec::with_capacity(requests.len());
    for (region_id, request) in requests {
        let permits = permits.clone();
        open_tasks.push(async move {
            // Safety: the semaphore is never closed, so acquire cannot fail.
            let _permit = permits.acquire().await.unwrap();
            let result = self
                .handle_request(region_id, RegionRequest::Open(request))
                .await;
            (region_id, result)
        });
    }
    Ok(join_all(open_tasks).await)
}
/// Handles non-query request to the region. Returns the count of affected rows.
async fn handle_request(
&self,