From e4b048e7882c273621f0787bb1a52f41317f094d Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Sat, 28 Jun 2025 09:58:33 +0000
Subject: [PATCH] poc/create-alter-for-metrics: enhance schema handling and add telemetry logging

- **`access_layer.rs`**: Use `physical_schema` when creating the parquet writer
  and log the region metadata and batch schema when a batch write fails.
- **`batch_builder.rs`**: Introduce a `physical_schema` function and key the
  per-schema encoders and record batches by the physical table's `RegionId`
  instead of the logical table name. Remove the now-redundant `schema` function.
- **`prom_store.rs`**: Add telemetry logging to show whether the bulk or the
  original write path is taken.
- **`prom_row_builder.rs`**: Add telemetry logging that measures the elapsed
  time of table creation, metadata collection, batch appending, and SST upload.

Signed-off-by: Lei, HUANG
---
 src/servers/src/access_layer.rs     | 12 +++--
 src/servers/src/batch_builder.rs    | 82 +++++++++++++++--------------
 src/servers/src/http/prom_store.rs  |  9 ++--
 src/servers/src/prom_row_builder.rs | 54 ++++++++++++++++---
 4 files changed, 104 insertions(+), 53 deletions(-)

diff --git a/src/servers/src/access_layer.rs b/src/servers/src/access_layer.rs
index a2db06b6be..ac6932c451 100644
--- a/src/servers/src/access_layer.rs
+++ b/src/servers/src/access_layer.rs
@@ -33,6 +33,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
 use store_api::storage::RegionId;
 
+use crate::batch_builder::physical_schema;
 use crate::error;
 
 type AsyncParquetWriter = AsyncArrowWriter;
@@ -64,7 +65,8 @@ impl AccessLayerFactory {
             .writer(&file_path)
             .await
             .context(error::OpendalSnafu)?;
-        let schema = region_metadata.schema.arrow_schema().clone();
+
+        let schema = physical_schema();
 
         let key_value_meta = KeyValue::new(
             PARQUET_METADATA_KEY.to_string(),
@@ -104,10 +106,10 @@ impl ParquetWriter {
         batch: &RecordBatch,
         timestamp_range: Option<(i64, i64)>,
     ) -> error::Result<()> {
-        self.writer
-            .write(&batch)
-            .await
-            .context(error::ParquetSnafu)?;
+        if let Err(e) = self.writer.write(&batch).await.context(error::ParquetSnafu) {
+            common_telemetry::error!(e; "Region metadata: {:?}, batch schema: {:?}", self.region_metadata, batch.schema_ref());
+            return Err(e);
+        }
 
         let (batch_min, batch_max) =
             get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;
diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index 8c1d6051e7..7a71d060dc 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -48,7 +48,8 @@ use crate::prom_row_builder::{PromCtx, TableBuilder};
 
 pub struct MetricsBatchBuilder {
     schema_helper: SchemaHelper,
-    builders: HashMap>,
+    builders:
+        HashMap>,
     partition_manager: PartitionRuleManagerRef,
     node_manager: NodeManagerRef,
 }
@@ -239,7 +240,13 @@ impl MetricsBatchBuilder {
         current_catalog: Option,
         current_schema: Option,
         table_data: &mut HashMap>,
-        physical_region_metadata: &HashMap>,
+        physical_region_metadata: &HashMap<
+            String, /*schema name*/
+            HashMap<
+                String, /*logical table name*/
+                (TableId /*logical table id*/, RegionMetadataRef),
+            >,
+        >,
     ) -> error::Result<()> {
         for (ctx, tables_in_schema) in table_data {
             for (logical_table_name, table) in tables_in_schema {
@@ -272,7 +279,7 @@
                     .builders
                     .entry(schema.to_string())
                     .or_default()
-                    .entry(logical_table_name.clone())
+                    .entry(physical_table.region_id)
                     .or_insert_with(|| Self::create_sparse_encoder(&physical_table));
                encoder.append_rows(*logical_table_id, std::mem::take(table))?;
            }
@@ -286,18 +293,18 @@
     ) -> error::Result<
         HashMap<
             String, /*schema name*/
-            HashMap,
+            HashMap,
         >,
     > {
-        let mut table_batches: HashMap> =
+        let mut table_batches: HashMap> =
             HashMap::with_capacity(self.builders.len());
 
         for (schema_name, schema_tables) in self.builders {
             let schema_batches = table_batches.entry(schema_name).or_default();
-            for (logical_table_name, table_data) in schema_tables {
+            for (physical_region_id, table_data) in schema_tables {
                 let rb = table_data.finish()?;
                 if let Some(v) = rb {
-                    schema_batches.entry(logical_table_name).insert_entry(v);
+                    schema_batches.entry(physical_region_id).insert_entry(v);
                 }
             }
         }
@@ -336,36 +343,6 @@ impl BatchEncoder {
         }
     }
 
-    /// Creates the schema of output record batch.
-    fn schema() -> arrow::datatypes::SchemaRef {
-        Arc::new(arrow::datatypes::Schema::new(vec![
-            Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
-            Field::new(
-                GREPTIME_TIMESTAMP,
-                arrow::datatypes::DataType::Timestamp(
-                    arrow::datatypes::TimeUnit::Millisecond,
-                    None,
-                ),
-                false,
-            ),
-            Field::new(
-                PRIMARY_KEY_COLUMN_NAME,
-                arrow::datatypes::DataType::Binary,
-                false,
-            ),
-            Field::new(
-                SEQUENCE_COLUMN_NAME,
-                arrow::datatypes::DataType::UInt64,
-                false,
-            ),
-            Field::new(
-                OP_TYPE_COLUMN_NAME,
-                arrow::datatypes::DataType::UInt8,
-                false,
-            ),
-        ]))
-    }
-
     fn append_rows(
         &mut self,
         logical_table_id: TableId,
@@ -447,7 +424,7 @@
         let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
         let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
         let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
-        let rb = RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
+        let rb = RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
             .context(error::ArrowSnafu)?;
         Ok(Some((rb, self.timestamp_range.unwrap())))
     }
@@ -473,7 +450,7 @@ fn tags_to_logical_schemas(
         .collect();
     columns.push(ColumnSchema {
         column_name: GREPTIME_TIMESTAMP.to_string(),
-        datatype: ColumnDataType::TimestampNanosecond as i32,
+        datatype: ColumnDataType::TimestampMillisecond as i32,
         semantic_type: SemanticType::Timestamp as i32,
         ..Default::default()
     });
@@ -495,3 +472,30 @@
 
     LogicalSchemas { schemas }
 }
+
+/// Creates the schema of output record batch.
+pub fn physical_schema() -> arrow::datatypes::SchemaRef {
+    Arc::new(arrow::datatypes::Schema::new(vec![
+        Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
+        Field::new(
+            GREPTIME_TIMESTAMP,
+            arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
+            false,
+        ),
+        Field::new(
+            PRIMARY_KEY_COLUMN_NAME,
+            arrow::datatypes::DataType::Binary,
+            false,
+        ),
+        Field::new(
+            SEQUENCE_COLUMN_NAME,
+            arrow::datatypes::DataType::UInt64,
+            false,
+        ),
+        Field::new(
+            OP_TYPE_COLUMN_NAME,
+            arrow::datatypes::DataType::UInt8,
+            false,
+        ),
+    ]))
+}
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index eec55e3ce2..43eb3689c3 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -24,7 +24,7 @@ use axum_extra::TypedHeader;
 use common_catalog::consts::DEFAULT_SCHEMA_NAME;
 use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
-use common_telemetry::tracing;
+use common_telemetry::{info, tracing};
 use hyper::HeaderMap;
 use lazy_static::lazy_static;
 use object_pool::Pool;
@@ -112,7 +112,7 @@ pub async fn remote_write(
         pipeline_handler,
         prom_store_with_metric_engine,
         prom_validation_mode,
-        bulk_state: _,
+        bulk_state,
     } = state;
 
     if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -147,7 +147,8 @@ pub async fn remote_write(
         processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
     }
 
-    if let Some(state) = state.bulk_state {
+    if let Some(state) = bulk_state {
+        info!("Use bulk mode");
         let context = PromBulkContext {
             schema_helper: state.schema_helper,
             query_ctx: query_ctx.clone(),
@@ -166,6 +167,8 @@ pub async fn remote_write(
         return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
     }
 
+    info!("Use original mode");
+
     let req =
         decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;
 
diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs
index c1b535fbe1..359e3982a0 100644
--- a/src/servers/src/prom_row_builder.rs
+++ b/src/servers/src/prom_row_builder.rs
@@ -15,11 +15,13 @@
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::string::ToString;
+use std::time::Instant;
 
 use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use common_telemetry::info;
 use pipeline::{ContextOpt, ContextReq};
 use prost::DecodeError;
 
@@ -104,9 +106,14 @@ impl TablesBuilder {
         );
 
         let mut tables = std::mem::take(&mut self.tables);
+        let start = Instant::now();
         batch_builder
             .create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
             .await?;
+        info!(
+            "create_or_alter_physical_tables, elapsed time: {}ms",
+            start.elapsed().as_millis()
+        );
 
         // Extract logical table names from tables for metadata collection
         let current_schema = bulk_ctx.query_ctx.current_schema();
@@ -120,24 +127,52 @@
             })
             .collect();
 
+        let start = Instant::now();
         // Gather all region metadata for region 0 of physical tables.
         let physical_region_metadata = batch_builder
             .collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx)
             .await?;
+        info!(
+            "collect_physical_region_metadata, elapsed time: {}ms",
+            start.elapsed().as_millis()
+        );
+
+        let start = Instant::now();
         batch_builder
             .append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
             .await?;
-        let record_batches = batch_builder.finish()?;
+        info!(
+            "append_rows_to_batch, elapsed time: {}ms",
+            start.elapsed().as_millis()
+        );
+        let record_batches = batch_builder.finish()?;
+        let physical_region_id_to_meta = physical_region_metadata
+            .into_iter()
+            .map(|(schema_name, tables)| {
+                let region_id_to_meta = tables
+                    .into_values()
+                    .map(|(_, physical_region_meta)| {
+                        (physical_region_meta.region_id, physical_region_meta)
+                    })
+                    .collect::>();
+                (schema_name, region_id_to_meta)
+            })
+            .collect::>();
+
+        let start = Instant::now();
+
+        let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
         for (schema_name, schema_batches) in record_batches {
-            let schema_regions = physical_region_metadata
+            let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0);
+            *tables_in_schema = *tables_in_schema + 1;
+            let schema_regions = physical_region_id_to_meta
                 .get(&schema_name)
                 .expect("physical region metadata not found");
-            for (logical_table_name, (rb, time_range)) in schema_batches {
-                let (_table_id, physical_region_metadata) = schema_regions
-                    .get(&logical_table_name)
+            for (physical_region_id, (rb, time_range)) in schema_batches {
+                let physical_region_metadata = schema_regions
+                    .get(&physical_region_id)
                     .expect("physical region metadata not found");
                 let mut writer = bulk_ctx
                     .access_layer_factory
                     .await?;
                 writer.write_record_batch(&rb, Some(time_range)).await?;
-                let _file_meta = writer.finish().await?;
+                let file_meta = writer.finish().await?;
+                info!("file meta: {:?}", file_meta);
             }
         }
+        info!(
+            "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}",
+            start.elapsed().as_millis(),
+            tables_per_schema.len(),
+            tables_per_schema
+        );
         Ok(())
     }
 }
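
For reference, here is a minimal standalone sketch (not the crate's code) of the five-column physical schema this patch standardizes on, plus a `RecordBatch` built against it with the `arrow` crate. The string literals are illustrative stand-ins for the `GREPTIME_VALUE`, `GREPTIME_TIMESTAMP`, `PRIMARY_KEY_COLUMN_NAME`, `SEQUENCE_COLUMN_NAME`, and `OP_TYPE_COLUMN_NAME` constants; the point is that both the parquet writer in `access_layer.rs` and `BatchEncoder` now agree on one schema instead of building it in two places.

```rust
use std::sync::Arc;

use arrow::array::{
    ArrayRef, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;

/// Same shape as the patch's `physical_schema()`; the column names here are
/// assumed stand-ins for the real constants.
fn physical_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("greptime_value", DataType::Float64, false),
        Field::new(
            "greptime_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("__primary_key", DataType::Binary, false),
        Field::new("__sequence", DataType::UInt64, false),
        Field::new("__op_type", DataType::UInt8, false),
    ]))
}

fn main() -> Result<(), arrow::error::ArrowError> {
    // One sample row: value 1.0 at t = 1000 ms with an opaque encoded primary key.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Float64Array::from(vec![1.0])),
        Arc::new(TimestampMillisecondArray::from(vec![1_000i64])),
        Arc::new(BinaryArray::from(vec![b"encoded-primary-key".as_ref()])),
        Arc::new(UInt64Array::from(vec![0u64])), // sequence number
        Arc::new(UInt8Array::from(vec![1u8])),   // op type
    ];
    let batch = RecordBatch::try_new(physical_schema(), columns)?;
    assert_eq!(batch.num_columns(), 5);
    Ok(())
}
```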
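The `batch_builder.rs` change re-keys the nested builder map from logical table name to the physical table's `RegionId`, so rows from many logical tables that share one metric-engine physical region land in a single encoder. A dependency-free sketch of that keying, with the region id modeled as a plain `u64` and a hypothetical `Encoder` that just counts rows:

```rust
use std::collections::HashMap;

#[derive(Debug, Default)]
struct Encoder {
    rows: usize,
}

impl Encoder {
    fn append_rows(&mut self, n: usize) {
        self.rows += n;
    }
}

fn main() {
    // schema name -> physical region id -> encoder
    let mut builders: HashMap<String, HashMap<u64, Encoder>> = HashMap::new();

    // Two logical tables that map to the same physical region share one encoder,
    // which is the point of keying by region id instead of logical table name.
    for (schema, region_id, rows) in [("public", 42u64, 3usize), ("public", 42u64, 2usize)] {
        builders
            .entry(schema.to_string())
            .or_default()
            .entry(region_id)
            .or_insert_with(Encoder::default)
            .append_rows(rows);
    }

    assert_eq!(builders["public"][&42u64].rows, 5);
}
```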
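In `prom_row_builder.rs`, the metadata collected per logical table is re-keyed by physical region id before the record batches (which are now also keyed by region id) are written out. A simplified sketch of that transformation, with `TableId`, `RegionId`, and `RegionMetadataRef` modeled as `u32`, `u64`, and a small placeholder struct:

```rust
use std::collections::HashMap;

/// Stand-in for RegionMetadataRef; only the region id matters for the re-keying.
#[derive(Clone, Debug)]
struct RegionMeta {
    region_id: u64,
}

fn main() {
    // schema name -> logical table name -> (logical table id, physical region metadata)
    let mut physical_region_metadata: HashMap<String, HashMap<String, (u32, RegionMeta)>> =
        HashMap::new();
    physical_region_metadata
        .entry("public".to_string())
        .or_default()
        .insert(
            "http_requests_total".to_string(),
            (1001, RegionMeta { region_id: 42 }),
        );

    // schema name -> physical region id -> physical region metadata
    let physical_region_id_to_meta: HashMap<String, HashMap<u64, RegionMeta>> =
        physical_region_metadata
            .into_iter()
            .map(|(schema_name, tables)| {
                let region_id_to_meta = tables
                    .into_values()
                    .map(|(_, meta)| (meta.region_id, meta))
                    .collect::<HashMap<_, _>>();
                (schema_name, region_id_to_meta)
            })
            .collect();

    assert!(physical_region_id_to_meta["public"].contains_key(&42));
}
```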
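The telemetry added across the bulk path follows one simple pattern: capture an `Instant` before each phase and log the elapsed milliseconds afterwards. A dependency-free sketch of the pattern, with `println!` standing in for `common_telemetry::info!` and a sleep standing in for the real catalog call:

```rust
use std::time::{Duration, Instant};

// Placeholder for the real work; it only exists to give the timer something to measure.
fn create_or_alter_physical_tables() {
    std::thread::sleep(Duration::from_millis(5));
}

fn main() {
    let start = Instant::now();
    create_or_alter_physical_tables();
    // println! stands in for common_telemetry::info! used in the patch.
    println!(
        "create_or_alter_physical_tables, elapsed time: {}ms",
        start.elapsed().as_millis()
    );
}
```

Per-request info-level timing like this is convenient for a proof of concept; if the bulk path graduates beyond PoC, the same measurements would more naturally feed histogram metrics or debug-level logs.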