feat: Implements collect_physical_region_metadata

Add partition_manager and node_manager to PromBulkState and PromBulkContext

Signed-off-by: evenyag <realevenyag@gmail.com>
evenyag authored 2025-06-25 19:33:28 +08:00, committed by Lei, HUANG
parent 63803f2b43
commit 344006deca
10 changed files with 146 additions and 39 deletions

Cargo.lock (generated)

@@ -11286,6 +11286,7 @@ dependencies = [
"operator", "operator",
"otel-arrow-rust", "otel-arrow-rust",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"partition",
"permutation", "permutation",
"pgwire", "pgwire",
"pin-project", "pin-project",


@@ -39,6 +39,7 @@ use common_config::KvBackendConfig;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::KvBackendRef;
+use common_meta::node_manager::NodeManagerRef;
 use common_meta::state_store::KvStateStore;
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::options::ProcedureConfig;
@@ -51,6 +52,7 @@ use operator::delete::DeleterRef;
 use operator::insert::InserterRef;
 use operator::schema_helper::SchemaHelper;
 use operator::statement::{StatementExecutor, StatementExecutorRef};
+use partition::manager::PartitionRuleManagerRef;
 use pipeline::pipeline_operator::PipelineOperator;
 use prometheus::HistogramTimer;
 use promql_parser::label::Matcher;
@@ -171,6 +173,14 @@ impl Instance {
             self.statement_executor.cache_invalidator().clone(),
         )
     }
+
+    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
+        self.inserter.partition_manager()
+    }
+
+    pub fn node_manager(&self) -> &NodeManagerRef {
+        self.inserter.node_manager()
+    }
 }
 
 fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
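
Both accessors expose Arc-based handles (following the usual `*Ref = Arc<...>` aliases behind PartitionRuleManagerRef and NodeManagerRef), so call sites that need owned copies only clone the Arc. A minimal call-site sketch, assuming an `instance: Instance` is in scope (names illustrative):

    // Cloning the returned reference clones the Arc handle, not the manager itself.
    let partition_manager = instance.partition_manager().clone();
    let node_manager = instance.node_manager().clone();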


@@ -99,6 +99,8 @@ where
         let bulk_state = if opts.prom_store.bulk_mode {
             Some(PromBulkState {
                 schema_helper: self.instance.create_schema_helper(),
+                partition_manager: self.instance.partition_manager().clone(),
+                node_manager: self.instance.node_manager().clone(),
             })
         } else {
             None


@@ -136,6 +136,14 @@ impl Inserter {
         }
     }
 
+    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
+        &self.partition_manager
+    }
+
+    pub fn node_manager(&self) -> &NodeManagerRef {
+        &self.node_manager
+    }
+
     pub async fn handle_column_inserts(
         &self,
         requests: InsertRequests,


@@ -730,8 +730,7 @@ pub async fn ensure_logical_tables_for_metrics(
 /// Gets the list of metadatas for a list of region ids.
 // TODO(yingwen): Should we return RegionMetadataRef?
-#[allow(dead_code)]
-async fn metadatas_for_region_ids(
+pub async fn metadatas_for_region_ids(
     partition_manager: &PartitionRuleManagerRef,
     node_manager: &NodeManagerRef,
     region_ids: &[RegionId],


@@ -82,6 +82,7 @@ object-pool = "0.5"
 once_cell.workspace = true
 openmetrics-parser = "0.4"
 operator.workspace = true
+partition.workspace = true
 simd-json.workspace = true
 socket2 = "0.5"
 # use crates.io version once the following PRs is merged into the nextest release


@@ -24,19 +24,23 @@ use arrow::array::{
 use arrow::compute;
 use arrow_schema::Field;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use itertools::Itertools;
 use metric_engine::row_modifier::{RowModifier, RowsIter};
 use mito_codec::row_converter::SparsePrimaryKeyCodec;
 use operator::schema_helper::{
-    ensure_logical_tables_for_metrics, LogicalSchema, LogicalSchemas, SchemaHelper,
+    ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
+    SchemaHelper,
 };
+use partition::manager::PartitionRuleManagerRef;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
-use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::{
     ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
 };
-use store_api::storage::ColumnId;
+use store_api::storage::{ColumnId, RegionId};
 use table::metadata::TableId;
 use table::table_name::TableName;
@@ -46,13 +50,21 @@ use crate::prom_row_builder::{PromCtx, TableBuilder};
 pub struct MetricsBatchBuilder {
     schema_helper: SchemaHelper,
     builders: HashMap<TableId, BatchEncoder>,
+    partition_manager: PartitionRuleManagerRef,
+    node_manager: NodeManagerRef,
 }
 
 impl MetricsBatchBuilder {
-    pub fn new(schema_helper: SchemaHelper) -> Self {
+    pub fn new(
+        schema_helper: SchemaHelper,
+        partition_manager: PartitionRuleManagerRef,
+        node_manager: NodeManagerRef,
+    ) -> Self {
         MetricsBatchBuilder {
             schema_helper,
             builders: Default::default(),
+            partition_manager,
+            node_manager,
         }
     }
@@ -132,14 +144,89 @@ impl MetricsBatchBuilder {
     }
 
     /// Retrieves physical region metadata of given logical table names.
-    pub async fn collect_physical_region_metadata(
+    ///
+    /// The `logical_tables` is a list of table names, each entry contains the schema name and the table name.
+    /// Returns the following mapping: `schema => logical table => (logical table id, region 0 metadata of the physical table)`.
+    pub(crate) async fn collect_physical_region_metadata(
         &self,
-        logical_table_names: &[String],
-    ) -> HashMap<
-        TableName,         /*logical table name*/
-        RegionMetadataRef, /*Region metadata for physical re*/
-    > {
-        todo!()
+        logical_tables: &[(String, String)],
+        query_ctx: &QueryContextRef,
+    ) -> error::Result<HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>> {
+        let catalog = query_ctx.current_catalog();
+        // Logical and physical table ids.
+        let mut table_ids = Vec::with_capacity(logical_tables.len());
+        let mut physical_region_ids = HashSet::new();
+        for (schema, table_name) in logical_tables {
+            let logical_table = self
+                .schema_helper
+                .get_table(catalog, schema, table_name)
+                .await
+                .context(error::OperatorSnafu)?
+                .context(error::TableNotFoundSnafu {
+                    catalog,
+                    schema: schema,
+                    table: table_name,
+                })?;
+            let logical_table_id = logical_table.table_info().table_id();
+            let physical_table_id = self
+                .schema_helper
+                .table_route_manager()
+                .get_physical_table_id(logical_table_id)
+                .await
+                .context(error::CommonMetaSnafu)?;
+            table_ids.push((logical_table_id, physical_table_id));
+            // We only get metadata from region 0.
+            physical_region_ids.insert(RegionId::new(physical_table_id, 0));
+        }
+
+        // Batch get physical metadata.
+        let physical_region_ids = physical_region_ids.into_iter().collect_vec();
+        let region_metadatas = metadatas_for_region_ids(
+            &self.partition_manager,
+            &self.node_manager,
+            &physical_region_ids,
+            query_ctx,
+        )
+        .await
+        .context(error::OperatorSnafu)?;
+
+        let mut result_map: HashMap<_, HashMap<_, _>> = HashMap::new();
+        let region_metadatas: HashMap<_, _> = region_metadatas
+            .into_iter()
+            .flatten()
+            .map(|meta| (meta.region_id, Arc::new(meta)))
+            .collect();
+        for (i, (schema, table_name)) in logical_tables.iter().enumerate() {
+            let physical_table_id = table_ids[i].1;
+            let physical_region_id = RegionId::new(physical_table_id, 0);
+            let physical_metadata =
+                region_metadatas.get(&physical_region_id).with_context(|| {
+                    error::UnexpectedResultSnafu {
+                        reason: format!(
+                            "Physical region metadata {} for table {} not found",
+                            physical_region_id, table_name
+                        ),
+                    }
+                })?;
+            match result_map.get_mut(schema) {
+                Some(table_map) => {
+                    table_map.insert(
+                        table_name.clone(),
+                        (table_ids[i].0, physical_metadata.clone()),
+                    );
+                }
+                None => {
+                    let mut table_map = HashMap::new();
+                    table_map.insert(
+                        table_name.clone(),
+                        (table_ids[i].0, physical_metadata.clone()),
+                    );
+                    result_map.insert(schema.to_string(), table_map);
+                }
+            }
+        }
+
+        Ok(result_map)
     }
 
     /// Builds [RecordBatch] from rows with primary key encoded.
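
The return value is a two-level map keyed by schema name and then by logical table name. A minimal caller-side sketch of consuming the result, assuming a `builder: MetricsBatchBuilder` and a `query_ctx: QueryContextRef` are in scope (the schema and table names are illustrative):

    let metas = builder
        .collect_physical_region_metadata(
            &[("public".to_string(), "http_requests_total".to_string())],
            &query_ctx,
        )
        .await?;
    if let Some((logical_table_id, physical_meta)) = metas
        .get("public")
        .and_then(|tables| tables.get("http_requests_total"))
    {
        // `physical_meta` describes region 0 of the physical table backing
        // `public.http_requests_total`; `logical_table_id` is the logical table's id.
        debug_assert_eq!(physical_meta.region_id.region_number(), 0);
        let _ = logical_table_id;
    }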


@@ -22,12 +22,14 @@ use axum::response::IntoResponse;
 use axum::Extension;
 use axum_extra::TypedHeader;
 use common_catalog::consts::DEFAULT_SCHEMA_NAME;
+use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
 use common_telemetry::tracing;
 use hyper::HeaderMap;
 use lazy_static::lazy_static;
 use object_pool::Pool;
 use operator::schema_helper::SchemaHelper;
+use partition::manager::PartitionRuleManagerRef;
 use pipeline::util::to_pipeline_version;
 use pipeline::{ContextReq, PipelineDefinition};
 use prost::Message;
@@ -57,6 +59,8 @@ pub const VM_PROTO_VERSION: &str = "1";
 #[derive(Clone)]
 pub struct PromBulkState {
     pub schema_helper: SchemaHelper,
+    pub partition_manager: PartitionRuleManagerRef,
+    pub node_manager: NodeManagerRef,
 }
 
 #[derive(Clone)]
@@ -212,9 +216,11 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result<Bytes> {
 }
 
 /// Context for processing remote write requests in bulk mode.
-struct PromBulkContext {
-    schema_helper: SchemaHelper,
-    query_ctx: QueryContextRef,
+pub struct PromBulkContext {
+    pub(crate) schema_helper: SchemaHelper,
+    pub(crate) query_ctx: QueryContextRef,
+    pub(crate) partition_manager: PartitionRuleManagerRef,
+    pub(crate) node_manager: NodeManagerRef,
 }
 
 async fn decode_remote_write_request(
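
The construction site of PromBulkContext is not part of this diff; a plausible sketch of how the per-request context would be assembled from the shared PromBulkState and the request's query context (hypothetical, for illustration only):

    // Hypothetical assembly in the bulk write handler; `state: &PromBulkState` and
    // `query_ctx: QueryContextRef` are assumed to be in scope.
    let bulk = PromBulkContext {
        schema_helper: state.schema_helper.clone(),
        query_ctx: query_ctx.clone(),
        partition_manager: state.partition_manager.clone(),
        node_manager: state.node_manager.clone(),
    };
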
@@ -280,9 +286,7 @@ async fn decode_remote_write_request_to_batch(
         .merge(buf, prom_validation_mode, processor)
         .context(error::DecodePromRemoteRequestSnafu)?;
 
-    request
-        .as_record_batch(bulk.schema_helper, &bulk.query_ctx)
-        .await
+    request.as_record_batch(&bulk).await
 }
 
 async fn decode_remote_read_request(body: Bytes) -> Result<ReadRequest> {


@@ -20,13 +20,12 @@ use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
-use operator::schema_helper::SchemaHelper;
 use pipeline::{ContextOpt, ContextReq};
 use prost::DecodeError;
-use session::context::QueryContextRef;
 
 use crate::batch_builder::MetricsBatchBuilder;
 use crate::error::Result;
+use crate::http::prom_store::PromBulkContext;
 use crate::http::PromValidationMode;
 use crate::proto::{decode_string, PromLabel};
 use crate::repeated_field::Clear;
@@ -97,20 +96,22 @@ impl TablesBuilder {
     }
 
     /// Converts [TablesBuilder] to record batch and clears inner states.
-    pub(crate) async fn as_record_batch(
-        &mut self,
-        schema_helper: SchemaHelper,
-        query_ctx: &QueryContextRef,
-    ) -> Result<()> {
-        let mut batch_builder = MetricsBatchBuilder::new(schema_helper);
-        let mut tables = std::mem::take(&mut self.tables);
+    pub(crate) async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
+        let batch_builder = MetricsBatchBuilder::new(
+            bulk_ctx.schema_helper.clone(),
+            bulk_ctx.partition_manager.clone(),
+            bulk_ctx.node_manager.clone(),
+        );
+        let tables = std::mem::take(&mut self.tables);
 
         batch_builder
-            .create_or_alter_physical_tables(&tables, query_ctx)
+            .create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
             .await?;
 
         // Gather all region metadata for region 0 of physical tables.
-        let physical_region_metadata = batch_builder.collect_physical_region_metadata(&[]).await;
+        let physical_region_metadata = batch_builder
+            .collect_physical_region_metadata(&[], &bulk_ctx.query_ctx)
+            .await;
 
         batch_builder
             .append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)


@@ -21,7 +21,6 @@ use api::prom_store::remote::Sample;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::debug; use common_telemetry::debug;
use operator::schema_helper::SchemaHelper;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value}; use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
use prost::encoding::message::merge; use prost::encoding::message::merge;
use prost::encoding::{decode_key, decode_varint, WireType}; use prost::encoding::{decode_key, decode_varint, WireType};
@@ -31,6 +30,7 @@ use snafu::OptionExt;
use crate::error::{InternalSnafu, Result}; use crate::error::{InternalSnafu, Result};
use crate::http::event::PipelineIngestRequest; use crate::http::event::PipelineIngestRequest;
use crate::http::prom_store::PromBulkContext;
use crate::http::PromValidationMode; use crate::http::PromValidationMode;
use crate::pipeline::run_pipeline; use crate::pipeline::run_pipeline;
use crate::prom_row_builder::{PromCtx, TablesBuilder}; use crate::prom_row_builder::{PromCtx, TablesBuilder};
@@ -355,14 +355,8 @@ impl PromWriteRequest {
} }
/// Converts the write request into a record batch and reset the table data. /// Converts the write request into a record batch and reset the table data.
pub async fn as_record_batch( pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
&mut self, self.table_data.as_record_batch(bulk_ctx).await
schema_helper: SchemaHelper,
query_ctx: &QueryContextRef,
) -> Result<()> {
self.table_data
.as_record_batch(schema_helper, query_ctx)
.await
} }
} }