poc/create-alter-for-metrics:

- **Refactor `AccessLayerFactory`**: Updated the method for joining paths in `access_layer.rs` by replacing `join_dir` with `join_path` for file path construction.
 - **Enhance Testing**: Added comprehensive tests in `access_layer.rs` to verify the functionality of the Parquet writer, including writing multiple batches and handling provided timestamp ranges.
 - **Optimize `MetricsBatchBuilder`**: Simplified the loop in `batch_builder.rs` by removing unnecessary mutability in the encoder variable.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-26 11:31:29 +00:00
parent 68409e28ea
commit 025cae3679
2 changed files with 198 additions and 8 deletions

View File

@@ -19,22 +19,19 @@ use arrow::array::{
use arrow::datatypes::Int64Type;
use arrow_schema::TimeUnit;
use common_datasource::parquet_writer::AsyncWriter;
use common_time::range::TimestampRange;
use datafusion::parquet::arrow::AsyncArrowWriter;
use datatypes::timestamp::TimestampSecond;
use mito2::sst::file::{FileId, FileMeta};
use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY};
use object_store::config::ObjectStoreConfig;
use object_store::util::join_dir;
use object_store::util::{join_dir, join_path};
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
use store_api::storage::RegionId;
use table::metadata::{TableId, TableInfoRef};
use crate::error;
@@ -60,7 +57,7 @@ impl AccessLayerFactory {
) -> error::Result<ParquetWriter> {
let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id);
let file_id = FileId::random();
let file_path = join_dir(&region_dir, &file_id.as_parquet());
let file_path = join_path(&region_dir, &file_id.as_parquet());
let writer = self
.object_store
.writer(&file_path)
@@ -205,6 +202,18 @@ fn get_or_calculate_timestamp_range(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use arrow::array::{Float64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_time::Timestamp;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use object_store::services::MemoryConfig;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
#[test]
@@ -212,4 +221,186 @@ mod tests {
let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0));
assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/");
}
fn create_test_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
GREPTIME_TIMESTAMP,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
GREPTIME_VALUE,
ConcreteDataType::float64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 3,
})
.primary_key(vec![3]);
let metadata = builder.build().unwrap();
Arc::new(metadata)
}
fn create_test_record_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new(
GREPTIME_TIMESTAMP,
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(GREPTIME_VALUE, DataType::Float64, true),
Field::new("tag", DataType::Utf8, true),
]));
let timestamp_array = TimestampMillisecondArray::from(vec![1000, 2000, 3000]);
let value_array = Float64Array::from(vec![Some(10.0), None, Some(30.0)]);
let tag_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
RecordBatch::try_new(
schema,
vec![
Arc::new(timestamp_array),
Arc::new(value_array),
Arc::new(tag_array),
],
)
.unwrap()
}
#[tokio::test]
async fn test_parquet_writer_write_and_finish() {
let object_store = ObjectStore::from_config(MemoryConfig::default())
.unwrap()
.finish();
let factory = AccessLayerFactory { object_store };
let region_metadata = create_test_region_metadata();
let mut writer = factory
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
.await
.unwrap();
let batch = create_test_record_batch();
// Test writing a record batch
writer.write_record_batch(&batch, None).await.unwrap();
// Test finishing the writer
let file_meta = writer.finish().await.unwrap();
assert_eq!(file_meta.region_id, RegionId::new(1024, 0));
assert_eq!(file_meta.level, 0);
assert_eq!(file_meta.num_rows, 3);
assert_eq!(file_meta.num_row_groups, 1);
assert!(file_meta.file_size > 0);
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(3000));
}
#[tokio::test]
async fn test_parquet_writer_multiple_batches() {
let object_store = ObjectStore::from_config(MemoryConfig::default())
.unwrap()
.finish();
let factory = AccessLayerFactory { object_store };
let region_metadata = create_test_region_metadata();
let mut writer = factory
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
.await
.unwrap();
// Write first batch
let batch1 = create_test_record_batch();
writer.write_record_batch(&batch1, None).await.unwrap();
// Create second batch with different timestamp range
let schema = region_metadata.schema.arrow_schema().clone();
let timestamp_array = TimestampMillisecondArray::from(vec![4000, 5000]);
let value_array = Float64Array::from(vec![Some(40.0), Some(50.0)]);
let tag_array = StringArray::from(vec![Some("d"), Some("e")]);
let batch2 = RecordBatch::try_new(
schema,
vec![
Arc::new(timestamp_array),
Arc::new(value_array),
Arc::new(tag_array),
],
)
.unwrap();
writer.write_record_batch(&batch2, None).await.unwrap();
let file_meta = writer.finish().await.unwrap();
// Should have combined rows from both batches
assert_eq!(file_meta.num_rows, 5);
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000));
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(5000));
}
#[tokio::test]
async fn test_parquet_writer_with_provided_timestamp_range() {
let object_store = ObjectStore::from_config(MemoryConfig::default())
.unwrap()
.finish();
let factory = AccessLayerFactory { object_store };
let region_metadata = create_test_region_metadata();
let mut writer = factory
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone())
.await
.unwrap();
let batch = create_test_record_batch();
// Provide explicit timestamp range that differs from actual data
let provided_range = (500, 6000);
writer
.write_record_batch(&batch, Some(provided_range))
.await
.unwrap();
let file_meta = writer.finish().await.unwrap();
assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(500));
assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(6000));
}
#[test]
fn test_get_or_calculate_timestamp_range_with_provided_range() {
let region_metadata = create_test_region_metadata();
let batch = create_test_record_batch();
let provided_range = Some((100, 200));
let result = get_or_calculate_timestamp_range(provided_range, &batch, &region_metadata);
assert!(result.is_ok());
assert_eq!(result.unwrap(), (100, 200));
}
#[test]
fn test_get_or_calculate_timestamp_range_calculated() {
let region_metadata = create_test_region_metadata();
let batch = create_test_record_batch();
let result = get_or_calculate_timestamp_range(None, &batch, &region_metadata);
assert!(result.is_ok());
assert_eq!(result.unwrap(), (1000, 3000));
}
}

View File

@@ -28,7 +28,6 @@ use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use itertools::Itertools;
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito2::sst::file::FileTimeRange;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use operator::schema_helper::{
ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
@@ -282,7 +281,7 @@ impl MetricsBatchBuilder {
/// Finishes current record batch builder and returns record batches grouped by physical table id.
pub(crate) fn finish(self) -> error::Result<HashMap<TableId, (RecordBatch, (i64, i64))>> {
let mut table_batches = HashMap::with_capacity(self.builders.len());
for (physical_table_id, mut encoder) in self.builders {
for (physical_table_id, encoder) in self.builders {
let rb = encoder.finish()?;
if let Some(v) = rb {
table_batches.insert(physical_table_id, v);