From 8b392477c871c5e703c90d6cf25a8169ccc556f6 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Wed, 25 Jun 2025 07:14:02 +0000
Subject: [PATCH] poc/create-alter-for-metrics:

### Commit Summary

- **Add New Dependencies**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` and `mito-codec` dependencies.
- **Enhance `MetricEngineInner`**: Modified `put.rs` to use `logical_table_id` instead of `table_id` and adjusted method calls accordingly.
- **Expose Structs and Methods**: Made `RowModifier`, `RowsIter`, and `RowIter` structs and their methods public in `row_modifier.rs`.
- **Implement Batch Processing**: Added batch processing logic in `batch_builder.rs` to handle row conversion to record batches with primary key encoding.
- **Error Handling**: Introduced `EncodePrimaryKey` error variant in `error.rs` for handling primary key encoding errors.
- **Clone Support for `TableBuilder`**: Added `Clone` trait to `TableBuilder` in `prom_row_builder.rs`.

Signed-off-by: Lei, HUANG
---
 Cargo.lock                            |   2 +
 src/metric-engine/src/engine/put.rs   |   6 +-
 src/metric-engine/src/row_modifier.rs |  42 ++++--
 src/operator/src/schema_helper.rs     |   8 +-
 src/servers/Cargo.toml                |   4 +-
 src/servers/src/batch_builder.rs      | 197 ++++++++++++++++++++++++--
 src/servers/src/error.rs              |   8 ++
 src/servers/src/lib.rs                |   1 +
 src/servers/src/prom_row_builder.rs   |   2 +-
 9 files changed, 240 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8381d83409..7ae88c65fd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11272,7 +11272,9 @@ dependencies = [
  "local-ip-address",
  "log-query",
  "loki-proto",
+ "metric-engine",
  "mime_guess",
+ "mito-codec",
  "mysql_async",
  "notify",
  "object-pool",
diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs
index c7dc44be1e..32d4c4274f 100644
--- a/src/metric-engine/src/engine/put.rs
+++ b/src/metric-engine/src/engine/put.rs
@@ -147,7 +147,7 @@ impl MetricEngineInner {
     fn modify_rows(
         &self,
         physical_region_id: RegionId,
-        table_id: TableId,
+        logical_table_id: TableId,
         rows: &mut Rows,
         encoding: PrimaryKeyEncoding,
     ) -> Result<()> {
@@ -163,7 +163,9 @@ impl MetricEngineInner {
                 .physical_columns();
             RowsIter::new(input, name_to_id)
         };
-        let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
+        let output = self
+            .row_modifier
+            .modify_rows(iter, logical_table_id, encoding)?;
         *rows = output;
         Ok(())
     }
diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs
index 56618ded8d..d771ad4552 100644
--- a/src/metric-engine/src/row_modifier.rs
+++ b/src/metric-engine/src/row_modifier.rs
@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
 ///
 /// - For [`PrimaryKeyEncoding::Dense`] encoding,
 ///   it adds two columns (`__table_id`, `__tsid`) to the row.
-pub(crate) struct RowModifier {
+pub struct RowModifier {
     codec: SparsePrimaryKeyCodec,
 }
 
@@ -52,7 +52,7 @@ impl RowModifier {
     }
 
     /// Modify rows with the given primary key encoding.
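+    ///
+    /// A hedged sketch of a call site, mirroring `MetricEngineInner::modify_rows`
+    /// in `put.rs` (assumes `rows: api::v1::Rows` and a `name_to_id` mapping from
+    /// column name to column id):
+    /// ```ignore
+    /// let iter = RowsIter::new(rows, name_to_id);
+    /// let modified = modifier.modify_rows(iter, logical_table_id, encoding)?;
+    /// ```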
-    pub(crate) fn modify_rows(
+    pub fn modify_rows(
         &self,
         iter: RowsIter,
         table_id: TableId,
@@ -74,7 +74,7 @@ impl RowModifier {
         let mut buffer = vec![];
 
         for mut iter in iter.iter_mut() {
-            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
+            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
             let mut values = Vec::with_capacity(num_output_column);
             buffer.clear();
             let internal_columns = [
@@ -135,7 +135,7 @@ impl RowModifier {
             options: None,
         });
         for iter in iter.iter_mut() {
-            let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
+            let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
             iter.row.values.push(table_id);
             iter.row.values.push(tsid);
         }
@@ -144,7 +144,7 @@ impl RowModifier {
     }
 
     /// Fills internal columns of a row with table name and a hash of tag values.
-    fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
+    pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
         let mut hasher = TsidGenerator::default();
         for (name, value) in iter.primary_keys_with_name() {
             // The type is checked before. So only null is ignored.
@@ -264,7 +264,7 @@ impl IterIndex {
 }
 
 /// Iterator of rows.
-pub(crate) struct RowsIter {
+pub struct RowsIter {
     rows: Rows,
     index: IterIndex,
 }
@@ -276,7 +276,7 @@ impl RowsIter {
     }
 
     /// Returns the iterator of rows.
-    fn iter_mut(&mut self) -> impl Iterator<Item = RowIter<'_>> {
+    pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter<'_>> {
         self.rows.rows.iter_mut().map(|row| RowIter {
             row,
             index: &self.index,
@@ -290,10 +290,22 @@ impl RowsIter {
             .iter()
             .map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
     }
+
+    pub fn num_rows(&self) -> usize {
+        self.rows.rows.len()
+    }
+
+    pub fn num_columns(&self) -> usize {
+        self.rows.schema.len()
+    }
+
+    pub fn num_primary_keys(&self) -> usize {
+        self.index.num_primary_key_column
+    }
 }
 
 /// Iterator of a row.
-struct RowIter<'a> {
+pub struct RowIter<'a> {
     row: &'a mut Row,
     index: &'a IterIndex,
     schema: &'a Vec<ColumnSchema>,
@@ -313,7 +325,7 @@ impl RowIter<'_> {
     }
 
     /// Returns the primary keys.
-    fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
+    pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef<'_>)> {
         self.index.indices[..self.index.num_primary_key_column]
             .iter()
             .map(|idx| {
@@ -333,6 +345,13 @@ impl RowIter<'_> {
             .iter()
             .map(|idx| std::mem::take(&mut self.row.values[idx.index]))
     }
+
+    /// Returns the value at the given offset.
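+    ///
+    /// For the Prometheus rows assembled by `TableBuilder`, offset 0 is expected to
+    /// hold the timestamp and offset 1 the field value; `BatchEncoder::to_batch`
+    /// relies on exactly this layout.
+    ///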
+    /// # Panics
+    /// Panics if the offset is out of bounds.
+    pub fn value_at(&self, idx: usize) -> &Value {
+        &self.row.values[idx]
+    }
 }
 
 #[cfg(test)]
@@ -476,7 +495,6 @@ mod tests {
     #[test]
     fn test_fill_internal_columns() {
         let name_to_column_id = test_name_to_column_id();
-        let encoder = RowModifier::new();
         let table_id = 1025;
         let schema = test_schema();
         let row = test_row("greptimedb", "127.0.0.1");
@@ -486,7 +504,7 @@ mod tests {
         };
         let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
         let row_iter = rows_iter.iter_mut().next().unwrap();
-        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
+        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
 
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
         assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
@@ -514,7 +532,7 @@ mod tests {
         };
         let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
         let row_iter = rows_iter.iter_mut().next().unwrap();
-        let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
+        let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
         assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
         assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
     }
diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs
index 647433b6c0..2538e1ac85 100644
--- a/src/operator/src/schema_helper.rs
+++ b/src/operator/src/schema_helper.rs
@@ -639,16 +639,12 @@ pub async fn ensure_logical_tables_for_metrics(
 ) -> Result<()> {
     let catalog_name = query_ctx.current_catalog();
     let schema_name = query_ctx.current_schema();
-
+    // 1. For each physical table, create it if it doesn't exist.
     for physical_table_name in schemas.schemas.keys() {
         // Check if the physical table exists and create it if it doesn't
         let physical_table_opt = helper
-            .get_table(
-                catalog_name,
-                &schema_name,
-                physical_table_name,
-            )
+            .get_table(catalog_name, &schema_name, physical_table_name)
             .await?;
 
         if physical_table_opt.is_none() {
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 0a6cb2f756..eb1d071134 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -74,12 +74,14 @@ jsonb.workspace = true
 lazy_static.workspace = true
 log-query.workspace = true
 loki-proto.workspace = true
+metric-engine.workspace = true
 mime_guess = "2.0"
+mito-codec.workspace = true
 notify.workspace = true
 object-pool = "0.5"
 once_cell.workspace = true
-operator.workspace = true
 openmetrics-parser = "0.4"
+operator.workspace = true
 simd-json.workspace = true
 socket2 = "0.5"
 # use crates.io version once the following PRs are merged into the nextest release
diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index 3ebecdebf1..a52125da2d 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -13,16 +13,30 @@
 // limitations under the License.
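 
+// Hedged overview of this PoC module: rows buffered per table in `TableBuilder`s
+// are re-encoded into mito record batches whose `__primary_key` column is
+// produced by the metric engine's sparse primary-key codec; see `BatchEncoder`
+// below.
 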
 use std::collections::HashSet;
+use std::sync::Arc;
 
 use ahash::HashMap;
-use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
+use arrow::array::{
+    ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, TimestampMillisecondArray, UInt64Array,
+    UInt8Array,
+};
+use arrow_schema::Field;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use metric_engine::row_modifier::{RowModifier, RowsIter};
+use mito_codec::row_converter::SparsePrimaryKeyCodec;
 use operator::schema_helper::{
     ensure_logical_tables_for_metrics, LogicalSchema, LogicalSchemas, SchemaHelper,
 };
 use session::context::QueryContextRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::storage::consts::{
+    ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
+};
+use table::metadata::TableId;
 use table::table_name::TableName;
 use table::TableRef;
@@ -113,21 +127,31 @@ impl MetricsBatchBuilder {
         Ok(physical_table_name.to_string())
     }
 
+    /// Retrieves the physical region metadata for the given logical table names.
+    async fn collect_physical_region_metadata(
+        logical_table_names: &[String],
+    ) -> HashMap<
+        String,         /*logical table name*/
+        RegionMetadata, /*Region metadata for the physical region*/
+    > {
+        todo!()
+    }
+
     /// Builds [RecordBatch] from rows with primary key encoded.
-    /// Potentially we also need to modify the column name of timestamp and value field to
-    /// match the schema of physical tables.
-    /// Note:
-    /// Make sure all logical table and physical table are created when reach here and the mapping
-    /// from logical table name to physical table ref is stored in [physical_tables].
-    fn rows_to_batch(
+    /// Potentially we also need to modify the column names of the timestamp and value fields to
+    /// match the schema of physical tables.
+    /// Note:
+    /// Make sure all logical and physical tables are created before reaching here and that the
+    /// mapping from logical table name to physical region metadata is stored in
+    /// [physical_table_metadata].
+    async fn rows_to_batch(
         &self,
         current_catalog: Option<String>,
         current_schema: Option<String>,
-        table_data: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
-        physical_tables: &HashMap<TableName, TableRef>,
+        table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        physical_table_metadata: &HashMap<TableName, RegionMetadataRef>,
     ) -> error::Result<()> {
         for (ctx, tables_in_schema) in table_data {
-            for (logical_table_name, _table) in tables_in_schema {
+            for (logical_table_name, table) in tables_in_schema {
                 // Use the session catalog.
                 let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
                 // The schema in PromCtx takes precedence over the session schema.
                 let schema = ctx
                     .schema
                     .as_deref()
                     .or(current_schema.as_deref())
                     .unwrap_or(DEFAULT_SCHEMA_NAME);
-                let logical_table = TableName::new(catalog, schema, logical_table_name);
-                let Some(_physical_table) = physical_tables.get(&logical_table) else {
+                let logical_table_ref = self
+                    .schema_helper
+                    .catalog_manager()
+                    .table(&catalog, &schema, &logical_table_name, None)
+                    .await
+                    .context(error::CatalogSnafu)?
+                    .context(error::TableNotFoundSnafu {
+                        catalog,
+                        schema,
+                        table: logical_table_name,
+                    })?;
+                let logical_table = TableName::new(catalog, schema, logical_table_name.clone());
+                let Some(physical_table) = physical_table_metadata.get(&logical_table) else {
                     // All physical tables must have been created by the time we reach here.
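+                    // If this branch is reached, `ensure_logical_tables_for_metrics`
+                    // was not called (or did not cover this logical table) beforehand.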
                     return error::TableNotFoundSnafu {
                         catalog,
                         schema,
                         table: logical_table_name,
                     }
                     .fail();
                 };
+
+                let batch_encoder = Self::create_sparse_encoder(
+                    logical_table_ref.table_info().table_id(),
+                    logical_table_name.clone(),
+                    physical_table.clone(),
+                    std::mem::take(table),
+                );
+                let _batch = batch_encoder.to_batch()?;
+                // todo(hl): Ingest batch.
             }
         }
 
         todo!()
     }
+
+    /// Creates an encoder that converts `Rows` into a `RecordBatch` with the primary key encoded.
+    fn create_sparse_encoder(
+        logical_table_id: TableId,
+        logical_table_name: String,
+        physical_region_meta: RegionMetadataRef,
+        mut table_builder: TableBuilder,
+    ) -> BatchEncoder {
+        let name_to_id: std::collections::HashMap<_, _> = physical_region_meta
+            .column_metadatas
+            .iter()
+            .map(|c| (c.column_schema.name.clone(), c.column_id))
+            .collect();
+
+        // todo(hl): we can simplify the row iter because the schema in TableBuilder is known (ts, val, tags...)
+        let row_insert_request = table_builder.as_row_insert_request(logical_table_name);
+        let rows = row_insert_request.rows.unwrap();
+        let rows_iter = RowsIter::new(rows, &name_to_id);
+        BatchEncoder::new(logical_table_id, rows_iter)
+    }
+}
+
+struct BatchEncoder {
+    logical_table_id: TableId,
+    iter: RowsIter,
+    encoded_primary_key_array_builder: BinaryBuilder,
+    timestamps: Vec<i64>,
+    value: Vec<f64>,
+    pk_codec: SparsePrimaryKeyCodec,
+}
+
+impl BatchEncoder {
+    fn new(logical_table_id: TableId, iter: RowsIter) -> BatchEncoder {
+        let num_rows = iter.num_rows();
+        Self {
+            logical_table_id,
+            iter,
+            encoded_primary_key_array_builder: BinaryBuilder::with_capacity(num_rows, 0),
+            timestamps: Vec::with_capacity(num_rows),
+            value: Vec::with_capacity(num_rows),
+            pk_codec: SparsePrimaryKeyCodec::schemaless(),
+        }
+    }
+
+    /// Creates the schema of the output record batch.
+    fn schema() -> arrow::datatypes::SchemaRef {
+        Arc::new(arrow::datatypes::Schema::new(vec![
+            Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
+            Field::new(
+                GREPTIME_TIMESTAMP,
+                arrow::datatypes::DataType::Timestamp(
+                    arrow::datatypes::TimeUnit::Millisecond,
+                    None,
+                ),
+                false,
+            ),
+            Field::new(
+                PRIMARY_KEY_COLUMN_NAME,
+                arrow::datatypes::DataType::Binary,
+                false,
+            ),
+            Field::new(
+                SEQUENCE_COLUMN_NAME,
+                arrow::datatypes::DataType::UInt64,
+                false,
+            ),
+            Field::new(
+                OP_TYPE_COLUMN_NAME,
+                arrow::datatypes::DataType::UInt8,
+                false,
+            ),
+        ]))
+    }
+
+    fn to_batch(mut self) -> error::Result<arrow::array::RecordBatch> {
+        let mut encode_buf = vec![];
+        for row in self.iter.iter_mut() {
+            // Reuse the buffer across rows; clear the previous row's key bytes first.
+            encode_buf.clear();
+            let (table_id, ts_id) = RowModifier::fill_internal_columns(self.logical_table_id, &row);
+            let internal_columns = [
+                (
+                    ReservedColumnId::table_id(),
+                    api::helper::pb_value_to_value_ref(&table_id, &None),
+                ),
+                (
+                    ReservedColumnId::tsid(),
+                    api::helper::pb_value_to_value_ref(&ts_id, &None),
+                ),
+            ];
+            self.pk_codec
+                .encode_to_vec(internal_columns.into_iter(), &mut encode_buf)
+                .context(error::EncodePrimaryKeySnafu)?;
+            self.pk_codec
+                .encode_to_vec(row.primary_keys(), &mut encode_buf)
+                .context(error::EncodePrimaryKeySnafu)?;
+            self.encoded_primary_key_array_builder
+                .append_value(&encode_buf);
+
+            // Process the timestamp and field value. We already know their positions in
+            // [TableBuilder].
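+            // (Offsets 0 and 1 below assume the fixed `TableBuilder` layout:
+            // timestamp first, then the field value, then tag columns.)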
+            let ValueData::TimestampMillisecondValue(ts) =
+                row.value_at(0).value_data.as_ref().unwrap()
+            else {
+                todo!("return an error")
+            };
+            self.timestamps.push(*ts);
+
+            let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
+                todo!("return an error")
+            };
+            self.value.push(*val);
+        }
+
+        let num_rows = self.iter.num_rows();
+        debug_assert_eq!(self.value.len(), num_rows);
+        debug_assert_eq!(self.value.len(), self.timestamps.len());
+        debug_assert_eq!(
+            self.value.len(),
+            self.encoded_primary_key_array_builder.len()
+        );
+        let value = Float64Array::from(self.value);
+        let timestamp = TimestampMillisecondArray::from(self.timestamps);
+
+        let op_type = UInt8Array::from_value(OpType::Put as u8, num_rows);
+        // todo: we set all sequences to 0 for now.
+        let sequence = UInt64Array::from_value(0, num_rows);
+        // todo(hl): sort batch by primary key.
+        arrow::array::RecordBatch::try_new(
+            Self::schema(),
+            vec![
+                Arc::new(value) as ArrayRef,
+                Arc::new(timestamp) as ArrayRef,
+                Arc::new(self.encoded_primary_key_array_builder.finish()) as ArrayRef,
+                Arc::new(sequence) as ArrayRef,
+                Arc::new(op_type) as ArrayRef,
+            ],
+        )
+        .context(error::ArrowSnafu)
+    }
+}
 
 fn tags_to_logical_schemas(
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 848fb142a3..40685c25eb 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -638,6 +638,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to encode primary key"))]
+    EncodePrimaryKey {
+        source: mito_codec::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -763,6 +770,7 @@ impl ErrorExt for Error {
             HandleOtelArrowRequest { .. } => StatusCode::Internal,
             CommonMeta { source, .. } => source.status_code(),
             Operator { source, .. } => source.status_code(),
+            EncodePrimaryKey { source, .. } => source.status_code(),
         }
     }
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 8b183e62ef..4b14c9fba5 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -22,6 +22,7 @@ use datafusion_expr::LogicalPlan;
 use datatypes::schema::Schema;
 
 pub mod addrs;
+#[allow(dead_code)]
 mod batch_builder;
 pub mod configurator;
 pub(crate) mod elasticsearch;
diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs
index 54296b3564..8e61c80dc1 100644
--- a/src/servers/src/prom_row_builder.rs
+++ b/src/servers/src/prom_row_builder.rs
@@ -114,7 +114,7 @@ impl TablesBuilder {
 }
 
 /// Builder for one table.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub(crate) struct TableBuilder {
     /// Column schemas.
     schema: Vec<ColumnSchema>,
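
Usage sketch: a minimal, hedged example of how `BatchEncoder` is meant to be
driven, mirroring `create_sparse_encoder` above. Here `rows`, `name_to_id`, and
`logical_table_id` are assumed inputs: `rows` is the `api::v1::Rows` produced by
`TableBuilder::as_row_insert_request`, and `name_to_id` maps physical column
names to column ids.

    use metric_engine::row_modifier::RowsIter;

    // Walk the protocol rows with the physical column-id mapping so tag columns
    // can be visited as primary keys.
    let rows_iter = RowsIter::new(rows, &name_to_id);
    // Sparse-encode (__table_id, __tsid, tags...) into __primary_key and
    // materialize the five-column batch described by `BatchEncoder::schema`.
    let batch = BatchEncoder::new(logical_table_id, rows_iter).to_batch()?;
    // Columns: value, timestamp, __primary_key, sequence, op_type.
    assert_eq!(batch.num_columns(), 5);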