feat: sync region followers after altering regions (#5901)

* feat: close follower regions after dropping leader regions

* chore: upgrade greptime-proto

* feat: sync region followers after alter region operations

* test: add tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-04-18 18:21:35 +08:00
committed by GitHub
parent 115e5a03a8
commit b8c6f1c8ed
25 changed files with 922 additions and 148 deletions

2
Cargo.lock generated
View File

@@ -4727,7 +4727,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=583daa3fbbbe39c90b7b92d13646bc3291d9c941#583daa3fbbbe39c90b7b92d13646bc3291d9c941"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b6d9cffd43c4e6358805a798f17e03e232994b82#b6d9cffd43c4e6358805a798f17e03e232994b82"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -129,7 +129,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "583daa3fbbbe39c90b7b92d13646bc3291d9c941" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b6d9cffd43c4e6358805a798f17e03e232994b82" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -18,10 +18,12 @@ mod region_request;
mod table_cache_keys;
mod update_metadata;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use futures_util::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -30,7 +32,7 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::key::table_info::TableInfoValue;
@@ -39,7 +41,7 @@ use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::find_leaders;
use crate::rpc::router::{find_leaders, RegionRoute};
pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
@@ -125,14 +127,20 @@ impl AlterLogicalTablesProcedure {
});
}
// Collects responses from datanodes.
let phy_raw_schemas = future::join_all(alter_region_tasks)
let mut results = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;
// Collects responses from datanodes.
let phy_raw_schemas = results
.iter_mut()
.map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
.collect::<Vec<_>>();
if phy_raw_schemas.is_empty() {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
return Ok(Status::executing(true));
}
@@ -155,10 +163,34 @@ impl AlterLogicalTablesProcedure {
warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
}
async fn submit_sync_region_requests(
&self,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
) {
let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
if let Err(err) = sync_follower_regions(
&self.context,
self.data.physical_table_id,
results,
region_routes,
table_info.meta.engine.as_str(),
)
.await
{
error!(err; "Failed to sync regions for table {}, table_id: {}",
format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
self.data.physical_table_id
);
}
}
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?;

View File

@@ -19,6 +19,7 @@ mod update_metadata;
use std::vec;
use api::region::RegionResponse;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
@@ -29,7 +30,7 @@ use common_procedure::{
PoisonKeys, Procedure, ProcedureId, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use futures::future;
use futures::future::{self};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
@@ -38,7 +39,9 @@ use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults};
use crate::ddl::utils::{
add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
};
use crate::ddl::DdlContext;
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
use crate::instruction::CacheIdent;
@@ -48,7 +51,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution};
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};
/// The alter table procedure
pub struct AlterTableProcedure {
@@ -194,7 +197,9 @@ impl AlterTableProcedure {
// Just returns the error, and wait for the next try.
Err(error)
}
MultipleResults::Ok => {
MultipleResults::Ok(results) => {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
.await;
self.data.state = AlterTableState::UpdateMetadata;
Ok(Status::executing_with_clean_poisons(true))
}
@@ -211,6 +216,26 @@ impl AlterTableProcedure {
}
}
async fn submit_sync_region_requests(
&mut self,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
) {
// Safety: filled in `prepare` step.
let table_info = self.data.table_info().unwrap();
if let Err(err) = sync_follower_regions(
&self.context,
self.data.table_id(),
results,
region_routes,
table_info.meta.engine.as_str(),
)
.await
{
error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
}
}
/// Update table metadata.
pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
let table_id = self.data.table_id();

View File

@@ -17,12 +17,14 @@ mod metadata;
mod region_request;
mod update_metadata;
use api::region::RegionResponse;
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::{debug, warn};
use futures_util::future::join_all;
use common_telemetry::{debug, error, warn};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
@@ -31,7 +33,7 @@ use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
use crate::key::table_route::TableRouteValue;
@@ -156,14 +158,20 @@ impl CreateLogicalTablesProcedure {
});
}
// Collects response from datanodes.
let phy_raw_schemas = join_all(create_region_tasks)
let mut results = future::join_all(create_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;
// Collects response from datanodes.
let phy_raw_schemas = results
.iter_mut()
.map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
.collect::<Vec<_>>();
if phy_raw_schemas.is_empty() {
self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(false));
}
@@ -186,10 +194,30 @@ impl CreateLogicalTablesProcedure {
warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}
self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = CreateTablesState::CreateMetadata;
Ok(Status::executing(true))
}
async fn submit_sync_region_requests(
&self,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
) {
if let Err(err) = sync_follower_regions(
&self.context,
self.data.physical_table_id,
results,
region_routes,
METRIC_ENGINE,
)
.await
{
error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
}
}
}
#[async_trait]

View File

@@ -15,12 +15,13 @@
use std::collections::HashMap;
use api::v1::region::{
region_request, DropRequest as PbDropRegionRequest, RegionRequest, RegionRequestHeader,
region_request, CloseRequest as PbCloseRegionRequest, DropRequest as PbDropRegionRequest,
RegionRequest, RegionRequestHeader,
};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use futures::future::join_all;
use snafu::ensure;
@@ -36,7 +37,8 @@ use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
find_follower_regions, find_followers, find_leader_regions, find_leaders,
operating_leader_regions, RegionRoute,
};
/// [Control] indicated to the caller whether to go to the next step.
@@ -210,10 +212,10 @@ impl DropTableExecutor {
region_routes: &[RegionRoute],
fast_path: bool,
) -> Result<()> {
// Drops leader regions on datanodes.
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
let table_id = self.table_id;
for datanode in leaders {
let requester = ctx.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
@@ -252,6 +254,53 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;
// Drops follower regions on datanodes.
let followers = find_followers(region_routes);
let mut close_region_tasks = Vec::with_capacity(followers.len());
for datanode in followers {
let requester = ctx.node_manager.datanode(&datanode).await;
let regions = find_follower_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();
for region_id in region_ids {
debug!("Closing region {region_id} on Datanode {datanode:?}");
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Close(PbCloseRegionRequest {
region_id: region_id.as_u64(),
})),
};
let datanode = datanode.clone();
let requester = requester.clone();
close_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())
});
}
}
// Failure to close follower regions is not critical.
// When a leader region is dropped, follower regions will be unable to renew their leases via metasrv.
// Eventually, these follower regions will be automatically closed by the region livekeeper.
if let Err(err) = join_all(close_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
{
error!(err; "Failed to close follower regions on datanodes, table_id: {}", table_id);
}
// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry

View File

@@ -18,7 +18,9 @@ use api::v1::column_def::try_as_column_schema;
use api::v1::meta::Partition;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE, MITO_ENGINE,
};
use datatypes::schema::RawSchema;
use derive_builder::Builder;
use store_api::storage::TableId;
@@ -164,6 +166,7 @@ pub fn test_create_table_task(name: &str, table_id: TableId) -> CreateTableTask
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.engine(MITO_ENGINE)
.build()
.unwrap()
.into();

View File

@@ -45,14 +45,41 @@ impl MockDatanodeHandler for () {
}
#[derive(Clone)]
pub struct DatanodeWatcher(pub mpsc::Sender<(Peer, RegionRequest)>);
pub struct DatanodeWatcher {
sender: mpsc::Sender<(Peer, RegionRequest)>,
handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
}
impl DatanodeWatcher {
pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
Self {
sender,
handler: None,
}
}
pub fn with_handler(
mut self,
user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
) -> Self {
self.handler = Some(user_handler);
self
}
}
#[async_trait::async_trait]
impl MockDatanodeHandler for DatanodeWatcher {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
self.0.send((peer.clone(), request)).await.unwrap();
Ok(RegionResponse::new(0))
self.sender
.send((peer.clone(), request.clone()))
.await
.unwrap();
if let Some(handler) = self.handler {
handler(peer.clone(), request)
} else {
Ok(RegionResponse::new(0))
}
}
async fn handle_query(

View File

@@ -15,19 +15,33 @@
use std::assert_matches::assert_matches;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::meta::Peer;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, MetricManifestInfo, RegionRequest, SyncRequest};
use api::v1::{ColumnDataType, SemanticType};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::RegionManifestInfo;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::columns::TestColumnDefBuilder;
use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler};
use crate::ddl::test_util::{
create_logical_table, create_physical_table, create_physical_table_metadata,
test_create_physical_table_task,
};
use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound};
use crate::error::Result;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
fn make_alter_logical_table_add_column_task(
@@ -407,3 +421,78 @@ async fn test_on_part_duplicate_alter_request() {
]
);
}
fn alters_request_handler(_peer: Peer, request: RegionRequest) -> Result<RegionResponse> {
if let region_request::Body::Alters(_) = request.body.unwrap() {
let mut response = RegionResponse::new(0);
// Default region id for physical table.
let region_id = RegionId::new(1000, 1);
response.extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::metric(1, 0, 2, 0))])
.unwrap(),
);
return Ok(response);
}
Ok(RegionResponse::new(0))
}
#[tokio::test]
async fn test_on_submit_alter_region_request() {
common_telemetry::init_default_ut_logging();
let (tx, mut rx) = mpsc::channel(8);
let handler = DatanodeWatcher::new(tx).with_handler(alters_request_handler);
let node_manager = Arc::new(MockDatanodeManager::new(handler));
let ddl_context = new_ddl_context(node_manager);
let mut create_physical_table_task = test_create_physical_table_task("phy");
let phy_id = 1000u32;
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(phy_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
}];
create_physical_table_task.set_table_id(phy_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)),
)
.await;
create_logical_table(ddl_context.clone(), phy_id, "table1").await;
create_logical_table(ddl_context.clone(), phy_id, "table2").await;
let tasks = vec![
make_alter_logical_table_add_column_task(None, "table1", vec!["new_col".to_string()]),
make_alter_logical_table_add_column_task(None, "table2", vec!["mew_col".to_string()]),
];
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context);
procedure.on_prepare().await.unwrap();
procedure.on_submit_alter_region_requests().await.unwrap();
let mut results = Vec::new();
for _ in 0..2 {
let result = rx.try_recv().unwrap();
results.push(result);
}
rx.try_recv().unwrap_err();
let (peer, request) = results.remove(0);
assert_eq!(peer.id, 1);
assert_matches!(request.body.unwrap(), region_request::Body::Alters(_));
let (peer, request) = results.remove(0);
assert_eq!(peer.id, 5);
assert_matches!(
request.body.unwrap(),
region_request::Body::Sync(SyncRequest {
manifest_info: Some(ManifestInfo::MetricManifestInfo(MetricManifestInfo {
data_manifest_version: 1,
metadata_manifest_version: 2,
..
})),
..
})
);
}

View File

@@ -16,7 +16,9 @@ use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::alter_table_expr::Kind;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, RegionRequest};
use api::v1::{
AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
@@ -28,6 +30,8 @@ use common_error::status_code::StatusCode;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::{ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::RegionManifestInfo;
use store_api::storage::RegionId;
use table::requests::TTL_KEY;
use tokio::sync::mpsc::{self};
@@ -39,7 +43,7 @@ use crate::ddl::test_util::datanode_handler::{
AllFailureDatanodeHandler, DatanodeWatcher, PartialSuccessDatanodeHandler,
RequestOutdatedErrorDatanodeHandler,
};
use crate::error::Error;
use crate::error::{Error, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
@@ -120,10 +124,71 @@ async fn test_on_prepare_table_not_exists_err() {
assert_matches!(err.status_code(), StatusCode::TableNotFound);
}
fn test_alter_table_task(table_name: &str) -> AlterTableTask {
AlterTableTask {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
}],
})),
},
}
}
fn assert_alter_request(
peer: Peer,
request: RegionRequest,
expected_peer_id: u64,
expected_region_id: RegionId,
) {
assert_eq!(peer.id, expected_peer_id);
let Some(region_request::Body::Alter(req)) = request.body else {
unreachable!();
};
assert_eq!(req.region_id, expected_region_id);
}
fn assert_sync_request(
peer: Peer,
request: RegionRequest,
expected_peer_id: u64,
expected_region_id: RegionId,
expected_manifest_version: u64,
) {
assert_eq!(peer.id, expected_peer_id);
let Some(region_request::Body::Sync(req)) = request.body else {
unreachable!();
};
let Some(ManifestInfo::MitoManifestInfo(info)) = req.manifest_info else {
unreachable!();
};
assert_eq!(info.data_manifest_version, expected_manifest_version);
assert_eq!(req.region_id, expected_region_id);
}
fn alter_request_handler(_peer: Peer, request: RegionRequest) -> Result<RegionResponse> {
if let region_request::Body::Alter(req) = request.body.unwrap() {
let mut response = RegionResponse::new(0);
let region_id = RegionId::from(req.region_id);
response.extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::mito(1, 1))])
.unwrap(),
);
return Ok(response);
}
Ok(RegionResponse::new(0))
}
#[tokio::test]
async fn test_on_submit_alter_request() {
let (tx, mut rx) = mpsc::channel(8);
let datanode_handler = DatanodeWatcher(tx);
let datanode_handler = DatanodeWatcher::new(tx).with_handler(alter_request_handler);
let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
let ddl_context = new_ddl_context(node_manager);
let table_id = 1024;
@@ -140,18 +205,7 @@ async fn test_on_submit_alter_request() {
.await
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
}],
})),
},
};
let alter_table_task = test_alter_table_task(table_name);
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure =
@@ -162,30 +216,72 @@ async fn test_on_submit_alter_request() {
.await
.unwrap();
let check = |peer: Peer,
request: RegionRequest,
expected_peer_id: u64,
expected_region_id: RegionId| {
assert_eq!(peer.id, expected_peer_id);
let Some(region_request::Body::Alter(req)) = request.body else {
unreachable!();
};
assert_eq!(req.region_id, expected_region_id);
};
let mut results = Vec::new();
for _ in 0..5 {
let result = rx.try_recv().unwrap();
results.push(result);
}
rx.try_recv().unwrap_err();
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
let (peer, request) = results.remove(0);
assert_alter_request(peer, request, 1, RegionId::new(table_id, 1));
let (peer, request) = results.remove(0);
assert_alter_request(peer, request, 2, RegionId::new(table_id, 2));
let (peer, request) = results.remove(0);
assert_alter_request(peer, request, 3, RegionId::new(table_id, 3));
let (peer, request) = results.remove(0);
assert_sync_request(peer, request, 4, RegionId::new(table_id, 2), 1);
let (peer, request) = results.remove(0);
assert_sync_request(peer, request, 5, RegionId::new(table_id, 1), 1);
}
#[tokio::test]
async fn test_on_submit_alter_request_without_sync_request() {
let (tx, mut rx) = mpsc::channel(8);
// without use `alter_request_handler`, so no sync request will be sent.
let datanode_handler = DatanodeWatcher::new(tx);
let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
let ddl_context = new_ddl_context(node_manager);
let table_id = 1024;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
prepare_table_route(table_id),
HashMap::new(),
)
.await
.unwrap();
let alter_table_task = test_alter_table_task(table_name);
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure =
AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
let mut results = Vec::new();
for _ in 0..3 {
let result = rx.try_recv().unwrap();
results.push(result);
}
rx.try_recv().unwrap_err();
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
let (peer, request) = results.remove(0);
check(peer, request, 1, RegionId::new(table_id, 1));
assert_alter_request(peer, request, 1, RegionId::new(table_id, 1));
let (peer, request) = results.remove(0);
check(peer, request, 2, RegionId::new(table_id, 2));
assert_alter_request(peer, request, 2, RegionId::new(table_id, 2));
let (peer, request) = results.remove(0);
check(peer, request, 3, RegionId::new(table_id, 3));
assert_alter_request(peer, request, 3, RegionId::new(table_id, 3));
}
#[tokio::test]

View File

@@ -15,20 +15,28 @@
use std::assert_matches::assert_matches;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::meta::Peer;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, MetricManifestInfo, RegionRequest, SyncRequest};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::RegionManifestInfo;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler};
use crate::ddl::test_util::{
create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task,
};
use crate::ddl::TableMetadata;
use crate::error::Error;
use crate::key::table_route::TableRouteValue;
use crate::error::{Error, Result};
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
#[tokio::test]
@@ -390,3 +398,76 @@ async fn test_on_create_metadata_err() {
let error = procedure.execute(&ctx).await.unwrap_err();
assert!(!error.is_retry_later());
}
fn creates_request_handler(_peer: Peer, request: RegionRequest) -> Result<RegionResponse> {
if let region_request::Body::Creates(_) = request.body.unwrap() {
let mut response = RegionResponse::new(0);
// Default region id for physical table.
let region_id = RegionId::new(1024, 1);
response.extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::metric(1, 0, 2, 0))])
.unwrap(),
);
return Ok(response);
}
Ok(RegionResponse::new(0))
}
#[tokio::test]
async fn test_on_submit_create_request() {
common_telemetry::init_default_ut_logging();
let (tx, mut rx) = mpsc::channel(8);
let handler = DatanodeWatcher::new(tx).with_handler(creates_request_handler);
let node_manager = Arc::new(MockDatanodeManager::new(handler));
let ddl_context = new_ddl_context(node_manager);
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let table_id = 1024u32;
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
}];
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes)),
)
.await;
let physical_table_id = table_id;
let task = test_create_logical_table_task("foo");
let yet_another_task = test_create_logical_table_task("bar");
let mut procedure = CreateLogicalTablesProcedure::new(
vec![task, yet_another_task],
physical_table_id,
ddl_context,
);
procedure.on_prepare().await.unwrap();
procedure.on_datanode_create_regions().await.unwrap();
let mut results = Vec::new();
for _ in 0..2 {
let result = rx.try_recv().unwrap();
results.push(result);
}
rx.try_recv().unwrap_err();
let (peer, request) = results.remove(0);
assert_eq!(peer.id, 1);
assert_matches!(request.body.unwrap(), region_request::Body::Creates(_));
let (peer, request) = results.remove(0);
assert_eq!(peer.id, 5);
assert_matches!(
request.body.unwrap(),
region_request::Body::Sync(SyncRequest {
manifest_info: Some(ManifestInfo::MetricManifestInfo(MetricManifestInfo {
data_manifest_version: 1,
metadata_manifest_version: 2,
..
})),
..
})
);
}

View File

@@ -100,7 +100,7 @@ async fn test_on_prepare_table() {
#[tokio::test]
async fn test_on_datanode_drop_regions() {
let (tx, mut rx) = mpsc::channel(8);
let datanode_handler = DatanodeWatcher(tx);
let datanode_handler = DatanodeWatcher::new(tx);
let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
let ddl_context = new_ddl_context(node_manager);
let table_id = 1024;
@@ -148,27 +148,39 @@ async fn test_on_datanode_drop_regions() {
let check = |peer: Peer,
request: RegionRequest,
expected_peer_id: u64,
expected_region_id: RegionId| {
expected_region_id: RegionId,
follower: bool| {
assert_eq!(peer.id, expected_peer_id);
let Some(region_request::Body::Drop(req)) = request.body else {
unreachable!();
if follower {
let Some(region_request::Body::Close(req)) = request.body else {
unreachable!();
};
assert_eq!(req.region_id, expected_region_id);
} else {
let Some(region_request::Body::Drop(req)) = request.body else {
unreachable!();
};
assert_eq!(req.region_id, expected_region_id);
};
assert_eq!(req.region_id, expected_region_id);
};
let mut results = Vec::new();
for _ in 0..3 {
for _ in 0..5 {
let result = rx.try_recv().unwrap();
results.push(result);
}
results.sort_unstable_by(|(a, _), (b, _)| a.id.cmp(&b.id));
let (peer, request) = results.remove(0);
check(peer, request, 1, RegionId::new(table_id, 1));
check(peer, request, 1, RegionId::new(table_id, 1), false);
let (peer, request) = results.remove(0);
check(peer, request, 2, RegionId::new(table_id, 2));
check(peer, request, 2, RegionId::new(table_id, 2), false);
let (peer, request) = results.remove(0);
check(peer, request, 3, RegionId::new(table_id, 3));
check(peer, request, 3, RegionId::new(table_id, 3), false);
let (peer, request) = results.remove(0);
check(peer, request, 4, RegionId::new(table_id, 2), true);
let (peer, request) = results.remove(0);
check(peer, request, 5, RegionId::new(table_id, 1), true);
}
#[tokio::test]

View File

@@ -15,27 +15,37 @@
use std::collections::HashMap;
use std::fmt::Debug;
use common_catalog::consts::METRIC_ENGINE;
use api::region::RegionResponse;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
SyncRequest,
};
use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use common_telemetry::{error, warn};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use common_wal::options::WalOptions;
use futures::future::join_all;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::RegionNumber;
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
use store_api::region_engine::RegionManifestInfo;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::table_reference::TableReference;
use crate::ddl::DetectingRegion;
use crate::ddl::{DdlContext, DetectingRegion};
use crate::error::{
Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu,
self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
UnsupportedSnafu,
};
use crate::key::datanode_table::DatanodeTableValue;
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::RegionRoute;
use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
/// Adds [Peer] context if the error is unretryable.
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
@@ -192,8 +202,8 @@ pub fn extract_region_wal_options(
/// - PartialNonRetryable: if any operation is non retryable, the result is non retryable.
/// - AllRetryable: all operations are retryable.
/// - AllNonRetryable: all operations are not retryable.
pub enum MultipleResults {
Ok,
pub enum MultipleResults<T> {
Ok(Vec<T>),
PartialRetryable(Error),
PartialNonRetryable(Error),
AllRetryable(Error),
@@ -205,9 +215,9 @@ pub enum MultipleResults {
/// For partial success, we need to check if the errors are retryable.
/// If all the errors are retryable, we return a retryable error.
/// Otherwise, we return the first error.
pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults {
pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
if results.is_empty() {
return MultipleResults::Ok;
return MultipleResults::Ok(Vec::new());
}
let num_results = results.len();
let mut retryable_results = Vec::new();
@@ -216,7 +226,7 @@ pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleRes
for result in results {
match result {
Ok(_) => ok_results.push(result),
Ok(value) => ok_results.push(value),
Err(err) => {
if err.is_retry_later() {
retryable_results.push(err);
@@ -243,7 +253,7 @@ pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleRes
}
return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
} else if ok_results.len() == num_results {
return MultipleResults::Ok;
return MultipleResults::Ok(ok_results);
} else if !retryable_results.is_empty()
&& !ok_results.is_empty()
&& non_retryable_results.is_empty()
@@ -264,6 +274,125 @@ pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleRes
MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
}
/// Parses manifest infos from extensions.
pub fn parse_manifest_infos_from_extensions(
extensions: &HashMap<String, Vec<u8>>,
) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
let data_manifest_version =
extensions
.get(MANIFEST_INFO_EXTENSION_KEY)
.context(error::UnexpectedSnafu {
err_msg: "manifest info extension not found",
})?;
let data_manifest_version =
RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
Ok(data_manifest_version)
}
/// Sync follower regions on datanodes.
pub async fn sync_follower_regions(
context: &DdlContext,
table_id: TableId,
results: Vec<RegionResponse>,
region_routes: &[RegionRoute],
engine: &str,
) -> Result<()> {
if engine != MITO_ENGINE && engine != METRIC_ENGINE {
info!(
"Skip submitting sync region requests for table_id: {}, engine: {}",
table_id, engine
);
return Ok(());
}
let results = results
.into_iter()
.map(|response| parse_manifest_infos_from_extensions(&response.extensions))
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect::<HashMap<_, _>>();
let is_mito_engine = engine == MITO_ENGINE;
let followers = find_followers(region_routes);
if followers.is_empty() {
return Ok(());
}
let mut sync_region_tasks = Vec::with_capacity(followers.len());
for datanode in followers {
let requester = context.node_manager.datanode(&datanode).await;
let regions = find_follower_regions(region_routes, &datanode);
for region in regions {
let region_id = RegionId::new(table_id, region);
let manifest_info = if is_mito_engine {
let region_manifest_info =
results.get(&region_id).context(error::UnexpectedSnafu {
err_msg: format!("No manifest info found for region {}", region_id),
})?;
ensure!(
region_manifest_info.is_mito(),
error::UnexpectedSnafu {
err_msg: format!("Region {} is not a mito region", region_id)
}
);
ManifestInfo::MitoManifestInfo(MitoManifestInfo {
data_manifest_version: region_manifest_info.data_manifest_version(),
})
} else {
let region_manifest_info =
results.get(&region_id).context(error::UnexpectedSnafu {
err_msg: format!("No manifest info found for region {}", region_id),
})?;
ensure!(
region_manifest_info.is_metric(),
error::UnexpectedSnafu {
err_msg: format!("Region {} is not a metric region", region_id)
}
);
ManifestInfo::MetricManifestInfo(MetricManifestInfo {
data_manifest_version: region_manifest_info.data_manifest_version(),
metadata_manifest_version: region_manifest_info
.metadata_manifest_version()
.unwrap_or_default(),
})
};
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::Sync(SyncRequest {
region_id: region_id.as_u64(),
manifest_info: Some(manifest_info),
})),
};
let datanode = datanode.clone();
let requester = requester.clone();
sync_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
// Failure to sync region is not critical.
// We try our best to sync the regions.
if let Err(err) = join_all(sync_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
{
error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
}
info!("Sync follower regions on datanodes, table_id: {}", table_id);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -62,6 +62,7 @@ pub struct TableRoute {
region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
/// Returns the leader peers of the table.
pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
region_routes
.iter()
@@ -70,6 +71,15 @@ pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
.collect()
}
/// Returns the followers of the table.
pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
region_routes
.iter()
.flat_map(|x| &x.follower_peers)
.cloned()
.collect()
}
/// Returns the operating leader regions with corresponding [DatanodeId].
pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
region_routes
@@ -108,6 +118,7 @@ pub fn find_region_leader(
.cloned()
}
/// Returns the region numbers of the leader regions on the target datanode.
pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
region_routes
.iter()
@@ -122,6 +133,19 @@ pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Ve
.collect()
}
/// Returns the region numbers of the follower regions on the target datanode.
pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
region_routes
.iter()
.filter_map(|x| {
if x.follower_peers.contains(datanode) {
return Some(x.region.id.region_number());
}
None
})
.collect()
}
impl TableRoute {
pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
let region_leaders = region_routes

View File

@@ -19,7 +19,8 @@ use std::sync::{Arc, RwLock};
use std::time::Duration;
use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
@@ -308,38 +309,6 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region_manifest(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let engine_with_status = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
let Some(new_opened_regions) = engine_with_status
.sync_region(region_id, manifest_info)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?
.new_opened_logical_region_ids()
else {
return Ok(());
};
for region in new_opened_regions {
self.inner.region_map.insert(
region,
RegionEngineWithStatus::Ready(engine_with_status.engine().clone()),
);
info!("Logical region {} is registered!", region);
}
Ok(())
}
/// Set region role state gracefully.
///
/// For [SettableRegionRoleState::Follower]:
@@ -468,6 +437,52 @@ impl RegionServer {
extensions,
})
}
async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
let region_id = RegionId::from_u64(request.region_id);
let manifest_info = request
.manifest_info
.context(error::MissingRequiredFieldSnafu {
name: "manifest_info",
})?;
let manifest_info = match manifest_info {
ManifestInfo::MitoManifestInfo(info) => {
RegionManifestInfo::mito(info.data_manifest_version, 0)
}
ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
info.data_manifest_version,
0,
info.metadata_manifest_version,
0,
),
};
let tracing_context = TracingContext::from_current_span();
let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
self.sync_region(region_id, manifest_info)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let engine_with_status = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
self.inner
.handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
.await
}
}
#[async_trait]
@@ -480,6 +495,9 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
self.handle_requests_in_parallel(request).await
}
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -861,8 +879,8 @@ impl RegionServerInner {
match result {
Ok(result) => {
for (region_id, region_change) in region_changes {
self.set_region_status_ready(region_id, engine.clone(), region_change)
for (region_id, region_change) in &region_changes {
self.set_region_status_ready(*region_id, engine.clone(), *region_change)
.await?;
}
@@ -933,8 +951,9 @@ impl RegionServerInner {
.inc_by(result.affected_rows as u64);
}
// Sets corresponding region status to ready.
self.set_region_status_ready(region_id, engine, region_change)
self.set_region_status_ready(region_id, engine.clone(), region_change)
.await?;
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
@@ -948,6 +967,32 @@ impl RegionServerInner {
}
}
/// Handles the sync region request.
pub async fn handle_sync_region(
&self,
engine: &RegionEngineRef,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let Some(new_opened_regions) = engine
.sync_region(region_id, manifest_info)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?
.new_opened_logical_region_ids()
else {
warn!("No new opened logical regions");
return Ok(());
};
for region in new_opened_regions {
self.region_map
.insert(region, RegionEngineWithStatus::Ready(engine.clone()));
info!("Logical region {} is registered!", region);
}
Ok(())
}
fn set_region_status_not_ready(
&self,
region_id: RegionId,

View File

@@ -53,7 +53,7 @@ use crate::data_region::DataRegion;
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
use crate::utils::{self, get_region_statistic};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Metric Engine
@@ -264,29 +264,7 @@ impl RegionEngine for MetricEngine {
/// Note: Returns `None` if it's a logical region.
fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
if self.inner.is_physical_region(region_id) {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let metadata_stat = self.inner.mito.region_statistic(metadata_region_id);
let data_stat = self.inner.mito.region_statistic(data_region_id);
match (metadata_stat, data_stat) {
(Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic {
num_rows: metadata_stat.num_rows + data_stat.num_rows,
memtable_size: metadata_stat.memtable_size + data_stat.memtable_size,
wal_size: metadata_stat.wal_size + data_stat.wal_size,
manifest_size: metadata_stat.manifest_size + data_stat.manifest_size,
sst_size: metadata_stat.sst_size + data_stat.sst_size,
index_size: metadata_stat.index_size + data_stat.index_size,
manifest: RegionManifestInfo::Metric {
data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(),
data_manifest_version: data_stat.manifest.data_manifest_version(),
metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(),
},
}),
_ => None,
}
get_region_statistic(&self.inner.mito, region_id)
} else {
None
}

View File

@@ -30,7 +30,7 @@ use crate::error::{
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
UnexpectedRequestSnafu,
};
use crate::utils::to_data_region_id;
use crate::utils::{append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id};
impl MetricEngineInner {
pub async fn alter_regions(
@@ -63,11 +63,15 @@ impl MetricEngineInner {
.unwrap()
.get_physical_region_id(region_id)
.with_context(|| LogicalRegionNotFoundSnafu { region_id })?;
let mut manifest_infos = Vec::with_capacity(1);
self.alter_logical_regions(physical_region_id, requests, extension_return_value)
.await?;
append_manifest_info(&self.mito, region_id, &mut manifest_infos);
encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
} else {
let grouped_requests =
self.group_logical_region_requests_by_physical_region_id(requests)?;
let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
for (physical_region_id, requests) in grouped_requests {
self.alter_logical_regions(
physical_region_id,
@@ -75,7 +79,9 @@ impl MetricEngineInner {
extension_return_value,
)
.await?;
append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
}
encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
}
}
Ok(0)

View File

@@ -51,7 +51,10 @@ use crate::error::{
Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::utils::{self, to_data_region_id, to_metadata_region_id};
use crate::utils::{
self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
to_metadata_region_id,
};
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
@@ -88,11 +91,15 @@ impl MetricEngineInner {
if requests.len() == 1 {
let request = &requests.first().unwrap().1;
let physical_region_id = parse_physical_region_id(request)?;
let mut manifest_infos = Vec::with_capacity(1);
self.create_logical_regions(physical_region_id, requests, extension_return_value)
.await?;
append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
} else {
let grouped_requests =
group_create_logical_region_requests_by_physical_region_id(requests)?;
let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
for (physical_region_id, requests) in grouped_requests {
self.create_logical_regions(
physical_region_id,
@@ -100,7 +107,9 @@ impl MetricEngineInner {
extension_return_value,
)
.await?;
append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
}
encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
}
} else {
return MissingRegionOptionSnafu {}.fail();

View File

@@ -67,6 +67,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to serialize region manifest info"))]
SerializeRegionManifestInfo {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode base64 column value"))]
DecodeColumnValue {
#[snafu(source)]
@@ -304,7 +312,8 @@ impl ErrorExt for Error {
| DecodeColumnValue { .. }
| ParseRegionId { .. }
| InvalidMetadata { .. }
| SetSkippingIndexOption { .. } => StatusCode::Unexpected,
| SetSkippingIndexOption { .. }
| SerializeRegionManifestInfo { .. } => StatusCode::Unexpected,
PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => {
StatusCode::RegionNotFound

View File

@@ -12,9 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::metric_engine_consts::{METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP};
use std::collections::HashMap;
use common_telemetry::{info, warn};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP,
};
use store_api::region_engine::{RegionEngine, RegionManifestInfo, RegionStatistic};
use store_api::storage::RegionId;
use crate::error::{Result, SerializeRegionManifestInfoSnafu};
/// Change the given [RegionId]'s region group to [METRIC_METADATA_REGION_GROUP].
pub fn to_metadata_region_id(region_id: RegionId) -> RegionId {
let table_id = region_id.table_id();
@@ -29,6 +39,69 @@ pub fn to_data_region_id(region_id: RegionId) -> RegionId {
RegionId::with_group_and_seq(table_id, METRIC_DATA_REGION_GROUP, region_sequence)
}
/// Get the region statistic of the given [RegionId].
pub fn get_region_statistic(mito: &MitoEngine, region_id: RegionId) -> Option<RegionStatistic> {
let metadata_region_id = to_metadata_region_id(region_id);
let data_region_id = to_data_region_id(region_id);
let metadata_stat = mito.region_statistic(metadata_region_id);
let data_stat = mito.region_statistic(data_region_id);
match (&metadata_stat, &data_stat) {
(Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic {
num_rows: metadata_stat.num_rows + data_stat.num_rows,
memtable_size: metadata_stat.memtable_size + data_stat.memtable_size,
wal_size: metadata_stat.wal_size + data_stat.wal_size,
manifest_size: metadata_stat.manifest_size + data_stat.manifest_size,
sst_size: metadata_stat.sst_size + data_stat.sst_size,
index_size: metadata_stat.index_size + data_stat.index_size,
manifest: RegionManifestInfo::Metric {
data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(),
data_manifest_version: data_stat.manifest.data_manifest_version(),
metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(),
},
}),
_ => {
warn!(
"Failed to get region statistic for region {}, metadata_stat: {:?}, data_stat: {:?}",
region_id, metadata_stat, data_stat
);
None
}
}
}
/// Appends the given [RegionId]'s manifest info to the given list.
pub(crate) fn append_manifest_info(
mito: &MitoEngine,
region_id: RegionId,
manifest_infos: &mut Vec<(RegionId, RegionManifestInfo)>,
) {
if let Some(statistic) = get_region_statistic(mito, region_id) {
manifest_infos.push((region_id, statistic.manifest));
}
}
/// Encodes the given list of ([RegionId], [RegionManifestInfo]) to extensions(key: MANIFEST_INFO_EXTENSION_KEY).
pub(crate) fn encode_manifest_info_to_extensions(
manifest_infos: &[(RegionId, RegionManifestInfo)],
extensions: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(manifest_infos)
.context(SerializeRegionManifestInfoSnafu)?,
);
for (region_id, manifest_info) in manifest_infos {
info!(
"Added manifest info: {:?} to extensions, region_id: {:?}",
manifest_info, region_id
);
}
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -70,7 +70,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_telemetry::{info, tracing};
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
@@ -80,6 +80,7 @@ use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
@@ -224,6 +225,24 @@ impl MitoEngine {
pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
self.inner.workers.get_region(id)
}
fn encode_manifest_info_to_extensions(
region_id: &RegionId,
manifest_info: RegionManifestInfo,
extensions: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let region_manifest_info = vec![(*region_id, manifest_info)];
extensions.insert(
MANIFEST_INFO_EXTENSION_KEY.to_string(),
RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
);
info!(
"Added manifest info: {:?} to extensions, region_id: {:?}",
region_manifest_info, region_id
);
Ok(())
}
}
/// Check whether the region edit is valid. Only adding files to region is considered valid now.
@@ -557,11 +576,26 @@ impl RegionEngine for MitoEngine {
.with_label_values(&[request.request_type()])
.start_timer();
self.inner
let is_alter = matches!(request, RegionRequest::Alter(_));
let mut response = self
.inner
.handle_request(region_id, request)
.await
.map(RegionResponse::new)
.map_err(BoxedError::new)
.map_err(BoxedError::new)?;
if is_alter {
if let Some(statistic) = self.region_statistic(region_id) {
Self::encode_manifest_info_to_extensions(
&region_id,
statistic.manifest,
&mut response.extensions,
)
.map_err(BoxedError::new)?;
}
}
Ok(response)
}
#[tracing::instrument(skip_all)]

View File

@@ -966,6 +966,13 @@ pub enum MetadataError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected: {}", reason))]
Unexpected {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for MetadataError {

View File

@@ -73,6 +73,10 @@ pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
/// Represent a list of column metadata that are added to physical table.
pub const ALTER_PHYSICAL_EXTENSION_KEY: &str = "ALTER_PHYSICAL";
/// HashMap key to be used in the region server's extension response.
/// Represent the manifest info of a region.
pub const MANIFEST_INFO_EXTENSION_KEY: &str = "MANIFEST_INFO";
/// Returns true if it's a internal column of the metric engine.
pub fn is_metric_engine_internal_column(name: &str) -> bool {
name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME || name == DATA_SCHEMA_TSID_COLUMN_NAME

View File

@@ -549,6 +549,16 @@ impl RegionManifestInfo {
} => Some(*metadata_flushed_entry_id),
}
}
/// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(manifest_infos)
}
/// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
serde_json::from_slice(value)
}
}
impl Default for RegionManifestInfo {

View File

@@ -46,7 +46,7 @@ use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu,
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::mito_engine_options::{
@@ -153,6 +153,10 @@ impl RegionRequest {
region_request::Body::Drops(drops) => make_region_drops(drops),
region_request::Body::Alters(alters) => make_region_alters(alters),
region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk),
region_request::Body::Sync(_) => UnexpectedSnafu {
reason: "Sync request should be handled separately by RegionServer",
}
.fail(),
}
}