mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 21:32:58 +00:00
feat: handle the downgrade region instruction (#2855)
* feat: handle the downgrade region instruction * test: add tests for RegionHeartbeatResponseHandler * refactor: remove unused code
This commit is contained in:
@@ -37,7 +37,7 @@ pub struct HeartbeatResponseHandlerContext {
|
||||
/// HandleControl
|
||||
///
|
||||
/// Controls process of handling heartbeat response.
|
||||
#[derive(PartialEq)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum HandleControl {
|
||||
Continue,
|
||||
Done,
|
||||
|
||||
@@ -30,8 +30,8 @@ pub struct MessageMeta {
|
||||
pub from: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl MessageMeta {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn new_test(id: u64, subject: &str, to: &str, from: &str) -> Self {
|
||||
MessageMeta {
|
||||
id,
|
||||
|
||||
@@ -77,7 +77,9 @@ uuid.workspace = true
|
||||
[dev-dependencies]
|
||||
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
|
||||
client.workspace = true
|
||||
common-meta = { workspace = true, features = ["testing"] }
|
||||
common-query.workspace = true
|
||||
common-test-util.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
mito2 = { workspace = true, features = ["test"] }
|
||||
session.workspace = true
|
||||
|
||||
@@ -13,54 +13,116 @@
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
|
||||
use common_meta::heartbeat::handler::{
|
||||
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
|
||||
};
|
||||
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
|
||||
use common_meta::instruction::{
|
||||
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, OpenRegion, SimpleReply,
|
||||
};
|
||||
use common_meta::RegionIdent;
|
||||
use common_query::Output;
|
||||
use common_telemetry::error;
|
||||
use futures::future::BoxFuture;
|
||||
use snafu::OptionExt;
|
||||
use store_api::path_utils::region_dir;
|
||||
use store_api::region_engine::SetReadonlyResponse;
|
||||
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error;
|
||||
use crate::region_server::RegionServer;
|
||||
|
||||
/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
|
||||
#[derive(Clone)]
|
||||
pub struct RegionHeartbeatResponseHandler {
|
||||
region_server: RegionServer,
|
||||
}
|
||||
|
||||
/// Handler of the instruction.
|
||||
pub type InstructionHandler =
|
||||
Box<dyn FnOnce(RegionServer) -> BoxFuture<'static, InstructionReply> + Send>;
|
||||
|
||||
impl RegionHeartbeatResponseHandler {
|
||||
/// Returns the [RegionHeartbeatResponseHandler].
|
||||
pub fn new(region_server: RegionServer) -> Self {
|
||||
Self { region_server }
|
||||
}
|
||||
|
||||
fn instruction_to_request(instruction: Instruction) -> MetaResult<(RegionId, RegionRequest)> {
|
||||
/// Builds the [InstructionHandler].
|
||||
fn build_handler(instruction: Instruction) -> MetaResult<InstructionHandler> {
|
||||
match instruction {
|
||||
Instruction::OpenRegion(OpenRegion {
|
||||
region_ident,
|
||||
region_storage_path,
|
||||
options,
|
||||
}) => {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
let open_region_req = RegionRequest::Open(RegionOpenRequest {
|
||||
engine: region_ident.engine,
|
||||
region_dir: region_dir(®ion_storage_path, region_id),
|
||||
options,
|
||||
});
|
||||
Ok((region_id, open_region_req))
|
||||
}
|
||||
Instruction::CloseRegion(region_ident) => {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
let close_region_req = RegionRequest::Close(RegionCloseRequest {});
|
||||
Ok((region_id, close_region_req))
|
||||
}) => Ok(Box::new(|region_server| {
|
||||
Box::pin(async move {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
let request = RegionRequest::Open(RegionOpenRequest {
|
||||
engine: region_ident.engine,
|
||||
region_dir: region_dir(®ion_storage_path, region_id),
|
||||
options,
|
||||
});
|
||||
let result = region_server.handle_request(region_id, request).await;
|
||||
|
||||
let success = result.is_ok();
|
||||
let error = result.as_ref().map_err(|e| e.to_string()).err();
|
||||
|
||||
InstructionReply::OpenRegion(SimpleReply {
|
||||
result: success,
|
||||
error,
|
||||
})
|
||||
})
|
||||
})),
|
||||
Instruction::CloseRegion(region_ident) => Ok(Box::new(|region_server| {
|
||||
Box::pin(async move {
|
||||
let region_id = Self::region_ident_to_region_id(®ion_ident);
|
||||
let request = RegionRequest::Close(RegionCloseRequest {});
|
||||
let result = region_server.handle_request(region_id, request).await;
|
||||
|
||||
match result {
|
||||
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
}),
|
||||
Err(error::Error::RegionNotFound { .. }) => {
|
||||
InstructionReply::CloseRegion(SimpleReply {
|
||||
result: true,
|
||||
error: None,
|
||||
})
|
||||
}
|
||||
Err(err) => InstructionReply::CloseRegion(SimpleReply {
|
||||
result: false,
|
||||
error: Some(err.to_string()),
|
||||
}),
|
||||
}
|
||||
})
|
||||
})),
|
||||
Instruction::DowngradeRegion(DowngradeRegion { region_id }) => {
|
||||
Ok(Box::new(move |region_server| {
|
||||
Box::pin(async move {
|
||||
match region_server.set_readonly_gracefully(region_id).await {
|
||||
Ok(SetReadonlyResponse::Success { last_entry_id }) => {
|
||||
InstructionReply::DowngradeRegion(DowngradeRegionReply {
|
||||
last_entry_id,
|
||||
exists: true,
|
||||
error: None,
|
||||
})
|
||||
}
|
||||
Ok(SetReadonlyResponse::NotFound) => {
|
||||
InstructionReply::DowngradeRegion(DowngradeRegionReply {
|
||||
last_entry_id: None,
|
||||
exists: false,
|
||||
error: None,
|
||||
})
|
||||
}
|
||||
Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
|
||||
last_entry_id: None,
|
||||
exists: false,
|
||||
error: Some(err.to_string()),
|
||||
}),
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
Instruction::UpgradeRegion(_) => {
|
||||
todo!()
|
||||
@@ -68,77 +130,12 @@ impl RegionHeartbeatResponseHandler {
|
||||
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
|
||||
InvalidHeartbeatResponseSnafu.fail()
|
||||
}
|
||||
Instruction::DowngradeRegion(_) => {
|
||||
// TODO(weny): add it later.
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
|
||||
RegionId::new(region_ident.table_id, region_ident.region_number)
|
||||
}
|
||||
|
||||
fn reply_template_from_instruction(instruction: &Instruction) -> InstructionReply {
|
||||
match instruction {
|
||||
Instruction::OpenRegion(_) => InstructionReply::OpenRegion(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
}),
|
||||
Instruction::CloseRegion(_) => InstructionReply::CloseRegion(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
}),
|
||||
Instruction::UpgradeRegion(_) => {
|
||||
todo!()
|
||||
}
|
||||
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
|
||||
InstructionReply::InvalidateTableCache(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
})
|
||||
}
|
||||
Instruction::DowngradeRegion(_) => {
|
||||
// TODO(weny): add it later.
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn fill_reply(mut template: InstructionReply, result: Result<Output>) -> InstructionReply {
|
||||
let success = result.is_ok();
|
||||
let error = result.as_ref().map_err(|e| e.to_string()).err();
|
||||
match &mut template {
|
||||
InstructionReply::OpenRegion(reply) => {
|
||||
reply.result = success;
|
||||
reply.error = error;
|
||||
}
|
||||
InstructionReply::CloseRegion(reply) => match result {
|
||||
Err(e) => {
|
||||
if e.status_code() == StatusCode::RegionNotFound {
|
||||
reply.result = true;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
reply.result = success;
|
||||
reply.error = error;
|
||||
}
|
||||
},
|
||||
InstructionReply::UpgradeRegion(_) => {
|
||||
todo!()
|
||||
}
|
||||
InstructionReply::InvalidateTableCache(reply) => {
|
||||
reply.result = success;
|
||||
reply.error = error;
|
||||
}
|
||||
InstructionReply::DowngradeRegion(_) => {
|
||||
// TODO(weny): add it later.
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
template
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -146,7 +143,9 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
|
||||
matches!(
|
||||
ctx.incoming_message.as_ref(),
|
||||
Some((_, Instruction::OpenRegion { .. })) | Some((_, Instruction::CloseRegion { .. }))
|
||||
Some((_, Instruction::OpenRegion { .. }))
|
||||
| Some((_, Instruction::CloseRegion { .. }))
|
||||
| Some((_, Instruction::DowngradeRegion { .. }))
|
||||
)
|
||||
}
|
||||
|
||||
@@ -158,15 +157,11 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
|
||||
let mailbox = ctx.mailbox.clone();
|
||||
let region_server = self.region_server.clone();
|
||||
let reply_template = Self::reply_template_from_instruction(&instruction);
|
||||
let (region_id, region_req) = Self::instruction_to_request(instruction)?;
|
||||
let handler = Self::build_handler(instruction)?;
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let result = region_server.handle_request(region_id, region_req).await;
|
||||
let reply = handler(region_server).await;
|
||||
|
||||
if let Err(e) = mailbox
|
||||
.send((meta, Self::fill_reply(reply_template, result)))
|
||||
.await
|
||||
{
|
||||
if let Err(e) = mailbox.send((meta, reply)).await {
|
||||
error!(e; "Failed to send reply to mailbox");
|
||||
}
|
||||
});
|
||||
@@ -174,3 +169,266 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
Ok(HandleControl::Done)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::heartbeat::mailbox::{
|
||||
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
|
||||
};
|
||||
use mito2::config::MitoConfig;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use mito2::test_util::{CreateRequestBuilder, TestEnv};
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::mpsc::{self, Receiver};
|
||||
|
||||
use super::*;
|
||||
use crate::error;
|
||||
use crate::tests::mock_region_server;
|
||||
|
||||
pub struct HeartbeatResponseTestEnv {
|
||||
mailbox: MailboxRef,
|
||||
receiver: Receiver<(MessageMeta, InstructionReply)>,
|
||||
}
|
||||
|
||||
impl HeartbeatResponseTestEnv {
|
||||
pub fn new() -> Self {
|
||||
let (tx, rx) = mpsc::channel(8);
|
||||
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
|
||||
|
||||
HeartbeatResponseTestEnv {
|
||||
mailbox,
|
||||
receiver: rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_handler_ctx(
|
||||
&self,
|
||||
incoming_message: IncomingMessage,
|
||||
) -> HeartbeatResponseHandlerContext {
|
||||
HeartbeatResponseHandlerContext {
|
||||
mailbox: self.mailbox.clone(),
|
||||
response: Default::default(),
|
||||
incoming_message: Some(incoming_message),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn close_region_instruction(region_id: RegionId) -> Instruction {
|
||||
Instruction::CloseRegion(RegionIdent {
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
|
||||
Instruction::OpenRegion(OpenRegion::new(
|
||||
RegionIdent {
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
},
|
||||
path,
|
||||
HashMap::new(),
|
||||
))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_close_region() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut region_server = mock_region_server();
|
||||
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
|
||||
|
||||
let mut engine_env = TestEnv::with_prefix("close-region");
|
||||
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||
region_server.register_engine(Arc::new(engine));
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let builder = CreateRequestBuilder::new();
|
||||
let create_req = builder.build();
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
|
||||
|
||||
// Should be ok, if we try to close it twice.
|
||||
for _ in 0..2 {
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
|
||||
let instruction = close_region_instruction(region_id);
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::CloseRegion(reply) = reply {
|
||||
assert!(reply.result);
|
||||
assert!(reply.error.is_none());
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
assert_matches!(
|
||||
region_server.set_writable(region_id, true).unwrap_err(),
|
||||
error::Error::RegionNotFound { .. }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_region_ok() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut region_server = mock_region_server();
|
||||
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
|
||||
|
||||
let mut engine_env = TestEnv::with_prefix("open-region");
|
||||
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||
region_server.register_engine(Arc::new(engine));
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let builder = CreateRequestBuilder::new();
|
||||
let mut create_req = builder.build();
|
||||
let storage_path = "test";
|
||||
create_req.region_dir = region_dir(storage_path, region_id);
|
||||
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
|
||||
.await
|
||||
.unwrap();
|
||||
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
|
||||
|
||||
// Should be ok, if we try to open it twice.
|
||||
for _ in 0..2 {
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
|
||||
let instruction = open_region_instruction(region_id, storage_path);
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::OpenRegion(reply) = reply {
|
||||
assert!(reply.result);
|
||||
assert!(reply.error.is_none());
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_not_exists_region() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut region_server = mock_region_server();
|
||||
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
|
||||
|
||||
let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
|
||||
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||
region_server.register_engine(Arc::new(engine));
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let storage_path = "test";
|
||||
|
||||
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
|
||||
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
|
||||
let instruction = open_region_instruction(region_id, storage_path);
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::OpenRegion(reply) = reply {
|
||||
assert!(!reply.result);
|
||||
assert!(reply.error.is_some());
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_downgrade_region() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut region_server = mock_region_server();
|
||||
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
|
||||
|
||||
let mut engine_env = TestEnv::with_prefix("downgrade-region");
|
||||
let engine = engine_env.create_engine(MitoConfig::default()).await;
|
||||
region_server.register_engine(Arc::new(engine));
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
|
||||
let builder = CreateRequestBuilder::new();
|
||||
let mut create_req = builder.build();
|
||||
let storage_path = "test";
|
||||
create_req.region_dir = region_dir(storage_path, region_id);
|
||||
|
||||
region_server
|
||||
.handle_request(region_id, RegionRequest::Create(create_req))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
|
||||
|
||||
// Should be ok, if we try to downgrade it twice.
|
||||
for _ in 0..2 {
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
|
||||
let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id });
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::DowngradeRegion(reply) = reply {
|
||||
assert!(reply.exists);
|
||||
assert!(reply.error.is_none());
|
||||
assert_eq!(reply.last_entry_id.unwrap(), 0);
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
// Downgrades a not exists region.
|
||||
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
|
||||
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
|
||||
region_id: RegionId::new(2048, 1),
|
||||
});
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
if let InstructionReply::DowngradeRegion(reply) = reply {
|
||||
assert!(!reply.exists);
|
||||
assert!(reply.error.is_none());
|
||||
assert!(reply.last_entry_id.is_none());
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,5 +26,4 @@ pub mod metrics;
|
||||
pub mod region_server;
|
||||
mod store;
|
||||
#[cfg(test)]
|
||||
#[allow(dead_code)]
|
||||
mod tests;
|
||||
|
||||
@@ -49,7 +49,7 @@ use servers::grpc::region_server::RegionServerHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::region_engine::{RegionEngineRef, RegionRole};
|
||||
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
|
||||
use store_api::region_request::{RegionCloseRequest, RegionRequest};
|
||||
use store_api::storage::{RegionId, ScanRequest};
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
@@ -148,6 +148,19 @@ impl RegionServer {
|
||||
.with_context(|_| HandleRegionRequestSnafu { region_id })
|
||||
}
|
||||
|
||||
pub async fn set_readonly_gracefully(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
) -> Result<SetReadonlyResponse> {
|
||||
match self.inner.region_map.get(®ion_id) {
|
||||
Some(engine) => Ok(engine
|
||||
.set_readonly_gracefully(region_id)
|
||||
.await
|
||||
.with_context(|_| HandleRegionRequestSnafu { region_id })?),
|
||||
None => Ok(SetReadonlyResponse::NotFound),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn runtime(&self) -> Arc<Runtime> {
|
||||
self.inner.runtime.clone()
|
||||
}
|
||||
|
||||
@@ -13,19 +13,12 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::HeartbeatResponse;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
|
||||
use common_function::scalars::FunctionRef;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
|
||||
use common_meta::instruction::{Instruction, OpenRegion, RegionIdent};
|
||||
use common_query::prelude::ScalarUdf;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
@@ -46,51 +39,6 @@ use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use crate::event_listener::NoopRegionServerEventListener;
|
||||
use crate::region_server::RegionServer;
|
||||
|
||||
pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
|
||||
MessageMeta {
|
||||
id,
|
||||
subject: subject.to_string(),
|
||||
to: to.to_string(),
|
||||
from: from.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_instruction(
|
||||
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
|
||||
mailbox: Arc<HeartbeatMailbox>,
|
||||
instruction: Instruction,
|
||||
) {
|
||||
let response = HeartbeatResponse::default();
|
||||
let mut ctx: HeartbeatResponseHandlerContext =
|
||||
HeartbeatResponseHandlerContext::new(mailbox, response);
|
||||
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
|
||||
executor.handle(ctx).await.unwrap();
|
||||
}
|
||||
|
||||
fn close_region_instruction() -> Instruction {
|
||||
Instruction::CloseRegion(RegionIdent {
|
||||
table_id: 1024,
|
||||
region_number: 0,
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn open_region_instruction() -> Instruction {
|
||||
Instruction::OpenRegion(OpenRegion::new(
|
||||
RegionIdent {
|
||||
table_id: 1024,
|
||||
region_number: 0,
|
||||
cluster_id: 1,
|
||||
datanode_id: 2,
|
||||
engine: "mito2".to_string(),
|
||||
},
|
||||
"path/dir",
|
||||
HashMap::new(),
|
||||
))
|
||||
}
|
||||
|
||||
pub struct MockQueryEngine;
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -50,7 +50,7 @@ use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::listener::EventListenerRef;
|
||||
use crate::engine::MitoEngine;
|
||||
use crate::engine::{MitoEngine, MITO_ENGINE_NAME};
|
||||
use crate::error::Result;
|
||||
use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
@@ -278,6 +278,7 @@ pub struct CreateRequestBuilder {
|
||||
options: HashMap<String, String>,
|
||||
primary_key: Option<Vec<ColumnId>>,
|
||||
all_not_null: bool,
|
||||
engine: String,
|
||||
}
|
||||
|
||||
impl Default for CreateRequestBuilder {
|
||||
@@ -289,6 +290,7 @@ impl Default for CreateRequestBuilder {
|
||||
options: HashMap::new(),
|
||||
primary_key: None,
|
||||
all_not_null: false,
|
||||
engine: MITO_ENGINE_NAME.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -378,7 +380,7 @@ impl CreateRequestBuilder {
|
||||
|
||||
RegionCreateRequest {
|
||||
// We use empty engine name as we already locates the engine.
|
||||
engine: String::new(),
|
||||
engine: self.engine.to_string(),
|
||||
column_metadatas,
|
||||
primary_key: self.primary_key.clone().unwrap_or(primary_key),
|
||||
options: self.options.clone(),
|
||||
|
||||
Reference in New Issue
Block a user