poc/create-alter-for-metrics:

### Commit Message

 Enhance Schema and Table Handling in MetricsBatchBuilder

 - **`access_layer.rs`**: Made the `create_sst_writer` function crate-visible (`pub(crate)`) so that other modules in the crate can create SST writers.
 - **`batch_builder.rs`**: Changed `MetricsBatchBuilder` to keep its encoders in a nested `HashMap` keyed first by schema name and then by logical table name. The `finish` method now returns record batches grouped the same way.
 - **`prom_row_builder.rs`**: Renamed `as_record_batch` to `as_record_batches` and replaced the `todo!()` with logic that writes each record batch through an SST writer created by `AccessLayerFactory`.
 - **`proto.rs`**: Updated the call site from `as_record_batch` to `as_record_batches` to match the rename.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-26 13:00:54 +00:00
parent 025cae3679
commit 3d81a17360
5 changed files with 50 additions and 12 deletions

View File

@@ -49,7 +49,7 @@ impl AccessLayerFactory {
Ok(Self { object_store })
}
async fn create_sst_writer(
pub(crate) async fn create_sst_writer(
&self,
catalog: &str,
schema: &str,

View File

@@ -48,7 +48,7 @@ use crate::prom_row_builder::{PromCtx, TableBuilder};
pub struct MetricsBatchBuilder {
schema_helper: SchemaHelper,
builders: HashMap<TableId, BatchEncoder>,
builders: HashMap<String /*schema*/, HashMap<String /*logical table name*/, BatchEncoder>>,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
}
@@ -270,7 +270,9 @@ impl MetricsBatchBuilder {
let encoder = self
.builders
.entry(physical_table.region_id.table_id())
.entry(schema.to_string())
.or_default()
.entry(logical_table_name.clone())
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
encoder.append_rows(*logical_table_id, std::mem::take(table))?;
}
@@ -279,12 +281,24 @@ impl MetricsBatchBuilder {
}
/// Finishes the current record batch builders and returns record batches grouped by schema name and logical table name.
pub(crate) fn finish(self) -> error::Result<HashMap<TableId, (RecordBatch, (i64, i64))>> {
let mut table_batches = HashMap::with_capacity(self.builders.len());
for (physical_table_id, encoder) in self.builders {
let rb = encoder.finish()?;
if let Some(v) = rb {
table_batches.insert(physical_table_id, v);
pub(crate) fn finish(
self,
) -> error::Result<
HashMap<
String, /*schema name*/
HashMap<String /*logical table name*/, (RecordBatch, (i64, i64))>,
>,
> {
let mut table_batches: HashMap<String, HashMap<String, (RecordBatch, (i64, i64))>> =
HashMap::with_capacity(self.builders.len());
for (schema_name, schema_tables) in self.builders {
let schema_batches = table_batches.entry(schema_name).or_default();
for (logical_table_name, table_data) in schema_tables {
let rb = table_data.finish()?;
if let Some(v) = rb {
schema_batches.entry(logical_table_name).insert_entry(v);
}
}
}
Ok(table_batches)

View File

@@ -37,6 +37,7 @@ use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::prelude::*;
use crate::access_layer::AccessLayerFactory;
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
@@ -221,6 +222,7 @@ pub struct PromBulkContext {
pub(crate) query_ctx: QueryContextRef,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
pub(crate) access_layer_factory: AccessLayerFactory,
}
async fn decode_remote_write_request(

View File

@@ -96,7 +96,7 @@ impl TablesBuilder {
}
/// Converts [TablesBuilder] to record batch and clears inner states.
pub(crate) async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
pub(crate) async fn as_record_batches(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
let mut batch_builder = MetricsBatchBuilder::new(
bulk_ctx.schema_helper.clone(),
bulk_ctx.partition_manager.clone(),
@@ -130,7 +130,29 @@ impl TablesBuilder {
.await?;
let record_batches = batch_builder.finish()?;
todo!()
for (schema_name, schema_batches) in record_batches {
let schema_regions = physical_region_metadata
.get(&schema_name)
.expect("physical region metadata not found");
for (logical_table_name, (rb, time_range)) in schema_batches {
let (_table_id, physical_region_metadata) = schema_regions
.get(&logical_table_name)
.expect("physical region metadata not found");
let mut writer = bulk_ctx
.access_layer_factory
.create_sst_writer(
"greptime", //todo(hl): use the catalog name in query context.
&schema_name,
physical_region_metadata.clone(),
)
.await?;
writer.write_record_batch(&rb, Some(time_range)).await?;
let _file_meta = writer.finish().await?;
}
}
Ok(())
}
}

View File

@@ -356,7 +356,7 @@ impl PromWriteRequest {
/// Converts the write request into record batches and resets the table data.
pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
self.table_data.as_record_batch(bulk_ctx).await
self.table_data.as_record_batches(bulk_ctx).await
}
}