mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 12:00:40 +00:00
@@ -21,7 +21,7 @@ use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::tracing::{Span, debug_span};
|
||||
use common_telemetry::tracing::{Span, info_span};
|
||||
use common_time::util::format_nanoseconds_human_readable;
|
||||
use datafusion::arrow::compute::cast;
|
||||
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
|
||||
@@ -247,7 +247,7 @@ impl RecordBatchStreamAdapter {
|
||||
pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
|
||||
let schema =
|
||||
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
|
||||
let subspan = debug_span!(parent: &span, "RecordBatchStreamAdapter");
|
||||
let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
|
||||
Ok(Self {
|
||||
schema,
|
||||
stream,
|
||||
@@ -301,13 +301,15 @@ impl Stream for RecordBatchStreamAdapter {
|
||||
.map(|m| m.elapsed_compute().clone())
|
||||
.unwrap_or_default();
|
||||
let _guard = timer.timer();
|
||||
let poll_span = debug_span!(parent: &self.span, "poll_next");
|
||||
let poll_span = info_span!(parent: &self.span, "poll_next");
|
||||
let _entered = poll_span.enter();
|
||||
match Pin::new(&mut self.stream).poll_next(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Some(df_record_batch)) => {
|
||||
let df_record_batch = df_record_batch?;
|
||||
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
|
||||
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
|
||||
&self.metrics_2
|
||||
{
|
||||
let mut metric_collector = MetricCollector::new(self.explain_verbose);
|
||||
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
|
||||
self.metrics_2 = Metrics::PartialResolved(
|
||||
@@ -460,6 +462,7 @@ fn format_bytes_human_readable(bytes: usize) -> String {
|
||||
format!("{}", ReadableSize(bytes as u64))
|
||||
}
|
||||
|
||||
/// Only display `plan_metrics` with indent ` ` (2 spaces).
|
||||
impl Display for RecordBatchMetrics {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
for metric in &self.plan_metrics {
|
||||
|
||||
@@ -66,20 +66,24 @@ impl ConvertBatchStream {
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_into_pending(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<()> {
|
||||
fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
|
||||
match batch {
|
||||
ScanBatch::Normal(batch) => {
|
||||
// Safety: Only primary key format returns this batch.
|
||||
let mapper = self.projection_mapper.as_primary_key().unwrap();
|
||||
|
||||
if batch.is_empty() {
|
||||
self.pending.push_back(mapper.empty_record_batch());
|
||||
Ok(mapper.empty_record_batch())
|
||||
} else {
|
||||
self.pending
|
||||
.push_back(mapper.convert(&batch, &self.cache_strategy)?);
|
||||
mapper.convert(&batch, &self.cache_strategy)
|
||||
}
|
||||
}
|
||||
ScanBatch::Series(series) => {
|
||||
debug_assert!(
|
||||
self.pending.is_empty(),
|
||||
"ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
|
||||
);
|
||||
|
||||
match series {
|
||||
SeriesBatch::PrimaryKey(primary_key_batch) => {
|
||||
// Safety: Only primary key format returns this batch.
|
||||
@@ -100,17 +104,20 @@ impl ConvertBatchStream {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let output_schema = self.projection_mapper.output_schema();
|
||||
Ok(self
|
||||
.pending
|
||||
.pop_front()
|
||||
.unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
|
||||
}
|
||||
ScanBatch::RecordBatch(df_record_batch) => {
|
||||
// Safety: Only flat format returns this batch.
|
||||
let mapper = self.projection_mapper.as_flat().unwrap();
|
||||
|
||||
self.pending
|
||||
.push_back(mapper.convert(&df_record_batch, &self.cache_strategy)?);
|
||||
mapper.convert(&df_record_batch, &self.cache_strategy)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,30 +129,21 @@ impl Stream for ConvertBatchStream {
|
||||
return Poll::Ready(Some(Ok(batch)));
|
||||
}
|
||||
|
||||
loop {
|
||||
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
|
||||
let Some(batch) = batch else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
|
||||
let Some(batch) = batch else {
|
||||
return Poll::Ready(None);
|
||||
};
|
||||
|
||||
let result = match batch {
|
||||
Ok(batch) => {
|
||||
let start = Instant::now();
|
||||
let result = self.convert_into_pending(batch);
|
||||
self.partition_metrics
|
||||
.inc_convert_batch_cost(start.elapsed());
|
||||
result
|
||||
}
|
||||
Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
|
||||
};
|
||||
|
||||
if let Err(e) = result {
|
||||
return Poll::Ready(Some(Err(e)));
|
||||
let record_batch = match batch {
|
||||
Ok(batch) => {
|
||||
let start = Instant::now();
|
||||
let record_batch = self.convert(batch);
|
||||
self.partition_metrics
|
||||
.inc_convert_batch_cost(start.elapsed());
|
||||
record_batch
|
||||
}
|
||||
|
||||
if let Some(batch) = self.pending.pop_front() {
|
||||
return Poll::Ready(Some(Ok(batch)));
|
||||
}
|
||||
}
|
||||
Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
|
||||
};
|
||||
Poll::Ready(Some(record_batch))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user