refactor: per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-29 16:54:24 +08:00
parent 5166282269
commit 04fd62dccc
7 changed files with 497 additions and 246 deletions

View File

@@ -17,8 +17,10 @@ mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
use api::region::RegionResponse;
@@ -36,7 +38,8 @@ use common_error::status_code::StatusCode;
use common_meta::datanode::TopicStatsReporter;
use common_query::OutputData;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
@@ -45,6 +48,7 @@ use dashmap::DashMap;
use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
use either::Either;
use futures_util::Stream;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
@@ -53,6 +57,7 @@ use query::QueryEngineRef;
pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::options::should_collect_region_watermark_from_extensions;
use serde_json;
use servers::error::{
self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
@@ -278,16 +283,31 @@ impl RegionServer {
.await
.context(DecodeLogicalPlanSnafu)?;
self.inner
let stream = self
.inner
.handle_read(
QueryRequest {
header: request.header,
region_id,
plan,
},
query_ctx,
query_ctx.clone(),
)
.await
.await?;
let region_latest_seq =
if should_collect_region_watermark_from_extensions(&query_ctx.extensions()) {
query_ctx.get_snapshot(region_id.as_u64())
} else {
None
};
if let Some(seq) = region_latest_seq {
Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
as SendableRecordBatchStream)
} else {
Ok(stream)
}
}
#[tracing::instrument(skip_all)]
@@ -749,6 +769,83 @@ impl RegionServer {
}
}
/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
///
/// The watermark is only added to the reported metrics once the wrapped stream has been
/// polled to completion; until then the wrapped stream's metrics are returned untouched.
struct RegionWatermarkStream {
/// The wrapped stream; name, schema, output ordering and batches are delegated to it.
stream: SendableRecordBatchStream,
/// Raw `u64` form of the region id, matched against `region_watermarks` entries.
region_id: u64,
/// Sequence recorded as this region's watermark in the terminal metrics.
snapshot_seq: u64,
/// Set once the wrapped stream has yielded `None`; gates the watermark in `metrics()`.
finished: AtomicBool,
}
impl RegionWatermarkStream {
    /// Builds a wrapper around `stream` that has not finished yet.
    ///
    /// `region_id` is stored in its raw `u64` form and `snapshot_seq` is the value later
    /// reported as this region's watermark.
    fn new(stream: SendableRecordBatchStream, region_id: RegionId, snapshot_seq: u64) -> Self {
        let region_id = region_id.as_u64();
        let finished = AtomicBool::new(false);
        Self {
            stream,
            region_id,
            snapshot_seq,
            finished,
        }
    }

    /// Returns `metrics` with this region's watermark set to `snapshot_seq`.
    ///
    /// The first entry already recorded for this region is overwritten in place; when no
    /// such entry exists a new one is appended.
    fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
        let watermark = Some(self.snapshot_seq);
        match metrics
            .region_watermarks
            .iter_mut()
            .find(|existing| existing.region_id == self.region_id)
        {
            Some(existing) => existing.watermark = watermark,
            None => {
                metrics
                    .region_watermarks
                    .push(common_recordbatch::adapter::RegionWatermarkEntry {
                        region_id: self.region_id,
                        watermark,
                    })
            }
        }
        metrics
    }
}
impl RecordBatchStream for RegionWatermarkStream {
    /// Name of the wrapped stream.
    fn name(&self) -> &str {
        self.stream.name()
    }

    /// Schema of the wrapped stream.
    fn schema(&self) -> datatypes::schema::SchemaRef {
        self.stream.schema()
    }

    /// Output ordering of the wrapped stream.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.stream.output_ordering()
    }

    /// Metrics of the wrapped stream, extended with this region's watermark once the
    /// stream has finished.
    ///
    /// Before completion the wrapped stream's metrics are passed through unchanged
    /// (possibly `None`). After completion the result is always `Some`, falling back to
    /// default metrics when the wrapped stream reports none.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        let inner = self.stream.metrics();
        if self.finished.load(Ordering::Relaxed) {
            Some(self.merged_metrics(inner.unwrap_or_default()))
        } else {
            inner
        }
    }
}
impl Stream for RegionWatermarkStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    /// Forwards the poll to the wrapped stream and records when it reports exhaustion.
    ///
    /// Every poll result is returned to the caller as-is; the only side effect is setting
    /// the `finished` flag when the wrapped stream yields `Poll::Ready(None)`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let polled = Pin::new(&mut self.stream).poll_next(cx);
        if matches!(polled, Poll::Ready(None)) {
            self.finished.store(true, Ordering::Relaxed);
        }
        polled
    }
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
@@ -1669,10 +1766,16 @@ impl RegionAttribute {
mod tests {
use std::assert_matches;
use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::RegionWatermarkEntry;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures_util::StreamExt;
use mito2::test_util::CreateRequestBuilder;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
@@ -1685,6 +1788,36 @@ mod tests {
use crate::error::Result;
use crate::tests::{MockRegionEngine, mock_region_server};
/// The watermark entry must be absent before the wrapped stream is drained and present,
/// with the configured sequence, once it has been drained.
#[tokio::test]
async fn test_region_watermark_stream_only_sets_terminal_metrics() {
    let region_id = RegionId::new(42, 7);

    // A single non-nullable int32 column holding two values.
    let column = ColumnSchema::new("v", ConcreteDataType::int32_datatype(), false);
    let schema = Arc::new(Schema::new(vec![column]));
    let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
    let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
    let inner = RecordBatches::try_new(schema, vec![batch])
        .unwrap()
        .as_stream();

    let mut pinned = Box::pin(RegionWatermarkStream::new(inner, region_id, 99));

    // Nothing has been polled yet, so no metrics are reported.
    assert!(pinned.as_ref().get_ref().metrics().is_none());

    // Drain the stream to completion.
    while pinned.next().await.is_some() {}

    let expected = vec![RegionWatermarkEntry {
        region_id: region_id.as_u64(),
        watermark: Some(99),
    }];
    let metrics = pinned.as_ref().get_ref().metrics().unwrap();
    assert_eq!(metrics.region_watermarks, expected);
}
#[tokio::test]
async fn test_region_registering() {
common_telemetry::init_default_ut_logging();