diff --git a/Cargo.lock b/Cargo.lock
index ca03b88277..fd17338f79 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2341,10 +2341,8 @@ dependencies = [
"api",
"async-trait",
"backon",
- "client",
"common-error",
"common-macro",
- "common-meta",
"common-telemetry",
"common-time",
"serde",
@@ -2630,11 +2628,13 @@ dependencies = [
name = "common-procedure"
version = "0.16.0"
dependencies = [
+ "api",
"async-stream",
"async-trait",
"backon",
"common-base",
"common-error",
+ "common-event-recorder",
"common-macro",
"common-runtime",
"common-telemetry",
@@ -6234,6 +6234,17 @@ dependencies = [
"snafu 0.7.5",
]
+[[package]]
+name = "inherent"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c38228f24186d9cc68c729accb4d413be9eaed6ad07ff79e0270d9e56f3de13"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.100",
+]
+
[[package]]
name = "inotify"
version = "0.11.0"
@@ -7370,6 +7381,7 @@ dependencies = [
"common-catalog",
"common-config",
"common-error",
+ "common-event-recorder",
"common-greptimedb-telemetry",
"common-grpc",
"common-macro",
@@ -11257,6 +11269,30 @@ dependencies = [
"sha2",
]
+[[package]]
+name = "sea-query"
+version = "0.32.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64c91783d1514b99754fc6a4079081dcc2c587dadbff65c48c7f62297443536a"
+dependencies = [
+ "inherent",
+ "sea-query-derive",
+]
+
+[[package]]
+name = "sea-query-derive"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bae0cbad6ab996955664982739354128c58d16e126114fe88c2a493642502aab"
+dependencies = [
+ "darling 0.20.10",
+ "heck 0.4.1",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.100",
+ "thiserror 2.0.12",
+]
+
[[package]]
name = "seahash"
version = "4.1.0"
@@ -13017,6 +13053,7 @@ dependencies = [
"common-catalog",
"common-config",
"common-error",
+ "common-event-recorder",
"common-grpc",
"common-meta",
"common-procedure",
@@ -13059,6 +13096,7 @@ dependencies = [
"rand 0.9.0",
"rstest",
"rstest_reuse",
+ "sea-query",
"serde_json",
"servers",
"session",
diff --git a/Cargo.toml b/Cargo.toml
index 4c8ae155ec..cdc1c1ee16 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -195,7 +195,9 @@ rstest = "0.25"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
-rustls = { version = "0.23.25", default-features = false } # It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
+# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
+rustls = { version = "0.23.25", default-features = false }
+sea-query = "0.32"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
diff --git a/config/config.md b/config/config.md
index cc787ae8f9..8ecde920e8 100644
--- a/config/config.md
+++ b/config/config.md
@@ -381,6 +381,8 @@
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
Only accepts strings that match the following regular expression pattern:
[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
+| `event_recorder` | -- | -- | Configuration options for the event recorder. |
+| `event_recorder.ttl` | String | `30d` | TTL for the events table that will be used to store the events. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 893c3a7ee8..10f07c138d 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -240,6 +240,11 @@ create_topic_timeout = "30s"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"
+## Configuration options for the event recorder.
+[event_recorder]
+## TTL for the events table that will be used to store the events.
+ttl = "30d"
+
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.
diff --git a/src/common/event-recorder/Cargo.toml b/src/common/event-recorder/Cargo.toml
index 173fa70639..28cc9b54de 100644
--- a/src/common/event-recorder/Cargo.toml
+++ b/src/common/event-recorder/Cargo.toml
@@ -8,10 +8,8 @@ license.workspace = true
api.workspace = true
async-trait.workspace = true
backon.workspace = true
-client.workspace = true
common-error.workspace = true
common-macro.workspace = true
-common-meta.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
serde.workspace = true
diff --git a/src/common/event-recorder/src/error.rs b/src/common/event-recorder/src/error.rs
index 8567a1a46e..01f7327718 100644
--- a/src/common/event-recorder/src/error.rs
+++ b/src/common/event-recorder/src/error.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::ColumnSchema;
-use common_error::ext::ErrorExt;
+use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -35,6 +35,30 @@ pub enum Error {
expected: Vec,
actual: Vec,
},
+
+ #[snafu(display("Failed to serialize event"))]
+ SerializeEvent {
+ #[snafu(source)]
+ error: serde_json::error::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to insert events"))]
+ InsertEvents {
+ // BoxedError is utilized here to prevent introducing a circular dependency that would arise from directly referencing `client::error::Error`.
+ source: BoxedError,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Keyvalue backend error"))]
+ KvBackend {
+ // BoxedError is utilized here to prevent introducing a circular dependency that would arise from directly referencing `common_meta::error::Error`.
+ source: BoxedError,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result = std::result::Result;
@@ -42,8 +66,12 @@ pub type Result = std::result::Result;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
- Error::MismatchedSchema { .. } => StatusCode::InvalidArguments,
- Error::NoAvailableFrontend { .. } => StatusCode::Internal,
+ Error::MismatchedSchema { .. } | Error::SerializeEvent { .. } => {
+ StatusCode::InvalidArguments
+ }
+ Error::NoAvailableFrontend { .. }
+ | Error::InsertEvents { .. }
+ | Error::KvBackend { .. } => StatusCode::Internal,
}
}
diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs
index 3c887aa455..781fbbbf6d 100644
--- a/src/common/event-recorder/src/recorder.rs
+++ b/src/common/event-recorder/src/recorder.rs
@@ -81,7 +81,9 @@ pub trait Event: Send + Sync + Debug {
}
/// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload.
- fn json_payload(&self) -> Result;
+ fn json_payload(&self) -> Result {
+ Ok("".to_string())
+ }
/// Add the extra schema to the event with the default schema.
fn extra_schema(&self) -> Vec {
@@ -97,6 +99,14 @@ pub trait Event: Send + Sync + Debug {
fn as_any(&self) -> &dyn Any;
}
+/// Eventable trait defines the interface for objects that can be converted to [Event].
+pub trait Eventable: Send + Sync + Debug {
+ /// Converts the object to an [Event].
+ fn to_event(&self) -> Option> {
+ None
+ }
+}
+
/// Returns the hints for the insert operation.
pub fn insert_hints() -> Vec<(&'static str, &'static str)> {
vec![
@@ -199,7 +209,7 @@ fn validate_events(events: &[&Box]) -> Result<()> {
}
/// EventRecorder trait defines the interface for recording events.
-pub trait EventRecorder: Send + Sync + 'static {
+pub trait EventRecorder: Send + Sync + Debug + 'static {
/// Records an event for persistence and processing by [EventHandler].
fn record(&self, event: Box);
@@ -231,6 +241,7 @@ impl Default for EventRecorderOptions {
}
/// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler].
+#[derive(Debug)]
pub struct EventRecorderImpl {
// The channel to send the events to the background processor.
tx: Sender>,
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 1750161d34..372903975f 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -953,6 +953,7 @@ mod tests {
state_store,
poison_manager,
None,
+ None,
));
let _ = DdlManager::try_new(
diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml
index 99ed2e5617..37e5f3669f 100644
--- a/src/common/procedure/Cargo.toml
+++ b/src/common/procedure/Cargo.toml
@@ -11,11 +11,13 @@ testing = []
workspace = true
[dependencies]
+api.workspace = true
async-stream.workspace = true
async-trait.workspace = true
backon.workspace = true
common-base.workspace = true
common-error.workspace = true
+common-event-recorder.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
diff --git a/src/common/procedure/src/event.rs b/src/common/procedure/src/event.rs
new file mode 100644
index 0000000000..022805f7bd
--- /dev/null
+++ b/src/common/procedure/src/event.rs
@@ -0,0 +1,116 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
+use common_event_recorder::error::Result;
+use common_event_recorder::Event;
+use common_time::timestamp::{TimeUnit, Timestamp};
+
+use crate::{ProcedureId, ProcedureState};
+
+pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
+pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
+pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
+
+/// `ProcedureEvent` represents an event emitted by a procedure during its execution lifecycle.
+#[derive(Debug)]
+pub struct ProcedureEvent {
+ /// Unique identifier associated with the originating procedure instance.
+ pub procedure_id: ProcedureId,
+ /// The timestamp of the event.
+ pub timestamp: Timestamp,
+ /// The state of the procedure.
+ pub state: ProcedureState,
+ /// The event emitted by the procedure. It's generated by [Procedure::event].
+ pub internal_event: Box,
+}
+
+impl ProcedureEvent {
+ pub fn new(
+ procedure_id: ProcedureId,
+ internal_event: Box,
+ state: ProcedureState,
+ ) -> Self {
+ Self {
+ procedure_id,
+ internal_event,
+ timestamp: Timestamp::current_time(TimeUnit::Nanosecond),
+ state,
+ }
+ }
+}
+
+impl Event for ProcedureEvent {
+ fn event_type(&self) -> &str {
+ self.internal_event.event_type()
+ }
+
+ fn timestamp(&self) -> Timestamp {
+ self.timestamp
+ }
+
+ fn json_payload(&self) -> Result {
+ self.internal_event.json_payload()
+ }
+
+ fn extra_schema(&self) -> Vec {
+ let mut schema = vec![
+ ColumnSchema {
+ column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::String.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::String.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ColumnSchema {
+ column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
+ datatype: ColumnDataType::String.into(),
+ semantic_type: SemanticType::Field.into(),
+ ..Default::default()
+ },
+ ];
+ schema.append(&mut self.internal_event.extra_schema());
+ schema
+ }
+
+ fn extra_row(&self) -> Result {
+ let error_str = match &self.state {
+ ProcedureState::Failed { error } => format!("{:?}", error),
+ ProcedureState::PrepareRollback { error } => format!("{:?}", error),
+ ProcedureState::RollingBack { error } => format!("{:?}", error),
+ ProcedureState::Retrying { error } => format!("{:?}", error),
+ ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
+ _ => "".to_string(),
+ };
+ let mut row = vec![
+ ValueData::StringValue(self.procedure_id.to_string()).into(),
+ ValueData::StringValue(self.state.as_str_name().to_string()).into(),
+ ValueData::StringValue(error_str).into(),
+ ];
+ row.append(&mut self.internal_event.extra_row()?.values);
+ Ok(Row { values: row })
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+}
diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs
index 2cd7c39d0c..e322765558 100644
--- a/src/common/procedure/src/lib.rs
+++ b/src/common/procedure/src/lib.rs
@@ -17,6 +17,7 @@
#![feature(assert_matches)]
pub mod error;
+pub mod event;
pub mod local;
pub mod options;
mod procedure;
@@ -28,9 +29,11 @@ pub mod watcher;
pub mod test_util;
pub use crate::error::{Error, Result};
+pub use crate::event::ProcedureEvent;
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
+ UserMetadata,
};
pub use crate::watcher::Watcher;
diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs
index 9b80884436..2730f52630 100644
--- a/src/common/procedure/src/local.rs
+++ b/src/common/procedure/src/local.rs
@@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_error::ext::BoxedError;
+use common_event_recorder::EventRecorderRef;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
@@ -36,6 +37,7 @@ use crate::error::{
Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
};
+use crate::event::ProcedureEvent;
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
@@ -43,7 +45,7 @@ use crate::store::poison_store::PoisonStoreRef;
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
- ProcedureState, ProcedureWithId, StringKey, Watcher,
+ ProcedureState, ProcedureWithId, StringKey, UserMetadata, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -81,9 +83,14 @@ pub(crate) struct ProcedureMeta {
start_time_ms: AtomicI64,
/// End execution time of this procedure.
end_time_ms: AtomicI64,
+ /// Event recorder.
+ event_recorder: Option,
+ /// The user metadata of the procedure. It's generated by [Procedure::user_metadata].
+ user_metadata: Option,
}
impl ProcedureMeta {
+ #[allow(clippy::too_many_arguments)]
fn new(
id: ProcedureId,
procedure_state: ProcedureState,
@@ -91,6 +98,8 @@ impl ProcedureMeta {
lock_key: LockKey,
poison_keys: PoisonKeys,
type_name: &str,
+ event_recorder: Option,
+ user_metadata: Option,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(procedure_state);
ProcedureMeta {
@@ -105,6 +114,8 @@ impl ProcedureMeta {
start_time_ms: AtomicI64::new(0),
end_time_ms: AtomicI64::new(0),
type_name: type_name.to_string(),
+ event_recorder,
+ user_metadata,
}
}
@@ -115,6 +126,15 @@ impl ProcedureMeta {
/// Update current [ProcedureState].
fn set_state(&self, state: ProcedureState) {
+ // Emit the event to the event recorder if the user metadata contains the eventable object.
+ if let (Some(event_recorder), Some(user_metadata)) =
+ (&self.event_recorder, &self.user_metadata)
+ {
+ if let Some(event) = user_metadata.to_event() {
+ event_recorder.record(Box::new(ProcedureEvent::new(self.id, event, state.clone())));
+ }
+ }
+
// Safety: ProcedureMeta also holds the receiver, so `send()` should never fail.
self.state_sender.send(state).unwrap();
}
@@ -557,6 +577,7 @@ pub struct LocalManager {
remove_outdated_meta_task: TokioMutex