diff --git a/Cargo.lock b/Cargo.lock
index 7ae88c65fd..4a25143946 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7234,6 +7234,7 @@ dependencies = [
 name = "metric-engine"
 version = "0.15.0"
 dependencies = [
+ "ahash 0.8.11",
  "api",
  "aquamarine",
  "async-stream",
diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml
index 0306a38ade..f629798e04 100644
--- a/src/metric-engine/Cargo.toml
+++ b/src/metric-engine/Cargo.toml
@@ -8,6 +8,7 @@ license.workspace = true
 workspace = true
 
 [dependencies]
+ahash.workspace = true
 api.workspace = true
 aquamarine.workspace = true
 async-stream.workspace = true
diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs
index 2538e1ac85..efb6e56334 100644
--- a/src/operator/src/schema_helper.rs
+++ b/src/operator/src/schema_helper.rs
@@ -627,7 +627,7 @@ pub struct LogicalSchema {
 /// Logical table schemas.
 pub struct LogicalSchemas {
     /// Logical table schemas group by physical table name.
-    pub schemas: ahash::HashMap<String, Vec<LogicalSchema>>,
+    pub schemas: HashMap<String, Vec<LogicalSchema>>,
 }
 
 /// Creates or alters logical tables to match the provided schemas
diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index a52125da2d..f8fb6211c8 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -12,10 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use ahash::HashMap;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, OpType, SemanticType};
 use arrow::array::{
@@ -36,20 +35,24 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::consts::{
     ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
 };
+use store_api::storage::ColumnId;
 use table::metadata::TableId;
 use table::table_name::TableName;
-use table::TableRef;
 
 use crate::error;
 use crate::prom_row_builder::{PromCtx, TableBuilder};
 
 pub struct MetricsBatchBuilder {
     schema_helper: SchemaHelper,
+    builders: HashMap<TableId, BatchEncoder>,
 }
 
 impl MetricsBatchBuilder {
     pub fn new(schema_helper: SchemaHelper) -> Self {
-        MetricsBatchBuilder { schema_helper }
+        MetricsBatchBuilder {
+            schema_helper,
+            builders: Default::default(),
+        }
     }
 
     /// Detected the DDL requirements according to the staged table rows.
@@ -129,7 +132,7 @@ impl MetricsBatchBuilder {
 
     /// Retrieves physical region metadata of given logical table names.
     async fn collect_physical_region_metadata(
-        logical_table_names: &[String],
+        _logical_table_names: &[String],
     ) -> HashMap<
         String, /*logical table name*/
         RegionMetadata, /*Region metadata for physical region*/
@@ -143,8 +146,8 @@ impl MetricsBatchBuilder {
     /// Note:
     /// Make sure all logical table and physical table are created when reach here and the mapping
     /// from logical table name to physical table ref is stored in [physical_tables].
-    async fn rows_to_batch(
-        &self,
+    async fn append_rows_to_batch(
+        &mut self,
         current_catalog: Option<String>,
         current_schema: Option<String>,
         table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
@@ -182,14 +185,16 @@ impl MetricsBatchBuilder {
                 .fail();
             };
 
-            let batch_encoder = Self::create_sparse_encoder(
+            let encoder = self
+                .builders
+                .entry(physical_table.region_id.table_id())
+                .or_insert_with(|| Self::create_sparse_encoder(&physical_table));
+            encoder.append_rows(
                 logical_table_ref.table_info().table_id(),
-                logical_table_name.clone(),
-                physical_table.clone(),
                 std::mem::take(table),
-            );
-            let _batch = batch_encoder.to_batch()?;
-            //todo(hl): Ingest batch.
+            )?;
+
+            //todo(hl): finish and ingest batch.
         }
     }
 
@@ -197,29 +202,18 @@ impl MetricsBatchBuilder {
     }
 
     /// Creates Encoder that converts Rows into RecordBatch with primary key encoded.
-    fn create_sparse_encoder(
-        logical_table_id: TableId,
-        logical_table_name: String,
-        physical_region_meta: RegionMetadataRef,
-        mut table_builder: TableBuilder,
-    ) -> BatchEncoder {
-        let name_to_id: std::collections::HashMap<_, _> = physical_region_meta
+    fn create_sparse_encoder(physical_region_meta: &RegionMetadataRef) -> BatchEncoder {
+        let name_to_id: HashMap<_, _> = physical_region_meta
             .column_metadatas
             .iter()
            .map(|c| (c.column_schema.name.clone(), c.column_id))
             .collect();
-
-        // todo(hl): we can simplify the row iter because schema in TableBuilder is known (ts, val, tags...)
-        let row_insert_request = table_builder.as_row_insert_request(logical_table_name);
-        let rows = row_insert_request.rows.unwrap();
-        let rows_iter = RowsIter::new(rows, &name_to_id);
-        BatchEncoder::new(logical_table_id, rows_iter)
+        BatchEncoder::new(name_to_id)
     }
 }
 
 struct BatchEncoder {
-    logical_table_id: TableId,
-    iter: RowsIter,
+    name_to_id: HashMap<String, ColumnId>,
     encoded_primary_key_array_builder: BinaryBuilder,
     timestamps: Vec<i64>,
     value: Vec<f64>,
@@ -227,14 +221,12 @@ struct BatchEncoder {
 }
 
 impl BatchEncoder {
-    fn new(logical_table_id: TableId, iter: RowsIter) -> BatchEncoder {
-        let num_rows = iter.num_rows();
+    fn new(name_to_id: HashMap<String, ColumnId>) -> BatchEncoder {
         Self {
-            logical_table_id,
-            iter,
-            encoded_primary_key_array_builder: BinaryBuilder::with_capacity(num_rows, 0),
-            timestamps: Vec::with_capacity(num_rows),
-            value: Vec::with_capacity(num_rows),
+            name_to_id,
+            encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
+            timestamps: Vec::with_capacity(16),
+            value: Vec::with_capacity(16),
             pk_codec: SparsePrimaryKeyCodec::schemaless(),
         }
     }
@@ -269,10 +261,19 @@ impl BatchEncoder {
         ]))
     }
 
-    fn to_batch(mut self) -> error::Result<RecordBatch> {
+    fn append_rows(
+        &mut self,
+        logical_table_id: TableId,
+        mut table_builder: TableBuilder,
+    ) -> error::Result<()> {
+        // todo(hl): we can simplify the row iter because schema in TableBuilder is known (ts, val, tags...)
+        let row_insert_request = table_builder.as_row_insert_request("don't care".to_string());
+
+        let mut iter = RowsIter::new(row_insert_request.rows.unwrap(), &self.name_to_id);
+
         let mut encode_buf = vec![];
-        for row in self.iter.iter_mut() {
-            let (table_id, ts_id) = RowModifier::fill_internal_columns(self.logical_table_id, &row);
+        for row in iter.iter_mut() {
+            let (table_id, ts_id) = RowModifier::fill_internal_columns(logical_table_id, &row);
             let internal_columns = [
                 (
                     ReservedColumnId::table_id(),
@@ -306,13 +307,16 @@ impl BatchEncoder {
                 self.value.push(*val);
             }
 
-        let num_rows = self.iter.num_rows();
-        debug_assert_eq!(self.value.len(), num_rows);
         debug_assert_eq!(self.value.len(), self.timestamps.len());
         debug_assert_eq!(
             self.value.len(),
             self.encoded_primary_key_array_builder.len()
         );
+        Ok(())
+    }
+
+    fn finish(mut self) -> error::Result<RecordBatch> {
+        let num_rows = self.timestamps.len();
 
         let value = Float64Array::from(self.value);
         let timestamp = TimestampMillisecondArray::from(self.timestamps);
diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs
index 8e61c80dc1..448e54422d 100644
--- a/src/servers/src/prom_row_builder.rs
+++ b/src/servers/src/prom_row_builder.rs
@@ -13,9 +13,9 @@
 // limitations under the License.
 
 use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::string::ToString;
-use ahash::HashMap;
 use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
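Note on the new write path (editor's sketch): the diff replaces the per-call `create_sparse_encoder` (which consumed a `TableBuilder` and immediately produced a batch via `to_batch`) with a `builders: HashMap<TableId, BatchEncoder>` cache keyed by physical table id, so rows from many logical tables can be appended into one reusable encoder before a final `finish()` materializes the `RecordBatch`. Below is a minimal, self-contained Rust model of that accumulate-then-finish pattern; `Encoder`, `Builder`, and the `(i64, f64)` row shape are simplified stand-ins invented for illustration, not the PR's actual types.

```rust
use std::collections::HashMap;

type TableId = u32;

/// Simplified stand-in for BatchEncoder: accumulates rows across calls.
#[derive(Default)]
struct Encoder {
    timestamps: Vec<i64>,
    values: Vec<f64>,
}

impl Encoder {
    /// Mirrors BatchEncoder::append_rows: push rows into the cached encoder.
    fn append_rows(&mut self, rows: &[(i64, f64)]) {
        for &(ts, val) in rows {
            self.timestamps.push(ts);
            self.values.push(val);
        }
    }

    /// Mirrors BatchEncoder::finish: consume the encoder to emit the batch.
    fn finish(self) -> (Vec<i64>, Vec<f64>) {
        (self.timestamps, self.values)
    }
}

/// Simplified stand-in for MetricsBatchBuilder with its `builders` cache.
#[derive(Default)]
struct Builder {
    builders: HashMap<TableId, Encoder>,
}

impl Builder {
    fn append(&mut self, physical_table_id: TableId, rows: &[(i64, f64)]) {
        // Same idiom as the diff: entry().or_insert_with() builds the encoder
        // on first sight of a physical table, then appends in place.
        self.builders
            .entry(physical_table_id)
            .or_insert_with(Encoder::default)
            .append_rows(rows);
    }
}

fn main() {
    let mut builder = Builder::default();
    // Two logical tables backed by the same physical table share one encoder.
    builder.append(1024, &[(1_000, 0.5)]);
    builder.append(1024, &[(2_000, 0.7)]);
    for (table_id, encoder) in builder.builders {
        let (ts, vals) = encoder.finish();
        println!("physical table {table_id}: {} rows {ts:?} {vals:?}", ts.len());
    }
}
```

One consequence of this change, visible in `BatchEncoder::new`, is that the encoder can no longer size its buffers from a known per-request row count, hence the switch from `with_capacity(num_rows)` to a fixed initial capacity of 16.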