poc/create-alter-for-metrics:

### Commit Message

 Enhance Schema Handling and Add Telemetry Logging

- **`access_layer.rs`**: Refactored to use `physical_schema` for schema creation; when batch writing fails, the error is now logged together with the region metadata and the batch schema before being returned.
- **`batch_builder.rs`**: Introduced the `physical_schema` function for schema creation, keyed batch encoders and output batches by the physical table's `RegionId` instead of the logical table name, and changed the logical timestamp column from `TimestampNanosecond` to `TimestampMillisecond`. Removed the now-redundant private `schema` function.
- **`prom_store.rs`**: Added telemetry logging to record whether the bulk or the original write path is taken.
- **`prom_row_builder.rs`**: Implemented telemetry logging to measure elapsed time for key operations such as table creation, metadata collection, batch appending, and SST upload (see the timing sketch after the commit metadata below).

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG
Date: 2025-06-28 09:58:33 +00:00
Parent: ecbf372de3
Commit: e4b048e788
4 changed files with 104 additions and 53 deletions
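
The timing added in `prom_row_builder.rs` follows one pattern throughout: start an `Instant` just before a phase, then log the elapsed milliseconds once it completes. Below is a minimal standalone sketch of that pattern, assuming only the `common_telemetry` crate already used in the diff; `ingest_phase` is a hypothetical stand-in for calls such as `create_or_alter_physical_tables`:

```rust
use std::time::Instant;

use common_telemetry::info;

// Hypothetical stand-in for an async ingestion phase, e.g.
// create_or_alter_physical_tables or collect_physical_region_metadata.
async fn ingest_phase() {
    // ... real work happens here ...
}

async fn run_phase_with_timing() {
    // Start a wall-clock timer immediately before the phase.
    let start = Instant::now();
    ingest_phase().await;
    // Log the elapsed time in milliseconds, mirroring the telemetry
    // lines added in this commit.
    info!(
        "ingest_phase, elapsed time: {}ms",
        start.elapsed().as_millis()
    );
}
```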

### `access_layer.rs`

@@ -33,6 +33,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
use store_api::storage::RegionId;
use crate::batch_builder::physical_schema;
use crate::error;
type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
@@ -64,7 +65,8 @@ impl AccessLayerFactory {
.writer(&file_path)
.await
.context(error::OpendalSnafu)?;
let schema = region_metadata.schema.arrow_schema().clone();
let schema = physical_schema();
let key_value_meta = KeyValue::new(
PARQUET_METADATA_KEY.to_string(),
@@ -104,10 +106,10 @@ impl ParquetWriter {
batch: &RecordBatch,
timestamp_range: Option<(i64, i64)>,
) -> error::Result<()> {
self.writer
.write(&batch)
.await
.context(error::ParquetSnafu)?;
if let Err(e) = self.writer.write(&batch).await.context(error::ParquetSnafu) {
common_telemetry::error!(e; "Region metadata: {:?}, batch schema: {:?}", self.region_metadata, batch.schema_ref());
return Err(e);
}
let (batch_min, batch_max) =
get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;

### `batch_builder.rs`

@@ -48,7 +48,8 @@ use crate::prom_row_builder::{PromCtx, TableBuilder};
pub struct MetricsBatchBuilder {
schema_helper: SchemaHelper,
builders: HashMap<String /*schema*/, HashMap<String /*logical table name*/, BatchEncoder>>,
builders:
HashMap<String /*schema*/, HashMap<RegionId /*physical table name*/, BatchEncoder>>,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
}
@@ -239,7 +240,13 @@ impl MetricsBatchBuilder {
current_catalog: Option<String>,
current_schema: Option<String>,
table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
physical_region_metadata: &HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>,
physical_region_metadata: &HashMap<
String, /*schema name*/
HashMap<
String, /*logical table name*/
(TableId /*logical table id*/, RegionMetadataRef),
>,
>,
) -> error::Result<()> {
for (ctx, tables_in_schema) in table_data {
for (logical_table_name, table) in tables_in_schema {
@@ -272,7 +279,7 @@ impl MetricsBatchBuilder {
.builders
.entry(schema.to_string())
.or_default()
.entry(logical_table_name.clone())
.entry(physical_table.region_id)
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
encoder.append_rows(*logical_table_id, std::mem::take(table))?;
}
@@ -286,18 +293,18 @@ impl MetricsBatchBuilder {
) -> error::Result<
HashMap<
String, /*schema name*/
HashMap<String /*logical table name*/, (RecordBatch, (i64, i64))>,
HashMap<RegionId /*physical region id*/, (RecordBatch, (i64, i64))>,
>,
> {
let mut table_batches: HashMap<String, HashMap<String, (RecordBatch, (i64, i64))>> =
let mut table_batches: HashMap<String, HashMap<RegionId, (RecordBatch, (i64, i64))>> =
HashMap::with_capacity(self.builders.len());
for (schema_name, schema_tables) in self.builders {
let schema_batches = table_batches.entry(schema_name).or_default();
for (logical_table_name, table_data) in schema_tables {
for (physical_region_id, table_data) in schema_tables {
let rb = table_data.finish()?;
if let Some(v) = rb {
schema_batches.entry(logical_table_name).insert_entry(v);
schema_batches.entry(physical_region_id).insert_entry(v);
}
}
}
@@ -336,36 +343,6 @@ impl BatchEncoder {
}
}
/// Creates the schema of output record batch.
fn schema() -> arrow::datatypes::SchemaRef {
Arc::new(arrow::datatypes::Schema::new(vec![
Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
Field::new(
GREPTIME_TIMESTAMP,
arrow::datatypes::DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
None,
),
false,
),
Field::new(
PRIMARY_KEY_COLUMN_NAME,
arrow::datatypes::DataType::Binary,
false,
),
Field::new(
SEQUENCE_COLUMN_NAME,
arrow::datatypes::DataType::UInt64,
false,
),
Field::new(
OP_TYPE_COLUMN_NAME,
arrow::datatypes::DataType::UInt8,
false,
),
]))
}
fn append_rows(
&mut self,
logical_table_id: TableId,
@@ -447,7 +424,7 @@ impl BatchEncoder {
let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
let rb = RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
let rb = RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
.context(error::ArrowSnafu)?;
Ok(Some((rb, self.timestamp_range.unwrap())))
}
@@ -473,7 +450,7 @@ fn tags_to_logical_schemas(
.collect();
columns.push(ColumnSchema {
column_name: GREPTIME_TIMESTAMP.to_string(),
datatype: ColumnDataType::TimestampNanosecond as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
});
@@ -495,3 +472,30 @@ fn tags_to_logical_schemas(
LogicalSchemas { schemas }
}
/// Creates the schema of output record batch.
pub fn physical_schema() -> arrow::datatypes::SchemaRef {
Arc::new(arrow::datatypes::Schema::new(vec![
Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
Field::new(
GREPTIME_TIMESTAMP,
arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
Field::new(
PRIMARY_KEY_COLUMN_NAME,
arrow::datatypes::DataType::Binary,
false,
),
Field::new(
SEQUENCE_COLUMN_NAME,
arrow::datatypes::DataType::UInt64,
false,
),
Field::new(
OP_TYPE_COLUMN_NAME,
arrow::datatypes::DataType::UInt8,
false,
),
]))
}
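
As a reference for the layout that `physical_schema` describes, here is a self-contained sketch (not part of this patch) that builds a one-row `RecordBatch` with the same five columns. The literal column names are assumptions standing in for the `GREPTIME_VALUE`, `GREPTIME_TIMESTAMP`, `PRIMARY_KEY_COLUMN_NAME`, `SEQUENCE_COLUMN_NAME`, and `OP_TYPE_COLUMN_NAME` constants:

```rust
use std::sync::Arc;

use arrow::array::{
    ArrayRef, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

fn example_physical_batch() -> arrow::error::Result<RecordBatch> {
    // Same column order and types as physical_schema(); the names are
    // illustrative placeholders for the GREPTIME_* / *_COLUMN_NAME constants.
    let schema = Arc::new(Schema::new(vec![
        Field::new("greptime_value", DataType::Float64, false),
        Field::new(
            "greptime_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
        Field::new("__primary_key", DataType::Binary, false),
        Field::new("__sequence", DataType::UInt64, false),
        Field::new("__op_type", DataType::UInt8, false),
    ]));

    // One sample row: a float value, a millisecond timestamp, an encoded
    // primary key, plus illustrative sequence and op-type bookkeeping values.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Float64Array::from(vec![42.0])) as ArrayRef,
        Arc::new(TimestampMillisecondArray::from(vec![1_700_000_000_000i64])) as ArrayRef,
        Arc::new(BinaryArray::from(vec![&b"encoded-primary-key"[..]])) as ArrayRef,
        Arc::new(UInt64Array::from(vec![0u64])) as ArrayRef,
        Arc::new(UInt8Array::from(vec![1u8])) as ArrayRef,
    ];

    RecordBatch::try_new(schema, columns)
}
```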

### `prom_store.rs`

@@ -24,7 +24,7 @@ use axum_extra::TypedHeader;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use common_telemetry::{info, tracing};
use hyper::HeaderMap;
use lazy_static::lazy_static;
use object_pool::Pool;
@@ -112,7 +112,7 @@ pub async fn remote_write(
pipeline_handler,
prom_store_with_metric_engine,
prom_validation_mode,
bulk_state: _,
bulk_state,
} = state;
if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -147,7 +147,8 @@ pub async fn remote_write(
processor.set_pipeline(pipeline_handler, query_ctx.clone(), pipeline_def);
}
if let Some(state) = state.bulk_state {
if let Some(state) = bulk_state {
info!("Use bulk mode");
let context = PromBulkContext {
schema_helper: state.schema_helper,
query_ctx: query_ctx.clone(),
@@ -166,6 +167,8 @@ pub async fn remote_write(
return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
}
info!("Use original mode");
let req =
decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;

### `prom_row_builder.rs`

@@ -15,11 +15,13 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::string::ToString;
use std::time::Instant;
use api::prom_store::remote::Sample;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::info;
use pipeline::{ContextOpt, ContextReq};
use prost::DecodeError;
@@ -104,9 +106,14 @@ impl TablesBuilder {
);
let mut tables = std::mem::take(&mut self.tables);
let start = Instant::now();
batch_builder
.create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
.await?;
info!(
"create_or_alter_physical_tables, elapsed time: {}ms",
start.elapsed().as_millis()
);
// Extract logical table names from tables for metadata collection
let current_schema = bulk_ctx.query_ctx.current_schema();
@@ -120,24 +127,52 @@ impl TablesBuilder {
})
.collect();
let start = Instant::now();
// Gather all region metadata for region 0 of physical tables.
let physical_region_metadata = batch_builder
.collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx)
.await?;
info!(
"collect_physical_region_metadata, elapsed time: {}ms",
start.elapsed().as_millis()
);
let start = Instant::now();
batch_builder
.append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
.await?;
let record_batches = batch_builder.finish()?;
info!(
"append_rows_to_batch, elapsed time: {}ms",
start.elapsed().as_millis()
);
let physical_region_id_to_meta = physical_region_metadata
.into_iter()
.map(|(schema_name, tables)| {
let region_id_to_meta = tables
.into_values()
.map(|(_, physical_region_meta)| {
(physical_region_meta.region_id, physical_region_meta)
})
.collect::<HashMap<_, _>>();
(schema_name, region_id_to_meta)
})
.collect::<HashMap<_, _>>();
let start = Instant::now();
let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
for (schema_name, schema_batches) in record_batches {
let schema_regions = physical_region_metadata
let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0);
*tables_in_schema = *tables_in_schema + 1;
let schema_regions = physical_region_id_to_meta
.get(&schema_name)
.expect("physical region metadata not found");
for (logical_table_name, (rb, time_range)) in schema_batches {
let (_table_id, physical_region_metadata) = schema_regions
.get(&logical_table_name)
for (physical_region_id, (rb, time_range)) in schema_batches {
let physical_region_metadata = schema_regions
.get(&physical_region_id)
.expect("physical region metadata not found");
let mut writer = bulk_ctx
.access_layer_factory
@@ -149,9 +184,16 @@ impl TablesBuilder {
.await?;
writer.write_record_batch(&rb, Some(time_range)).await?;
let _file_meta = writer.finish().await?;
let file_meta = writer.finish().await?;
info!("file meta: {:?}", file_meta);
}
}
info!(
"upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}",
start.elapsed().as_millis(),
tables_per_schema.len(),
tables_per_schema
);
Ok(())
}
}