feat: record the migration events in metasrv (#6579)

* feat: collect procedure event

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* feat: collect region migration events

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: add integration test

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix docs error

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix integration test error

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: change status code for errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `event()` in Procedure

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: improve trait design

1. Add `user_metadata()` in `Procedure` trait;

2. Add `Eventable` trait;

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: polish the code

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
zyy17
2025-08-05 20:30:33 -07:00
committed by GitHub
parent 414db41219
commit cc35bab5e4
32 changed files with 841 additions and 57 deletions
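
The two refactor bullets above describe the new extension points this commit adds. As a rough sketch of how those pieces could fit together (the names `Procedure`, `Eventable`, `event()`, and `user_metadata()` come from the commit message; the exact signatures in `common-procedure` may differ):

```rust
// Hedged sketch of the trait surface described in the commit message;
// not the literal definitions from this commit.
use std::collections::HashMap;

/// Something that can be turned into an event for the event recorder.
pub trait Eventable {
    /// Stable event type tag, e.g. "region_migration".
    fn event_type(&self) -> &'static str;
    /// Serialized payload persisted to the events system table.
    fn payload(&self) -> String;
}

pub trait Procedure {
    // ...existing procedure API elided...

    /// Optional user-supplied metadata attached to the procedure.
    fn user_metadata(&self) -> Option<&HashMap<String, String>> {
        None
    }

    /// The event this procedure emits, if it is eventable.
    fn event(&self) -> Option<&dyn Eventable> {
        None
    }
}
```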

View File

@@ -27,6 +27,7 @@ common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-grpc.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-procedure.workspace = true
@@ -59,8 +60,10 @@ object-store.workspace = true
operator = { workspace = true, features = ["testing"] }
prost.workspace = true
query.workspace = true
rand.workspace = true
rstest.workspace = true
rstest_reuse.workspace = true
sea-query.workspace = true
serde_json.workspace = true
servers = { workspace = true, features = ["testing"] }
session.workspace = true

View File

@@ -52,5 +52,5 @@ GT_KAFKA_ENDPOINTS = localhost:9092
```
cd tests-integration/fixtures
-docker compose -f docker-compose-standalone.yml up kafka
+docker compose -f docker-compose.yml up kafka
```

View File

@@ -14,6 +14,8 @@
use std::collections::HashMap;
use std::env;
use std::net::TcpListener;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::Duration;
@@ -50,12 +52,14 @@ use frontend::frontend::{Frontend, FrontendOptions};
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::Instance as FeInstance;
use frontend::server::Services;
use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use object_store::config::ObjectStoreConfig;
use rand::Rng;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::grpc::GrpcOptions;
@@ -177,6 +181,7 @@ impl GreptimeDbClusterBuilder {
pub async fn build_with(
&self,
datanode_options: Vec<DatanodeOptions>,
start_frontend_servers: bool,
guards: Vec<TestGuard>,
) -> GreptimeDbCluster {
let datanodes = datanode_options.len();
@@ -218,7 +223,9 @@ impl GreptimeDbClusterBuilder {
self.wait_datanodes_alive(metasrv.metasrv.meta_peer_client(), datanodes)
.await;
-let mut frontend = self.build_frontend(metasrv.clone(), datanode_clients).await;
+let mut frontend = self
+    .build_frontend(metasrv.clone(), datanode_clients, start_frontend_servers)
+    .await;
test_util::prepare_another_catalog_and_schema(&frontend.instance).await;
@@ -234,10 +241,11 @@ impl GreptimeDbClusterBuilder {
}
}
-pub async fn build(&self) -> GreptimeDbCluster {
+pub async fn build(&self, start_frontend_servers: bool) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
let (datanode_options, guards) = self.build_datanode_options_and_guards(datanodes).await;
-self.build_with(datanode_options, guards).await
+self.build_with(datanode_options, start_frontend_servers, guards)
+    .await
}
async fn build_datanode_options_and_guards(
@@ -352,6 +360,7 @@ impl GreptimeDbClusterBuilder {
&self,
metasrv: MockInfo,
datanode_clients: Arc<NodeClients>,
start_frontend_servers: bool,
) -> Frontend {
let mut meta_client = MetaClientBuilder::frontend_default_options()
.channel_manager(metasrv.channel_manager)
@@ -393,37 +402,97 @@ impl GreptimeDbClusterBuilder {
Arc::new(InvalidateCacheHandler::new(cache_registry.clone())),
]);
-let options = FrontendOptions::default();
+let fe_opts = self.build_frontend_options();
let heartbeat_task = HeartbeatTask::new(
-&options,
+&fe_opts,
meta_client.clone(),
HeartbeatOptions::default(),
Arc::new(handlers_executor),
);
-let server_addr = options.grpc.server_addr.clone();
let instance = FrontendBuilder::new(
-options,
+fe_opts.clone(),
cached_meta_backend.clone(),
cache_registry.clone(),
catalog_manager,
datanode_clients,
meta_client,
-Arc::new(ProcessManager::new(server_addr, None)),
+Arc::new(ProcessManager::new(fe_opts.grpc.server_addr.clone(), None)),
)
.with_local_cache_invalidator(cache_registry)
.try_build()
.await
.unwrap();
let instance = Arc::new(instance);
// Build the servers for the frontend.
let servers = if start_frontend_servers {
Services::new(fe_opts, instance.clone(), Plugins::default())
.build()
.unwrap()
} else {
ServerHandlers::default()
};
Frontend {
instance,
-servers: ServerHandlers::default(),
+servers,
heartbeat_task: Some(heartbeat_task),
export_metrics_task: None,
}
}
fn build_frontend_options(&self) -> FrontendOptions {
let mut fe_opts = FrontendOptions::default();
// Choose a random unused port in [14000, 24000] for local tests to avoid conflicts.
let port_range = 14000..=24000;
let max_attempts = 10;
let localhost = "127.0.0.1";
let construct_addr = |port: u16| format!("{}:{}", localhost, port);
fe_opts.http.addr = construct_addr(self.choose_random_unused_port(
port_range.clone(),
max_attempts,
localhost,
));
let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
fe_opts.grpc.bind_addr = construct_addr(grpc_port);
fe_opts.grpc.server_addr = construct_addr(grpc_port);
fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port(
port_range.clone(),
max_attempts,
localhost,
));
fe_opts.postgres.addr =
construct_addr(self.choose_random_unused_port(port_range, max_attempts, localhost));
fe_opts
}
// Choose a random unused port between [start, end].
fn choose_random_unused_port(
&self,
port_range: RangeInclusive<u16>,
max_attempts: u16,
addr: &str,
) -> u16 {
let mut rng = rand::rng();
let mut attempts = 0;
while attempts < max_attempts {
let port = rng.random_range(port_range.clone());
if TcpListener::bind(format!("{}:{}", addr, port)).is_ok() {
return port;
}
attempts += 1;
}
panic!("No unused port found");
}
}
async fn build_datanode_clients(

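
Taken together, the builder changes above mean every call site now chooses whether to start real frontend servers. A hypothetical call, mirroring the call sites later in this diff (only the region migration tests pass `true`, since they query the events table through a running frontend):

```rust
// Hedged usage sketch based on the signatures in this diff; "my_test" is
// a placeholder test name.
let cluster = GreptimeDbClusterBuilder::new("my_test")
    .await
    // `true` starts HTTP/gRPC/MySQL/Postgres servers on random unused
    // ports chosen by `choose_random_unused_port`; `false` keeps the
    // previous lightweight behavior (`ServerHandlers::default()`).
    .build(true)
    .await;
```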
View File

@@ -83,7 +83,7 @@ mod test {
let db = GreptimeDbClusterBuilder::new("test_distributed_flight_do_put")
.await
-.build()
+.build(false)
.await;
let runtime = common_runtime::global_runtime().clone();

View File

@@ -44,6 +44,6 @@ impl MockDistributedInstance {
pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance {
let builder = GreptimeDbClusterBuilder::new(test_name).await;
-let cluster = builder.build().await;
+let cluster = builder.build(false).await;
MockDistributedInstance(cluster)
}

View File

@@ -97,7 +97,7 @@ impl MockInstanceBuilder {
MockInstanceImpl::Standalone(builder.build().await)
}
MockInstanceBuilder::Distributed(builder) => {
-MockInstanceImpl::Distributed(builder.build().await)
+MockInstanceImpl::Distributed(builder.build(false).await)
}
}
}
@@ -131,7 +131,9 @@ impl MockInstanceBuilder {
..
} = instance;
-MockInstanceImpl::Distributed(builder.build_with(datanode_options, guards).await)
+MockInstanceImpl::Distributed(
+    builder.build_with(datanode_options, false, guards).await,
+)
}
}
}
@@ -207,7 +209,7 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc<dyn MockIns
let cluster = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_store_providers(storage_types)
-.build()
+.build(false)
.await;
Arc::new(MockDistributedInstance(cluster))
}

View File

@@ -16,8 +16,16 @@ use std::sync::Arc;
use std::time::Duration;
use client::{OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use common_event_recorder::{
DEFAULT_EVENTS_TABLE_NAME, DEFAULT_FLUSH_INTERVAL_SECONDS, EVENTS_TABLE_TIMESTAMP_COLUMN_NAME,
EVENTS_TABLE_TYPE_COLUMN_NAME,
};
use common_meta::key::{RegionDistribution, RegionRoleSet, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_procedure::event::{
EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME, EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME,
};
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
@@ -34,11 +42,17 @@ use frontend::instance::Instance;
use futures::future::BoxFuture;
use meta_srv::error;
use meta_srv::error::Result as MetaResult;
use meta_srv::events::region_migration_event::{
EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME, EVENTS_TABLE_REGION_ID_COLUMN_NAME,
EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME, EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME,
REGION_MIGRATION_EVENT_TYPE,
};
use meta_srv::metasrv::SelectorContext;
use meta_srv::procedure::region_migration::{
RegionMigrationProcedureTask, RegionMigrationTriggerReason,
};
use meta_srv::selector::{Selector, SelectorOptions};
use sea_query::{Expr, Iden, Order, PostgresQueryBuilder, Query};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use store_api::storage::RegionId;
@@ -137,7 +151,7 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
@@ -233,6 +247,15 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
.await
.unwrap_err();
assert!(matches!(err, error::Error::RegionMigrated { .. }));
check_region_migration_events_system_table(
cluster.fe_instance(),
&procedure.unwrap().to_string(),
region_id.as_u64(),
from_peer_id,
to_peer_id,
)
.await;
}
/// A naive metric table region migration test by SQL function
@@ -280,7 +303,7 @@ pub async fn test_metric_table_region_migration_by_sql(
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
// Prepares test metric tables.
let table_id = prepare_testing_metric_table(&cluster).await;
@@ -321,11 +344,12 @@ pub async fn test_metric_table_region_migration_by_sql(
// Waits condition by checking procedure state
let frontend = cluster.fe_instance().clone();
+let procedure_id_for_closure = procedure_id.clone();
wait_condition(
Duration::from_secs(10),
Box::pin(async move {
loop {
-let state = query_procedure_by_sql(&frontend, &procedure_id).await;
+let state = query_procedure_by_sql(&frontend, &procedure_id_for_closure).await;
if state == "{\"status\":\"Done\"}" {
info!("Migration done: {state}");
break;
@@ -369,6 +393,15 @@ pub async fn test_metric_table_region_migration_by_sql(
| job1 | 1970-01-01T00:00:00 | 0.0 |
+------+-------------------------+-----+";
check_output_stream(result.unwrap().data, expected).await;
check_region_migration_events_system_table(
cluster.fe_instance(),
&procedure_id,
region_id.as_u64(),
from_peer_id,
to_peer_id,
)
.await;
}
/// A naive region migration test by SQL function
@@ -413,7 +446,7 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
let mut logical_timer = 1685508715000;
@@ -454,11 +487,12 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
// Waits condition by checking procedure state
let frontend = cluster.fe_instance().clone();
+let procedure_id_for_closure = procedure_id.clone();
wait_condition(
Duration::from_secs(10),
Box::pin(async move {
loop {
-let state = query_procedure_by_sql(&frontend, &procedure_id).await;
+let state = query_procedure_by_sql(&frontend, &procedure_id_for_closure).await;
if state == "{\"status\":\"Done\"}" {
info!("Migration done: {state}");
break;
@@ -471,6 +505,15 @@ pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Ve
)
.await;
check_region_migration_events_system_table(
cluster.fe_instance(),
&procedure_id,
region_id.as_u64(),
from_peer_id,
to_peer_id,
)
.await;
// Inserts more data.
let results = insert_values(cluster.fe_instance(), logical_timer).await;
for result in results {
@@ -543,7 +586,7 @@ pub async fn test_region_migration_multiple_regions(
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
@@ -634,6 +677,15 @@ pub async fn test_region_migration_multiple_regions(
assert!(matches!(result.unwrap().data, OutputData::AffectedRows(1)));
}
check_region_migration_events_system_table(
cluster.fe_instance(),
&procedure.unwrap().to_string(),
region_id.as_u64(),
from_peer_id,
to_peer_id,
)
.await;
// Asserts the writes.
assert_values(cluster.fe_instance()).await;
@@ -693,7 +745,7 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
let mut logical_timer = 1685508715000;
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
@@ -768,6 +820,15 @@ pub async fn test_region_migration_all_regions(store_type: StorageType, endpoint
)
.await;
check_region_migration_events_system_table(
cluster.fe_instance(),
&procedure.unwrap().to_string(),
region_id.as_u64(),
from_peer_id,
to_peer_id,
)
.await;
// Inserts more data.
let results = insert_values(cluster.fe_instance(), logical_timer).await;
for result in results {
@@ -835,7 +896,7 @@ pub async fn test_region_migration_incorrect_from_peer(
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
let logical_timer = 1685508715000;
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
@@ -918,7 +979,7 @@ pub async fn test_region_migration_incorrect_region_id(
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
-.build()
+.build(true)
.await;
let logical_timer = 1685508715000;
let table_metadata_manager = cluster.metasrv.table_metadata_manager().clone();
@@ -1226,3 +1287,85 @@ async fn run_sql(
info!("Run SQL: {sql}");
instance.do_query(sql, query_ctx).await.remove(0)
}
enum RegionMigrationEvents {
ProcedureId,
Timestamp,
ProcedureState,
Schema,
Table,
EventType,
RegionMigrationTriggerReason,
RegionId,
SrcNodeId,
DstNodeId,
}
impl Iden for RegionMigrationEvents {
fn unquoted(&self, s: &mut dyn std::fmt::Write) {
write!(
s,
"{}",
match self {
Self::ProcedureId => EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME,
Self::Timestamp => EVENTS_TABLE_TIMESTAMP_COLUMN_NAME,
Self::ProcedureState => EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME,
Self::Schema => DEFAULT_PRIVATE_SCHEMA_NAME,
Self::Table => DEFAULT_EVENTS_TABLE_NAME,
Self::EventType => EVENTS_TABLE_TYPE_COLUMN_NAME,
Self::RegionMigrationTriggerReason =>
EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME,
Self::RegionId => EVENTS_TABLE_REGION_ID_COLUMN_NAME,
Self::SrcNodeId => EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME,
Self::DstNodeId => EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME,
}
)
.unwrap();
}
}
async fn check_region_migration_events_system_table(
fe_instance: &Arc<Instance>,
procedure_id: &str,
region_id: u64,
from_peer_id: u64,
to_peer_id: u64,
) {
// Sleep long enough to ensure the events have been flushed and recorded.
tokio::time::sleep(DEFAULT_FLUSH_INTERVAL_SECONDS * 2).await;
// The query is equivalent to the following SQL:
// SELECT region_migration_trigger_reason, procedure_state FROM greptime_private.events WHERE
// type = 'region_migration' AND
// procedure_id = '${procedure_id}' AND
// region_id = ${region_id} AND
// region_migration_src_node_id = ${from_peer_id} AND
// region_migration_dst_node_id = ${to_peer_id}
// ORDER BY timestamp ASC
let query = Query::select()
.column(RegionMigrationEvents::RegionMigrationTriggerReason)
.column(RegionMigrationEvents::ProcedureState)
.from((RegionMigrationEvents::Schema, RegionMigrationEvents::Table))
.and_where(Expr::col(RegionMigrationEvents::EventType).eq(REGION_MIGRATION_EVENT_TYPE))
.and_where(Expr::col(RegionMigrationEvents::ProcedureId).eq(procedure_id))
.and_where(Expr::col(RegionMigrationEvents::RegionId).eq(region_id))
.and_where(Expr::col(RegionMigrationEvents::SrcNodeId).eq(from_peer_id))
.and_where(Expr::col(RegionMigrationEvents::DstNodeId).eq(to_peer_id))
.order_by(RegionMigrationEvents::Timestamp, Order::Asc)
.to_string(PostgresQueryBuilder);
let result = fe_instance
.do_query(&query, QueryContext::arc())
.await
.remove(0);
let expected = "\
+---------------------------------+-----------------+
| region_migration_trigger_reason | procedure_state |
+---------------------------------+-----------------+
| Manual | Running |
| Manual | Done |
+---------------------------------+-----------------+";
check_output_stream(result.unwrap().data, expected).await;
}
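
The fixed `DEFAULT_FLUSH_INTERVAL_SECONDS * 2` sleep above is the simplest way to wait out the recorder's flush interval. A polling variant would trade a little code for less wall-clock time per test; this is a sketch of that alternative, not what the commit does, with `fetch_event_rows` standing in as a hypothetical closure around the sea-query lookup above:

```rust
use std::time::{Duration, Instant};

// Poll until both the `Running` and `Done` events are visible, or give up
// once `deadline` elapses. `fetch_event_rows` is a hypothetical closure
// returning the current row count of the filtered events query.
async fn wait_for_event_rows<F, Fut>(mut fetch_event_rows: F, deadline: Duration) -> bool
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = usize>,
{
    let start = Instant::now();
    while start.elapsed() < deadline {
        if fetch_event_rows().await >= 2 {
            return true;
        }
        tokio::time::sleep(Duration::from_millis(200)).await;
    }
    false
}
```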