poc/create-alter-for-metrics:

### Commit Message

 Enhance batch sorting in `batch_builder.rs`

 - Implement sorting of batches by primary key using `compute::sort_to_indices`.
 - Update `op_type` and `sequence` to use `Arc` for consistency.
 - Apply sorting to `value`, `timestamp`, and primary key arrays using `compute::take`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-25 07:48:58 +00:00
parent 892cb66c53
commit 4e53c1531d

View File

@@ -21,6 +21,7 @@ use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, TimestampMillisecondArray, UInt64Array,
UInt8Array,
};
use arrow::compute;
use arrow_schema::Field;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
@@ -320,21 +321,19 @@ impl BatchEncoder {
let value = Float64Array::from(self.value);
let timestamp = TimestampMillisecondArray::from(self.timestamps);
let op_type = UInt8Array::from_value(OpType::Put as u8, num_rows);
let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef;
// todo: now we set sequence all to 0.
let sequence = UInt64Array::from_value(0, num_rows);
// todo(hl): sort batch by primary key.
arrow::array::RecordBatch::try_new(
Self::schema(),
vec![
Arc::new(value) as ArrayRef,
Arc::new(timestamp) as ArrayRef,
Arc::new(self.encoded_primary_key_array_builder.finish()) as ArrayRef,
Arc::new(sequence) as ArrayRef,
Arc::new(op_type) as ArrayRef,
],
)
.context(error::ArrowSnafu)
let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef;
let pk = self.encoded_primary_key_array_builder.finish();
let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?;
// Sort arrays
let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
arrow::array::RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
.context(error::ArrowSnafu)
}
}