From 6ad186a13e7d8f8b4ee1931c87dfca4607299446 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 9 Mar 2025 15:07:53 +0800 Subject: [PATCH] feat: series to batch --- src/sst-convert/src/error.rs | 20 ++++- src/sst-convert/src/reader/remote_write.rs | 85 +++++++++++++++++++--- 2 files changed, 93 insertions(+), 12 deletions(-) diff --git a/src/sst-convert/src/error.rs b/src/sst-convert/src/error.rs index 441ad8346a..8050e3fbea 100644 --- a/src/sst-convert/src/error.rs +++ b/src/sst-convert/src/error.rs @@ -53,8 +53,16 @@ pub enum Error { location: Location, }, - #[snafu(display("Table not found"))] + #[snafu(display("Table not found: {}", table_name))] MissingTable { + table_name: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Column not found: {}", column_name))] + MissingColumn { + column_name: String, #[snafu(implicit)] location: Location, }, @@ -71,7 +79,15 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { - StatusCode::Unexpected + match self { + Error::ObjectStore { .. } => StatusCode::StorageUnavailable, + Error::Io { .. } => StatusCode::StorageUnavailable, + Error::Json { .. } => StatusCode::InvalidArguments, + Error::MissingMetricName { .. } => StatusCode::InvalidArguments, + Error::MissingTable { .. } => StatusCode::TableNotFound, + Error::MissingColumn { .. } => StatusCode::TableColumnNotFound, + Error::Mito { source, .. } => source.status_code(), + } } fn as_any(&self) -> &dyn std::any::Any { diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs index db14ac6fd5..b67b130056 100644 --- a/src/sst-convert/src/reader/remote_write.rs +++ b/src/sst-convert/src/reader/remote_write.rs @@ -19,33 +19,39 @@ //! - Each series only occurs once. use std::collections::HashMap; +use std::sync::Arc; use api::prom_store::remote::{Label, TimeSeries}; +use api::v1::OpType; +use arrow_array::{Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::value::ValueRef; use futures::AsyncBufReadExt; use metric_engine::row_modifier::TsidGenerator; -use mito2::read::{Batch, BatchReader}; +use mito2::read::{Batch, BatchBuilder, BatchReader}; use mito2::row_converter::SparsePrimaryKeyCodec; use object_store::{FuturesAsyncReader, ObjectStore, Reader}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::ReservedColumnId; +use store_api::storage::{ColumnId, SequenceNumber}; use table::metadata::TableId; use crate::error::{ - IoSnafu, JsonSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, ObjectStoreSnafu, - Result, + IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, + ObjectStoreSnafu, Result, }; use crate::table::TableMetadataHelper; const METRIC_NAME_LABEL: &str = "__name__"; +const GREPTIME_VALUE: &str = "greptime_value"; /// A reader that reads remote write file, sorts and outputs timeseries in the primary key order. struct RemoteWriteReader { - /// Timeseries sorted by primary key. + /// Timeseries sorted by primary key in reverse order. + /// So we can pop the series. series: Vec<(Vec, TimeSeries)>, - /// Current index in the series. - index: usize, + /// Converter for converting timeseries to batches. + converter: SeriesConverter, } impl RemoteWriteReader { @@ -60,6 +66,12 @@ impl RemoteWriteReader { table_helper: TableMetadataHelper, ) -> Result { let codec = SparsePrimaryKeyCodec::new(&metadata); + let value_id = metadata + .column_by_name(GREPTIME_VALUE) + .context(MissingColumnSnafu { + column_name: GREPTIME_VALUE, + })? + .column_id; let encoder = PrimaryKeyEncoder { catalog, schema, @@ -68,6 +80,10 @@ impl RemoteWriteReader { table_ids: HashMap::new(), codec, }; + let converter = SeriesConverter { + value_id, + override_sequence: None, + }; let mut sorter = TimeSeriesSorter::new(encoder); let reader = operator.reader(path).await.context(ObjectStoreSnafu)?; @@ -75,16 +91,63 @@ impl RemoteWriteReader { while let Some(series) = reader.next_series().await? { sorter.push(series).await?; } - let series = sorter.sort(); + let mut series = sorter.sort(); + series.reverse(); - Ok(Self { series, index: 0 }) + Ok(Self { series, converter }) + } + + /// Sets the sequence number of the batch. + /// Otherwise, the sequence number will be 0. + pub fn with_sequence(mut self, sequence: SequenceNumber) -> Self { + self.converter.override_sequence = Some(sequence); + self + } + + fn next_series(&mut self) -> Option<(Vec, TimeSeries)> { + self.series.pop() } } #[async_trait::async_trait] impl BatchReader for RemoteWriteReader { async fn next_batch(&mut self) -> mito2::error::Result> { - todo!() + let Some((pk, series)) = self.next_series() else { + return Ok(None); + }; + + self.converter.convert(pk, series).map(Some) + } +} + +struct SeriesConverter { + /// Column id for the value field. + value_id: ColumnId, + /// Sequence number of the batch. + override_sequence: Option, +} + +impl SeriesConverter { + /// Builds a batch from a primary key and a time series. + fn convert(&self, primary_key: Vec, series: TimeSeries) -> mito2::error::Result { + let num_rows = series.samples.len(); + let op_types = vec![OpType::Put as u8; num_rows]; + // TODO(yingwen): Should we use 0 or 1 as default? + let sequences = vec![self.override_sequence.unwrap_or(0); num_rows]; + let mut timestamps = Vec::with_capacity(num_rows); + let mut values = Vec::with_capacity(num_rows); + for sample in series.samples { + timestamps.push(sample.timestamp); + values.push(sample.value); + } + + let mut builder = BatchBuilder::new(primary_key); + builder + .timestamps_array(Arc::new(TimestampMillisecondArray::from(timestamps)))? + .sequences_array(Arc::new(UInt64Array::from(sequences)))? + .op_types_array(Arc::new(UInt8Array::from(op_types)))? + .push_field_array(self.value_id, Arc::new(Float64Array::from(values)))?; + builder.build() } } @@ -164,7 +227,9 @@ impl PrimaryKeyEncoder { .table_helper .get_table(&self.catalog, &self.schema, &name_label.name) .await - .context(MissingTableSnafu)?; + .context(MissingTableSnafu { + table_name: &name_label.name, + })?; let id = table_info.table_info.ident.table_id; self.table_ids.insert(name_label.name.clone(), id);