refactor(servers): allow custom flight service (#7333)

* refactor/allow-custom-flight-service:
 ### Add Custom Flight Handler Support

 - **`server.rs`**:
   - Introduced a new field `flight_handler` in the `Services` struct to allow optional custom flight handler configuration.
   - Added a method `with_flight_handler` to set the custom flight handler.
   - Modified `build_grpc_server` to use the custom flight handler if provided, defaulting to `GreptimeRequestHandler` otherwise.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor/allow-custom-flight-service:
 ### Make structs and enums public in `flight.rs`

 - Changed visibility of `PutRecordBatchRequest` and `PutRecordBatchRequestStream` structs to public.
 - Made `PutRecordBatchRequestStreamState` enum public.
 - Updated fields within `PutRecordBatchRequest` and `PutRecordBatchRequestStream` to be public.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-12-02 19:01:59 -08:00
committed by GitHub
parent 0177f244e9
commit 68fff3b1aa
2 changed files with 27 additions and 11 deletions

View File

@@ -22,6 +22,7 @@ use common_telemetry::info;
use meta_client::MetaClientOptions;
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::flight::FlightCraftRef;
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer};
@@ -52,6 +53,7 @@ where
grpc_server_builder: Option<GrpcServerBuilder>,
http_server_builder: Option<HttpServerBuilder>,
plugins: Plugins,
flight_handler: Option<FlightCraftRef>,
}
impl<T> Services<T>
@@ -65,6 +67,7 @@ where
grpc_server_builder: None,
http_server_builder: None,
plugins,
flight_handler: None,
}
}
@@ -139,6 +142,13 @@ where
}
}
pub fn with_flight_handler(self, flight_handler: FlightCraftRef) -> Self {
Self {
flight_handler: Some(flight_handler),
..self
}
}
fn build_grpc_server(
&mut self,
grpc: &GrpcOptions,
@@ -173,6 +183,12 @@ where
grpc.flight_compression,
);
// Use custom flight handler if provided, otherwise use the default GreptimeRequestHandler
let flight_handler = self
.flight_handler
.clone()
.unwrap_or_else(|| Arc::new(greptime_request_handler.clone()) as FlightCraftRef);
let grpc_server = builder
.name(name)
.database_handler(greptime_request_handler.clone())
@@ -181,7 +197,7 @@ where
self.instance.clone(),
user_provider.clone(),
))
.flight_handler(Arc::new(greptime_request_handler));
.flight_handler(flight_handler);
let grpc_server = if !external {
let frontend_grpc_handler =

View File

@@ -249,11 +249,11 @@ impl FlightCraft for GreptimeRequestHandler {
}
}
pub(crate) struct PutRecordBatchRequest {
pub(crate) table_name: TableName,
pub(crate) request_id: i64,
pub(crate) data: FlightData,
pub(crate) _guard: Option<RequestMemoryGuard>,
pub struct PutRecordBatchRequest {
pub table_name: TableName,
pub request_id: i64,
pub data: FlightData,
pub _guard: Option<RequestMemoryGuard>,
}
impl PutRecordBatchRequest {
@@ -297,13 +297,13 @@ impl PutRecordBatchRequest {
}
}
pub(crate) struct PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>,
state: PutRecordBatchRequestStreamState,
limiter: Option<RequestMemoryLimiter>,
pub struct PutRecordBatchRequestStream {
pub flight_data_stream: Streaming<FlightData>,
pub state: PutRecordBatchRequestStreamState,
pub limiter: Option<RequestMemoryLimiter>,
}
enum PutRecordBatchRequestStreamState {
pub enum PutRecordBatchRequestStreamState {
Init(String, String),
Started(TableName),
}