From fac8c3e62cf0f3f08ce15cea072198165fef9fd6 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Wed, 16 Jul 2025 11:59:41 +0800 Subject: [PATCH] feat: introduce common event recorder (#6501) * feat: introduce common event recorder Signed-off-by: zyy17 * refactor: use `backon` as retry lib Signed-off-by: zyy17 * chore: remove unused error Signed-off-by: zyy17 * refactor: add CancellationToken Signed-off-by: zyy17 * refactor: add `cancel()` Signed-off-by: zyy17 * refactor: `cancel()` -> `close()` chore: `cancel()` -> `close()` and polish some code Signed-off-by: zyy17 --------- Signed-off-by: zyy17 --- Cargo.lock | 21 + Cargo.toml | 2 + src/common/event-recorder/Cargo.toml | 25 ++ src/common/event-recorder/src/error.rs | 53 +++ src/common/event-recorder/src/lib.rs | 18 + src/common/event-recorder/src/recorder.rs | 521 ++++++++++++++++++++++ 6 files changed, 640 insertions(+) create mode 100644 src/common/event-recorder/Cargo.toml create mode 100644 src/common/event-recorder/src/error.rs create mode 100644 src/common/event-recorder/src/lib.rs create mode 100644 src/common/event-recorder/src/recorder.rs diff --git a/Cargo.lock b/Cargo.lock index 960f510cba..8a47dc1982 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2302,6 +2302,27 @@ dependencies = [ "tonic 0.12.3", ] +[[package]] +name = "common-event-recorder" +version = "0.16.0" +dependencies = [ + "api", + "async-trait", + "backon", + "client", + "common-error", + "common-macro", + "common-meta", + "common-telemetry", + "common-time", + "serde", + "serde_json", + "snafu 0.8.5", + "store-api", + "tokio", + "tokio-util", +] + [[package]] name = "common-frontend" version = "0.16.0" diff --git a/Cargo.toml b/Cargo.toml index a61412f621..44e355b0ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "src/common/datasource", "src/common/decimal", "src/common/error", + "src/common/event-recorder", "src/common/frontend", "src/common/function", "src/common/greptimedb-telemetry", @@ -247,6 +248,7 @@ common-config = { path = "src/common/config" } common-datasource = { path = "src/common/datasource" } common-decimal = { path = "src/common/decimal" } common-error = { path = "src/common/error" } +common-event-recorder = { path = "src/common/event-recorder" } common-frontend = { path = "src/common/frontend" } common-function = { path = "src/common/function" } common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" } diff --git a/src/common/event-recorder/Cargo.toml b/src/common/event-recorder/Cargo.toml new file mode 100644 index 0000000000..173fa70639 --- /dev/null +++ b/src/common/event-recorder/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "common-event-recorder" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +api.workspace = true +async-trait.workspace = true +backon.workspace = true +client.workspace = true +common-error.workspace = true +common-macro.workspace = true +common-meta.workspace = true +common-telemetry.workspace = true +common-time.workspace = true +serde.workspace = true +serde_json.workspace = true +snafu.workspace = true +store-api.workspace = true +tokio.workspace = true +tokio-util.workspace = true + +[lints] +workspace = true diff --git a/src/common/event-recorder/src/error.rs b/src/common/event-recorder/src/error.rs new file mode 100644 index 0000000000..8567a1a46e --- /dev/null +++ b/src/common/event-recorder/src/error.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::ColumnSchema; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("No available frontend"))] + NoAvailableFrontend { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Mismatched schema, expected: {:?}, actual: {:?}", expected, actual))] + MismatchedSchema { + #[snafu(implicit)] + location: Location, + expected: Vec, + actual: Vec, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::MismatchedSchema { .. } => StatusCode::InvalidArguments, + Error::NoAvailableFrontend { .. } => StatusCode::Internal, + } + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/src/common/event-recorder/src/lib.rs b/src/common/event-recorder/src/lib.rs new file mode 100644 index 0000000000..4f764291a3 --- /dev/null +++ b/src/common/event-recorder/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod error; +pub mod recorder; + +pub use recorder::*; diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs new file mode 100644 index 0000000000..a1287aa1d5 --- /dev/null +++ b/src/common/event-recorder/src/recorder.rs @@ -0,0 +1,521 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; + +use api::v1::column_data_type_extension::TypeExt; +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row, + RowInsertRequest, RowInsertRequests, Rows, SemanticType, +}; +use async_trait::async_trait; +use backon::{BackoffBuilder, ExponentialBuilder}; +use common_telemetry::{debug, error, info, warn}; +use common_time::timestamp::{TimeUnit, Timestamp}; +use serde::{Deserialize, Serialize}; +use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::task::JoinHandle; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; + +use crate::error::{MismatchedSchemaSnafu, Result}; + +/// The default table name for storing the events. +pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events"; + +/// The column name for the event type. +pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type"; +/// The column name for the event payload. +pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload"; +/// The column name for the event timestamp. +pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; + +/// EventRecorderRef is the reference to the event recorder. +pub type EventRecorderRef = Arc; + +static EVENTS_TABLE_TTL: OnceLock = OnceLock::new(); + +/// The time interval for flushing batched events to the event handler. +pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5); +// The default TTL for the events table. +const DEFAULT_EVENTS_TABLE_TTL: &str = "30d"; +// The capacity of the tokio channel for transmitting events to background processor. +const DEFAULT_CHANNEL_SIZE: usize = 2048; +// The size of the buffer for batching events before flushing to event handler. +const DEFAULT_BUFFER_SIZE: usize = 100; +// The maximum number of retry attempts when event handler processing fails. +const DEFAULT_MAX_RETRY_TIMES: u64 = 3; + +/// Event trait defines the interface for events that can be recorded and persisted as the system table. +/// By default, the event will be persisted as the system table with the following schema: +/// +/// - `type`: the type of the event. +/// - `payload`: the JSON bytes of the event. +/// - `timestamp`: the timestamp of the event. +/// +/// The event can also add the extra schema and row to the event by overriding the `extra_schema` and `extra_row` methods. +pub trait Event: Send + Sync + Debug { + /// Returns the type of the event. + fn event_type(&self) -> &str; + + /// Returns the timestamp of the event. Default to the current time. + fn timestamp(&self) -> Timestamp { + Timestamp::current_time(TimeUnit::Nanosecond) + } + + /// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload. + fn json_payload(&self) -> Result; + + /// Add the extra schema to the event with the default schema. + fn extra_schema(&self) -> Vec { + vec![] + } + + /// Add the extra row to the event with the default row. + fn extra_row(&self) -> Result { + Ok(Row { values: vec![] }) + } + + /// Returns the event as any type. + fn as_any(&self) -> &dyn Any; +} + +/// Returns the hints for the insert operation. +pub fn insert_hints() -> Vec<(&'static str, &'static str)> { + vec![ + ( + TTL_KEY, + EVENTS_TABLE_TTL + .get() + .map(|s| s.as_str()) + .unwrap_or(DEFAULT_EVENTS_TABLE_TTL), + ), + (APPEND_MODE_KEY, "true"), + ] +} + +/// Builds the row inserts request for the events that will be persisted to the events table. +pub fn build_row_inserts_request(events: &[Box]) -> Result { + // Aggregate the events by the event type. + let mut event_groups: HashMap<&str, Vec<&Box>> = HashMap::new(); + + for event in events { + event_groups + .entry(event.event_type()) + .or_default() + .push(event); + } + + let mut row_insert_requests = RowInsertRequests { + inserts: Vec::with_capacity(event_groups.len()), + }; + + for (_, events) in event_groups { + validate_events(&events)?; + + // We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest. + let event = &events[0]; + let mut schema = vec![ + ColumnSchema { + column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(), + datatype: ColumnDataType::Binary as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + ..Default::default() + }, + ColumnSchema { + column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(), + datatype: ColumnDataType::TimestampNanosecond.into(), + semantic_type: SemanticType::Timestamp.into(), + ..Default::default() + }, + ]; + schema.extend(event.extra_schema()); + + let rows = events + .iter() + .map(|event| { + let mut row = Row { + values: vec![ + ValueData::StringValue(event.event_type().to_string()).into(), + ValueData::BinaryValue(event.json_payload()?.as_bytes().to_vec()).into(), + ValueData::TimestampNanosecondValue(event.timestamp().value()).into(), + ], + }; + row.values.extend(event.extra_row()?.values); + Ok(row) + }) + .collect::>>()?; + + row_insert_requests.inserts.push(RowInsertRequest { + table_name: DEFAULT_EVENTS_TABLE_NAME.to_string(), + rows: Some(Rows { schema, rows }), + }); + } + + Ok(row_insert_requests) +} + +// Ensure the events with the same event type have the same extra schema. +#[allow(clippy::borrowed_box)] +fn validate_events(events: &[&Box]) -> Result<()> { + // It's safe to get the first event because the events are already grouped by the event type. + let extra_schema = events[0].extra_schema(); + for event in events { + if event.extra_schema() != extra_schema { + MismatchedSchemaSnafu { + expected: extra_schema.clone(), + actual: event.extra_schema(), + } + .fail()?; + } + } + Ok(()) +} + +/// EventRecorder trait defines the interface for recording events. +pub trait EventRecorder: Send + Sync + 'static { + /// Records an event for persistence and processing by [EventHandler]. + fn record(&self, event: Box); + + /// Cancels the event recorder. + fn close(&self); +} + +/// EventHandler trait defines the interface for how to handle the event. +#[async_trait] +pub trait EventHandler: Send + Sync + 'static { + /// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence. + /// We use `&[Box]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails. + async fn handle(&self, events: &[Box]) -> Result<()>; +} + +/// Configuration options for the event recorder. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct EventRecorderOptions { + /// TTL for the events table that will be used to store the events. + pub ttl: String, +} + +impl Default for EventRecorderOptions { + fn default() -> Self { + Self { + ttl: DEFAULT_EVENTS_TABLE_TTL.to_string(), + } + } +} + +/// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler]. +pub struct EventRecorderImpl { + // The channel to send the events to the background processor. + tx: Sender>, + // The cancel token to cancel the background processor. + cancel_token: CancellationToken, + // The background processor to process the events. + handle: Option>, +} + +impl EventRecorderImpl { + pub fn new(event_handler: Box, opts: EventRecorderOptions) -> Self { + info!("Creating event recorder with options: {:?}", opts); + + let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE); + let cancel_token = CancellationToken::new(); + + let mut recorder = Self { + tx, + handle: None, + cancel_token: cancel_token.clone(), + }; + + let processor = EventProcessor::new( + rx, + event_handler, + DEFAULT_FLUSH_INTERVAL_SECONDS, + DEFAULT_MAX_RETRY_TIMES, + ) + .with_cancel_token(cancel_token); + + // Spawn a background task to process the events. + let handle = tokio::spawn(async move { + processor.process(DEFAULT_BUFFER_SIZE).await; + }); + + recorder.handle = Some(handle); + + // It only sets the ttl once, so it's safe to skip the error. + if EVENTS_TABLE_TTL.set(opts.ttl.clone()).is_err() { + info!( + "Events table ttl already set to {}, skip setting it", + opts.ttl + ); + } + + recorder + } +} + +impl EventRecorder for EventRecorderImpl { + // Accepts an event and send it to the background handler. + fn record(&self, event: Box) { + if let Err(e) = self.tx.try_send(event) { + error!("Failed to send event to the background processor: {}", e); + } + } + + // Closes the event recorder. It will stop the background processor and flush the buffer. + fn close(&self) { + self.cancel_token.cancel(); + } +} + +impl Drop for EventRecorderImpl { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.abort(); + info!("Aborted the background processor in event recorder"); + } + } +} + +struct EventProcessor { + rx: Receiver>, + event_handler: Box, + max_retry_times: u64, + process_interval: Duration, + cancel_token: CancellationToken, +} + +impl EventProcessor { + fn new( + rx: Receiver>, + event_handler: Box, + process_interval: Duration, + max_retry_times: u64, + ) -> Self { + Self { + rx, + event_handler, + max_retry_times, + process_interval, + cancel_token: CancellationToken::new(), + } + } + + fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self { + self.cancel_token = cancel_token; + self + } + + async fn process(mut self, buffer_size: usize) { + info!("Start the background processor in event recorder to handle the received events."); + + let mut buffer = Vec::with_capacity(buffer_size); + let mut interval = tokio::time::interval(self.process_interval); + + loop { + tokio::select! { + maybe_event = self.rx.recv() => { + if let Some(maybe_event) = maybe_event { + debug!("Received event: {:?}", maybe_event); + + if buffer.len() >= buffer_size { + debug!( + "Flushing events to the event handler because the buffer is full with {} events", + buffer.len() + ); + self.flush_events_to_handler(&mut buffer).await; + } + + // Push the event to the buffer, the buffer will be flushed when the interval is triggered or received a closed signal. + buffer.push(maybe_event); + } else { + // When received a closed signal, flush the buffer and exit the loop. + self.flush_events_to_handler(&mut buffer).await; + break; + } + } + // Cancel the processor through the cancel token. + _ = self.cancel_token.cancelled() => { + warn!("Received a cancel signal, flushing the buffer and exiting the loop"); + self.flush_events_to_handler(&mut buffer).await; + break; + } + // When the interval is triggered, flush the buffer and send the events to the event handler. + _ = interval.tick() => { + self.flush_events_to_handler(&mut buffer).await; + } + } + } + } + + // NOTE: While we implement a retry mechanism for failed event handling, there is no guarantee that all events will be processed successfully. + async fn flush_events_to_handler(&self, buffer: &mut Vec>) { + if !buffer.is_empty() { + debug!("Flushing {} events to the event handler", buffer.len()); + + let mut backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis( + DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1), + )) + .with_max_delay(Duration::from_millis( + DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64, + )) + .with_max_times(self.max_retry_times as usize) + .build(); + + loop { + match self.event_handler.handle(buffer).await { + Ok(()) => { + debug!("Successfully handled {} events", buffer.len()); + break; + } + Err(e) => { + if let Some(d) = backoff.next() { + warn!(e; "Failed to handle events, retrying..."); + sleep(d).await; + continue; + } else { + warn!( + e; "Failed to handle events after {} retries", + self.max_retry_times + ); + break; + } + } + } + } + } + + // Clear the buffer to prevent unbounded memory growth, regardless of whether event processing succeeded or failed. + buffer.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug)] + struct TestEvent {} + + impl Event for TestEvent { + fn event_type(&self) -> &str { + "test_event" + } + + fn json_payload(&self) -> Result { + Ok("{\"procedure_id\": \"1234567890\"}".to_string()) + } + + fn as_any(&self) -> &dyn Any { + self + } + } + + struct TestEventHandlerImpl {} + + #[async_trait] + impl EventHandler for TestEventHandlerImpl { + async fn handle(&self, events: &[Box]) -> Result<()> { + let event = events + .first() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + event.json_payload().unwrap(), + "{\"procedure_id\": \"1234567890\"}" + ); + assert_eq!(event.event_type(), "test_event"); + Ok(()) + } + } + + #[tokio::test] + async fn test_event_recorder() { + let mut event_recorder = EventRecorderImpl::new( + Box::new(TestEventHandlerImpl {}), + EventRecorderOptions::default(), + ); + event_recorder.record(Box::new(TestEvent {})); + + // Cancel the event recorder to flush the buffer. + event_recorder.close(); + + // Sleep for a while to let the background task process the event. + sleep(Duration::from_millis(100)).await; + + if let Some(handle) = event_recorder.handle.take() { + assert!(handle.await.is_ok()); + } + } + + struct TestEventHandlerImplShouldPanic {} + + #[async_trait] + impl EventHandler for TestEventHandlerImplShouldPanic { + async fn handle(&self, events: &[Box]) -> Result<()> { + let event = events + .first() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + // Set the incorrect payload and event type to trigger the panic. + assert_eq!( + event.json_payload().unwrap(), + "{\"procedure_id\": \"should_panic\"}" + ); + assert_eq!(event.event_type(), "should_panic"); + Ok(()) + } + } + + #[tokio::test] + async fn test_event_recorder_should_panic() { + let mut event_recorder = EventRecorderImpl::new( + Box::new(TestEventHandlerImplShouldPanic {}), + EventRecorderOptions::default(), + ); + + event_recorder.record(Box::new(TestEvent {})); + + // Close the event recorder to flush the buffer. + event_recorder.close(); + + // Sleep for a while to let the background task process the event. + sleep(Duration::from_millis(100)).await; + + if let Some(handle) = event_recorder.handle.take() { + assert!(handle.await.unwrap_err().is_panic()); + } + } +}