Compare commits

2 Commits

8ac57368af  fix: handle remaining buf
Author: Ruihang Xia
Date:   2025-12-17 20:36:54 +08:00

    Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

0bc5a305be  chore: add wait_initialized method for frontend client (#7414)
Author: fys
Date:   2025-12-17 08:13:36 +00:00

    * chore: add wait_initialized method for frontend client
    * fix: some
    * fix: cargo fmt
    * add comment
    * add unit test
    * rename
    * fix: cargo check
    * fix: cr by copilot
4 changed files with 171 additions and 34 deletions

View File

@@ -552,9 +552,8 @@ impl StartCommand {
         let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
         let weak_grpc_handler = Arc::downgrade(&grpc_handler);
         frontend_instance_handler
-            .lock()
-            .unwrap()
-            .replace(weak_grpc_handler);
+            .set_handler(weak_grpc_handler)
+            .await;

         // set the frontend invoker for flownode
         let flow_streaming_engine = flownode.flow_engine().streaming_engine();

View File

@@ -15,7 +15,7 @@
 //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user

 use std::collections::HashMap;
-use std::sync::{Arc, Weak};
+use std::sync::{Arc, Mutex, Weak};
 use std::time::SystemTime;

 use api::v1::greptime_request::Request;
@@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use session::hints::READ_PREFERENCE_HINT;
 use snafu::{OptionExt, ResultExt};
+use tokio::sync::SetOnce;

 use crate::batching_mode::BatchingModeOptions;
 use crate::error::{
@@ -75,7 +76,19 @@ impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send
     }
 }

-type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
+#[derive(Debug, Clone)]
+pub struct HandlerMutable {
+    handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
+    is_initialized: Arc<SetOnce<()>>,
+}
+
+impl HandlerMutable {
+    pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
+        *self.handler.lock().unwrap() = Some(handler);
+        // Ignore the error, as we allow the handler to be set multiple times.
+        let _ = self.is_initialized.set(());
+    }
+}

 /// A simple frontend client able to execute sql using grpc protocol
 ///
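For context, `tokio::sync::SetOnce` is a write-once cell whose `wait()` future resolves only after a value has been stored. The sketch below is a minimal standalone illustration of the set/wait handshake that `HandlerMutable` relies on; it is not part of this change, and it assumes a tokio version that ships `SetOnce`.

use std::sync::Arc;
use tokio::sync::SetOnce;

#[tokio::main]
async fn main() {
    let ready = Arc::new(SetOnce::new());

    // This task parks on `wait()` until some other task stores the value.
    let waiter = {
        let ready = ready.clone();
        tokio::spawn(async move {
            ready.wait().await;
            println!("initialized");
        })
    };

    // A second `set` returns an error; ignoring the result gives the
    // "may be set multiple times" behavior that `set_handler` wants.
    let _ = ready.set(());
    let _ = ready.set(());

    waiter.await.unwrap();
}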
@@ -100,7 +113,11 @@
 impl FrontendClient {
     /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
     pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
-        let handler = Arc::new(std::sync::Mutex::new(None));
+        let is_initialized = Arc::new(SetOnce::new());
+        let handler = HandlerMutable {
+            handler: Arc::new(Mutex::new(None)),
+            is_initialized,
+        };
         (
             Self::Standalone {
                 database_client: handler.clone(),
@@ -110,23 +127,13 @@
         )
     }

-    /// Check if the frontend client is initialized.
-    ///
-    /// In distributed mode, it is always initialized.
-    /// In standalone mode, it checks if the database client is set.
-    pub fn is_initialized(&self) -> bool {
-        match self {
-            FrontendClient::Distributed { .. } => true,
-            FrontendClient::Standalone {
-                database_client, ..
-            } => {
-                let guard = database_client.lock();
-                if let Ok(guard) = guard {
-                    guard.is_some()
-                } else {
-                    false
-                }
-            }
+    /// Waits until the frontend client is initialized.
+    pub async fn wait_initialized(&self) {
+        if let FrontendClient::Standalone {
+            database_client, ..
+        } = self
+        {
+            database_client.is_initialized.wait().await;
         }
     }

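Note that `wait_initialized` returns immediately for the `Distributed` variant (the `if let` does not match) and, once the `SetOnce` is set, for `Standalone` as well. A hypothetical caller (the `run_when_ready` wrapper and the 30-second bound below are illustrative, not code from this change) would await initialization before issuing queries, ideally under a timeout so a frontend that never comes up fails loudly instead of hanging:

use std::time::Duration;
use tokio::time::timeout;

async fn run_when_ready(client: &FrontendClient) -> Result<(), &'static str> {
    // Bound the wait so a handler that is never set surfaces as an error.
    timeout(Duration::from_secs(30), client.wait_initialized())
        .await
        .map_err(|_| "frontend was not initialized within 30s")?;
    // Safe to send queries through `client` from here on.
    Ok(())
}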
@@ -158,8 +165,14 @@
         grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
         query: QueryOptions,
     ) -> Self {
+        let is_initialized = Arc::new(SetOnce::new_with(Some(())));
+        let handler = HandlerMutable {
+            handler: Arc::new(Mutex::new(Some(grpc_handler))),
+            is_initialized: is_initialized.clone(),
+        };
+
         Self::Standalone {
-            database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
+            database_client: handler,
             query,
         }
     }
@@ -341,6 +354,7 @@
     {
         let database_client = {
             database_client
+                .handler
                 .lock()
                 .map_err(|e| {
                     UnexpectedSnafu {
@@ -418,6 +432,7 @@
     {
         let database_client = {
             database_client
+                .handler
                 .lock()
                 .map_err(|e| {
                     UnexpectedSnafu {
@@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use common_query::Output;
+    use tokio::time::timeout;
+
+    use super::*;
+
+    #[derive(Debug)]
+    struct NoopHandler;
+
+    #[async_trait::async_trait]
+    impl GrpcQueryHandlerWithBoxedError for NoopHandler {
+        async fn do_query(
+            &self,
+            _query: Request,
+            _ctx: QueryContextRef,
+        ) -> std::result::Result<Output, BoxedError> {
+            Ok(Output::new_with_affected_rows(0))
+        }
+    }
+
+    #[tokio::test]
+    async fn wait_initialized() {
+        let (client, handler_mut) =
+            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
+
+        assert!(
+            timeout(Duration::from_millis(50), client.wait_initialized())
+                .await
+                .is_err()
+        );
+
+        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
+        handler_mut.set_handler(Arc::downgrade(&handler)).await;
+
+        timeout(Duration::from_secs(1), client.wait_initialized())
+            .await
+            .expect("wait_initialized should complete after handler is set");
+
+        timeout(Duration::from_millis(10), client.wait_initialized())
+            .await
+            .expect("wait_initialized should be a no-op once initialized");
+
+        let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
+        let client =
+            FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
+        assert!(
+            timeout(Duration::from_millis(10), client.wait_initialized())
+                .await
+                .is_ok()
+        );
+
+        let meta_client = Arc::new(MetaClient::default());
+        let client = FrontendClient::from_meta_client(
+            meta_client,
+            None,
+            QueryOptions::default(),
+            BatchingModeOptions::default(),
+        )
+        .unwrap();
+        assert!(
+            timeout(Duration::from_millis(10), client.wait_initialized())
+                .await
+                .is_ok()
+        );
+    }
+}

View File

@@ -869,8 +869,16 @@ impl PartSortStream {
         // If we've processed all partitions, mark completion.
         if self.cur_part_idx >= self.partition_ranges.len() {
-            debug_assert!(remaining_range.num_rows() == 0);
+            // If there is remaining data here, it means the input data doesn't match the
+            // provided `PartitionRange`s (e.g. out-of-order input or mismatched ranges).
+            // In release builds, the previous `debug_assert!` would silently drop data and
+            // could lead to incorrect empty results. To keep query correctness, fall back
+            // to consuming the remaining data as part of the last range.
+            if remaining_range.num_rows() != 0 {
+                self.push_buffer(remaining_range)?;
+            }
             self.input_complete = true;
             self.evaluating_batch = None;
             return Ok(());
         }
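A quick way to see why the old check was unsafe (a minimal illustration, not code from the patch): `debug_assert!` is compiled out when `debug_assertions` are disabled, which is the default for release builds, so the violated invariant produced no panic and the non-empty `remaining_range` was simply dropped.

fn main() {
    let leftover_rows = 3;
    // Panics under `cargo run`, but compiles to a no-op under
    // `cargo run --release`, so execution silently falls through.
    debug_assert!(leftover_rows == 0);
    println!("{leftover_rows} rows would go unaccounted for in release mode");
}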
@@ -937,7 +945,12 @@
         // If we've processed all partitions, sort and output
         if self.cur_part_idx >= self.partition_ranges.len() {
             // assert there is no data beyond the last partition range (remaining is empty).
-            debug_assert!(remaining_range.num_rows() == 0);
+            // Similar to the TopK path, do not silently drop remaining data in release builds.
+            // If this happens, the input stream doesn't match `PartitionRange`s; include the
+            // remaining data for correctness.
+            if remaining_range.num_rows() != 0 {
+                self.push_buffer(remaining_range)?;
+            }

             // Sort and output the final group
             return self.sorted_buffer_if_non_empty();
@@ -999,11 +1012,11 @@
         {
             // Check if we've already processed all partitions
             if self.cur_part_idx >= self.partition_ranges.len() {
-                // All partitions processed, discard remaining data
-                if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
-                    return Poll::Ready(Some(Ok(sorted_batch)));
-                }
-                return Poll::Ready(None);
+                // All partitions processed but we still have remaining data in-flight.
+                // Don't discard it, otherwise we may incorrectly return an empty result.
+                self.push_buffer(evaluating_batch)?;
+                self.input_complete = true;
+                continue;
             }

             if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
@@ -1431,6 +1444,47 @@ mod test {
         }
     }

+    #[tokio::test]
+    async fn topk_does_not_silently_drop_out_of_range_data() {
+        let unit = TimeUnit::Millisecond;
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(unit, None),
+            false,
+        )]));
+
+        // The input data is outside the provided PartitionRange.
+        // Historically this could lead to an empty result in release builds due to
+        // `debug_assert!`-only checks dropping the remaining batch.
+        let input_ranged_data = vec![(
+            PartitionRange {
+                start: Timestamp::new(0, common_time::timestamp::TimeUnit::from(&unit)),
+                end: Timestamp::new(10, common_time::timestamp::TimeUnit::from(&unit)),
+                num_rows: 1,
+                identifier: 0,
+            },
+            vec![
+                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])])
+                    .unwrap(),
+            ],
+        )];
+
+        let expected_output = Some(
+            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]).unwrap(),
+        );
+
+        run_test(
+            0,
+            input_ranged_data,
+            schema,
+            SortOptions::default(),
+            Some(10),
+            expected_output,
+            None,
+        )
+        .await;
+    }

     #[allow(clippy::print_stdout)]
     async fn run_test(
         case_id: usize,

View File

@@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder {
         let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
         let weak_grpc_handler = Arc::downgrade(&grpc_handler);
         frontend_instance_handler
-            .lock()
-            .unwrap()
-            .replace(weak_grpc_handler);
+            .set_handler(weak_grpc_handler)
+            .await;

         let flow_streaming_engine = flownode.flow_engine().streaming_engine();
         let invoker = flow::FrontendInvoker::build_from(