refactor: unify the event recorder (#6689)

* refactor: unify the event recorder

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `table_name()` in `Event` trait

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: add `slow_query_options` in `Instance`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `EventHandlerOptions` and `options()` in `EventHandler` trait

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: add `aggregate_events_by_type()` and support log mode of slow query

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: polish the code

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: clippy errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: support to set ttl by using extension of query context

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: refine the configs fields

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: sqlness test errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: use `Duration` type instead of `String` for ttl fields

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: use pre-allocation for building RowInsertRequests

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix clippy errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: code review

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix integration errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: polish code for `group_events_by_type()` and `build_row_inserts_request()`, also add the unit tests

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: refine comments

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
Commit 777da35b0d (parent 9ad9a7d2bc)
Authored by zyy17 on 2025-08-12 11:26:12 -07:00, committed via GitHub.
28 changed files with 649 additions and 627 deletions

Cargo.lock (generated)
View File

@@ -1666,6 +1666,7 @@ dependencies = [
"common-base",
"common-catalog",
"common-error",
"common-event-recorder",
"common-frontend",
"common-macro",
"common-meta",
@@ -2347,6 +2348,9 @@ dependencies = [
"common-macro",
"common-telemetry",
"common-time",
"humantime",
"humantime-serde",
"itertools 0.14.0",
"serde",
"serde_json",
"snafu 0.8.5",
@@ -2359,13 +2363,18 @@ dependencies = [
name = "common-frontend"
version = "0.17.0"
dependencies = [
"api",
"async-trait",
"common-error",
"common-event-recorder",
"common-grpc",
"common-macro",
"common-meta",
"common-time",
"greptime-proto",
"humantime",
"meta-client",
"serde",
"session",
"snafu 0.8.5",
"tokio",
@@ -4878,6 +4887,7 @@ dependencies = [
"common-config",
"common-datasource",
"common-error",
"common-event-recorder",
"common-frontend",
"common-function",
"common-grpc",
@@ -4898,6 +4908,7 @@ dependencies = [
"datanode",
"datatypes",
"futures",
"humantime",
"humantime-serde",
"lazy_static",
"log-query",
@@ -13069,6 +13080,7 @@ dependencies = [
"common-config",
"common-error",
"common-event-recorder",
"common-frontend",
"common-grpc",
"common-meta",
"common-procedure",

View File

@@ -305,7 +305,7 @@
| `slow_query.record_type` | String | `system_table` | The record type of slow queries. It can be `system_table` or `log`.<br/>If `system_table` is selected, the slow queries will be recorded in a system table `greptime_private.slow_queries`.<br/>If `log` is selected, the slow queries will be logged in a log file `greptimedb-slow-queries.*`. |
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be a human-readable time string, for example: `10s`, `100ms`, `1s`. |
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
| `slow_query.ttl` | String | `30d` | The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`. |
| `slow_query.ttl` | String | `90d` | The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`. |
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | Whether to enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
@@ -316,6 +316,8 @@
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
| `memory` | -- | -- | The memory options. |
| `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup.<br/>When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable<br/>is set to "prof:true,prof_active:false". The official image adds this env variable.<br/>Default is true. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
### Metasrv
@@ -382,7 +384,7 @@
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `event_recorder` | -- | -- | Configuration options for the event recorder. |
| `event_recorder.ttl` | String | `30d` | TTL for the events table that will be used to store the events. |
| `event_recorder.ttl` | String | `90d` | TTL for the events table that will be used to store the events. Default is `90d`. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |

View File

@@ -257,8 +257,8 @@ threshold = "30s"
## The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged.
sample_ratio = 1.0
## The TTL of the `slow_queries` system table. Default is `30d` when `record_type` is `system_table`.
ttl = "30d"
## The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`.
ttl = "90d"
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
@@ -288,3 +288,8 @@ headers = { }
## is set to "prof:true,prof_active:false". The official image adds this env variable.
## Default is true.
enable_heap_profiling = true
## Configuration options for the event recorder.
[event_recorder]
## TTL for the events table that will be used to store the events. Default is `90d`.
ttl = "90d"

View File

@@ -242,8 +242,8 @@ create_topic_timeout = "30s"
## Configuration options for the event recorder.
[event_recorder]
## TTL for the events table that will be used to store the events.
ttl = "30d"
## TTL for the events table that will be used to store the events. Default is `90d`.
ttl = "90d"
## The logging options.
[logging]

View File

@@ -21,6 +21,7 @@ bytes.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-meta.workspace = true

View File

@@ -21,17 +21,17 @@ use std::time::{Duration, Instant, UNIX_EPOCH};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_event_recorder::EventRecorderRef;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::logging::SlowQueriesRecordType;
use common_telemetry::{debug, info, slow, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use promql_parser::parser::EvalStmt;
use rand::random;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::statement::Statement;
use tokio::sync::mpsc::Sender;
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
@@ -249,6 +249,8 @@ pub struct Ticket {
pub(crate) manager: ProcessManagerRef,
pub(crate) id: ProcessId,
pub cancellation_handle: Arc<CancellationHandle>,
// Keep the handle of the slow query timer to ensure it will trigger the event recording when dropped.
_slow_query_timer: Option<SlowQueryTimer>,
}
@@ -295,38 +297,37 @@ impl Debug for CancellableProcess {
pub struct SlowQueryTimer {
start: Instant,
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
threshold: Duration,
sample_ratio: f64,
record_type: SlowQueriesRecordType,
recorder: EventRecorderRef,
}
impl SlowQueryTimer {
pub fn new(
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
threshold: Duration,
sample_ratio: f64,
record_type: SlowQueriesRecordType,
recorder: EventRecorderRef,
) -> Self {
Self {
start: Instant::now(),
stmt,
query_ctx,
threshold,
sample_ratio,
tx,
record_type,
recorder,
}
}
}
impl SlowQueryTimer {
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
fn send_slow_query_event(&self, elapsed: Duration) {
let mut slow_query_event = SlowQueryEvent {
cost: elapsed.as_millis() as u64,
threshold: threshold.as_millis() as u64,
threshold: self.threshold.as_millis() as u64,
query: "".to_string(),
query_ctx: self.query_ctx.clone(),
// The following fields are only used for PromQL queries.
is_promql: false,
@@ -363,29 +364,37 @@ impl SlowQueryTimer {
}
}
// Send SlowQueryEvent to the handler.
if let Err(e) = self.tx.try_send(slow_query_event) {
error!(e; "Failed to send slow query event");
match self.record_type {
// Send the slow query event to the event recorder to persist it in the system table.
SlowQueriesRecordType::SystemTable => {
self.recorder.record(Box::new(slow_query_event));
}
// Record the slow query in a specific logs file.
SlowQueriesRecordType::Log => {
slow!(
cost = slow_query_event.cost,
threshold = slow_query_event.threshold,
query = slow_query_event.query,
is_promql = slow_query_event.is_promql,
promql_range = slow_query_event.promql_range,
promql_step = slow_query_event.promql_step,
promql_start = slow_query_event.promql_start,
promql_end = slow_query_event.promql_end,
);
}
}
}
}
impl Drop for SlowQueryTimer {
fn drop(&mut self) {
if let Some(threshold) = self.threshold {
// Calculate the elapsed duration since the timer is created.
let elapsed = self.start.elapsed();
if elapsed > threshold {
if let Some(ratio) = self.sample_ratio {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if ratio >= 1.0 || random::<f64>() <= ratio {
self.send_slow_query_event(elapsed, threshold);
}
} else {
// Captures all slow queries if sample_ratio is not set.
self.send_slow_query_event(elapsed, threshold);
}
// Calculate the elapsed duration since the timer is created.
let elapsed = self.start.elapsed();
if elapsed > self.threshold {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if self.sample_ratio >= 1.0 || random::<f64>() <= self.sample_ratio {
self.send_slow_query_event(elapsed);
}
}
}
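
The drop-based timing above is the heart of the slow-query change: the timer captures `Instant::now()` when the query starts and decides whether to record only when it goes out of scope, i.e. when the query's `Ticket` is dropped. A minimal, self-contained sketch of that pattern (simplified names, a closure standing in for `EventRecorderRef::record` / the `slow!` macro, and assuming the `rand` crate):

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for `SlowQueryTimer`: measures elapsed time on drop and
/// records it when it exceeds the threshold and passes the sampling check.
struct Timer<F: Fn(Duration)> {
    start: Instant,
    threshold: Duration,
    sample_ratio: f64,
    record: F,
}

impl<F: Fn(Duration)> Timer<F> {
    fn new(threshold: Duration, sample_ratio: f64, record: F) -> Self {
        Self { start: Instant::now(), threshold, sample_ratio, record }
    }
}

impl<F: Fn(Duration)> Drop for Timer<F> {
    fn drop(&mut self) {
        // Elapsed time is measured when the timer (held by the query ticket) is dropped.
        let elapsed = self.start.elapsed();
        // Record only queries slower than the threshold, and only a sampled fraction of them.
        if elapsed > self.threshold
            && (self.sample_ratio >= 1.0 || rand::random::<f64>() <= self.sample_ratio)
        {
            (self.record)(elapsed);
        }
    }
}

fn main() {
    let _timer = Timer::new(Duration::from_millis(1), 1.0, |elapsed| {
        println!("slow query took {elapsed:?}");
    });
    std::thread::sleep(Duration::from_millis(5));
    // `_timer` drops here and reports roughly 5ms.
}
```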

View File

@@ -279,7 +279,7 @@ impl StartCommand {
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.clone(),
opts.component.slow_query.as_ref(),
Some(&opts.component.slow_query),
);
log_versions(verbose_version(), short_version(), APP_NAME);

View File

@@ -157,7 +157,7 @@ pub struct StandaloneOptions {
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>,
pub slow_query: SlowQueryOptions,
pub query: QueryOptions,
pub memory: MemoryOptions,
}
@@ -191,7 +191,7 @@ impl Default for StandaloneOptions {
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()),
slow_query: SlowQueryOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
}
@@ -486,7 +486,7 @@ impl StartCommand {
&opts.component.logging,
&opts.component.tracing,
None,
opts.component.slow_query.as_ref(),
Some(&opts.component.slow_query),
);
log_versions(verbose_version(), short_version(), APP_NAME);

View File

@@ -12,6 +12,9 @@ common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(duration_constructors)]
pub mod error;
pub mod recorder;

View File

@@ -15,11 +15,10 @@
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;
use std::time::Duration;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::helper::{tag_column_schema, time_index_column_schema};
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
@@ -29,6 +28,8 @@ use async_trait::async_trait;
use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::{debug, error, info, warn};
use common_time::timestamp::{TimeUnit, Timestamp};
use humantime::format_duration;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -51,12 +52,10 @@ pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
/// EventRecorderRef is the reference to the event recorder.
pub type EventRecorderRef = Arc<dyn EventRecorder>;
static EVENTS_TABLE_TTL: OnceLock<String> = OnceLock::new();
/// The time interval for flushing batched events to the event handler.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
// The default TTL for the events table.
const DEFAULT_EVENTS_TABLE_TTL: &str = "30d";
/// The default TTL (90 days) for the events table.
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_days(90);
// The capacity of the tokio channel for transmitting events to background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
// The size of the buffer for batching events before flushing to event handler.
@@ -73,6 +72,11 @@ const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
///
/// The event can also add the extra schema and row to the event by overriding the `extra_schema` and `extra_row` methods.
pub trait Event: Send + Sync + Debug {
/// Returns the table name of the event.
fn table_name(&self) -> &str {
DEFAULT_EVENTS_TABLE_NAME
}
/// Returns the type of the event.
fn event_type(&self) -> &str;
@@ -108,81 +112,68 @@ pub trait Eventable: Send + Sync + Debug {
}
}
/// Returns the hints for the insert operation.
pub fn insert_hints() -> Vec<(&'static str, &'static str)> {
vec![
(
TTL_KEY,
EVENTS_TABLE_TTL
.get()
.map(|s| s.as_str())
.unwrap_or(DEFAULT_EVENTS_TABLE_TTL),
),
(APPEND_MODE_KEY, "true"),
]
/// Groups events by their `event_type`.
#[allow(clippy::borrowed_box)]
pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
events
.iter()
.into_grouping_map_by(|event| event.event_type())
.collect()
}
/// Builds the row inserts request for the events that will be persisted to the events table.
pub fn build_row_inserts_request(events: &[Box<dyn Event>]) -> Result<RowInsertRequests> {
// Aggregate the events by the event type.
let mut event_groups: HashMap<&str, Vec<&Box<dyn Event>>> = HashMap::new();
/// Builds the row inserts request for the events that will be persisted to the events table. All `events` must have the same event type; otherwise an error is returned.
#[allow(clippy::borrowed_box)]
pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
// Ensure all the events are the same type.
validate_events(events)?;
// We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest.
let event = &events[0];
let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
schema.extend(vec![
ColumnSchema {
column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
..Default::default()
},
ColumnSchema {
column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
..Default::default()
},
]);
schema.extend(event.extra_schema());
let mut rows: Vec<Row> = Vec::with_capacity(events.len());
for event in events {
event_groups
.entry(event.event_type())
.or_default()
.push(event);
let extra_row = event.extra_row()?;
let mut values = Vec::with_capacity(3 + extra_row.values.len());
values.extend([
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
]);
values.extend(extra_row.values);
rows.push(Row { values });
}
let mut row_insert_requests = RowInsertRequests {
inserts: Vec::with_capacity(event_groups.len()),
};
for (_, events) in event_groups {
validate_events(&events)?;
// We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest.
let event = &events[0];
let mut schema = vec![
tag_column_schema(EVENTS_TABLE_TYPE_COLUMN_NAME, ColumnDataType::String),
ColumnSchema {
column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
..Default::default()
},
time_index_column_schema(
EVENTS_TABLE_TIMESTAMP_COLUMN_NAME,
ColumnDataType::TimestampNanosecond,
),
];
schema.extend(event.extra_schema());
let rows = events
.iter()
.map(|event| {
let mut row = Row {
values: vec![
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.as_bytes().to_vec()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
],
};
row.values.extend(event.extra_row()?.values);
Ok(row)
})
.collect::<Result<Vec<_>>>()?;
row_insert_requests.inserts.push(RowInsertRequest {
table_name: DEFAULT_EVENTS_TABLE_NAME.to_string(),
Ok(RowInsertRequests {
inserts: vec![RowInsertRequest {
table_name: event.table_name().to_string(),
rows: Some(Rows { schema, rows }),
});
}
Ok(row_insert_requests)
}],
})
}
// Ensure the events with the same event type have the same extra schema.
@@ -211,25 +202,59 @@ pub trait EventRecorder: Send + Sync + Debug + 'static {
fn close(&self);
}
/// EventHandlerOptions is the options for the event handler.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventHandlerOptions {
/// TTL for the events table that will be used to store the events.
pub ttl: Duration,
/// Append mode for the events table that will be used to store the events.
pub append_mode: bool,
}
impl Default for EventHandlerOptions {
fn default() -> Self {
Self {
ttl: DEFAULT_EVENTS_TABLE_TTL,
append_mode: true,
}
}
}
impl EventHandlerOptions {
/// Converts the options to the hints for the insert operation.
pub fn to_hints(&self) -> Vec<(&str, String)> {
vec![
(TTL_KEY, format_duration(self.ttl).to_string()),
(APPEND_MODE_KEY, self.append_mode.to_string()),
]
}
}
/// EventHandler trait defines the interface for how to handle the event.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
/// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence.
/// We use `&[Box<dyn Event>]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails.
async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
/// Returns the handler options for the event type. We can use different options for different event types.
fn options(&self, _event_type: &str) -> EventHandlerOptions {
EventHandlerOptions::default()
}
}
/// Configuration options for the event recorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventRecorderOptions {
/// TTL for the events table that will be used to store the events.
pub ttl: String,
#[serde(with = "humantime_serde")]
pub ttl: Duration,
}
impl Default for EventRecorderOptions {
fn default() -> Self {
Self {
ttl: DEFAULT_EVENTS_TABLE_TTL.to_string(),
ttl: DEFAULT_EVENTS_TABLE_TTL,
}
}
}
@@ -246,9 +271,7 @@ pub struct EventRecorderImpl {
}
impl EventRecorderImpl {
pub fn new(event_handler: Box<dyn EventHandler>, opts: EventRecorderOptions) -> Self {
info!("Creating event recorder with options: {:?}", opts);
pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
let cancel_token = CancellationToken::new();
@@ -273,14 +296,6 @@ impl EventRecorderImpl {
recorder.handle = Some(handle);
// It only sets the ttl once, so it's safe to skip the error.
if EVENTS_TABLE_TTL.set(opts.ttl.clone()).is_err() {
info!(
"Events table ttl already set to {}, skip setting it",
opts.ttl
);
}
recorder
}
}
@@ -465,10 +480,7 @@ mod tests {
#[tokio::test]
async fn test_event_recorder() {
let mut event_recorder = EventRecorderImpl::new(
Box::new(TestEventHandlerImpl {}),
EventRecorderOptions::default(),
);
let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
event_recorder.record(Box::new(TestEvent {}));
// Sleep for a while to let the event be sent to the event handler.
@@ -509,10 +521,8 @@ mod tests {
#[tokio::test]
async fn test_event_recorder_should_panic() {
let mut event_recorder = EventRecorderImpl::new(
Box::new(TestEventHandlerImplShouldPanic {}),
EventRecorderOptions::default(),
);
let mut event_recorder =
EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));
event_recorder.record(Box::new(TestEvent {}));
@@ -529,4 +539,135 @@ mod tests {
assert!(handle.await.unwrap_err().is_panic());
}
}
#[derive(Debug)]
struct TestEventA {}
impl Event for TestEventA {
fn event_type(&self) -> &str {
"A"
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Debug)]
struct TestEventB {}
impl Event for TestEventB {
fn table_name(&self) -> &str {
"table_B"
}
fn event_type(&self) -> &str {
"B"
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Debug)]
struct TestEventC {}
impl Event for TestEventC {
fn table_name(&self) -> &str {
"table_C"
}
fn event_type(&self) -> &str {
"C"
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[test]
fn test_group_events_by_type() {
let events: Vec<Box<dyn Event>> = vec![
Box::new(TestEventA {}),
Box::new(TestEventB {}),
Box::new(TestEventA {}),
Box::new(TestEventC {}),
Box::new(TestEventB {}),
Box::new(TestEventC {}),
Box::new(TestEventA {}),
];
let event_groups = group_events_by_type(&events);
assert_eq!(event_groups.len(), 3);
assert_eq!(event_groups.get("A").unwrap().len(), 3);
assert_eq!(event_groups.get("B").unwrap().len(), 2);
assert_eq!(event_groups.get("C").unwrap().len(), 2);
}
#[test]
fn test_build_row_inserts_request() {
let events: Vec<Box<dyn Event>> = vec![
Box::new(TestEventA {}),
Box::new(TestEventB {}),
Box::new(TestEventA {}),
Box::new(TestEventC {}),
Box::new(TestEventB {}),
Box::new(TestEventC {}),
Box::new(TestEventA {}),
];
let event_groups = group_events_by_type(&events);
assert_eq!(event_groups.len(), 3);
assert_eq!(event_groups.get("A").unwrap().len(), 3);
assert_eq!(event_groups.get("B").unwrap().len(), 2);
assert_eq!(event_groups.get("C").unwrap().len(), 2);
for (event_type, events) in event_groups {
let row_inserts_request = build_row_inserts_request(&events).unwrap();
if event_type == "A" {
assert_eq!(row_inserts_request.inserts.len(), 1);
assert_eq!(
row_inserts_request.inserts[0].table_name,
DEFAULT_EVENTS_TABLE_NAME
);
assert_eq!(
row_inserts_request.inserts[0]
.rows
.as_ref()
.unwrap()
.rows
.len(),
3
);
} else if event_type == "B" {
assert_eq!(row_inserts_request.inserts.len(), 1);
assert_eq!(row_inserts_request.inserts[0].table_name, "table_B");
assert_eq!(
row_inserts_request.inserts[0]
.rows
.as_ref()
.unwrap()
.rows
.len(),
2
);
} else if event_type == "C" {
assert_eq!(row_inserts_request.inserts.len(), 1);
assert_eq!(row_inserts_request.inserts[0].table_name, "table_C");
assert_eq!(
row_inserts_request.inserts[0]
.rows
.as_ref()
.unwrap()
.rows
.len(),
2
);
} else {
panic!("Unexpected event type: {}", event_type);
}
}
}
}
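
To make the new flow concrete, the sketch below builds per-table insert requests from a batch of events using only the APIs introduced in this file (`Event`, `group_events_by_type`, `build_row_inserts_request`, `EventHandlerOptions::to_hints`). The `AuditEvent` type is purely illustrative, and the snippet assumes it is compiled inside this workspace with `common-event-recorder` as a dependency:

```rust
use std::any::Any;

use common_event_recorder::{
    build_row_inserts_request, group_events_by_type, Event, EventHandlerOptions,
};

// A custom event only needs `event_type()` and `as_any()`; `table_name()`,
// the extra schema/row, payload and timestamp all fall back to the defaults.
#[derive(Debug)]
struct AuditEvent;

impl Event for AuditEvent {
    fn event_type(&self) -> &str {
        "audit"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let events: Vec<Box<dyn Event>> = vec![Box::new(AuditEvent), Box::new(AuditEvent)];

    // Handlers first group buffered events by type, then build one
    // RowInsertRequests per group, each targeting the event's `table_name()`.
    for (event_type, group) in group_events_by_type(&events) {
        let request = build_row_inserts_request(&group).expect("same-type events");
        // Per-type handler options become insert hints (ttl, append_mode).
        let hints = EventHandlerOptions::default().to_hints();
        println!("{event_type}: {} request(s), hints: {hints:?}", request.inserts.len());
    }
}
```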

View File

@@ -5,13 +5,18 @@ edition.workspace = true
license.workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-time.workspace = true
greptime-proto.workspace = true
humantime.workspace = true
meta-client.workspace = true
serde.workspace = true
session.workspace = true
snafu.workspace = true
tonic.workspace = true

View File

@@ -12,17 +12,121 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use session::context::QueryContextRef;
use std::any::Any;
#[derive(Debug)]
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
use common_event_recorder::error::Result;
use common_event_recorder::Event;
use serde::Serialize;
pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
pub const SLOW_QUERY_EVENT_TYPE: &str = "slow_query";
/// SlowQueryEvent is the event emitted for a slow query.
#[derive(Debug, Serialize)]
pub struct SlowQueryEvent {
pub cost: u64,
pub threshold: u64,
pub query: String,
pub is_promql: bool,
pub query_ctx: QueryContextRef,
pub promql_range: Option<u64>,
pub promql_step: Option<u64>,
pub promql_start: Option<i64>,
pub promql_end: Option<i64>,
}
impl Event for SlowQueryEvent {
fn table_name(&self) -> &str {
SLOW_QUERY_TABLE_NAME
}
fn event_type(&self) -> &str {
SLOW_QUERY_EVENT_TYPE
}
fn extra_schema(&self) -> Vec<ColumnSchema> {
vec![
ColumnSchema {
column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Boolean.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
]
}
fn extra_row(&self) -> Result<Row> {
Ok(Row {
values: vec![
ValueData::U64Value(self.cost).into(),
ValueData::U64Value(self.threshold).into(),
ValueData::StringValue(self.query.to_string()).into(),
ValueData::BoolValue(self.is_promql).into(),
ValueData::U64Value(self.promql_range.unwrap_or(0)).into(),
ValueData::U64Value(self.promql_step.unwrap_or(0)).into(),
ValueData::TimestampMillisecondValue(self.promql_start.unwrap_or(0)).into(),
ValueData::TimestampMillisecondValue(self.promql_end.unwrap_or(0)).into(),
],
})
}
fn json_payload(&self) -> Result<String> {
Ok("".to_string())
}
fn as_any(&self) -> &dyn Any {
self
}
}
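
With the recorder unified, emitting a slow query event is just constructing this struct and handing it to the shared recorder, as `SlowQueryTimer::send_slow_query_event` does in the system-table branch above. A hedged sketch (field values are made up; `recorder` is assumed to be the `EventRecorderRef` wired up by the frontend builder):

```rust
use common_event_recorder::EventRecorderRef;
use common_frontend::slow_query_event::SlowQueryEvent;

fn record_slow_query(recorder: &EventRecorderRef) {
    // Illustrative values only; in the real path cost/threshold come from the
    // elapsed and configured durations, both in milliseconds.
    let event = SlowQueryEvent {
        cost: 1_500,
        threshold: 1_000,
        query: "SELECT * FROM metrics".to_string(),
        is_promql: false,
        promql_range: None,
        promql_step: None,
        promql_start: None,
        promql_end: None,
    };
    // The recorder batches events in the background and flushes them to the
    // handler, which writes them into `greptime_private.slow_queries`.
    recorder.record(Box::new(event));
}
```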

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(let_chains)]
#![feature(duration_constructors)]
pub mod logging;
mod macros;

View File

@@ -103,14 +103,15 @@ pub struct SlowQueryOptions {
/// The threshold of slow queries.
#[serde(with = "humantime_serde")]
pub threshold: Option<Duration>,
pub threshold: Duration,
/// The sample ratio of slow queries.
pub sample_ratio: Option<f64>,
pub sample_ratio: f64,
/// The table TTL of `slow_queries` system table. Default is "30d".
/// The table TTL of `slow_queries` system table. Default is "90d".
/// It's used when `record_type` is `SystemTable`.
pub ttl: Option<String>,
#[serde(with = "humantime_serde")]
pub ttl: Duration,
}
impl Default for SlowQueryOptions {
@@ -118,9 +119,9 @@ impl Default for SlowQueryOptions {
Self {
enable: true,
record_type: SlowQueriesRecordType::SystemTable,
threshold: Some(Duration::from_secs(30)),
sample_ratio: Some(1.0),
ttl: Some("30d".to_string()),
threshold: Duration::from_secs(30),
sample_ratio: 1.0,
ttl: Duration::from_days(90),
}
}
}
@@ -128,7 +129,9 @@ impl Default for SlowQueryOptions {
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SlowQueriesRecordType {
/// Record the slow query in the system table.
SystemTable,
/// Record the slow query in a specific logs file.
Log,
}
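
Switching `threshold` and `ttl` from `String` to `Duration` keeps the human-readable strings in the TOML config working because the fields are (de)serialized through `humantime_serde`. A small self-contained sketch of that round trip, using a local mirror struct and assuming the `serde`, `humantime-serde` and `toml` crates:

```rust
use std::time::Duration;

use serde::Deserialize;

// Local mirror of the relevant SlowQueryOptions fields, only to show how the
// humantime-encoded strings in the config map onto Duration values.
#[derive(Debug, Deserialize)]
struct SlowQuery {
    enable: bool,
    #[serde(with = "humantime_serde")]
    threshold: Duration,
    sample_ratio: f64,
    #[serde(with = "humantime_serde")]
    ttl: Duration,
}

fn main() {
    let cfg: SlowQuery = toml::from_str(
        r#"
            enable = true
            threshold = "30s"
            sample_ratio = 1.0
            ttl = "90d"
        "#,
    )
    .unwrap();

    assert!(cfg.enable && cfg.sample_ratio == 1.0);
    assert_eq!(cfg.threshold, Duration::from_secs(30));
    // "90d" parses to exactly 90 * 24 * 3600 seconds.
    assert_eq!(cfg.ttl, Duration::from_secs(90 * 24 * 3600));
}
```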

View File

@@ -26,6 +26,7 @@ common-catalog.workspace = true
common-config.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
@@ -45,6 +46,7 @@ datafusion-expr.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
log-query.workspace = true

src/frontend/src/events.rs (new file)
View File

@@ -0,0 +1,100 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_event_recorder::error::{InsertEventsSnafu, Result};
use common_event_recorder::{
build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions,
};
use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE;
use humantime::format_duration;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use session::context::QueryContextBuilder;
use snafu::ResultExt;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
/// EventHandlerImpl is the default event handler implementation in frontend.
pub struct EventHandlerImpl {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
slow_query_ttl: Duration,
global_ttl: Duration,
}
impl EventHandlerImpl {
/// Create a new EventHandlerImpl.
pub fn new(
inserter: InserterRef,
statement_executor: StatementExecutorRef,
slow_query_ttl: Duration,
global_ttl: Duration,
) -> Self {
Self {
inserter,
statement_executor,
slow_query_ttl,
global_ttl,
}
}
}
#[async_trait]
impl EventHandler for EventHandlerImpl {
async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
let event_groups = group_events_by_type(events);
for (event_type, events) in event_groups {
let opts = self.options(event_type);
let query_ctx = QueryContextBuilder::default()
.current_catalog(DEFAULT_CATALOG_NAME.to_string())
.current_schema(DEFAULT_PRIVATE_SCHEMA_NAME.to_string())
.set_extension(TTL_KEY.to_string(), format_duration(opts.ttl).to_string())
.set_extension(APPEND_MODE_KEY.to_string(), opts.append_mode.to_string())
.build()
.into();
self.inserter
.handle_row_inserts(
build_row_inserts_request(&events)?,
query_ctx,
&self.statement_executor,
false,
false,
)
.await
.map_err(BoxedError::new)
.context(InsertEventsSnafu)?;
}
Ok(())
}
fn options(&self, event_type: &str) -> EventHandlerOptions {
match event_type {
SLOW_QUERY_EVENT_TYPE => EventHandlerOptions {
ttl: self.slow_query_ttl,
append_mode: true,
},
_ => EventHandlerOptions {
ttl: self.global_ttl,
append_mode: true,
},
}
}
}

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_config::config::Configurable;
use common_event_recorder::EventRecorderOptions;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
@@ -62,8 +63,10 @@ pub struct FrontendOptions {
pub tracing: TracingOptions,
pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>,
pub slow_query: SlowQueryOptions,
pub memory: MemoryOptions,
/// The event recorder options.
pub event_recorder: EventRecorderOptions,
}
impl Default for FrontendOptions {
@@ -89,8 +92,9 @@ impl Default for FrontendOptions {
tracing: TracingOptions::default(),
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()),
slow_query: SlowQueryOptions::default(),
memory: MemoryOptions::default(),
event_recorder: EventRecorderOptions::default(),
}
}
}

View File

@@ -32,13 +32,16 @@ use std::time::{Duration, SystemTime};
use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::process_manager::{ProcessManagerRef, QueryStatement as CatalogQueryStatement};
use catalog::process_manager::{
ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::cancellation::CancellableFuture;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::table_name::TableNameKey;
@@ -53,6 +56,7 @@ use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, info, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
@@ -99,7 +103,6 @@ use crate::error::{
StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::slow_query_recorder::SlowQueryRecorder;
use crate::stream_wrapper::CancellableStreamWrapper;
lazy_static! {
@@ -119,9 +122,10 @@ pub struct Instance {
inserter: InserterRef,
deleter: DeleterRef,
table_metadata_manager: TableMetadataManagerRef,
slow_query_recorder: Option<SlowQueryRecorder>,
event_recorder: Option<EventRecorderRef>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
slow_query_options: SlowQueryOptions,
// cache for otlp metrics
// first layer key: db-string
@@ -222,9 +226,20 @@ impl Instance {
let query_interceptor = query_interceptor.as_ref();
if should_capture_statement(Some(&stmt)) {
let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
recorder.start(CatalogQueryStatement::Sql(stmt.clone()), query_ctx.clone())
});
let slow_query_timer = self
.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
CatalogQueryStatement::Sql(stmt.clone()),
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
@@ -586,9 +601,20 @@ impl SqlQueryHandler for Instance {
// It's safe to unwrap here because we've already checked the type.
let stmt = stmt.unwrap();
let query = stmt.to_string();
let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
recorder.start(CatalogQueryStatement::Sql(stmt), query_ctx.clone())
});
let slow_query_timer = self
.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
CatalogQueryStatement::Sql(stmt.clone()),
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
@@ -739,9 +765,19 @@ impl PrometheusHandler for Instance {
let query = query_statement.to_string();
let slow_query_timer = self
.slow_query_recorder
.as_ref()
.and_then(|recorder| recorder.start(query_statement, query_ctx.clone()));
.slow_query_options
.enable
.then(|| self.event_recorder.clone())
.flatten()
.map(|event_recorder| {
SlowQueryTimer::new(
query_statement,
self.slow_query_options.threshold,
self.slow_query_options.sample_ratio,
self.slow_query_options.record_type,
event_recorder,
)
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),

View File

@@ -18,6 +18,7 @@ use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_event_recorder::EventRecorderImpl;
use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::key::flow::FlowMetadataManager;
@@ -40,11 +41,11 @@ use query::QueryEngineFactory;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::events::EventHandlerImpl;
use crate::frontend::FrontendOptions;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::Instance;
use crate::limiter::Limiter;
use crate::slow_query_recorder::SlowQueryRecorder;
/// The frontend [`Instance`] builder.
pub struct FrontendBuilder {
@@ -194,16 +195,12 @@ impl FrontendBuilder {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let slow_query_recorder = self.options.slow_query.and_then(|opts| {
opts.enable.then(|| {
SlowQueryRecorder::new(
opts.clone(),
inserter.clone(),
statement_executor.clone(),
self.catalog_manager.clone(),
)
})
});
let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
inserter.clone(),
statement_executor.clone(),
self.options.slow_query.ttl,
self.options.event_recorder.ttl,
))));
// Create the limiter if the max_in_flight_write_bytes is set.
let limiter = self
@@ -222,10 +219,11 @@ impl FrontendBuilder {
inserter,
deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
slow_query_recorder,
event_recorder: Some(event_recorder),
limiter,
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
slow_query_options: self.options.slow_query.clone(),
})
}
}

View File

@@ -15,6 +15,7 @@
#![feature(assert_matches)]
pub mod error;
pub mod events;
pub mod frontend;
pub mod heartbeat;
pub mod instance;
@@ -22,5 +23,4 @@ pub(crate) mod limiter;
pub(crate) mod metrics;
pub mod server;
pub mod service_config;
pub mod slow_query_recorder;
mod stream_wrapper;

View File

@@ -1,434 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
RowInsertRequests, Rows, SemanticType,
};
use catalog::process_manager::{QueryStatement as CatalogQueryStatement, SlowQueryTimer};
use catalog::CatalogManagerRef;
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
use common_telemetry::{debug, error, info, slow};
use common_time::timestamp::{TimeUnit, Timestamp};
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::ResultExt;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use table::TableRef;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task::JoinHandle;
use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
/// SlowQueryRecorder is responsible for recording slow queries.
#[derive(Clone)]
pub struct SlowQueryRecorder {
tx: Sender<SlowQueryEvent>,
slow_query_opts: SlowQueryOptions,
_handle: Arc<JoinHandle<()>>,
}
impl SlowQueryRecorder {
/// Create a new SlowQueryRecorder.
pub fn new(
slow_query_opts: SlowQueryOptions,
inserter: InserterRef,
statement_executor: StatementExecutorRef,
catalog_manager: CatalogManagerRef,
) -> Self {
let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE);
let ttl = slow_query_opts
.ttl
.clone()
.unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string());
// Start a new task to process the slow query events.
let event_handler = SlowQueryEventHandler {
inserter,
statement_executor,
catalog_manager,
rx,
record_type: slow_query_opts.record_type,
ttl,
};
// Start a new background task to process the slow query events.
let handle = tokio::spawn(async move {
event_handler.process_slow_query().await;
});
Self {
tx,
slow_query_opts,
_handle: Arc::new(handle),
}
}
/// Starts a new SlowQueryTimer. Returns `None` if `slow_query.enable` is false.
/// The timer sets the start time when created and calculates the elapsed duration when dropped.
pub fn start(
&self,
stmt: CatalogQueryStatement,
query_ctx: QueryContextRef,
) -> Option<SlowQueryTimer> {
if self.slow_query_opts.enable {
Some(SlowQueryTimer::new(
stmt,
query_ctx,
self.slow_query_opts.threshold,
self.slow_query_opts.sample_ratio,
self.tx.clone(),
))
} else {
None
}
}
}
struct SlowQueryEventHandler {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
catalog_manager: CatalogManagerRef,
rx: Receiver<SlowQueryEvent>,
record_type: SlowQueriesRecordType,
ttl: String,
}
impl SlowQueryEventHandler {
async fn process_slow_query(mut self) {
info!(
"Start the background handler to process slow query events and record them in {:?}.",
self.record_type
);
while let Some(event) = self.rx.recv().await {
self.record_slow_query(event).await;
}
}
async fn record_slow_query(&self, event: SlowQueryEvent) {
match self.record_type {
SlowQueriesRecordType::Log => {
// Record the slow query in a specific logs file.
slow!(
cost = event.cost,
threshold = event.threshold,
query = event.query,
is_promql = event.is_promql,
promql_range = event.promql_range,
promql_step = event.promql_step,
promql_start = event.promql_start,
promql_end = event.promql_end,
);
}
SlowQueriesRecordType::SystemTable => {
// Record the slow query in a system table that is stored in greptimedb itself.
if let Err(e) = self.insert_slow_query(&event).await {
error!(e; "Failed to insert slow query, query: {:?}", event);
}
}
}
}
async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> {
debug!("Handle the slow query event: {:?}", event);
let table = if let Some(table) = self
.catalog_manager
.table(
event.query_ctx.current_catalog(),
DEFAULT_PRIVATE_SCHEMA_NAME,
SLOW_QUERY_TABLE_NAME,
Some(&event.query_ctx),
)
.await
.context(CatalogSnafu)?
{
table
} else {
// Create the system table if it doesn't exist.
self.create_system_table(event.query_ctx.clone()).await?
};
let insert = RowInsertRequest {
table_name: SLOW_QUERY_TABLE_NAME.to_string(),
rows: Some(Rows {
schema: self.build_insert_column_schema(),
rows: vec![Row {
values: vec![
ValueData::U64Value(event.cost).into(),
ValueData::U64Value(event.threshold).into(),
ValueData::StringValue(event.query.to_string()).into(),
ValueData::BoolValue(event.is_promql).into(),
ValueData::TimestampNanosecondValue(
Timestamp::current_time(TimeUnit::Nanosecond).value(),
)
.into(),
ValueData::U64Value(event.promql_range.unwrap_or(0)).into(),
ValueData::U64Value(event.promql_step.unwrap_or(0)).into(),
ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0))
.into(),
ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(),
],
}],
}),
};
let requests = RowInsertRequests {
inserts: vec![insert],
};
let table_info = table.table_info();
let query_ctx = QueryContextBuilder::default()
.current_catalog(table_info.catalog_name.to_string())
.current_schema(table_info.schema_name.to_string())
.build()
.into();
self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false)
.await
.context(TableOperationSnafu)?;
Ok(())
}
async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result<TableRef> {
let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog());
if let Some(table) = self
.catalog_manager
.table(
&create_table_expr.catalog_name,
&create_table_expr.schema_name,
&create_table_expr.table_name,
Some(&query_ctx),
)
.await
.context(CatalogSnafu)?
{
// The table is already created, so we don't need to create it again.
return Ok(table);
}
// Create the `slow_queries` system table.
let table = self
.statement_executor
.create_table_inner(&mut create_table_expr, None, query_ctx.clone())
.await
.context(TableOperationSnafu)?;
info!(
"Create the {} system table in {:?} successfully.",
SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME
);
Ok(table)
}
fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
data_type: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "The cost of the slow query in milliseconds".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
data_type: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment:
"When the query cost exceeds this value, it will be recorded as a slow query"
.to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "The original query statement".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
data_type: ColumnDataType::Boolean as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "Whether the query is a PromQL query".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
data_type: ColumnDataType::TimestampNanosecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
comment: "The timestamp of the slow query".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
data_type: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "The time range of the PromQL query in milliseconds".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
data_type: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "The step of the PromQL query in milliseconds".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
data_type: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "The start timestamp of the PromQL query in milliseconds".to_string(),
datatype_extension: None,
options: None,
},
ColumnDef {
name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
data_type: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
comment: "The end timestamp of the PromQL query in milliseconds".to_string(),
datatype_extension: None,
options: None,
},
];
let table_options = HashMap::from([
(APPEND_MODE_KEY.to_string(), "true".to_string()),
(TTL_KEY.to_string(), self.ttl.to_string()),
]);
CreateTableExpr {
catalog_name: catalog.to_string(),
schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), // Always to store in the `greptime_private` schema.
table_name: SLOW_QUERY_TABLE_NAME.to_string(),
desc: "GreptimeDB system table for storing slow queries".to_string(),
column_defs,
time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
primary_keys: vec![],
create_if_not_exists: true,
table_options,
table_id: None,
engine: default_engine().to_string(),
}
}
fn build_insert_column_schema(&self) -> Vec<ColumnSchema> {
vec![
ColumnSchema {
column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Boolean.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::Uint64.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
ColumnSchema {
column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
},
]
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use client::{Client, Database};
@@ -21,7 +22,9 @@ use common_error::ext::BoxedError;
use common_event_recorder::error::{
InsertEventsSnafu, KvBackendSnafu, NoAvailableFrontendSnafu, Result,
};
use common_event_recorder::{build_row_inserts_request, insert_hints, Event, EventHandler};
use common_event_recorder::{
build_row_inserts_request, group_events_by_type, Event, EventHandler, EventHandlerOptions,
};
use common_grpc::channel_manager::ChannelManager;
use common_meta::peer::PeerLookupServiceRef;
use common_telemetry::debug;
@@ -36,13 +39,15 @@ pub mod region_migration_event;
pub struct EventHandlerImpl {
peer_lookup_service: PeerLookupServiceRef,
channel_manager: ChannelManager,
ttl: Duration,
}
impl EventHandlerImpl {
pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
pub fn new(meta_peer_client: MetaPeerClientRef, ttl: Duration) -> Self {
Self {
peer_lookup_service: Arc::new(MetaPeerLookupService::new(meta_peer_client)),
channel_manager: ChannelManager::new(),
ttl,
}
}
}
@@ -50,15 +55,35 @@ impl EventHandlerImpl {
#[async_trait]
impl EventHandler for EventHandlerImpl {
async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
self.build_database_client()
.await?
.row_inserts_with_hints(build_row_inserts_request(events)?, &insert_hints())
.await
.map_err(BoxedError::new)
.context(InsertEventsSnafu)?;
let event_groups = group_events_by_type(events);
for (event_type, events) in event_groups {
let opts = self.options(event_type);
let hints = opts.to_hints();
self.build_database_client()
.await?
.row_inserts_with_hints(
build_row_inserts_request(&events)?,
&hints
.iter()
.map(|(k, v)| (*k, v.as_str()))
.collect::<Vec<_>>(),
)
.await
.map_err(BoxedError::new)
.context(InsertEventsSnafu)?;
}
Ok(())
}
fn options(&self, _event_type: &str) -> EventHandlerOptions {
EventHandlerOptions {
ttl: self.ttl,
append_mode: true,
}
}
}
impl EventHandlerImpl {

View File

@@ -195,10 +195,10 @@ impl MetasrvBuilder {
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
// Builds the event recorder to record important events and persist them in the system table.
let event_recorder = Arc::new(EventRecorderImpl::new(
Box::new(EventHandlerImpl::new(meta_peer_client.clone())),
options.event_recorder.clone(),
));
let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
meta_peer_client.clone(),
options.event_recorder.ttl,
))));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
let pushers = Pushers::default();

View File

@@ -28,6 +28,7 @@ common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-event-recorder.workspace = true
common-frontend.workspace = true
common-grpc.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-procedure.workspace = true

View File

@@ -330,12 +330,12 @@ impl GreptimeDbStandaloneBuilder {
wal: self.metasrv_wal_config.clone().into(),
grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
// Enable slow query log with 1s threshold to run the slow query test.
slow_query: Some(SlowQueryOptions {
slow_query: SlowQueryOptions {
enable: true,
// Set the threshold to 1s to run the slow query test.
threshold: Some(Duration::from_secs(1)),
threshold: Duration::from_secs(1),
..Default::default()
}),
},
..StandaloneOptions::default()
};

View File

@@ -28,9 +28,11 @@ use common_catalog::consts::{
trace_services_table_name, DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME,
};
use common_error::status_code::StatusCode as ErrorCode;
use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
};
use flate2::write::GzEncoder;
use flate2::Compression;
use frontend::slow_query_recorder::{SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME};
use log_query::{Context, Limit, LogQuery, TimeFilter};
use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
use loki_proto::prost_types::Timestamp;
@@ -1383,7 +1385,7 @@ enable = true
record_type = "system_table"
threshold = "1s"
sample_ratio = 1.0
ttl = "30d"
ttl = "2months 29days 2h 52m 48s"
[query]
parallelism = 0
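
The odd-looking expected `ttl` above is just the 90-day default rendered back to text: the option is now a `Duration`, and the config dump formats it with humantime's `format_duration`, which counts a month as roughly 30.44 days. A quick check of the arithmetic (assuming the `humantime` crate):

```rust
use std::time::Duration;

use humantime::format_duration;

fn main() {
    // 90 days = 7,776,000 s. humantime splits this into 2 months (2 * 2,630,016 s),
    // 29 days (2,505,600 s) and a 10,368 s remainder, i.e. 2h 52m 48s.
    let ttl = Duration::from_secs(90 * 24 * 3600);
    assert_eq!(format_duration(ttl).to_string(), "2months 29days 2h 52m 48s");
}
```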

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use frontend::slow_query_recorder::{
use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,