poc/create-alter-for-metrics:

- **Refactor `BatchEncoder` and `Columns`**:
   - Introduced `Columns` and `ColumnsBuilder` to manage primary key, timestamp, and value data more efficiently.
   - Updated `BatchEncoder` to utilize `ColumnsBuilder` for handling multiple record batches.
   - Modified `finish` method to return a vector of record batches instead of a single optional batch.
   - Added methods for estimating size and counting total rows in `BatchEncoder`.

 - **Enhance `MetricsBatchBuilder`**:
   - Changed the structure to store multiple record batches per region in `MetricsBatchBuilder`.

 - **Update `TablesBuilder` in `prom_row_builder.rs`**:
   - Modified to handle multiple record batches per region and collect file metadata for logging.

 - **Remove unnecessary logging in `prom_store.rs`**:
   - Removed the "Use bulk mode" log statement.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-29 14:06:44 +00:00
parent e4b048e788
commit 4ad40af468
3 changed files with 159 additions and 68 deletions

View File

@@ -26,6 +26,7 @@ use arrow_schema::Field;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_telemetry::info;
use itertools::Itertools;
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito_codec::row_converter::SparsePrimaryKeyCodec;
@@ -284,6 +285,7 @@ impl MetricsBatchBuilder {
encoder.append_rows(*logical_table_id, std::mem::take(table))?;
}
}
Ok(())
}
@@ -293,18 +295,21 @@ impl MetricsBatchBuilder {
) -> error::Result<
HashMap<
String, /*schema name*/
HashMap<RegionId /*physical region id*/, (RecordBatch, (i64, i64))>,
HashMap<RegionId /*physical region id*/, Vec<(RecordBatch, (i64, i64))>>,
>,
> {
let mut table_batches: HashMap<String, HashMap<RegionId, (RecordBatch, (i64, i64))>> =
let mut table_batches: HashMap<String, HashMap<RegionId, Vec<(RecordBatch, (i64, i64))>>> =
HashMap::with_capacity(self.builders.len());
for (schema_name, schema_tables) in self.builders {
let schema_batches = table_batches.entry(schema_name).or_default();
for (physical_region_id, table_data) in schema_tables {
let rb = table_data.finish()?;
if let Some(v) = rb {
schema_batches.entry(physical_region_id).insert_entry(v);
if !rb.is_empty() {
schema_batches
.entry(physical_region_id)
.or_default()
.extend(rb);
}
}
}
@@ -322,27 +327,122 @@ impl MetricsBatchBuilder {
}
}
struct BatchEncoder {
name_to_id: HashMap<String, ColumnId>,
struct Columns {
encoded_primary_key_array_builder: BinaryBuilder,
timestamps: Vec<i64>,
value: Vec<f64>,
pk_codec: SparsePrimaryKeyCodec,
timestamp_range: Option<(i64, i64)>,
}
impl Columns {
fn pk_offset(&self) -> usize {
self.encoded_primary_key_array_builder
.offsets_slice()
.last()
.copied()
.unwrap_or(0) as usize
}
fn estimated_size(&self) -> usize {
let value_size = self.encoded_primary_key_array_builder.values_slice().len();
let offset_size = self.encoded_primary_key_array_builder.offsets_slice().len() * 4;
let validity_sze = self
.encoded_primary_key_array_builder
.validity_slice()
.map(|v| v.len())
.unwrap_or(0);
let timestamp_size = self.timestamps.len() * 8 + std::mem::size_of::<Vec<i64>>();
let val_size = self.value.len() * 8 + std::mem::size_of::<Vec<f64>>();
value_size + offset_size + validity_sze + timestamp_size + val_size + size_of::<Self>()
}
fn push(&mut self, pk: &[u8], val: f64, timestamp: i64) {
self.encoded_primary_key_array_builder.append_value(&pk);
self.value.push(val);
self.timestamps.push(timestamp);
if let Some((min, max)) = &mut self.timestamp_range {
*min = (*min).min(timestamp);
*max = (*max).max(timestamp);
} else {
self.timestamp_range = Some((timestamp, timestamp));
}
}
}
impl Default for Columns {
fn default() -> Self {
Self {
encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
timestamps: Vec::with_capacity(16),
value: Vec::with_capacity(16),
timestamp_range: None,
}
}
}
#[derive(Default)]
struct ColumnsBuilder {
columns: Vec<Columns>,
}
impl ColumnsBuilder {
fn push(&mut self, pk: &[u8], val: f64, ts: i64) {
let last = match self.columns.last_mut() {
None => {
self.columns.push(Columns::default());
self.columns.last_mut().unwrap()
}
Some(last_builder) => {
if last_builder.pk_offset() + pk.len() >= i32::MAX as usize {
info!(
"Current builder is full {}, rows: {}/{}",
last_builder.pk_offset(),
last_builder.encoded_primary_key_array_builder.len(),
last_builder.timestamps.len()
);
// Current builder is full, create a new one
self.columns.push(Columns::default());
self.columns.last_mut().unwrap()
} else {
last_builder
}
}
};
last.push(pk, val, ts);
}
}
struct BatchEncoder {
name_to_id: HashMap<String, ColumnId>,
pk_codec: SparsePrimaryKeyCodec,
columns_builder: ColumnsBuilder,
}
impl BatchEncoder {
fn new(name_to_id: HashMap<String, ColumnId>) -> BatchEncoder {
Self {
name_to_id,
encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0),
timestamps: Vec::with_capacity(16),
value: Vec::with_capacity(16),
pk_codec: SparsePrimaryKeyCodec::schemaless(),
timestamp_range: None,
columns_builder: ColumnsBuilder::default(),
}
}
pub(crate) fn estimated_size(&self) -> usize {
self.columns_builder
.columns
.iter()
.map(|v| v.estimated_size())
.sum()
}
pub(crate) fn total_rows(&self) -> usize {
self.columns_builder
.columns
.iter()
.map(|v| v.timestamps.len())
.sum()
}
fn append_rows(
&mut self,
logical_table_id: TableId,
@@ -372,9 +472,10 @@ impl BatchEncoder {
self.pk_codec
.encode_to_vec(row.primary_keys(), &mut encode_buf)
.context(error::EncodePrimaryKeySnafu)?;
self.encoded_primary_key_array_builder
.append_value(&encode_buf);
// safety: field values cannot be null in prom remote write
let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
return error::InvalidFieldValueTypeSnafu.fail();
};
// process timestamp and field. We already know the position of timestamps and values in [TableBuilder].
let ValueData::TimestampMillisecondValue(ts) =
// safety: timestamp values cannot be null
@@ -382,51 +483,42 @@ impl BatchEncoder {
else {
return error::InvalidTimestampValueTypeSnafu.fail();
};
self.timestamps.push(*ts);
if let Some((min, max)) = &mut self.timestamp_range {
*min = (*min).min(*ts);
*max = (*max).max(*ts);
} else {
self.timestamp_range = Some((*ts, *ts));
}
// safety: field values cannot be null in prom remote write
let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
return error::InvalidFieldValueTypeSnafu.fail();
};
self.value.push(*val);
self.columns_builder.push(&encode_buf, *val, *ts);
}
debug_assert_eq!(self.value.len(), self.timestamps.len());
debug_assert_eq!(
self.value.len(),
self.encoded_primary_key_array_builder.len()
);
Ok(())
}
fn finish(mut self) -> error::Result<Option<(RecordBatch, (i64, i64))>> {
if self.timestamps.is_empty() {
return Ok(None);
fn finish(self) -> error::Result<Vec<(RecordBatch, (i64, i64))>> {
if self.columns_builder.columns.is_empty() {
return Ok(vec![]);
}
let num_rows = self.timestamps.len();
let value = Float64Array::from(self.value);
let timestamp = TimestampMillisecondArray::from(self.timestamps);
let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef;
// todo: now we set sequence all to 0.
let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef;
let mut res = Vec::with_capacity(self.columns_builder.columns.len());
let pk = self.encoded_primary_key_array_builder.finish();
let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?;
for mut columns in self.columns_builder.columns {
let num_rows = columns.timestamps.len();
let value = Float64Array::from(columns.value);
let timestamp = TimestampMillisecondArray::from(columns.timestamps);
// Sort arrays
let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
let rb = RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
.context(error::ArrowSnafu)?;
Ok(Some((rb, self.timestamp_range.unwrap())))
let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef;
// todo: now we set sequence all to 0.
let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef;
let pk = columns.encoded_primary_key_array_builder.finish();
let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?;
// Sort arrays
let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
let rb =
RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type])
.context(error::ArrowSnafu)?;
res.push((rb, columns.timestamp_range.unwrap()))
}
Ok(res)
}
}

View File

@@ -148,7 +148,6 @@ pub async fn remote_write(
}
if let Some(state) = bulk_state {
info!("Use bulk mode");
let context = PromBulkContext {
schema_helper: state.schema_helper,
query_ctx: query_ctx.clone(),
@@ -167,8 +166,6 @@ pub async fn remote_write(
return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response());
}
info!("Use original mode");
let req =
decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?;

View File

@@ -164,35 +164,37 @@ impl TablesBuilder {
let start = Instant::now();
let mut tables_per_schema = HashMap::with_capacity(record_batches.len());
let mut file_metas = vec![];
for (schema_name, schema_batches) in record_batches {
let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0);
*tables_in_schema = *tables_in_schema + 1;
let schema_regions = physical_region_id_to_meta
.get(&schema_name)
.expect("physical region metadata not found");
for (physical_region_id, (rb, time_range)) in schema_batches {
for (physical_region_id, record_batches) in schema_batches {
let physical_region_metadata = schema_regions
.get(&physical_region_id)
.expect("physical region metadata not found");
let mut writer = bulk_ctx
.access_layer_factory
.create_sst_writer(
"greptime", //todo(hl): use the catalog name in query context.
&schema_name,
physical_region_metadata.clone(),
)
.await?;
writer.write_record_batch(&rb, Some(time_range)).await?;
let file_meta = writer.finish().await?;
info!("file meta: {:?}", file_meta);
for (rb, time_range) in record_batches {
let mut writer = bulk_ctx
.access_layer_factory
.create_sst_writer(
"greptime", //todo(hl): use the catalog name in query context.
&schema_name,
physical_region_metadata.clone(),
)
.await?;
writer.write_record_batch(&rb, Some(time_range)).await?;
let file_meta = writer.finish().await?;
file_metas.push(file_meta);
}
}
}
info!(
"upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}",
"upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}",
start.elapsed().as_millis(),
tables_per_schema.len(),
tables_per_schema
tables_per_schema, file_metas
);
Ok(())
}