feat(mito): Implement SST format for mito2 (#2178)

* chore: update comment

* feat: stream writer takes arrow's types

* feat: Define Batch struct

* feat: arrow_schema_to_store

* refactor: rename

* feat: write parquet in new format with tsids

* feat: reader support projection

* feat: Impl read compat

* refactor: rename SchemaCompat to CompatRecordBatch

* feat: changing sst format

* feat: make it compile

* feat: remove tsid and some structs

* feat: from_sst_record_batch wip

* chore: push array

* chore: wip

* feat: decode batches from RecordBatch

* feat: reader converts record batches

* feat: remove compat mod

* chore: remove some codes

* feat: sort fields by column id

* test: test to_sst_arrow_schema

* feat: do not sort fields

* test: more test helpers

* feat: simplify projection

* fix: projection indices is incorrect

* refactor: define write/read format

* test: test write format

* test: test projection

* test: test convert record batch

* feat: remove unused errors

* refactor: wrap get_field_batch_columns

* chore: clippy

* chore: fix clippy

* feat: build arrow schema from region meta in ReadFormat

* feat: initialize the parquet reader at `build()`

* chore: fix typo
Author: Yingwen (committed by GitHub)
Date: 2023-08-17 14:25:50 +08:00
Parent: 832e5dcfd7
Commit: 4ba12155fe
8 changed files with 917 additions and 109 deletions

View File

@@ -309,8 +309,28 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid parquet SST file {}, reason: {}", file, reason))]
InvalidParquet {
file: String,
reason: String,
location: Location,
},
#[snafu(display("Invalid batch, {}, location: {}", reason, location))]
InvalidBatch { reason: String, location: Location },
#[snafu(display("Invalid arrow record batch, {}, location: {}", reason, location))]
InvalidRecordBatch { reason: String, location: Location },
#[snafu(display(
"Failed to convert array to vector, location: {}, source: {}",
location,
source
))]
ConvertVector {
location: Location,
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -341,7 +361,8 @@ impl ErrorExt for Error {
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. }
| CreateDefault { .. } => StatusCode::Unexpected,
| CreateDefault { .. }
| InvalidParquet { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }
@@ -362,6 +383,8 @@ impl ErrorExt for Error {
NotSupportedField { .. } => StatusCode::Unsupported,
DeserializeField { .. } => StatusCode::Unexpected,
InvalidBatch { .. } => StatusCode::InvalidArguments,
InvalidRecordBatch { .. } => StatusCode::InvalidArguments,
ConvertVector { source, .. } => source.status_code(),
}
}

View File

@@ -18,12 +18,15 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_time::Timestamp;
use datatypes::vectors::{UInt64Vector, UInt8Vector, Vector, VectorRef};
use snafu::ensure;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector, Vector, VectorRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error::{InvalidBatchSnafu, Result};
use crate::error::{ConvertVectorSnafu, InvalidBatchSnafu, Result};
/// Storage internal representation of a batch of rows
/// for a primary key (time series).
@@ -56,7 +59,7 @@ impl Batch {
op_types: Arc<UInt8Vector>,
fields: Vec<BatchColumn>,
) -> Result<Batch> {
BatchBuilder::new(primary_key, timestamps, sequences, op_types)
BatchBuilder::with_required_columns(primary_key, timestamps, sequences, op_types)
.with_fields(fields)
.build()
}
@@ -111,15 +114,26 @@ pub struct BatchColumn {
/// Builder to build [Batch].
pub struct BatchBuilder {
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: Arc<UInt64Vector>,
op_types: Arc<UInt8Vector>,
timestamps: Option<VectorRef>,
sequences: Option<Arc<UInt64Vector>>,
op_types: Option<Arc<UInt8Vector>>,
fields: Vec<BatchColumn>,
}
impl BatchBuilder {
/// Creates a new [BatchBuilder].
pub fn new(
/// Creates a new [BatchBuilder] with primary key.
pub fn new(primary_key: Vec<u8>) -> BatchBuilder {
BatchBuilder {
primary_key,
timestamps: None,
sequences: None,
op_types: None,
fields: Vec::new(),
}
}
/// Creates a new [BatchBuilder] with all required columns.
pub fn with_required_columns(
primary_key: Vec<u8>,
timestamps: VectorRef,
sequences: Arc<UInt64Vector>,
@@ -127,9 +141,9 @@ impl BatchBuilder {
) -> BatchBuilder {
BatchBuilder {
primary_key,
timestamps,
sequences,
op_types,
timestamps: Some(timestamps),
sequences: Some(sequences),
op_types: Some(op_types),
fields: Vec::new(),
}
}
@@ -146,25 +160,90 @@ impl BatchBuilder {
self
}
/// Push an array as a field.
pub fn push_field_array(&mut self, column_id: ColumnId, array: ArrayRef) -> Result<&mut Self> {
let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
self.fields.push(BatchColumn {
column_id,
data: vector,
});
Ok(self)
}
/// Try to set an array as timestamps.
pub fn timestamps_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
let vector = Helper::try_into_vector(array).context(ConvertVectorSnafu)?;
ensure!(
vector.data_type().is_timestamp_compatible(),
InvalidBatchSnafu {
reason: format!("{:?} is not a timestamp type", vector.data_type()),
}
);
self.timestamps = Some(vector);
Ok(self)
}
/// Try to set an array as sequences.
pub fn sequences_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
ensure!(
*array.data_type() == arrow::datatypes::DataType::UInt64,
InvalidBatchSnafu {
reason: "sequence array is not UInt64 type",
}
);
// Safety: The cast must succeed as we have ensured it is uint64 type.
let vector = Arc::new(UInt64Vector::try_from_arrow_array(array).unwrap());
self.sequences = Some(vector);
Ok(self)
}
/// Try to set an array as op types.
pub fn op_types_array(&mut self, array: ArrayRef) -> Result<&mut Self> {
ensure!(
*array.data_type() == arrow::datatypes::DataType::UInt8,
InvalidBatchSnafu {
reason: "op type array is not UInt8 type",
}
);
// Safety: The cast must succeed as we have ensured it is uint8 type.
let vector = Arc::new(UInt8Vector::try_from_arrow_array(array).unwrap());
self.op_types = Some(vector);
Ok(self)
}
/// Builds the [Batch].
pub fn build(self) -> Result<Batch> {
let ts_len = self.timestamps.len();
let timestamps = self.timestamps.context(InvalidBatchSnafu {
reason: "missing timestamps",
})?;
let sequences = self.sequences.context(InvalidBatchSnafu {
reason: "missing sequences",
})?;
let op_types = self.op_types.context(InvalidBatchSnafu {
reason: "missing op_types",
})?;
let ts_len = timestamps.len();
ensure!(
self.sequences.len() == ts_len,
sequences.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"sequence have different len {} != {}",
self.sequences.len(),
sequences.len(),
ts_len
),
}
);
ensure!(
self.op_types.len() == ts_len,
op_types.len() == ts_len,
InvalidBatchSnafu {
reason: format!(
"op type have different len {} != {}",
self.op_types.len(),
op_types.len(),
ts_len
),
}
@@ -185,9 +264,9 @@ impl BatchBuilder {
Ok(Batch {
primary_key: self.primary_key,
timestamps: self.timestamps,
sequences: self.sequences,
op_types: self.op_types,
timestamps,
sequences,
op_types,
fields: self.fields,
})
}
@@ -232,8 +311,12 @@ impl Source {
}
/// Async batch reader.
///
/// The reader must guarantee [Batch]es returned by it have the same schema.
#[async_trait]
pub trait BatchReader: Send {
// TODO(yingwen): fields of the batch returned.
/// Fetch next [Batch].
///
/// Returns `Ok(None)` when the reader has reached its end and calling `next_batch()`

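For reference, a minimal sketch (not part of this commit) of how the reworked `BatchBuilder` API above could be used from inside the crate. The array values and the field column id are arbitrary examples, and the module paths are assumptions based on the imports shown in this diff:

```rust
use std::sync::Arc;

use datatypes::arrow::array::{
    ArrayRef, Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};

use crate::error::Result;
use crate::read::{Batch, BatchBuilder};

fn build_example_batch() -> Result<Batch> {
    // Required columns; the builder validates the arrow types (timestamp,
    // UInt64, UInt8) and checks that the lengths match at `build()`.
    let timestamps: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1i64, 2, 3]));
    let sequences: ArrayRef = Arc::new(UInt64Array::from(vec![1u64; 3]));
    let op_types: ArrayRef = Arc::new(UInt8Array::from(vec![1u8; 3]));
    // One field column; column id 4 is an arbitrary example value.
    let field: ArrayRef = Arc::new(Int64Array::from(vec![7i64, 8, 9]));

    let mut builder = BatchBuilder::new(b"encoded-primary-key".to_vec());
    builder
        .timestamps_array(timestamps)?
        .sequences_array(sequences)?
        .op_types_array(op_types)?
        .push_field_array(4, field)?;
    // Fails with `InvalidBatch` if any required column is missing.
    builder.build()
}
```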
View File

@@ -14,6 +14,7 @@
//! SST in parquet format.
mod format;
mod reader;
mod writer;

View File

@@ -0,0 +1,581 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array};
use datatypes::arrow::datatypes::{
DataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type,
};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::{Helper, Vector};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use store_api::storage::ColumnId;
use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
const FIXED_POS_COLUMN_NUM: usize = 4;
/// Helper for writing the SST format.
pub(crate) struct WriteFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
}
impl WriteFormat {
/// Creates a new helper.
pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat {
let arrow_schema = to_sst_arrow_schema(&metadata);
WriteFormat {
metadata,
arrow_schema,
}
}
/// Gets the arrow schema to store in parquet.
pub(crate) fn arrow_schema(&self) -> SchemaRef {
self.arrow_schema.clone()
}
/// Converts `batch` to an arrow record batch to store in parquet.
pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
debug_assert_eq!(
batch.fields().len() + FIXED_POS_COLUMN_NUM,
self.arrow_schema.fields().len()
);
let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
// Store all fields first.
for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
ensure!(
column.column_id == column_metadata.column_id,
InvalidBatchSnafu {
reason: format!(
"Batch has column {} but metadata has column {}",
column.column_id, column_metadata.column_id
),
}
);
columns.push(column.data.to_arrow_array());
}
// Add time index column.
columns.push(batch.timestamps().to_arrow_array());
// Add internal columns: primary key, sequences, op types.
columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));
columns.push(batch.sequences().to_arrow_array());
columns.push(batch.op_types().to_arrow_array());
RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
}
}
/// Helper for reading the SST format.
pub(crate) struct ReadFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
// Field column id to its index in `schema` (SST schema).
field_id_to_index: HashMap<ColumnId, usize>,
}
impl ReadFormat {
/// Creates a helper with existing `metadata`.
pub(crate) fn new(metadata: RegionMetadataRef) -> ReadFormat {
let field_id_to_index: HashMap<_, _> = metadata
.field_columns()
.enumerate()
.map(|(index, column)| (column.column_id, index))
.collect();
let arrow_schema = to_sst_arrow_schema(&metadata);
ReadFormat {
metadata,
arrow_schema,
field_id_to_index,
}
}
/// Gets the converted arrow schema.
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
/// Gets sorted projection indices to read `columns` from parquet files.
///
/// This function ignores columns not in `metadata` for compatibility between
/// different schemas.
pub(crate) fn projection_indices(
&self,
columns: impl IntoIterator<Item = ColumnId>,
) -> Vec<usize> {
let mut indices: Vec<_> = columns
.into_iter()
.filter_map(|column_id| {
// Only apply projection to fields.
self.field_id_to_index.get(&column_id).copied()
})
// We need to add all fixed position columns.
.chain(
self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
..self.arrow_schema.fields.len(),
)
.collect();
indices.sort_unstable();
indices
}
/// Converts an arrow record batch into `batches`.
///
/// Note that the `record_batch` may only contain a subset of columns if it is projected.
pub(crate) fn convert_record_batch(
&self,
record_batch: &RecordBatch,
batches: &mut Vec<Batch>,
) -> Result<()> {
debug_assert!(batches.is_empty());
// The record batch must have the time index and internal columns.
ensure!(
record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
InvalidRecordBatchSnafu {
reason: format!(
"record batch only has {} columns",
record_batch.num_columns()
),
}
);
let mut fixed_pos_columns = record_batch
.columns()
.iter()
.rev()
.take(FIXED_POS_COLUMN_NUM);
// Safety: We have checked the column number.
let op_type_array = fixed_pos_columns.next().unwrap();
let sequence_array = fixed_pos_columns.next().unwrap();
let pk_array = fixed_pos_columns.next().unwrap();
let ts_array = fixed_pos_columns.next().unwrap();
let field_batch_columns = self.get_field_batch_columns(record_batch)?;
// Compute primary key offsets.
let pk_dict_array = pk_array
.as_any()
.downcast_ref::<DictionaryArray<UInt16Type>>()
.with_context(|| InvalidRecordBatchSnafu {
reason: format!("primary key array should not be {:?}", pk_array.data_type()),
})?;
let offsets = primary_key_offsets(pk_dict_array)?;
if offsets.is_empty() {
return Ok(());
}
// Split record batch according to pk offsets.
let keys = pk_dict_array.keys();
let pk_values = pk_dict_array
.values()
.as_any()
.downcast_ref::<BinaryArray>()
.with_context(|| InvalidRecordBatchSnafu {
reason: format!(
"values of primary key array should not be {:?}",
pk_dict_array.values().data_type()
),
})?;
for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
let end = offsets[i + 1];
let rows_in_batch = end - start;
let dict_key = keys.value(*start);
let primary_key = pk_values.value(dict_key.into()).to_vec();
let mut builder = BatchBuilder::new(primary_key);
builder
.timestamps_array(ts_array.slice(*start, rows_in_batch))?
.sequences_array(sequence_array.slice(*start, rows_in_batch))?
.op_types_array(op_type_array.slice(*start, rows_in_batch))?;
// Push all fields
for batch_column in &field_batch_columns {
builder.push_field(BatchColumn {
column_id: batch_column.column_id,
data: batch_column.data.slice(*start, rows_in_batch),
});
}
let batch = builder.build()?;
batches.push(batch);
}
Ok(())
}
/// Get fields from `record_batch`.
fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
record_batch
.columns()
.iter()
.zip(record_batch.schema().fields())
.take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
.map(|(array, field)| {
let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
let column = self
.metadata
.column_by_name(field.name())
.with_context(|| InvalidRecordBatchSnafu {
reason: format!("column {} not found in metadata", field.name()),
})?;
Ok(BatchColumn {
column_id: column.column_id,
data: vector,
})
})
.collect()
}
}
/// Gets the arrow schema to store in parquet.
fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(
metadata
.schema
.arrow_schema()
.fields()
.iter()
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
Some(field.clone())
} else {
// We have fixed positions for tags (primary key) and time index.
None
}
})
.chain([metadata.time_index_field()])
.chain(internal_fields()),
);
Arc::new(Schema::new(fields))
}
/// Compute offsets of different primary keys in the array.
fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Vec<usize>> {
if pk_dict_array.is_empty() {
return Ok(Vec::new());
}
// Init offsets.
let mut offsets = vec![0];
let keys = pk_dict_array.keys();
// We know that primary keys are always not null so we iterate `keys.values()` directly.
let pk_indices = keys.values();
for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
// Compare each key with next key
if *key != pk_indices[i + 1] {
// We meet a new key, push the next index as end of the offset.
offsets.push(i + 1);
}
}
offsets.push(keys.len());
Ok(offsets)
}
/// Fields for internal columns.
fn internal_fields() -> [FieldRef; 3] {
// Internal columns are always not null.
[
Arc::new(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
DataType::UInt16,
DataType::Binary,
false,
)),
Arc::new(Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false)),
Arc::new(Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false)),
]
}
/// Creates a new array for specific `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
let keys = UInt16Array::from_value(0, num_rows);
// Safety: The key index is valid.
Arc::new(DictionaryArray::new(keys, values))
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::datatypes::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
const TEST_SEQUENCE: u64 = 1;
const TEST_OP_TYPE: u8 = 1;
fn build_test_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field1",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 4, // We change the order of fields columns.
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 5,
})
.primary_key(vec![1, 3]);
Arc::new(builder.build().unwrap())
}
fn build_test_arrow_schema() -> SchemaRef {
let fields = vec![
Field::new("field1", DataType::Int64, true),
Field::new("field0", DataType::Int64, true),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)),
false,
),
Field::new("__sequence", DataType::UInt64, false),
Field::new("__op_type", DataType::UInt8, false),
];
Arc::new(Schema::new(fields))
}
fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
let sequences = Arc::new(UInt64Vector::from_vec(vec![TEST_SEQUENCE; num_rows]));
let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
let fields = vec![
BatchColumn {
column_id: 4,
data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
}, // field1
BatchColumn {
column_id: 2,
data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
}, // field0
];
BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
.with_fields(fields)
.build()
.unwrap()
}
#[test]
fn test_to_sst_arrow_schema() {
let metadata = build_test_region_metadata();
let write_format = WriteFormat::new(metadata);
assert_eq!(build_test_arrow_schema(), write_format.arrow_schema());
}
#[test]
fn test_new_primary_key_array() {
let array = new_primary_key_array(b"test", 3);
let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
assert_eq!(&expect, &array);
}
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<DictionaryArray<UInt16Type>> {
let values = Arc::new(BinaryArray::from_iter_values(
pk_row_nums.iter().map(|v| &v.0),
));
let mut keys = vec![];
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
keys.extend(std::iter::repeat(index as u16).take(num_rows));
}
let keys = UInt16Array::from(keys);
Arc::new(DictionaryArray::new(keys, values))
}
#[test]
fn test_convert_batch() {
let metadata = build_test_region_metadata();
let write_format = WriteFormat::new(metadata);
let num_rows = 4;
let batch = new_batch(b"test", 1, 2, num_rows);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![2; num_rows])), // field1
Arc::new(Int64Array::from(vec![3; num_rows])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
];
let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
let actual = write_format.convert_batch(&batch).unwrap();
assert_eq!(expect_record, actual);
}
#[test]
fn test_projection_indices() {
let metadata = build_test_region_metadata();
let read_format = ReadFormat::new(metadata);
// Only read tag1
assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([3]));
// Only read field1
assert_eq!(vec![0, 2, 3, 4, 5], read_format.projection_indices([4]));
// Only read ts
assert_eq!(vec![2, 3, 4, 5], read_format.projection_indices([5]));
// Read field0, tag0, ts
assert_eq!(
vec![1, 2, 3, 4, 5],
read_format.projection_indices([2, 1, 5])
);
}
#[test]
fn test_empty_primary_key_offsets() {
let array = build_test_pk_array(&[]);
assert!(primary_key_offsets(&array).unwrap().is_empty());
}
#[test]
fn test_primary_key_offsets_one_series() {
let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
let array = build_test_pk_array(&[
(b"one".to_vec(), 1),
(b"two".to_vec(), 1),
(b"three".to_vec(), 1),
]);
assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
}
#[test]
fn test_primary_key_offsets_multi_series() {
let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
}
#[test]
fn test_convert_empty_record_batch() {
let metadata = build_test_region_metadata();
let arrow_schema = build_test_arrow_schema();
let read_format = ReadFormat::new(metadata);
assert_eq!(arrow_schema, *read_format.arrow_schema());
let record_batch = RecordBatch::new_empty(arrow_schema);
let mut batches = vec![];
read_format
.convert_record_batch(&record_batch, &mut batches)
.unwrap();
assert!(batches.is_empty());
}
#[test]
fn test_convert_record_batch() {
let metadata = build_test_region_metadata();
let read_format = ReadFormat::new(metadata);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
];
let arrow_schema = build_test_arrow_schema();
let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
let mut batches = vec![];
read_format
.convert_record_batch(&record_batch, &mut batches)
.unwrap();
assert_eq!(
vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
batches
);
}
}
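A minimal in-crate sketch (not part of this commit) tying the two helpers in this file together. It assumes a `RegionMetadataRef` and a `Batch` built elsewhere (the tests above construct both by hand):

```rust
use datatypes::arrow::record_batch::RecordBatch;
use store_api::metadata::RegionMetadataRef;

use crate::error::Result;
use crate::read::Batch;
use crate::sst::parquet::format::{ReadFormat, WriteFormat};

/// Round-trips a batch through the SST layout described above.
fn round_trip(metadata: RegionMetadataRef, batch: &Batch) -> Result<Vec<Batch>> {
    // Batch -> RecordBatch, columns ordered as: fields..., time index,
    // __primary_key, __sequence, __op_type.
    let write_format = WriteFormat::new(metadata.clone());
    let record_batch: RecordBatch = write_format.convert_batch(batch)?;

    // RecordBatch -> Batches, split so each output Batch holds one primary key.
    let read_format = ReadFormat::new(metadata);
    let mut batches = Vec::new();
    read_format.convert_record_batch(&record_batch, &mut batches)?;
    Ok(batches)
}
```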

View File

@@ -14,6 +14,8 @@
//! Parquet reader.
use std::sync::Arc;
use async_compat::CompatExt;
use async_trait::async_trait;
use common_time::range::TimestampRange;
@@ -21,15 +23,22 @@ use datatypes::arrow::record_batch::RecordBatch;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use object_store::ObjectStore;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::ParquetError;
use snafu::ResultExt;
use parquet::format::KeyValue;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
use crate::error::{
InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::PARQUET_METADATA_KEY;
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
@@ -38,6 +47,7 @@ pub struct ParquetReaderBuilder {
object_store: ObjectStore,
predicate: Option<Predicate>,
time_range: Option<TimestampRange>,
projection: Option<Vec<ColumnId>>,
}
impl ParquetReaderBuilder {
@@ -53,6 +63,7 @@ impl ParquetReaderBuilder {
object_store,
predicate: None,
time_range: None,
projection: None,
}
}
@@ -68,23 +79,126 @@ impl ParquetReaderBuilder {
self
}
/// Builds a [ParquetReader].
pub fn build(self) -> ParquetReader {
/// Attaches the projection to the builder.
///
/// The reader only applies the projection to fields.
pub fn projection(mut self, projection: Vec<ColumnId>) -> ParquetReaderBuilder {
self.projection = Some(projection);
self
}
/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operations.
pub async fn build(self) -> Result<ParquetReader> {
let file_path = self.file_handle.file_path(&self.file_dir);
ParquetReader {
let (stream, read_format) = self.init_stream(&file_path).await?;
Ok(ParquetReader {
file_path,
file_handle: self.file_handle,
object_store: self.object_store,
predicate: self.predicate,
time_range: self.time_range,
stream: None,
projection: self.projection,
stream,
read_format,
batches: Vec::new(),
})
}
/// Initializes the parquet stream, also creates a [ReadFormat] to decode record batches.
async fn init_stream(&self, file_path: &str) -> Result<(BoxedRecordBatchStream, ReadFormat)> {
// Creates parquet stream builder.
let reader = self
.object_store
.reader(file_path)
.await
.context(OpenDalSnafu)?
.compat();
let buf_reader = BufReader::new(reader);
let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
.await
.context(ReadParquetSnafu { path: file_path })?;
// Decode region metadata.
let key_value_meta = builder.metadata().file_metadata().key_value_metadata();
let region_meta = self.get_region_metadata(file_path, key_value_meta)?;
// Prune row groups by metadata.
if let Some(predicate) = &self.predicate {
// TODO(yingwen): Now we encode tags into the full primary key so we need some approach
// to implement pruning.
let pruned_row_groups = predicate
.prune_row_groups(builder.metadata().row_groups())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
builder = builder.with_row_groups(pruned_row_groups);
}
let read_format = ReadFormat::new(Arc::new(region_meta));
// The arrow schema converted from the region meta should be the same as parquet's.
// We only compare fields to avoid the schema's metadata breaking the comparison.
ensure!(
read_format.arrow_schema().fields() == builder.schema().fields(),
InvalidParquetSnafu {
file: file_path,
reason: format!(
"schema mismatch, expect: {:?}, given: {:?}",
read_format.arrow_schema().fields(),
builder.schema().fields()
)
}
);
let parquet_schema_desc = builder.metadata().file_metadata().schema_descr();
if let Some(column_ids) = self.projection.as_ref() {
let indices = read_format.projection_indices(column_ids.iter().copied());
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices);
builder = builder.with_projection(projection_mask);
}
let stream = builder
.build()
.context(ReadParquetSnafu { path: file_path })?;
Ok((Box::pin(stream), read_format))
}
/// Decode region metadata from key value.
fn get_region_metadata(
&self,
file_path: &str,
key_value_meta: Option<&Vec<KeyValue>>,
) -> Result<RegionMetadata> {
let key_values = key_value_meta.context(InvalidParquetSnafu {
file: file_path,
reason: "missing key value meta",
})?;
let meta_value = key_values
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("key {} not found", PARQUET_METADATA_KEY),
})?;
let json = meta_value
.value
.as_ref()
.with_context(|| InvalidParquetSnafu {
file: file_path,
reason: format!("No value for key {}", PARQUET_METADATA_KEY),
})?;
RegionMetadata::from_json(json).context(InvalidMetadataSnafu)
}
}
type BoxedRecordBatchStream = BoxStream<'static, std::result::Result<RecordBatch, ParquetError>>;
/// Parquet batch reader.
/// Parquet batch reader to read our SST format.
pub struct ParquetReader {
/// Path of the file.
file_path: String,
@@ -97,75 +211,42 @@ pub struct ParquetReader {
predicate: Option<Predicate>,
/// Time range to filter.
time_range: Option<TimestampRange>,
/// Metadata of columns to read.
///
/// `None` reads all columns. Due to schema change, the projection
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
/// Inner parquet record batch stream.
stream: Option<BoxedRecordBatchStream>,
}
impl ParquetReader {
/// Initializes the reader and the parquet stream.
async fn maybe_init(&mut self) -> Result<()> {
if self.stream.is_some() {
// Already initialized.
return Ok(());
}
let reader = self
.object_store
.reader(&self.file_path)
.await
.context(OpenDalSnafu)?
.compat();
let buf_reader = BufReader::new(reader);
let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
// TODO(yingwen): Decode region metadata, create read adapter.
// Prune row groups by metadata.
if let Some(predicate) = &self.predicate {
let pruned_row_groups = predicate
.prune_row_groups(builder.metadata().row_groups())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
builder = builder.with_row_groups(pruned_row_groups);
}
// TODO(yingwen): Projection.
let stream = builder.build().context(ReadParquetSnafu {
path: &self.file_path,
})?;
self.stream = Some(Box::pin(stream));
Ok(())
}
/// Converts our [Batch] from arrow's [RecordBatch].
fn convert_arrow_record_batch(&self, _record_batch: RecordBatch) -> Result<Batch> {
unimplemented!()
}
stream: BoxedRecordBatchStream,
/// Helper to read record batches.
///
/// Not `None` if [ParquetReader::stream] is not `None`.
read_format: ReadFormat,
/// Buffered batches to return.
batches: Vec<Batch>,
}
#[async_trait]
impl BatchReader for ParquetReader {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.maybe_init().await?;
if let Some(batch) = self.batches.pop() {
return Ok(Some(batch));
}
self.stream
.as_mut()
.unwrap()
.try_next()
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?
.map(|rb| self.convert_arrow_record_batch(rb))
.transpose()
// We need to fetch the next record batch and convert it to batches.
let Some(record_batch) = self.stream.try_next().await.context(ReadParquetSnafu {
path: &self.file_path,
})?
else {
return Ok(None);
};
self.read_format
.convert_record_batch(&record_batch, &mut self.batches)?;
// Reverse batches so we can pop them.
self.batches.reverse();
Ok(self.batches.pop())
}
}
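A hedged usage sketch (not part of this commit) of the new read path. `ParquetReaderBuilder::new`'s exact parameters are not shown in this diff and are assumed here, as are the module paths and their visibility:

```rust
use object_store::ObjectStore;

use crate::error::Result;
use crate::read::BatchReader;
use crate::sst::file::FileHandle;
use crate::sst::parquet::reader::ParquetReaderBuilder;

async fn scan_sst(
    file_dir: String,
    file_handle: FileHandle,
    object_store: ObjectStore,
) -> Result<()> {
    let mut reader = ParquetReaderBuilder::new(file_dir, file_handle, object_store)
        // Projection is by column id and only applies to field columns; the time
        // index and internal columns are always read.
        .projection(vec![2, 4])
        // `build()` is now async: it opens the file, decodes the region metadata
        // stored under PARQUET_METADATA_KEY and prunes row groups by the predicate.
        .build()
        .await?;
    while let Some(batch) = reader.next_batch().await? {
        // Each returned `Batch` contains rows of one primary key (time series).
        let _ = (batch.primary_key(), batch.num_rows());
    }
    Ok(())
}
```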

View File

@@ -15,15 +15,17 @@
//! Parquet writer.
use common_telemetry::debug;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use crate::error::{InvalidMetadataSnafu, NewRecordBatchSnafu, Result};
use crate::error::{InvalidMetadataSnafu, Result};
use crate::read::Source;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::stream_writer::BufferedWriter;
@@ -54,38 +56,40 @@ impl<'a> ParquetWriter<'a> {
let json = metadata.to_json().context(InvalidMetadataSnafu)?;
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
let ts_column = metadata.time_index_column();
// FIXME(yingwen): encode metadata into key value.
// TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
let props_builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(opts.row_group_size);
// TODO(yingwen): Set column encoding for internal columns and timestamp.
// e.g. Use DELTA_BINARY_PACKED and disable dictionary for sequence.
.set_max_row_group_size(opts.row_group_size)
.set_column_encoding(
ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]),
Encoding::DELTA_BINARY_PACKED,
)
.set_column_dictionary_enabled(
ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]),
false,
)
.set_column_encoding(
ColumnPath::new(vec![ts_column.column_schema.name.clone()]),
Encoding::DELTA_BINARY_PACKED,
);
let writer_props = props_builder.build();
let arrow_schema = metadata.schema.arrow_schema();
let write_format = WriteFormat::new(metadata);
let mut buffered_writer = BufferedWriter::try_new(
self.file_path.to_string(),
self.object_store.clone(),
arrow_schema.clone(),
write_format.arrow_schema(),
Some(writer_props),
opts.write_buffer_size.as_bytes() as usize,
)
.await?;
while let Some(batch) = self.source.next_batch().await? {
let arrow_batch = RecordBatch::try_new(
arrow_schema.clone(),
batch
.fields()
.iter()
.map(|v| v.data.to_arrow_array())
.collect::<Vec<_>>(),
)
.context(NewRecordBatchSnafu)?;
let arrow_batch = write_format.convert_batch(&batch)?;
buffered_writer.write(&arrow_batch).await?;
}

View File

@@ -23,6 +23,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use datatypes::arrow::datatypes::FieldRef;
use datatypes::prelude::DataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::de::Error;
@@ -124,6 +125,11 @@ impl<'de> Deserialize<'de> for RegionMetadata {
}
impl RegionMetadata {
/// Decode the metadata from a JSON str.
pub fn from_json(s: &str) -> Result<Self> {
serde_json::from_str(s).context(SerdeJsonSnafu)
}
/// Encode the metadata to a JSON string.
pub fn to_json(&self) -> Result<String> {
serde_json::to_string(&self).context(SerdeJsonSnafu)
@@ -136,6 +142,11 @@ impl RegionMetadata {
.map(|index| &self.column_metadatas[*index])
}
/// Find column index by id.
pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
self.id_to_index.get(&column_id).copied()
}
/// Returns the time index column
///
/// # Panics
@@ -145,6 +156,26 @@ impl RegionMetadata {
&self.column_metadatas[index]
}
/// Returns the arrow field of the time index column.
pub fn time_index_field(&self) -> FieldRef {
let index = self.id_to_index[&self.time_index];
self.schema.arrow_schema().fields[index].clone()
}
/// Finds a column by name.
pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
self.schema
.column_index_by_name(name)
.map(|index| &self.column_metadatas[index])
}
/// Returns all field columns.
pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
self.column_metadatas
.iter()
.filter(|column| column.semantic_type == SemanticType::Field)
}
/// Checks whether the metadata is valid.
fn validate(&self) -> Result<()> {
// Id to name.
@@ -264,6 +295,7 @@ impl RegionMetadata {
/// Checks whether it is a valid column.
fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
// TODO(yingwen): Ensure column name is not internal columns.
if column_metadata.semantic_type == SemanticType::Timestamp {
ensure!(
column_metadata

View File

@@ -81,6 +81,9 @@ pub const SEQUENCE_COLUMN_NAME: &str = "__sequence";
/// Name for reserved column: op_type
pub const OP_TYPE_COLUMN_NAME: &str = "__op_type";
/// Name for reserved column: primary_key
pub const PRIMARY_KEY_COLUMN_NAME: &str = "__primary_key";
// -----------------------------------------------------------------------------
// ---------- Default options --------------------------------------------------