fix: prevent registering logical regions with AliveKeeper (#3965)

* fix: register logical region

* chore: fix Clippy

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-05-17 16:38:35 +09:00
committed by GitHub
parent 0168d43d60
commit f696f41a02
7 changed files with 168 additions and 75 deletions

View File

@@ -18,7 +18,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG};
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use snafu::{location, Location, Snafu};
use tonic::{Code, Status};
#[derive(Snafu)]
@@ -83,14 +83,28 @@ pub enum Error {
},
#[snafu(display("Failed to request RegionServer, code: {}", code))]
RegionServer { code: Code, source: BoxedError },
RegionServer {
code: Code,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
// Server error carried in Tonic Status's metadata.
#[snafu(display("{}", msg))]
Server { code: StatusCode, msg: String },
Server {
code: StatusCode,
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Illegal Database response: {err_msg}"))]
IllegalDatabaseResponse { err_msg: String },
IllegalDatabaseResponse {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to send request with streaming: {}", err_msg))]
ClientStreaming {
@@ -148,7 +162,11 @@ impl From<Status> for Error {
let msg = get_metadata_value(&e, GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
Self::Server { code, msg }
Self::Server {
code,
msg,
location: location!(),
}
}
}

View File

@@ -189,6 +189,7 @@ impl RegionRequester {
error::Error::RegionServer {
code,
source: BoxedError::new(err),
location: location!(),
}
})?
.into_inner();
@@ -272,7 +273,7 @@ mod test {
err_msg: "blabla".to_string(),
}),
}));
let Server { code, msg } = result.unwrap_err() else {
let Server { code, msg, .. } = result.unwrap_err() else {
unreachable!()
};
assert_eq!(code, StatusCode::Internal);

View File

@@ -516,6 +516,7 @@ mod tests {
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
@@ -528,7 +529,7 @@ mod tests {
let txn = mgr
.build_create_txn(
1028,
"mock",
MITO_ENGINE_NAME,
"foo/bar/weny",
HashMap::from([("foo".to_string(), "bar".to_string())]),
HashMap::default(),
@@ -542,8 +543,9 @@ mod tests {
#[tokio::test]
async fn test_initialize_region_server() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (mock_region, mut mock_region_handler) = MockRegionEngine::new();
let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_region.clone());

View File

@@ -121,6 +121,7 @@ mod tests {
use std::time::Duration;
use common_meta::instruction::{InstructionReply, UpgradeRegion};
use mito2::engine::MITO_ENGINE_NAME;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::time::Instant;
@@ -133,7 +134,7 @@ mod tests {
#[tokio::test]
async fn test_region_not_exist() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext {
@@ -167,13 +168,14 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
// Should be unreachable.
unreachable!();
}));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Leader));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
// Should be unreachable.
unreachable!();
}));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext {
@@ -207,13 +209,14 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_secs(100));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_secs(100));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext {
@@ -247,13 +250,14 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(300));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(300));
});
mock_region_server.register_test_region(region_id, mock_engine);
let waits = vec![
@@ -308,18 +312,19 @@ mod tests {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
error::UnexpectedSnafu {
violated: "mock_error".to_string(),
}
.fail()
}));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(100));
});
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
// Region is not ready.
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
error::UnexpectedSnafu {
violated: "mock_error".to_string(),
}
.fail()
}));
// Note: Don't change.
region_engine.handle_request_delay = Some(Duration::from_millis(100));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext {

View File

@@ -34,6 +34,7 @@ use common_telemetry::{info, warn};
use dashmap::DashMap;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::MITO_ENGINE_NAME;
use prost::Message;
pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
@@ -44,7 +45,9 @@ use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
@@ -403,7 +406,7 @@ impl RegionServerInner {
let current_region_status = self.region_map.get(&region_id);
let engine = match region_change {
RegionChange::Register(ref engine_type, _) => match current_region_status {
RegionChange::Register(attribute) => match current_region_status {
Some(status) => match status.clone() {
RegionEngineWithStatus::Registering(_) => {
return Ok(CurrentEngine::EarlyReturn(0))
@@ -417,8 +420,10 @@ impl RegionServerInner {
.engines
.read()
.unwrap()
.get(engine_type)
.with_context(|| RegionEngineNotFoundSnafu { name: engine_type })?
.get(attribute.engine())
.with_context(|| RegionEngineNotFoundSnafu {
name: attribute.engine(),
})?
.clone(),
},
RegionChange::Deregisters => match current_region_status {
@@ -461,11 +466,13 @@ impl RegionServerInner {
.start_timer();
let region_change = match &request {
RegionRequest::Create(create) => RegionChange::Register(create.engine.clone(), false),
RegionRequest::Create(create) => {
let attribute = parse_region_attribute(&create.engine, &create.options)?;
RegionChange::Register(attribute)
}
RegionRequest::Open(open) => {
let is_opening_physical_region =
open.options.contains_key(PHYSICAL_TABLE_METADATA_KEY);
RegionChange::Register(open.engine.clone(), is_opening_physical_region)
let attribute = parse_region_attribute(&open.engine, &open.options)?;
RegionChange::Register(attribute)
}
RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
RegionRequest::Put(_)
@@ -514,7 +521,7 @@ impl RegionServerInner {
region_change: &RegionChange,
) {
match region_change {
RegionChange::Register(_, _) => {
RegionChange::Register(_) => {
self.region_map.insert(
region_id,
RegionEngineWithStatus::Registering(engine.clone()),
@@ -533,7 +540,7 @@ impl RegionServerInner {
fn unset_region_status(&self, region_id: RegionId, region_change: RegionChange) {
match region_change {
RegionChange::None => {}
RegionChange::Register(_, _) | RegionChange::Deregisters => {
RegionChange::Register(_) | RegionChange::Deregisters => {
self.region_map.remove(&region_id);
}
}
@@ -548,15 +555,28 @@ impl RegionServerInner {
let engine_type = engine.name();
match region_change {
RegionChange::None => {}
RegionChange::Register(_, is_opening_physical_region) => {
if is_opening_physical_region {
self.register_logical_regions(&engine, region_id).await?;
}
info!("Region {region_id} is registered to engine {engine_type}");
RegionChange::Register(attribute) => {
info!(
"Region {region_id} is registered to engine {}",
attribute.engine()
);
self.region_map
.insert(region_id, RegionEngineWithStatus::Ready(engine));
self.event_listener.on_region_registered(region_id);
.insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
match attribute {
RegionAttribute::Metric { physical } => {
if physical {
// Registers the logical regions belonging to the physical region (`region_id`).
self.register_logical_regions(&engine, region_id).await?;
// We only send the `on_region_registered` event of the physical region.
self.event_listener.on_region_registered(region_id);
}
}
RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
RegionAttribute::File => {
// do nothing
}
}
}
RegionChange::Deregisters => {
info!("Region {region_id} is deregistered from engine {engine_type}");
@@ -699,10 +719,45 @@ impl RegionServerInner {
enum RegionChange {
None,
Register(String, bool),
Register(RegionAttribute),
Deregisters,
}
fn parse_region_attribute(
engine: &str,
options: &HashMap<String, String>,
) -> Result<RegionAttribute> {
match engine {
MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
METRIC_ENGINE_NAME => {
let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
Ok(RegionAttribute::Metric { physical })
}
FILE_ENGINE_NAME => Ok(RegionAttribute::File),
_ => error::UnexpectedSnafu {
violated: format!("Unknown engine: {}", engine),
}
.fail(),
}
}
/// Engine-specific classification of a region that is being registered.
///
/// Built by `parse_region_attribute` from a request's engine name and options.
enum RegionAttribute {
/// Region served by the engine named `MITO_ENGINE_NAME`.
Mito,
/// Region served by the engine named `METRIC_ENGINE_NAME`.
/// `physical` is `true` when the request options do not contain
/// `LOGICAL_TABLE_METADATA_KEY`.
Metric { physical: bool },
/// Region served by the engine named `FILE_ENGINE_NAME`.
File,
}
impl RegionAttribute {
    /// Returns the name of the engine this attribute corresponds to.
    ///
    /// The `physical` flag of a metric region does not affect the result.
    fn engine(&self) -> &'static str {
        match self {
            Self::Mito => MITO_ENGINE_NAME,
            Self::Metric { physical: _ } => METRIC_ENGINE_NAME,
            Self::File => FILE_ENGINE_NAME,
        }
    }
}
#[cfg(test)]
mod tests {
@@ -723,7 +778,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (engine, _receiver) = MockRegionEngine::new();
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
let engine_name = engine.name();
mock_region_server.register_engine(engine.clone());
@@ -781,7 +836,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (engine, _receiver) = MockRegionEngine::new();
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(engine.clone());
@@ -832,7 +887,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (engine, _receiver) = MockRegionEngine::new();
let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(engine.clone());
@@ -857,13 +912,15 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (engine, _receiver) =
MockRegionEngine::with_mock_fn(Box::new(|_region_id, _request| {
let (engine, _receiver) = MockRegionEngine::with_mock_fn(
MITO_ENGINE_NAME,
Box::new(|_region_id, _request| {
error::UnexpectedSnafu {
violated: "test".to_string(),
}
.fail()
}));
}),
);
mock_region_server.register_engine(engine.clone());
@@ -904,7 +961,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (engine, _) = MockRegionEngine::new();
let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(engine.clone());
let region_id = RegionId::new(1024, 1);
@@ -950,7 +1007,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: None,
region_change: RegionChange::Register(engine.name().to_string(), false),
region_change: RegionChange::Register(RegionAttribute::Mito),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::Engine(_));
@@ -959,7 +1016,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
region_change: RegionChange::Register(engine.name().to_string(), false),
region_change: RegionChange::Register(RegionAttribute::Mito),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
@@ -968,7 +1025,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
region_change: RegionChange::Register(engine.name().to_string(), false),
region_change: RegionChange::Register(RegionAttribute::Mito),
assert: Box::new(|result| {
let err = result.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionBusy);
@@ -977,7 +1034,7 @@ mod tests {
CurrentEngineTest {
region_id,
current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
region_change: RegionChange::Register(engine.name().to_string(), false),
region_change: RegionChange::Register(RegionAttribute::Mito),
assert: Box::new(|result| {
let current_engine = result.unwrap();
assert_matches!(current_engine, CurrentEngine::Engine(_));

View File

@@ -106,10 +106,11 @@ pub struct MockRegionEngine {
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
impl MockRegionEngine {
pub fn new() -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
pub fn new(engine: &str) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
let (tx, rx) = tokio::sync::mpsc::channel(8);
(
@@ -118,12 +119,14 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
rx,
)
}
pub fn with_mock_fn(
engine: &str,
mock_fn: MockRequestHandler,
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
let (tx, rx) = tokio::sync::mpsc::channel(8);
@@ -134,12 +137,16 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: Some(mock_fn),
mock_role: None,
engine: engine.to_string(),
}),
rx,
)
}
pub fn with_custom_apply_fn<F>(apply: F) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>)
pub fn with_custom_apply_fn<F>(
engine: &str,
apply: F,
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>)
where
F: FnOnce(&mut MockRegionEngine),
{
@@ -149,6 +156,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
apply(&mut region_engine);
@@ -160,7 +168,7 @@ impl MockRegionEngine {
#[async_trait::async_trait]
impl RegionEngine for MockRegionEngine {
fn name(&self) -> &str {
"mock"
&self.engine
}
async fn handle_request(

View File

@@ -39,6 +39,8 @@ pub const DATA_REGION_SUBDIR: &str = "data";
pub const METRIC_ENGINE_NAME: &str = "metric";
pub const FILE_ENGINE_NAME: &str = "file";
/// Metadata key present in the `CREATE TABLE ... WITH ()` clause. This key is
/// used to identify that the table is a physical metric table. E.g.:
/// ```sql