From 025cae3679041c0be12c7cb0db6b0e3161dfac52 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 26 Jun 2025 11:31:29 +0000 Subject: [PATCH] poc/create-alter-for-metrics: - **Refactor `AccessLayerFactory`**: Updated the method for joining paths in `access_layer.rs` by replacing `join_dir` with `join_path` for file path construction. - **Enhance Testing**: Added comprehensive tests in `access_layer.rs` to verify the functionality of the Parquet writer, including writing multiple batches and handling provided timestamp ranges. - **Optimize `MetricsBatchBuilder`**: Simplified the loop in `batch_builder.rs` by removing unnecessary mutability in the encoder variable. Signed-off-by: Lei, HUANG --- src/servers/src/access_layer.rs | 203 ++++++++++++++++++++++++++++++- src/servers/src/batch_builder.rs | 3 +- 2 files changed, 198 insertions(+), 8 deletions(-) diff --git a/src/servers/src/access_layer.rs b/src/servers/src/access_layer.rs index b586634f8e..fa5c806637 100644 --- a/src/servers/src/access_layer.rs +++ b/src/servers/src/access_layer.rs @@ -19,22 +19,19 @@ use arrow::array::{ use arrow::datatypes::Int64Type; use arrow_schema::TimeUnit; use common_datasource::parquet_writer::AsyncWriter; -use common_time::range::TimestampRange; use datafusion::parquet::arrow::AsyncArrowWriter; -use datatypes::timestamp::TimestampSecond; use mito2::sst::file::{FileId, FileMeta}; use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY}; use object_store::config::ObjectStoreConfig; -use object_store::util::join_dir; +use object_store::util::{join_dir, join_path}; use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::WriterProperties; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_REGION_SUBDIR; use store_api::storage::RegionId; -use table::metadata::{TableId, 
TableInfoRef}; use crate::error; @@ -60,7 +57,7 @@ impl AccessLayerFactory { ) -> error::Result { let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id); let file_id = FileId::random(); - let file_path = join_dir(&region_dir, &file_id.as_parquet()); + let file_path = join_path(&region_dir, &file_id.as_parquet()); let writer = self .object_store .writer(&file_path) @@ -205,6 +202,18 @@ fn get_or_calculate_timestamp_range( #[cfg(test)] mod tests { + use std::sync::Arc; + + use api::v1::SemanticType; + use arrow::array::{Float64Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; + use common_time::Timestamp; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use object_store::services::MemoryConfig; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use super::*; #[test] @@ -212,4 +221,186 @@ mod tests { let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0)); assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/"); } + + fn create_test_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + GREPTIME_TIMESTAMP, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + GREPTIME_VALUE, + ConcreteDataType::float64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 3, + }) + .primary_key(vec![3]); + let metadata = builder.build().unwrap(); + Arc::new(metadata) + } 
+ + fn create_test_record_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new( + GREPTIME_TIMESTAMP, + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new(GREPTIME_VALUE, DataType::Float64, true), + Field::new("tag", DataType::Utf8, true), + ])); + + let timestamp_array = TimestampMillisecondArray::from(vec![1000, 2000, 3000]); + let value_array = Float64Array::from(vec![Some(10.0), None, Some(30.0)]); + let tag_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(timestamp_array), + Arc::new(value_array), + Arc::new(tag_array), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_parquet_writer_write_and_finish() { + let object_store = ObjectStore::from_config(MemoryConfig::default()) + .unwrap() + .finish(); + let factory = AccessLayerFactory { object_store }; + + let region_metadata = create_test_region_metadata(); + let mut writer = factory + .create_sst_writer("test_catalog", "test_schema", region_metadata.clone()) + .await + .unwrap(); + + let batch = create_test_record_batch(); + + // Test writing a record batch + writer.write_record_batch(&batch, None).await.unwrap(); + + // Test finishing the writer + let file_meta = writer.finish().await.unwrap(); + + assert_eq!(file_meta.region_id, RegionId::new(1024, 0)); + assert_eq!(file_meta.level, 0); + assert_eq!(file_meta.num_rows, 3); + assert_eq!(file_meta.num_row_groups, 1); + assert!(file_meta.file_size > 0); + + assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000)); + assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(3000)); + } + + #[tokio::test] + async fn test_parquet_writer_multiple_batches() { + let object_store = ObjectStore::from_config(MemoryConfig::default()) + .unwrap() + .finish(); + let factory = AccessLayerFactory { object_store }; + + let region_metadata = create_test_region_metadata(); + let mut writer = factory + 
.create_sst_writer("test_catalog", "test_schema", region_metadata.clone()) + .await + .unwrap(); + + // Write first batch + let batch1 = create_test_record_batch(); + writer.write_record_batch(&batch1, None).await.unwrap(); + + // Create second batch with different timestamp range + let schema = region_metadata.schema.arrow_schema().clone(); + let timestamp_array = TimestampMillisecondArray::from(vec![4000, 5000]); + let value_array = Float64Array::from(vec![Some(40.0), Some(50.0)]); + let tag_array = StringArray::from(vec![Some("d"), Some("e")]); + + let batch2 = RecordBatch::try_new( + schema, + vec![ + Arc::new(timestamp_array), + Arc::new(value_array), + Arc::new(tag_array), + ], + ) + .unwrap(); + + writer.write_record_batch(&batch2, None).await.unwrap(); + + let file_meta = writer.finish().await.unwrap(); + + // Should have combined rows from both batches + assert_eq!(file_meta.num_rows, 5); + assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(1000)); + assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(5000)); + } + + #[tokio::test] + async fn test_parquet_writer_with_provided_timestamp_range() { + let object_store = ObjectStore::from_config(MemoryConfig::default()) + .unwrap() + .finish(); + let factory = AccessLayerFactory { object_store }; + + let region_metadata = create_test_region_metadata(); + let mut writer = factory + .create_sst_writer("test_catalog", "test_schema", region_metadata.clone()) + .await + .unwrap(); + + let batch = create_test_record_batch(); + + // Provide explicit timestamp range that differs from actual data + let provided_range = (500, 6000); + writer + .write_record_batch(&batch, Some(provided_range)) + .await + .unwrap(); + + let file_meta = writer.finish().await.unwrap(); + + assert_eq!(file_meta.time_range.0, Timestamp::new_millisecond(500)); + assert_eq!(file_meta.time_range.1, Timestamp::new_millisecond(6000)); + } + + #[test] + fn test_get_or_calculate_timestamp_range_with_provided_range() { + let 
region_metadata = create_test_region_metadata(); + let batch = create_test_record_batch(); + + let provided_range = Some((100, 200)); + let result = get_or_calculate_timestamp_range(provided_range, &batch, &region_metadata); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), (100, 200)); + } + + #[test] + fn test_get_or_calculate_timestamp_range_calculated() { + let region_metadata = create_test_region_metadata(); + let batch = create_test_record_batch(); + + let result = get_or_calculate_timestamp_range(None, &batch, &region_metadata); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), (1000, 3000)); + } } diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index fde5dba616..161c929ef3 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -28,7 +28,6 @@ use common_meta::node_manager::NodeManagerRef; use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use itertools::Itertools; use metric_engine::row_modifier::{RowModifier, RowsIter}; -use mito2::sst::file::FileTimeRange; use mito_codec::row_converter::SparsePrimaryKeyCodec; use operator::schema_helper::{ ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas, @@ -282,7 +281,7 @@ impl MetricsBatchBuilder { /// Finishes current record batch builder and returns record batches grouped by physical table id. pub(crate) fn finish(self) -> error::Result> { let mut table_batches = HashMap::with_capacity(self.builders.len()); - for (physical_table_id, mut encoder) in self.builders { + for (physical_table_id, encoder) in self.builders { let rb = encoder.finish()?; if let Some(v) = rb { table_batches.insert(physical_table_id, v);