mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 14:52:58 +00:00
Compare commits
4 Commits
zhongzc/re
...
basic_with
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af90fad499 | ||
|
|
22c61432f6 | ||
|
|
91f373e66e | ||
|
|
1d53dd26ae |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2336,7 +2336,6 @@ dependencies = [
|
|||||||
"num-traits",
|
"num-traits",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"paste",
|
"paste",
|
||||||
"promql",
|
|
||||||
"s2",
|
"s2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
|||||||
@@ -232,6 +232,7 @@
|
|||||||
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
|
||||||
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
|
||||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||||
|
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||||
@@ -404,6 +405,7 @@
|
|||||||
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
|
||||||
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
|
||||||
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
|
||||||
|
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for datanode side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression. |
|
||||||
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
|
||||||
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
| `grpc.tls.mode` | String | `disable` | TLS mode. |
|
||||||
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
|
||||||
|
|||||||
@@ -44,6 +44,13 @@ runtime_size = 8
|
|||||||
max_recv_message_size = "512MB"
|
max_recv_message_size = "512MB"
|
||||||
## The maximum send message size for gRPC server.
|
## The maximum send message size for gRPC server.
|
||||||
max_send_message_size = "512MB"
|
max_send_message_size = "512MB"
|
||||||
|
## Compression mode for datanode side Arrow IPC service. Available options:
|
||||||
|
## - `none`: disable all compression
|
||||||
|
## - `transport`: only enable gRPC transport compression (zstd)
|
||||||
|
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||||
|
## - `all`: enable all compression.
|
||||||
|
## Default to `none`
|
||||||
|
flight_compression = "arrow_ipc"
|
||||||
|
|
||||||
## gRPC server TLS options, see `mysql.tls` section.
|
## gRPC server TLS options, see `mysql.tls` section.
|
||||||
[grpc.tls]
|
[grpc.tls]
|
||||||
|
|||||||
@@ -54,6 +54,13 @@ bind_addr = "127.0.0.1:4001"
|
|||||||
server_addr = "127.0.0.1:4001"
|
server_addr = "127.0.0.1:4001"
|
||||||
## The number of server worker threads.
|
## The number of server worker threads.
|
||||||
runtime_size = 8
|
runtime_size = 8
|
||||||
|
## Compression mode for frontend side Arrow IPC service. Available options:
|
||||||
|
## - `none`: disable all compression
|
||||||
|
## - `transport`: only enable gRPC transport compression (zstd)
|
||||||
|
## - `arrow_ipc`: only enable Arrow IPC compression (lz4)
|
||||||
|
## - `all`: enable all compression.
|
||||||
|
## Default to `none`
|
||||||
|
flight_compression = "arrow_ipc"
|
||||||
|
|
||||||
## gRPC server TLS options, see `mysql.tls` section.
|
## gRPC server TLS options, see `mysql.tls` section.
|
||||||
[grpc.tls]
|
[grpc.tls]
|
||||||
|
|||||||
@@ -162,12 +162,23 @@ impl Client {
|
|||||||
.as_bytes() as usize
|
.as_bytes() as usize
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_flight_client(&self) -> Result<FlightClient> {
|
pub fn make_flight_client(
|
||||||
|
&self,
|
||||||
|
send_compression: bool,
|
||||||
|
accept_compression: bool,
|
||||||
|
) -> Result<FlightClient> {
|
||||||
let (addr, channel) = self.find_channel()?;
|
let (addr, channel) = self.find_channel()?;
|
||||||
|
|
||||||
let client = FlightServiceClient::new(channel)
|
let mut client = FlightServiceClient::new(channel)
|
||||||
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
.max_decoding_message_size(self.max_grpc_recv_message_size())
|
||||||
.max_encoding_message_size(self.max_grpc_send_message_size());
|
.max_encoding_message_size(self.max_grpc_send_message_size());
|
||||||
|
// todo(hl): support compression methods.
|
||||||
|
if send_compression {
|
||||||
|
client = client.send_compressed(CompressionEncoding::Zstd);
|
||||||
|
}
|
||||||
|
if accept_compression {
|
||||||
|
client = client.accept_compressed(CompressionEncoding::Zstd);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(FlightClient { addr, client })
|
Ok(FlightClient { addr, client })
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,16 @@ impl NodeManager for NodeClients {
|
|||||||
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
|
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
|
||||||
let client = self.get_client(datanode).await;
|
let client = self.get_client(datanode).await;
|
||||||
|
|
||||||
Arc::new(RegionRequester::new(client))
|
let ChannelConfig {
|
||||||
|
send_compression,
|
||||||
|
accept_compression,
|
||||||
|
..
|
||||||
|
} = self.channel_manager.config();
|
||||||
|
Arc::new(RegionRequester::new(
|
||||||
|
client,
|
||||||
|
*send_compression,
|
||||||
|
*accept_compression,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
|
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
|
||||||
|
|||||||
@@ -287,7 +287,7 @@ impl Database {
|
|||||||
let mut request = tonic::Request::new(request);
|
let mut request = tonic::Request::new(request);
|
||||||
Self::put_hints(request.metadata_mut(), hints)?;
|
Self::put_hints(request.metadata_mut(), hints)?;
|
||||||
|
|
||||||
let mut client = self.client.make_flight_client()?;
|
let mut client = self.client.make_flight_client(false, false)?;
|
||||||
|
|
||||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||||
let tonic_code = e.code();
|
let tonic_code = e.code();
|
||||||
@@ -409,7 +409,7 @@ impl Database {
|
|||||||
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
|
MetadataValue::from_str(db_to_put).context(InvalidTonicMetadataValueSnafu)?,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut client = self.client.make_flight_client()?;
|
let mut client = self.client.make_flight_client(false, false)?;
|
||||||
let response = client.mut_inner().do_put(request).await?;
|
let response = client.mut_inner().do_put(request).await?;
|
||||||
let response = response
|
let response = response
|
||||||
.into_inner()
|
.into_inner()
|
||||||
|
|||||||
@@ -46,6 +46,8 @@ use crate::{metrics, Client, Error};
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct RegionRequester {
|
pub struct RegionRequester {
|
||||||
client: Client,
|
client: Client,
|
||||||
|
send_compression: bool,
|
||||||
|
accept_compression: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -89,12 +91,18 @@ impl Datanode for RegionRequester {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RegionRequester {
|
impl RegionRequester {
|
||||||
pub fn new(client: Client) -> Self {
|
pub fn new(client: Client, send_compression: bool, accept_compression: bool) -> Self {
|
||||||
Self { client }
|
Self {
|
||||||
|
client,
|
||||||
|
send_compression,
|
||||||
|
accept_compression,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
|
pub async fn do_get_inner(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
|
||||||
let mut flight_client = self.client.make_flight_client()?;
|
let mut flight_client = self
|
||||||
|
.client
|
||||||
|
.make_flight_client(self.send_compression, self.accept_compression)?;
|
||||||
let response = flight_client
|
let response = flight_client
|
||||||
.mut_inner()
|
.mut_inner()
|
||||||
.do_get(ticket)
|
.do_get(ticket)
|
||||||
|
|||||||
@@ -364,12 +364,16 @@ impl StartCommand {
|
|||||||
|
|
||||||
// frontend to datanode need not timeout.
|
// frontend to datanode need not timeout.
|
||||||
// Some queries are expected to take long time.
|
// Some queries are expected to take long time.
|
||||||
let channel_config = ChannelConfig {
|
let mut channel_config = ChannelConfig {
|
||||||
timeout: None,
|
timeout: None,
|
||||||
tcp_nodelay: opts.datanode.client.tcp_nodelay,
|
tcp_nodelay: opts.datanode.client.tcp_nodelay,
|
||||||
connect_timeout: Some(opts.datanode.client.connect_timeout),
|
connect_timeout: Some(opts.datanode.client.connect_timeout),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
if opts.grpc.flight_compression.transport_compression() {
|
||||||
|
channel_config.accept_compression = true;
|
||||||
|
channel_config.send_compression = true;
|
||||||
|
}
|
||||||
let client = NodeClients::new(channel_config);
|
let client = NodeClients::new(channel_config);
|
||||||
|
|
||||||
let instance = FrontendBuilder::new(
|
let instance = FrontendBuilder::new(
|
||||||
|
|||||||
@@ -47,7 +47,6 @@ num = "0.4"
|
|||||||
num-traits = "0.2"
|
num-traits = "0.2"
|
||||||
once_cell.workspace = true
|
once_cell.workspace = true
|
||||||
paste.workspace = true
|
paste.workspace = true
|
||||||
promql.workspace = true
|
|
||||||
s2 = { version = "0.0.12", optional = true }
|
s2 = { version = "0.0.12", optional = true }
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ use crate::system::SystemFunction;
|
|||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct FunctionRegistry {
|
pub struct FunctionRegistry {
|
||||||
scalar_functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
functions: RwLock<HashMap<String, ScalarFunctionFactory>>,
|
||||||
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
async_functions: RwLock<HashMap<String, AsyncFunctionRef>>,
|
||||||
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
aggregate_functions: RwLock<HashMap<String, AggregateUDF>>,
|
||||||
}
|
}
|
||||||
@@ -48,7 +48,7 @@ impl FunctionRegistry {
|
|||||||
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
pub fn register(&self, func: impl Into<ScalarFunctionFactory>) {
|
||||||
let func = func.into();
|
let func = func.into();
|
||||||
let _ = self
|
let _ = self
|
||||||
.scalar_functions
|
.functions
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(func.name().to_string(), func);
|
.insert(func.name().to_string(), func);
|
||||||
@@ -87,17 +87,13 @@ impl FunctionRegistry {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_scalar_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
#[cfg(test)]
|
||||||
self.scalar_functions.read().unwrap().get(name).cloned()
|
pub fn get_function(&self, name: &str) -> Option<ScalarFunctionFactory> {
|
||||||
|
self.functions.read().unwrap().get(name).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
pub fn scalar_functions(&self) -> Vec<ScalarFunctionFactory> {
|
||||||
self.scalar_functions
|
self.functions.read().unwrap().values().cloned().collect()
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.values()
|
|
||||||
.cloned()
|
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
pub fn aggregate_functions(&self) -> Vec<AggregateUDF> {
|
||||||
@@ -148,11 +144,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
|
|||||||
// Approximate functions
|
// Approximate functions
|
||||||
ApproximateFunction::register(&function_registry);
|
ApproximateFunction::register(&function_registry);
|
||||||
|
|
||||||
// PromQL aggregate functions
|
|
||||||
for aggr in promql::functions::aggr_funcs() {
|
|
||||||
function_registry.register_aggr(aggr);
|
|
||||||
}
|
|
||||||
|
|
||||||
Arc::new(function_registry)
|
Arc::new(function_registry)
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -165,10 +156,10 @@ mod tests {
|
|||||||
fn test_function_registry() {
|
fn test_function_registry() {
|
||||||
let registry = FunctionRegistry::default();
|
let registry = FunctionRegistry::default();
|
||||||
|
|
||||||
assert!(registry.get_scalar_function("test_and").is_none());
|
assert!(registry.get_function("test_and").is_none());
|
||||||
assert!(registry.scalar_functions().is_empty());
|
assert!(registry.scalar_functions().is_empty());
|
||||||
registry.register_scalar(TestAndFunction);
|
registry.register_scalar(TestAndFunction);
|
||||||
let _ = registry.get_scalar_function("test_and").unwrap();
|
let _ = registry.get_function("test_and").unwrap();
|
||||||
assert_eq!(1, registry.scalar_functions().len());
|
assert_eq!(1, registry.scalar_functions().len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -296,6 +296,8 @@ pub struct ChannelConfig {
|
|||||||
pub max_recv_message_size: ReadableSize,
|
pub max_recv_message_size: ReadableSize,
|
||||||
// Max gRPC sending(encoding) message size
|
// Max gRPC sending(encoding) message size
|
||||||
pub max_send_message_size: ReadableSize,
|
pub max_send_message_size: ReadableSize,
|
||||||
|
pub send_compression: bool,
|
||||||
|
pub accept_compression: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ChannelConfig {
|
impl Default for ChannelConfig {
|
||||||
@@ -316,6 +318,8 @@ impl Default for ChannelConfig {
|
|||||||
client_tls: None,
|
client_tls: None,
|
||||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||||
|
send_compression: false,
|
||||||
|
accept_compression: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -566,6 +570,8 @@ mod tests {
|
|||||||
client_tls: None,
|
client_tls: None,
|
||||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||||
|
send_compression: false,
|
||||||
|
accept_compression: false,
|
||||||
},
|
},
|
||||||
default_cfg
|
default_cfg
|
||||||
);
|
);
|
||||||
@@ -610,6 +616,8 @@ mod tests {
|
|||||||
}),
|
}),
|
||||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||||
|
send_compression: false,
|
||||||
|
accept_compression: false,
|
||||||
},
|
},
|
||||||
cfg
|
cfg
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -64,6 +64,7 @@ impl Default for FlightEncoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FlightEncoder {
|
impl FlightEncoder {
|
||||||
|
/// Creates new [FlightEncoder] with compression disabled.
|
||||||
pub fn with_compression_disabled() -> Self {
|
pub fn with_compression_disabled() -> Self {
|
||||||
let write_options = writer::IpcWriteOptions::default()
|
let write_options = writer::IpcWriteOptions::default()
|
||||||
.try_with_compression(None)
|
.try_with_compression(None)
|
||||||
|
|||||||
@@ -372,6 +372,7 @@ impl DatanodeBuilder {
|
|||||||
opts.max_concurrent_queries,
|
opts.max_concurrent_queries,
|
||||||
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
|
opts.grpc.flight_compression,
|
||||||
);
|
);
|
||||||
|
|
||||||
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ use query::QueryEngineRef;
|
|||||||
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
|
||||||
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
|
||||||
use servers::grpc::region_server::RegionServerHandler;
|
use servers::grpc::region_server::RegionServerHandler;
|
||||||
|
use servers::grpc::FlightCompression;
|
||||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||||
use snafu::{ensure, OptionExt, ResultExt};
|
use snafu::{ensure, OptionExt, ResultExt};
|
||||||
use store_api::metric_engine_consts::{
|
use store_api::metric_engine_consts::{
|
||||||
@@ -80,6 +81,7 @@ use crate::event_listener::RegionServerEventListenerRef;
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RegionServer {
|
pub struct RegionServer {
|
||||||
inner: Arc<RegionServerInner>,
|
inner: Arc<RegionServerInner>,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RegionStat {
|
pub struct RegionStat {
|
||||||
@@ -93,6 +95,7 @@ impl RegionServer {
|
|||||||
query_engine: QueryEngineRef,
|
query_engine: QueryEngineRef,
|
||||||
runtime: Runtime,
|
runtime: Runtime,
|
||||||
event_listener: RegionServerEventListenerRef,
|
event_listener: RegionServerEventListenerRef,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self::with_table_provider(
|
Self::with_table_provider(
|
||||||
query_engine,
|
query_engine,
|
||||||
@@ -101,6 +104,7 @@ impl RegionServer {
|
|||||||
Arc::new(DummyTableProviderFactory),
|
Arc::new(DummyTableProviderFactory),
|
||||||
0,
|
0,
|
||||||
Duration::from_millis(0),
|
Duration::from_millis(0),
|
||||||
|
flight_compression,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,6 +115,7 @@ impl RegionServer {
|
|||||||
table_provider_factory: TableProviderFactoryRef,
|
table_provider_factory: TableProviderFactoryRef,
|
||||||
max_concurrent_queries: usize,
|
max_concurrent_queries: usize,
|
||||||
concurrent_query_limiter_timeout: Duration,
|
concurrent_query_limiter_timeout: Duration,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
inner: Arc::new(RegionServerInner::new(
|
inner: Arc::new(RegionServerInner::new(
|
||||||
@@ -123,6 +128,7 @@ impl RegionServer {
|
|||||||
concurrent_query_limiter_timeout,
|
concurrent_query_limiter_timeout,
|
||||||
),
|
),
|
||||||
)),
|
)),
|
||||||
|
flight_compression,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -536,7 +542,11 @@ impl FlightCraft for RegionServer {
|
|||||||
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
|
let stream = Box::pin(FlightRecordBatchStream::new(
|
||||||
|
result,
|
||||||
|
tracing_context,
|
||||||
|
self.flight_compression,
|
||||||
|
));
|
||||||
Ok(Response::new(stream))
|
Ok(Response::new(stream))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ use query::dataframe::DataFrame;
|
|||||||
use query::planner::LogicalPlanner;
|
use query::planner::LogicalPlanner;
|
||||||
use query::query_engine::{DescribeResult, QueryEngineState};
|
use query::query_engine::{DescribeResult, QueryEngineState};
|
||||||
use query::{QueryEngine, QueryEngineContext};
|
use query::{QueryEngine, QueryEngineContext};
|
||||||
|
use servers::grpc::FlightCompression;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use store_api::metadata::RegionMetadataRef;
|
use store_api::metadata::RegionMetadataRef;
|
||||||
use store_api::region_engine::{
|
use store_api::region_engine::{
|
||||||
@@ -97,6 +98,7 @@ pub fn mock_region_server() -> RegionServer {
|
|||||||
Arc::new(MockQueryEngine),
|
Arc::new(MockQueryEngine),
|
||||||
Runtime::builder().build().unwrap(),
|
Runtime::builder().build().unwrap(),
|
||||||
Box::new(NoopRegionServerEventListener),
|
Box::new(NoopRegionServerEventListener),
|
||||||
|
FlightCompression::default(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
|||||||
);
|
);
|
||||||
|
|
||||||
METRIC_FLOW_ROWS
|
METRIC_FLOW_ROWS
|
||||||
.with_label_values(&["out"])
|
.with_label_values(&["out-streaming"])
|
||||||
.inc_by(total_rows as u64);
|
.inc_by(total_rows as u64);
|
||||||
|
|
||||||
let now = self.tick_manager.tick();
|
let now = self.tick_manager.tick();
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ use crate::error::{
|
|||||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||||
use crate::repr::{self, DiffRow};
|
use crate::repr::{self, DiffRow};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -689,6 +689,9 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||||
let mut to_batch_engine = request.requests;
|
let mut to_batch_engine = request.requests;
|
||||||
|
|
||||||
|
let mut batching_row_cnt = 0;
|
||||||
|
let mut streaming_row_cnt = 0;
|
||||||
|
|
||||||
{
|
{
|
||||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||||
let src_table2flow = self.src_table2flow.read().await;
|
let src_table2flow = self.src_table2flow.read().await;
|
||||||
@@ -698,9 +701,11 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||||
if is_in_stream {
|
if is_in_stream {
|
||||||
|
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||||
to_stream_engine.push(req.clone());
|
to_stream_engine.push(req.clone());
|
||||||
}
|
}
|
||||||
if is_in_batch {
|
if is_in_batch {
|
||||||
|
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if !is_in_batch && !is_in_stream {
|
if !is_in_batch && !is_in_stream {
|
||||||
@@ -713,6 +718,14 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||||
}
|
}
|
||||||
|
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&["in-streaming"])
|
||||||
|
.inc_by(streaming_row_cnt as u64);
|
||||||
|
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&["in-batching"])
|
||||||
|
.inc_by(batching_row_cnt as u64);
|
||||||
|
|
||||||
let streaming_engine = self.streaming_engine.clone();
|
let streaming_engine = self.streaming_engine.clone();
|
||||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||||
common_runtime::spawn_global(async move {
|
common_runtime::spawn_global(async move {
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ use crate::error::{
|
|||||||
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
||||||
UnexpectedSnafu, UnsupportedSnafu,
|
UnexpectedSnafu, UnsupportedSnafu,
|
||||||
};
|
};
|
||||||
|
use crate::metrics::METRIC_FLOW_ROWS;
|
||||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||||
|
|
||||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||||
@@ -155,6 +156,10 @@ impl BatchingEngine {
|
|||||||
let Some(expr) = &task.config.time_window_expr else {
|
let Some(expr) = &task.config.time_window_expr else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum();
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&[&format!("{}-batching-in", task.config.flow_id)])
|
||||||
|
.inc_by(row_cnt as u64);
|
||||||
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
|
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
|
||||||
let mut state = task.state.write().unwrap();
|
let mut state = task.state.write().unwrap();
|
||||||
state
|
state
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
|
|||||||
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME,
|
||||||
};
|
};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -102,8 +103,19 @@ impl TaskState {
|
|||||||
})
|
})
|
||||||
.unwrap_or(last_duration);
|
.unwrap_or(last_duration);
|
||||||
|
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME
|
||||||
|
.with_label_values(&[
|
||||||
|
flow_id.to_string().as_str(),
|
||||||
|
time_window_size
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs_f64()
|
||||||
|
.to_string()
|
||||||
|
.as_str(),
|
||||||
|
])
|
||||||
|
.observe(next_duration.as_secs_f64());
|
||||||
|
|
||||||
// if have dirty time window, execute immediately to clean dirty time window
|
// if have dirty time window, execute immediately to clean dirty time window
|
||||||
if self.dirty_time_windows.windows.is_empty() {
|
/*if self.dirty_time_windows.windows.is_empty() {
|
||||||
self.last_update_time + next_duration
|
self.last_update_time + next_duration
|
||||||
} else {
|
} else {
|
||||||
debug!(
|
debug!(
|
||||||
@@ -113,7 +125,9 @@ impl TaskState {
|
|||||||
self.dirty_time_windows.windows
|
self.dirty_time_windows.windows
|
||||||
);
|
);
|
||||||
Instant::now()
|
Instant::now()
|
||||||
}
|
}*/
|
||||||
|
|
||||||
|
self.last_update_time + next_duration
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,47 +220,59 @@ impl DirtyTimeWindows {
|
|||||||
|
|
||||||
// get the first `window_cnt` time windows
|
// get the first `window_cnt` time windows
|
||||||
let max_time_range = window_size * window_cnt as i32;
|
let max_time_range = window_size * window_cnt as i32;
|
||||||
let nth = {
|
|
||||||
let mut cur_time_range = chrono::Duration::zero();
|
|
||||||
let mut nth_key = None;
|
|
||||||
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
|
||||||
// if time range is too long, stop
|
|
||||||
if cur_time_range > max_time_range {
|
|
||||||
nth_key = Some(*start);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we have enough time windows, stop
|
let mut to_be_query = BTreeMap::new();
|
||||||
if idx >= window_cnt {
|
let mut new_windows = self.windows.clone();
|
||||||
nth_key = Some(*start);
|
let mut cur_time_range = chrono::Duration::zero();
|
||||||
break;
|
for (idx, (start, end)) in self.windows.iter().enumerate() {
|
||||||
}
|
let first_end = start
|
||||||
|
.add_duration(window_size.to_std().unwrap())
|
||||||
|
.context(TimeSnafu)?;
|
||||||
|
let end = end.unwrap_or(first_end);
|
||||||
|
|
||||||
if let Some(end) = end {
|
// if time range is too long, stop
|
||||||
if let Some(x) = end.sub(start) {
|
if cur_time_range >= max_time_range {
|
||||||
cur_time_range += x;
|
break;
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nth_key
|
// if we have enough time windows, stop
|
||||||
};
|
if idx >= window_cnt {
|
||||||
let first_nth = {
|
break;
|
||||||
if let Some(nth) = nth {
|
|
||||||
let mut after = self.windows.split_off(&nth);
|
|
||||||
std::mem::swap(&mut self.windows, &mut after);
|
|
||||||
|
|
||||||
after
|
|
||||||
} else {
|
|
||||||
std::mem::take(&mut self.windows)
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
if let Some(x) = end.sub(start) {
|
||||||
|
if cur_time_range + x <= max_time_range {
|
||||||
|
to_be_query.insert(*start, Some(end));
|
||||||
|
new_windows.remove(start);
|
||||||
|
cur_time_range += x;
|
||||||
|
} else {
|
||||||
|
// too large a window, split it
|
||||||
|
// split at window_size * times
|
||||||
|
let surplus = max_time_range - cur_time_range;
|
||||||
|
let times = surplus.num_seconds() / window_size.num_seconds();
|
||||||
|
|
||||||
|
let split_offset = window_size * times as i32;
|
||||||
|
let split_at = start
|
||||||
|
.add_duration(split_offset.to_std().unwrap())
|
||||||
|
.context(TimeSnafu)?;
|
||||||
|
to_be_query.insert(*start, Some(split_at));
|
||||||
|
|
||||||
|
// remove the original window
|
||||||
|
new_windows.remove(start);
|
||||||
|
new_windows.insert(split_at, Some(end));
|
||||||
|
cur_time_range += split_offset;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.windows = new_windows;
|
||||||
|
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
|
||||||
.with_label_values(&[flow_id.to_string().as_str()])
|
.with_label_values(&[flow_id.to_string().as_str()])
|
||||||
.observe(first_nth.len() as f64);
|
.observe(to_be_query.len() as f64);
|
||||||
|
|
||||||
let full_time_range = first_nth
|
let full_time_range = to_be_query
|
||||||
.iter()
|
.iter()
|
||||||
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
.fold(chrono::Duration::zero(), |acc, (start, end)| {
|
||||||
if let Some(end) = end {
|
if let Some(end) = end {
|
||||||
@@ -261,7 +287,7 @@ impl DirtyTimeWindows {
|
|||||||
.observe(full_time_range);
|
.observe(full_time_range);
|
||||||
|
|
||||||
let mut expr_lst = vec![];
|
let mut expr_lst = vec![];
|
||||||
for (start, end) in first_nth.into_iter() {
|
for (start, end) in to_be_query.into_iter() {
|
||||||
// align using time window exprs
|
// align using time window exprs
|
||||||
let (start, end) = if let Some(ctx) = task_ctx {
|
let (start, end) = if let Some(ctx) = task_ctx {
|
||||||
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
let Some(time_window_expr) = &ctx.config.time_window_expr else {
|
||||||
@@ -495,6 +521,64 @@ mod test {
|
|||||||
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
// split range
|
||||||
|
(
|
||||||
|
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
|
||||||
|
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||||
|
))),
|
||||||
|
(chrono::Duration::seconds(3), None),
|
||||||
|
BTreeMap::from([
|
||||||
|
(
|
||||||
|
Timestamp::new_second(0),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
60
|
||||||
|
)),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
|
||||||
|
)),
|
||||||
|
)]),
|
||||||
|
Some(
|
||||||
|
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||||
|
)
|
||||||
|
),
|
||||||
|
// split 2 min into 1 min
|
||||||
|
(
|
||||||
|
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
|
||||||
|
(chrono::Duration::seconds(3), None),
|
||||||
|
BTreeMap::from([
|
||||||
|
(
|
||||||
|
Timestamp::new_second(0),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
40 * 3
|
||||||
|
)),
|
||||||
|
)]),
|
||||||
|
Some(
|
||||||
|
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
|
||||||
|
)
|
||||||
|
),
|
||||||
|
// split 3s + 1min into 3s + 57s
|
||||||
|
(
|
||||||
|
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
|
||||||
|
(chrono::Duration::seconds(3), None),
|
||||||
|
BTreeMap::from([
|
||||||
|
(
|
||||||
|
Timestamp::new_second(0),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
3
|
||||||
|
)),
|
||||||
|
),(
|
||||||
|
Timestamp::new_second(20),
|
||||||
|
Some(Timestamp::new_second(
|
||||||
|
140
|
||||||
|
)),
|
||||||
|
)]),
|
||||||
|
Some(
|
||||||
|
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
|
||||||
|
)
|
||||||
|
),
|
||||||
// expired
|
// expired
|
||||||
(
|
(
|
||||||
vec![
|
vec![
|
||||||
@@ -511,6 +595,8 @@ mod test {
|
|||||||
None
|
None
|
||||||
),
|
),
|
||||||
];
|
];
|
||||||
|
// let len = testcases.len();
|
||||||
|
// let testcases = testcases[(len - 2)..(len - 1)].to_vec();
|
||||||
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
|
||||||
testcases
|
testcases
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ use crate::error::{
|
|||||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
|
||||||
|
METRIC_FLOW_ROWS,
|
||||||
};
|
};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -371,6 +373,9 @@ impl BatchingTask {
|
|||||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||||
elapsed
|
elapsed
|
||||||
);
|
);
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||||
|
.inc_by(*affected_rows as _);
|
||||||
} else if let Err(err) = &res {
|
} else if let Err(err) = &res {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||||
@@ -410,6 +415,7 @@ impl BatchingTask {
|
|||||||
engine: QueryEngineRef,
|
engine: QueryEngineRef,
|
||||||
frontend_client: Arc<FrontendClient>,
|
frontend_client: Arc<FrontendClient>,
|
||||||
) {
|
) {
|
||||||
|
let flow_id_str = self.config.flow_id.to_string();
|
||||||
loop {
|
loop {
|
||||||
// first check if shutdown signal is received
|
// first check if shutdown signal is received
|
||||||
// if so, break the loop
|
// if so, break the loop
|
||||||
@@ -427,6 +433,9 @@ impl BatchingTask {
|
|||||||
Err(TryRecvError::Empty) => (),
|
Err(TryRecvError::Empty) => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||||
|
.with_label_values(&[&flow_id_str])
|
||||||
|
.inc();
|
||||||
|
|
||||||
let new_query = match self.gen_insert_plan(&engine).await {
|
let new_query = match self.gen_insert_plan(&engine).await {
|
||||||
Ok(new_query) => new_query,
|
Ok(new_query) => new_query,
|
||||||
@@ -473,6 +482,9 @@ impl BatchingTask {
|
|||||||
}
|
}
|
||||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||||
|
.with_label_values(&[&flow_id_str])
|
||||||
|
.inc();
|
||||||
match new_query {
|
match new_query {
|
||||||
Some(query) => {
|
Some(query) => {
|
||||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||||
|
|||||||
@@ -35,6 +35,13 @@ lazy_static! {
|
|||||||
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!(
|
||||||
|
"greptime_flow_batching_engine_wait_time_secs",
|
||||||
|
"flow batching engine wait time between query(seconds)",
|
||||||
|
&["flow_id", "time_window_size"],
|
||||||
|
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
|
||||||
"greptime_flow_batching_engine_slow_query_secs",
|
"greptime_flow_batching_engine_slow_query_secs",
|
||||||
"flow batching engine slow query(seconds)",
|
"flow batching engine slow query(seconds)",
|
||||||
@@ -58,6 +65,20 @@ lazy_static! {
|
|||||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"greptime_flow_batching_start_query_count",
|
||||||
|
"flow batching engine started query count",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"greptime_flow_batching_error_count",
|
||||||
|
"flow batching engine error count per flow id",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||||
|
|||||||
@@ -154,6 +154,7 @@ where
|
|||||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||||
user_provider.clone(),
|
user_provider.clone(),
|
||||||
runtime,
|
runtime,
|
||||||
|
opts.grpc.flight_compression,
|
||||||
);
|
);
|
||||||
|
|
||||||
let grpc_server = builder
|
let grpc_server = builder
|
||||||
|
|||||||
@@ -34,7 +34,6 @@ pub use changes::Changes;
|
|||||||
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
|
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
|
||||||
use datafusion::error::DataFusionError;
|
use datafusion::error::DataFusionError;
|
||||||
use datafusion::physical_plan::ColumnarValue;
|
use datafusion::physical_plan::ColumnarValue;
|
||||||
use datafusion_expr::{AggregateUDF, ScalarUDF};
|
|
||||||
pub use deriv::Deriv;
|
pub use deriv::Deriv;
|
||||||
pub use extrapolate_rate::{Delta, Increase, Rate};
|
pub use extrapolate_rate::{Delta, Increase, Rate};
|
||||||
pub use holt_winters::HoltWinters;
|
pub use holt_winters::HoltWinters;
|
||||||
@@ -45,39 +44,6 @@ pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
|
|||||||
pub use resets::Resets;
|
pub use resets::Resets;
|
||||||
pub use round::Round;
|
pub use round::Round;
|
||||||
|
|
||||||
/// Range functions for PromQL.
|
|
||||||
pub fn range_funcs() -> Vec<ScalarUDF> {
|
|
||||||
vec![
|
|
||||||
IDelta::<false>::scalar_udf(),
|
|
||||||
IDelta::<true>::scalar_udf(),
|
|
||||||
Rate::scalar_udf(),
|
|
||||||
Increase::scalar_udf(),
|
|
||||||
Delta::scalar_udf(),
|
|
||||||
Resets::scalar_udf(),
|
|
||||||
Changes::scalar_udf(),
|
|
||||||
Deriv::scalar_udf(),
|
|
||||||
Round::scalar_udf(),
|
|
||||||
AvgOverTime::scalar_udf(),
|
|
||||||
MinOverTime::scalar_udf(),
|
|
||||||
MaxOverTime::scalar_udf(),
|
|
||||||
SumOverTime::scalar_udf(),
|
|
||||||
CountOverTime::scalar_udf(),
|
|
||||||
LastOverTime::scalar_udf(),
|
|
||||||
AbsentOverTime::scalar_udf(),
|
|
||||||
PresentOverTime::scalar_udf(),
|
|
||||||
StddevOverTime::scalar_udf(),
|
|
||||||
StdvarOverTime::scalar_udf(),
|
|
||||||
QuantileOverTime::scalar_udf(),
|
|
||||||
PredictLinear::scalar_udf(),
|
|
||||||
HoltWinters::scalar_udf(),
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Aggregate functions for PromQL.
|
|
||||||
pub fn aggr_funcs() -> Vec<AggregateUDF> {
|
|
||||||
vec![quantile_udaf()]
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Extracts an array from a `ColumnarValue`.
|
/// Extracts an array from a `ColumnarValue`.
|
||||||
///
|
///
|
||||||
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
|
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
|
||||||
|
|||||||
@@ -40,8 +40,8 @@ pub struct QuantileAccumulator {
|
|||||||
|
|
||||||
/// Create a quantile `AggregateUDF` for PromQL quantile operator,
|
/// Create a quantile `AggregateUDF` for PromQL quantile operator,
|
||||||
/// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions
|
/// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions
|
||||||
pub fn quantile_udaf() -> AggregateUDF {
|
pub fn quantile_udaf() -> Arc<AggregateUDF> {
|
||||||
create_udaf(
|
Arc::new(create_udaf(
|
||||||
QUANTILE_NAME,
|
QUANTILE_NAME,
|
||||||
// Input type: (φ, values)
|
// Input type: (φ, values)
|
||||||
vec![DataType::Float64, DataType::Float64],
|
vec![DataType::Float64, DataType::Float64],
|
||||||
@@ -63,7 +63,7 @@ pub fn quantile_udaf() -> AggregateUDF {
|
|||||||
)]
|
)]
|
||||||
.into(),
|
.into(),
|
||||||
)]),
|
)]),
|
||||||
)
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QuantileAccumulator {
|
impl QuantileAccumulator {
|
||||||
|
|||||||
@@ -1948,7 +1948,7 @@ impl PromPlanner {
|
|||||||
token::T_QUANTILE => {
|
token::T_QUANTILE => {
|
||||||
let q = Self::get_param_value_as_f64(op, param)?;
|
let q = Self::get_param_value_as_f64(op, param)?;
|
||||||
non_col_args.push(lit(q));
|
non_col_args.push(lit(q));
|
||||||
Arc::new(quantile_udaf())
|
quantile_udaf()
|
||||||
}
|
}
|
||||||
token::T_AVG => avg_udaf(),
|
token::T_AVG => avg_udaf(),
|
||||||
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
|
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
|
||||||
|
|||||||
@@ -28,6 +28,11 @@ use datafusion::execution::{FunctionRegistry, SessionStateBuilder};
|
|||||||
use datafusion::logical_expr::LogicalPlan;
|
use datafusion::logical_expr::LogicalPlan;
|
||||||
use datafusion_expr::UserDefinedLogicalNode;
|
use datafusion_expr::UserDefinedLogicalNode;
|
||||||
use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
|
use greptime_proto::substrait_extension::MergeScan as PbMergeScan;
|
||||||
|
use promql::functions::{
|
||||||
|
quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, IDelta,
|
||||||
|
Increase, LastOverTime, MaxOverTime, MinOverTime, PresentOverTime, Rate, Resets, Round,
|
||||||
|
StddevOverTime, StdvarOverTime, SumOverTime,
|
||||||
|
};
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
@@ -112,15 +117,12 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
|||||||
let mut session_state = SessionStateBuilder::new_from_existing(self.session_state.clone())
|
let mut session_state = SessionStateBuilder::new_from_existing(self.session_state.clone())
|
||||||
.with_catalog_list(catalog_list)
|
.with_catalog_list(catalog_list)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Substrait decoder will look up the UDFs in SessionState, so we need to register them
|
// Substrait decoder will look up the UDFs in SessionState, so we need to register them
|
||||||
// Note: the query context must be passed to set the timezone
|
// Note: the query context must be passed to set the timezone
|
||||||
// We MUST register the UDFs after we build the session state, otherwise the UDFs will be lost
|
// We MUST register the UDFs after we build the session state, otherwise the UDFs will be lost
|
||||||
// if they have the same name as the default UDFs or their alias.
|
// if they have the same name as the default UDFs or their alias.
|
||||||
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
|
// e.g. The default UDF `to_char()` has an alias `date_format()`, if we register a UDF with the name `date_format()`
|
||||||
// before we build the session state, the UDF will be lost.
|
// before we build the session state, the UDF will be lost.
|
||||||
|
|
||||||
// Scalar functions
|
|
||||||
for func in FUNCTION_REGISTRY.scalar_functions() {
|
for func in FUNCTION_REGISTRY.scalar_functions() {
|
||||||
let udf = func.provide(FunctionContext {
|
let udf = func.provide(FunctionContext {
|
||||||
query_ctx: self.query_ctx.clone(),
|
query_ctx: self.query_ctx.clone(),
|
||||||
@@ -131,15 +133,6 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
|||||||
.context(RegisterUdfSnafu { name: func.name() })?;
|
.context(RegisterUdfSnafu { name: func.name() })?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// PromQL range functions
|
|
||||||
for func in promql::functions::range_funcs() {
|
|
||||||
let name = func.name().to_string();
|
|
||||||
session_state
|
|
||||||
.register_udf(Arc::new(func))
|
|
||||||
.context(RegisterUdfSnafu { name })?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Aggregate functions
|
|
||||||
for func in FUNCTION_REGISTRY.aggregate_functions() {
|
for func in FUNCTION_REGISTRY.aggregate_functions() {
|
||||||
let name = func.name().to_string();
|
let name = func.name().to_string();
|
||||||
session_state
|
session_state
|
||||||
@@ -147,6 +140,29 @@ impl SubstraitPlanDecoder for DefaultPlanDecoder {
|
|||||||
.context(RegisterUdfSnafu { name })?;
|
.context(RegisterUdfSnafu { name })?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let _ = session_state.register_udaf(quantile_udaf());
|
||||||
|
|
||||||
|
let _ = session_state.register_udf(Arc::new(IDelta::<false>::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(IDelta::<true>::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Rate::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Increase::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Delta::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Resets::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Changes::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Deriv::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(Round::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(AvgOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(MinOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(MaxOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(SumOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(CountOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(LastOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(AbsentOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(PresentOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(StddevOverTime::scalar_udf()));
|
||||||
|
let _ = session_state.register_udf(Arc::new(StdvarOverTime::scalar_udf()));
|
||||||
|
// TODO(ruihang): add quantile_over_time, predict_linear, holt_winters, round
|
||||||
|
|
||||||
let logical_plan = DFLogicalSubstraitConvertor
|
let logical_plan = DFLogicalSubstraitConvertor
|
||||||
.decode(message, session_state)
|
.decode(message, session_state)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -66,6 +66,8 @@ pub struct GrpcOptions {
|
|||||||
pub max_recv_message_size: ReadableSize,
|
pub max_recv_message_size: ReadableSize,
|
||||||
/// Max gRPC sending(encoding) message size
|
/// Max gRPC sending(encoding) message size
|
||||||
pub max_send_message_size: ReadableSize,
|
pub max_send_message_size: ReadableSize,
|
||||||
|
/// Compression mode in Arrow Flight service.
|
||||||
|
pub flight_compression: FlightCompression,
|
||||||
pub runtime_size: usize,
|
pub runtime_size: usize,
|
||||||
#[serde(default = "Default::default")]
|
#[serde(default = "Default::default")]
|
||||||
pub tls: TlsOption,
|
pub tls: TlsOption,
|
||||||
@@ -114,6 +116,7 @@ impl Default for GrpcOptions {
|
|||||||
server_addr: String::new(),
|
server_addr: String::new(),
|
||||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||||
|
flight_compression: FlightCompression::ArrowIpc,
|
||||||
runtime_size: 8,
|
runtime_size: 8,
|
||||||
tls: TlsOption::default(),
|
tls: TlsOption::default(),
|
||||||
}
|
}
|
||||||
@@ -132,6 +135,30 @@ impl GrpcOptions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum FlightCompression {
|
||||||
|
/// Disable all compression in Arrow Flight service.
|
||||||
|
#[default]
|
||||||
|
None,
|
||||||
|
/// Enable only transport layer compression (zstd).
|
||||||
|
Transport,
|
||||||
|
/// Enable only payload compression (lz4)
|
||||||
|
ArrowIpc,
|
||||||
|
/// Enable all compression.
|
||||||
|
All,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FlightCompression {
|
||||||
|
pub fn transport_compression(&self) -> bool {
|
||||||
|
self == &FlightCompression::Transport || self == &FlightCompression::All
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn arrow_compression(&self) -> bool {
|
||||||
|
self == &FlightCompression::ArrowIpc || self == &FlightCompression::All
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct GrpcServer {
|
pub struct GrpcServer {
|
||||||
// states
|
// states
|
||||||
shutdown_tx: Mutex<Option<Sender<()>>>,
|
shutdown_tx: Mutex<Option<Sender<()>>>,
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ use tonic::{Request, Response, Status, Streaming};
|
|||||||
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
|
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
|
||||||
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
|
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
|
||||||
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
|
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
|
||||||
use crate::grpc::TonicResult;
|
use crate::grpc::{FlightCompression, TonicResult};
|
||||||
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
|
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
|
||||||
use crate::http::AUTHORIZATION_HEADER;
|
use crate::http::AUTHORIZATION_HEADER;
|
||||||
use crate::{error, hint_headers};
|
use crate::{error, hint_headers};
|
||||||
@@ -195,9 +195,14 @@ impl FlightCraft for GreptimeRequestHandler {
|
|||||||
protocol = "grpc",
|
protocol = "grpc",
|
||||||
request_type = get_request_type(&request)
|
request_type = get_request_type(&request)
|
||||||
);
|
);
|
||||||
|
let flight_compression = self.flight_compression;
|
||||||
async {
|
async {
|
||||||
let output = self.handle_request(request, hints).await?;
|
let output = self.handle_request(request, hints).await?;
|
||||||
let stream = to_flight_data_stream(output, TracingContext::from_current_span());
|
let stream = to_flight_data_stream(
|
||||||
|
output,
|
||||||
|
TracingContext::from_current_span(),
|
||||||
|
flight_compression,
|
||||||
|
);
|
||||||
Ok(Response::new(stream))
|
Ok(Response::new(stream))
|
||||||
}
|
}
|
||||||
.trace(span)
|
.trace(span)
|
||||||
@@ -365,14 +370,16 @@ impl Stream for PutRecordBatchRequestStream {
|
|||||||
fn to_flight_data_stream(
|
fn to_flight_data_stream(
|
||||||
output: Output,
|
output: Output,
|
||||||
tracing_context: TracingContext,
|
tracing_context: TracingContext,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> TonicStream<FlightData> {
|
) -> TonicStream<FlightData> {
|
||||||
match output.data {
|
match output.data {
|
||||||
OutputData::Stream(stream) => {
|
OutputData::Stream(stream) => {
|
||||||
let stream = FlightRecordBatchStream::new(stream, tracing_context);
|
let stream = FlightRecordBatchStream::new(stream, tracing_context, flight_compression);
|
||||||
Box::pin(stream) as _
|
Box::pin(stream) as _
|
||||||
}
|
}
|
||||||
OutputData::RecordBatches(x) => {
|
OutputData::RecordBatches(x) => {
|
||||||
let stream = FlightRecordBatchStream::new(x.as_stream(), tracing_context);
|
let stream =
|
||||||
|
FlightRecordBatchStream::new(x.as_stream(), tracing_context, flight_compression);
|
||||||
Box::pin(stream) as _
|
Box::pin(stream) as _
|
||||||
}
|
}
|
||||||
OutputData::AffectedRows(rows) => {
|
OutputData::AffectedRows(rows) => {
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ use tokio::task::JoinHandle;
|
|||||||
|
|
||||||
use crate::error;
|
use crate::error;
|
||||||
use crate::grpc::flight::TonicResult;
|
use crate::grpc::flight::TonicResult;
|
||||||
|
use crate::grpc::FlightCompression;
|
||||||
|
|
||||||
#[pin_project(PinnedDrop)]
|
#[pin_project(PinnedDrop)]
|
||||||
pub struct FlightRecordBatchStream {
|
pub struct FlightRecordBatchStream {
|
||||||
@@ -41,18 +42,27 @@ pub struct FlightRecordBatchStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FlightRecordBatchStream {
|
impl FlightRecordBatchStream {
|
||||||
pub fn new(recordbatches: SendableRecordBatchStream, tracing_context: TracingContext) -> Self {
|
pub fn new(
|
||||||
|
recordbatches: SendableRecordBatchStream,
|
||||||
|
tracing_context: TracingContext,
|
||||||
|
compression: FlightCompression,
|
||||||
|
) -> Self {
|
||||||
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
|
let (tx, rx) = mpsc::channel::<TonicResult<FlightMessage>>(1);
|
||||||
let join_handle = common_runtime::spawn_global(async move {
|
let join_handle = common_runtime::spawn_global(async move {
|
||||||
Self::flight_data_stream(recordbatches, tx)
|
Self::flight_data_stream(recordbatches, tx)
|
||||||
.trace(tracing_context.attach(info_span!("flight_data_stream")))
|
.trace(tracing_context.attach(info_span!("flight_data_stream")))
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
let encoder = if compression.arrow_compression() {
|
||||||
|
FlightEncoder::default()
|
||||||
|
} else {
|
||||||
|
FlightEncoder::with_compression_disabled()
|
||||||
|
};
|
||||||
Self {
|
Self {
|
||||||
rx,
|
rx,
|
||||||
join_handle,
|
join_handle,
|
||||||
done: false,
|
done: false,
|
||||||
encoder: FlightEncoder::with_compression_disabled(),
|
encoder,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,7 +171,11 @@ mod test {
|
|||||||
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
|
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()])
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_stream();
|
.as_stream();
|
||||||
let mut stream = FlightRecordBatchStream::new(recordbatches, TracingContext::default());
|
let mut stream = FlightRecordBatchStream::new(
|
||||||
|
recordbatches,
|
||||||
|
TracingContext::default(),
|
||||||
|
FlightCompression::default(),
|
||||||
|
);
|
||||||
|
|
||||||
let mut raw_data = Vec::with_capacity(2);
|
let mut raw_data = Vec::with_capacity(2);
|
||||||
raw_data.push(stream.next().await.unwrap().unwrap());
|
raw_data.push(stream.next().await.unwrap().unwrap());
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ use crate::error::{
|
|||||||
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
|
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
|
||||||
};
|
};
|
||||||
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
|
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
|
||||||
use crate::grpc::TonicResult;
|
use crate::grpc::{FlightCompression, TonicResult};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
|
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
|
||||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||||
@@ -59,6 +59,7 @@ pub struct GreptimeRequestHandler {
|
|||||||
handler: ServerGrpcQueryHandlerRef,
|
handler: ServerGrpcQueryHandlerRef,
|
||||||
user_provider: Option<UserProviderRef>,
|
user_provider: Option<UserProviderRef>,
|
||||||
runtime: Option<Runtime>,
|
runtime: Option<Runtime>,
|
||||||
|
pub(crate) flight_compression: FlightCompression,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GreptimeRequestHandler {
|
impl GreptimeRequestHandler {
|
||||||
@@ -66,11 +67,13 @@ impl GreptimeRequestHandler {
|
|||||||
handler: ServerGrpcQueryHandlerRef,
|
handler: ServerGrpcQueryHandlerRef,
|
||||||
user_provider: Option<UserProviderRef>,
|
user_provider: Option<UserProviderRef>,
|
||||||
runtime: Option<Runtime>,
|
runtime: Option<Runtime>,
|
||||||
|
flight_compression: FlightCompression,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
handler,
|
handler,
|
||||||
user_provider,
|
user_provider,
|
||||||
runtime,
|
runtime,
|
||||||
|
flight_compression,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ mod test {
|
|||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use servers::grpc::builder::GrpcServerBuilder;
|
use servers::grpc::builder::GrpcServerBuilder;
|
||||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||||
use servers::grpc::GrpcServerConfig;
|
use servers::grpc::{FlightCompression, GrpcServerConfig};
|
||||||
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
|
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
|
||||||
use servers::server::Server;
|
use servers::server::Server;
|
||||||
|
|
||||||
@@ -94,6 +94,7 @@ mod test {
|
|||||||
)
|
)
|
||||||
.ok(),
|
.ok(),
|
||||||
Some(runtime.clone()),
|
Some(runtime.clone()),
|
||||||
|
FlightCompression::default(),
|
||||||
);
|
);
|
||||||
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
|
let mut grpc_server = GrpcServerBuilder::new(GrpcServerConfig::default(), runtime)
|
||||||
.flight_handler(Arc::new(greptime_request_handler))
|
.flight_handler(Arc::new(greptime_request_handler))
|
||||||
@@ -139,8 +140,7 @@ mod test {
|
|||||||
let schema = record_batches[0].schema.arrow_schema().clone();
|
let schema = record_batches[0].schema.arrow_schema().clone();
|
||||||
|
|
||||||
let stream = futures::stream::once(async move {
|
let stream = futures::stream::once(async move {
|
||||||
let mut schema_data =
|
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
|
||||||
FlightEncoder::with_compression_disabled().encode(FlightMessage::Schema(schema));
|
|
||||||
let metadata = DoPutMetadata::new(0);
|
let metadata = DoPutMetadata::new(0);
|
||||||
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
|
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
|
||||||
// first message in "DoPut" stream should carry table name in flight descriptor
|
// first message in "DoPut" stream should carry table name in flight descriptor
|
||||||
@@ -155,7 +155,7 @@ mod test {
|
|||||||
tokio_stream::iter(record_batches)
|
tokio_stream::iter(record_batches)
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, x)| {
|
.map(|(i, x)| {
|
||||||
let mut encoder = FlightEncoder::with_compression_disabled();
|
let mut encoder = FlightEncoder::default();
|
||||||
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
|
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
|
||||||
let mut data = encoder.encode(message);
|
let mut data = encoder.encode(message);
|
||||||
let metadata = DoPutMetadata::new((i + 1) as i64);
|
let metadata = DoPutMetadata::new((i + 1) as i64);
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ use object_store::test_util::TempFolder;
|
|||||||
use object_store::ObjectStore;
|
use object_store::ObjectStore;
|
||||||
use servers::grpc::builder::GrpcServerBuilder;
|
use servers::grpc::builder::GrpcServerBuilder;
|
||||||
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
||||||
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
|
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
|
||||||
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
|
||||||
use servers::metrics_handler::MetricsHandler;
|
use servers::metrics_handler::MetricsHandler;
|
||||||
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
||||||
@@ -585,6 +585,7 @@ pub async fn setup_grpc_server_with(
|
|||||||
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
|
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
|
||||||
user_provider.clone(),
|
user_provider.clone(),
|
||||||
Some(runtime.clone()),
|
Some(runtime.clone()),
|
||||||
|
FlightCompression::default(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let flight_handler = Arc::new(greptime_request_handler.clone());
|
let flight_handler = Arc::new(greptime_request_handler.clone());
|
||||||
|
|||||||
@@ -1025,6 +1025,7 @@ bind_addr = "127.0.0.1:4001"
|
|||||||
server_addr = "127.0.0.1:4001"
|
server_addr = "127.0.0.1:4001"
|
||||||
max_recv_message_size = "512MiB"
|
max_recv_message_size = "512MiB"
|
||||||
max_send_message_size = "512MiB"
|
max_send_message_size = "512MiB"
|
||||||
|
flight_compression = "arrow_ipc"
|
||||||
runtime_size = 8
|
runtime_size = 8
|
||||||
|
|
||||||
[grpc.tls]
|
[grpc.tls]
|
||||||
|
|||||||
Reference in New Issue
Block a user