poc/create-alter-for-metrics:

- **Add `ahash` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `ahash` as a dependency of the `metric-engine` crate.
- **Refactor `MetricsBatchBuilder`**: Added a `builders` field to `MetricsBatchBuilder` in `batch_builder.rs` (one `BatchEncoder` per physical table, created lazily) and reworked its methods to append rows through those encoders.
- **Update `BatchEncoder`**: Changed `BatchEncoder` in `batch_builder.rs` to hold a `name_to_id` column map and split row encoding into `append_rows` and `finish` methods; see the sketch after this list.
- **Modify `LogicalSchemas` Structure**: Updated `schema_helper.rs` to use `std::collections::HashMap` instead of `ahash::HashMap` for the `schemas` field in `LogicalSchemas`.
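In effect, the refactor replaces the one-shot `to_batch` conversion with a two-phase, per-physical-table encoder: callers repeatedly `append_rows` for each logical table, then call `finish` once. A minimal standalone sketch of that shape, with simplified stand-in types (`Encoder`, `Row`, and `TableId` below are illustrations, not the actual `metric-engine` API):

```rust
use std::collections::HashMap;

/// Stand-ins for the real types; for illustration only.
type TableId = u32;
type Row = (i64, f64); // (timestamp, value)

/// Two-phase encoder mirroring the refactored shape: `append_rows`
/// accumulates column data across logical tables, `finish` consumes
/// the encoder and emits one batch per physical table.
#[derive(Default)]
struct Encoder {
    timestamps: Vec<i64>,
    values: Vec<f64>,
}

impl Encoder {
    fn append_rows(&mut self, _logical_table_id: TableId, rows: Vec<Row>) {
        for (ts, val) in rows {
            self.timestamps.push(ts);
            self.values.push(val);
        }
    }

    fn finish(self) -> (Vec<i64>, Vec<f64>) {
        debug_assert_eq!(self.timestamps.len(), self.values.len());
        (self.timestamps, self.values)
    }
}

fn main() {
    // One encoder per physical table, created lazily like
    // `self.builders.entry(..).or_insert_with(..)` in the diff below.
    let mut builders: HashMap<TableId, Encoder> = HashMap::new();
    builders.entry(42).or_default().append_rows(1, vec![(1_000, 0.5)]);
    builders.entry(42).or_default().append_rows(2, vec![(1_001, 0.7)]);
    let (ts, vals) = builders.remove(&42).unwrap().finish();
    println!("{} rows, {} values", ts.len(), vals.len());
}
```

Keying encoders by physical table id is what lets rows from many logical tables accumulate into a single primary-key-encoded batch.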

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Lei, HUANG
2025-06-25 07:33:11 +00:00
parent 8b392477c8
commit 892cb66c53
5 changed files with 48 additions and 42 deletions

Cargo.lock (generated)

@@ -7234,6 +7234,7 @@ dependencies = [
 name = "metric-engine"
 version = "0.15.0"
 dependencies = [
+ "ahash 0.8.11",
 "api",
 "aquamarine",
 "async-stream",

Cargo.toml (metric-engine)

@@ -8,6 +8,7 @@ license.workspace = true
 workspace = true

 [dependencies]
+ahash.workspace = true
 api.workspace = true
 aquamarine.workspace = true
 async-stream.workspace = true

schema_helper.rs

@@ -627,7 +627,7 @@ pub struct LogicalSchema {
 /// Logical table schemas.
 pub struct LogicalSchemas {
     /// Logical table schemas group by physical table name.
-    pub schemas: ahash::HashMap<String, Vec<LogicalSchema>>,
+    pub schemas: HashMap<String, Vec<LogicalSchema>>,
 }

 /// Creates or alters logical tables to match the provided schemas

batch_builder.rs

@@ -12,10 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

-use ahash::HashMap;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
 use arrow::array::{
@@ -36,20 +35,24 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::consts::{
     ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
 };
 use store_api::storage::ColumnId;
 use table::metadata::TableId;
 use table::table_name::TableName;
 use table::TableRef;

 use crate::error;
 use crate::prom_row_builder::{PromCtx, TableBuilder};

 pub struct MetricsBatchBuilder {
     schema_helper: SchemaHelper,
+    builders: HashMap<TableId, BatchEncoder>,
 }

 impl MetricsBatchBuilder {
     pub fn new(schema_helper: SchemaHelper) -> Self {
-        MetricsBatchBuilder { schema_helper }
+        MetricsBatchBuilder {
+            schema_helper,
+            builders: Default::default(),
+        }
     }

     /// Detected the DDL requirements according to the staged table rows.
@@ -129,7 +132,7 @@ impl MetricsBatchBuilder {
     /// Retrieves physical region metadata of given logical table names.
     async fn collect_physical_region_metadata(
-        logical_table_names: &[String],
+        _logical_table_names: &[String],
     ) -> HashMap<
         String,         /*logical table name*/
         RegionMetadata, /*Region metadata for physical region*/
@@ -143,8 +146,8 @@ impl MetricsBatchBuilder {
     /// Note:
     /// Make sure all logical table and physical table are created when reach here and the mapping
     /// from logical table name to physical table ref is stored in [physical_tables].
-    async fn rows_to_batch(
-        &self,
+    async fn append_rows_to_batch(
+        &mut self,
         current_catalog: Option<String>,
         current_schema: Option<String>,
         table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
@@ -182,14 +185,16 @@ impl MetricsBatchBuilder {
                 .fail();
             };
-            let batch_encoder = Self::create_sparse_encoder(
+            let encoder = self
+                .builders
+                .entry(physical_table.region_id.table_id())
+                .or_insert_with(|| Self::create_sparse_encoder(&physical_table));
+            encoder.append_rows(
                 logical_table_ref.table_info().table_id(),
-                logical_table_name.clone(),
-                physical_table.clone(),
                 std::mem::take(table),
-            );
-            let _batch = batch_encoder.to_batch()?;
-            //todo(hl): Ingest batch.
+            )?;
+            //todo(hl): finish and ingest batch.
             }
         }
@@ -197,29 +202,18 @@ impl MetricsBatchBuilder {
     }

     /// Creates Encoder that converts Rows into RecordBatch with primary key encoded.
-    fn create_sparse_encoder(
-        logical_table_id: TableId,
-        logical_table_name: String,
-        physical_region_meta: RegionMetadataRef,
-        mut table_builder: TableBuilder,
-    ) -> BatchEncoder {
-        let name_to_id: std::collections::HashMap<_, _> = physical_region_meta
+    fn create_sparse_encoder(physical_region_meta: &RegionMetadataRef) -> BatchEncoder {
+        let name_to_id: HashMap<_, _> = physical_region_meta
             .column_metadatas
             .iter()
             .map(|c| (c.column_schema.name.clone(), c.column_id))
             .collect();
-        // todo(hl): we can simplified the row iter because schema in TableBuilder is known (ts, val, tags...)
-        let row_insert_request = table_builder.as_row_insert_request(logical_table_name);
-        let rows = row_insert_request.rows.unwrap();
-        let rows_iter = RowsIter::new(rows, &name_to_id);
-        BatchEncoder::new(logical_table_id, rows_iter)
+        BatchEncoder::new(name_to_id)
     }
 }

 struct BatchEncoder {
-    logical_table_id: TableId,
-    iter: RowsIter,
+    name_to_id: HashMap<String, ColumnId>,
     encoded_primary_key_array_builder: BinaryBuilder,
     timestamps: Vec<i64>,
     value: Vec<f64>,
@@ -227,14 +221,12 @@ struct BatchEncoder {
 }

 impl BatchEncoder {
-    fn new(logical_table_id: TableId, iter: RowsIter) -> BatchEncoder {
-        let num_rows = iter.num_rows();
+    fn new(name_to_id: HashMap<String, ColumnId>) -> BatchEncoder {
         Self {
-            logical_table_id,
-            iter,
-            encoded_primary_key_array_builder: BinaryBuilder::with_capacity(num_rows, 0),
-            timestamps: Vec::with_capacity(num_rows),
-            value: Vec::with_capacity(num_rows),
+            name_to_id,
+            encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
+            timestamps: Vec::with_capacity(16),
+            value: Vec::with_capacity(16),
             pk_codec: SparsePrimaryKeyCodec::schemaless(),
         }
     }
@@ -269,10 +261,19 @@ impl BatchEncoder {
         ]))
     }

-    fn to_batch(mut self) -> error::Result<arrow::record_batch::RecordBatch> {
+    fn append_rows(
+        &mut self,
+        logical_table_id: TableId,
+        mut table_builder: TableBuilder,
+    ) -> error::Result<()> {
+        // todo(hl): we can simplified the row iter because schema in TableBuilder is known (ts, val, tags...)
+        let row_insert_request = table_builder.as_row_insert_request("don't care".to_string());
+        let mut iter = RowsIter::new(row_insert_request.rows.unwrap(), &self.name_to_id);
         let mut encode_buf = vec![];
-        for row in self.iter.iter_mut() {
-            let (table_id, ts_id) = RowModifier::fill_internal_columns(self.logical_table_id, &row);
+        for row in iter.iter_mut() {
+            let (table_id, ts_id) = RowModifier::fill_internal_columns(logical_table_id, &row);
             let internal_columns = [
                 (
                     ReservedColumnId::table_id(),
@@ -306,13 +307,16 @@ impl BatchEncoder {
             self.value.push(*val);
         }

-        let num_rows = self.iter.num_rows();
-        debug_assert_eq!(self.value.len(), num_rows);
+        debug_assert_eq!(self.value.len(), self.timestamps.len());
         debug_assert_eq!(
             self.value.len(),
             self.encoded_primary_key_array_builder.len()
         );
+        Ok(())
+    }
+
+    fn finish(mut self) -> error::Result<arrow::record_batch::RecordBatch> {
+        let num_rows = self.timestamps.len();
         let value = Float64Array::from(self.value);
         let timestamp = TimestampMillisecondArray::from(self.timestamps);

prom_row_builder.rs

@@ -13,9 +13,9 @@
 // limitations under the License.

 use std::collections::hash_map::Entry;
-use std::collections::HashMap;
 use std::string::ToString;

+use ahash::HashMap;
 use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
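A closing note on the `ahash` swap in this last hunk: `ahash::HashMap` is a type alias for `std::collections::HashMap` parameterized with ahash's faster `RandomState` hasher, which is why only the `use` line changes while `collect()` and `Default::default()` call sites compile unchanged. A minimal sketch, assuming `ahash = "0.8"` with default features:

```rust
use ahash::HashMap; // alias: std HashMap with ahash::RandomState as the hasher

fn main() {
    // Generic constructors keep working because they only require
    // `S: Default + BuildHasher`, which ahash's RandomState satisfies.
    let by_name: HashMap<String, u32> =
        [("a".to_string(), 1), ("b".to_string(), 2)].into_iter().collect();
    let mut extra: HashMap<String, u32> = HashMap::default();
    extra.insert("c".to_string(), 3);
    // Note: std's `HashMap::new()` is defined only for the std hasher,
    // so ahash maps are built via `default()` (or ahash's HashMapExt).
    println!("{} + {} entries", by_name.len(), extra.len());
}
```

The one construction pattern that does not carry over is `HashMap::new()`, which std defines only for its own `RandomState`; ahash's `HashMapExt` trait provides `new()` and `with_capacity()` for that case.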