mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
Compare commits
2 Commits
release/v1
...
fix-topk
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ac57368af | ||
|
|
0bc5a305be |
@@ -552,9 +552,8 @@ impl StartCommand {
|
||||
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
|
||||
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
|
||||
frontend_instance_handler
|
||||
.lock()
|
||||
.unwrap()
|
||||
.replace(weak_grpc_handler);
|
||||
.set_handler(weak_grpc_handler)
|
||||
.await;
|
||||
|
||||
// set the frontend invoker for flownode
|
||||
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::SystemTime;
|
||||
|
||||
use api::v1::greptime_request::Request;
|
||||
@@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use session::hints::READ_PREFERENCE_HINT;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::SetOnce;
|
||||
|
||||
use crate::batching_mode::BatchingModeOptions;
|
||||
use crate::error::{
|
||||
@@ -75,7 +76,19 @@ impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send
|
||||
}
|
||||
}
|
||||
|
||||
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HandlerMutable {
|
||||
handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
|
||||
is_initialized: Arc<SetOnce<()>>,
|
||||
}
|
||||
|
||||
impl HandlerMutable {
|
||||
pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
|
||||
*self.handler.lock().unwrap() = Some(handler);
|
||||
// Ignore the error, as we allow the handler to be set multiple times.
|
||||
let _ = self.is_initialized.set(());
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple frontend client able to execute sql using grpc protocol
|
||||
///
|
||||
@@ -100,7 +113,11 @@ pub enum FrontendClient {
|
||||
impl FrontendClient {
|
||||
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
|
||||
pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
|
||||
let handler = Arc::new(std::sync::Mutex::new(None));
|
||||
let is_initialized = Arc::new(SetOnce::new());
|
||||
let handler = HandlerMutable {
|
||||
handler: Arc::new(Mutex::new(None)),
|
||||
is_initialized,
|
||||
};
|
||||
(
|
||||
Self::Standalone {
|
||||
database_client: handler.clone(),
|
||||
@@ -110,23 +127,13 @@ impl FrontendClient {
|
||||
)
|
||||
}
|
||||
|
||||
/// Check if the frontend client is initialized.
|
||||
///
|
||||
/// In distributed mode, it is always initialized.
|
||||
/// In standalone mode, it checks if the database client is set.
|
||||
pub fn is_initialized(&self) -> bool {
|
||||
match self {
|
||||
FrontendClient::Distributed { .. } => true,
|
||||
FrontendClient::Standalone {
|
||||
database_client, ..
|
||||
} => {
|
||||
let guard = database_client.lock();
|
||||
if let Ok(guard) = guard {
|
||||
guard.is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
/// Waits until the frontend client is initialized.
|
||||
pub async fn wait_initialized(&self) {
|
||||
if let FrontendClient::Standalone {
|
||||
database_client, ..
|
||||
} = self
|
||||
{
|
||||
database_client.is_initialized.wait().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,8 +165,14 @@ impl FrontendClient {
|
||||
grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
|
||||
query: QueryOptions,
|
||||
) -> Self {
|
||||
let is_initialized = Arc::new(SetOnce::new_with(Some(())));
|
||||
let handler = HandlerMutable {
|
||||
handler: Arc::new(Mutex::new(Some(grpc_handler))),
|
||||
is_initialized: is_initialized.clone(),
|
||||
};
|
||||
|
||||
Self::Standalone {
|
||||
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
|
||||
database_client: handler,
|
||||
query,
|
||||
}
|
||||
}
|
||||
@@ -341,6 +354,7 @@ impl FrontendClient {
|
||||
{
|
||||
let database_client = {
|
||||
database_client
|
||||
.handler
|
||||
.lock()
|
||||
.map_err(|e| {
|
||||
UnexpectedSnafu {
|
||||
@@ -418,6 +432,7 @@ impl FrontendClient {
|
||||
{
|
||||
let database_client = {
|
||||
database_client
|
||||
.handler
|
||||
.lock()
|
||||
.map_err(|e| {
|
||||
UnexpectedSnafu {
|
||||
@@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use common_query::Output;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoopHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
|
||||
async fn do_query(
|
||||
&self,
|
||||
_query: Request,
|
||||
_ctx: QueryContextRef,
|
||||
) -> std::result::Result<Output, BoxedError> {
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_initialized() {
|
||||
let (client, handler_mut) =
|
||||
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(50), client.wait_initialized())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
|
||||
handler_mut.set_handler(Arc::downgrade(&handler)).await;
|
||||
|
||||
timeout(Duration::from_secs(1), client.wait_initialized())
|
||||
.await
|
||||
.expect("wait_initialized should complete after handler is set");
|
||||
|
||||
timeout(Duration::from_millis(10), client.wait_initialized())
|
||||
.await
|
||||
.expect("wait_initialized should be a no-op once initialized");
|
||||
|
||||
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
|
||||
let client =
|
||||
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
|
||||
assert!(
|
||||
timeout(Duration::from_millis(10), client.wait_initialized())
|
||||
.await
|
||||
.is_ok()
|
||||
);
|
||||
|
||||
let meta_client = Arc::new(MetaClient::default());
|
||||
let client = FrontendClient::from_meta_client(
|
||||
meta_client,
|
||||
None,
|
||||
QueryOptions::default(),
|
||||
BatchingModeOptions::default(),
|
||||
)
|
||||
.unwrap();
|
||||
assert!(
|
||||
timeout(Duration::from_millis(10), client.wait_initialized())
|
||||
.await
|
||||
.is_ok()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -869,8 +869,16 @@ impl PartSortStream {
|
||||
|
||||
// If we've processed all partitions, mark completion.
|
||||
if self.cur_part_idx >= self.partition_ranges.len() {
|
||||
debug_assert!(remaining_range.num_rows() == 0);
|
||||
// If there is remaining data here, it means the input data doesn't match the
|
||||
// provided `PartitionRange`s (e.g. out-of-order input or mismatched ranges).
|
||||
// In release builds, the previous `debug_assert!` would silently drop data and
|
||||
// could lead to incorrect empty results. To keep query correctness, fall back
|
||||
// to consuming the remaining data as part of the last range.
|
||||
if remaining_range.num_rows() != 0 {
|
||||
self.push_buffer(remaining_range)?;
|
||||
}
|
||||
self.input_complete = true;
|
||||
self.evaluating_batch = None;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -937,7 +945,12 @@ impl PartSortStream {
|
||||
// If we've processed all partitions, sort and output
|
||||
if self.cur_part_idx >= self.partition_ranges.len() {
|
||||
// assert there is no data beyond the last partition range (remaining is empty).
|
||||
debug_assert!(remaining_range.num_rows() == 0);
|
||||
// Similar to the TopK path, do not silently drop remaining data in release builds.
|
||||
// If this happens, the input stream doesn't match `PartitionRange`s; include the
|
||||
// remaining data for correctness.
|
||||
if remaining_range.num_rows() != 0 {
|
||||
self.push_buffer(remaining_range)?;
|
||||
}
|
||||
|
||||
// Sort and output the final group
|
||||
return self.sorted_buffer_if_non_empty();
|
||||
@@ -999,11 +1012,11 @@ impl PartSortStream {
|
||||
{
|
||||
// Check if we've already processed all partitions
|
||||
if self.cur_part_idx >= self.partition_ranges.len() {
|
||||
// All partitions processed, discard remaining data
|
||||
if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
|
||||
return Poll::Ready(Some(Ok(sorted_batch)));
|
||||
}
|
||||
return Poll::Ready(None);
|
||||
// All partitions processed but we still have remaining data in-flight.
|
||||
// Don't discard it, otherwise we may incorrectly return an empty result.
|
||||
self.push_buffer(evaluating_batch)?;
|
||||
self.input_complete = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
|
||||
@@ -1431,6 +1444,47 @@ mod test {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn topk_does_not_silently_drop_out_of_range_data() {
|
||||
let unit = TimeUnit::Millisecond;
|
||||
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"ts",
|
||||
DataType::Timestamp(unit, None),
|
||||
false,
|
||||
)]));
|
||||
|
||||
// The input data is outside the provided PartitionRange.
|
||||
// Historically this could lead to an empty result in release builds due to
|
||||
// `debug_assert!`-only checks dropping the remaining batch.
|
||||
let input_ranged_data = vec![(
|
||||
PartitionRange {
|
||||
start: Timestamp::new(0, common_time::timestamp::TimeUnit::from(&unit)),
|
||||
end: Timestamp::new(10, common_time::timestamp::TimeUnit::from(&unit)),
|
||||
num_rows: 1,
|
||||
identifier: 0,
|
||||
},
|
||||
vec![
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])])
|
||||
.unwrap(),
|
||||
],
|
||||
)];
|
||||
|
||||
let expected_output = Some(
|
||||
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![100])]).unwrap(),
|
||||
);
|
||||
|
||||
run_test(
|
||||
0,
|
||||
input_ranged_data,
|
||||
schema,
|
||||
SortOptions::default(),
|
||||
Some(10),
|
||||
expected_output,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[allow(clippy::print_stdout)]
|
||||
async fn run_test(
|
||||
case_id: usize,
|
||||
|
||||
@@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder {
|
||||
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
|
||||
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
|
||||
frontend_instance_handler
|
||||
.lock()
|
||||
.unwrap()
|
||||
.replace(weak_grpc_handler);
|
||||
.set_handler(weak_grpc_handler)
|
||||
.await;
|
||||
|
||||
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
|
||||
let invoker = flow::FrontendInvoker::build_from(
|
||||
|
||||
Reference in New Issue
Block a user