From cc35bab5e4225abfd357d1baff2d18a6051d16f5 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Tue, 5 Aug 2025 20:30:33 -0700 Subject: [PATCH] feat: record the migration events in metasrv (#6579) * feat: collect procedure event Signed-off-by: zyy17 * feat: collect region migration events Signed-off-by: zyy17 * test: add integration test Signed-off-by: zyy17 * chore: fix docs error Signed-off-by: zyy17 * chore: fix integration test error Signed-off-by: zyy17 * chore: change status code for errors Signed-off-by: zyy17 * refactor: add `event()` in Procedure Signed-off-by: zyy17 * refactor: improve trait design 1. Add `user_metadata()` in `Procedure` trait; 2. Add `Eventable` trait; Signed-off-by: zyy17 * chore: polish the code Signed-off-by: zyy17 --------- Signed-off-by: zyy17 --- Cargo.lock | 42 ++++- Cargo.toml | 4 +- config/config.md | 2 + config/metasrv.example.toml | 5 + src/common/event-recorder/Cargo.toml | 2 - src/common/event-recorder/src/error.rs | 34 +++- src/common/event-recorder/src/recorder.rs | 15 +- src/common/meta/src/ddl_manager.rs | 1 + src/common/procedure/Cargo.toml | 2 + src/common/procedure/src/event.rs | 116 +++++++++++++ src/common/procedure/src/lib.rs | 3 + src/common/procedure/src/local.rs | 60 +++++-- src/common/procedure/src/local/runner.rs | 6 + src/common/procedure/src/procedure.rs | 24 +++ src/common/procedure/src/watcher.rs | 2 +- src/frontend/src/instance.rs | 1 + src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/events.rs | 88 ++++++++++ .../src/events/region_migration_event.rs | 159 +++++++++++++++++ src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 4 + src/meta-srv/src/metasrv/builder.rs | 19 ++- .../src/procedure/region_migration.rs | 40 +++-- .../procedure/region_migration/test_util.rs | 1 + .../src/procedure/wal_prune/test_util.rs | 1 + tests-integration/Cargo.toml | 3 + tests-integration/README.md | 2 +- tests-integration/src/cluster.rs | 87 +++++++++- tests-integration/src/grpc/flight.rs | 2 +- tests-integration/src/tests.rs | 2 +- tests-integration/src/tests/test_util.rs | 8 +- tests-integration/tests/region_migration.rs | 161 +++++++++++++++++- 32 files changed, 841 insertions(+), 57 deletions(-) create mode 100644 src/common/procedure/src/event.rs create mode 100644 src/meta-srv/src/events.rs create mode 100644 src/meta-srv/src/events/region_migration_event.rs diff --git a/Cargo.lock b/Cargo.lock index ca03b88277..fd17338f79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2341,10 +2341,8 @@ dependencies = [ "api", "async-trait", "backon", - "client", "common-error", "common-macro", - "common-meta", "common-telemetry", "common-time", "serde", @@ -2630,11 +2628,13 @@ dependencies = [ name = "common-procedure" version = "0.16.0" dependencies = [ + "api", "async-stream", "async-trait", "backon", "common-base", "common-error", + "common-event-recorder", "common-macro", "common-runtime", "common-telemetry", @@ -6234,6 +6234,17 @@ dependencies = [ "snafu 0.7.5", ] +[[package]] +name = "inherent" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c38228f24186d9cc68c729accb4d413be9eaed6ad07ff79e0270d9e56f3de13" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "inotify" version = "0.11.0" @@ -7370,6 +7381,7 @@ dependencies = [ "common-catalog", "common-config", "common-error", + "common-event-recorder", "common-greptimedb-telemetry", "common-grpc", "common-macro", @@ -11257,6 +11269,30 @@ dependencies = [ "sha2", ] +[[package]] +name = "sea-query" +version = "0.32.6" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64c91783d1514b99754fc6a4079081dcc2c587dadbff65c48c7f62297443536a" +dependencies = [ + "inherent", + "sea-query-derive", +] + +[[package]] +name = "sea-query-derive" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bae0cbad6ab996955664982739354128c58d16e126114fe88c2a493642502aab" +dependencies = [ + "darling 0.20.10", + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.100", + "thiserror 2.0.12", +] + [[package]] name = "seahash" version = "4.1.0" @@ -13017,6 +13053,7 @@ dependencies = [ "common-catalog", "common-config", "common-error", + "common-event-recorder", "common-grpc", "common-meta", "common-procedure", @@ -13059,6 +13096,7 @@ dependencies = [ "rand 0.9.0", "rstest", "rstest_reuse", + "sea-query", "serde_json", "servers", "session", diff --git a/Cargo.toml b/Cargo.toml index 4c8ae155ec..cdc1c1ee16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -195,7 +195,9 @@ rstest = "0.25" rstest_reuse = "0.7" rust_decimal = "1.33" rustc-hash = "2.0" -rustls = { version = "0.23.25", default-features = false } # It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms. +# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms. +rustls = { version = "0.23.25", default-features = false } +sea-query = "0.32" serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["float_roundtrip"] } serde_with = "3" diff --git a/config/config.md b/config/config.md index cc787ae8f9..8ecde920e8 100644 --- a/config/config.md +++ b/config/config.md @@ -381,6 +381,8 @@ | `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
Only accepts strings that match the following regular expression pattern:
[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
 | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
 | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
+| `event_recorder` | -- | -- | Configuration options for the event recorder. |
+| `event_recorder.ttl` | String | `30d` | TTL for the events table that will be used to store the events. |
 | `logging` | -- | -- | The logging options. |
 | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
 | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 893c3a7ee8..10f07c138d 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -240,6 +240,11 @@ create_topic_timeout = "30s"
 # client_cert_path = "/path/to/client_cert"
 # client_key_path = "/path/to/key"
 
+## Configuration options for the event recorder.
+[event_recorder]
+## TTL for the events table that will be used to store the events.
+ttl = "30d"
+
 ## The logging options.
 [logging]
 ## The directory to store the log files. If set to empty, logs will not be written to files.
diff --git a/src/common/event-recorder/Cargo.toml b/src/common/event-recorder/Cargo.toml
index 173fa70639..28cc9b54de 100644
--- a/src/common/event-recorder/Cargo.toml
+++ b/src/common/event-recorder/Cargo.toml
@@ -8,10 +8,8 @@ license.workspace = true
 api.workspace = true
 async-trait.workspace = true
 backon.workspace = true
-client.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
-common-meta.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 serde.workspace = true
diff --git a/src/common/event-recorder/src/error.rs b/src/common/event-recorder/src/error.rs
index 8567a1a46e..01f7327718 100644
--- a/src/common/event-recorder/src/error.rs
+++ b/src/common/event-recorder/src/error.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use api::v1::ColumnSchema;
-use common_error::ext::ErrorExt;
+use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
 use snafu::{Location, Snafu};
@@ -35,6 +35,30 @@ pub enum Error {
         expected: Vec<ColumnSchema>,
         actual: Vec<ColumnSchema>,
     },
+
+    #[snafu(display("Failed to serialize event"))]
+    SerializeEvent {
+        #[snafu(source)]
+        error: serde_json::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to insert events"))]
+    InsertEvents {
+        // BoxedError is used here to avoid the circular dependency that directly referencing `client::error::Error` would introduce.
+        source: BoxedError,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Key-value backend error"))]
+    KvBackend {
+        // BoxedError is used here to avoid the circular dependency that directly referencing `common_meta::error::Error` would introduce.
+        source: BoxedError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -42,8 +66,12 @@ pub type Result<T> = std::result::Result<T, Error>;
 
 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         match self {
-            Error::MismatchedSchema { .. } => StatusCode::InvalidArguments,
-            Error::NoAvailableFrontend { .. } => StatusCode::Internal,
+            Error::MismatchedSchema { .. } | Error::SerializeEvent { .. } => {
+                StatusCode::InvalidArguments
+            }
+            Error::NoAvailableFrontend { .. }
+            | Error::InsertEvents { .. }
+            | Error::KvBackend { .. } => StatusCode::Internal,
         }
     }
 
diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs
index 3c887aa455..781fbbbf6d 100644
--- a/src/common/event-recorder/src/recorder.rs
+++ b/src/common/event-recorder/src/recorder.rs
@@ -81,7 +81,9 @@ pub trait Event: Send + Sync + Debug {
     }
 
     /// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload.
-    fn json_payload(&self) -> Result<String>;
+    fn json_payload(&self) -> Result<String> {
+        Ok("".to_string())
+    }
 
     /// Add the extra schema to the event with the default schema.
     fn extra_schema(&self) -> Vec<ColumnSchema> {
@@ -97,6 +99,14 @@ pub trait Event: Send + Sync + Debug {
     fn as_any(&self) -> &dyn Any;
 }
 
+/// Eventable trait defines the interface for objects that can be converted to [Event].
+pub trait Eventable: Send + Sync + Debug {
+    /// Converts the object to an [Event].
+    fn to_event(&self) -> Option<Box<dyn Event>> {
+        None
+    }
+}
+
 /// Returns the hints for the insert operation.
 pub fn insert_hints() -> Vec<(&'static str, &'static str)> {
     vec![
@@ -199,7 +209,7 @@ fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
 }
 
 /// EventRecorder trait defines the interface for recording events.
-pub trait EventRecorder: Send + Sync + 'static {
+pub trait EventRecorder: Send + Sync + Debug + 'static {
     /// Records an event for persistence and processing by [EventHandler].
     fn record(&self, event: Box<dyn Event>);
 
@@ -231,6 +241,7 @@ impl Default for EventRecorderOptions {
 }
 
 /// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler].
+#[derive(Debug)]
 pub struct EventRecorderImpl {
     // The channel to send the events to the background processor.
     tx: Sender<Box<dyn Event>>,
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 1750161d34..372903975f 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -953,6 +953,7 @@ mod tests {
             state_store,
             poison_manager,
             None,
+            None,
         ));
 
         let _ = DdlManager::try_new(
diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml
index 99ed2e5617..37e5f3669f 100644
--- a/src/common/procedure/Cargo.toml
+++ b/src/common/procedure/Cargo.toml
@@ -11,11 +11,13 @@ testing = []
 workspace = true
 
 [dependencies]
+api.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 backon.workspace = true
 common-base.workspace = true
 common-error.workspace = true
+common-event-recorder.workspace = true
 common-macro.workspace = true
 common-runtime.workspace = true
 common-telemetry.workspace = true
diff --git a/src/common/procedure/src/event.rs b/src/common/procedure/src/event.rs
new file mode 100644
index 0000000000..022805f7bd
--- /dev/null
+++ b/src/common/procedure/src/event.rs
@@ -0,0 +1,116 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
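+
+//! Bridges procedures into the event recorder: [ProcedureEvent] wraps a
+//! procedure-emitted [Event] and appends procedure id, state, and error columns.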
+
+use std::any::Any;
+
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
+use common_event_recorder::error::Result;
+use common_event_recorder::Event;
+use common_time::timestamp::{TimeUnit, Timestamp};
+
+use crate::{ProcedureId, ProcedureState};
+
+pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
+pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
+pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
+
+/// `ProcedureEvent` represents an event emitted by a procedure during its execution lifecycle.
+#[derive(Debug)]
+pub struct ProcedureEvent {
+    /// Unique identifier associated with the originating procedure instance.
+    pub procedure_id: ProcedureId,
+    /// The timestamp of the event.
+    pub timestamp: Timestamp,
+    /// The state of the procedure.
+    pub state: ProcedureState,
+    /// The event emitted by the procedure. It's produced by [crate::UserMetadata::to_event].
+    pub internal_event: Box<dyn Event>,
+}
+
+impl ProcedureEvent {
+    pub fn new(
+        procedure_id: ProcedureId,
+        internal_event: Box<dyn Event>,
+        state: ProcedureState,
+    ) -> Self {
+        Self {
+            procedure_id,
+            internal_event,
+            timestamp: Timestamp::current_time(TimeUnit::Nanosecond),
+            state,
+        }
+    }
+}
+
+impl Event for ProcedureEvent {
+    fn event_type(&self) -> &str {
+        self.internal_event.event_type()
+    }
+
+    fn timestamp(&self) -> Timestamp {
+        self.timestamp
+    }
+
+    fn json_payload(&self) -> Result<String> {
+        self.internal_event.json_payload()
+    }
+
+    fn extra_schema(&self) -> Vec<ColumnSchema> {
+        let mut schema = vec![
+            ColumnSchema {
+                column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+            ColumnSchema {
+                column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Field.into(),
+                ..Default::default()
+            },
+        ];
+        schema.append(&mut self.internal_event.extra_schema());
+        schema
+    }
+
+    fn extra_row(&self) -> Result<Row> {
+        let error_str = match &self.state {
+            ProcedureState::Failed { error } => format!("{:?}", error),
+            ProcedureState::PrepareRollback { error } => format!("{:?}", error),
+            ProcedureState::RollingBack { error } => format!("{:?}", error),
+            ProcedureState::Retrying { error } => format!("{:?}", error),
+            ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
+            _ => "".to_string(),
+        };
+        let mut row = vec![
+            ValueData::StringValue(self.procedure_id.to_string()).into(),
+            ValueData::StringValue(self.state.as_str_name().to_string()).into(),
+            ValueData::StringValue(error_str).into(),
+        ];
+        row.append(&mut self.internal_event.extra_row()?.values);
+        Ok(Row { values: row })
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs
index 2cd7c39d0c..e322765558 100644
--- a/src/common/procedure/src/lib.rs
+++ b/src/common/procedure/src/lib.rs
@@ -17,6 +17,7 @@
 #![feature(assert_matches)]
 
 pub mod error;
+pub mod event;
 pub mod local;
 pub mod options;
 mod procedure;
@@ -28,9 +29,11 @@ pub mod watcher;
 pub mod test_util;
 
 pub use crate::error::{Error, Result};
+pub use crate::event::ProcedureEvent;
 pub use crate::procedure::{
     BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
     Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
     ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
+    UserMetadata,
 };
 pub use crate::watcher::Watcher;
diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs
index 9b80884436..2730f52630 100644
--- a/src/common/procedure/src/local.rs
+++ b/src/common/procedure/src/local.rs
@@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
 use async_trait::async_trait;
 use backon::ExponentialBuilder;
 use common_error::ext::BoxedError;
+use common_event_recorder::EventRecorderRef;
 use common_runtime::{RepeatedTask, TaskFunction};
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
 use common_telemetry::{error, info, tracing};
@@ -36,6 +37,7 @@ use crate::error::{
     Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
     TooManyRunningProceduresSnafu,
 };
+use crate::event::ProcedureEvent;
 use crate::local::runner::Runner;
 use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
 use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
@@ -43,7 +45,7 @@ use crate::store::poison_store::PoisonStoreRef;
 use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
 use crate::{
     BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
-    ProcedureState, ProcedureWithId, StringKey, Watcher,
+    ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
 };
 
 /// The expired time of a procedure's metadata.
@@ -81,9 +83,14 @@ pub(crate) struct ProcedureMeta {
     start_time_ms: AtomicI64,
     /// End execution time of this procedure.
     end_time_ms: AtomicI64,
+    /// Event recorder.
+    event_recorder: Option<EventRecorderRef>,
+    /// The user metadata of the procedure. It's generated by [Procedure::user_metadata].
+    user_metadata: Option<UserMetadata>,
 }
 
 impl ProcedureMeta {
+    #[allow(clippy::too_many_arguments)]
     fn new(
         id: ProcedureId,
         procedure_state: ProcedureState,
@@ -91,6 +98,8 @@ impl ProcedureMeta {
         lock_key: LockKey,
         poison_keys: PoisonKeys,
         type_name: &str,
+        event_recorder: Option<EventRecorderRef>,
+        user_metadata: Option<UserMetadata>,
     ) -> ProcedureMeta {
         let (state_sender, state_receiver) = watch::channel(procedure_state);
         ProcedureMeta {
@@ -105,6 +114,8 @@
             start_time_ms: AtomicI64::new(0),
             end_time_ms: AtomicI64::new(0),
             type_name: type_name.to_string(),
+            event_recorder,
+            user_metadata,
         }
     }
 
@@ -115,6 +126,15 @@ impl ProcedureMeta {
     }
 
     /// Update current [ProcedureState].
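+    /// If both an event recorder and user metadata are present, the transition is
+    /// also emitted as a [ProcedureEvent] before the new state is broadcast to watchers.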
fn set_state(&self, state: ProcedureState) { + // Emit the event to the event recorder if the user metadata contains the eventable object. + if let (Some(event_recorder), Some(user_metadata)) = + (&self.event_recorder, &self.user_metadata) + { + if let Some(event) = user_metadata.to_event() { + event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone()))); + } + } + // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail. self.state_sender.send(state).unwrap(); } @@ -557,6 +577,7 @@ pub struct LocalManager { remove_outdated_meta_task: TokioMutex>>, config: ManagerConfig, pause_aware: Option, + event_recorder: Option, } impl LocalManager { @@ -566,6 +587,7 @@ impl LocalManager { state_store: StateStoreRef, poison_store: PoisonStoreRef, pause_aware: Option, + event_recorder: Option, ) -> LocalManager { let manager_ctx = Arc::new(ManagerContext::new(poison_store)); @@ -577,6 +599,7 @@ impl LocalManager { remove_outdated_meta_task: TokioMutex::new(None), config, pause_aware, + event_recorder, } } @@ -601,6 +624,7 @@ impl LocalManager { ) -> Result { ensure!(self.manager_ctx.running(), ManagerNotStartSnafu); + let user_metadata = procedure.user_metadata(); let meta = Arc::new(ProcedureMeta::new( procedure_id, procedure_state, @@ -608,6 +632,8 @@ impl LocalManager { procedure.lock_key(), procedure.poison_keys(), procedure.type_name(), + self.event_recorder.clone(), + user_metadata.clone(), )); let runner = Runner { meta: meta.clone(), @@ -619,8 +645,20 @@ impl LocalManager { .with_max_times(self.max_retry_times), store: self.procedure_store.clone(), rolling_back: false, + event_recorder: self.event_recorder.clone(), }; + if let (Some(event_recorder), Some(event)) = ( + self.event_recorder.as_ref(), + user_metadata.and_then(|m| m.to_event()), + ) { + event_recorder.record(Box::new(ProcedureEvent::new( + procedure_id, + event, + ProcedureState::Running, + ))); + } + let watcher = meta.state_receiver.clone(); ensure!( @@ -870,6 +908,8 @@ pub(crate) mod test_util { LockKey::default(), PoisonKeys::default(), "ProcedureAdapter", + None, + None, ) } @@ -1019,7 +1059,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.start(); manager @@ -1044,7 +1084,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.start(); manager @@ -1098,7 +1138,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.start(); let procedure_id = ProcedureId::random(); @@ -1150,7 +1190,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let 
manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.start(); #[derive(Debug)] @@ -1231,7 +1271,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single_exclusive("test.submit"); @@ -1259,7 +1299,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.start().await.unwrap(); manager.stop().await.unwrap(); @@ -1296,7 +1336,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); @@ -1378,7 +1418,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.set_running(); manager @@ -1503,7 +1543,7 @@ mod tests { }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let poison_manager = Arc::new(InMemoryPoisonStore::new()); - let manager = LocalManager::new(config, state_store, poison_manager, None); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); manager.manager_ctx.start(); let notify = Arc::new(Notify::new()); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index aacb61f6e1..56e7e47d9a 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use backon::{BackoffBuilder, ExponentialBuilder}; +use common_event_recorder::EventRecorderRef; use common_telemetry::{debug, error, info}; use rand::Rng; use snafu::ResultExt; @@ -96,6 +97,7 @@ pub(crate) struct Runner { pub(crate) exponential_builder: ExponentialBuilder, pub(crate) store: Arc, pub(crate) rolling_back: bool, + pub(crate) event_recorder: Option, } impl Runner { @@ -425,6 +427,8 @@ impl Runner { procedure.lock_key(), procedure.poison_keys(), procedure.type_name(), + self.event_recorder.clone(), + procedure.user_metadata(), )); let runner = Runner { meta: meta.clone(), @@ -434,6 +438,7 @@ impl Runner { exponential_builder: self.exponential_builder, store: self.store.clone(), rolling_back: false, + event_recorder: self.event_recorder.clone(), }; // Insert the procedure. 
We already check the procedure existence before inserting
@@ -627,6 +632,7 @@ mod tests {
             exponential_builder: ExponentialBuilder::default(),
             store,
             rolling_back: false,
+            event_recorder: None,
         }
     }
 
diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs
index 0b5746d268..995ff68d1d 100644
--- a/src/common/procedure/src/procedure.rs
+++ b/src/common/procedure/src/procedure.rs
@@ -19,6 +19,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_event_recorder::{Event, Eventable};
 use serde::{Deserialize, Serialize};
 use smallvec::{smallvec, SmallVec};
 use snafu::{ResultExt, Snafu};
@@ -214,6 +215,29 @@ pub trait Procedure: Send {
     fn poison_keys(&self) -> PoisonKeys {
         PoisonKeys::default()
     }
+
+    /// Returns the user metadata of the procedure. If the metadata contains an eventable object,
+    /// use [UserMetadata::to_event] to get the event and emit it to the event recorder.
+    fn user_metadata(&self) -> Option<UserMetadata> {
+        None
+    }
+}
+
+/// The user metadata injected by the procedure caller. It can be used to emit events to the event recorder.
+#[derive(Clone, Debug)]
+pub struct UserMetadata {
+    event_object: Arc<dyn Eventable>,
+}
+
+impl UserMetadata {
+    /// Creates a new [UserMetadata] with the given event object.
+    pub fn new(event_object: Arc<dyn Eventable>) -> Self {
+        Self { event_object }
+    }
+
+    /// Returns the event of the procedure. It can be None if the procedure does not emit any event.
+    pub fn to_event(&self) -> Option<Box<dyn Event>> {
+        self.event_object.to_event()
+    }
 }
 
 #[async_trait]
diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs
index 48d4c8559d..ea351936ef 100644
--- a/src/common/procedure/src/watcher.rs
+++ b/src/common/procedure/src/watcher.rs
@@ -83,7 +83,7 @@ mod tests {
         };
         let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
         let poison_manager = Arc::new(InMemoryPoisonStore::default());
-        let manager = LocalManager::new(config, state_store, poison_manager, None);
+        let manager = LocalManager::new(config, state_store, poison_manager, None, None);
 
         manager.start().await.unwrap();
 
         #[derive(Debug)]
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 27fd4144b4..570c384922 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -159,6 +159,7 @@ impl Instance {
             kv_state_store.clone(),
             kv_state_store,
             Some(runtime_switch_manager),
+            None,
         ));
 
         Ok((kv_backend, procedure_manager))
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index f8b9dcfdc2..85f1be2af8 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -31,6 +31,7 @@ common-base.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
 common-error.workspace = true
+common-event-recorder.workspace = true
 common-greptimedb-telemetry.workspace = true
 common-grpc.workspace = true
 common-macro.workspace = true
diff --git a/src/meta-srv/src/events.rs b/src/meta-srv/src/events.rs
new file mode 100644
index 0000000000..666432d5f3
--- /dev/null
+++ b/src/meta-srv/src/events.rs
@@ -0,0 +1,88 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use client::{Client, Database}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME}; +use common_error::ext::BoxedError; +use common_event_recorder::error::{ + InsertEventsSnafu, KvBackendSnafu, NoAvailableFrontendSnafu, Result, +}; +use common_event_recorder::{build_row_inserts_request, insert_hints, Event, EventHandler}; +use common_grpc::channel_manager::ChannelManager; +use common_meta::peer::PeerLookupServiceRef; +use common_telemetry::debug; +use snafu::{ensure, ResultExt}; + +use crate::cluster::MetaPeerClientRef; +use crate::lease::MetaPeerLookupService; + +pub mod region_migration_event; + +/// EventHandlerImpl is the default event handler implementation in metasrv. It sends the received events to the frontend instances. +pub struct EventHandlerImpl { + peer_lookup_service: PeerLookupServiceRef, + channel_manager: ChannelManager, +} + +impl EventHandlerImpl { + pub fn new(meta_peer_client: MetaPeerClientRef) -> Self { + Self { + peer_lookup_service: Arc::new(MetaPeerLookupService::new(meta_peer_client)), + channel_manager: ChannelManager::new(), + } + } +} + +#[async_trait] +impl EventHandler for EventHandlerImpl { + async fn handle(&self, events: &[Box]) -> Result<()> { + self.build_database_client() + .await? + .row_inserts_with_hints(build_row_inserts_request(events)?, &insert_hints()) + .await + .map_err(BoxedError::new) + .context(InsertEventsSnafu)?; + + Ok(()) + } +} + +impl EventHandlerImpl { + async fn build_database_client(&self) -> Result { + let frontends = self + .peer_lookup_service + .active_frontends() + .await + .map_err(BoxedError::new) + .context(KvBackendSnafu)?; + + ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu); + + let urls = frontends + .into_iter() + .map(|peer| peer.addr) + .collect::>(); + + debug!("Available frontend addresses: {:?}", urls); + + Ok(Database::new( + DEFAULT_CATALOG_NAME, + DEFAULT_PRIVATE_SCHEMA_NAME, + Client::with_manager_and_urls(self.channel_manager.clone(), urls), + )) + } +} diff --git a/src/meta-srv/src/events/region_migration_event.rs b/src/meta-srv/src/events/region_migration_event.rs new file mode 100644 index 0000000000..1c530b281a --- /dev/null +++ b/src/meta-srv/src/events/region_migration_event.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
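+
+//! Flattens a region migration's `PersistentContext` into extra event columns
+//! (region, trigger reason, source and destination peers) plus a JSON payload.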
+ +use std::any::Any; +use std::time::Duration; + +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType}; +use common_event_recorder::error::{Result, SerializeEventSnafu}; +use common_event_recorder::Event; +use serde::Serialize; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; + +use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason}; + +pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration"; +pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id"; +pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id"; +pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number"; +pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str = + "region_migration_trigger_reason"; +pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id"; +pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr"; +pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id"; +pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr"; + +/// RegionMigrationEvent is the event of region migration. +#[derive(Debug, Serialize)] +pub(crate) struct RegionMigrationEvent { + #[serde(skip)] + region_id: RegionId, + #[serde(skip)] + table_id: TableId, + #[serde(skip)] + region_number: u32, + #[serde(skip)] + trigger_reason: RegionMigrationTriggerReason, + #[serde(skip)] + src_node_id: u64, + #[serde(skip)] + src_peer_addr: String, + #[serde(skip)] + dst_node_id: u64, + #[serde(skip)] + dst_peer_addr: String, + + // The following fields will be serialized as the json payload. + timeout: Duration, +} + +impl RegionMigrationEvent { + pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self { + Self { + region_id: ctx.region_id, + table_id: ctx.region_id.table_id(), + region_number: ctx.region_id.region_number(), + trigger_reason: ctx.trigger_reason, + src_node_id: ctx.from_peer.id, + src_peer_addr: ctx.from_peer.addr.clone(), + dst_node_id: ctx.to_peer.id, + dst_peer_addr: ctx.to_peer.addr.clone(), + timeout: ctx.timeout, + } + } +} + +impl Event for RegionMigrationEvent { + fn event_type(&self) -> &str { + REGION_MIGRATION_EVENT_TYPE + } + + fn extra_schema(&self) -> Vec { + vec![ + ColumnSchema { + column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint32.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint32.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + 
ColumnSchema { + column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Uint64.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }, + ] + } + + fn extra_row(&self) -> Result { + Ok(Row { + values: vec![ + ValueData::U64Value(self.region_id.as_u64()).into(), + ValueData::U32Value(self.table_id).into(), + ValueData::U32Value(self.region_number).into(), + ValueData::StringValue(self.trigger_reason.to_string()).into(), + ValueData::U64Value(self.src_node_id).into(), + ValueData::StringValue(self.src_peer_addr.to_string()).into(), + ValueData::U64Value(self.dst_node_id).into(), + ValueData::StringValue(self.dst_peer_addr.to_string()).into(), + ], + }) + } + + fn json_payload(&self) -> Result { + serde_json::to_string(self).context(SerializeEventSnafu) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index f0508c6bbe..678971556d 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -22,6 +22,7 @@ pub mod cache_invalidator; pub mod cluster; pub mod election; pub mod error; +pub mod events; mod failure_detector; pub mod flow_meta_alloc; pub mod handler; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 88d5af5c36..a62d875954 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -23,6 +23,7 @@ use clap::ValueEnum; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_config::{Configurable, DEFAULT_DATA_HOME}; +use common_event_recorder::EventRecorderOptions; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl_manager::DdlManagerRef; @@ -179,6 +180,8 @@ pub struct MetasrvOptions { pub meta_election_lock_id: u64, #[serde(with = "humantime_serde")] pub node_max_idle_time: Duration, + /// The event recorder options. 
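+    /// Currently carries the TTL of the `greptime_private.events` table
+    /// (`30d` by default; see `config/metasrv.example.toml`).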
+ pub event_recorder: EventRecorderOptions, } impl fmt::Debug for MetasrvOptions { @@ -269,6 +272,7 @@ impl Default for MetasrvOptions { #[cfg(feature = "pg_kvbackend")] meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID, node_max_idle_time: Duration::from_secs(24 * 60 * 60), + event_recorder: EventRecorderOptions::default(), } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index c014dec6ed..3da28f4dab 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -19,6 +19,7 @@ use std::sync::{Arc, Mutex, RwLock}; use client::client_manager::NodeClients; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; +use common_event_recorder::{EventRecorderImpl, EventRecorderRef}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; @@ -48,6 +49,7 @@ use snafu::{ensure, ResultExt}; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result}; +use crate::events::EventHandlerImpl; use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; @@ -191,12 +193,23 @@ impl MetasrvBuilder { let meta_peer_client = meta_peer_client .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); + + // Builds the event recorder to record important events and persist them as the system table. + let event_recorder = Arc::new(EventRecorderImpl::new( + Box::new(EventHandlerImpl::new(meta_peer_client.clone())), + options.event_recorder.clone(), + )); + let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default())); let pushers = Pushers::default(); let mailbox = build_mailbox(&kv_backend, &pushers); let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone())); - let procedure_manager = - build_procedure_manager(&options, &kv_backend, &runtime_switch_manager); + let procedure_manager = build_procedure_manager( + &options, + &kv_backend, + &runtime_switch_manager, + event_recorder, + ); let table_metadata_manager = Arc::new(TableMetadataManager::new( leader_cached_kv_backend.clone() as _, @@ -524,6 +537,7 @@ fn build_procedure_manager( options: &MetasrvOptions, kv_backend: &KvBackendRef, runtime_switch_manager: &RuntimeSwitchManagerRef, + event_recorder: EventRecorderRef, ) -> ProcedureManagerRef { let manager_config = ManagerConfig { max_retry_times: options.procedure.max_retry_times, @@ -545,6 +559,7 @@ fn build_procedure_manager( kv_state_store.clone(), kv_state_store, Some(runtime_switch_manager.clone()), + Some(event_recorder), )) } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 7c8152903e..ac52bc8280 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -27,9 +27,11 @@ pub(crate) mod upgrade_candidate_region; use std::any::Any; use std::fmt::{Debug, Display}; +use std::sync::Arc; use std::time::Duration; use common_error::ext::BoxedError; +use common_event_recorder::{Event, Eventable}; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::RegionFailureDetectorControllerRef; use 
common_meta::instruction::CacheIdent;
@@ -44,7 +46,9 @@ use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
 use common_procedure::error::{
     Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
 };
-use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
+use common_procedure::{
+    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
+};
 use common_telemetry::{error, info};
 use manager::RegionMigrationProcedureGuard;
 pub use manager::{
@@ -58,6 +62,7 @@ use tokio::time::Instant;
 
 use self::migration_start::RegionMigrationStart;
 use crate::error::{self, Result};
+use crate::events::region_migration_event::RegionMigrationEvent;
 use crate::metrics::{
     METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
     METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
@@ -75,21 +80,21 @@ pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct PersistentContext {
     /// The table catalog.
-    catalog: String,
+    pub(crate) catalog: String,
     /// The table schema.
-    schema: String,
+    pub(crate) schema: String,
     /// The [Peer] of migration source.
-    from_peer: Peer,
+    pub(crate) from_peer: Peer,
     /// The [Peer] of migration destination.
-    to_peer: Peer,
+    pub(crate) to_peer: Peer,
     /// The [RegionId] of migration region.
-    region_id: RegionId,
+    pub(crate) region_id: RegionId,
     /// The timeout for downgrading leader region and upgrading candidate region operations.
     #[serde(with = "humantime_serde", default = "default_timeout")]
-    timeout: Duration,
+    pub(crate) timeout: Duration,
     /// The trigger reason of region migration.
     #[serde(default)]
-    trigger_reason: RegionMigrationTriggerReason,
+    pub(crate) trigger_reason: RegionMigrationTriggerReason,
 }
 
 fn default_timeout() -> Duration {
@@ -109,6 +114,12 @@ impl PersistentContext {
     }
 }
 
+impl Eventable for PersistentContext {
+    fn to_event(&self) -> Option<Box<dyn Event>> {
+        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
+    }
+}
+
 /// Metrics of region migration.
 #[derive(Debug, Clone, Default)]
 pub struct Metrics {
@@ -307,7 +318,7 @@ impl DefaultContextFactory {
 impl ContextFactory for DefaultContextFactory {
     fn new_context(self, persistent_ctx: PersistentContext) -> Context {
         Context {
-            persistent_ctx,
+            persistent_ctx: Arc::new(persistent_ctx),
             volatile_ctx: self.volatile_ctx,
             in_memory: self.in_memory_key,
             table_metadata_manager: self.table_metadata_manager,
@@ -322,7 +333,7 @@ impl ContextFactory for DefaultContextFactory {
 
 /// The context of procedure execution.
 pub struct Context {
-    persistent_ctx: PersistentContext,
+    persistent_ctx: Arc<PersistentContext>,
     volatile_ctx: VolatileContext,
     in_memory: ResettableKvBackendRef,
     table_metadata_manager: TableMetadataManagerRef,
@@ -539,6 +550,11 @@ impl Context {
             .await;
         Ok(())
     }
+
+    /// Returns the [PersistentContext] of the procedure.
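+    /// Shared behind an [Arc] so the same context can back [Procedure::user_metadata].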
+ pub fn persistent_ctx(&self) -> Arc { + self.persistent_ctx.clone() + } } #[async_trait::async_trait] @@ -743,6 +759,10 @@ impl Procedure for RegionMigrationProcedure { fn lock_key(&self) -> LockKey { LockKey::new(self.context.persistent_ctx.lock_key()) } + + fn user_metadata(&self) -> Option { + Some(UserMetadata::new(self.context.persistent_ctx())) + } } #[cfg(test)] diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 896a236564..9895723b07 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -95,6 +95,7 @@ impl TestingEnv { state_store, poison_manager, None, + None, )); Self { diff --git a/src/meta-srv/src/procedure/wal_prune/test_util.rs b/src/meta-srv/src/procedure/wal_prune/test_util.rs index 53436d7bef..6a79ef03f8 100644 --- a/src/meta-srv/src/procedure/wal_prune/test_util.rs +++ b/src/meta-srv/src/procedure/wal_prune/test_util.rs @@ -53,6 +53,7 @@ impl TestEnv { state_store, poison_manager, None, + None, )); let mailbox_ctx = MailboxContext::new(mailbox_sequence); diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index a9cabe1eb2..9b91a4ffd9 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -27,6 +27,7 @@ common-base.workspace = true common-catalog.workspace = true common-config.workspace = true common-error.workspace = true +common-event-recorder.workspace = true common-grpc.workspace = true common-meta = { workspace = true, features = ["testing"] } common-procedure.workspace = true @@ -59,8 +60,10 @@ object-store.workspace = true operator = { workspace = true, features = ["testing"] } prost.workspace = true query.workspace = true +rand.workspace = true rstest.workspace = true rstest_reuse.workspace = true +sea-query.workspace = true serde_json.workspace = true servers = { workspace = true, features = ["testing"] } session.workspace = true diff --git a/tests-integration/README.md b/tests-integration/README.md index 9934072538..ea68e16969 100644 --- a/tests-integration/README.md +++ b/tests-integration/README.md @@ -52,5 +52,5 @@ GT_KAFKA_ENDPOINTS = localhost:9092 ``` cd tests-integration/fixtures -docker compose -f docker-compose-standalone.yml up kafka +docker compose -f docker-compose.yml up kafka ``` diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 2b82952f18..c5018d137f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -14,6 +14,8 @@ use std::collections::HashMap; use std::env; +use std::net::TcpListener; +use std::ops::RangeInclusive; use std::sync::Arc; use std::time::Duration; @@ -50,12 +52,14 @@ use frontend::frontend::{Frontend, FrontendOptions}; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::instance::Instance as FeInstance; +use frontend::server::Services; use hyper_util::rt::TokioIo; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; use object_store::config::ObjectStoreConfig; +use rand::Rng; use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; use servers::grpc::GrpcOptions; @@ -177,6 +181,7 @@ impl GreptimeDbClusterBuilder { pub async fn build_with( &self, datanode_options: Vec, + start_frontend_servers: bool, guards: Vec, ) -> 
GreptimeDbCluster { let datanodes = datanode_options.len(); @@ -218,7 +223,9 @@ impl GreptimeDbClusterBuilder { self.wait_datanodes_alive(metasrv.metasrv.meta_peer_client(), datanodes) .await; - let mut frontend = self.build_frontend(metasrv.clone(), datanode_clients).await; + let mut frontend = self + .build_frontend(metasrv.clone(), datanode_clients, start_frontend_servers) + .await; test_util::prepare_another_catalog_and_schema(&frontend.instance).await; @@ -234,10 +241,11 @@ impl GreptimeDbClusterBuilder { } } - pub async fn build(&self) -> GreptimeDbCluster { + pub async fn build(&self, start_frontend_servers: bool) -> GreptimeDbCluster { let datanodes = self.datanodes.unwrap_or(4); let (datanode_options, guards) = self.build_datanode_options_and_guards(datanodes).await; - self.build_with(datanode_options, guards).await + self.build_with(datanode_options, start_frontend_servers, guards) + .await } async fn build_datanode_options_and_guards( @@ -352,6 +360,7 @@ impl GreptimeDbClusterBuilder { &self, metasrv: MockInfo, datanode_clients: Arc, + start_frontend_servers: bool, ) -> Frontend { let mut meta_client = MetaClientBuilder::frontend_default_options() .channel_manager(metasrv.channel_manager) @@ -393,37 +402,97 @@ impl GreptimeDbClusterBuilder { Arc::new(InvalidateCacheHandler::new(cache_registry.clone())), ]); - let options = FrontendOptions::default(); + let fe_opts = self.build_frontend_options(); + let heartbeat_task = HeartbeatTask::new( - &options, + &fe_opts, meta_client.clone(), HeartbeatOptions::default(), Arc::new(handlers_executor), ); - let server_addr = options.grpc.server_addr.clone(); let instance = FrontendBuilder::new( - options, + fe_opts.clone(), cached_meta_backend.clone(), cache_registry.clone(), catalog_manager, datanode_clients, meta_client, - Arc::new(ProcessManager::new(server_addr, None)), + Arc::new(ProcessManager::new(fe_opts.grpc.server_addr.clone(), None)), ) .with_local_cache_invalidator(cache_registry) .try_build() .await .unwrap(); + let instance = Arc::new(instance); + // Build the servers for the frontend. + let servers = if start_frontend_servers { + Services::new(fe_opts, instance.clone(), Plugins::default()) + .build() + .unwrap() + } else { + ServerHandlers::default() + }; + Frontend { instance, - servers: ServerHandlers::default(), + servers, heartbeat_task: Some(heartbeat_task), export_metrics_task: None, } } + + fn build_frontend_options(&self) -> FrontendOptions { + let mut fe_opts = FrontendOptions::default(); + + // Choose a random unused port between [14000, 24000] for local test to avoid conflicts. + let port_range = 14000..=24000; + let max_attempts = 10; + let localhost = "127.0.0.1"; + let construct_addr = |port: u16| format!("{}:{}", localhost, port); + + fe_opts.http.addr = construct_addr(self.choose_random_unused_port( + port_range.clone(), + max_attempts, + localhost, + )); + + let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost); + fe_opts.grpc.bind_addr = construct_addr(grpc_port); + fe_opts.grpc.server_addr = construct_addr(grpc_port); + fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port( + port_range.clone(), + max_attempts, + localhost, + )); + fe_opts.postgres.addr = + construct_addr(self.choose_random_unused_port(port_range, max_attempts, localhost)); + + fe_opts + } + + // Choose a random unused port between [start, end]. 
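+    // The probe listener is dropped as soon as the bind succeeds, so another process
+    // could still grab the port before the frontend binds it; acceptable for tests.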
+ fn choose_random_unused_port( + &self, + port_range: RangeInclusive, + max_attempts: u16, + addr: &str, + ) -> u16 { + let mut rng = rand::rng(); + + let mut attempts = 0; + while attempts < max_attempts { + let port = rng.random_range(port_range.clone()); + if TcpListener::bind(format!("{}:{}", addr, port)).is_ok() { + return port; + } + attempts += 1; + } + + panic!("No unused port found"); + } } async fn build_datanode_clients( diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 27e5fbc88d..590964d8f2 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -83,7 +83,7 @@ mod test { let db = GreptimeDbClusterBuilder::new("test_distributed_flight_do_put") .await - .build() + .build(false) .await; let runtime = common_runtime::global_runtime().clone(); diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 811b0abdc0..255409a300 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -44,6 +44,6 @@ impl MockDistributedInstance { pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance { let builder = GreptimeDbClusterBuilder::new(test_name).await; - let cluster = builder.build().await; + let cluster = builder.build(false).await; MockDistributedInstance(cluster) } diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index d8df68afaa..e3db5ea186 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -97,7 +97,7 @@ impl MockInstanceBuilder { MockInstanceImpl::Standalone(builder.build().await) } MockInstanceBuilder::Distributed(builder) => { - MockInstanceImpl::Distributed(builder.build().await) + MockInstanceImpl::Distributed(builder.build(false).await) } } } @@ -131,7 +131,9 @@ impl MockInstanceBuilder { .. } = instance; - MockInstanceImpl::Distributed(builder.build_with(datanode_options, guards).await) + MockInstanceImpl::Distributed( + builder.build_with(datanode_options, false, guards).await, + ) } } } @@ -207,7 +209,7 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME, + Self::Timestamp => EVENTS_TABLE_TIMESTAMP_COLUMN_NAME, + Self::ProcedureState => EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME, + Self::Schema => DEFAULT_PRIVATE_SCHEMA_NAME, + Self::Table => DEFAULT_EVENTS_TABLE_NAME, + Self::EventType => EVENTS_TABLE_TYPE_COLUMN_NAME, + Self::RegionMigrationTriggerReason => + EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME, + Self::RegionId => EVENTS_TABLE_REGION_ID_COLUMN_NAME, + Self::SrcNodeId => EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME, + Self::DstNodeId => EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME, + } + ) + .unwrap(); + } +} + +async fn check_region_migration_events_system_table( + fe_instance: &Arc, + procedure_id: &str, + region_id: u64, + from_peer_id: u64, + to_peer_id: u64, +) { + // Sleep enough time to ensure the event is recorded. 
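+    // The event recorder flushes to `greptime_private.events` asynchronously in the
+    // background, so wait two flush intervals before querying the table.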
+ tokio::time::sleep(DEFAULT_FLUSH_INTERVAL_SECONDS * 2).await; + + // The query is equivalent to the following SQL: + // SELECT region_migration_trigger_reason, procedure_state FROM greptime_private.events WHERE + // type = 'region_migration' AND + // procedure_id = '${procedure_id}' AND + // table_id = ${table_id} AND + // region_id = ${region_id} AND + // region_migration_src_node_id = ${from_peer_id} AND + // region_migration_dst_node_id = ${to_peer_id} + // ORDER BY timestamp ASC + let query = Query::select() + .column(RegionMigrationEvents::RegionMigrationTriggerReason) + .column(RegionMigrationEvents::ProcedureState) + .from((RegionMigrationEvents::Schema, RegionMigrationEvents::Table)) + .and_where(Expr::col(RegionMigrationEvents::EventType).eq(REGION_MIGRATION_EVENT_TYPE)) + .and_where(Expr::col(RegionMigrationEvents::ProcedureId).eq(procedure_id)) + .and_where(Expr::col(RegionMigrationEvents::RegionId).eq(region_id)) + .and_where(Expr::col(RegionMigrationEvents::SrcNodeId).eq(from_peer_id)) + .and_where(Expr::col(RegionMigrationEvents::DstNodeId).eq(to_peer_id)) + .order_by(RegionMigrationEvents::Timestamp, Order::Asc) + .to_string(PostgresQueryBuilder); + + let result = fe_instance + .do_query(&query, QueryContext::arc()) + .await + .remove(0); + + let expected = "\ ++---------------------------------+-----------------+ +| region_migration_trigger_reason | procedure_state | ++---------------------------------+-----------------+ +| Manual | Running | +| Manual | Done | ++---------------------------------+-----------------+"; + check_output_stream(result.unwrap().data, expected).await; +}
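
For a third-party procedure that wants its lifecycle recorded the same way, the traits introduced by this patch compose as follows. This is an illustrative sketch, not code from the patch: `CompactionEvent`, `CompactionState`, and `user_metadata_for` are hypothetical names, and it assumes the default `timestamp`, `extra_schema`, and `extra_row` implementations on `Event` suffice when no extra columns are needed.

```rust
use std::any::Any;
use std::sync::Arc;

use common_event_recorder::error::{Result, SerializeEventSnafu};
use common_event_recorder::{Event, Eventable};
use common_procedure::UserMetadata;
use serde::Serialize;
use snafu::ResultExt;

/// Hypothetical event emitted by a manual compaction procedure.
#[derive(Debug, Serialize)]
struct CompactionEvent {
    table_id: u32,
}

impl Event for CompactionEvent {
    fn event_type(&self) -> &str {
        "compaction"
    }

    // `timestamp`, `extra_schema`, and `extra_row` keep their trait defaults here,
    // so this event only contributes the shared columns plus the JSON payload.
    fn json_payload(&self) -> Result<String> {
        serde_json::to_string(self).context(SerializeEventSnafu)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical persistent state; mirrors how `PersistentContext` implements
/// `Eventable` for region migration above.
#[derive(Debug)]
struct CompactionState {
    table_id: u32,
}

impl Eventable for CompactionState {
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(CompactionEvent {
            table_id: self.table_id,
        }))
    }
}

/// In a `Procedure` impl, returning this from `user_metadata()` is enough for
/// `LocalManager` to emit a `ProcedureEvent` on submission and on state changes.
fn user_metadata_for(state: Arc<CompactionState>) -> UserMetadata {
    UserMetadata::new(state)
}
```

Note that `LocalManager` records one event with `ProcedureState::Running` at submission and one per subsequent `set_state` call, so `to_event` should stay cheap.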