feat: record the migration events in metasrv (#6579)

* feat: collect procedure event

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* feat: collect region migration events

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: add integration test

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix docs error

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix integration test error

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: change status code for errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `event()` in Procedure

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: improve trait design

1. Add `user_metadata()` in `Procedure` trait;

2. Add `Eventable` trait;

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: polish the code

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2025-08-05 20:30:33 -07:00
committed by Zhenchi
parent ebe78f668e
commit 7b48e53261
32 changed files with 841 additions and 57 deletions

View File

@@ -8,10 +8,8 @@ license.workspace = true
api.workspace = true
async-trait.workspace = true
backon.workspace = true
client.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
serde.workspace = true

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::ColumnSchema;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -35,6 +35,30 @@ pub enum Error {
expected: Vec<ColumnSchema>,
actual: Vec<ColumnSchema>,
},
#[snafu(display("Failed to serialize event"))]
SerializeEvent {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to insert events"))]
InsertEvents {
// BoxedError is utilized here to prevent introducing a circular dependency that would arise from directly referencing `client::error::Error`.
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Keyvalue backend error"))]
KvBackend {
// BoxedError is utilized here to prevent introducing a circular dependency that would arise from directly referencing `common_meta::error::Error`.
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -42,8 +66,12 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::MismatchedSchema { .. } => StatusCode::InvalidArguments,
Error::NoAvailableFrontend { .. } => StatusCode::Internal,
Error::MismatchedSchema { .. } | Error::SerializeEvent { .. } => {
StatusCode::InvalidArguments
}
Error::NoAvailableFrontend { .. }
| Error::InsertEvents { .. }
| Error::KvBackend { .. } => StatusCode::Internal,
}
}

View File

@@ -81,7 +81,9 @@ pub trait Event: Send + Sync + Debug {
}
/// Returns the JSON-encoded payload of the event as a string. The payload is stored using the JSON type.
fn json_payload(&self) -> Result<String>;
fn json_payload(&self) -> Result<String> {
Ok("".to_string())
}
/// Add the extra schema to the event with the default schema.
fn extra_schema(&self) -> Vec<ColumnSchema> {
@@ -97,6 +99,14 @@ pub trait Event: Send + Sync + Debug {
fn as_any(&self) -> &dyn Any;
}
/// Eventable trait defines the interface for objects that can be converted to [Event].
///
/// The provided `to_event` implementation yields no event, so an implementor only has to
/// override it when it actually has something to record.
pub trait Eventable: Send + Sync + Debug {
    /// Converts the object to an [Event].
    ///
    /// Returns `None` by default, meaning the object produces no event.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        None
    }
}
/// Returns the hints for the insert operation.
pub fn insert_hints() -> Vec<(&'static str, &'static str)> {
vec![
@@ -199,7 +209,7 @@ fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
}
/// EventRecorder trait defines the interface for recording events.
pub trait EventRecorder: Send + Sync + 'static {
pub trait EventRecorder: Send + Sync + Debug + 'static {
/// Records an event for persistence and processing by [EventHandler].
fn record(&self, event: Box<dyn Event>);
@@ -231,6 +241,7 @@ impl Default for EventRecorderOptions {
}
/// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler].
#[derive(Debug)]
pub struct EventRecorderImpl {
// The channel to send the events to the background processor.
tx: Sender<Box<dyn Event>>,

View File

@@ -953,6 +953,7 @@ mod tests {
state_store,
poison_manager,
None,
None,
));
let _ = DdlManager::try_new(

View File

@@ -11,11 +11,13 @@ testing = []
workspace = true
[dependencies]
api.workspace = true
async-stream.workspace = true
async-trait.workspace = true
backon.workspace = true
common-base.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true

View File

@@ -0,0 +1,116 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
use common_event_recorder::error::Result;
use common_event_recorder::Event;
use common_time::timestamp::{TimeUnit, Timestamp};
use crate::{ProcedureId, ProcedureState};
pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
/// `ProcedureEvent` represents an event emitted by a procedure during its execution lifecycle.
///
/// It wraps another [Event] and tags it with the id of the procedure, the procedure state and
/// the time at which the `ProcedureEvent` was created.
#[derive(Debug)]
pub struct ProcedureEvent {
    /// Unique identifier associated with the originating procedure instance.
    pub procedure_id: ProcedureId,
    /// The timestamp of the event.
    pub timestamp: Timestamp,
    /// The state of the procedure.
    pub state: ProcedureState,
    /// The wrapped event handed to [ProcedureEvent::new]. Its event type, JSON payload, extra
    /// schema and extra row are reused by the [Event] implementation of this type.
    /// The procedure manager obtains it from `UserMetadata::to_event`.
    pub internal_event: Box<dyn Event>,
}
impl ProcedureEvent {
    /// Creates a [ProcedureEvent] for `procedure_id` that wraps `internal_event` and carries
    /// the given procedure `state`.
    ///
    /// The timestamp is the current time, expressed in [TimeUnit::Nanosecond].
    pub fn new(
        procedure_id: ProcedureId,
        internal_event: Box<dyn Event>,
        state: ProcedureState,
    ) -> Self {
        let timestamp = Timestamp::current_time(TimeUnit::Nanosecond);

        Self {
            procedure_id,
            timestamp,
            state,
            internal_event,
        }
    }
}
impl Event for ProcedureEvent {
fn event_type(&self) -> &str {
self.internal_event.event_type()
}
fn timestamp(&self) -> Timestamp {
self.timestamp
}
fn json_payload(&self) -> Result<String> {
self.internal_event.json_payload()
}
fn extra_schema(&self) -> Vec<ColumnSchema> {
let mut schema = vec![
ColumnSchema {
column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
];
schema.append(&mut self.internal_event.extra_schema());
schema
}
fn extra_row(&self) -> Result<Row> {
let error_str = match &self.state {
ProcedureState::Failed { error } => format!("{:?}", error),
ProcedureState::PrepareRollback { error } => format!("{:?}", error),
ProcedureState::RollingBack { error } => format!("{:?}", error),
ProcedureState::Retrying { error } => format!("{:?}", error),
ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
_ => "".to_string(),
};
let mut row = vec![
ValueData::StringValue(self.procedure_id.to_string()).into(),
ValueData::StringValue(self.state.as_str_name().to_string()).into(),
ValueData::StringValue(error_str).into(),
];
row.append(&mut self.internal_event.extra_row()?.values);
Ok(Row { values: row })
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -17,6 +17,7 @@
#![feature(assert_matches)]
pub mod error;
pub mod event;
pub mod local;
pub mod options;
mod procedure;
@@ -28,9 +29,11 @@ pub mod watcher;
pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::event::ProcedureEvent;
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
UserMetadata,
};
pub use crate::watcher::Watcher;

View File

@@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_error::ext::BoxedError;
use common_event_recorder::EventRecorderRef;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
@@ -36,6 +37,7 @@ use crate::error::{
Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
};
use crate::event::ProcedureEvent;
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
@@ -43,7 +45,7 @@ use crate::store::poison_store::PoisonStoreRef;
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
ProcedureState, ProcedureWithId, StringKey, Watcher,
ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -81,9 +83,14 @@ pub(crate) struct ProcedureMeta {
start_time_ms: AtomicI64,
/// End execution time of this procedure.
end_time_ms: AtomicI64,
/// Event recorder.
event_recorder: Option<EventRecorderRef>,
/// The user metadata of the procedure. It's generated by [Procedure::user_metadata].
user_metadata: Option<UserMetadata>,
}
impl ProcedureMeta {
#[allow(clippy::too_many_arguments)]
fn new(
id: ProcedureId,
procedure_state: ProcedureState,
@@ -91,6 +98,8 @@ impl ProcedureMeta {
lock_key: LockKey,
poison_keys: PoisonKeys,
type_name: &str,
event_recorder: Option<EventRecorderRef>,
user_metadata: Option<UserMetadata>,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(procedure_state);
ProcedureMeta {
@@ -105,6 +114,8 @@ impl ProcedureMeta {
start_time_ms: AtomicI64::new(0),
end_time_ms: AtomicI64::new(0),
type_name: type_name.to_string(),
event_recorder,
user_metadata,
}
}
@@ -115,6 +126,15 @@ impl ProcedureMeta {
/// Update current [ProcedureState].
///
/// Before the new state is published, a [ProcedureEvent] carrying that state is handed to the
/// event recorder, provided that a recorder is configured, user metadata is present and the
/// metadata yields an event.
fn set_state(&self, state: ProcedureState) {
    // Only ask the metadata for an event when there is a recorder to receive it.
    let pending = self
        .event_recorder
        .as_ref()
        .zip(self.user_metadata.as_ref())
        .and_then(|(recorder, metadata)| metadata.to_event().map(|event| (recorder, event)));

    if let Some((recorder, event)) = pending {
        let procedure_event = ProcedureEvent::new(self.id, event, state.clone());
        recorder.record(Box::new(procedure_event));
    }

    // Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
    self.state_sender.send(state).unwrap();
}
@@ -557,6 +577,7 @@ pub struct LocalManager {
remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
config: ManagerConfig,
pause_aware: Option<PauseAwareRef>,
event_recorder: Option<EventRecorderRef>,
}
impl LocalManager {
@@ -566,6 +587,7 @@ impl LocalManager {
state_store: StateStoreRef,
poison_store: PoisonStoreRef,
pause_aware: Option<PauseAwareRef>,
event_recorder: Option<EventRecorderRef>,
) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new(poison_store));
@@ -577,6 +599,7 @@ impl LocalManager {
remove_outdated_meta_task: TokioMutex::new(None),
config,
pause_aware,
event_recorder,
}
}
@@ -601,6 +624,7 @@ impl LocalManager {
) -> Result<Watcher> {
ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
let user_metadata = procedure.user_metadata();
let meta = Arc::new(ProcedureMeta::new(
procedure_id,
procedure_state,
@@ -608,6 +632,8 @@ impl LocalManager {
procedure.lock_key(),
procedure.poison_keys(),
procedure.type_name(),
self.event_recorder.clone(),
user_metadata.clone(),
));
let runner = Runner {
meta: meta.clone(),
@@ -619,8 +645,20 @@ impl LocalManager {
.with_max_times(self.max_retry_times),
store: self.procedure_store.clone(),
rolling_back: false,
event_recorder: self.event_recorder.clone(),
};
if let (Some(event_recorder), Some(event)) = (
self.event_recorder.as_ref(),
user_metadata.and_then(|m| m.to_event()),
) {
event_recorder.record(Box::new(ProcedureEvent::new(
procedure_id,
event,
ProcedureState::Running,
)));
}
let watcher = meta.state_receiver.clone();
ensure!(
@@ -870,6 +908,8 @@ pub(crate) mod test_util {
LockKey::default(),
PoisonKeys::default(),
"ProcedureAdapter",
None,
None,
)
}
@@ -1019,7 +1059,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.start();
manager
@@ -1044,7 +1084,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.start();
manager
@@ -1098,7 +1138,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.start();
let procedure_id = ProcedureId::random();
@@ -1150,7 +1190,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.start();
#[derive(Debug)]
@@ -1231,7 +1271,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
@@ -1259,7 +1299,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.start().await.unwrap();
manager.stop().await.unwrap();
@@ -1296,7 +1336,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.set_running();
let mut procedure = ProcedureToLoad::new("submit");
@@ -1378,7 +1418,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.set_running();
manager
@@ -1503,7 +1543,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.start();
let notify = Arc::new(Notify::new());

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use backon::{BackoffBuilder, ExponentialBuilder};
use common_event_recorder::EventRecorderRef;
use common_telemetry::{debug, error, info};
use rand::Rng;
use snafu::ResultExt;
@@ -96,6 +97,7 @@ pub(crate) struct Runner {
pub(crate) exponential_builder: ExponentialBuilder,
pub(crate) store: Arc<ProcedureStore>,
pub(crate) rolling_back: bool,
pub(crate) event_recorder: Option<EventRecorderRef>,
}
impl Runner {
@@ -425,6 +427,8 @@ impl Runner {
procedure.lock_key(),
procedure.poison_keys(),
procedure.type_name(),
self.event_recorder.clone(),
procedure.user_metadata(),
));
let runner = Runner {
meta: meta.clone(),
@@ -434,6 +438,7 @@ impl Runner {
exponential_builder: self.exponential_builder,
store: self.store.clone(),
rolling_back: false,
event_recorder: self.event_recorder.clone(),
};
// Insert the procedure. We already check the procedure existence before inserting
@@ -627,6 +632,7 @@ mod tests {
exponential_builder: ExponentialBuilder::default(),
store,
rolling_back: false,
event_recorder: None,
}
}

View File

@@ -19,6 +19,7 @@ use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use common_event_recorder::{Event, Eventable};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
@@ -214,6 +215,29 @@ pub trait Procedure: Send {
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
/// Returns the user metadata of the procedure. If the metadata contains the eventable object, you can use [UserMetadata::to_event] to get the event and emit it to the event recorder.
fn user_metadata(&self) -> Option<UserMetadata> {
None
}
}
/// The user metadata injected by the procedure caller. It can be used to emit events to the event recorder.
///
/// Cloning is cheap: the wrapped object is shared behind an [Arc].
#[derive(Clone, Debug)]
pub struct UserMetadata {
    /// The object asked for an [Event] whenever [UserMetadata::to_event] is called.
    event_object: Arc<dyn Eventable>,
}
impl UserMetadata {
    /// Creates a new [UserMetadata] with the given event object.
    pub fn new(event_object: Arc<dyn Eventable>) -> Self {
        Self { event_object }
    }

    /// Returns the event of the procedure. It can be None if the procedure does not emit any event.
    ///
    /// This simply forwards to [Eventable::to_event] of the wrapped object, so a fresh event is
    /// requested on every call.
    pub fn to_event(&self) -> Option<Box<dyn Event>> {
        self.event_object.to_event()
    }
}
#[async_trait]

View File

@@ -83,7 +83,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager = LocalManager::new(config, state_store, poison_manager, None);
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.start().await.unwrap();
#[derive(Debug)]

View File

@@ -159,6 +159,7 @@ impl Instance {
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager),
None,
));
Ok((kv_backend, procedure_manager))

View File

@@ -31,6 +31,7 @@ common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-greptimedb-telemetry.workspace = true
common-grpc.workspace = true
common-macro.workspace = true

View File

@@ -0,0 +1,88 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_event_recorder::error::{
InsertEventsSnafu, KvBackendSnafu, NoAvailableFrontendSnafu, Result,
};
use common_event_recorder::{build_row_inserts_request, insert_hints, Event, EventHandler};
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::PeerLookupServiceRef;
use common_telemetry::debug;
use snafu::{ensure, ResultExt};
use crate::cluster::MetaPeerClientRef;
use crate::lease::MetaPeerLookupService;
pub mod region_migration_event;
/// EventHandlerImpl is the default event handler implementation in metasrv. It sends the received events to the frontend instances.
pub struct EventHandlerImpl {
    /// Used to look up the currently active frontends each time events are handled.
    peer_lookup_service: PeerLookupServiceRef,
    /// Channel manager handed (as a clone) to every client built for the frontends.
    channel_manager: ChannelManager,
}
impl EventHandlerImpl {
pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
Self {
peer_lookup_service: Arc::new(MetaPeerLookupService::new(meta_peer_client)),
channel_manager: ChannelManager::new(),
}
}
}
#[async_trait]
impl EventHandler for EventHandlerImpl {
    /// Converts `events` into a row-insert request and sends it, together with the insert
    /// hints, through a database client built for the currently active frontends.
    ///
    /// Fails if no client can be built, if the request cannot be built from the events, or if
    /// the insert itself fails (reported as `InsertEvents`).
    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
        // The client is resolved first so that a missing frontend is reported before any
        // request is built.
        let database = self.build_database_client().await?;
        let request = build_row_inserts_request(events)?;

        database
            .row_inserts_with_hints(request, &insert_hints())
            .await
            .map_err(BoxedError::new)
            .context(InsertEventsSnafu)?;

        Ok(())
    }
}
impl EventHandlerImpl {
    /// Builds a [Database] client that targets all currently active frontends, using the
    /// default catalog and the default private schema.
    ///
    /// Fails with `KvBackend` when the frontend lookup fails and with `NoAvailableFrontend`
    /// when the lookup succeeds but returns no frontend.
    async fn build_database_client(&self) -> Result<Database> {
        let lookup = self.peer_lookup_service.active_frontends().await;
        let frontends = lookup.map_err(BoxedError::new).context(KvBackendSnafu)?;

        ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);

        let urls: Vec<_> = frontends.into_iter().map(|peer| peer.addr).collect();
        debug!("Available frontend addresses: {:?}", urls);

        let client = Client::with_manager_and_urls(self.channel_manager.clone(), urls);
        let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, client);
        Ok(database)
    }
}

View File

@@ -0,0 +1,159 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
use common_event_recorder::error::{Result, SerializeEventSnafu};
use common_event_recorder::Event;
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";
pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";
pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";
pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";
pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
"region_migration_trigger_reason";
pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";
pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";
pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
/// RegionMigrationEvent is the event of region migration.
///
/// Fields marked `#[serde(skip)]` are left out of the JSON payload; they are written as
/// dedicated columns through the extra schema/row of the [Event] implementation instead.
#[derive(Debug, Serialize)]
pub(crate) struct RegionMigrationEvent {
    /// The migrated region.
    #[serde(skip)]
    region_id: RegionId,
    /// The table id derived from `region_id`.
    #[serde(skip)]
    table_id: TableId,
    /// The region number derived from `region_id`.
    #[serde(skip)]
    region_number: u32,
    /// Why the migration was triggered.
    #[serde(skip)]
    trigger_reason: RegionMigrationTriggerReason,
    /// Id of the peer the region migrates from.
    #[serde(skip)]
    src_node_id: u64,
    /// Address of the peer the region migrates from.
    #[serde(skip)]
    src_peer_addr: String,
    /// Id of the peer the region migrates to.
    #[serde(skip)]
    dst_node_id: u64,
    /// Address of the peer the region migrates to.
    #[serde(skip)]
    dst_peer_addr: String,
    // The following fields will be serialized as the json payload.
    /// The timeout copied from the persistent context of the migration.
    timeout: Duration,
}
impl RegionMigrationEvent {
    /// Builds the event from a snapshot of the given [PersistentContext].
    ///
    /// The table id and region number are derived from the region id of the context; peer
    /// addresses are cloned so the event owns its data.
    pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
        let region_id = ctx.region_id;
        let (source, destination) = (&ctx.from_peer, &ctx.to_peer);

        Self {
            region_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            trigger_reason: ctx.trigger_reason,
            src_node_id: source.id,
            src_peer_addr: source.addr.clone(),
            dst_node_id: destination.id,
            dst_peer_addr: destination.addr.clone(),
            timeout: ctx.timeout,
        }
    }
}
impl Event for RegionMigrationEvent {
fn event_type(&self) -> &str {
REGION_MIGRATION_EVENT_TYPE
}
fn extra_schema(&self) -> Vec<ColumnSchema> {
vec![
ColumnSchema {
column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint32.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint32.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
]
}
fn extra_row(&self) -> Result<Row> {
Ok(Row {
values: vec![
ValueData::U64Value(self.region_id.as_u64()).into(),
ValueData::U32Value(self.table_id).into(),
ValueData::U32Value(self.region_number).into(),
ValueData::StringValue(self.trigger_reason.to_string()).into(),
ValueData::U64Value(self.src_node_id).into(),
ValueData::StringValue(self.src_peer_addr.to_string()).into(),
ValueData::U64Value(self.dst_node_id).into(),
ValueData::StringValue(self.dst_peer_addr.to_string()).into(),
],
})
}
fn json_payload(&self) -> Result<String> {
serde_json::to_string(self).context(SerializeEventSnafu)
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -22,6 +22,7 @@ pub mod cache_invalidator;
pub mod cluster;
pub mod election;
pub mod error;
pub mod events;
mod failure_detector;
pub mod flow_meta_alloc;
pub mod handler;

View File

@@ -23,6 +23,7 @@ use clap::ValueEnum;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
@@ -179,6 +180,8 @@ pub struct MetasrvOptions {
pub meta_election_lock_id: u64,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
/// The event recorder options.
pub event_recorder: EventRecorderOptions,
}
impl fmt::Debug for MetasrvOptions {
@@ -269,6 +272,7 @@ impl Default for MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
}
}
}

View File

@@ -19,6 +19,7 @@ use std::sync::{Arc, Mutex, RwLock};
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_event_recorder::{EventRecorderImpl, EventRecorderRef};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
@@ -48,6 +49,7 @@ use snafu::{ensure, ResultExt};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
@@ -191,12 +193,23 @@ impl MetasrvBuilder {
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
// Builds the event recorder to record important events and persist them as the system table.
let event_recorder = Arc::new(EventRecorderImpl::new(
Box::new(EventHandlerImpl::new(meta_peer_client.clone())),
options.event_recorder.clone(),
));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
let pushers = Pushers::default();
let mailbox = build_mailbox(&kv_backend, &pushers);
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
let procedure_manager =
build_procedure_manager(&options, &kv_backend, &runtime_switch_manager);
let procedure_manager = build_procedure_manager(
&options,
&kv_backend,
&runtime_switch_manager,
event_recorder,
);
let table_metadata_manager = Arc::new(TableMetadataManager::new(
leader_cached_kv_backend.clone() as _,
@@ -524,6 +537,7 @@ fn build_procedure_manager(
options: &MetasrvOptions,
kv_backend: &KvBackendRef,
runtime_switch_manager: &RuntimeSwitchManagerRef,
event_recorder: EventRecorderRef,
) -> ProcedureManagerRef {
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
@@ -545,6 +559,7 @@ fn build_procedure_manager(
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager.clone()),
Some(event_recorder),
))
}

View File

@@ -27,9 +27,11 @@ pub(crate) mod upgrade_candidate_region;
use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_event_recorder::{Event, Eventable};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::CacheIdent;
@@ -44,7 +46,9 @@ use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use manager::RegionMigrationProcedureGuard;
pub use manager::{
@@ -58,6 +62,7 @@ use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Result};
use crate::events::region_migration_event::RegionMigrationEvent;
use crate::metrics::{
METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
@@ -75,21 +80,21 @@ pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The table catalog.
catalog: String,
pub(crate) catalog: String,
/// The table schema.
schema: String,
pub(crate) schema: String,
/// The [Peer] of migration source.
from_peer: Peer,
pub(crate) from_peer: Peer,
/// The [Peer] of migration destination.
to_peer: Peer,
pub(crate) to_peer: Peer,
/// The [RegionId] of migration region.
region_id: RegionId,
pub(crate) region_id: RegionId,
/// The timeout for downgrading leader region and upgrading candidate region operations.
#[serde(with = "humantime_serde", default = "default_timeout")]
timeout: Duration,
pub(crate) timeout: Duration,
/// The trigger reason of region migration.
#[serde(default)]
trigger_reason: RegionMigrationTriggerReason,
pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
fn default_timeout() -> Duration {
@@ -109,6 +114,12 @@ impl PersistentContext {
}
}
/// Lets a [PersistentContext] serve as the event source of a region migration procedure.
impl Eventable for PersistentContext {
    /// Always yields a [RegionMigrationEvent] built from the current content of this context.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}
/// Metrics of region migration.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
@@ -307,7 +318,7 @@ impl DefaultContextFactory {
impl ContextFactory for DefaultContextFactory {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx,
persistent_ctx: Arc::new(persistent_ctx),
volatile_ctx: self.volatile_ctx,
in_memory: self.in_memory_key,
table_metadata_manager: self.table_metadata_manager,
@@ -322,7 +333,7 @@ impl ContextFactory for DefaultContextFactory {
/// The context of procedure execution.
pub struct Context {
persistent_ctx: PersistentContext,
persistent_ctx: Arc<PersistentContext>,
volatile_ctx: VolatileContext,
in_memory: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
@@ -539,6 +550,11 @@ impl Context {
.await;
Ok(())
}
/// Returns the [PersistentContext] of the procedure.
pub fn persistent_ctx(&self) -> Arc<PersistentContext> {
self.persistent_ctx.clone()
}
}
#[async_trait::async_trait]
@@ -743,6 +759,10 @@ impl Procedure for RegionMigrationProcedure {
fn lock_key(&self) -> LockKey {
LockKey::new(self.context.persistent_ctx.lock_key())
}
fn user_metadata(&self) -> Option<UserMetadata> {
Some(UserMetadata::new(self.context.persistent_ctx()))
}
}
#[cfg(test)]

View File

@@ -95,6 +95,7 @@ impl TestingEnv {
state_store,
poison_manager,
None,
None,
));
Self {

View File

@@ -53,6 +53,7 @@ impl TestEnv {
state_store,
poison_manager,
None,
None,
));
let mailbox_ctx = MailboxContext::new(mailbox_sequence);