poc/create-alter-for-metrics:

### Commit Message

 Enhance Prometheus Bulk Write Handling

 - **`server.rs`**: Introduced `start_background_task` on `PromBulkState`, which spawns an asynchronous task that batches incoming writes and flushes them as SST files. Added a `tx` channel-sender field for handing decoded data to that task (a minimal sketch of this hand-off appears after the commit metadata below).
 - **`access_layer.rs`**: Added a `file_id` accessor to `ParquetWriter` so callers can identify the SST file being written.
 - **`batch_builder.rs`**: Changed `MetricsBatchBuilder` to resolve the session catalog and schema per logical table and to refresh each encoder's column name-to-id mapping from the physical table's column metadata.
 - **`prom_store.rs`**: Changed `decode_remote_write_request_to_batch` to return the decoded `TablesBuilder`, and updated `remote_write` to send the decoded tables to the background task over the channel.
 - **`prom_row_builder.rs`**: Made `TableBuilder` public and exposed the `tables` field of `TablesBuilder` for external access.
 - **`proto.rs`**: Exposed `table_data` in `PromWriteRequest` so the decoded tables can be taken out for batch processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG
Date: 2025-06-30 08:27:26 +00:00
commit 7e79b4b2f6 (parent 4ad40af468)
This commit is contained in: poc/create-alter-for-metrics
6 changed files with 209 additions and 25 deletions
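The core of this change is the hand-off between the `remote_write` handler and the background flush task. Below is a minimal, self-contained sketch of that channel wiring, with a placeholder `DecodedTables` type standing in for the real `QueryContextRef` and `HashMap<PromCtx, HashMap<String, TableBuilder>>` payload; it illustrates the pattern, not the actual implementation in the diff.

```rust
// Sketch only: bounded-channel hand-off between the HTTP handler (producer)
// and the background flush task (consumer).
use std::collections::HashMap;
use tokio::sync::mpsc;

// Placeholder for the decoded per-table data sent over the channel.
type DecodedTables = HashMap<String, Vec<String>>;

struct BulkState {
    tx: Option<mpsc::Sender<DecodedTables>>,
}

impl BulkState {
    fn start_background_task(&mut self) {
        let (tx, mut rx) = mpsc::channel::<DecodedTables>(16);
        self.tx = Some(tx);
        tokio::spawn(async move {
            let mut num_batches = 0usize;
            // Drain decoded batches; the real task creates/alters physical
            // tables, appends rows, and flushes SST files every few batches.
            while let Some(tables) = rx.recv().await {
                num_batches += 1;
                println!("batch #{num_batches}: {} table groups", tables.len());
                if num_batches >= 10 {
                    // Flush point in the PoC; here we only reset the counter.
                    num_batches = 0;
                }
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let mut state = BulkState { tx: None };
    state.start_background_task();

    // Producer side, analogous to `remote_write` forwarding decoded tables.
    let decoded = DecodedTables::from([("metric_a".to_string(), vec!["1".to_string()])]);
    state.tx.as_ref().unwrap().send(decoded).await.unwrap();

    // Give the consumer a moment to run before the runtime shuts down.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
```

In the actual commit, the channel also carries the `QueryContextRef`, and the consumer accumulates record batches and writes them out via `access_layer_factory.create_sst_writer` before logging the resulting file metadata.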

View File: server.rs

@@ -97,12 +97,15 @@ where
if opts.prom_store.enable {
let bulk_state = if opts.prom_store.bulk_mode {
-Some(PromBulkState {
+let mut state = PromBulkState {
schema_helper: self.instance.create_schema_helper(),
partition_manager: self.instance.partition_manager().clone(),
node_manager: self.instance.node_manager().clone(),
access_layer_factory: self.instance.access_layer_factory().clone(),
-})
+tx: None,
+};
+state.start_background_task();
+Some(state)
} else {
None
};

View File: access_layer.rs

@@ -100,6 +100,12 @@ pub struct ParquetWriter {
timestamp_range: Option<(i64, i64)>,
}
impl ParquetWriter {
pub(crate) fn file_id(&self) -> FileId {
self.file_id
}
}
impl ParquetWriter {
pub async fn write_record_batch(
&mut self,

View File: batch_builder.rs

@@ -250,24 +250,25 @@ impl MetricsBatchBuilder {
>,
) -> error::Result<()> {
for (ctx, tables_in_schema) in table_data {
-// use session catalog.
-let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
-// schema in PromCtx precedes session schema.
-let schema = ctx
-.schema
-.as_deref()
-.or(current_schema.as_deref())
-.unwrap_or(DEFAULT_SCHEMA_NAME);
-// Look up physical region metadata by schema and table name
-let schema_metadata =
-physical_region_metadata
-.get(schema)
-.context(error::TableNotFoundSnafu {
-catalog,
-schema,
-table: "",
-})?;
for (logical_table_name, table) in tables_in_schema {
+// use session catalog.
+let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+// schema in PromCtx precedes session schema.
+let schema = ctx
+.schema
+.as_deref()
+.or(current_schema.as_deref())
+.unwrap_or(DEFAULT_SCHEMA_NAME);
+// Look up physical region metadata by schema and table name
+let schema_metadata =
+physical_region_metadata
+.get(schema)
+.context(error::TableNotFoundSnafu {
+catalog,
+schema,
+table: logical_table_name,
+})?;
let (logical_table_id, physical_table) = schema_metadata
.get(logical_table_name)
.context(error::TableNotFoundSnafu {
@@ -282,6 +283,12 @@ impl MetricsBatchBuilder {
.or_default()
.entry(physical_table.region_id)
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
let name_to_id: HashMap<_, _> = physical_table
.column_metadatas
.iter()
.map(|c| (c.column_schema.name.clone(), c.column_id))
.collect();
let _ = std::mem::replace(encoder.name_to_id_mut(), name_to_id);
encoder.append_rows(*logical_table_id, std::mem::take(table))?;
}
}
@@ -443,6 +450,10 @@ impl BatchEncoder {
.sum()
}
pub(crate) fn name_to_id_mut(&mut self) -> &mut HashMap<String, ColumnId> {
&mut self.name_to_id
}
fn append_rows(
&mut self,
logical_table_id: TableId,

View File: prom_store.rs

@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use api::prom_store::remote::ReadRequest;
use axum::body::Bytes;
@@ -36,12 +38,15 @@ use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::prelude::*;
use tokio::sync::mpsc::Sender;
use crate::access_layer::AccessLayerFactory;
use crate::batch_builder::MetricsBatchBuilder;
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
use crate::http::extractor::PipelineInfo;
use crate::http::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::http::PromValidationMode;
use crate::prom_row_builder::{PromCtx, TableBuilder, TablesBuilder};
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::{PromSeriesProcessor, PromWriteRequest};
use crate::query_handler::{PipelineHandlerRef, PromStoreProtocolHandlerRef, PromStoreResponse};
@@ -63,6 +68,12 @@ pub struct PromBulkState {
pub partition_manager: PartitionRuleManagerRef,
pub node_manager: NodeManagerRef,
pub access_layer_factory: AccessLayerFactory,
pub tx: Option<
Sender<(
QueryContextRef,
HashMap<PromCtx, HashMap<String, TableBuilder>>,
)>,
>,
}
#[derive(Clone)]
@@ -74,6 +85,152 @@ pub struct PromStoreState {
pub bulk_state: Option<PromBulkState>,
}
impl PromBulkState {
pub fn start_background_task(&mut self) {
let (tx, mut rx) = tokio::sync::mpsc::channel::<(
QueryContextRef,
HashMap<PromCtx, HashMap<String, TableBuilder>>,
)>(16);
self.tx = Some(tx);
let schema_helper = self.schema_helper.clone();
let partition_manager = self.partition_manager.clone();
let node_manager = self.node_manager.clone();
let access_layer_factory = self.access_layer_factory.clone();
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
let start = Instant::now();
let mut batch_builder = MetricsBatchBuilder::new(
schema_helper.clone(),
partition_manager.clone(),
node_manager.clone(),
);
let mut physical_region_metadata_total = HashMap::new();
let mut num_batches = 0;
while let Some((query_context, mut tables)) = rx.recv().await {
batch_builder
.create_or_alter_physical_tables(&tables, &query_context)
.await
.unwrap();
info!(
"create_or_alter_physical_tables, elapsed time: {}ms",
start.elapsed().as_millis()
);
// Extract logical table names from tables for metadata collection
let current_schema = query_context.current_schema();
let logical_tables: Vec<(String, String)> = tables
.iter()
.flat_map(|(ctx, table_map)| {
let schema = ctx.schema.as_deref().unwrap_or(&current_schema);
table_map
.keys()
.map(|table_name| (schema.to_string(), table_name.clone()))
})
.collect();
let start = Instant::now();
// Gather all region metadata for region 0 of physical tables.
let physical_region_metadata = batch_builder
.collect_physical_region_metadata(&logical_tables, &query_context)
.await
.unwrap();
physical_region_metadata_total.extend(physical_region_metadata);
info!(
"collect_physical_region_metadata, elapsed time: {}ms",
start.elapsed().as_millis()
);
let start = Instant::now();
batch_builder
.append_rows_to_batch(
None,
None,
&mut tables,
&physical_region_metadata_total,
)
.await
.unwrap();
num_batches += 1;
info!(
"append_rows_to_batch, elapsed time: {}ms, batches: {}",
start.elapsed().as_millis(),
num_batches
);
if num_batches >= 10 {
break;
}
}
let start = Instant::now();
let record_batches = batch_builder.finish().unwrap();
let physical_region_id_to_meta = physical_region_metadata_total
.into_iter()
.map(|(schema_name, tables)| {
let region_id_to_meta = tables
.into_values()
.map(|(_, physical_region_meta)| {
(physical_region_meta.region_id, physical_region_meta)
})
.collect::<HashMap<_, _>>();
(schema_name, region_id_to_meta)
})
.collect::<HashMap<_, _>>();
info!("Finishing batches cost: {}ms", start.elapsed().as_millis());
let start = Instant::now();
let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
let mut file_metas = vec![];
for (schema_name, schema_batches) in record_batches {
let tables_in_schema =
tables_per_schema.entry(schema_name.clone()).or_insert(0);
*tables_in_schema += 1;
let schema_regions = physical_region_id_to_meta
.get(&schema_name)
.expect("physical region schema not found");
for (physical_region_id, record_batches) in schema_batches {
let physical_region_metadata = schema_regions
.get(&physical_region_id)
.expect("physical region metadata not found");
for (rb, time_range) in record_batches {
let mut writer = access_layer_factory
.create_sst_writer(
"greptime", //todo(hl): use the catalog name in query context.
&schema_name,
physical_region_metadata.clone(),
)
.await
.unwrap();
let start = Instant::now();
info!("Created writer: {}", writer.file_id());
writer
.write_record_batch(&rb, Some(time_range))
.await
.unwrap();
let file_meta = writer.finish().await.unwrap();
info!(
"Finished writer: {}, elapsed time: {}ms",
writer.file_id(),
start.elapsed().as_millis()
);
file_metas.push(file_meta);
}
}
}
info!(
"upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
start.elapsed().as_millis(),tables_per_schema.len(),tables_per_schema,file_metas
);
}
});
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RemoteWriteQuery {
pub db: Option<String>,
@@ -155,7 +312,7 @@ pub async fn remote_write(
node_manager: state.node_manager,
access_layer_factory: state.access_layer_factory,
};
-decode_remote_write_request_to_batch(
+let builder = decode_remote_write_request_to_batch(
is_zstd,
body,
prom_validation_mode,
@@ -163,6 +320,13 @@ pub async fn remote_write(
context,
)
.await?;
state
.tx
.as_ref()
.unwrap()
.send((query_ctx, builder.tables))
.await
.unwrap();
return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
}
@@ -285,7 +449,7 @@ async fn decode_remote_write_request_to_batch(
prom_validation_mode: PromValidationMode,
processor: &mut PromSeriesProcessor,
bulk: PromBulkContext,
-) -> Result<()> {
+) -> Result<TablesBuilder> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
// due to vmagent's limitation, there is a chance that vmagent is
@@ -308,7 +472,7 @@ async fn decode_remote_write_request_to_batch(
.merge(buf, prom_validation_mode, processor)
.context(error::DecodePromRemoteRequestSnafu)?;
-request.as_record_batch(&bulk).await
+Ok(std::mem::take(&mut request.table_data))
}
async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {

View File: prom_row_builder.rs

@@ -43,7 +43,7 @@ pub struct PromCtx {
#[derive(Default, Debug)]
pub(crate) struct TablesBuilder {
// schema -> table -> table_builder
-tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
+pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
}
impl Clear for TablesBuilder {
@@ -202,7 +202,7 @@ impl TablesBuilder {
/// Builder for one table.
#[derive(Debug, Clone)]
-pub(crate) struct TableBuilder {
+pub struct TableBuilder {
/// Column schemas.
schema: Vec<ColumnSchema>,
/// Rows written.

View File: proto.rs

@@ -284,7 +284,7 @@ pub(crate) fn decode_string(
#[derive(Default, Debug)]
pub struct PromWriteRequest {
-table_data: TablesBuilder,
+pub table_data: TablesBuilder,
series: PromTimeSeries,
}