Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-23 14:40:01 +00:00)
Compare commits: docs/vecto...basic_with (3 commits)

| SHA1 |
|---|
| af90fad499 |
| 22c61432f6 |
| 91f373e66e |
@@ -316,7 +316,7 @@ impl StreamingEngine {
         );
 
         METRIC_FLOW_ROWS
-            .with_label_values(&["out"])
+            .with_label_values(&["out-streaming"])
            .inc_by(total_rows as u64);
 
        let now = self.tick_manager.tick();
@@ -46,7 +46,7 @@ use crate::error::{
     IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
     NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
 };
-use crate::metrics::METRIC_FLOW_TASK_COUNT;
+use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
 use crate::repr::{self, DiffRow};
 use crate::{Error, FlowId};
 
@@ -689,6 +689,9 @@ impl FlowEngine for FlowDualEngine {
         let mut to_stream_engine = Vec::with_capacity(request.requests.len());
         let mut to_batch_engine = request.requests;
 
+        let mut batching_row_cnt = 0;
+        let mut streaming_row_cnt = 0;
+
         {
             // not locking this, or recover flows will be starved when also handling flow inserts
             let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +701,11 @@ impl FlowEngine for FlowDualEngine {
             let is_in_stream = src_table2flow.in_stream(table_id);
             let is_in_batch = src_table2flow.in_batch(table_id);
             if is_in_stream {
+                streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                 to_stream_engine.push(req.clone());
             }
             if is_in_batch {
+                batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                 return true;
             }
             if !is_in_batch && !is_in_stream {
@@ -713,6 +718,14 @@ impl FlowEngine for FlowDualEngine {
             // can't use drop due to https://github.com/rust-lang/rust/pull/128846
         }
 
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-streaming"])
+            .inc_by(streaming_row_cnt as u64);
+
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-batching"])
+            .inc_by(batching_row_cnt as u64);
+
         let streaming_engine = self.streaming_engine.clone();
         let stream_handler: JoinHandle<Result<(), Error>> =
             common_runtime::spawn_global(async move {
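
For context on the `in-streaming` / `in-batching` labels used above: with the `prometheus` crate, each distinct label value of an `IntCounterVec` is its own series, so the two `inc_by` calls count streaming and batching rows independently, and a request routed to both engines is counted in both. A minimal sketch of that behaviour (the metric name `example_flow_rows_total` is illustrative, not part of this change):

```rust
// Minimal sketch, assuming the `prometheus` and `lazy_static` crates.
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    // Illustrative stand-in for METRIC_FLOW_ROWS: one vector, one "direction" label.
    static ref EXAMPLE_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
        "example_flow_rows_total",
        "rows routed per direction",
        &["direction"]
    )
    .unwrap();
}

fn main() {
    // Each label value is an independent counter series.
    EXAMPLE_FLOW_ROWS.with_label_values(&["in-streaming"]).inc_by(10);
    EXAMPLE_FLOW_ROWS.with_label_values(&["in-batching"]).inc_by(10);

    assert_eq!(EXAMPLE_FLOW_ROWS.with_label_values(&["in-streaming"]).get(), 10);
    assert_eq!(EXAMPLE_FLOW_ROWS.with_label_values(&["in-batching"]).get(), 10);
}
```
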
@@ -42,6 +42,7 @@ use crate::error::{
     ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
     UnexpectedSnafu, UnsupportedSnafu,
 };
+use crate::metrics::METRIC_FLOW_ROWS;
 use crate::{CreateFlowArgs, Error, FlowId, TableName};
 
 /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -155,6 +156,10 @@ impl BatchingEngine {
            let Some(expr) = &task.config.time_window_expr else {
                continue;
            };
+            let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum();
+            METRIC_FLOW_ROWS
+                .with_label_values(&[&format!("{}-batching-in", task.config.flow_id)])
+                .inc_by(row_cnt as u64);
            let involved_time_windows = expr.handle_rows(entry.clone()).await?;
            let mut state = task.state.write().unwrap();
            state
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
 use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
 use crate::metrics::{
     METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
+    METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME,
 };
 use crate::{Error, FlowId};
 
@@ -102,8 +103,19 @@ impl TaskState {
            })
            .unwrap_or(last_duration);
 
+        METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME
+            .with_label_values(&[
+                flow_id.to_string().as_str(),
+                time_window_size
+                    .unwrap_or_default()
+                    .as_secs_f64()
+                    .to_string()
+                    .as_str(),
+            ])
+            .observe(next_duration.as_secs_f64());
+
         // if have dirty time window, execute immediately to clean dirty time window
-        if self.dirty_time_windows.windows.is_empty() {
+        /*if self.dirty_time_windows.windows.is_empty() {
            self.last_update_time + next_duration
        } else {
            debug!(
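
A small sketch of how the `time_window_size` label value above is derived; `window` stands in for the task's optional time window and is assumed here to be an `Option<std::time::Duration>`. A flow without a window falls back to the zero default, so its label is reported as "0".

```rust
// Sketch of the label derivation used above; `window_label` is an
// illustrative helper, not a function from this PR.
use std::time::Duration;

fn window_label(window: Option<Duration>) -> String {
    // Matches the chain `.unwrap_or_default().as_secs_f64().to_string()` in the
    // diff: None becomes the zero duration, i.e. the label "0".
    window.unwrap_or_default().as_secs_f64().to_string()
}

fn main() {
    assert_eq!(window_label(Some(Duration::from_secs(300))), "300");
    assert_eq!(window_label(None), "0");
}
```
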
@@ -113,7 +125,9 @@ impl TaskState {
                self.dirty_time_windows.windows
            );
            Instant::now()
-        }
+        }*/
+
+        self.last_update_time + next_duration
     }
 }
 
@@ -206,47 +220,59 @@ impl DirtyTimeWindows {
 
         // get the first `window_cnt` time windows
         let max_time_range = window_size * window_cnt as i32;
-        let nth = {
+        let mut to_be_query = BTreeMap::new();
+        let mut new_windows = self.windows.clone();
         let mut cur_time_range = chrono::Duration::zero();
-        let mut nth_key = None;
         for (idx, (start, end)) in self.windows.iter().enumerate() {
+            let first_end = start
+                .add_duration(window_size.to_std().unwrap())
+                .context(TimeSnafu)?;
+            let end = end.unwrap_or(first_end);
+
             // if time range is too long, stop
-            if cur_time_range > max_time_range {
-                nth_key = Some(*start);
+            if cur_time_range >= max_time_range {
                 break;
             }
 
             // if we have enough time windows, stop
             if idx >= window_cnt {
-                nth_key = Some(*start);
                 break;
             }
 
-            if let Some(end) = end {
             if let Some(x) = end.sub(start) {
+                if cur_time_range + x <= max_time_range {
+                    to_be_query.insert(*start, Some(end));
+                    new_windows.remove(start);
                     cur_time_range += x;
-                }
-            }
-        }
-
-            nth_key
-        };
-        let first_nth = {
-            if let Some(nth) = nth {
-                let mut after = self.windows.split_off(&nth);
-                std::mem::swap(&mut self.windows, &mut after);
-
-                after
                 } else {
-                std::mem::take(&mut self.windows)
+                    // too large a window, split it
+                    // split at window_size * times
+                    let surplus = max_time_range - cur_time_range;
+                    let times = surplus.num_seconds() / window_size.num_seconds();
+
+                    let split_offset = window_size * times as i32;
+                    let split_at = start
+                        .add_duration(split_offset.to_std().unwrap())
+                        .context(TimeSnafu)?;
+                    to_be_query.insert(*start, Some(split_at));
+
+                    // remove the original window
+                    new_windows.remove(start);
+                    new_windows.insert(split_at, Some(end));
+                    cur_time_range += split_offset;
+                    break;
                 }
-        };
+            }
+        }
 
+        self.windows = new_windows;
+
         METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
             .with_label_values(&[flow_id.to_string().as_str()])
-            .observe(first_nth.len() as f64);
+            .observe(to_be_query.len() as f64);
 
-        let full_time_range = first_nth
+        let full_time_range = to_be_query
             .iter()
             .fold(chrono::Duration::zero(), |acc, (start, end)| {
                 if let Some(end) = end {
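
The `else` branch above caps one round of querying at `max_time_range`: an oversized dirty window is cut at the largest whole multiple of `window_size` that still fits the remaining budget, the queried prefix goes into `to_be_query`, and the remainder is re-inserted starting at `split_at`. A standalone sketch of just that arithmetic (the helper name is illustrative):

```rust
// Sketch of the split-offset arithmetic from the hunk above, using
// chrono::Duration as the code does; `split_offset` is an illustrative helper.
use chrono::Duration;

fn split_offset(max_time_range: Duration, cur_time_range: Duration, window_size: Duration) -> Duration {
    // surplus = remaining query budget; times = whole windows that still fit.
    let surplus = max_time_range - cur_time_range;
    let times = surplus.num_seconds() / window_size.num_seconds();
    window_size * times as i32
}

fn main() {
    // With 3 s windows and a 60 s budget left, a larger dirty window is
    // queried for its first 60 s and the rest stays dirty.
    let offset = split_offset(Duration::seconds(60), Duration::seconds(0), Duration::seconds(3));
    assert_eq!(offset.num_seconds(), 60);
}
```
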
@@ -261,7 +287,7 @@ impl DirtyTimeWindows {
            .observe(full_time_range);
 
        let mut expr_lst = vec![];
-        for (start, end) in first_nth.into_iter() {
+        for (start, end) in to_be_query.into_iter() {
            // align using time window exprs
            let (start, end) = if let Some(ctx) = task_ctx {
                let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -495,6 +521,64 @@ mod test {
                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
                )
            ),
+            // split range
+            (
+                Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
+                    Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
+                ))),
+                (chrono::Duration::seconds(3), None),
+                BTreeMap::from([
+                    (
+                        Timestamp::new_second(0),
+                        Some(Timestamp::new_second(
+                            60
+                        )),
+                    ),
+                    (
+                        Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
+                        Some(Timestamp::new_second(
+                            60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
+                        )),
+                    )]),
+                Some(
+                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
+                )
+            ),
+            // split 2 min into 1 min
+            (
+                Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
+                (chrono::Duration::seconds(3), None),
+                BTreeMap::from([
+                    (
+                        Timestamp::new_second(0),
+                        Some(Timestamp::new_second(
+                            40 * 3
+                        )),
+                    )]),
+                Some(
+                    "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
+                )
+            ),
+            // split 3s + 1min into 3s + 57s
+            (
+                Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
+                (chrono::Duration::seconds(3), None),
+                BTreeMap::from([
+                    (
+                        Timestamp::new_second(0),
+                        Some(Timestamp::new_second(
+                            3
+                        )),
+                    ),(
+                        Timestamp::new_second(20),
+                        Some(Timestamp::new_second(
+                            140
+                        )),
+                    )]),
+                Some(
+                    "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
+                )
+            ),
            // expired
            (
                vec![
@@ -511,6 +595,8 @@ mod test {
                None
            ),
        ];
+        // let len = testcases.len();
+        // let testcases = testcases[(len - 2)..(len - 1)].to_vec();
        for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
            testcases
        {
@@ -61,7 +61,9 @@ use crate::error::{
     SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
 };
 use crate::metrics::{
-    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
+    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
+    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
+    METRIC_FLOW_ROWS,
 };
 use crate::{Error, FlowId};
 
@@ -371,6 +373,9 @@ impl BatchingTask {
                    "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                    elapsed
                );
+                METRIC_FLOW_ROWS
+                    .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
+                    .inc_by(*affected_rows as _);
            } else if let Err(err) = &res {
                warn!(
                    "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@ impl BatchingTask {
        engine: QueryEngineRef,
        frontend_client: Arc<FrontendClient>,
    ) {
+        let flow_id_str = self.config.flow_id.to_string();
        loop {
            // first check if shutdown signal is received
            // if so, break the loop
@@ -427,6 +433,9 @@ impl BatchingTask {
                    Err(TryRecvError::Empty) => (),
                }
            }
+            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
+                .with_label_values(&[&flow_id_str])
+                .inc();
 
            let new_query = match self.gen_insert_plan(&engine).await {
                Ok(new_query) => new_query,
@@ -473,6 +482,9 @@ impl BatchingTask {
                }
                // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                Err(err) => {
+                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
+                        .with_label_values(&[&flow_id_str])
+                        .inc();
                    match new_query {
                        Some(query) => {
                            common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
@@ -35,6 +35,13 @@ lazy_static! {
        vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
    )
    .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!(
+        "greptime_flow_batching_engine_wait_time_secs",
+        "flow batching engine wait time between query(seconds)",
+        &["flow_id", "time_window_size"],
+        vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
+    )
+    .unwrap();
    pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
        "greptime_flow_batching_engine_slow_query_secs",
        "flow batching engine slow query(seconds)",
@@ -58,6 +65,20 @@ lazy_static! {
        vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
    )
    .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_start_query_count",
+            "flow batching engine started query count",
+            &["flow_id"],
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_error_count",
+            "flow batching engine error count per flow id",
+            &["flow_id"],
+        )
+        .unwrap();
    pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
        register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
    pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
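
Once registered as above, the new series land in the default `prometheus` registry alongside the existing flow metrics. A hedged sketch of how they could be inspected locally with the crate's text encoder (this helper is not part of the PR):

```rust
// Sketch only: dump the default prometheus registry in text exposition format.
use prometheus::{Encoder, TextEncoder};

fn dump_metrics() -> String {
    let metric_families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&metric_families, &mut buf)
        .expect("encoding the default registry should not fail");
    // After the counters above have been touched, lines such as
    //   greptime_flow_batching_start_query_count{flow_id="42"} 1
    // show up in the output (the flow id value here is illustrative).
    String::from_utf8(buf).expect("text exposition format is UTF-8")
}
```

Calling a helper like `dump_metrics()` from a test or debug endpoint shows the new `flow_id`-labelled series without going through a full Prometheus scrape.
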