diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index f8fb6211c8..8338865162 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -21,6 +21,7 @@ use arrow::array::{ ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array, }; +use arrow::compute; use arrow_schema::Field; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; @@ -320,21 +321,19 @@ impl BatchEncoder { let value = Float64Array::from(self.value); let timestamp = TimestampMillisecondArray::from(self.timestamps); - let op_type = UInt8Array::from_value(OpType::Put as u8, num_rows); + let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef; // todo: now we set sequence all to 0. - let sequence = UInt64Array::from_value(0, num_rows); - // todo(hl): sort batch by primary key. - arrow::array::RecordBatch::try_new( - Self::schema(), - vec![ - Arc::new(value) as ArrayRef, - Arc::new(timestamp) as ArrayRef, - Arc::new(self.encoded_primary_key_array_builder.finish()) as ArrayRef, - Arc::new(sequence) as ArrayRef, - Arc::new(op_type) as ArrayRef, - ], - ) - .context(error::ArrowSnafu) + let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef; + + let pk = self.encoded_primary_key_array_builder.finish(); + let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?; + + // Sort arrays + let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?; + let ts = compute::take(×tamp, &indices, None).context(error::ArrowSnafu)?; + let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?; + arrow::array::RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type]) + .context(error::ArrowSnafu) } }