poc/create-alter-for-metrics:

### Commit Summary

 - **Add New Dependencies**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` and `mito-codec` dependencies.
 - **Enhance `MetricEngineInner`**: Renamed the `table_id` parameter of `modify_rows` to `logical_table_id` in `put.rs` and adjusted call sites accordingly.
 - **Expose Structs and Methods**: Made `RowModifier`, `RowsIter`, and `RowIter` structs and their methods public in `row_modifier.rs`.
 - **Implement Batch Processing**: Added batch processing logic in `batch_builder.rs` to convert rows into record batches with primary key encoding (see the sketch after this list).
 - **Error Handling**: Introduced `EncodePrimaryKey` error variant in `error.rs` for handling primary key encoding errors.
 - **`Clone` Support for `TableBuilder`**: Derived `Clone` for `TableBuilder` in `prom_row_builder.rs`.
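
Taken together, these changes expose a path from protocol `Rows` to sparse-encoded primary keys: iterate the rows, derive the internal `__table_id`/`__tsid` columns, then encode. Below is a minimal sketch of that flow, assuming the workspace crates and the signatures shown in the hunks that follow (`RowsIter::new`, `RowModifier::fill_internal_columns`, `SparsePrimaryKeyCodec::schemaless`); the `encode_primary_keys` helper itself is hypothetical:

```rust
use std::collections::HashMap;

use api::v1::Rows;
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use store_api::storage::consts::ReservedColumnId;
use table::metadata::TableId;

/// Sparse-encodes the primary key of every row: the reserved `__table_id` and
/// `__tsid` columns first, then the row's tag columns.
fn encode_primary_keys(
    rows: Rows,
    name_to_id: &HashMap<String, u32>, // physical column name -> column id
    logical_table_id: TableId,
) -> Result<Vec<Vec<u8>>, mito_codec::error::Error> {
    let codec = SparsePrimaryKeyCodec::schemaless();
    let mut rows_iter = RowsIter::new(rows, name_to_id);
    let mut encoded = Vec::new();
    for row in rows_iter.iter_mut() {
        // Derive __table_id and __tsid from the logical table id and tag values.
        let (table_id, tsid) = RowModifier::fill_internal_columns(logical_table_id, &row);
        let mut buf = Vec::new();
        let internal_columns = [
            (
                ReservedColumnId::table_id(),
                api::helper::pb_value_to_value_ref(&table_id, &None),
            ),
            (
                ReservedColumnId::tsid(),
                api::helper::pb_value_to_value_ref(&tsid, &None),
            ),
        ];
        codec.encode_to_vec(internal_columns.into_iter(), &mut buf)?;
        codec.encode_to_vec(row.primary_keys(), &mut buf)?;
        encoded.push(buf);
    }
    Ok(encoded)
}
```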

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Commit 8b392477c8 (parent 905593dc16)
Lei, HUANG, 2025-06-25 07:14:02 +00:00
9 changed files with 240 additions and 30 deletions

#### Cargo.lock (generated)

@@ -11272,7 +11272,9 @@ dependencies = [
"local-ip-address",
"log-query",
"loki-proto",
"metric-engine",
"mime_guess",
"mito-codec",
"mysql_async",
"notify",
"object-pool",

#### put.rs

@@ -147,7 +147,7 @@ impl MetricEngineInner {
fn modify_rows(
&self,
physical_region_id: RegionId,
-table_id: TableId,
+logical_table_id: TableId,
rows: &mut Rows,
encoding: PrimaryKeyEncoding,
) -> Result<()> {
@@ -163,7 +163,9 @@ impl MetricEngineInner {
.physical_columns();
RowsIter::new(input, name_to_id)
};
-let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
+let output = self
+.row_modifier
+.modify_rows(iter, logical_table_id, encoding)?;
*rows = output;
Ok(())
}

#### row_modifier.rs

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns (`__table_id`, `__tsid`) to the row.
-pub(crate) struct RowModifier {
+pub struct RowModifier {
codec: SparsePrimaryKeyCodec,
}
@@ -52,7 +52,7 @@ impl RowModifier {
}
/// Modify rows with the given primary key encoding.
-pub(crate) fn modify_rows(
+pub fn modify_rows(
&self,
iter: RowsIter,
table_id: TableId,
@@ -74,7 +74,7 @@ impl RowModifier {
let mut buffer = vec![];
for mut iter in iter.iter_mut() {
-let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
+let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
let mut values = Vec::with_capacity(num_output_column);
buffer.clear();
let internal_columns = [
@@ -135,7 +135,7 @@ impl RowModifier {
options: None,
});
for iter in iter.iter_mut() {
-let (table_id, tsid) = self.fill_internal_columns(table_id, &iter);
+let (table_id, tsid) = Self::fill_internal_columns(table_id, &iter);
iter.row.values.push(table_id);
iter.row.values.push(tsid);
}
@@ -144,7 +144,7 @@ impl RowModifier {
}
/// Fills internal columns of a row with the table id and a hash of tag values.
-fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
+pub fn fill_internal_columns(table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
@@ -264,7 +264,7 @@ impl IterIndex {
}
/// Iterator of rows.
-pub(crate) struct RowsIter {
+pub struct RowsIter {
rows: Rows,
index: IterIndex,
}
@@ -276,7 +276,7 @@ impl RowsIter {
}
/// Returns the iterator of rows.
-fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
+pub fn iter_mut(&mut self) -> impl Iterator<Item = RowIter> {
self.rows.rows.iter_mut().map(|row| RowIter {
row,
index: &self.index,
@@ -290,10 +290,22 @@ impl RowsIter {
.iter()
.map(|idx| std::mem::take(&mut self.rows.schema[idx.index]))
}
pub fn num_rows(&self) -> usize {
self.rows.rows.len()
}
pub fn num_columns(&self) -> usize {
self.rows.schema.len()
}
pub fn num_primary_keys(&self) -> usize {
self.index.num_primary_key_column
}
}
/// Iterator of a row.
-struct RowIter<'a> {
+pub struct RowIter<'a> {
row: &'a mut Row,
index: &'a IterIndex,
schema: &'a Vec<ColumnSchema>,
@@ -313,7 +325,7 @@ impl RowIter<'_> {
}
/// Returns the primary keys.
-fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
+pub fn primary_keys(&self) -> impl Iterator<Item = (ColumnId, ValueRef)> {
self.index.indices[..self.index.num_primary_key_column]
.iter()
.map(|idx| {
@@ -333,6 +345,13 @@ impl RowIter<'_> {
.iter()
.map(|idx| std::mem::take(&mut self.row.values[idx.index]))
}
/// Returns the value at the given offset.
/// # Panics
/// Panics if the offset is out of bounds.
pub fn value_at(&self, idx: usize) -> &Value {
&self.row.values[idx]
}
}
#[cfg(test)]
@@ -476,7 +495,6 @@ mod tests {
#[test]
fn test_fill_internal_columns() {
let name_to_column_id = test_name_to_column_id();
-let encoder = RowModifier::new();
let table_id = 1025;
let schema = test_schema();
let row = test_row("greptimedb", "127.0.0.1");
@@ -486,7 +504,7 @@ mod tests {
};
let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
let row_iter = rows_iter.iter_mut().next().unwrap();
-let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
+let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
@@ -514,7 +532,7 @@ mod tests {
};
let mut rows_iter = RowsIter::new(rows, &name_to_column_id);
let row_iter = rows_iter.iter_mut().next().unwrap();
-let (encoded_table_id, tsid) = encoder.fill_internal_columns(table_id, &row_iter);
+let (encoded_table_id, tsid) = RowModifier::fill_internal_columns(table_id, &row_iter);
assert_eq!(encoded_table_id, ValueData::U32Value(1025).into());
assert_eq!(tsid, ValueData::U64Value(9442261431637846000).into());
}
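
Since `RowIter` and `value_at` are now public, a caller that knows the `TableBuilder` column layout can read fields positionally. A sketch under that assumption (column 0 is the millisecond timestamp, column 1 the f64 value, mirroring how `BatchEncoder::to_batch` in `batch_builder.rs` consumes them below); the `read_ts_and_value` helper is hypothetical:

```rust
use api::v1::value::ValueData;
use metric_engine::row_modifier::RowIter;

/// Reads the timestamp and value at their fixed positions, returning `None`
/// for nulls or unexpected types instead of panicking.
fn read_ts_and_value(row: &RowIter<'_>) -> Option<(i64, f64)> {
    let ts = match row.value_at(0).value_data.as_ref()? {
        ValueData::TimestampMillisecondValue(ts) => *ts,
        _ => return None,
    };
    let val = match row.value_at(1).value_data.as_ref()? {
        ValueData::F64Value(v) => *v,
        _ => return None,
    };
    Some((ts, val))
}
```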

#### schema_helper.rs

@@ -639,16 +639,12 @@ pub async fn ensure_logical_tables_for_metrics(
) -> Result<()> {
let catalog_name = query_ctx.current_catalog();
let schema_name = query_ctx.current_schema();
// 1. For each physical table, create it if it doesn't exist.
for physical_table_name in schemas.schemas.keys() {
// Check if the physical table exists and create it if it doesn't
let physical_table_opt = helper
-.get_table(
-catalog_name,
-&schema_name,
-physical_table_name,
-)
+.get_table(catalog_name, &schema_name, physical_table_name)
.await?;
if physical_table_opt.is_none() {

#### Cargo.toml

@@ -74,12 +74,14 @@ jsonb.workspace = true
lazy_static.workspace = true
log-query.workspace = true
loki-proto.workspace = true
metric-engine.workspace = true
mime_guess = "2.0"
mito-codec.workspace = true
notify.workspace = true
object-pool = "0.5"
once_cell.workspace = true
-operator.workspace = true
openmetrics-parser = "0.4"
+operator.workspace = true
simd-json.workspace = true
socket2 = "0.5"
# use crates.io version once the following PRs are merged into the nextest release

#### batch_builder.rs

@@ -13,16 +13,30 @@
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use ahash::HashMap;
-use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
+use api::v1::value::ValueData;
+use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, Float64Array, TimestampMillisecondArray, UInt64Array,
UInt8Array,
};
use arrow_schema::Field;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use operator::schema_helper::{
ensure_logical_tables_for_metrics, LogicalSchema, LogicalSchemas, SchemaHelper,
};
use session::context::QueryContextRef;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::{
ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
use table::metadata::TableId;
use table::table_name::TableName;
use table::TableRef;
@@ -113,21 +127,31 @@ impl MetricsBatchBuilder {
Ok(physical_table_name.to_string())
}
/// Retrieves physical region metadata for the given logical table names.
async fn collect_physical_region_metadata(
logical_table_names: &[String],
) -> HashMap<
String, /* logical table name */
RegionMetadata, /* region metadata of the physical region */
> {
todo!()
}
/// Builds [RecordBatch] from rows with primary key encoded.
/// We may also need to rename the timestamp and value field columns to
/// match the schema of the physical tables.
/// Note:
/// Make sure all logical and physical tables are created before reaching here, and that the
/// mapping from logical table name to physical region metadata is stored in [physical_table_metadata].
-fn rows_to_batch(
+async fn rows_to_batch(
&self,
current_catalog: Option<String>,
current_schema: Option<String>,
-table_data: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
-physical_tables: &HashMap<TableName, TableRef>,
+table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
+physical_table_metadata: &HashMap<TableName, RegionMetadataRef>,
) -> error::Result<()> {
for (ctx, tables_in_schema) in table_data {
-for (logical_table_name, _table) in tables_in_schema {
+for (logical_table_name, table) in tables_in_schema {
// use session catalog.
let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
// schema in PromCtx precedes session schema.
@@ -136,8 +160,19 @@ impl MetricsBatchBuilder {
.as_deref()
.or(current_schema.as_deref())
.unwrap_or(DEFAULT_SCHEMA_NAME);
-let logical_table = TableName::new(catalog, schema, logical_table_name);
-let Some(_physical_table) = physical_tables.get(&logical_table) else {
+let logical_table_ref = self
+.schema_helper
+.catalog_manager()
+.table(&catalog, &schema, &logical_table_name, None)
+.await
+.context(error::CatalogSnafu)?
+.context(error::TableNotFoundSnafu {
+catalog,
+schema,
+table: logical_table_name,
+})?;
+let logical_table = TableName::new(catalog, schema, logical_table_name.clone());
+let Some(physical_table) = physical_table_metadata.get(&logical_table) else {
// All physical tables must be created before we reach here.
return error::TableNotFoundSnafu {
catalog,
@@ -146,11 +181,157 @@ impl MetricsBatchBuilder {
}
.fail();
};
let batch_encoder = Self::create_sparse_encoder(
logical_table_ref.table_info().table_id(),
logical_table_name.clone(),
physical_table.clone(),
std::mem::take(table),
);
let _batch = batch_encoder.to_batch()?;
// todo(hl): Ingest batch.
}
}
todo!()
}
/// Creates an encoder that converts Rows into a RecordBatch with the primary key encoded.
fn create_sparse_encoder(
logical_table_id: TableId,
logical_table_name: String,
physical_region_meta: RegionMetadataRef,
mut table_builder: TableBuilder,
) -> BatchEncoder {
let name_to_id: std::collections::HashMap<_, _> = physical_region_meta
.column_metadatas
.iter()
.map(|c| (c.column_schema.name.clone(), c.column_id))
.collect();
// todo(hl): we can simplify the row iter because the schema in TableBuilder is known (ts, val, tags...)
let row_insert_request = table_builder.as_row_insert_request(logical_table_name);
let rows = row_insert_request.rows.unwrap();
let rows_iter = RowsIter::new(rows, &name_to_id);
BatchEncoder::new(logical_table_id, rows_iter)
}
}
struct BatchEncoder {
logical_table_id: TableId,
iter: RowsIter,
encoded_primary_key_array_builder: BinaryBuilder,
timestamps: Vec<i64>,
value: Vec<f64>,
pk_codec: SparsePrimaryKeyCodec,
}
impl BatchEncoder {
fn new(logical_table_id: TableId, iter: RowsIter) -> BatchEncoder {
let num_rows = iter.num_rows();
Self {
logical_table_id,
iter,
encoded_primary_key_array_builder: BinaryBuilder::with_capacity(num_rows, 0),
timestamps: Vec::with_capacity(num_rows),
value: Vec::with_capacity(num_rows),
pk_codec: SparsePrimaryKeyCodec::schemaless(),
}
}
/// Creates the schema of the output record batch.
fn schema() -> arrow::datatypes::SchemaRef {
Arc::new(arrow::datatypes::Schema::new(vec![
Field::new(GREPTIME_VALUE, arrow::datatypes::DataType::Float64, false),
Field::new(
GREPTIME_TIMESTAMP,
arrow::datatypes::DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
None,
),
false,
),
Field::new(
PRIMARY_KEY_COLUMN_NAME,
arrow::datatypes::DataType::Binary,
false,
),
Field::new(
SEQUENCE_COLUMN_NAME,
arrow::datatypes::DataType::UInt64,
false,
),
Field::new(
OP_TYPE_COLUMN_NAME,
arrow::datatypes::DataType::UInt8,
false,
),
]))
}
fn to_batch(mut self) -> error::Result<arrow::record_batch::RecordBatch> {
let mut encode_buf = vec![];
for row in self.iter.iter_mut() {
// Reuse the encoding buffer across rows; clear leftover bytes from the previous row.
encode_buf.clear();
let (table_id, ts_id) = RowModifier::fill_internal_columns(self.logical_table_id, &row);
let internal_columns = [
(
ReservedColumnId::table_id(),
api::helper::pb_value_to_value_ref(&table_id, &None),
),
(
ReservedColumnId::tsid(),
api::helper::pb_value_to_value_ref(&ts_id, &None),
),
];
self.pk_codec
.encode_to_vec(internal_columns.into_iter(), &mut encode_buf)
.context(error::EncodePrimaryKeySnafu)?;
self.pk_codec
.encode_to_vec(row.primary_keys(), &mut encode_buf)
.context(error::EncodePrimaryKeySnafu)?;
self.encoded_primary_key_array_builder
.append_value(&encode_buf);
// Process the timestamp and field columns; we already know their positions in [TableBuilder].
let ValueData::TimestampMillisecondValue(ts) =
row.value_at(0).value_data.as_ref().unwrap()
else {
todo!("return an error")
};
self.timestamps.push(*ts);
let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
todo!("return an error")
};
self.value.push(*val);
}
let num_rows = self.iter.num_rows();
debug_assert_eq!(self.value.len(), num_rows);
debug_assert_eq!(self.value.len(), self.timestamps.len());
debug_assert_eq!(
self.value.len(),
self.encoded_primary_key_array_builder.len()
);
let value = Float64Array::from(self.value);
let timestamp = TimestampMillisecondArray::from(self.timestamps);
let op_type = UInt8Array::from_value(OpType::Put as u8, num_rows);
// todo: for now, all sequence values are set to 0.
let sequence = UInt64Array::from_value(0, num_rows);
// todo(hl): sort batch by primary key.
arrow::array::RecordBatch::try_new(
Self::schema(),
vec![
Arc::new(value) as ArrayRef,
Arc::new(timestamp) as ArrayRef,
Arc::new(self.encoded_primary_key_array_builder.finish()) as ArrayRef,
Arc::new(sequence) as ArrayRef,
Arc::new(op_type) as ArrayRef,
],
)
.context(error::ArrowSnafu)
}
}
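
The `sort batch by primary key` todo in `to_batch` could be addressed with arrow's lexicographic sort kernels. A sketch under that assumption (`sort_by_primary_key` is a hypothetical helper; for the schema built by `BatchEncoder::schema`, `pk_index` would be 2):

```rust
use arrow::array::ArrayRef;
use arrow::compute::{lexsort_to_indices, take, SortColumn};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

/// Reorders all columns of `batch` by the sort order of its binary
/// primary-key column at `pk_index`.
fn sort_by_primary_key(batch: &RecordBatch, pk_index: usize) -> ArrowResult<RecordBatch> {
    let sort_column = SortColumn {
        values: batch.column(pk_index).clone(),
        options: None,
    };
    let indices = lexsort_to_indices(&[sort_column], None)?;
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<ArrowResult<Vec<ArrayRef>>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}
```

With a single sort key, `arrow::compute::sort_to_indices` would work as well; `lexsort_to_indices` leaves room for a (primary key, timestamp) sort order later.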
fn tags_to_logical_schemas(

#### error.rs

@@ -638,6 +638,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito_codec::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -763,6 +770,7 @@ impl ErrorExt for Error {
HandleOtelArrowRequest { .. } => StatusCode::Internal,
CommonMeta { source, .. } => source.status_code(),
Operator { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),
}
}
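
For context, the new variant follows snafu's `source` + implicit `location` pattern, and `.context(error::EncodePrimaryKeySnafu)` in `batch_builder.rs` above is how it gets attached. A self-contained miniature of the same shape, with `std::io::Error` standing in for `mito_codec::error::Error`:

```rust
use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to encode primary key"))]
    EncodePrimaryKey {
        source: std::io::Error, // stand-in for mito_codec::error::Error
        #[snafu(implicit)]
        location: Location, // captured automatically at the `.context` call site
    },
}

fn encode() -> Result<Vec<u8>, std::io::Error> {
    Ok(vec![])
}

fn encode_pk() -> Result<Vec<u8>, Error> {
    // The generated `EncodePrimaryKeySnafu` selector wraps the source error.
    encode().context(EncodePrimaryKeySnafu)
}
```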

#### lib.rs

@@ -22,6 +22,7 @@ use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
pub mod addrs;
#[allow(dead_code)]
mod batch_builder;
pub mod configurator;
pub(crate) mod elasticsearch;

#### prom_row_builder.rs

@@ -114,7 +114,7 @@ impl TablesBuilder {
}
/// Builder for one table.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub(crate) struct TableBuilder {
/// Column schemas.
schema: Vec<ColumnSchema>,