mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 11:20:38 +00:00
fix: dictionary key type use u32 (#4396)
* fix: dictionary key type use u32 * fix: fix error while reading content * fix: bulk memtable dictionary type
This commit is contained in:
@@ -22,11 +22,11 @@ use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{
|
||||
Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
|
||||
TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
|
||||
UInt8Array, UInt8Builder,
|
||||
TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
|
||||
UInt8Builder,
|
||||
};
|
||||
use datatypes::arrow::compute::TakeOptions;
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type};
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
|
||||
use datatypes::arrow_array::BinaryArray;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
|
||||
@@ -40,6 +40,7 @@ use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu,
|
||||
use crate::memtable::key_values::KeyValuesRef;
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec};
|
||||
use crate::sst::parquet::format::PrimaryKeyArray;
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -344,17 +345,17 @@ fn timestamp_array_to_iter(
|
||||
}
|
||||
|
||||
/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
|
||||
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
|
||||
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
|
||||
if input.is_empty() {
|
||||
return Ok(DictionaryArray::new(
|
||||
UInt16Array::from(Vec::<u16>::new()),
|
||||
UInt32Array::from(Vec::<u32>::new()),
|
||||
Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
|
||||
));
|
||||
}
|
||||
let mut keys = Vec::with_capacity(16);
|
||||
let mut values = BinaryBuilder::new();
|
||||
let mut prev: usize = 0;
|
||||
keys.push(prev as u16);
|
||||
keys.push(prev as u32);
|
||||
values.append_value(input.value(prev));
|
||||
|
||||
for current_bytes in input.iter().skip(1) {
|
||||
@@ -365,11 +366,11 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UIn
|
||||
values.append_value(current_bytes);
|
||||
prev += 1;
|
||||
}
|
||||
keys.push(prev as u16);
|
||||
keys.push(prev as u32);
|
||||
}
|
||||
|
||||
Ok(DictionaryArray::new(
|
||||
UInt16Array::from(keys),
|
||||
UInt32Array::from(keys),
|
||||
Arc::new(values.finish()) as ArrayRef,
|
||||
))
|
||||
}
|
||||
@@ -387,7 +388,7 @@ mod tests {
|
||||
|
||||
fn check_binary_array_to_dictionary(
|
||||
input: &[&[u8]],
|
||||
expected_keys: &[u16],
|
||||
expected_keys: &[u32],
|
||||
expected_values: &[&[u8]],
|
||||
) {
|
||||
let input = BinaryArray::from_iter_values(input.iter());
|
||||
|
||||
@@ -69,7 +69,7 @@ fn internal_fields() -> [FieldRef; 3] {
|
||||
[
|
||||
Arc::new(Field::new_dictionary(
|
||||
PRIMARY_KEY_COLUMN_NAME,
|
||||
ArrowDataType::UInt16,
|
||||
ArrowDataType::UInt32,
|
||||
ArrowDataType::Binary,
|
||||
false,
|
||||
)),
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Format to store in parquet.
|
||||
//!
|
||||
//! We store three internal columns in parquet:
|
||||
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary)
|
||||
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
|
||||
//! - `__sequence`, the sequence number of a row. Type: uint64
|
||||
//! - `__op_type`, the op type of the row. Type: uint8
|
||||
//!
|
||||
@@ -32,8 +32,8 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
|
||||
use datatypes::arrow::datatypes::{SchemaRef, UInt16Type};
|
||||
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
|
||||
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::vectors::{Helper, Vector};
|
||||
@@ -50,6 +50,9 @@ use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
|
||||
/// Arrow array type for the primary key dictionary.
|
||||
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
|
||||
|
||||
/// Number of columns that have fixed positions.
|
||||
///
|
||||
/// Contains: time index and internal columns.
|
||||
@@ -230,7 +233,7 @@ impl ReadFormat {
|
||||
// Compute primary key offsets.
|
||||
let pk_dict_array = pk_array
|
||||
.as_any()
|
||||
.downcast_ref::<DictionaryArray<UInt16Type>>()
|
||||
.downcast_ref::<PrimaryKeyArray>()
|
||||
.with_context(|| InvalidRecordBatchSnafu {
|
||||
reason: format!("primary key array should not be {:?}", pk_array.data_type()),
|
||||
})?;
|
||||
@@ -255,7 +258,7 @@ impl ReadFormat {
|
||||
let end = offsets[i + 1];
|
||||
let rows_in_batch = end - start;
|
||||
let dict_key = keys.value(*start);
|
||||
let primary_key = pk_values.value(dict_key.into()).to_vec();
|
||||
let primary_key = pk_values.value(dict_key as usize).to_vec();
|
||||
|
||||
let mut builder = BatchBuilder::new(primary_key);
|
||||
builder
|
||||
@@ -524,7 +527,7 @@ impl ReadFormat {
|
||||
}
|
||||
|
||||
/// Compute offsets of different primary keys in the array.
|
||||
fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Vec<usize>> {
|
||||
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
|
||||
if pk_dict_array.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
@@ -549,7 +552,7 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Ve
|
||||
/// Creates a new array for specific `primary_key`.
|
||||
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
|
||||
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
|
||||
let keys = UInt16Array::from_value(0, num_rows);
|
||||
let keys = UInt32Array::from_value(0, num_rows);
|
||||
|
||||
// Safety: The key index is valid.
|
||||
Arc::new(DictionaryArray::new(keys, values))
|
||||
@@ -627,7 +630,7 @@ mod tests {
|
||||
Field::new(
|
||||
"__primary_key",
|
||||
ArrowDataType::Dictionary(
|
||||
Box::new(ArrowDataType::UInt16),
|
||||
Box::new(ArrowDataType::UInt32),
|
||||
Box::new(ArrowDataType::Binary),
|
||||
),
|
||||
false,
|
||||
@@ -674,15 +677,15 @@ mod tests {
|
||||
assert_eq!(&expect, &array);
|
||||
}
|
||||
|
||||
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<DictionaryArray<UInt16Type>> {
|
||||
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
|
||||
let values = Arc::new(BinaryArray::from_iter_values(
|
||||
pk_row_nums.iter().map(|v| &v.0),
|
||||
));
|
||||
let mut keys = vec![];
|
||||
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
|
||||
keys.extend(std::iter::repeat(index as u16).take(num_rows));
|
||||
keys.extend(std::iter::repeat(index as u32).take(num_rows));
|
||||
}
|
||||
let keys = UInt16Array::from(keys);
|
||||
let keys = UInt32Array::from(keys);
|
||||
Arc::new(DictionaryArray::new(keys, values))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user