diff --git a/Cargo.lock b/Cargo.lock index 8fe5bb4078..f0a1780746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11279,6 +11279,7 @@ dependencies = [ "object-store", "parquet", "parquet_opendal", + "prost 0.13.3", "serde", "serde_json", "snafu 0.8.5", diff --git a/src/sst-convert/Cargo.toml b/src/sst-convert/Cargo.toml index 140e4c5c6a..e7819e3d40 100644 --- a/src/sst-convert/Cargo.toml +++ b/src/sst-convert/Cargo.toml @@ -25,6 +25,7 @@ mito2.workspace = true object-store.workspace = true parquet.workspace = true parquet_opendal = "0.3.0" +prost.workspace = true serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/sst-convert/src/error.rs b/src/sst-convert/src/error.rs index c60fb7d504..88269e4232 100644 --- a/src/sst-convert/src/error.rs +++ b/src/sst-convert/src/error.rs @@ -94,6 +94,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Decode error"))] + Decode { + #[snafu(source)] + error: prost::DecodeError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -111,6 +119,7 @@ impl ErrorExt for Error { Error::Datanode { source, .. } => source.status_code(), Error::Meta { source, .. } => source.status_code(), Error::MetaClient { source, .. } => source.status_code(), + Error::Decode { .. } => StatusCode::InvalidArguments, } } diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs index 654f6e278e..3f3260a440 100644 --- a/src/sst-convert/src/reader/remote_write.rs +++ b/src/sst-convert/src/reader/remote_write.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Prometheus remote write JSON support. +//! Prometheus remote write support. //! -//! The Prometheus remote write JSON format is a NDJSON representation of the Prometheus remote write protocol. +//! The Prometheus remote write format is a Parquet representation of the Prometheus remote write protocol. //! - Each Line contains a single timeseries. //! - Each series only occurs once. @@ -24,7 +24,7 @@ use std::sync::Arc; use api::prom_store::remote::{Label, TimeSeries}; use api::v1::OpType; use arrow_array::{ - Array, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array, + Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array, }; use datatypes::value::ValueRef; use futures::{AsyncBufReadExt, StreamExt}; @@ -36,6 +36,7 @@ use object_store::{FuturesAsyncReader, ObjectStore, Reader}; use parquet::arrow::async_reader::ParquetRecordBatchStream; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet_opendal::AsyncReader; +use prost::Message; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::ReservedColumnId; @@ -43,8 +44,8 @@ use store_api::storage::{ColumnId, SequenceNumber}; use table::metadata::TableId; use crate::error::{ - IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, - ObjectStoreSnafu, Result, + DecodeSnafu, IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, + MitoSnafu, ObjectStoreSnafu, Result, }; use crate::table::TableMetadataHelper; @@ -198,8 +199,8 @@ pub struct TimeSeriesParquetReader { stream: ParquetRecordBatchStream, /// Is the stream EOF. eof: bool, - /// Current string array. - array: Option, + /// Current binary array. + array: Option, /// Current row in the record batch. /// Only valid when the record batch is Some. row_index: usize, @@ -260,21 +261,21 @@ impl TimeSeriesParquetReader { self.eof = true; return Ok(None); }; - self.array = Some( - record_batch - .column(0) - .as_any() - .downcast_ref::() - .context(MissingColumnSnafu { - column_name: "remote write json column", - })? - .clone(), - ); + let array = record_batch + .column(0) + .as_any() + .downcast_ref::() + .context(MissingColumnSnafu { + column_name: "remote write json column", + })? + .clone(); + assert!(!array.is_empty()); + self.array = Some(array); } let array = self.array.as_ref().unwrap(); let value = array.value(self.row_index); - let time_series = serde_json::from_str(value).context(JsonSnafu)?; + let time_series = TimeSeries::decode(value).context(DecodeSnafu)?; self.row_index += 1; Ok(Some(time_series))