mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
6 Commits
feature/df
...
flow/min_o
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8fde8112a | ||
|
|
4534e4c31d | ||
|
|
3d8278dc4c | ||
|
|
e4328380b2 | ||
|
|
e962076207 | ||
|
|
9ef8ba6460 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5133,7 +5133,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "greptime-proto"
|
name = "greptime-proto"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
|
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"prost 0.13.5",
|
"prost 0.13.5",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ etcd-client = "0.14"
|
|||||||
fst = "0.4.7"
|
fst = "0.4.7"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
|
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
http = "1"
|
http = "1"
|
||||||
humantime = "2.1"
|
humantime = "2.1"
|
||||||
|
|||||||
@@ -232,6 +232,7 @@
|
|||||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||||
|
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||||
@@ -404,6 +405,7 @@
|
|||||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||||
|
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||||
|
|||||||
@@ -44,6 +44,12 @@ runtime_size = 8
|
|||||||
max_recv_message_size = "512MB"
|
max_recv_message_size = "512MB"
|
||||||
## The maximum send message size for gRPC server.
|
## The maximum send message size for gRPC server.
|
||||||
max_send_message_size = "512MB"
|
max_send_message_size = "512MB"
|
||||||
|
## Compression mode for datanode side Arrow IPC service. Available options:
|
||||||
|
## - `none`: disable all compression
|
||||||
|
## - `transport`: only enable gRPC transport compression (zstd)
|
||||||
|
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||||
|
## - `all`: enable all compression.
|
||||||
|
flight_compression = "arrow_ipc"
|
||||||
|
|
||||||
## gRPC server TLS options, see `mysql.tls` section.
|
## gRPC server TLS options, see `mysql.tls` section.
|
||||||
[grpc.tls]
|
[grpc.tls]
|
||||||
|
|||||||
@@ -54,6 +54,12 @@ bind_addr = "127.0.0.1:4001"
|
|||||||
server_addr = "127.0.0.1:4001"
|
server_addr = "127.0.0.1:4001"
|
||||||
## The number of server worker threads.
|
## The number of server worker threads.
|
||||||
runtime_size = 8
|
runtime_size = 8
|
||||||
|
## Compression mode for frontend side Arrow IPC service. Available options:
|
||||||
|
## - `none`: disable all compression
|
||||||
|
## - `transport`: only enable gRPC transport compression (zstd)
|
||||||
|
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||||
|
## - `all`: enable all compression.
|
||||||
|
flight_compression = "arrow_ipc"
|
||||||
|
|
||||||
## gRPC server TLS options, see `mysql.tls` section.
|
## gRPC server TLS options, see `mysql.tls` section.
|
||||||
[grpc.tls]
|
[grpc.tls]
|
||||||
|
|||||||
90
src/common/function/src/adjust_flow.rs
Normal file
90
src/common/function/src/adjust_flow.rs
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use common_macro::admin_fn;
|
||||||
|
use common_query::error::{
|
||||||
|
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||||
|
};
|
||||||
|
use common_query::prelude::Signature;
|
||||||
|
use datafusion::logical_expr::Volatility;
|
||||||
|
use datatypes::value::{Value, ValueRef};
|
||||||
|
use session::context::QueryContextRef;
|
||||||
|
use snafu::ensure;
|
||||||
|
use store_api::storage::ConcreteDataType;
|
||||||
|
|
||||||
|
use crate::handlers::FlowServiceHandlerRef;
|
||||||
|
use crate::helper::parse_catalog_flow;
|
||||||
|
|
||||||
|
fn adjust_signature() -> Signature {
|
||||||
|
Signature::exact(
|
||||||
|
vec![
|
||||||
|
ConcreteDataType::string_datatype(), // flow name
|
||||||
|
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
|
||||||
|
ConcreteDataType::uint64_datatype(), // max filter number per query
|
||||||
|
],
|
||||||
|
Volatility::Immutable,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[admin_fn(
|
||||||
|
name = AdjustFlowFunction,
|
||||||
|
display_name = adjust_flow,
|
||||||
|
sig_fn = adjust_signature,
|
||||||
|
ret = uint64
|
||||||
|
)]
|
||||||
|
pub(crate) async fn adjust_flow(
|
||||||
|
flow_service_handler: &FlowServiceHandlerRef,
|
||||||
|
query_ctx: &QueryContextRef,
|
||||||
|
params: &[ValueRef<'_>],
|
||||||
|
) -> Result<Value> {
|
||||||
|
ensure!(
|
||||||
|
params.len() == 3,
|
||||||
|
InvalidFuncArgsSnafu {
|
||||||
|
err_msg: format!(
|
||||||
|
"The length of the args is not correct, expect 3, have: {}",
|
||||||
|
params.len()
|
||||||
|
),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
|
||||||
|
(
|
||||||
|
ValueRef::String(flow_name),
|
||||||
|
ValueRef::UInt64(min_run_interval),
|
||||||
|
ValueRef::UInt64(max_filter_num),
|
||||||
|
) => (flow_name, min_run_interval, max_filter_num),
|
||||||
|
_ => {
|
||||||
|
return UnsupportedInputDataTypeSnafu {
|
||||||
|
function: "adjust_flow",
|
||||||
|
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
|
||||||
|
}
|
||||||
|
.fail();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||||
|
|
||||||
|
let res = flow_service_handler
|
||||||
|
.adjust(
|
||||||
|
&catalog_name,
|
||||||
|
&flow_name,
|
||||||
|
min_run_interval,
|
||||||
|
max_filter_num as usize,
|
||||||
|
query_ctx.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let affected_rows = res.affected_rows;
|
||||||
|
|
||||||
|
Ok(Value::from(affected_rows))
|
||||||
|
}
|
||||||
@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
|
|||||||
use migrate_region::MigrateRegionFunction;
|
use migrate_region::MigrateRegionFunction;
|
||||||
use remove_region_follower::RemoveRegionFollowerFunction;
|
use remove_region_follower::RemoveRegionFollowerFunction;
|
||||||
|
|
||||||
|
use crate::adjust_flow::AdjustFlowFunction;
|
||||||
use crate::flush_flow::FlushFlowFunction;
|
use crate::flush_flow::FlushFlowFunction;
|
||||||
use crate::function_registry::FunctionRegistry;
|
use crate::function_registry::FunctionRegistry;
|
||||||
|
|
||||||
@@ -43,5 +44,6 @@ impl AdminFunction {
|
|||||||
registry.register_async(Arc::new(FlushTableFunction));
|
registry.register_async(Arc::new(FlushTableFunction));
|
||||||
registry.register_async(Arc::new(CompactTableFunction));
|
registry.register_async(Arc::new(CompactTableFunction));
|
||||||
registry.register_async(Arc::new(FlushFlowFunction));
|
registry.register_async(Arc::new(FlushFlowFunction));
|
||||||
|
registry.register_async(Arc::new(AdjustFlowFunction));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,21 +12,19 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use common_error::ext::BoxedError;
|
|
||||||
use common_macro::admin_fn;
|
use common_macro::admin_fn;
|
||||||
use common_query::error::{
|
use common_query::error::{
|
||||||
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
|
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
|
||||||
UnsupportedInputDataTypeSnafu,
|
|
||||||
};
|
};
|
||||||
use common_query::prelude::Signature;
|
use common_query::prelude::Signature;
|
||||||
use datafusion::logical_expr::Volatility;
|
use datafusion::logical_expr::Volatility;
|
||||||
use datatypes::value::{Value, ValueRef};
|
use datatypes::value::{Value, ValueRef};
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::{ensure, ResultExt};
|
use snafu::ensure;
|
||||||
use sql::parser::ParserContext;
|
|
||||||
use store_api::storage::ConcreteDataType;
|
use store_api::storage::ConcreteDataType;
|
||||||
|
|
||||||
use crate::handlers::FlowServiceHandlerRef;
|
use crate::handlers::FlowServiceHandlerRef;
|
||||||
|
use crate::helper::parse_catalog_flow;
|
||||||
|
|
||||||
fn flush_signature() -> Signature {
|
fn flush_signature() -> Signature {
|
||||||
Signature::uniform(
|
Signature::uniform(
|
||||||
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
|
|||||||
query_ctx: &QueryContextRef,
|
query_ctx: &QueryContextRef,
|
||||||
params: &[ValueRef<'_>],
|
params: &[ValueRef<'_>],
|
||||||
) -> Result<Value> {
|
) -> Result<Value> {
|
||||||
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
|
|
||||||
|
|
||||||
let res = flow_service_handler
|
|
||||||
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
|
||||||
.await?;
|
|
||||||
let affected_rows = res.affected_rows;
|
|
||||||
|
|
||||||
Ok(Value::from(affected_rows))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_flush_flow(
|
|
||||||
params: &[ValueRef<'_>],
|
|
||||||
query_ctx: &QueryContextRef,
|
|
||||||
) -> Result<(String, String)> {
|
|
||||||
ensure!(
|
ensure!(
|
||||||
params.len() == 1,
|
params.len() == 1,
|
||||||
InvalidFuncArgsSnafu {
|
InvalidFuncArgsSnafu {
|
||||||
@@ -70,7 +54,6 @@ fn parse_flush_flow(
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let ValueRef::String(flow_name) = params[0] else {
|
let ValueRef::String(flow_name) = params[0] else {
|
||||||
return UnsupportedInputDataTypeSnafu {
|
return UnsupportedInputDataTypeSnafu {
|
||||||
function: "flush_flow",
|
function: "flush_flow",
|
||||||
@@ -78,27 +61,14 @@ fn parse_flush_flow(
|
|||||||
}
|
}
|
||||||
.fail();
|
.fail();
|
||||||
};
|
};
|
||||||
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
|
||||||
.map_err(BoxedError::new)
|
|
||||||
.context(ExecuteSnafu)?;
|
|
||||||
|
|
||||||
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
let res = flow_service_handler
|
||||||
[flow_name] => (
|
.flush(&catalog_name, &flow_name, query_ctx.clone())
|
||||||
query_ctx.current_catalog().to_string(),
|
.await?;
|
||||||
flow_name.value.clone(),
|
let affected_rows = res.affected_rows;
|
||||||
),
|
|
||||||
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
Ok(Value::from(affected_rows))
|
||||||
_ => {
|
|
||||||
return InvalidFuncArgsSnafu {
|
|
||||||
err_msg: format!(
|
|
||||||
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
|
||||||
obj_name
|
|
||||||
),
|
|
||||||
}
|
|
||||||
.fail()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok((catalog_name, flow_name))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -154,10 +124,7 @@ mod test {
|
|||||||
("catalog.flow_name", ("catalog", "flow_name")),
|
("catalog.flow_name", ("catalog", "flow_name")),
|
||||||
];
|
];
|
||||||
for (input, expected) in testcases.iter() {
|
for (input, expected) in testcases.iter() {
|
||||||
let args = vec![*input];
|
let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
|
||||||
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
|
|
||||||
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
|
|||||||
flow: &str,
|
flow: &str,
|
||||||
ctx: QueryContextRef,
|
ctx: QueryContextRef,
|
||||||
) -> Result<api::v1::flow::FlowResponse>;
|
) -> Result<api::v1::flow::FlowResponse>;
|
||||||
|
|
||||||
|
async fn adjust(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
flow: &str,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||||
|
|||||||
@@ -12,12 +12,15 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use common_query::error::{InvalidInputTypeSnafu, Result};
|
use common_error::ext::BoxedError;
|
||||||
|
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
|
||||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||||
use datatypes::prelude::ConcreteDataType;
|
use datatypes::prelude::ConcreteDataType;
|
||||||
use datatypes::types::cast::cast;
|
use datatypes::types::cast::cast;
|
||||||
use datatypes::value::ValueRef;
|
use datatypes::value::ValueRef;
|
||||||
|
use session::context::QueryContextRef;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
|
use sql::parser::ParserContext;
|
||||||
|
|
||||||
/// Create a function signature with oneof signatures of interleaving two arguments.
|
/// Create a function signature with oneof signatures of interleaving two arguments.
|
||||||
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
|
||||||
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
|
|||||||
})
|
})
|
||||||
.map(|v| v.as_u64())
|
.map(|v| v.as_u64())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parse_catalog_flow(
|
||||||
|
flow_name: &str,
|
||||||
|
query_ctx: &QueryContextRef,
|
||||||
|
) -> Result<(String, String)> {
|
||||||
|
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(ExecuteSnafu)?;
|
||||||
|
|
||||||
|
let (catalog_name, flow_name) = match &obj_name.0[..] {
|
||||||
|
[flow_name] => (
|
||||||
|
query_ctx.current_catalog().to_string(),
|
||||||
|
flow_name.value.clone(),
|
||||||
|
),
|
||||||
|
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
|
||||||
|
_ => {
|
||||||
|
return InvalidFuncArgsSnafu {
|
||||||
|
err_msg: format!(
|
||||||
|
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
|
||||||
|
obj_name
|
||||||
|
),
|
||||||
|
}
|
||||||
|
.fail()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok((catalog_name, flow_name))
|
||||||
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
#![feature(let_chains)]
|
#![feature(let_chains)]
|
||||||
#![feature(try_blocks)]
|
#![feature(try_blocks)]
|
||||||
|
|
||||||
|
mod adjust_flow;
|
||||||
mod admin;
|
mod admin;
|
||||||
mod flush_flow;
|
mod flush_flow;
|
||||||
mod macros;
|
mod macros;
|
||||||
|
|||||||
@@ -148,6 +148,17 @@ impl FunctionState {
|
|||||||
) -> Result<api::v1::flow::FlowResponse> {
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn adjust(
|
||||||
|
&self,
|
||||||
|
_catalog: &str,
|
||||||
|
_flow: &str,
|
||||||
|
_min_run_interval_secs: u64,
|
||||||
|
_max_filter_num_per_query: usize,
|
||||||
|
_ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
|
|||||||
@@ -64,6 +64,7 @@ impl Default for FlightEncoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FlightEncoder {
|
impl FlightEncoder {
|
||||||
|
/// Creates new [FlightEncoder] with compression disabled.
|
||||||
pub fn with_compression_disabled() -> Self {
|
pub fn with_compression_disabled() -> Self {
|
||||||
let write_options = writer::IpcWriteOptions::default()
|
let write_options = writer::IpcWriteOptions::default()
|
||||||
.try_with_compression(None)
|
.try_with_compression(None)
|
||||||
|
|||||||
@@ -372,6 +372,7 @@ impl DatanodeBuilder {
|
|||||||
opts.max_concurrent_queries,
|
opts.max_concurrent_queries,
|
||||||
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
|
opts.grpc.flight_compression,
|
||||||
);
|
);
|
||||||
|
|
||||||
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ use query::QueryEngineRef;
|
|||||||
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
||||||
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
||||||
use servers::grpc::region_server::RegionServerHandler;
|
use servers::grpc::region_server::RegionServerHandler;
|
||||||
|
use servers::grpc::FlightCompression;
|
||||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||||
use snafu::{ensure, OptionExt, ResultExt};
|
use snafu::{ensure, OptionExt, ResultExt};
|
||||||
use store_api::metric_engine_consts::{
|
use store_api::metric_engine_consts::{
|
||||||
@@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef;
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RegionServer {
|
pub struct RegionServer {
|
||||||
inner: Arc<RegionServerInner>,
|
inner: Arc<RegionServerInner>,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RegionStat {
|
pub struct RegionStat {
|
||||||
@@ -93,6 +95,7 @@ impl RegionServer {
|
|||||||
query_engine: QueryEngineRef,
|
query_engine: QueryEngineRef,
|
||||||
runtime: Runtime,
|
runtime: Runtime,
|
||||||
event_listener: RegionServerEventListenerRef,
|
event_listener: RegionServerEventListenerRef,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self::with_table_provider(
|
Self::with_table_provider(
|
||||||
query_engine,
|
query_engine,
|
||||||
@@ -101,6 +104,7 @@ impl RegionServer {
|
|||||||
Arc::new(DummyTableProviderFactory),
|
Arc::new(DummyTableProviderFactory),
|
||||||
0,
|
0,
|
||||||
Duration::from_millis(0),
|
Duration::from_millis(0),
|
||||||
|
flight_compression,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,6 +115,7 @@ impl RegionServer {
|
|||||||
table_provider_factory: TableProviderFactoryRef,
|
table_provider_factory: TableProviderFactoryRef,
|
||||||
max_concurrent_queries: usize,
|
max_concurrent_queries: usize,
|
||||||
concurrent_query_limiter_timeout: Duration,
|
concurrent_query_limiter_timeout: Duration,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
inner: Arc::new(RegionServerInner::new(
|
inner: Arc::new(RegionServerInner::new(
|
||||||
@@ -123,6 +128,7 @@ impl RegionServer {
|
|||||||
concurrent_query_limiter_timeout,
|
concurrent_query_limiter_timeout,
|
||||||
),
|
),
|
||||||
)),
|
)),
|
||||||
|
flight_compression,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -536,7 +542,11 @@ impl FlightCraft for RegionServer {
|
|||||||
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
|
let stream = Box::pin(FlightRecordBatchStream::new(
|
||||||
|
result,
|
||||||
|
tracing_context,
|
||||||
|
self.flight_compression,
|
||||||
|
));
|
||||||
Ok(Response::new(stream))
|
Ok(Response::new(stream))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ use query::dataframe::DataFrame;
|
|||||||
use query::planner::LogicalPlanner;
|
use query::planner::LogicalPlanner;
|
||||||
use query::query_engine::{DescribeResult, QueryEngineState};
|
use query::query_engine::{DescribeResult, QueryEngineState};
|
||||||
use query::{QueryEngine, QueryEngineContext};
|
use query::{QueryEngine, QueryEngineContext};
|
||||||
|
use servers::grpc::FlightCompression;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use store_api::metadata::RegionMetadataRef;
|
use store_api::metadata::RegionMetadataRef;
|
||||||
use store_api::region_engine::{
|
use store_api::region_engine::{
|
||||||
@@ -97,6 +98,7 @@ pub fn mock_region_server() -> RegionServer {
|
|||||||
Arc::new(MockQueryEngine),
|
Arc::new(MockQueryEngine),
|
||||||
Runtime::builder().build().unwrap(),
|
Runtime::builder().build().unwrap(),
|
||||||
Box::new(NoopRegionServerEventListener),
|
Box::new(NoopRegionServerEventListener),
|
||||||
|
FlightCompression::default(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ prost.workspace = true
|
|||||||
query.workspace = true
|
query.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
serde_json.workspace = true
|
||||||
servers.workspace = true
|
servers.workspace = true
|
||||||
session.workspace = true
|
session.workspace = true
|
||||||
smallvec.workspace = true
|
smallvec.workspace = true
|
||||||
|
|||||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
|||||||
);
|
);
|
||||||
|
|
||||||
METRIC_FLOW_ROWS
|
METRIC_FLOW_ROWS
|
||||||
.with_label_values(&["out"])
|
.with_label_values(&["out-streaming"])
|
||||||
.inc_by(total_rows as u64);
|
.inc_by(total_rows as u64);
|
||||||
|
|
||||||
let now = self.tick_manager.tick();
|
let now = self.tick_manager.tick();
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use api::v1::flow::{
|
use api::v1::flow::{
|
||||||
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
|
||||||
};
|
};
|
||||||
use api::v1::region::InsertRequests;
|
use api::v1::region::InsertRequests;
|
||||||
use catalog::CatalogManager;
|
use catalog::CatalogManager;
|
||||||
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
|
|||||||
use datatypes::value::Value;
|
use datatypes::value::Value;
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use session::context::QueryContextBuilder;
|
use session::context::QueryContextBuilder;
|
||||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||||
use store_api::storage::{RegionId, TableId};
|
use store_api::storage::{RegionId, TableId};
|
||||||
@@ -46,7 +47,7 @@ use crate::error::{
|
|||||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||||
use crate::repr::{self, DiffRow};
|
use crate::repr::{self, DiffRow};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||||
let mut to_batch_engine = request.requests;
|
let mut to_batch_engine = request.requests;
|
||||||
|
|
||||||
|
let mut batching_row_cnt = 0;
|
||||||
|
let mut streaming_row_cnt = 0;
|
||||||
|
|
||||||
{
|
{
|
||||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||||
let src_table2flow = self.src_table2flow.read().await;
|
let src_table2flow = self.src_table2flow.read().await;
|
||||||
@@ -698,9 +702,11 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||||
if is_in_stream {
|
if is_in_stream {
|
||||||
|
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||||
to_stream_engine.push(req.clone());
|
to_stream_engine.push(req.clone());
|
||||||
}
|
}
|
||||||
if is_in_batch {
|
if is_in_batch {
|
||||||
|
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if !is_in_batch && !is_in_stream {
|
if !is_in_batch && !is_in_stream {
|
||||||
@@ -713,6 +719,14 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||||
}
|
}
|
||||||
|
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&["in-streaming"])
|
||||||
|
.inc_by(streaming_row_cnt as u64);
|
||||||
|
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&["in-batching"])
|
||||||
|
.inc_by(batching_row_cnt as u64);
|
||||||
|
|
||||||
let streaming_engine = self.streaming_engine.clone();
|
let streaming_engine = self.streaming_engine.clone();
|
||||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||||
common_runtime::spawn_global(async move {
|
common_runtime::spawn_global(async move {
|
||||||
@@ -809,6 +823,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
struct Options {
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
}
|
||||||
|
let options: Options = serde_json::from_str(&options).with_context(|_| {
|
||||||
|
common_meta::error::DeserializeFromJsonSnafu { input: options }
|
||||||
|
})?;
|
||||||
|
self.batching_engine
|
||||||
|
.adjust_flow(
|
||||||
|
flow_id.unwrap().id as u64,
|
||||||
|
options.min_run_interval_secs,
|
||||||
|
options.max_filter_num_per_query,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(to_meta_err(snafu::location!()))?;
|
||||||
|
Ok(Default::default())
|
||||||
|
}
|
||||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -841,93 +874,6 @@ fn to_meta_err(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl common_meta::node_manager::Flownode for StreamingEngine {
|
|
||||||
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
|
|
||||||
let query_ctx = request
|
|
||||||
.header
|
|
||||||
.and_then(|h| h.query_context)
|
|
||||||
.map(|ctx| ctx.into());
|
|
||||||
match request.body {
|
|
||||||
Some(flow_request::Body::Create(CreateRequest {
|
|
||||||
flow_id: Some(task_id),
|
|
||||||
source_table_ids,
|
|
||||||
sink_table_name: Some(sink_table_name),
|
|
||||||
create_if_not_exists,
|
|
||||||
expire_after,
|
|
||||||
comment,
|
|
||||||
sql,
|
|
||||||
flow_options,
|
|
||||||
or_replace,
|
|
||||||
})) => {
|
|
||||||
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
|
|
||||||
let sink_table_name = [
|
|
||||||
sink_table_name.catalog_name,
|
|
||||||
sink_table_name.schema_name,
|
|
||||||
sink_table_name.table_name,
|
|
||||||
];
|
|
||||||
let expire_after = expire_after.map(|e| e.value);
|
|
||||||
let args = CreateFlowArgs {
|
|
||||||
flow_id: task_id.id as u64,
|
|
||||||
sink_table_name,
|
|
||||||
source_table_ids,
|
|
||||||
create_if_not_exists,
|
|
||||||
or_replace,
|
|
||||||
expire_after,
|
|
||||||
comment: Some(comment),
|
|
||||||
sql: sql.clone(),
|
|
||||||
flow_options,
|
|
||||||
query_ctx,
|
|
||||||
};
|
|
||||||
let ret = self
|
|
||||||
.create_flow(args)
|
|
||||||
.await
|
|
||||||
.map_err(BoxedError::new)
|
|
||||||
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
|
|
||||||
.map_err(to_meta_err(snafu::location!()))?;
|
|
||||||
METRIC_FLOW_TASK_COUNT.inc();
|
|
||||||
Ok(FlowResponse {
|
|
||||||
affected_flows: ret
|
|
||||||
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
|
|
||||||
.into_iter()
|
|
||||||
.collect_vec(),
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Some(flow_request::Body::Drop(DropRequest {
|
|
||||||
flow_id: Some(flow_id),
|
|
||||||
})) => {
|
|
||||||
self.remove_flow(flow_id.id as u64)
|
|
||||||
.await
|
|
||||||
.map_err(to_meta_err(snafu::location!()))?;
|
|
||||||
METRIC_FLOW_TASK_COUNT.dec();
|
|
||||||
Ok(Default::default())
|
|
||||||
}
|
|
||||||
Some(flow_request::Body::Flush(FlushFlow {
|
|
||||||
flow_id: Some(flow_id),
|
|
||||||
})) => {
|
|
||||||
let row = self
|
|
||||||
.flush_flow_inner(flow_id.id as u64)
|
|
||||||
.await
|
|
||||||
.map_err(to_meta_err(snafu::location!()))?;
|
|
||||||
Ok(FlowResponse {
|
|
||||||
affected_flows: vec![flow_id],
|
|
||||||
affected_rows: row as u64,
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
|
|
||||||
self.handle_inserts_inner(request)
|
|
||||||
.await
|
|
||||||
.map(|_| Default::default())
|
|
||||||
.map_err(to_meta_err(snafu::location!()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FlowEngine for StreamingEngine {
|
impl FlowEngine for StreamingEngine {
|
||||||
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
|
||||||
self.create_flow_inner(args).await
|
self.create_flow_inner(args).await
|
||||||
|
|||||||
@@ -388,6 +388,20 @@ impl BatchingEngine {
|
|||||||
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
|
||||||
self.tasks.read().await.contains_key(&flow_id)
|
self.tasks.read().await.contains_key(&flow_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn adjust_flow(
|
||||||
|
&self,
|
||||||
|
flow_id: FlowId,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let task = self.tasks.read().await.get(&flow_id).cloned();
|
||||||
|
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
|
||||||
|
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
|
||||||
|
task.adjust(min_run_interval_secs, max_filter_num_per_query);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowEngine for BatchingEngine {
|
impl FlowEngine for BatchingEngine {
|
||||||
|
|||||||
@@ -14,8 +14,9 @@
|
|||||||
|
|
||||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||||
|
|
||||||
use std::sync::{Arc, Weak};
|
use std::collections::HashMap;
|
||||||
use std::time::SystemTime;
|
use std::sync::{Arc, Mutex, Weak};
|
||||||
|
use std::time::{Duration, Instant, SystemTime};
|
||||||
|
|
||||||
use api::v1::greptime_request::Request;
|
use api::v1::greptime_request::Request;
|
||||||
use api::v1::CreateTableExpr;
|
use api::v1::CreateTableExpr;
|
||||||
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
|
|||||||
use common_meta::peer::Peer;
|
use common_meta::peer::Peer;
|
||||||
use common_meta::rpc::store::RangeRequest;
|
use common_meta::rpc::store::RangeRequest;
|
||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
use common_telemetry::warn;
|
use common_telemetry::{debug, warn};
|
||||||
|
use itertools::Itertools;
|
||||||
use meta_client::client::MetaClient;
|
use meta_client::client::MetaClient;
|
||||||
use rand::rng;
|
|
||||||
use rand::seq::SliceRandom;
|
|
||||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||||
use snafu::{OptionExt, ResultExt};
|
use snafu::{OptionExt, ResultExt};
|
||||||
|
|
||||||
|
use crate::batching_mode::task::BatchingTask;
|
||||||
use crate::batching_mode::{
|
use crate::batching_mode::{
|
||||||
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
|
||||||
GRPC_MAX_RETRIES,
|
GRPC_MAX_RETRIES,
|
||||||
};
|
};
|
||||||
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
|
||||||
use crate::{Error, FlowAuthHeader};
|
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
|
||||||
|
use crate::{Error, FlowAuthHeader, FlowId};
|
||||||
|
|
||||||
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
/// Just like [`GrpcQueryHandler`] but use BoxedError
|
||||||
///
|
///
|
||||||
@@ -74,6 +76,105 @@ impl<
|
|||||||
|
|
||||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||||
|
|
||||||
|
/// Statistics about running query on this frontend from flownode
|
||||||
|
#[derive(Debug, Default, Clone)]
|
||||||
|
struct FrontendStat {
|
||||||
|
/// The query for flow id has been running since this timestamp
|
||||||
|
since: HashMap<FlowId, Instant>,
|
||||||
|
/// The average query time for each flow id
|
||||||
|
/// This is used to calculate the average query time for each flow id
|
||||||
|
past_query_avg: HashMap<FlowId, (usize, Duration)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Clone)]
|
||||||
|
pub struct FrontendStats {
|
||||||
|
/// The statistics for each flow id
|
||||||
|
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FrontendStats {
|
||||||
|
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
|
||||||
|
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||||
|
let stat = stats.entry(frontend_addr.to_string()).or_default();
|
||||||
|
stat.since.insert(flow_id, Instant::now());
|
||||||
|
|
||||||
|
FrontendStatsGuard {
|
||||||
|
stats: self.stats.clone(),
|
||||||
|
frontend_addr: frontend_addr.to_string(),
|
||||||
|
cur: flow_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// return frontend addrs sorted by load, from lightest to heaviest
|
||||||
|
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
|
||||||
|
pub fn sort_by_load(&self) -> Vec<String> {
|
||||||
|
let stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||||
|
let fe_load_factor = stats
|
||||||
|
.iter()
|
||||||
|
.map(|(node_addr, stat)| {
|
||||||
|
// total expected avg running time for all currently running queries
|
||||||
|
let total_expect_avg_run_time = stat
|
||||||
|
.since
|
||||||
|
.keys()
|
||||||
|
.map(|f| {
|
||||||
|
let (count, total_duration) =
|
||||||
|
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
|
||||||
|
if *count == 0 {
|
||||||
|
0.0
|
||||||
|
} else {
|
||||||
|
total_duration.as_secs_f64() / *count as f64
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.sum::<f64>();
|
||||||
|
let total_cur_running_time = stat
|
||||||
|
.since
|
||||||
|
.values()
|
||||||
|
.map(|since| since.elapsed().as_secs_f64())
|
||||||
|
.sum::<f64>();
|
||||||
|
(
|
||||||
|
node_addr.to_string(),
|
||||||
|
total_expect_avg_run_time + total_cur_running_time,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.sorted_by(|(_, load_a), (_, load_b)| {
|
||||||
|
load_a
|
||||||
|
.partial_cmp(load_b)
|
||||||
|
.unwrap_or(std::cmp::Ordering::Equal)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
debug!("Frontend load factor: {:?}", fe_load_factor);
|
||||||
|
for (node_addr, load) in &fe_load_factor {
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
|
||||||
|
.with_label_values(&[&node_addr.to_string()])
|
||||||
|
.observe(*load);
|
||||||
|
}
|
||||||
|
fe_load_factor
|
||||||
|
.into_iter()
|
||||||
|
.map(|(addr, _)| addr)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FrontendStatsGuard {
|
||||||
|
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
|
||||||
|
frontend_addr: String,
|
||||||
|
cur: FlowId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for FrontendStatsGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
|
||||||
|
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
|
||||||
|
if let Some(since) = stat.since.remove(&self.cur) {
|
||||||
|
let elapsed = since.elapsed();
|
||||||
|
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
|
||||||
|
*count += 1;
|
||||||
|
*total_duration += elapsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A simple frontend client able to execute sql using grpc protocol
|
/// A simple frontend client able to execute sql using grpc protocol
|
||||||
///
|
///
|
||||||
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
|
||||||
@@ -83,6 +184,7 @@ pub enum FrontendClient {
|
|||||||
meta_client: Arc<MetaClient>,
|
meta_client: Arc<MetaClient>,
|
||||||
chnl_mgr: ChannelManager,
|
chnl_mgr: ChannelManager,
|
||||||
auth: Option<FlowAuthHeader>,
|
auth: Option<FlowAuthHeader>,
|
||||||
|
fe_stats: FrontendStats,
|
||||||
},
|
},
|
||||||
Standalone {
|
Standalone {
|
||||||
/// for the sake of simplicity still use grpc even in standalone mode
|
/// for the sake of simplicity still use grpc even in standalone mode
|
||||||
@@ -114,6 +216,7 @@ impl FrontendClient {
|
|||||||
ChannelManager::with_config(cfg)
|
ChannelManager::with_config(cfg)
|
||||||
},
|
},
|
||||||
auth,
|
auth,
|
||||||
|
fe_stats: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,7 +286,7 @@ impl FrontendClient {
|
|||||||
|
|
||||||
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
|
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
|
||||||
/// and is able to process query
|
/// and is able to process query
|
||||||
async fn get_random_active_frontend(
|
pub(crate) async fn get_random_active_frontend(
|
||||||
&self,
|
&self,
|
||||||
catalog: &str,
|
catalog: &str,
|
||||||
schema: &str,
|
schema: &str,
|
||||||
@@ -192,6 +295,7 @@ impl FrontendClient {
|
|||||||
meta_client: _,
|
meta_client: _,
|
||||||
chnl_mgr,
|
chnl_mgr,
|
||||||
auth,
|
auth,
|
||||||
|
fe_stats,
|
||||||
} = self
|
} = self
|
||||||
else {
|
else {
|
||||||
return UnexpectedSnafu {
|
return UnexpectedSnafu {
|
||||||
@@ -208,8 +312,21 @@ impl FrontendClient {
|
|||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_millis() as i64;
|
.as_millis() as i64;
|
||||||
// shuffle the frontends to avoid always pick the same one
|
let node_addrs_by_load = fe_stats.sort_by_load();
|
||||||
frontends.shuffle(&mut rng());
|
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
|
||||||
|
let addr2load = node_addrs_by_load
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, id)| (id.clone(), i + 1))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
// sort frontends by load, from lightest to heaviest
|
||||||
|
frontends.sort_by(|(_, a), (_, b)| {
|
||||||
|
// if not even in stats, treat as 0 load since never been queried
|
||||||
|
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
|
||||||
|
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
|
||||||
|
load_a.cmp(load_b)
|
||||||
|
});
|
||||||
|
debug!("Frontend nodes sorted by load: {:?}", frontends);
|
||||||
|
|
||||||
// found node with maximum last_activity_ts
|
// found node with maximum last_activity_ts
|
||||||
for (_, node_info) in frontends
|
for (_, node_info) in frontends
|
||||||
@@ -257,6 +374,7 @@ impl FrontendClient {
|
|||||||
create: CreateTableExpr,
|
create: CreateTableExpr,
|
||||||
catalog: &str,
|
catalog: &str,
|
||||||
schema: &str,
|
schema: &str,
|
||||||
|
task: Option<&BatchingTask>,
|
||||||
) -> Result<u32, Error> {
|
) -> Result<u32, Error> {
|
||||||
self.handle(
|
self.handle(
|
||||||
Request::Ddl(api::v1::DdlRequest {
|
Request::Ddl(api::v1::DdlRequest {
|
||||||
@@ -264,7 +382,8 @@ impl FrontendClient {
|
|||||||
}),
|
}),
|
||||||
catalog,
|
catalog,
|
||||||
schema,
|
schema,
|
||||||
&mut None,
|
None,
|
||||||
|
task,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -275,15 +394,31 @@ impl FrontendClient {
|
|||||||
req: api::v1::greptime_request::Request,
|
req: api::v1::greptime_request::Request,
|
||||||
catalog: &str,
|
catalog: &str,
|
||||||
schema: &str,
|
schema: &str,
|
||||||
peer_desc: &mut Option<PeerDesc>,
|
use_peer: Option<Peer>,
|
||||||
|
task: Option<&BatchingTask>,
|
||||||
) -> Result<u32, Error> {
|
) -> Result<u32, Error> {
|
||||||
match self {
|
match self {
|
||||||
FrontendClient::Distributed { .. } => {
|
FrontendClient::Distributed {
|
||||||
let db = self.get_random_active_frontend(catalog, schema).await?;
|
fe_stats, chnl_mgr, ..
|
||||||
|
} => {
|
||||||
|
let db = if let Some(peer) = use_peer {
|
||||||
|
DatabaseWithPeer::new(
|
||||||
|
Database::new(
|
||||||
|
catalog,
|
||||||
|
schema,
|
||||||
|
Client::with_manager_and_urls(
|
||||||
|
chnl_mgr.clone(),
|
||||||
|
vec![peer.addr.clone()],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
peer,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
self.get_random_active_frontend(catalog, schema).await?
|
||||||
|
};
|
||||||
|
|
||||||
*peer_desc = Some(PeerDesc::Dist {
|
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
|
||||||
peer: db.peer.clone(),
|
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
|
||||||
});
|
|
||||||
|
|
||||||
db.database
|
db.database
|
||||||
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
|||||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
|
||||||
};
|
};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -52,6 +53,13 @@ pub struct TaskState {
|
|||||||
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
pub(crate) shutdown_rx: oneshot::Receiver<()>,
|
||||||
/// Task handle
|
/// Task handle
|
||||||
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
/// Slow Query metrics update task handle
|
||||||
|
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
|
||||||
|
/// min run interval in seconds
|
||||||
|
pub(crate) min_run_interval: Option<u64>,
|
||||||
|
/// max filter number per query
|
||||||
|
pub(crate) max_filter_num: Option<usize>,
|
||||||
}
|
}
|
||||||
impl TaskState {
|
impl TaskState {
|
||||||
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
|
||||||
@@ -63,6 +71,9 @@ impl TaskState {
|
|||||||
exec_state: ExecState::Idle,
|
exec_state: ExecState::Idle,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
task_handle: None,
|
task_handle: None,
|
||||||
|
slow_query_metric_task: None,
|
||||||
|
min_run_interval: None,
|
||||||
|
max_filter_num: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,20 +98,17 @@ impl TaskState {
|
|||||||
pub fn get_next_start_query_time(
|
pub fn get_next_start_query_time(
|
||||||
&self,
|
&self,
|
||||||
flow_id: FlowId,
|
flow_id: FlowId,
|
||||||
time_window_size: &Option<Duration>,
|
_time_window_size: &Option<Duration>,
|
||||||
max_timeout: Option<Duration>,
|
max_timeout: Option<Duration>,
|
||||||
) -> Instant {
|
) -> Instant {
|
||||||
let last_duration = max_timeout
|
let next_duration = max_timeout
|
||||||
.unwrap_or(self.last_query_duration)
|
.unwrap_or(self.last_query_duration)
|
||||||
.min(self.last_query_duration)
|
.min(self.last_query_duration)
|
||||||
.max(MIN_REFRESH_DURATION);
|
.max(
|
||||||
|
self.min_run_interval
|
||||||
let next_duration = time_window_size
|
.map(Duration::from_secs)
|
||||||
.map(|t| {
|
.unwrap_or(MIN_REFRESH_DURATION),
|
||||||
let half = t / 2;
|
);
|
||||||
half.max(last_duration)
|
|
||||||
})
|
|
||||||
.unwrap_or(last_duration);
|
|
||||||
|
|
||||||
// if have dirty time window, execute immediately to clean dirty time window
|
// if have dirty time window, execute immediately to clean dirty time window
|
||||||
if self.dirty_time_windows.windows.is_empty() {
|
if self.dirty_time_windows.windows.is_empty() {
|
||||||
@@ -206,47 +214,69 @@ impl DirtyTimeWindows {
|
|||||||
|
|
||||||
// get the first `window_cnt` time windows
|
// get the first `window_cnt` time windows
|
||||||
let max_time_range = window_size * window_cnt as i32;
|
let max_time_range = window_size * window_cnt as i32;
|
||||||
let nth = {
|
|
||||||
let mut cur_time_range = chrono::Duration::zero();
|
|
||||||
let mut nth_key = None;
|
|
||||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
|
||||||
// if time range is too long, stop
|
|
||||||
if cur_time_range > max_time_range {
|
|
||||||
nth_key = Some(*start);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we have enough time windows, stop
|
let mut to_be_query = BTreeMap::new();
|
||||||
if idx >= window_cnt {
|
let mut new_windows = self.windows.clone();
|
||||||
nth_key = Some(*start);
|
let mut cur_time_range = chrono::Duration::zero();
|
||||||
break;
|
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||||
}
|
let first_end = start
|
||||||
|
.add_duration(window_size.to_std().unwrap())
|
||||||
|
.context(TimeSnafu)?;
|
||||||
|
let end = end.unwrap_or(first_end);
|
||||||
|
|
||||||
if let Some(end) = end {
|
// if time range is too long, stop
|
||||||
if let Some(x) = end.sub(start) {
|
if cur_time_range >= max_time_range {
|
||||||
cur_time_range += x;
|
break;
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nth_key
|
// if we have enough time windows, stop
|
||||||
};
|
if idx >= window_cnt {
|
||||||
let first_nth = {
|
break;
|
||||||
if let Some(nth) = nth {
|
|
||||||
let mut after = self.windows.split_off(&nth);
|
|
||||||
std::mem::swap(&mut self.windows, &mut after);
|
|
||||||
|
|
||||||
after
|
|
||||||
} else {
|
|
||||||
std::mem::take(&mut self.windows)
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
if let Some(x) = end.sub(start) {
|
||||||
|
if cur_time_range + x <= max_time_range {
|
||||||
|
to_be_query.insert(*start, Some(end));
|
||||||
|
new_windows.remove(start);
|
||||||
|
cur_time_range += x;
|
||||||
|
} else {
|
||||||
|
// too large a window, split it
|
||||||
|
// split at window_size * times
|
||||||
|
let surplus = max_time_range - cur_time_range;
|
||||||
|
let times = surplus.num_seconds() / window_size.num_seconds();
|
||||||
|
|
||||||
|
let split_offset = window_size * times as i32;
|
||||||
|
let split_at = start
|
||||||
|
.add_duration(split_offset.to_std().unwrap())
|
||||||
|
.context(TimeSnafu)?;
|
||||||
|
to_be_query.insert(*start, Some(split_at));
|
||||||
|
|
||||||
|
// remove the original window
|
||||||
|
new_windows.remove(start);
|
||||||
|
new_windows.insert(split_at, Some(end));
|
||||||
|
cur_time_range += split_offset;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.windows = new_windows;
|
||||||
|
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||||
.with_label_values(&[flow_id.to_string().as_str()])
|
.with_label_values(&[
|
||||||
.observe(first_nth.len() as f64);
|
flow_id.to_string().as_str(),
|
||||||
|
format!("{}", window_size).as_str(),
|
||||||
|
])
|
||||||
|
.observe(to_be_query.len() as f64);
|
||||||
|
|
||||||
let full_time_range = first_nth
|
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
|
||||||
|
.with_label_values(&[
|
||||||
|
flow_id.to_string().as_str(),
|
||||||
|
format!("{}", window_size).as_str(),
|
||||||
|
])
|
||||||
|
.observe(self.windows.len() as f64);
|
||||||
|
|
||||||
|
let full_time_range = to_be_query
|
||||||
.iter()
|
.iter()
|
||||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||||
if let Some(end) = end {
|
if let Some(end) = end {
|
||||||
@@ -257,11 +287,14 @@ impl DirtyTimeWindows {
|
|||||||
})
|
})
|
||||||
.num_seconds() as f64;
|
.num_seconds() as f64;
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
|
||||||
.with_label_values(&[flow_id.to_string().as_str()])
|
.with_label_values(&[
|
||||||
|
flow_id.to_string().as_str(),
|
||||||
|
format!("{}", window_size).as_str(),
|
||||||
|
])
|
||||||
.observe(full_time_range);
|
.observe(full_time_range);
|
||||||
|
|
||||||
let mut expr_lst = vec![];
|
let mut expr_lst = vec![];
|
||||||
for (start, end) in first_nth.into_iter() {
|
for (start, end) in to_be_query.into_iter() {
|
||||||
// align using time window exprs
|
// align using time window exprs
|
||||||
let (start, end) = if let Some(ctx) = task_ctx {
|
let (start, end) = if let Some(ctx) = task_ctx {
|
||||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||||
@@ -495,6 +528,64 @@ mod test {
|
|||||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
// split range
|
||||||
|
(
|
||||||
|
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
|
||||||
|
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||||
|
))),
|
||||||
|
(chrono::Duration::seconds(3), None),
|
||||||
|
BTreeMap::from([
|
||||||
|
(
|
||||||
|
Timestamp::new_second(0),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
60
|
||||||
|
)),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
|
||||||
|
)),
|
||||||
|
)]),
|
||||||
|
Some(
|
||||||
|
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||||
|
)
|
||||||
|
),
|
||||||
|
// split 2 min into 1 min
|
||||||
|
(
|
||||||
|
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
|
||||||
|
(chrono::Duration::seconds(3), None),
|
||||||
|
BTreeMap::from([
|
||||||
|
(
|
||||||
|
Timestamp::new_second(0),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
40 * 3
|
||||||
|
)),
|
||||||
|
)]),
|
||||||
|
Some(
|
||||||
|
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||||
|
)
|
||||||
|
),
|
||||||
|
// split 3s + 1min into 3s + 57s
|
||||||
|
(
|
||||||
|
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
|
||||||
|
(chrono::Duration::seconds(3), None),
|
||||||
|
BTreeMap::from([
|
||||||
|
(
|
||||||
|
Timestamp::new_second(0),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
3
|
||||||
|
)),
|
||||||
|
),(
|
||||||
|
Timestamp::new_second(20),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
140
|
||||||
|
)),
|
||||||
|
)]),
|
||||||
|
Some(
|
||||||
|
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
|
||||||
|
)
|
||||||
|
),
|
||||||
// expired
|
// expired
|
||||||
(
|
(
|
||||||
vec![
|
vec![
|
||||||
@@ -511,6 +602,8 @@ mod test {
|
|||||||
None
|
None
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
// let len = testcases.len();
|
||||||
|
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
|
||||||
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
||||||
testcases
|
testcases
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ use crate::error::{
|
|||||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_ROWS,
|
||||||
};
|
};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -81,6 +83,14 @@ pub struct TaskConfig {
|
|||||||
query_type: QueryType,
|
query_type: QueryType,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TaskConfig {
|
||||||
|
pub fn time_window_size(&self) -> Option<Duration> {
|
||||||
|
self.time_window_expr
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|expr| *expr.time_window_size())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
|
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
|
||||||
let stmts =
|
let stmts =
|
||||||
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
|
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
|
||||||
@@ -144,6 +154,12 @@ impl BatchingTask {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
|
||||||
|
let mut state = self.state.write().unwrap();
|
||||||
|
state.min_run_interval = Some(min_run_interval_secs);
|
||||||
|
state.max_filter_num = Some(max_filter_num_per_query);
|
||||||
|
}
|
||||||
|
|
||||||
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
|
||||||
///
|
///
|
||||||
/// useful for flush_flow to flush dirty time windows range
|
/// useful for flush_flow to flush dirty time windows range
|
||||||
@@ -280,7 +296,7 @@ impl BatchingTask {
|
|||||||
let catalog = &self.config.sink_table_name[0];
|
let catalog = &self.config.sink_table_name[0];
|
||||||
let schema = &self.config.sink_table_name[1];
|
let schema = &self.config.sink_table_name[1];
|
||||||
frontend_client
|
frontend_client
|
||||||
.create(expr.clone(), catalog, schema)
|
.create(expr.clone(), catalog, schema, Some(self))
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -328,11 +344,53 @@ impl BatchingTask {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
let plan = expanded_plan;
|
let plan = expanded_plan;
|
||||||
let mut peer_desc = None;
|
|
||||||
|
let db = frontend_client
|
||||||
|
.get_random_active_frontend(catalog, schema)
|
||||||
|
.await?;
|
||||||
|
let peer_desc = db.peer.clone();
|
||||||
|
|
||||||
|
let (tx, mut rx) = oneshot::channel();
|
||||||
|
let peer_inner = peer_desc.clone();
|
||||||
|
let window_size_pretty = format!(
|
||||||
|
"{}s",
|
||||||
|
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||||
|
);
|
||||||
|
let inner_window_size_pretty = window_size_pretty.clone();
|
||||||
|
let flow_id = self.config.flow_id;
|
||||||
|
let slow_query_metric_task = tokio::task::spawn(async move {
|
||||||
|
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||||
|
.with_label_values(&[
|
||||||
|
flow_id.to_string().as_str(),
|
||||||
|
&peer_inner.to_string(),
|
||||||
|
inner_window_size_pretty.as_str(),
|
||||||
|
])
|
||||||
|
.add(1.0);
|
||||||
|
while rx.try_recv() == Err(TryRecvError::Empty) {
|
||||||
|
// sleep for a while before next update
|
||||||
|
tokio::time::sleep(MIN_REFRESH_DURATION).await;
|
||||||
|
}
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
|
||||||
|
.with_label_values(&[
|
||||||
|
flow_id.to_string().as_str(),
|
||||||
|
&peer_inner.to_string(),
|
||||||
|
inner_window_size_pretty.as_str(),
|
||||||
|
])
|
||||||
|
.sub(1.0);
|
||||||
|
});
|
||||||
|
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
|
||||||
|
|
||||||
let res = {
|
let res = {
|
||||||
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
|
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
|
||||||
.with_label_values(&[flow_id.to_string().as_str()])
|
.with_label_values(&[
|
||||||
|
flow_id.to_string().as_str(),
|
||||||
|
format!(
|
||||||
|
"{}s",
|
||||||
|
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||||
|
)
|
||||||
|
.as_str(),
|
||||||
|
])
|
||||||
.start_timer();
|
.start_timer();
|
||||||
|
|
||||||
// hack and special handling the insert logical plan
|
// hack and special handling the insert logical plan
|
||||||
@@ -361,16 +419,21 @@ impl BatchingTask {
|
|||||||
};
|
};
|
||||||
|
|
||||||
frontend_client
|
frontend_client
|
||||||
.handle(req, catalog, schema, &mut peer_desc)
|
.handle(req, catalog, schema, Some(db.peer), Some(self))
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// signaling the slow query metric task to stop
|
||||||
|
let _ = tx.send(());
|
||||||
let elapsed = instant.elapsed();
|
let elapsed = instant.elapsed();
|
||||||
if let Ok(affected_rows) = &res {
|
if let Ok(affected_rows) = &res {
|
||||||
debug!(
|
debug!(
|
||||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||||
elapsed
|
elapsed
|
||||||
);
|
);
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||||
|
.inc_by(*affected_rows as _);
|
||||||
} else if let Err(err) = &res {
|
} else if let Err(err) = &res {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||||
@@ -387,7 +450,12 @@ impl BatchingTask {
|
|||||||
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
|
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
|
||||||
.with_label_values(&[
|
.with_label_values(&[
|
||||||
flow_id.to_string().as_str(),
|
flow_id.to_string().as_str(),
|
||||||
&peer_desc.unwrap_or_default().to_string(),
|
&peer_desc.to_string(),
|
||||||
|
format!(
|
||||||
|
"{}s",
|
||||||
|
self.config.time_window_size().unwrap_or_default().as_secs()
|
||||||
|
)
|
||||||
|
.as_str(),
|
||||||
])
|
])
|
||||||
.observe(elapsed.as_secs_f64());
|
.observe(elapsed.as_secs_f64());
|
||||||
}
|
}
|
||||||
@@ -410,6 +478,7 @@ impl BatchingTask {
|
|||||||
engine: QueryEngineRef,
|
engine: QueryEngineRef,
|
||||||
frontend_client: Arc<FrontendClient>,
|
frontend_client: Arc<FrontendClient>,
|
||||||
) {
|
) {
|
||||||
|
let flow_id_str = self.config.flow_id.to_string();
|
||||||
loop {
|
loop {
|
||||||
// first check if shutdown signal is received
|
// first check if shutdown signal is received
|
||||||
// if so, break the loop
|
// if so, break the loop
|
||||||
@@ -427,6 +496,9 @@ impl BatchingTask {
|
|||||||
Err(TryRecvError::Empty) => (),
|
Err(TryRecvError::Empty) => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||||
|
.with_label_values(&[&flow_id_str])
|
||||||
|
.inc();
|
||||||
|
|
||||||
let new_query = match self.gen_insert_plan(&engine).await {
|
let new_query = match self.gen_insert_plan(&engine).await {
|
||||||
Ok(new_query) => new_query,
|
Ok(new_query) => new_query,
|
||||||
@@ -473,6 +545,9 @@ impl BatchingTask {
|
|||||||
}
|
}
|
||||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||||
|
.with_label_values(&[&flow_id_str])
|
||||||
|
.inc();
|
||||||
match new_query {
|
match new_query {
|
||||||
Some(query) => {
|
Some(query) => {
|
||||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||||
@@ -580,19 +655,20 @@ impl BatchingTask {
|
|||||||
),
|
),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let expr = self
|
let expr = {
|
||||||
.state
|
let mut state = self.state.write().unwrap();
|
||||||
.write()
|
let max_window_cnt = state
|
||||||
.unwrap()
|
.max_filter_num
|
||||||
.dirty_time_windows
|
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
|
||||||
.gen_filter_exprs(
|
state.dirty_time_windows.gen_filter_exprs(
|
||||||
&col_name,
|
&col_name,
|
||||||
Some(l),
|
Some(l),
|
||||||
window_size,
|
window_size,
|
||||||
DirtyTimeWindows::MAX_FILTER_NUM,
|
max_window_cnt,
|
||||||
self.config.flow_id,
|
self.config.flow_id,
|
||||||
Some(self),
|
Some(self),
|
||||||
)?;
|
)?
|
||||||
|
};
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Flow id={:?}, Generated filter expr: {:?}",
|
"Flow id={:?}, Generated filter expr: {:?}",
|
||||||
|
|||||||
@@ -31,22 +31,37 @@ lazy_static! {
|
|||||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
|
||||||
"greptime_flow_batching_engine_query_time_secs",
|
"greptime_flow_batching_engine_query_time_secs",
|
||||||
"flow batching engine query time(seconds)",
|
"flow batching engine query time(seconds)",
|
||||||
&["flow_id"],
|
&["flow_id", "time_window_granularity"],
|
||||||
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||||
"greptime_flow_batching_engine_slow_query_secs",
|
"greptime_flow_batching_engine_slow_query_secs",
|
||||||
"flow batching engine slow query(seconds)",
|
"flow batching engine slow query(seconds), updated after query finished",
|
||||||
&["flow_id", "peer"],
|
&["flow_id", "peer", "time_window_granularity"],
|
||||||
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
|
||||||
|
register_gauge_vec!(
|
||||||
|
"greptime_flow_batching_engine_real_time_slow_query_number",
|
||||||
|
"flow batching engine real time slow query number, updated in real time",
|
||||||
|
&["flow_id", "peer", "time_window_granularity"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
|
||||||
|
register_histogram_vec!(
|
||||||
|
"greptime_flow_batching_engine_stalled_query_window_cnt",
|
||||||
|
"flow batching engine stalled query time window count",
|
||||||
|
&["flow_id", "time_window_granularity"],
|
||||||
|
vec![0.0, 5., 10., 20., 40.]
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
|
||||||
register_histogram_vec!(
|
register_histogram_vec!(
|
||||||
"greptime_flow_batching_engine_query_window_cnt",
|
"greptime_flow_batching_engine_query_window_cnt",
|
||||||
"flow batching engine query time window count",
|
"flow batching engine query time window count",
|
||||||
&["flow_id"],
|
&["flow_id", "time_window_granularity"],
|
||||||
vec![0.0, 5., 10., 20., 40.]
|
vec![0.0, 5., 10., 20., 40.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@@ -54,10 +69,32 @@ lazy_static! {
|
|||||||
register_histogram_vec!(
|
register_histogram_vec!(
|
||||||
"greptime_flow_batching_engine_query_time_range_secs",
|
"greptime_flow_batching_engine_query_time_range_secs",
|
||||||
"flow batching engine query time range(seconds)",
|
"flow batching engine query time range(seconds)",
|
||||||
&["flow_id"],
|
&["flow_id", "time_window_granularity"],
|
||||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
|
||||||
|
register_histogram_vec!(
|
||||||
|
"greptime_flow_batching_engine_guess_fe_load",
|
||||||
|
"flow batching engine guessed frontend load",
|
||||||
|
&["fe_addr"],
|
||||||
|
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"greptime_flow_batching_start_query_count",
|
||||||
|
"flow batching engine started query count",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"greptime_flow_batching_error_count",
|
||||||
|
"flow batching engine error count per flow id",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||||
|
|||||||
@@ -154,6 +154,7 @@ where
|
|||||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||||
user_provider.clone(),
|
user_provider.clone(),
|
||||||
runtime,
|
runtime,
|
||||||
|
opts.grpc.flight_compression,
|
||||||
);
|
);
|
||||||
|
|
||||||
let grpc_server = builder
|
let grpc_server = builder
|
||||||
|
|||||||
@@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use api::v1::flow::FlowRequestHeader;
|
use api::v1::flow::{AdjustFlow, FlowRequestHeader};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use common_error::ext::BoxedError;
|
use common_error::ext::BoxedError;
|
||||||
use common_function::handlers::FlowServiceHandler;
|
use common_function::handlers::FlowServiceHandler;
|
||||||
@@ -22,6 +22,7 @@ use common_query::error::Result;
|
|||||||
use common_telemetry::tracing_context::TracingContext;
|
use common_telemetry::tracing_context::TracingContext;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use serde_json::json;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::{OptionExt, ResultExt};
|
use snafu::{OptionExt, ResultExt};
|
||||||
|
|
||||||
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
|
|||||||
) -> Result<api::v1::flow::FlowResponse> {
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
self.flush_inner(catalog, flow, ctx).await
|
self.flush_inner(catalog, flow, ctx).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn adjust(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
flow: &str,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
|
self.adjust_inner(
|
||||||
|
catalog,
|
||||||
|
flow,
|
||||||
|
min_run_interval_secs,
|
||||||
|
max_filter_num_per_query,
|
||||||
|
ctx,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowServiceOperator {
|
impl FlowServiceOperator {
|
||||||
|
async fn adjust_inner(
|
||||||
|
&self,
|
||||||
|
catalog: &str,
|
||||||
|
flow: &str,
|
||||||
|
min_run_interval_secs: u64,
|
||||||
|
max_filter_num_per_query: usize,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<api::v1::flow::FlowResponse> {
|
||||||
|
let id = self
|
||||||
|
.flow_metadata_manager
|
||||||
|
.flow_name_manager()
|
||||||
|
.get(catalog, flow)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?
|
||||||
|
.context(common_meta::error::FlowNotFoundSnafu {
|
||||||
|
flow_name: format!("{}.{}", catalog, flow),
|
||||||
|
})
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?
|
||||||
|
.flow_id();
|
||||||
|
|
||||||
|
let all_flownode_peers = self
|
||||||
|
.flow_metadata_manager
|
||||||
|
.flow_route_manager()
|
||||||
|
.routes(id)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?;
|
||||||
|
|
||||||
|
// order of flownodes doesn't matter here
|
||||||
|
let all_flow_nodes = FuturesUnordered::from_iter(
|
||||||
|
all_flownode_peers
|
||||||
|
.iter()
|
||||||
|
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
|
||||||
|
)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// TODO(discord9): use proper type for flow options
|
||||||
|
let options = json!({
|
||||||
|
"min_run_interval_secs": min_run_interval_secs,
|
||||||
|
"max_filter_num_per_query": max_filter_num_per_query,
|
||||||
|
});
|
||||||
|
|
||||||
|
for node in all_flow_nodes {
|
||||||
|
let _res = {
|
||||||
|
use api::v1::flow::{flow_request, FlowRequest};
|
||||||
|
let flush_req = FlowRequest {
|
||||||
|
header: Some(FlowRequestHeader {
|
||||||
|
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||||
|
query_context: Some(
|
||||||
|
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
body: Some(flow_request::Body::Adjust(AdjustFlow {
|
||||||
|
flow_id: Some(api::v1::FlowId { id }),
|
||||||
|
options: options.to_string(),
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
node.handle(flush_req)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_query::error::ExecuteSnafu)?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(Default::default())
|
||||||
|
}
|
||||||
|
|
||||||
/// Flush the flownodes according to the flow id.
|
/// Flush the flownodes according to the flow id.
|
||||||
async fn flush_inner(
|
async fn flush_inner(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -66,6 +66,8 @@ pub struct GrpcOptions {
|
|||||||
pub max_recv_message_size: ReadableSize,
|
pub max_recv_message_size: ReadableSize,
|
||||||
/// Max gRPC sending(encoding) message size
|
/// Max gRPC sending(encoding) message size
|
||||||
pub max_send_message_size: ReadableSize,
|
pub max_send_message_size: ReadableSize,
|
||||||
|
/// Compression mode in Arrow Flight service.
|
||||||
|
pub flight_compression: FlightCompression,
|
||||||
pub runtime_size: usize,
|
pub runtime_size: usize,
|
||||||
#[serde(default = "Default::default")]
|
#[serde(default = "Default::default")]
|
||||||
pub tls: TlsOption,
|
pub tls: TlsOption,
|
||||||
@@ -114,6 +116,7 @@ impl Default for GrpcOptions {
|
|||||||
server_addr: String::new(),
|
server_addr: String::new(),
|
||||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||||
|
flight_compression: FlightCompression::ArrowIpc,
|
||||||
runtime_size: 8,
|
runtime_size: 8,
|
||||||
tls: TlsOption::default(),
|
tls: TlsOption::default(),
|
||||||
}
|
}
|
||||||
@@ -132,6 +135,30 @@ impl GrpcOptions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum FlightCompression {
|
||||||
|
/// Disable all compression in Arrow Flight service.
|
||||||
|
None,
|
||||||
|
/// Enable only transport layer compression (zstd).
|
||||||
|
Transport,
|
||||||
|
/// Enable only payload compression (lz4)
|
||||||
|
#[default]
|
||||||
|
ArrowIpc,
|
||||||
|
/// Enable all compression.
|
||||||
|
All,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FlightCompression {
|
||||||
|
pub fn transport_compression(&self) -> bool {
|
||||||
|
self == &FlightCompression::Transport || self == &FlightCompression::All
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn arrow_compression(&self) -> bool {
|
||||||
|
self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct GrpcServer {
|
pub struct GrpcServer {
|
||||||
// states
|
// states
|
||||||
shutdown_tx: Mutex<Option<Sender<()>>>,
|
shutdown_tx: Mutex<Option<Sender<()>>>,
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming};
|
|||||||
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
|
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
|
||||||
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
|
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
|
||||||
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
|
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
|
||||||
use crate::grpc::TonicResult;
|
use crate::grpc::{FlightCompression, TonicResult};
|
||||||
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
|
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
|
||||||
use crate::http::AUTHORIZATION_HEADER;
|
use crate::http::AUTHORIZATION_HEADER;
|
||||||
use crate::{error, hint_headers};
|
use crate::{error, hint_headers};
|
||||||
@@ -195,9 +195,14 @@ impl FlightCraft for GreptimeRequestHandler {
|
|||||||
protocol = "grpc",
|
protocol = "grpc",
|
||||||
request_type = get_request_type(&request)
|
request_type = get_request_type(&request)
|
||||||
);
|
);
|
||||||
|
let flight_compression = self.flight_compression;
|
||||||
async {
|
async {
|
||||||
let output = self.handle_request(request, hints).await?;
|
let output = self.handle_request(request, hints).await?;
|
||||||
let stream = to_flight_data_stream(output, TracingContext::from_current_span());
|
let stream = to_flight_data_stream(
|
||||||
|
output,
|
||||||
|
TracingContext::from_current_span(),
|
||||||
|
flight_compression,
|
||||||
|
);
|
||||||
Ok(Response::new(stream))
|
Ok(Response::new(stream))
|
||||||
}
|
}
|
||||||
.trace(span)
|
.trace(span)
|
||||||
@@ -365,14 +370,16 @@ impl Stream for PutRecordBatchRequestStream {
|
|||||||
fn to_flight_data_stream(
|
fn to_flight_data_stream(
|
||||||
output: Output,
|
output: Output,
|
||||||
tracing_context: TracingContext,
|
tracing_context: TracingContext,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> TonicStream<FlightData> {
|
) -> TonicStream<FlightData> {
|
||||||
match output.data {
|
match output.data {
|
||||||
OutputData::Stream(stream) => {
|
OutputData::Stream(stream) => {
|
||||||
let stream = FlightRecordBatchStream::new(stream, tracing_context);
|
let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression);
|
||||||
Box::pin(stream) as _
|
Box::pin(stream) as _
|
||||||
}
|
}
|
||||||
OutputData::RecordBatches(x) => {
|
OutputData::RecordBatches(x) => {
|
||||||
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
|
let stream =
|
||||||
|
FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression);
|
||||||
Box::pin(stream) as _
|
Box::pin(stream) as _
|
||||||
}
|
}
|
||||||
OutputData::AffectedRows(rows) => {
|
OutputData::AffectedRows(rows) => {
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ use tokio::task::JoinHandle;
|
|||||||
|
|
||||||
use crate::error;
|
use crate::error;
|
||||||
use crate::grpc::flight::TonicResult;
|
use crate::grpc::flight::TonicResult;
|
||||||
|
use crate::grpc::FlightCompression;
|
||||||
|
|
||||||
#[pin_project(PinnedDrop)]
|
#[pin_project(PinnedDrop)]
|
||||||
pub struct FlightRecordBatchStream {
|
pub struct FlightRecordBatchStream {
|
||||||
@@ -41,18 +42,27 @@ pub struct FlightRecordBatchStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FlightRecordBatchStream {
|
impl FlightRecordBatchStream {
|
||||||
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
|
pub fn new(
|
||||||
|
recordbatches: SendableRecordBatchStream,
|
||||||
|
tracing_context: TracingContext,
|
||||||
|
compression: FlightCompression,
|
||||||
|
) -> Self {
|
||||||
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
|
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
|
||||||
let join_handle = common_runtime::spawn_global(async move {
|
let join_handle = common_runtime::spawn_global(async move {
|
||||||
Self::flight_data_stream(recordbatches, tx)
|
Self::flight_data_stream(recordbatches, tx)
|
||||||
.trace(tracing_context.attach(info_span!("flight_data_stream")))
|
.trace(tracing_context.attach(info_span!("flight_data_stream")))
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
let encoder = if compression.arrow_compression() {
|
||||||
|
FlightEncoder::default()
|
||||||
|
} else {
|
||||||
|
FlightEncoder::with_compression_disabled()
|
||||||
|
};
|
||||||
Self {
|
Self {
|
||||||
rx,
|
rx,
|
||||||
join_handle,
|
join_handle,
|
||||||
done: false,
|
done: false,
|
||||||
encoder: FlightEncoder::with_compression_disabled(),
|
encoder,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,7 +171,11 @@ mod test {
|
|||||||
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
|
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_stream();
|
.as_stream();
|
||||||
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
|
let mut stream = FlightRecordBatchStream::new(
|
||||||
|
recordbatches,
|
||||||
|
TracingContext::default(),
|
||||||
|
FlightCompression::default(),
|
||||||
|
);
|
||||||
|
|
||||||
let mut raw_data = Vec::with_capacity(2);
|
let mut raw_data = Vec::with_capacity(2);
|
||||||
raw_data.push(stream.next().await.unwrap().unwrap());
|
raw_data.push(stream.next().await.unwrap().unwrap());
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ use crate::error::{
|
|||||||
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
|
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
|
||||||
};
|
};
|
||||||
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
|
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
|
||||||
use crate::grpc::TonicResult;
|
use crate::grpc::{FlightCompression, TonicResult};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
|
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
|
||||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||||
@@ -59,6 +59,7 @@ pub struct GreptimeRequestHandler {
|
|||||||
handler: ServerGrpcQueryHandlerRef,
|
handler: ServerGrpcQueryHandlerRef,
|
||||||
user_provider: Option<UserProviderRef>,
|
user_provider: Option<UserProviderRef>,
|
||||||
runtime: Option<Runtime>,
|
runtime: Option<Runtime>,
|
||||||
|
pub(crate) flight_compression: FlightCompression,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GreptimeRequestHandler {
|
impl GreptimeRequestHandler {
|
||||||
@@ -66,11 +67,13 @@ impl GreptimeRequestHandler {
|
|||||||
handler: ServerGrpcQueryHandlerRef,
|
handler: ServerGrpcQueryHandlerRef,
|
||||||
user_provider: Option<UserProviderRef>,
|
user_provider: Option<UserProviderRef>,
|
||||||
runtime: Option<Runtime>,
|
runtime: Option<Runtime>,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
handler,
|
handler,
|
||||||
user_provider,
|
user_provider,
|
||||||
runtime,
|
runtime,
|
||||||
|
flight_compression,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ mod test {
|
|||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use servers::grpc::builder::GrpcServerBuilder;
|
use servers::grpc::builder::GrpcServerBuilder;
|
||||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||||
use servers::grpc::GrpcServerConfig;
|
use servers::grpc::{FlightCompression, GrpcServerConfig};
|
||||||
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
|
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
|
||||||
use servers::server::Server;
|
use servers::server::Server;
|
||||||
|
|
||||||
@@ -94,6 +94,7 @@ mod test {
|
|||||||
)
|
)
|
||||||
.ok(),
|
.ok(),
|
||||||
Some(runtime.clone()),
|
Some(runtime.clone()),
|
||||||
|
FlightCompression::default(),
|
||||||
);
|
);
|
||||||
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
|
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
|
||||||
.flight_handler(Arc::new(greptime_request_handler))
|
.flight_handler(Arc::new(greptime_request_handler))
|
||||||
@@ -139,8 +140,7 @@ mod test {
|
|||||||
let schema = record_batches[0].schema.arrow_schema().clone();
|
let schema = record_batches[0].schema.arrow_schema().clone();
|
||||||
|
|
||||||
let stream = futures::stream::once(async move {
|
let stream = futures::stream::once(async move {
|
||||||
let mut schema_data =
|
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
|
||||||
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
|
|
||||||
let metadata = DoPutMetadata::new(0);
|
let metadata = DoPutMetadata::new(0);
|
||||||
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
|
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
|
||||||
// first message in "DoPut" stream should carry table name in flight descriptor
|
// first message in "DoPut" stream should carry table name in flight descriptor
|
||||||
@@ -155,7 +155,7 @@ mod test {
|
|||||||
tokio_stream::iter(record_batches)
|
tokio_stream::iter(record_batches)
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, x)| {
|
.map(|(i, x)| {
|
||||||
let mut encoder = FlightEncoder::with_compression_disabled();
|
let mut encoder = FlightEncoder::default();
|
||||||
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
|
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
|
||||||
let mut data = encoder.encode(message);
|
let mut data = encoder.encode(message);
|
||||||
let metadata = DoPutMetadata::new((i + 1) as i64);
|
let metadata = DoPutMetadata::new((i + 1) as i64);
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ use object_store::test_util::TempFolder;
|
|||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use servers::grpc::builder::GrpcServerBuilder;
|
use servers::grpc::builder::GrpcServerBuilder;
|
||||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||||
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
|
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||||
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
||||||
use servers::metrics_handler::MetricsHandler;
|
use servers::metrics_handler::MetricsHandler;
|
||||||
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
||||||
@@ -585,6 +585,7 @@ pub async fn setup_grpc_server_with(
|
|||||||
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
|
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
|
||||||
user_provider.clone(),
|
user_provider.clone(),
|
||||||
Some(runtime.clone()),
|
Some(runtime.clone()),
|
||||||
|
FlightCompression::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let flight_handler = Arc::new(greptime_request_handler.clone());
|
let flight_handler = Arc::new(greptime_request_handler.clone());
|
||||||
|
|||||||
@@ -1025,6 +1025,7 @@ bind_addr = "127.0.0.1:4001"
|
|||||||
server_addr = "127.0.0.1:4001"
|
server_addr = "127.0.0.1:4001"
|
||||||
max_recv_message_size = "512MiB"
|
max_recv_message_size = "512MiB"
|
||||||
max_send_message_size = "512MiB"
|
max_send_message_size = "512MiB"
|
||||||
|
flight_compression = "arrow_ipc"
|
||||||
runtime_size = 8
|
runtime_size = 8
|
||||||
|
|
||||||
[grpc.tls]
|
[grpc.tls]
|
||||||
|
|||||||
Reference in New Issue
Block a user