From 3d81a17360fcd8c550d2a9838d7084aa75e0f7d8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 26 Jun 2025 13:00:54 +0000 Subject: [PATCH] poc/create-alter-for-metrics: ### Commit Message Enhance Schema and Table Handling in MetricsBatchBuilder - **`access_layer.rs`**: Made `create_sst_writer` function public within the crate to facilitate SST writing. - **`batch_builder.rs`**: Updated `MetricsBatchBuilder` to handle builders as a nested `HashMap` for schemas and logical table names. Modified `finish` method to return record batches grouped by schema and logical table name. - **`prom_row_builder.rs`**: Renamed `as_record_batch` to `as_record_batches` and implemented logic to write record batches using `AccessLayerFactory`. - **`proto.rs`**: Updated method call from `as_record_batch` to `as_record_batches` to reflect the new method name. Signed-off-by: Lei, HUANG --- src/servers/src/access_layer.rs | 2 +- src/servers/src/batch_builder.rs | 30 +++++++++++++++++++++-------- src/servers/src/http/prom_store.rs | 2 ++ src/servers/src/prom_row_builder.rs | 26 +++++++++++++++++++++++-- src/servers/src/proto.rs | 2 +- 5 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/servers/src/access_layer.rs b/src/servers/src/access_layer.rs index fa5c806637..94113baa3e 100644 --- a/src/servers/src/access_layer.rs +++ b/src/servers/src/access_layer.rs @@ -49,7 +49,7 @@ impl AccessLayerFactory { Ok(Self { object_store }) } - async fn create_sst_writer( + pub(crate) async fn create_sst_writer( &self, catalog: &str, schema: &str, diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index 161c929ef3..8c1d6051e7 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -48,7 +48,7 @@ use crate::prom_row_builder::{PromCtx, TableBuilder}; pub struct MetricsBatchBuilder { schema_helper: SchemaHelper, - builders: HashMap, + builders: HashMap>, partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, } @@ -270,7 +270,9 @@ impl MetricsBatchBuilder { let encoder = self .builders - .entry(physical_table.region_id.table_id()) + .entry(schema.to_string()) + .or_default() + .entry(logical_table_name.clone()) .or_insert_with(|| Self::create_sparse_encoder(&physical_table)); encoder.append_rows(*logical_table_id, std::mem::take(table))?; } @@ -279,12 +281,24 @@ impl MetricsBatchBuilder { } /// Finishes current record batch builder and returns record batches grouped by physical table id. - pub(crate) fn finish(self) -> error::Result> { - let mut table_batches = HashMap::with_capacity(self.builders.len()); - for (physical_table_id, encoder) in self.builders { - let rb = encoder.finish()?; - if let Some(v) = rb { - table_batches.insert(physical_table_id, v); + pub(crate) fn finish( + self, + ) -> error::Result< + HashMap< + String, /*schema name*/ + HashMap, + >, + > { + let mut table_batches: HashMap> = + HashMap::with_capacity(self.builders.len()); + + for (schema_name, schema_tables) in self.builders { + let schema_batches = table_batches.entry(schema_name).or_default(); + for (logical_table_name, table_data) in schema_tables { + let rb = table_data.finish()?; + if let Some(v) = rb { + schema_batches.entry(logical_table_name).insert_entry(v); + } } } Ok(table_batches) diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 3b83788655..7aaf38cebb 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -37,6 +37,7 @@ use serde::{Deserialize, Serialize}; use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::prelude::*; +use crate::access_layer::AccessLayerFactory; use crate::error::{self, InternalSnafu, PipelineSnafu, Result}; use crate::http::extractor::PipelineInfo; use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS}; @@ -221,6 +222,7 @@ pub struct PromBulkContext { pub(crate) query_ctx: QueryContextRef, pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) node_manager: NodeManagerRef, + pub(crate) access_layer_factory: AccessLayerFactory, } async fn decode_remote_write_request( diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 6db3da31a7..c1b535fbe1 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -96,7 +96,7 @@ impl TablesBuilder { } /// Converts [TablesBuilder] to record batch and clears inner states. - pub(crate) async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> { + pub(crate) async fn as_record_batches(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> { let mut batch_builder = MetricsBatchBuilder::new( bulk_ctx.schema_helper.clone(), bulk_ctx.partition_manager.clone(), @@ -130,7 +130,29 @@ impl TablesBuilder { .await?; let record_batches = batch_builder.finish()?; - todo!() + + for (schema_name, schema_batches) in record_batches { + let schema_regions = physical_region_metadata + .get(&schema_name) + .expect("physical region metadata not found"); + for (logical_table_name, (rb, time_range)) in schema_batches { + let (_table_id, physical_region_metadata) = schema_regions + .get(&logical_table_name) + .expect("physical region metadata not found"); + let mut writer = bulk_ctx + .access_layer_factory + .create_sst_writer( + "greptime", //todo(hl): use the catalog name in query context. + &schema_name, + physical_region_metadata.clone(), + ) + .await?; + + writer.write_record_batch(&rb, Some(time_range)).await?; + let _file_meta = writer.finish().await?; + } + } + Ok(()) } } diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index c257700fc3..bbd4a7f0dc 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -356,7 +356,7 @@ impl PromWriteRequest { /// Converts the write request into a record batch and reset the table data. pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> { - self.table_data.as_record_batch(bulk_ctx).await + self.table_data.as_record_batches(bulk_ctx).await } }