From 905593dc1635fde77e9214f708ce93859738de88 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Wed, 25 Jun 2025 13:03:00 +0800
Subject: [PATCH] feat: add bulk mode flag for prom store with SchemaHelper
 integration

Add a new bulk_mode flag to PromStoreOptions that enables bulk processing
for Prometheus metrics ingestion. When enabled, the server initializes a
PromBulkState with a SchemaHelper instance for efficient schema management.

Changes:
- Add bulk_mode field to PromStoreOptions (default: false)
- Add create_schema_helper() method to Instance for SchemaHelper construction
- Add getter methods to StatementExecutor for procedure_executor and cache_invalidator
- Update server initialization to conditionally create PromBulkState when bulk_mode is enabled
- Fix clippy warnings in schema_helper.rs

Signed-off-by: evenyag
---
 src/frontend/src/instance.rs                  | 10 ++++
 src/frontend/src/server.rs                    | 24 ++++++---
 src/frontend/src/service_config/prom_store.rs |  5 +-
 src/operator/src/schema_helper.rs             | 14 ++---
 src/operator/src/statement.rs                 |  8 +++
 src/servers/src/batch_builder.rs              | 12 +++--
 src/servers/src/http.rs                       | 16 +-----
 src/servers/src/http/prom_store.rs            | 51 ++++++++++++++++++-
 src/servers/src/prom_row_builder.rs           | 20 ++++++++
 src/servers/src/proto.rs                      | 14 ++++-
 src/servers/tests/http/prom_store_test.rs     | 10 +++-
 tests-integration/src/test_util.rs            | 15 +++---
 12 files changed, 157 insertions(+), 42 deletions(-)

diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index ddf728c1ba..d9bf7135a0 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -49,6 +49,7 @@ use datafusion_expr::LogicalPlan;
 use log_store::raft_engine::RaftEngineBackend;
 use operator::delete::DeleterRef;
 use operator::insert::InserterRef;
+use operator::schema_helper::SchemaHelper;
 use operator::statement::{StatementExecutor, StatementExecutorRef};
 use pipeline::pipeline_operator::PipelineOperator;
 use prometheus::HistogramTimer;
@@ -161,6 +162,15 @@ impl Instance {
     pub fn process_manager(&self) -> &ProcessManagerRef {
         &self.process_manager
     }
+
+    pub fn create_schema_helper(&self) -> SchemaHelper {
+        SchemaHelper::new(
+            self.catalog_manager.clone(),
+            self.table_metadata_manager.clone(),
+            self.statement_executor.procedure_executor().clone(),
+            self.statement_executor.cache_invalidator().clone(),
+        )
+    }
 }
 
 fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index e1e91080b6..30baf4de24 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -24,6 +24,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{GrpcOptions, GrpcServer};
 use servers::http::event::LogValidatorRef;
+use servers::http::prom_store::{PromBulkState, PromStoreState};
 use servers::http::{HttpServer, HttpServerBuilder};
 use servers::interceptor::LogIngestInterceptorRef;
 use servers::metrics_handler::MetricsHandler;
@@ -95,13 +96,24 @@ where
         }
 
         if opts.prom_store.enable {
+            let bulk_state = if opts.prom_store.bulk_mode {
+                Some(PromBulkState {
+                    schema_helper: self.instance.create_schema_helper(),
+                })
+            } else {
+                None
+            };
+
+            let state = PromStoreState {
+                prom_store_handler: self.instance.clone(),
+                pipeline_handler: Some(self.instance.clone()),
+                prom_store_with_metric_engine: opts.prom_store.with_metric_engine,
+                prom_validation_mode: opts.http.prom_validation_mode,
+                bulk_state,
+            };
+
             builder = builder
-                .with_prom_handler(
-                    self.instance.clone(),
-                    Some(self.instance.clone()),
-                    opts.prom_store.with_metric_engine,
-                    opts.http.prom_validation_mode,
-                )
+                .with_prom_handler(state)
                 .with_prometheus_handler(self.instance.clone());
         }
diff --git a/src/frontend/src/service_config/prom_store.rs b/src/frontend/src/service_config/prom_store.rs
index b3adf889d2..46a710d545 100644
--- a/src/frontend/src/service_config/prom_store.rs
+++ b/src/frontend/src/service_config/prom_store.rs
@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
 pub struct PromStoreOptions {
     pub enable: bool,
     pub with_metric_engine: bool,
+    pub bulk_mode: bool,
 }
 
 impl Default for PromStoreOptions {
@@ -25,6 +26,7 @@
         Self {
             enable: true,
             with_metric_engine: true,
+            bulk_mode: false,
         }
     }
 }
@@ -37,6 +39,7 @@ mod tests {
     fn test_prom_store_options() {
         let default = PromStoreOptions::default();
         assert!(default.enable);
-        assert!(default.with_metric_engine)
+        assert!(default.with_metric_engine);
+        assert!(!default.bulk_mode);
     }
 }
diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs
index 476b15185f..647433b6c0 100644
--- a/src/operator/src/schema_helper.rs
+++ b/src/operator/src/schema_helper.rs
@@ -627,7 +627,7 @@ pub struct LogicalSchema {
 /// Logical table schemas.
 pub struct LogicalSchemas {
     /// Logical table schemas group by physical table name.
-    pub schemas: HashMap<String, Vec<LogicalSchema>>,
+    pub schemas: ahash::HashMap<String, Vec<LogicalSchema>>,
 }
 
 /// Creates or alters logical tables to match the provided schemas
@@ -637,13 +637,16 @@ pub async fn ensure_logical_tables_for_metrics(
     schemas: &LogicalSchemas,
     query_ctx: &QueryContextRef,
 ) -> Result<()> {
+    let catalog_name = query_ctx.current_catalog();
+    let schema_name = query_ctx.current_schema();
+
     // 1. For each physical table, creates it if it doesn't exist.
-    for (physical_table_name, _) in &schemas.schemas {
+    for physical_table_name in schemas.schemas.keys() {
         // Check if the physical table exists and create it if it doesn't
         let physical_table_opt = helper
             .get_table(
-                &query_ctx.current_catalog(),
-                &query_ctx.current_schema(),
+                catalog_name,
+                &schema_name,
                 physical_table_name,
             )
             .await?;
@@ -662,8 +665,6 @@
     // 3. Collects alterations (columns to add) for each logical table. (AlterTableExpr)
     let mut tables_to_alter: Vec<AlterTableExpr> = Vec::new();
 
-    let catalog_name = query_ctx.current_catalog();
-    let schema_name = query_ctx.current_schema();
     // Process each logical table to determine if it needs to be created or altered
     for (physical_table_name, logical_schemas) in &schemas.schemas {
         for logical_schema in logical_schemas {
@@ -733,6 +734,7 @@
 /// Gets the list of metadatas for a list of region ids.
 // TODO(yingwen): Should we return RegionMetadataRef?
+#[allow(dead_code)]
 async fn metadatas_for_region_ids(
     partition_manager: &PartitionRuleManagerRef,
     node_manager: &NodeManagerRef,
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 8b69a04ad2..0eb8e1f361 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -102,6 +102,14 @@ pub struct StatementExecutor {
 pub type StatementExecutorRef = Arc<StatementExecutor>;
 
 impl StatementExecutor {
+    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
+        &self.procedure_executor
+    }
+
+    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
+        &self.cache_invalidator
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         catalog_manager: CatalogManagerRef,
diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index ef248957f9..3ebecdebf1 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -12,12 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 
+use ahash::HashMap;
 use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
-use catalog::CatalogManagerRef;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_meta::key::table_route::TableRouteManagerRef;
 use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use operator::schema_helper::{
     ensure_logical_tables_for_metrics, LogicalSchema, LogicalSchemas, SchemaHelper,
@@ -30,12 +29,15 @@ use table::TableRef;
 
 use crate::error;
 use crate::prom_row_builder::{PromCtx, TableBuilder};
 
-#[allow(dead_code)]
 pub struct MetricsBatchBuilder {
     schema_helper: SchemaHelper,
 }
 
 impl MetricsBatchBuilder {
+    pub fn new(schema_helper: SchemaHelper) -> Self {
+        MetricsBatchBuilder { schema_helper }
+    }
+
     /// Detected the DDL requirements according to the staged table rows.
     pub async fn create_or_alter_physical_tables(
         &self,
@@ -43,7 +45,7 @@
         query_ctx: &QueryContextRef,
     ) -> error::Result<()> {
         // Physical table name -> logical tables -> tags in logical table
-        let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::new();
+        let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::default();
 
         let catalog = query_ctx.current_catalog();
         let schema = query_ctx.current_schema();
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index dd7a8804ab..83d88b8711 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -76,7 +76,6 @@
 use crate::query_handler::sql::ServerSqlQueryHandlerRef;
 use crate::query_handler::{
     InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
     OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
-    PromStoreProtocolHandlerRef,
 };
 use crate::server::Server;
@@ -566,20 +565,7 @@ impl HttpServerBuilder {
         }
     }
 
-    pub fn with_prom_handler(
-        self,
-        handler: PromStoreProtocolHandlerRef,
-        pipeline_handler: Option<PipelineHandlerRef>,
-        prom_store_with_metric_engine: bool,
-        prom_validation_mode: PromValidationMode,
-    ) -> Self {
-        let state = PromStoreState {
-            prom_store_handler: handler,
-            pipeline_handler,
-            prom_store_with_metric_engine,
-            prom_validation_mode,
-        };
-
+    pub fn with_prom_handler(self, state: PromStoreState) -> Self {
         Self {
             router: self.router.nest(
                 &format!("/{HTTP_API_VERSION}/prometheus"),
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index bada913cae..d9b0e076b0 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -27,11 +27,12 @@ use common_telemetry::tracing;
 use hyper::HeaderMap;
 use lazy_static::lazy_static;
 use object_pool::Pool;
+use operator::schema_helper::SchemaHelper;
 use pipeline::util::to_pipeline_version;
 use pipeline::{ContextReq, PipelineDefinition};
 use prost::Message;
 use serde::{Deserialize, Serialize};
-use session::context::{Channel, QueryContext};
+use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::prelude::*;
 
 use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
@@ -52,12 +53,19 @@ pub const DEFAULT_ENCODING: &str = "snappy";
 pub const VM_ENCODING: &str = "zstd";
 pub const VM_PROTO_VERSION: &str = "1";
 
+/// Additional states for bulk write requests.
+#[derive(Clone)]
+pub struct PromBulkState {
+    pub schema_helper: SchemaHelper,
+}
+
 #[derive(Clone)]
 pub struct PromStoreState {
     pub prom_store_handler: PromStoreProtocolHandlerRef,
     pub pipeline_handler: Option<PipelineHandlerRef>,
     pub prom_store_with_metric_engine: bool,
     pub prom_validation_mode: PromValidationMode,
+    pub bulk_state: Option<PromBulkState>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -98,6 +106,7 @@ pub async fn remote_write(
         pipeline_handler,
         prom_store_with_metric_engine,
         prom_validation_mode,
+        bulk_state: _,
     } = state;
 
     if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -202,6 +211,12 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
     }))
 }
 
+/// Context for processing remote write requests in bulk mode.
+struct PromBulkContext {
+    schema_helper: SchemaHelper,
+    query_ctx: QueryContextRef,
+}
+
 async fn decode_remote_write_request(
     is_zstd: bool,
     body: Bytes,
@@ -236,6 +251,40 @@
     }
 }
 
+async fn decode_remote_write_request_to_batch(
+    is_zstd: bool,
+    body: Bytes,
+    prom_validation_mode: PromValidationMode,
+    processor: &mut PromSeriesProcessor,
+    bulk: PromBulkContext,
+) -> Result<()> {
+    let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
+
+    // due to vmagent's limitation, there is a chance that vmagent is
+    // sending content type wrong so we have to apply a fallback with decoding
+    // the content in another method.
+    //
+    // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
+    // see https://github.com/GreptimeTeam/greptimedb/issues/3929
+    let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
+        buf
+    } else {
+        // fallback to the other compression method
+        try_decompress(!is_zstd, &body[..])?
+    };
+
+    let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
+
+    processor.use_pipeline = false;
+    request
+        .merge(buf, prom_validation_mode, processor)
+        .context(error::DecodePromRemoteRequestSnafu)?;
+
+    request
+        .as_record_batch(bulk.schema_helper, &bulk.query_ctx)
+        .await
+}
+
 async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
     let buf = snappy_decompress(&body[..])?;
diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs
index cd298d0582..54296b3564 100644
--- a/src/servers/src/prom_row_builder.rs
+++ b/src/servers/src/prom_row_builder.rs
@@ -20,9 +20,13 @@ use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use operator::schema_helper::SchemaHelper;
 use pipeline::{ContextOpt, ContextReq};
 use prost::DecodeError;
+use session::context::QueryContextRef;
 
+use crate::batch_builder::MetricsBatchBuilder;
+use crate::error::Result;
 use crate::http::PromValidationMode;
 use crate::proto::{decode_string, PromLabel};
 use crate::repeated_field::Clear;
@@ -91,6 +95,22 @@
             req
         })
     }
+
+    /// Converts [TablesBuilder] to record batch and clears inner states.
+    pub(crate) async fn as_record_batch(
+        &mut self,
+        schema_helper: SchemaHelper,
+        query_ctx: &QueryContextRef,
+    ) -> Result<()> {
+        let batch_builder = MetricsBatchBuilder::new(schema_helper);
+        let tables = std::mem::take(&mut self.tables);
+
+        batch_builder
+            .create_or_alter_physical_tables(&tables, query_ctx)
+            .await?;
+
+        todo!()
+    }
 }
 
 /// Builder for one table.
diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs
index 8b2a73461f..a62ad04f48 100644
--- a/src/servers/src/proto.rs
+++ b/src/servers/src/proto.rs
@@ -21,6 +21,7 @@ use api::prom_store::remote::Sample;
 use bytes::{Buf, Bytes};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_telemetry::debug;
+use operator::schema_helper::SchemaHelper;
 use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
 use prost::encoding::message::merge;
 use prost::encoding::{decode_key, decode_varint, WireType};
@@ -28,7 +29,7 @@ use prost::DecodeError;
 use session::context::QueryContextRef;
 use snafu::OptionExt;
 
-use crate::error::InternalSnafu;
+use crate::error::{InternalSnafu, Result};
 use crate::http::event::PipelineIngestRequest;
 use crate::http::PromValidationMode;
 use crate::pipeline::run_pipeline;
@@ -352,6 +353,17 @@
         Ok(())
     }
+
+    /// Converts the write request into a record batch and resets the table data.
+    pub async fn as_record_batch(
+        &mut self,
+        schema_helper: SchemaHelper,
+        query_ctx: &QueryContextRef,
+    ) -> Result<()> {
+        self.table_data
+            .as_record_batch(schema_helper, query_ctx)
+            .await
+    }
 }
 
 /// A hook to be injected into the PromWriteRequest decoding process.
diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs
index f87697cf4d..90d031aae4 100644
--- a/src/servers/tests/http/prom_store_test.rs
+++ b/src/servers/tests/http/prom_store_test.rs
@@ -28,6 +28,7 @@ use query::parser::PromQuery;
 use query::query_engine::DescribeResult;
 use servers::error::{Error, Result};
 use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
+use servers::http::prom_store::PromStoreState;
 use servers::http::test_helpers::TestClient;
 use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
 use servers::prom_store;
@@ -121,9 +122,16 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
     };
 
     let instance = Arc::new(DummyInstance { tx });
+    let state = PromStoreState {
+        prom_store_handler: instance.clone(),
+        pipeline_handler: None,
+        prom_store_with_metric_engine: true,
+        prom_validation_mode: PromValidationMode::Unchecked,
+        bulk_state: None,
+    };
     let server = HttpServerBuilder::new(http_opts)
         .with_sql_handler(instance.clone())
-        .with_prom_handler(instance, None, true, PromValidationMode::Unchecked)
+        .with_prom_handler(state)
         .build();
     server.build(server.make_app()).unwrap()
 }
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 1f4c47903d..8b25d9ae96 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -43,6 +43,7 @@ use object_store::ObjectStore;
 use servers::grpc::builder::GrpcServerBuilder;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
+use servers::http::prom_store::PromStoreState;
 use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
 use servers::metrics_handler::MetricsHandler;
 use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -534,15 +535,17 @@ pub async fn setup_test_prom_app_with_frontend(
         ..Default::default()
     };
     let frontend_ref = instance.fe_instance().clone();
+    let state = PromStoreState {
+        prom_store_handler: frontend_ref.clone(),
+        pipeline_handler: Some(frontend_ref.clone()),
+        prom_store_with_metric_engine: true,
+        prom_validation_mode: PromValidationMode::Strict,
+        bulk_state: None,
+    };
     let http_server = HttpServerBuilder::new(http_opts)
         .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
         .with_logs_handler(instance.fe_instance().clone())
-        .with_prom_handler(
-            frontend_ref.clone(),
-            Some(frontend_ref.clone()),
-            true,
-            PromValidationMode::Strict,
-        )
+        .with_prom_handler(state)
         .with_prometheus_handler(frontend_ref)
         .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
        .build();
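
A note on usage: bulk mode is opt-in and off by default. A minimal sketch of
turning it on, using only types touched by this patch; in practice the struct
is deserialized from the frontend configuration rather than built by hand, so
the literal below is hypothetical and the import path is assumed from the file
layout above:

    use frontend::service_config::prom_store::PromStoreOptions;

    // Enable bulk processing for the Prometheus remote write endpoint.
    // `enable` and `with_metric_engine` keep their defaults.
    let prom_store = PromStoreOptions {
        enable: true,
        with_metric_engine: true,
        bulk_mode: true, // new in this patch; defaults to false
    };

With bulk_mode set, the HTTP server setup in src/frontend/src/server.rs (see
the hunk above) wires a PromBulkState, carrying a SchemaHelper obtained from
Instance::create_schema_helper, into PromStoreState. The remote_write handler
still ignores it here (`bulk_state: _`); hooking the new
decode_remote_write_request_to_batch path into the handler is left for a
follow-up.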