feat: add bulk mode flag for prom store with SchemaHelper integration

Add a new bulk_mode flag to PromStoreOptions that enables bulk processing
for prometheus metrics ingestion. When enabled, initializes PromBulkState
with a SchemaHelper instance for efficient schema management.

Changes:
- Add bulk_mode field to PromStoreOptions (default: false)
- Add create_schema_helper() method to Instance for SchemaHelper construction
- Add getter methods to StatementExecutor for procedure_executor and cache_invalidator
- Update server initialization to conditionally create PromBulkState when bulk_mode is enabled
- Fix clippy warnings in schema_helper.rs

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-06-25 13:03:00 +08:00
committed by Lei, HUANG
parent 6c04cb9b19
commit 905593dc16
12 changed files with 157 additions and 42 deletions

View File

@@ -49,6 +49,7 @@ use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
@@ -161,6 +162,15 @@ impl Instance {
pub fn process_manager(&self) -> &ProcessManagerRef {
&self.process_manager
}
pub fn create_schema_helper(&self) -> SchemaHelper {
SchemaHelper::new(
self.catalog_manager.clone(),
self.table_metadata_manager.clone(),
self.statement_executor.procedure_executor().clone(),
self.statement_executor.cache_invalidator().clone(),
)
}
}
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {

View File

@@ -24,6 +24,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef;
use servers::http::prom_store::{PromBulkState, PromStoreState};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
use servers::metrics_handler::MetricsHandler;
@@ -95,13 +96,24 @@ where
}
if opts.prom_store.enable {
let bulk_state = if opts.prom_store.bulk_mode {
Some(PromBulkState {
schema_helper: self.instance.create_schema_helper(),
})
} else {
None
};
let state = PromStoreState {
prom_store_handler: self.instance.clone(),
pipeline_handler: Some(self.instance.clone()),
prom_store_with_metric_engine: opts.prom_store.with_metric_engine,
prom_validation_mode: opts.http.prom_validation_mode,
bulk_state,
};
builder = builder
.with_prom_handler(
self.instance.clone(),
Some(self.instance.clone()),
opts.prom_store.with_metric_engine,
opts.http.prom_validation_mode,
)
.with_prom_handler(state)
.with_prometheus_handler(self.instance.clone());
}

View File

@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
pub struct PromStoreOptions {
pub enable: bool,
pub with_metric_engine: bool,
pub bulk_mode: bool,
}
impl Default for PromStoreOptions {
@@ -25,6 +26,7 @@ impl Default for PromStoreOptions {
Self {
enable: true,
with_metric_engine: true,
bulk_mode: false,
}
}
}
@@ -37,6 +39,7 @@ mod tests {
fn test_prom_store_options() {
let default = PromStoreOptions::default();
assert!(default.enable);
assert!(default.with_metric_engine)
assert!(default.with_metric_engine);
assert!(!default.bulk_mode);
}
}

View File

@@ -627,7 +627,7 @@ pub struct LogicalSchema {
/// Logical table schemas.
pub struct LogicalSchemas {
/// Logical table schemas group by physical table name.
pub schemas: HashMap<String, Vec<LogicalSchema>>,
pub schemas: ahash::HashMap<String, Vec<LogicalSchema>>,
}
/// Creates or alters logical tables to match the provided schemas
@@ -637,13 +637,16 @@ pub async fn ensure_logical_tables_for_metrics(
schemas: &LogicalSchemas,
query_ctx: &QueryContextRef,
) -> Result<()> {
let catalog_name = query_ctx.current_catalog();
let schema_name = query_ctx.current_schema();
// 1. For each physical table, creates it if it doesn't exist.
for (physical_table_name, _) in &schemas.schemas {
for physical_table_name in schemas.schemas.keys() {
// Check if the physical table exists and create it if it doesn't
let physical_table_opt = helper
.get_table(
&query_ctx.current_catalog(),
&query_ctx.current_schema(),
catalog_name,
&schema_name,
physical_table_name,
)
.await?;
@@ -662,8 +665,6 @@ pub async fn ensure_logical_tables_for_metrics(
// 3. Collects alterations (columns to add) for each logical table. (AlterTableExpr)
let mut tables_to_alter: Vec<AlterTableExpr> = Vec::new();
let catalog_name = query_ctx.current_catalog();
let schema_name = query_ctx.current_schema();
// Process each logical table to determine if it needs to be created or altered
for (physical_table_name, logical_schemas) in &schemas.schemas {
for logical_schema in logical_schemas {
@@ -733,6 +734,7 @@ pub async fn ensure_logical_tables_for_metrics(
/// Gets the list of metadatas for a list of region ids.
// TODO(yingwen): Should we return RegionMetadataRef?
#[allow(dead_code)]
async fn metadatas_for_region_ids(
partition_manager: &PartitionRuleManagerRef,
node_manager: &NodeManagerRef,

View File

@@ -102,6 +102,14 @@ pub struct StatementExecutor {
pub type StatementExecutorRef = Arc<StatementExecutor>;
impl StatementExecutor {
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
&self.procedure_executor
}
pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
&self.cache_invalidator
}
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_manager: CatalogManagerRef,

View File

@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use ahash::HashMap;
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_route::TableRouteManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use operator::schema_helper::{
ensure_logical_tables_for_metrics, LogicalSchema, LogicalSchemas, SchemaHelper,
@@ -30,12 +29,15 @@ use table::TableRef;
use crate::error;
use crate::prom_row_builder::{PromCtx, TableBuilder};
#[allow(dead_code)]
pub struct MetricsBatchBuilder {
schema_helper: SchemaHelper,
}
impl MetricsBatchBuilder {
pub fn new(schema_helper: SchemaHelper) -> Self {
MetricsBatchBuilder { schema_helper }
}
/// Detected the DDL requirements according to the staged table rows.
pub async fn create_or_alter_physical_tables(
&self,
@@ -43,7 +45,7 @@ impl MetricsBatchBuilder {
query_ctx: &QueryContextRef,
) -> error::Result<()> {
// Physical table name -> logical tables -> tags in logical table
let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::new();
let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::default();
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();

View File

@@ -76,7 +76,6 @@ use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, JaegerQueryHandlerRef, LogQueryHandlerRef,
OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
PromStoreProtocolHandlerRef,
};
use crate::server::Server;
@@ -566,20 +565,7 @@ impl HttpServerBuilder {
}
}
pub fn with_prom_handler(
self,
handler: PromStoreProtocolHandlerRef,
pipeline_handler: Option<PipelineHandlerRef>,
prom_store_with_metric_engine: bool,
prom_validation_mode: PromValidationMode,
) -> Self {
let state = PromStoreState {
prom_store_handler: handler,
pipeline_handler,
prom_store_with_metric_engine,
prom_validation_mode,
};
pub fn with_prom_handler(self, state: PromStoreState) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/prometheus"),

View File

@@ -27,11 +27,12 @@ use common_telemetry::tracing;
use hyper::HeaderMap;
use lazy_static::lazy_static;
use object_pool::Pool;
use operator::schema_helper::SchemaHelper;
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, PipelineDefinition};
use prost::Message;
use serde::{Deserialize, Serialize};
use session::context::{Channel, QueryContext};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::prelude::*;
use crate::error::{self, InternalSnafu, PipelineSnafu, Result};
@@ -52,12 +53,19 @@ pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";
/// Additional states for bulk write requests.
#[derive(Clone)]
pub struct PromBulkState {
pub schema_helper: SchemaHelper,
}
#[derive(Clone)]
pub struct PromStoreState {
pub prom_store_handler: PromStoreProtocolHandlerRef,
pub pipeline_handler: Option<PipelineHandlerRef>,
pub prom_store_with_metric_engine: bool,
pub prom_validation_mode: PromValidationMode,
pub bulk_state: Option<PromBulkState>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -98,6 +106,7 @@ pub async fn remote_write(
pipeline_handler,
prom_store_with_metric_engine,
prom_validation_mode,
bulk_state: _,
} = state;
if let Some(_vm_handshake) = params.get_vm_proto_version {
@@ -202,6 +211,12 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
}))
}
/// Context for processing remote write requests in bulk mode.
struct PromBulkContext {
schema_helper: SchemaHelper,
query_ctx: QueryContextRef,
}
async fn decode_remote_write_request(
is_zstd: bool,
body: Bytes,
@@ -236,6 +251,40 @@ async fn decode_remote_write_request(
}
}
async fn decode_remote_write_request_to_batch(
is_zstd: bool,
body: Bytes,
prom_validation_mode: PromValidationMode,
processor: &mut PromSeriesProcessor,
bulk: PromBulkContext,
) -> Result<()> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
// due to vmagent's limitation, there is a chance that vmagent is
// sending content type wrong so we have to apply a fallback with decoding
// the content in another method.
//
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301
// see https://github.com/GreptimeTeam/greptimedb/issues/3929
let buf = if let Ok(buf) = try_decompress(is_zstd, &body[..]) {
buf
} else {
// fallback to the other compression method
try_decompress(!is_zstd, &body[..])?
};
let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
processor.use_pipeline = false;
request
.merge(buf, prom_validation_mode, processor)
.context(error::DecodePromRemoteRequestSnafu)?;
request
.as_record_batch(bulk.schema_helper, &bulk.query_ctx)
.await
}
async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {
let buf = snappy_decompress(&body[..])?;

View File

@@ -20,9 +20,13 @@ use api::prom_store::remote::Sample;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use operator::schema_helper::SchemaHelper;
use pipeline::{ContextOpt, ContextReq};
use prost::DecodeError;
use session::context::QueryContextRef;
use crate::batch_builder::MetricsBatchBuilder;
use crate::error::Result;
use crate::http::PromValidationMode;
use crate::proto::{decode_string, PromLabel};
use crate::repeated_field::Clear;
@@ -91,6 +95,22 @@ impl TablesBuilder {
req
})
}
/// Converts [TablesBuilder] to record batch and clears inner states.
pub(crate) async fn as_record_batch(
&mut self,
schema_helper: SchemaHelper,
query_ctx: &QueryContextRef,
) -> Result<()> {
let batch_builder = MetricsBatchBuilder::new(schema_helper);
let tables = std::mem::take(&mut self.tables);
batch_builder
.create_or_alter_physical_tables(&tables, query_ctx)
.await?;
todo!()
}
}
/// Builder for one table.

View File

@@ -21,6 +21,7 @@ use api::prom_store::remote::Sample;
use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::debug;
use operator::schema_helper::SchemaHelper;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, WireType};
@@ -28,7 +29,7 @@ use prost::DecodeError;
use session::context::QueryContextRef;
use snafu::OptionExt;
use crate::error::InternalSnafu;
use crate::error::{InternalSnafu, Result};
use crate::http::event::PipelineIngestRequest;
use crate::http::PromValidationMode;
use crate::pipeline::run_pipeline;
@@ -352,6 +353,17 @@ impl PromWriteRequest {
Ok(())
}
/// Converts the write request into a record batch and reset the table data.
pub async fn as_record_batch(
&mut self,
schema_helper: SchemaHelper,
query_ctx: &QueryContextRef,
) -> Result<()> {
self.table_data
.as_record_batch(schema_helper, query_ctx)
.await
}
}
/// A hook to be injected into the PromWriteRequest decoding process.

View File

@@ -28,6 +28,7 @@ use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PromStoreState;
use servers::http::test_helpers::TestClient;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::prom_store;
@@ -121,9 +122,16 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
};
let instance = Arc::new(DummyInstance { tx });
let state = PromStoreState {
prom_store_handler: instance.clone(),
pipeline_handler: None,
prom_store_with_metric_engine: true,
prom_validation_mode: PromValidationMode::Unchecked,
bulk_state: None,
};
let server = HttpServerBuilder::new(http_opts)
.with_sql_handler(instance.clone())
.with_prom_handler(instance, None, true, PromValidationMode::Unchecked)
.with_prom_handler(state)
.build();
server.build(server.make_app()).unwrap()
}

View File

@@ -43,6 +43,7 @@ use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::prom_store::PromStoreState;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -534,15 +535,17 @@ pub async fn setup_test_prom_app_with_frontend(
..Default::default()
};
let frontend_ref = instance.fe_instance().clone();
let state = PromStoreState {
prom_store_handler: frontend_ref.clone(),
pipeline_handler: Some(frontend_ref.clone()),
prom_store_with_metric_engine: true,
prom_validation_mode: PromValidationMode::Strict,
bulk_state: None,
};
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_logs_handler(instance.fe_instance().clone())
.with_prom_handler(
frontend_ref.clone(),
Some(frontend_ref.clone()),
true,
PromValidationMode::Strict,
)
.with_prom_handler(state)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();