Compare commits

...

9 Commits

Author SHA1 Message Date
discord9
ffe84af343 chore: added stalled time window range
Signed-off-by: discord9 <discord9@163.com>
2025-06-11 20:26:46 +08:00
discord9
41c40688c4 feat: adjust flow 2025-06-11 15:40:59 +08:00
discord9
59a82a3f29 feat: truly limit time range by split window 2025-06-11 15:40:59 +08:00
discord9
0d7012a1ea feat: flownode to frontend load balance with guess 2025-06-11 15:40:59 +08:00
discord9
88da98c829 metrics: more useful metrics batching mode 2025-06-11 15:40:59 +08:00
discord9
73ce5914f2 feat: metrics 2025-06-11 15:40:59 +08:00
discord9
f5eac3528c feat: mark dirty time window 2025-06-11 15:40:59 +08:00
Lei, HUANG
9cd61d221d feat/bulk-support-flow-batch:
### Add Dirty Window Handling in Flow Module

 - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Flow Module Enhancements**:
   - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`.
   - Implemented `handle_mark_window_dirty` function to manage dirty time windows.
 - **Bulk Insert Enhancements**:
   - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`.
 - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-11 15:40:59 +08:00
Lei, HUANG
7527ff976e feat/bulk-support-flow-batch:
### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert

 - **Refactor Table Handling**:
   - Updated `put_record_batch` method to use `TableRef` instead of `TableId` in `grpc.rs`, `greptime_handler.rs`, and `grpc.rs`.
   - Modified `handle_bulk_insert` to accept `TableRef` and extract `TableId` internally in `bulk_insert.rs`.

 - **Enhance Timestamp Processing**:
   - Added `compute_timestamp_range` function to calculate timestamp range in `bulk_insert.rs`.
   - Introduced error handling for invalid time index types in `error.rs`.

 - **Test Adjustments**:
   - Updated `DummyInstance` implementation in `tests/mod.rs` to align with new method signatures.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-11 15:40:59 +08:00
29 changed files with 992 additions and 165 deletions

2
Cargo.lock generated
View File

@@ -5133,7 +5133,7 @@ dependencies = [
[[package]] [[package]]
name = "greptime-proto" name = "greptime-proto"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f" source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=52083925a15d741c259800a9a54eba3467939180#52083925a15d741c259800a9a54eba3467939180"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"serde", "serde",

View File

@@ -133,7 +133,7 @@ etcd-client = "0.14"
fst = "0.4.7" fst = "0.4.7"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" } greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "52083925a15d741c259800a9a54eba3467939180" }
hex = "0.4" hex = "0.4"
http = "1" http = "1"
humantime = "2.1" humantime = "2.1"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use api::v1::flow::{FlowRequest, FlowResponse}; use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests; use api::v1::region::InsertRequests;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode; use common_meta::node_manager::Flownode;
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu) .context(common_meta::error::ExternalSnafu)
} }
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
} }
impl FlowRequester { impl FlowRequester {
@@ -91,4 +101,20 @@ impl FlowRequester {
.into_inner(); .into_inner();
Ok(response) Ok(response)
} }
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
} }

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn adjust_signature() -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(), // flow name
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
ConcreteDataType::uint64_datatype(), // max filter number per query
],
Volatility::Immutable,
)
}
#[admin_fn(
name = AdjustFlowFunction,
display_name = adjust_flow,
sig_fn = adjust_signature,
ret = uint64
)]
pub(crate) async fn adjust_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, have: {}",
params.len()
),
}
);
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
(
ValueRef::String(flow_name),
ValueRef::UInt64(min_run_interval),
ValueRef::UInt64(max_filter_num),
) => (flow_name, min_run_interval, max_filter_num),
_ => {
return UnsupportedInputDataTypeSnafu {
function: "adjust_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
}
};
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let res = flow_service_handler
.adjust(
&catalog_name,
&flow_name,
min_run_interval,
max_filter_num as usize,
query_ctx.clone(),
)
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}

View File

@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction; use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction; use remove_region_follower::RemoveRegionFollowerFunction;
use crate::adjust_flow::AdjustFlowFunction;
use crate::flush_flow::FlushFlowFunction; use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
@@ -43,5 +44,6 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction)); registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction)); registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction)); registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(AdjustFlowFunction));
} }
} }

View File

@@ -12,21 +12,19 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn; use common_macro::admin_fn;
use common_query::error::{ use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
UnsupportedInputDataTypeSnafu,
}; };
use common_query::prelude::Signature; use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility; use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef}; use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{ensure, ResultExt}; use snafu::ensure;
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType; use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef; use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn flush_signature() -> Signature { fn flush_signature() -> Signature {
Signature::uniform( Signature::uniform(
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
query_ctx: &QueryContextRef, query_ctx: &QueryContextRef,
params: &[ValueRef<'_>], params: &[ValueRef<'_>],
) -> Result<Value> { ) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!( ensure!(
params.len() == 1, params.len() == 1,
InvalidFuncArgsSnafu { InvalidFuncArgsSnafu {
@@ -70,7 +54,6 @@ fn parse_flush_flow(
), ),
} }
); );
let ValueRef::String(flow_name) = params[0] else { let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu { return UnsupportedInputDataTypeSnafu {
function: "flush_flow", function: "flush_flow",
@@ -78,27 +61,14 @@ fn parse_flush_flow(
} }
.fail(); .fail();
}; };
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect()) let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] { let res = flow_service_handler
[flow_name] => ( .flush(&catalog_name, &flow_name, query_ctx.clone())
query_ctx.current_catalog().to_string(), .await?;
flow_name.value.clone(), let affected_rows = res.affected_rows;
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()), Ok(Value::from(affected_rows))
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
} }
#[cfg(test)] #[cfg(test)]
@@ -154,10 +124,7 @@ mod test {
("catalog.flow_name", ("catalog", "flow_name")), ("catalog.flow_name", ("catalog", "flow_name")),
]; ];
for (input, expected) in testcases.iter() { for (input, expected) in testcases.iter() {
let args = vec![*input]; let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str())); assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
} }
} }

View File

@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
flow: &str, flow: &str,
ctx: QueryContextRef, ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>; ) -> Result<api::v1::flow::FlowResponse>;
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
} }
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>; pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result}; use common_error::ext::BoxedError;
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast; use datatypes::types::cast::cast;
use datatypes::value::ValueRef; use datatypes::value::ValueRef;
use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use sql::parser::ParserContext;
/// Create a function signature with oneof signatures of interleaving two arguments. /// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature { pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
}) })
.map(|v| v.as_u64()) .map(|v| v.as_u64())
} }
pub fn parse_catalog_flow(
flow_name: &str,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)] #![feature(let_chains)]
#![feature(try_blocks)] #![feature(try_blocks)]
mod adjust_flow;
mod admin; mod admin;
mod flush_flow; mod flush_flow;
mod macros; mod macros;

View File

@@ -148,6 +148,17 @@ impl FunctionState {
) -> Result<api::v1::flow::FlowResponse> { ) -> Result<api::v1::flow::FlowResponse> {
todo!() todo!()
} }
async fn adjust(
&self,
_catalog: &str,
_flow: &str,
_min_run_interval_secs: u64,
_max_filter_num_per_query: usize,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
} }
Self { Self {

View File

@@ -15,7 +15,7 @@
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse}; use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest}; use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows; pub use common_base::AffectedRows;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>; async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>; async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
} }
pub type FlownodeRef = Arc<dyn Flownode>; pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -15,7 +15,7 @@
use std::sync::Arc; use std::sync::Arc;
use api::region::RegionResponse; use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse}; use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest}; use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows; pub use common_base::AffectedRows;
use common_query::request::QueryRequest; use common_query::request::QueryRequest;
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> { ) -> Result<FlowResponse> {
unimplemented!() unimplemented!()
} }
async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
} }
/// A mock struct implements [NodeManager] only implement the `datanode` method. /// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> { async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await self.handler.handle_inserts(&self.peer, requests).await
} }
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@@ -61,6 +61,7 @@ prost.workspace = true
query.workspace = true query.workspace = true
rand.workspace = true rand.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true
servers.workspace = true servers.workspace = true
session.workspace = true session.workspace = true
smallvec.workspace = true smallvec.workspace = true

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
); );
METRIC_FLOW_ROWS METRIC_FLOW_ROWS
.with_label_values(&["out"]) .with_label_values(&["out-streaming"])
.inc_by(total_rows as u64); .inc_by(total_rows as u64);
let now = self.tick_manager.tick(); let now = self.tick_manager.tick();

View File

@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::{ use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow, flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
}; };
use api::v1::region::InsertRequests; use api::v1::region::InsertRequests;
use catalog::CatalogManager; use catalog::CatalogManager;
@@ -31,7 +31,9 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn}; use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value; use datatypes::value::Value;
use futures::TryStreamExt; use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder; use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt}; use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
@@ -46,7 +48,7 @@ use crate::error::{
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
}; };
use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow}; use crate::repr::{self, DiffRow};
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -689,6 +691,9 @@ impl FlowEngine for FlowDualEngine {
let mut to_stream_engine = Vec::with_capacity(request.requests.len()); let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests; let mut to_batch_engine = request.requests;
let mut batching_row_cnt = 0;
let mut streaming_row_cnt = 0;
{ {
// not locking this, or recover flows will be starved when also handling flow inserts // not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await; let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +703,11 @@ impl FlowEngine for FlowDualEngine {
let is_in_stream = src_table2flow.in_stream(table_id); let is_in_stream = src_table2flow.in_stream(table_id);
let is_in_batch = src_table2flow.in_batch(table_id); let is_in_batch = src_table2flow.in_batch(table_id);
if is_in_stream { if is_in_stream {
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
to_stream_engine.push(req.clone()); to_stream_engine.push(req.clone());
} }
if is_in_batch { if is_in_batch {
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
return true; return true;
} }
if !is_in_batch && !is_in_stream { if !is_in_batch && !is_in_stream {
@@ -713,6 +720,14 @@ impl FlowEngine for FlowDualEngine {
// can't use drop due to https://github.com/rust-lang/rust/pull/128846 // can't use drop due to https://github.com/rust-lang/rust/pull/128846
} }
METRIC_FLOW_ROWS
.with_label_values(&["in-streaming"])
.inc_by(streaming_row_cnt as u64);
METRIC_FLOW_ROWS
.with_label_values(&["in-batching"])
.inc_by(batching_row_cnt as u64);
let streaming_engine = self.streaming_engine.clone(); let streaming_engine = self.streaming_engine.clone();
let stream_handler: JoinHandle<Result<(), Error>> = let stream_handler: JoinHandle<Result<(), Error>> =
common_runtime::spawn_global(async move { common_runtime::spawn_global(async move {
@@ -809,6 +824,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default() ..Default::default()
}) })
} }
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
#[derive(Debug, Serialize, Deserialize)]
struct Options {
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
}
let options: Options = serde_json::from_str(&options).with_context(|_| {
common_meta::error::DeserializeFromJsonSnafu { input: options }
})?;
self.batching_engine
.adjust_flow(
flow_id.unwrap().id as u64,
options.min_run_interval_secs,
options.max_filter_num_per_query,
)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(Default::default())
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(), other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
} }
} }
@@ -819,6 +853,11 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default()) .map(|_| Default::default())
.map_err(to_meta_err(snafu::location!())) .map_err(to_meta_err(snafu::location!()))
} }
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
// todo: implement
unimplemented!()
}
} }
/// return a function to convert `crate::error::Error` to `common_meta::error::Error` /// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -926,6 +965,11 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
.map(|_| Default::default()) .map(|_| Default::default())
.map_err(to_meta_err(snafu::location!())) .map_err(to_meta_err(snafu::location!()))
} }
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
// todo: implement
unimplemented!()
}
} }
impl FlowEngine for StreamingEngine { impl FlowEngine for StreamingEngine {

View File

@@ -17,6 +17,7 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType; use common_meta::ddl::create_flow::FlowType;
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive; use common_time::TimeToLive;
use query::QueryEngineRef; use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId; use store_api::storage::{RegionId, TableId};
use table::metadata::TableId;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient; use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,6 +42,7 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu, UnexpectedSnafu, UnsupportedSnafu,
}; };
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE;
use crate::{CreateFlowArgs, Error, FlowId, TableName}; use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -77,6 +78,122 @@ impl BatchingEngine {
} }
} }
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.dirty_time_ranges);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, rows)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (rows, time_index_unit)))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for window in window_ranges {
let align_start = expr
.eval(common_time::Timestamp::new(window.start_value, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
let align_end = expr
.eval(common_time::Timestamp::new(window.end_value, *unit))?
.1
.context(UnexpectedSnafu {
reason: "Failed to eval end value",
})?;
all_dirty_windows.push((align_start, align_end));
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for (s, e) in all_dirty_windows {
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE
.with_label_values(&[&flow_id_label])
.observe(e.sub(&s).unwrap_or_default().num_seconds() as f64);
state.dirty_time_windows.add_window(s, Some(e));
}
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
Ok(Default::default())
}
pub async fn handle_inserts_inner( pub async fn handle_inserts_inner(
&self, &self,
request: api::v1::region::InsertRequests, request: api::v1::region::InsertRequests,
@@ -388,6 +505,20 @@ impl BatchingEngine {
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool { pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
self.tasks.read().await.contains_key(&flow_id) self.tasks.read().await.contains_key(&flow_id)
} }
pub async fn adjust_flow(
&self,
flow_id: FlowId,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
) -> Result<(), Error> {
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
task.adjust(min_run_interval_secs, max_filter_num_per_query);
Ok(())
}
} }
impl FlowEngine for BatchingEngine { impl FlowEngine for BatchingEngine {

View File

@@ -14,8 +14,9 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak}; use std::collections::HashMap;
use std::time::SystemTime; use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use api::v1::greptime_request::Request; use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr; use api::v1::CreateTableExpr;
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer; use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest; use common_meta::rpc::store::RangeRequest;
use common_query::Output; use common_query::Output;
use common_telemetry::warn; use common_telemetry::{debug, warn};
use itertools::Itertools;
use meta_client::client::MetaClient; use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef}; use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::{ use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT, DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES, GRPC_MAX_RETRIES,
}; };
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu}; use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader}; use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
use crate::{Error, FlowAuthHeader, FlowId};
/// Just like [`GrpcQueryHandler`] but use BoxedError /// Just like [`GrpcQueryHandler`] but use BoxedError
/// ///
@@ -74,6 +76,105 @@ impl<
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>; type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// Statistics about running query on this frontend from flownode
#[derive(Debug, Default, Clone)]
struct FrontendStat {
/// The query for flow id has been running since this timestamp
since: HashMap<FlowId, Instant>,
/// The average query time for each flow id
/// This is used to calculate the average query time for each flow id
past_query_avg: HashMap<FlowId, (usize, Duration)>,
}
#[derive(Debug, Default, Clone)]
pub struct FrontendStats {
/// The statistics for each flow id
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
}
impl FrontendStats {
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
let stat = stats.entry(frontend_addr.to_string()).or_default();
stat.since.insert(flow_id, Instant::now());
FrontendStatsGuard {
stats: self.stats.clone(),
frontend_addr: frontend_addr.to_string(),
cur: flow_id,
}
}
/// return frontend addrs sorted by load, from lightest to heaviest
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
pub fn sort_by_load(&self) -> Vec<String> {
let stats = self.stats.lock().expect("Failed to lock frontend stats");
let fe_load_factor = stats
.iter()
.map(|(node_addr, stat)| {
// total expected avg running time for all currently running queries
let total_expect_avg_run_time = stat
.since
.keys()
.map(|f| {
let (count, total_duration) =
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
if *count == 0 {
0.0
} else {
total_duration.as_secs_f64() / *count as f64
}
})
.sum::<f64>();
let total_cur_running_time = stat
.since
.values()
.map(|since| since.elapsed().as_secs_f64())
.sum::<f64>();
(
node_addr.to_string(),
total_expect_avg_run_time + total_cur_running_time,
)
})
.sorted_by(|(_, load_a), (_, load_b)| {
load_a
.partial_cmp(load_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.collect::<Vec<_>>();
debug!("Frontend load factor: {:?}", fe_load_factor);
for (node_addr, load) in &fe_load_factor {
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
.with_label_values(&[&node_addr.to_string()])
.observe(*load);
}
fe_load_factor
.into_iter()
.map(|(addr, _)| addr)
.collect::<Vec<_>>()
}
}
pub struct FrontendStatsGuard {
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
frontend_addr: String,
cur: FlowId,
}
impl Drop for FrontendStatsGuard {
fn drop(&mut self) {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
if let Some(since) = stat.since.remove(&self.cur) {
let elapsed = since.elapsed();
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
*count += 1;
*total_duration += elapsed;
}
}
}
}
/// A simple frontend client able to execute sql using grpc protocol /// A simple frontend client able to execute sql using grpc protocol
/// ///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode /// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
@@ -83,6 +184,7 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>, meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager, chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>, auth: Option<FlowAuthHeader>,
fe_stats: FrontendStats,
}, },
Standalone { Standalone {
/// for the sake of simplicity still use grpc even in standalone mode /// for the sake of simplicity still use grpc even in standalone mode
@@ -114,6 +216,7 @@ impl FrontendClient {
ChannelManager::with_config(cfg) ChannelManager::with_config(cfg)
}, },
auth, auth,
fe_stats: Default::default(),
} }
} }
@@ -192,6 +295,7 @@ impl FrontendClient {
meta_client: _, meta_client: _,
chnl_mgr, chnl_mgr,
auth, auth,
fe_stats,
} = self } = self
else { else {
return UnexpectedSnafu { return UnexpectedSnafu {
@@ -208,8 +312,21 @@ impl FrontendClient {
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)
.unwrap() .unwrap()
.as_millis() as i64; .as_millis() as i64;
// shuffle the frontends to avoid always pick the same one let node_addrs_by_load = fe_stats.sort_by_load();
frontends.shuffle(&mut rng()); // index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
let addr2load = node_addrs_by_load
.iter()
.enumerate()
.map(|(i, id)| (id.clone(), i + 1))
.collect::<HashMap<_, _>>();
// sort frontends by load, from lightest to heaviest
frontends.sort_by(|(_, a), (_, b)| {
// if not even in stats, treat as 0 load since never been queried
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
load_a.cmp(load_b)
});
debug!("Frontend nodes sorted by load: {:?}", frontends);
// found node with maximum last_activity_ts // found node with maximum last_activity_ts
for (_, node_info) in frontends for (_, node_info) in frontends
@@ -257,6 +374,7 @@ impl FrontendClient {
create: CreateTableExpr, create: CreateTableExpr,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
task: Option<&BatchingTask>,
) -> Result<u32, Error> { ) -> Result<u32, Error> {
self.handle( self.handle(
Request::Ddl(api::v1::DdlRequest { Request::Ddl(api::v1::DdlRequest {
@@ -265,6 +383,7 @@ impl FrontendClient {
catalog, catalog,
schema, schema,
&mut None, &mut None,
task,
) )
.await .await
} }
@@ -276,15 +395,19 @@ impl FrontendClient {
catalog: &str, catalog: &str,
schema: &str, schema: &str,
peer_desc: &mut Option<PeerDesc>, peer_desc: &mut Option<PeerDesc>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> { ) -> Result<u32, Error> {
match self { match self {
FrontendClient::Distributed { .. } => { FrontendClient::Distributed { fe_stats, .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?; let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist { *peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(), peer: db.peer.clone(),
}); });
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
db.database db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES) .handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
.await .await

View File

@@ -31,7 +31,9 @@ use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION; use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -52,6 +54,11 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>, pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle /// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>, pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
/// max filter number per query
pub(crate) max_filter_num: Option<usize>,
} }
impl TaskState { impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self { pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -63,6 +70,8 @@ impl TaskState {
exec_state: ExecState::Idle, exec_state: ExecState::Idle,
shutdown_rx, shutdown_rx,
task_handle: None, task_handle: None,
min_run_interval: None,
max_filter_num: None,
} }
} }
@@ -86,24 +95,21 @@ impl TaskState {
/// TODO: Make this behavior configurable. /// TODO: Make this behavior configurable.
pub fn get_next_start_query_time( pub fn get_next_start_query_time(
&self, &self,
flow_id: FlowId, _flow_id: FlowId,
time_window_size: &Option<Duration>, _time_window_size: &Option<Duration>,
max_timeout: Option<Duration>, max_timeout: Option<Duration>,
) -> Instant { ) -> Instant {
let last_duration = max_timeout let next_duration = max_timeout
.unwrap_or(self.last_query_duration) .unwrap_or(self.last_query_duration)
.min(self.last_query_duration) .min(self.last_query_duration)
.max(MIN_REFRESH_DURATION); .max(
self.min_run_interval
let next_duration = time_window_size .map(Duration::from_secs)
.map(|t| { .unwrap_or(MIN_REFRESH_DURATION),
let half = t / 2; );
half.max(last_duration)
})
.unwrap_or(last_duration);
// if have dirty time window, execute immediately to clean dirty time window // if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() { /*if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration self.last_update_time + next_duration
} else { } else {
debug!( debug!(
@@ -113,7 +119,10 @@ impl TaskState {
self.dirty_time_windows.windows self.dirty_time_windows.windows
); );
Instant::now() Instant::now()
} }*/
// wait for next duration anyway
self.last_update_time + next_duration
} }
} }
@@ -206,47 +215,63 @@ impl DirtyTimeWindows {
// get the first `window_cnt` time windows // get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32; let max_time_range = window_size * window_cnt as i32;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
// if we have enough time windows, stop let mut to_be_query = BTreeMap::new();
if idx >= window_cnt { let mut new_windows = self.windows.clone();
nth_key = Some(*start); let mut cur_time_range = chrono::Duration::zero();
break; for (idx, (start, end)) in self.windows.iter().enumerate() {
} let first_end = start
.add_duration(window_size.to_std().unwrap())
.context(TimeSnafu)?;
let end = end.unwrap_or(first_end);
if let Some(end) = end { // if time range is too long, stop
if let Some(x) = end.sub(start) { if cur_time_range >= max_time_range {
cur_time_range += x; break;
}
}
} }
nth_key // if we have enough time windows, stop
}; if idx >= window_cnt {
let first_nth = { break;
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
} }
};
if let Some(x) = end.sub(start) {
if cur_time_range + x <= max_time_range {
to_be_query.insert(*start, Some(end));
new_windows.remove(start);
cur_time_range += x;
} else {
// too large a window, split it
// split at window_size * times
let surplus = max_time_range - cur_time_range;
let times = surplus.num_seconds() / window_size.num_seconds();
let split_offset = window_size * times as i32;
let split_at = start
.add_duration(split_offset.to_std().unwrap())
.context(TimeSnafu)?;
to_be_query.insert(*start, Some(split_at));
// remove the original window
new_windows.remove(start);
new_windows.insert(split_at, Some(end));
cur_time_range += split_offset;
break;
}
}
}
self.windows = new_windows;
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64); .observe(to_be_query.len() as f64);
let full_time_range = first_nth METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.observe(self.windows.len() as f64);
let full_time_range = to_be_query
.iter() .iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| { .fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end { if let Some(end) = end {
@@ -256,12 +281,27 @@ impl DirtyTimeWindows {
} }
}) })
.num_seconds() as f64; .num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[flow_id.to_string().as_str()])
.observe(full_time_range); .observe(full_time_range);
let stalled_time_range =
self.windows
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
} else {
acc
}
});
METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE
.with_label_values(&[flow_id.to_string().as_str()])
.observe(stalled_time_range.num_seconds() as f64);
let mut expr_lst = vec![]; let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() { for (start, end) in to_be_query.into_iter() {
// align using time window exprs // align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx { let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else { let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -495,6 +535,64 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))", "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
) )
), ),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired // expired
( (
vec![ vec![
@@ -511,6 +609,8 @@ mod test {
None None
), ),
]; ];
// let len = testcases.len();
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
testcases testcases
{ {

View File

@@ -61,7 +61,9 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
}; };
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
METRIC_FLOW_ROWS,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -144,6 +146,12 @@ impl BatchingTask {
}) })
} }
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
let mut state = self.state.write().unwrap();
state.min_run_interval = Some(min_run_interval_secs);
state.max_filter_num = Some(max_filter_num_per_query);
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set) /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
/// ///
/// useful for flush_flow to flush dirty time windows range /// useful for flush_flow to flush dirty time windows range
@@ -280,7 +288,7 @@ impl BatchingTask {
let catalog = &self.config.sink_table_name[0]; let catalog = &self.config.sink_table_name[0];
let schema = &self.config.sink_table_name[1]; let schema = &self.config.sink_table_name[1];
frontend_client frontend_client
.create(expr.clone(), catalog, schema) .create(expr.clone(), catalog, schema, Some(self))
.await?; .await?;
Ok(()) Ok(())
} }
@@ -361,7 +369,7 @@ impl BatchingTask {
}; };
frontend_client frontend_client
.handle(req, catalog, schema, &mut peer_desc) .handle(req, catalog, schema, &mut peer_desc, Some(self))
.await .await
}; };
@@ -371,6 +379,9 @@ impl BatchingTask {
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed elapsed
); );
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res { } else if let Err(err) = &res {
warn!( warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +421,7 @@ impl BatchingTask {
engine: QueryEngineRef, engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>, frontend_client: Arc<FrontendClient>,
) { ) {
let flow_id_str = self.config.flow_id.to_string();
loop { loop {
// first check if shutdown signal is received // first check if shutdown signal is received
// if so, break the loop // if so, break the loop
@@ -427,6 +439,9 @@ impl BatchingTask {
Err(TryRecvError::Empty) => (), Err(TryRecvError::Empty) => (),
} }
} }
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
.inc();
let new_query = match self.gen_insert_plan(&engine).await { let new_query = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query, Ok(new_query) => new_query,
@@ -473,6 +488,9 @@ impl BatchingTask {
} }
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => { Err(err) => {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query { match new_query {
Some(query) => { Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
@@ -580,19 +598,20 @@ impl BatchingTask {
), ),
})?; })?;
let expr = self let expr = {
.state let mut state = self.state.write().unwrap();
.write() let max_window_cnt = state
.unwrap() .max_filter_num
.dirty_time_windows .unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
.gen_filter_exprs( state.dirty_time_windows.gen_filter_exprs(
&col_name, &col_name,
Some(l), Some(l),
window_size, window_size,
DirtyTimeWindows::MAX_FILTER_NUM, max_window_cnt,
self.config.flow_id, self.config.flow_id,
Some(self), Some(self),
)?; )?
};
debug!( debug!(
"Flow id={:?}, Generated filter expr: {:?}", "Flow id={:?}, Generated filter expr: {:?}",

View File

@@ -42,6 +42,14 @@ lazy_static! {
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.] vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt", "greptime_flow_batching_engine_query_window_cnt",
@@ -50,19 +58,57 @@ lazy_static! {
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs", "greptime_flow_batching_engine_query_window_size_secs",
"flow batching engine query time range(seconds)", "flow batching engine query window size(seconds)",
&["flow_id"], &["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_window_size_secs",
"flow batching engine stalled window size(seconds)",
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_bulk_mark_time_window_range_secs",
"flow batching engine query time window range marked by bulk memtable in seconds",
&["flow_id"],
vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_start_query_count",
"flow batching engine started query count",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_error_count",
"flow batching engine error count per flow id",
&["flow_id"]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",
"flow batching engine guessed frontend load",
&["fe_addr"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
"greptime_flow_processed_rows", "greptime_flow_processed_rows",
"Count of rows flowing through the system", "Count of rows flowing through the system.",
&["direction"] &["direction"]
) )
.unwrap(); .unwrap();

View File

@@ -17,6 +17,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::DirtyWindowRequests;
use api::v1::{RowDeleteRequests, RowInsertRequests}; use api::v1::{RowDeleteRequests, RowInsertRequests};
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef; use catalog::CatalogManagerRef;
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
.map(Response::new) .map(Response::new)
.map_err(to_status_with_last_err) .map_err(to_status_with_last_err)
} }
async fn handle_mark_dirty_time_window(
&self,
reqs: Request<DirtyWindowRequests>,
) -> Result<Response<FlowResponse>, Status> {
self.dual_engine
.batching_engine()
.handle_mark_dirty_time_window(reqs.into_inner())
.await
.map(Response::new)
.map_err(to_status_with_last_err)
}
} }
#[derive(Clone)] #[derive(Clone)]

View File

@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt}; use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef;
use crate::error::{ use crate::error::{
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu, CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table: &TableName, table_name: &TableName,
table_id: &mut Option<TableId>, table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> Result<AffectedRows> { ) -> Result<AffectedRows> {
let table_id = if let Some(table_id) = table_id { let table = if let Some(table) = table_ref {
*table_id table.clone()
} else { } else {
let table = self let table = self
.catalog_manager() .catalog_manager()
.table( .table(
&table.catalog_name, &table_name.catalog_name,
&table.schema_name, &table_name.schema_name,
&table.table_name, &table_name.table_name,
None, None,
) )
.await .await
.context(CatalogSnafu)? .context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu { .with_context(|| TableNotFoundSnafu {
table_name: table.to_string(), table_name: table_name.to_string(),
})?; })?;
let id = table.table_info().table_id(); *table_ref = Some(table.clone());
*table_id = Some(id); table
id
}; };
self.inserter self.inserter
.handle_bulk_insert(table_id, decoder, data) .handle_bulk_insert(table, decoder, data)
.await .await
.context(TableOperationSnafu) .context(TableOperationSnafu)
} }

View File

@@ -12,18 +12,28 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashSet;
use ahash::{HashMap, HashMapExt}; use ahash::{HashMap, HashMapExt};
use api::v1::flow::{DirtyWindowRequest, WindowRange};
use api::v1::region::{ use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
}; };
use api::v1::ArrowIpc; use api::v1::ArrowIpc;
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
use arrow::record_batch::RecordBatch;
use common_base::AffectedRows; use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData; use common_grpc::FlightData;
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use snafu::ResultExt; use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId; use store_api::storage::{RegionId, TableId};
use table::metadata::TableId; use table::TableRef;
use crate::insert::Inserter; use crate::insert::Inserter;
use crate::{error, metrics}; use crate::{error, metrics};
@@ -32,10 +42,11 @@ impl Inserter {
/// Handle bulk insert request. /// Handle bulk insert request.
pub async fn handle_bulk_insert( pub async fn handle_bulk_insert(
&self, &self,
table_id: TableId, table: TableRef,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> error::Result<AffectedRows> { ) -> error::Result<AffectedRows> {
let table_id = table.table_info().table_id();
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"]) .with_label_values(&["decode_request"])
.start_timer(); .start_timer();
@@ -48,6 +59,20 @@ impl Inserter {
return Ok(0); return Ok(0);
}; };
decode_timer.observe_duration(); decode_timer.observe_duration();
if let Some((min, max)) = compute_timestamp_range(
&record_batch,
&table
.table_info()
.meta
.schema
.timestamp_column()
.as_ref()
.unwrap()
.name,
)? {
// notify flownode to update dirty time windows.
self.update_flow_dirty_window(table_id, min, max);
}
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"]) .with_label_values(&["raw"])
@@ -216,4 +241,88 @@ impl Inserter {
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64); crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
Ok(rows_inserted) Ok(rows_inserted)
} }
fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
let node_manager = self.node_manager.clone();
common_runtime::spawn_global(async move {
let result = table_flownode_set_cache
.get(table_id)
.await
.context(error::RequestInsertsSnafu);
let flownodes = match result {
Ok(flownodes) => flownodes.unwrap_or_default(),
Err(e) => {
error!(e; "Failed to get flownodes for table id: {}", table_id);
return;
}
};
let peers: HashSet<_> = flownodes.values().cloned().collect();
for peer in peers {
let node_manager = node_manager.clone();
common_runtime::spawn_global(async move {
if let Err(e) = node_manager
.flownode(&peer)
.await
.handle_mark_window_dirty(DirtyWindowRequest {
table_id,
dirty_time_ranges: vec![WindowRange {
start_value: min,
end_value: max,
}],
})
.await
.context(error::RequestInsertsSnafu)
{
error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
}
});
}
});
}
}
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
fn compute_timestamp_range(
rb: &RecordBatch,
timestamp_index_name: &str,
) -> error::Result<Option<(i64, i64)>> {
let ts_col = rb
.column_by_name(timestamp_index_name)
.context(error::ColumnNotFoundSnafu {
msg: timestamp_index_name,
})?;
if rb.num_rows() == 0 {
return Ok(None);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => ts_col
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Millisecond => ts_col
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Microsecond => ts_col
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
TimeUnit::Nanosecond => ts_col
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<Int64Type>(),
},
t => {
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
} }

View File

@@ -837,6 +837,13 @@ pub enum Error {
#[snafu(implicit)] #[snafu(implicit)]
location: Location, location: Location,
}, },
#[snafu(display("Invalid time index type: {}", ty))]
InvalidTimeIndexType {
ty: arrow::datatypes::DataType,
#[snafu(implicit)]
location: Location,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -964,6 +971,7 @@ impl ErrorExt for Error {
Error::ColumnOptions { source, .. } => source.status_code(), Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(), Error::DecodeFlightData { source, .. } => source.status_code(),
Error::ComputeArrow { .. } => StatusCode::Internal, Error::ComputeArrow { .. } => StatusCode::Internal,
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
} }
} }

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use api::v1::flow::FlowRequestHeader; use api::v1::flow::{AdjustFlow, FlowRequestHeader};
use async_trait::async_trait; use async_trait::async_trait;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_function::handlers::FlowServiceHandler; use common_function::handlers::FlowServiceHandler;
@@ -22,6 +22,7 @@ use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use serde_json::json;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
) -> Result<api::v1::flow::FlowResponse> { ) -> Result<api::v1::flow::FlowResponse> {
self.flush_inner(catalog, flow, ctx).await self.flush_inner(catalog, flow, ctx).await
} }
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
self.adjust_inner(
catalog,
flow,
min_run_interval_secs,
max_filter_num_per_query,
ctx,
)
.await
}
} }
impl FlowServiceOperator { impl FlowServiceOperator {
async fn adjust_inner(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
let id = self
.flow_metadata_manager
.flow_name_manager()
.get(catalog, flow)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.context(common_meta::error::FlowNotFoundSnafu {
flow_name: format!("{}.{}", catalog, flow),
})
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.flow_id();
let all_flownode_peers = self
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;
// order of flownodes doesn't matter here
let all_flow_nodes = FuturesUnordered::from_iter(
all_flownode_peers
.iter()
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
)
.collect::<Vec<_>>()
.await;
// TODO(discord9): use proper type for flow options
let options = json!({
"min_run_interval_secs": min_run_interval_secs,
"max_filter_num_per_query": max_filter_num_per_query,
});
for node in all_flow_nodes {
let _res = {
use api::v1::flow::{flow_request, FlowRequest};
let flush_req = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
),
}),
body: Some(flow_request::Body::Adjust(AdjustFlow {
flow_id: Some(api::v1::FlowId { id }),
options: options.to_string(),
})),
};
node.handle(flush_req)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
};
}
Ok(Default::default())
}
/// Flush the flownodes according to the flow id. /// Flush the flownodes according to the flow id.
async fn flush_inner( async fn flush_inner(
&self, &self,

View File

@@ -78,7 +78,7 @@ pub struct Inserter {
catalog_manager: CatalogManagerRef, catalog_manager: CatalogManagerRef,
pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef, pub(crate) node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef, pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
} }
pub type InserterRef = Arc<Inserter>; pub type InserterRef = Arc<Inserter>;

View File

@@ -40,7 +40,7 @@ use futures_util::StreamExt;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT; use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use table::metadata::TableId; use table::TableRef;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::error::Error::UnsupportedAuthScheme; use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
.clone() .clone()
.unwrap_or_else(common_runtime::global_runtime); .unwrap_or_else(common_runtime::global_runtime);
runtime.spawn(async move { runtime.spawn(async move {
// Cached table id // Cached table ref
let mut table_id: Option<TableId> = None; let mut table_ref: Option<TableRef> = None;
let mut decoder = FlightDecoder::default(); let mut decoder = FlightDecoder::default();
while let Some(request) = stream.next().await { while let Some(request) = stream.next().await {
@@ -169,7 +169,7 @@ impl GreptimeRequestHandler {
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer(); let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler let result = handler
.put_record_batch(&table_name, &mut table_id, &mut decoder, data) .put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
.await .await
.inspect_err(|e| error!(e; "Failed to handle flight record batches")); .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
timer.observe_duration(); timer.observe_duration();

View File

@@ -23,8 +23,8 @@ use common_grpc::flight::FlightDecoder;
use common_query::Output; use common_query::Output;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef;
use crate::error::{self, Result}; use crate::error::{self, Result};
@@ -45,8 +45,8 @@ pub trait GrpcQueryHandler {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table: &TableName, table_name: &TableName,
table_id: &mut Option<TableId>, table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
flight_data: FlightData, flight_data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error>; ) -> std::result::Result<AffectedRows, Self::Error>;
@@ -77,13 +77,13 @@ where
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table: &TableName, table_name: &TableName,
table_id: &mut Option<TableId>, table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder, decoder: &mut FlightDecoder,
data: FlightData, data: FlightData,
) -> Result<AffectedRows> { ) -> Result<AffectedRows> {
self.0 self.0
.put_record_batch(table, table_id, decoder, data) .put_record_batch(table_name, table_ref, decoder, data)
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(error::ExecuteGrpcRequestSnafu) .context(error::ExecuteGrpcRequestSnafu)

View File

@@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ensure; use snafu::ensure;
use sql::statements::statement::Statement; use sql::statements::statement::Statement;
use table::metadata::TableId;
use table::table_name::TableName; use table::table_name::TableName;
use table::TableRef; use table::TableRef;
@@ -160,15 +159,11 @@ impl GrpcQueryHandler for DummyInstance {
async fn put_record_batch( async fn put_record_batch(
&self, &self,
table: &TableName, _table_name: &TableName,
table_id: &mut Option<TableId>, _table_ref: &mut Option<TableRef>,
decoder: &mut FlightDecoder, _decoder: &mut FlightDecoder,
data: FlightData, _data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error> { ) -> std::result::Result<AffectedRows, Self::Error> {
let _ = table;
let _ = data;
let _ = table_id;
let _ = decoder;
unimplemented!() unimplemented!()
} }
} }