feat: implement handle upgrade region instruction (#3013)

* feat: implement task tracker

* feat: implement handle upgrade region instruction

* refactor: remove redundant code

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: refactor wait_for_replay_millis to wait_for_replay_timeout

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-12-28 11:08:47 +09:00
committed by GitHub
parent bd0eed7af9
commit 485a91f49a
13 changed files with 923 additions and 100 deletions

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber};
@@ -140,11 +141,12 @@ pub struct UpgradeRegion {
pub region_id: RegionId,
/// The `last_entry_id` of old leader region.
pub last_entry_id: Option<u64>,
/// The second of waiting for a wal replay.
/// The timeout of waiting for a wal replay.
///
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
pub wait_for_replay_secs: Option<u64>,
#[serde(with = "humantime_serde")]
pub wait_for_replay_timeout: Option<Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Display)]

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -27,6 +28,19 @@ use table::error::Error as TableError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to execute async task"))]
AsyncTaskExecute {
location: Location,
source: Arc<Error>,
},
#[snafu(display("Failed to watch change"))]
WatchAsyncTaskChange {
location: Location,
#[snafu(source)]
error: tokio::sync::watch::error::RecvError,
},
#[snafu(display("Failed to handle heartbeat response"))]
HandleHeartbeatResponse {
location: Location,
@@ -292,7 +306,11 @@ impl ErrorExt for Error {
| MissingWalDirConfig { .. }
| MissingKvBackend { .. } => StatusCode::InvalidArguments,
PayloadNotExist { .. } | Unexpected { .. } => StatusCode::Unexpected,
PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
StatusCode::Unexpected
}
AsyncTaskExecute { source, .. } => source.status_code(),
// TODO(yingwen): Further categorize http error.
ParseAddr { .. }

View File

@@ -41,6 +41,7 @@ use crate::metrics;
use crate::region_server::RegionServer;
pub(crate) mod handler;
pub(crate) mod task_tracker;
pub struct HeartbeatTask {
node_id: u64,

View File

@@ -17,130 +17,75 @@ use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, OpenRegion, SimpleReply,
};
use common_meta::instruction::{Instruction, InstructionReply};
use common_meta::RegionIdent;
use common_telemetry::error;
use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::path_utils::region_dir;
use store_api::region_engine::SetReadonlyResponse;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error;
mod close_region;
mod downgrade_region;
mod open_region;
mod upgrade_region;
use super::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
pub type InstructionHandler =
Box<dyn FnOnce(RegionServer) -> BoxFuture<'static, InstructionReply> + Send>;
Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, InstructionReply> + Send>;
#[derive(Clone)]
pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
}
impl HandlerContext {
fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
RegionId::new(region_ident.table_id, region_ident.region_number)
}
}
impl RegionHeartbeatResponseHandler {
/// Returns the [RegionHeartbeatResponseHandler].
pub fn new(region_server: RegionServer) -> Self {
Self { region_server }
Self {
region_server,
catchup_tasks: TaskTracker::new(),
}
}
/// Builds the [InstructionHandler].
fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
match instruction {
Instruction::OpenRegion(OpenRegion {
region_ident,
region_storage_path,
region_options,
region_wal_options,
skip_wal_replay,
}) => Ok(Box::new(move |region_server| {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
// TODO(niebayes): extends region options with region_wal_options.
let _ = region_wal_options;
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay,
});
let result = region_server.handle_request(region_id, request).await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.to_string()).err();
InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
})
})
Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_open_region_instruction(open_region)
})),
Instruction::CloseRegion(region_ident) => Ok(Box::new(|region_server| {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Close(RegionCloseRequest {});
let result = region_server.handle_request(region_id, request).await;
match result {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}),
Err(error::Error::RegionNotFound { .. }) => {
InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
})
}
Err(err) => InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(err.to_string()),
}),
}
})
Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
handler_context.handle_close_region_instruction(close_region)
})),
Instruction::DowngradeRegion(DowngradeRegion { region_id }) => {
Ok(Box::new(move |region_server| {
Box::pin(async move {
match region_server.set_readonly_gracefully(region_id).await {
Ok(SetReadonlyResponse::Success { last_entry_id }) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: true,
error: None,
})
}
Ok(SetReadonlyResponse::NotFound) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
})
}
Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: Some(err.to_string()),
}),
}
})
Instruction::DowngradeRegion(downgrade_region) => {
Ok(Box::new(move |handler_context| {
handler_context.handle_downgrade_region_instruction(downgrade_region)
}))
}
Instruction::UpgradeRegion(_) => {
todo!()
}
Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
}
}
fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
RegionId::new(region_ident.table_id, region_ident.region_number)
}
}
#[async_trait]
@@ -162,9 +107,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_bg(async move {
let reply = handler(region_server).await;
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
})
.await;
if let Err(e) = mailbox.send((meta, reply)).await {
error!(e; "Failed to send reply to mailbox");
@@ -184,10 +134,12 @@ mod tests {
use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
use common_meta::instruction::{DowngradeRegion, OpenRegion};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::region_request::RegionRequest;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver};

View File

@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use crate::error;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_close_region_instruction(
self,
region_ident: RegionIdent,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Close(RegionCloseRequest {});
let result = self.region_server.handle_request(region_id, request).await;
match result {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
}),
Err(error::Error::RegionNotFound { .. }) => {
warn!("Received a close region instruction from meta, but target region:{region_id} is not found.");
InstructionReply::CloseRegion(SimpleReply {
result: true,
error: None,
})
}
Err(err) => InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(err.output_msg()),
}),
}
})
}
}

View File

@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
use futures_util::future::BoxFuture;
use store_api::region_engine::SetReadonlyResponse;
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_downgrade_region_instruction(
self,
DowngradeRegion { region_id }: DowngradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
match self.region_server.set_readonly_gracefully(region_id).await {
Ok(SetReadonlyResponse::Success { last_entry_id }) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists: true,
error: None,
})
}
Ok(SetReadonlyResponse::NotFound) => {
InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: false,
error: None,
})
}
Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
exists: true,
error: Some(err.output_msg()),
}),
}
})
}
}

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use futures_util::future::BoxFuture;
use store_api::path_utils::region_dir;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use crate::heartbeat::handler::HandlerContext;
impl HandlerContext {
pub(crate) fn handle_open_region_instruction(
self,
OpenRegion {
region_ident,
region_storage_path,
region_options,
region_wal_options,
skip_wal_replay,
}: OpenRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
// TODO(niebayes): extends region options with region_wal_options.
let _ = region_wal_options;
let request = RegionRequest::Open(RegionOpenRequest {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay,
});
let result = self.region_server.handle_request(region_id, request).await;
let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.output_msg()).err();
InstructionReply::OpenRegion(SimpleReply {
result: success,
error,
})
})
}
}

View File

@@ -0,0 +1,363 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::ErrorExt;
use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_telemetry::warn;
use futures_util::future::BoxFuture;
use store_api::region_request::{RegionCatchupRequest, RegionRequest};
use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::WaitResult;
impl HandlerContext {
pub(crate) fn handle_upgrade_region_instruction(
self,
UpgradeRegion {
region_id,
last_entry_id,
wait_for_replay_timeout,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
let Some(writable) = self.region_server.is_writable(region_id) else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: false,
error: None,
});
};
if writable {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
});
}
let region_server_moved = self.region_server.clone();
// The catchup task is almost zero cost if the inside region is writable.
// Therefore, it always registers a new catchup task.
let register_result = self
.catchup_tasks
.try_register(
region_id,
Box::pin(async move {
region_server_moved
.handle_request(
region_id,
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
}),
)
.await?;
Ok(())
}),
)
.await;
if register_result.is_busy() {
warn!("Another catchup task is running for the region: {region_id}");
}
// Returns immediately
let Some(wait_for_replay_timeout) = wait_for_replay_timeout else {
return InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
});
};
// We don't care that it returns a newly registered or running task.
let mut watcher = register_result.into_watcher();
let result = self
.catchup_tasks
.wait(&mut watcher, wait_for_replay_timeout)
.await;
match result {
WaitResult::Timeout => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: None,
}),
WaitResult::Finish(Ok(_)) => InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: true,
exists: true,
error: None,
}),
WaitResult::Finish(Err(err)) => {
InstructionReply::UpgradeRegion(UpgradeRegionReply {
ready: false,
exists: true,
error: Some(err.output_msg()),
})
}
}
})
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use common_meta::instruction::{InstructionReply, UpgradeRegion};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::time::Instant;
use crate::error;
use crate::heartbeat::handler::HandlerContext;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::tests::{mock_region_server, MockRegionEngine};
#[tokio::test]
async fn test_region_not_exist() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new();
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext {
region_server: mock_region_server,
catchup_tasks: TaskTracker::new(),
};
let region_id = RegionId::new(1024, 1);
let waits = vec![None, Some(Duration::from_millis(100u64))];
for wait_for_replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.exists);
assert!(reply.error.is_none());
}
}
}
#[tokio::test]
async fn test_region_writable() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
// Should be unreachable.
unreachable!();
}));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext {
region_server: mock_region_server,
catchup_tasks: TaskTracker::new(),
};
let waits = vec![None, Some(Duration::from_millis(100u64))];
for wait_for_replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}
}
#[tokio::test]
async fn test_region_not_ready() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_secs(100));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext {
region_server: mock_region_server,
catchup_tasks: TaskTracker::new(),
};
let waits = vec![None, Some(Duration::from_millis(100u64))];
for wait_for_replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}
}
#[tokio::test]
async fn test_region_not_ready_with_retry() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(300));
});
mock_region_server.register_test_region(region_id, mock_engine);
let waits = vec![
Some(Duration::from_millis(100u64)),
Some(Duration::from_millis(100u64)),
];
let handler_context = HandlerContext {
region_server: mock_region_server,
catchup_tasks: TaskTracker::new(),
};
for wait_for_replay_timeout in waits {
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}
let timer = Instant::now();
let reply = handler_context
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(500)),
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
// Must less than 300 ms.
assert!(timer.elapsed().as_millis() < 300);
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
}
#[tokio::test]
async fn test_region_error() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
error::UnexpectedSnafu {
violated: "mock_error".to_string(),
}
.fail()
}));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(100));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext {
region_server: mock_region_server,
catchup_tasks: TaskTracker::new(),
};
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
// It didn't wait for handle returns; it had no idea about the error.
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_none());
}
let reply = handler_context
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(200)),
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
if let InstructionReply::UpgradeRegion(reply) = reply {
assert!(!reply.ready);
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.error.unwrap().contains("mock_error"));
}
}
}

View File

@@ -0,0 +1,279 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use futures_util::future::BoxFuture;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::watch::{self, Receiver};
use tokio::sync::RwLock;
use crate::error::{self, Error, Result};
/// The state of a async task.
#[derive(Debug, Default, Clone)]
pub(crate) enum TaskState<T: Send + Sync + Clone> {
Error(Arc<Error>),
#[default]
Running,
Done(T),
}
pub(crate) type TaskWatcher<T> = Receiver<TaskState<T>>;
async fn wait<T: Send + Sync + Clone>(watcher: &mut TaskWatcher<T>) -> Result<T> {
loop {
watcher
.changed()
.await
.context(error::WatchAsyncTaskChangeSnafu)?;
let r = &*watcher.borrow();
match r {
TaskState::Error(err) => return Err(err.clone()).context(error::AsyncTaskExecuteSnafu),
TaskState::Running => {}
TaskState::Done(value) => return Ok(value.clone()),
}
}
}
/// The running async task.
pub(crate) struct Task<T: Send + Sync + Clone> {
watcher: TaskWatcher<T>,
}
pub(crate) struct TaskTrackerInner<T: Send + Sync + Clone> {
state: HashMap<RegionId, Task<T>>,
}
impl<T: Send + Sync + Clone> Default for TaskTrackerInner<T> {
fn default() -> Self {
TaskTrackerInner {
state: HashMap::new(),
}
}
}
/// Tracks the long-running async tasks.
#[derive(Clone)]
pub(crate) struct TaskTracker<T: Send + Sync + Clone> {
inner: Arc<RwLock<TaskTrackerInner<T>>>,
}
/// The registering result of a async task.
pub(crate) enum RegisterResult<T: Send + Sync + Clone> {
// The watcher of the running task.
Busy(TaskWatcher<T>),
// The watcher of the newly registered task.
Running(TaskWatcher<T>),
}
impl<T: Send + Sync + Clone> RegisterResult<T> {
pub(crate) fn into_watcher(self) -> TaskWatcher<T> {
match self {
RegisterResult::Busy(inner) => inner,
RegisterResult::Running(inner) => inner,
}
}
/// Returns true if it's [RegisterResult::Busy].
pub(crate) fn is_busy(&self) -> bool {
matches!(self, RegisterResult::Busy(_))
}
#[cfg(test)]
/// Returns true if it's [RegisterResult::Running].
pub(crate) fn is_running(&self) -> bool {
matches!(self, RegisterResult::Running(_))
}
}
/// The result of waiting.
pub(crate) enum WaitResult<T> {
Timeout,
Finish(Result<T>),
}
#[cfg(test)]
impl<T> WaitResult<T> {
/// Returns true if it's [WaitResult::Timeout].
pub(crate) fn is_timeout(&self) -> bool {
matches!(self, WaitResult::Timeout)
}
/// Into the [WaitResult::Timeout] if it's.
pub(crate) fn into_finish(self) -> Option<Result<T>> {
match self {
WaitResult::Timeout => None,
WaitResult::Finish(result) => Some(result),
}
}
}
impl<T: Send + Sync + Clone + 'static> TaskTracker<T> {
/// Returns an empty [AsyncTaskTracker].
pub(crate) fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(TaskTrackerInner::default())),
}
}
/// Waits for a [RegisterResult] and returns a [WaitResult].
pub(crate) async fn wait(
&self,
watcher: &mut TaskWatcher<T>,
timeout: Duration,
) -> WaitResult<T> {
match tokio::time::timeout(timeout, wait(watcher)).await {
Ok(result) => WaitResult::Finish(result),
Err(_) => WaitResult::Timeout,
}
}
/// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
pub(crate) async fn try_register(
&self,
region_id: RegionId,
fut: BoxFuture<'static, Result<T>>,
) -> RegisterResult<T> {
let mut inner = self.inner.write().await;
if let Some(task) = inner.state.get(&region_id) {
RegisterResult::Busy(task.watcher.clone())
} else {
let moved_inner = self.inner.clone();
let (tx, rx) = watch::channel(TaskState::<T>::Running);
common_runtime::spawn_bg(async move {
match fut.await {
Ok(result) => {
let _ = tx.send(TaskState::Done(result));
}
Err(err) => {
let _ = tx.send(TaskState::Error(Arc::new(err)));
}
};
moved_inner.write().await.state.remove(&region_id);
});
inner.state.insert(
region_id,
Task {
watcher: rx.clone(),
},
);
RegisterResult::Running(rx.clone())
}
}
#[cfg(test)]
async fn watcher(&self, region_id: RegionId) -> Option<TaskWatcher<T>> {
self.inner
.read()
.await
.state
.get(&region_id)
.map(|task| task.watcher.clone())
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use crate::heartbeat::task_tracker::{wait, TaskTracker};
#[derive(Debug, Clone, PartialEq, Eq)]
struct TestResult {
value: i32,
}
#[tokio::test]
async fn test_async_task_tracker_register() {
let tracker = TaskTracker::<TestResult>::new();
let region_id = RegionId::new(1024, 1);
let (tx, rx) = oneshot::channel::<()>();
let result = tracker
.try_register(
region_id,
Box::pin(async move {
let _ = rx.await;
Ok(TestResult { value: 1024 })
}),
)
.await;
assert!(result.is_running());
let result = tracker
.try_register(
region_id,
Box::pin(async move { Ok(TestResult { value: 1023 }) }),
)
.await;
assert!(result.is_busy());
let mut watcher = tracker.watcher(region_id).await.unwrap();
// Triggers first future return.
tx.send(()).unwrap();
assert_eq!(
TestResult { value: 1024 },
wait(&mut watcher).await.unwrap()
);
let result = tracker
.try_register(
region_id,
Box::pin(async move { Ok(TestResult { value: 1022 }) }),
)
.await;
assert!(result.is_running());
}
#[tokio::test]
async fn test_async_task_tracker_wait_timeout() {
let tracker = TaskTracker::<TestResult>::new();
let region_id = RegionId::new(1024, 1);
let (tx, rx) = oneshot::channel::<()>();
let result = tracker
.try_register(
region_id,
Box::pin(async move {
let _ = rx.await;
Ok(TestResult { value: 1024 })
}),
)
.await;
let mut watcher = result.into_watcher();
let result = tracker.wait(&mut watcher, Duration::from_millis(100)).await;
assert!(result.is_timeout());
// Triggers first future return.
tx.send(()).unwrap();
let result = tracker
.wait(&mut watcher, Duration::from_millis(100))
.await
.into_finish()
.unwrap()
.unwrap();
assert_eq!(TestResult { value: 1024 }, result);
assert!(tracker.watcher(region_id).await.is_none());
}
}

View File

@@ -139,6 +139,16 @@ impl RegionServer {
.collect()
}
pub fn is_writable(&self, region_id: RegionId) -> Option<bool> {
// TODO(weny): Finds a better way.
self.inner.region_map.get(&region_id).and_then(|engine| {
engine.role(region_id).map(|role| match role {
RegionRole::Follower => false,
RegionRole::Leader => true,
})
})
}
pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
let engine = self
.inner
@@ -178,6 +188,14 @@ impl RegionServer {
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
#[cfg(test)]
/// Registers a region for test purpose.
pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
self.inner
.region_map
.insert(region_id, RegionEngineWithStatus::Ready(engine));
}
}
#[async_trait]

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_error::ext::BoxedError;
@@ -93,7 +94,9 @@ pub type MockRequestHandler =
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
}
impl MockRegionEngine {
@@ -102,8 +105,10 @@ impl MockRegionEngine {
(
Arc::new(Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: None,
mock_role: None,
}),
rx,
)
@@ -116,12 +121,31 @@ impl MockRegionEngine {
(
Arc::new(Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: Some(mock_fn),
mock_role: None,
}),
rx,
)
}
pub fn with_custom_apply_fn<F>(apply: F) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>)
where
F: FnOnce(&mut MockRegionEngine),
{
let (tx, rx) = tokio::sync::mpsc::channel(8);
let mut region_engine = Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: None,
mock_role: None,
};
apply(&mut region_engine);
(Arc::new(region_engine), rx)
}
}
#[async_trait::async_trait]
@@ -135,6 +159,9 @@ impl RegionEngine for MockRegionEngine {
region_id: RegionId,
request: RegionRequest,
) -> Result<AffectedRows, BoxedError> {
if let Some(delay) = self.handle_request_delay {
tokio::time::sleep(delay).await;
}
if let Some(mock_fn) = &self.handle_request_mock_fn {
return mock_fn(region_id, request).map_err(BoxedError::new);
};
@@ -175,6 +202,9 @@ impl RegionEngine for MockRegionEngine {
}
fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
if let Some(role) = self.mock_role {
return role;
}
Some(RegionRole::Leader)
}
}

View File

@@ -89,7 +89,7 @@ impl UpgradeCandidateRegion {
Instruction::UpgradeRegion(UpgradeRegion {
region_id,
last_entry_id,
wait_for_replay_secs: Some(self.replay_timeout.as_secs()),
wait_for_replay_timeout: Some(self.replay_timeout),
})
}

View File

@@ -79,9 +79,9 @@ impl From<PbGrantedRegion> for GrantedRegion {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionRole {
// Readonly region(mito2), Readonly region(file).
// Readonly region(mito2)
Follower,
// Writable region(mito2).
// Writable region(mito2), Readonly region(file).
Leader,
}