From 4e53c1531d16a549464e4b9dab0d03b0521248a3 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 25 Jun 2025 07:48:58 +0000 Subject: [PATCH] poc/create-alter-for-metrics: ### Commit Message Enhance batch sorting in `batch_builder.rs` - Implement sorting of batches by primary key using `compute::sort_to_indices`. - Update `op_type` and `sequence` to use `Arc` for consistency. - Apply sorting to `value`, `timestamp`, and primary key arrays using `compute::take`. Signed-off-by: Lei, HUANG --- src/servers/src/batch_builder.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index f8fb6211c8..8338865162 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -21,6 +21,7 @@ use arrow::array::{ ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array, }; +use arrow::compute; use arrow_schema::Field; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; @@ -320,21 +321,19 @@ impl BatchEncoder { let value = Float64Array::from(self.value); let timestamp = TimestampMillisecondArray::from(self.timestamps); - let op_type = UInt8Array::from_value(OpType::Put as u8, num_rows); + let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef; // todo: now we set sequence all to 0. - let sequence = UInt64Array::from_value(0, num_rows); - // todo(hl): sort batch by primary key. 
- arrow::array::RecordBatch::try_new( - Self::schema(), - vec![ - Arc::new(value) as ArrayRef, - Arc::new(timestamp) as ArrayRef, - Arc::new(self.encoded_primary_key_array_builder.finish()) as ArrayRef, - Arc::new(sequence) as ArrayRef, - Arc::new(op_type) as ArrayRef, - ], - ) - .context(error::ArrowSnafu) + let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef; + + let pk = self.encoded_primary_key_array_builder.finish(); + let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?; + + // Sort arrays + let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?; + let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?; + let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?; + arrow::array::RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type]) + .context(error::ArrowSnafu) } }