diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index 7a71d060dc..4697d76c39 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -26,6 +26,7 @@ use arrow_schema::Field; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::node_manager::NodeManagerRef; use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use common_telemetry::info; use itertools::Itertools; use metric_engine::row_modifier::{RowModifier, RowsIter}; use mito_codec::row_converter::SparsePrimaryKeyCodec; @@ -284,6 +285,7 @@ impl MetricsBatchBuilder { encoder.append_rows(*logical_table_id, std::mem::take(table))?; } } + Ok(()) } @@ -293,18 +295,21 @@ impl MetricsBatchBuilder { ) -> error::Result< HashMap< String, /*schema name*/ - HashMap, + HashMap>, >, > { - let mut table_batches: HashMap> = + let mut table_batches: HashMap>> = HashMap::with_capacity(self.builders.len()); for (schema_name, schema_tables) in self.builders { let schema_batches = table_batches.entry(schema_name).or_default(); for (physical_region_id, table_data) in schema_tables { let rb = table_data.finish()?; - if let Some(v) = rb { - schema_batches.entry(physical_region_id).insert_entry(v); + if !rb.is_empty() { + schema_batches + .entry(physical_region_id) + .or_default() + .extend(rb); } } } @@ -322,27 +327,122 @@ impl MetricsBatchBuilder { } } -struct BatchEncoder { - name_to_id: HashMap, +struct Columns { encoded_primary_key_array_builder: BinaryBuilder, timestamps: Vec, value: Vec, - pk_codec: SparsePrimaryKeyCodec, timestamp_range: Option<(i64, i64)>, } +impl Columns { + fn pk_offset(&self) -> usize { + self.encoded_primary_key_array_builder + .offsets_slice() + .last() + .copied() + .unwrap_or(0) as usize + } + + fn estimated_size(&self) -> usize { + let value_size = self.encoded_primary_key_array_builder.values_slice().len(); + let offset_size = self.encoded_primary_key_array_builder.offsets_slice().len() * 4; + let validity_sze = self + .encoded_primary_key_array_builder + .validity_slice() + .map(|v| v.len()) + .unwrap_or(0); + let timestamp_size = self.timestamps.len() * 8 + std::mem::size_of::>(); + let val_size = self.value.len() * 8 + std::mem::size_of::>(); + value_size + offset_size + validity_sze + timestamp_size + val_size + size_of::() + } + + fn push(&mut self, pk: &[u8], val: f64, timestamp: i64) { + self.encoded_primary_key_array_builder.append_value(&pk); + self.value.push(val); + self.timestamps.push(timestamp); + if let Some((min, max)) = &mut self.timestamp_range { + *min = (*min).min(timestamp); + *max = (*max).max(timestamp); + } else { + self.timestamp_range = Some((timestamp, timestamp)); + } + } +} + +impl Default for Columns { + fn default() -> Self { + Self { + encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0), + timestamps: Vec::with_capacity(16), + value: Vec::with_capacity(16), + timestamp_range: None, + } + } +} + +#[derive(Default)] +struct ColumnsBuilder { + columns: Vec, +} + +impl ColumnsBuilder { + fn push(&mut self, pk: &[u8], val: f64, ts: i64) { + let last = match self.columns.last_mut() { + None => { + self.columns.push(Columns::default()); + self.columns.last_mut().unwrap() + } + Some(last_builder) => { + if last_builder.pk_offset() + pk.len() >= i32::MAX as usize { + info!( + "Current builder is full {}, rows: {}/{}", + last_builder.pk_offset(), + last_builder.encoded_primary_key_array_builder.len(), + last_builder.timestamps.len() + ); + // Current builder is full, create a new one + self.columns.push(Columns::default()); + self.columns.last_mut().unwrap() + } else { + last_builder + } + } + }; + last.push(pk, val, ts); + } +} + +struct BatchEncoder { + name_to_id: HashMap, + pk_codec: SparsePrimaryKeyCodec, + columns_builder: ColumnsBuilder, +} + impl BatchEncoder { fn new(name_to_id: HashMap) -> BatchEncoder { Self { name_to_id, - encoded_primary_key_array_builder: BinaryBuilder::with_capacity(16, 0), - timestamps: Vec::with_capacity(16), - value: Vec::with_capacity(16), pk_codec: SparsePrimaryKeyCodec::schemaless(), - timestamp_range: None, + columns_builder: ColumnsBuilder::default(), } } + pub(crate) fn estimated_size(&self) -> usize { + self.columns_builder + .columns + .iter() + .map(|v| v.estimated_size()) + .sum() + } + + pub(crate) fn total_rows(&self) -> usize { + self.columns_builder + .columns + .iter() + .map(|v| v.timestamps.len()) + .sum() + } + fn append_rows( &mut self, logical_table_id: TableId, @@ -372,9 +472,10 @@ impl BatchEncoder { self.pk_codec .encode_to_vec(row.primary_keys(), &mut encode_buf) .context(error::EncodePrimaryKeySnafu)?; - self.encoded_primary_key_array_builder - .append_value(&encode_buf); - + // safety: field values cannot be null in prom remote write + let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else { + return error::InvalidFieldValueTypeSnafu.fail(); + }; // process timestamp and field. We already know the position of timestamps and values in [TableBuilder]. let ValueData::TimestampMillisecondValue(ts) = // safety: timestamp values cannot be null @@ -382,51 +483,42 @@ impl BatchEncoder { else { return error::InvalidTimestampValueTypeSnafu.fail(); }; - self.timestamps.push(*ts); - if let Some((min, max)) = &mut self.timestamp_range { - *min = (*min).min(*ts); - *max = (*max).max(*ts); - } else { - self.timestamp_range = Some((*ts, *ts)); - } - // safety: field values cannot be null in prom remote write - let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else { - return error::InvalidFieldValueTypeSnafu.fail(); - }; - self.value.push(*val); + self.columns_builder.push(&encode_buf, *val, *ts); } - - debug_assert_eq!(self.value.len(), self.timestamps.len()); - debug_assert_eq!( - self.value.len(), - self.encoded_primary_key_array_builder.len() - ); Ok(()) } - fn finish(mut self) -> error::Result> { - if self.timestamps.is_empty() { - return Ok(None); + fn finish(self) -> error::Result> { + if self.columns_builder.columns.is_empty() { + return Ok(vec![]); } - let num_rows = self.timestamps.len(); - let value = Float64Array::from(self.value); - let timestamp = TimestampMillisecondArray::from(self.timestamps); - let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef; - // todo: now we set sequence all to 0. - let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef; + let mut res = Vec::with_capacity(self.columns_builder.columns.len()); - let pk = self.encoded_primary_key_array_builder.finish(); - let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?; + for mut columns in self.columns_builder.columns { + let num_rows = columns.timestamps.len(); + let value = Float64Array::from(columns.value); + let timestamp = TimestampMillisecondArray::from(columns.timestamps); - // Sort arrays - let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?; - let ts = compute::take(×tamp, &indices, None).context(error::ArrowSnafu)?; - let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?; - let rb = RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type]) - .context(error::ArrowSnafu)?; - Ok(Some((rb, self.timestamp_range.unwrap()))) + let op_type = Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef; + // todo: now we set sequence all to 0. + let sequence = Arc::new(UInt64Array::from_value(0, num_rows)) as ArrayRef; + + let pk = columns.encoded_primary_key_array_builder.finish(); + let indices = compute::sort_to_indices(&pk, None, None).context(error::ArrowSnafu)?; + + // Sort arrays + let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?; + let ts = compute::take(×tamp, &indices, None).context(error::ArrowSnafu)?; + let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?; + let rb = + RecordBatch::try_new(physical_schema(), vec![value, ts, pk, sequence, op_type]) + .context(error::ArrowSnafu)?; + res.push((rb, columns.timestamp_range.unwrap())) + } + + Ok(res) } } diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 43eb3689c3..17f8f604b6 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -148,7 +148,6 @@ pub async fn remote_write( } if let Some(state) = bulk_state { - info!("Use bulk mode"); let context = PromBulkContext { schema_helper: state.schema_helper, query_ctx: query_ctx.clone(), @@ -167,8 +166,6 @@ pub async fn remote_write( return Ok((StatusCode::NO_CONTENT, write_cost_header_map(0)).into_response()); } - info!("Use original mode"); - let req = decode_remote_write_request(is_zstd, body, prom_validation_mode, &mut processor).await?; diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 359e3982a0..c4de97843b 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -164,35 +164,37 @@ impl TablesBuilder { let start = Instant::now(); let mut tables_per_schema = HashMap::with_capacity(record_batches.len()); + let mut file_metas = vec![]; for (schema_name, schema_batches) in record_batches { let tables_in_schema = tables_per_schema.entry(schema_name.clone()).or_insert(0); *tables_in_schema = *tables_in_schema + 1; let schema_regions = physical_region_id_to_meta .get(&schema_name) .expect("physical region metadata not found"); - for (physical_region_id, (rb, time_range)) in schema_batches { + for (physical_region_id, record_batches) in schema_batches { let physical_region_metadata = schema_regions .get(&physical_region_id) .expect("physical region metadata not found"); - let mut writer = bulk_ctx - .access_layer_factory - .create_sst_writer( - "greptime", //todo(hl): use the catalog name in query context. - &schema_name, - physical_region_metadata.clone(), - ) - .await?; - - writer.write_record_batch(&rb, Some(time_range)).await?; - let file_meta = writer.finish().await?; - info!("file meta: {:?}", file_meta); + for (rb, time_range) in record_batches { + let mut writer = bulk_ctx + .access_layer_factory + .create_sst_writer( + "greptime", //todo(hl): use the catalog name in query context. + &schema_name, + physical_region_metadata.clone(), + ) + .await?; + writer.write_record_batch(&rb, Some(time_range)).await?; + let file_meta = writer.finish().await?; + file_metas.push(file_meta); + } } } info!( - "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}", + "upload sst files, elapsed time: {}ms, schema num: {} tables_per_schema: {:?}, file_metas: {:?}", start.elapsed().as_millis(), tables_per_schema.len(), - tables_per_schema + tables_per_schema, file_metas ); Ok(()) }