From 7aae19aa8b67ed757beaa163451ababe624af1c5 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 19 Jul 2024 17:51:29 +0800 Subject: [PATCH] fix: dictionary key type use u32 (#4396) * fix: dictionary key type use u32 * fix: fix error while reading content * fix: bulk memtable dictionary type --- src/mito2/src/memtable/bulk/part.rs | 19 ++++++++++--------- src/mito2/src/sst.rs | 2 +- src/mito2/src/sst/parquet/format.rs | 25 ++++++++++++++----------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 2f15968c6f..9e806a46bf 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -22,11 +22,11 @@ use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder}; use datatypes::arrow; use datatypes::arrow::array::{ Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray, - TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, - UInt8Array, UInt8Builder, + TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array, + UInt8Builder, }; use datatypes::arrow::compute::TakeOptions; -use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type}; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef}; use datatypes::arrow_array::BinaryArray; use datatypes::data_type::DataType; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector}; @@ -40,6 +40,7 @@ use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, use crate::memtable::key_values::KeyValuesRef; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec}; +use crate::sst::parquet::format::PrimaryKeyArray; use crate::sst::to_sst_arrow_schema; #[derive(Debug)] @@ -344,17 +345,17 @@ fn timestamp_array_to_iter( } /// Converts a **sorted** [BinaryArray] to [DictionaryArray]. 
-fn binary_array_to_dictionary(input: &BinaryArray) -> Result> { +fn binary_array_to_dictionary(input: &BinaryArray) -> Result { if input.is_empty() { return Ok(DictionaryArray::new( - UInt16Array::from(Vec::::new()), + UInt32Array::from(Vec::::new()), Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef, )); } let mut keys = Vec::with_capacity(16); let mut values = BinaryBuilder::new(); let mut prev: usize = 0; - keys.push(prev as u16); + keys.push(prev as u32); values.append_value(input.value(prev)); for current_bytes in input.iter().skip(1) { @@ -365,11 +366,11 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result [FieldRef; 3] { [ Arc::new(Field::new_dictionary( PRIMARY_KEY_COLUMN_NAME, - ArrowDataType::UInt16, + ArrowDataType::UInt32, ArrowDataType::Binary, false, )), diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 2c64800641..9f18a3390c 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -15,7 +15,7 @@ //! Format to store in parquet. //! //! We store three internal columns in parquet: -//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary) +//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary) //! - `__sequence`, the sequence number of a row. Type: uint64 //! - `__op_type`, the op type of the row. Type: uint8 //! 
@@ -32,8 +32,8 @@ use std::sync::Arc; use api::v1::SemanticType; use datafusion_common::ScalarValue; -use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array}; -use datatypes::arrow::datatypes::{SchemaRef, UInt16Type}; +use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array}; +use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; use datatypes::vectors::{Helper, Vector}; @@ -50,6 +50,9 @@ use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::to_sst_arrow_schema; +/// Arrow array type for the primary key dictionary. +pub(crate) type PrimaryKeyArray = DictionaryArray; + /// Number of columns that have fixed positions. /// /// Contains: time index and internal columns. @@ -230,7 +233,7 @@ impl ReadFormat { // Compute primary key offsets. let pk_dict_array = pk_array .as_any() - .downcast_ref::>() + .downcast_ref::() .with_context(|| InvalidRecordBatchSnafu { reason: format!("primary key array should not be {:?}", pk_array.data_type()), })?; @@ -255,7 +258,7 @@ impl ReadFormat { let end = offsets[i + 1]; let rows_in_batch = end - start; let dict_key = keys.value(*start); - let primary_key = pk_values.value(dict_key.into()).to_vec(); + let primary_key = pk_values.value(dict_key as usize).to_vec(); let mut builder = BatchBuilder::new(primary_key); builder @@ -524,7 +527,7 @@ impl ReadFormat { } /// Compute offsets of different primary keys in the array. 
-fn primary_key_offsets(pk_dict_array: &DictionaryArray) -> Result> { +fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result> { if pk_dict_array.is_empty() { return Ok(Vec::new()); } @@ -549,7 +552,7 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray) -> Result ArrayRef { let values = Arc::new(BinaryArray::from_iter_values([primary_key])); - let keys = UInt16Array::from_value(0, num_rows); + let keys = UInt32Array::from_value(0, num_rows); // Safety: The key index is valid. Arc::new(DictionaryArray::new(keys, values)) @@ -627,7 +630,7 @@ mod tests { Field::new( "__primary_key", ArrowDataType::Dictionary( - Box::new(ArrowDataType::UInt16), + Box::new(ArrowDataType::UInt32), Box::new(ArrowDataType::Binary), ), false, @@ -674,15 +677,15 @@ mod tests { assert_eq!(&expect, &array); } - fn build_test_pk_array(pk_row_nums: &[(Vec, usize)]) -> Arc> { + fn build_test_pk_array(pk_row_nums: &[(Vec, usize)]) -> Arc { let values = Arc::new(BinaryArray::from_iter_values( pk_row_nums.iter().map(|v| &v.0), )); let mut keys = vec![]; for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() { - keys.extend(std::iter::repeat(index as u16).take(num_rows)); + keys.extend(std::iter::repeat(index as u32).take(num_rows)); } - let keys = UInt16Array::from(keys); + let keys = UInt32Array::from(keys); Arc::new(DictionaryArray::new(keys, values)) }