diff --git a/Cargo.lock b/Cargo.lock index f0a1780746..e5418ca682 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11280,8 +11280,6 @@ dependencies = [ "parquet", "parquet_opendal", "prost 0.13.3", - "serde", - "serde_json", "snafu 0.8.5", "store-api", "table", diff --git a/src/sst-convert/Cargo.toml b/src/sst-convert/Cargo.toml index e7819e3d40..d11720941a 100644 --- a/src/sst-convert/Cargo.toml +++ b/src/sst-convert/Cargo.toml @@ -26,8 +26,6 @@ object-store.workspace = true parquet.workspace = true parquet_opendal = "0.3.0" prost.workspace = true -serde.workspace = true -serde_json.workspace = true snafu.workspace = true store-api.workspace = true table.workspace = true diff --git a/src/sst-convert/src/error.rs b/src/sst-convert/src/error.rs index 88269e4232..b4c20e9a62 100644 --- a/src/sst-convert/src/error.rs +++ b/src/sst-convert/src/error.rs @@ -31,22 +31,6 @@ pub enum Error { location: Location, }, - #[snafu(display("I/O error"))] - Io { - #[snafu(source)] - error: std::io::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("JSON error"))] - Json { - #[snafu(source)] - error: serde_json::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Missing __name__ label"))] MissingMetricName { #[snafu(implicit)] @@ -110,8 +94,6 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::ObjectStore { .. } => StatusCode::StorageUnavailable, - Error::Io { .. } => StatusCode::StorageUnavailable, - Error::Json { .. } => StatusCode::InvalidArguments, Error::MissingMetricName { .. } => StatusCode::InvalidArguments, Error::MissingTable { .. } => StatusCode::TableNotFound, Error::MissingColumn { .. } => StatusCode::TableColumnNotFound, diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs index 409aca8ed9..9f6a9e0912 100644 --- a/src/sst-convert/src/reader/remote_write.rs +++ b/src/sst-convert/src/reader/remote_write.rs @@ -14,9 +14,10 @@ //! 
Prometheus remote write support. //! -//! The Prometheus remote write format is a Parquet representation of the Prometheus remote write protocol. -//! - Each Line contains a single timeseries. -//! - Each series only occurs once. +//! Prometheus remote write protocol in parquet format. +//! - Each row contains a protobuf binary representation of a single timeseries. +//! - Each series only occurs once in the file. +//! - Each timeseries must have the `__name__` label. use std::collections::HashMap; use std::sync::Arc; @@ -27,12 +28,12 @@ use arrow_array::{ Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array, }; use datatypes::value::ValueRef; -use futures::{AsyncBufReadExt, StreamExt}; +use futures::StreamExt; use metric_engine::row_modifier::TsidGenerator; use mito2::error::ReadParquetSnafu; use mito2::read::{Batch, BatchBuilder, BatchReader}; use mito2::row_converter::SparsePrimaryKeyCodec; -use object_store::{FuturesAsyncReader, ObjectStore, Reader}; +use object_store::ObjectStore; use parquet::arrow::async_reader::ParquetRecordBatchStream; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet_opendal::AsyncReader; @@ -44,8 +45,8 @@ use store_api::storage::{ColumnId, SequenceNumber}; use table::metadata::TableId; use crate::error::{ - DecodeSnafu, IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, - MitoSnafu, ObjectStoreSnafu, Result, + DecodeSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, + ObjectStoreSnafu, Result, }; use crate::table::TableMetadataHelper; @@ -157,42 +158,6 @@ impl SeriesConverter { } } -/// Prometheus remote write NDJSON reader. -pub struct TimeSeriesReader { - reader: FuturesAsyncReader, - buffer: String, -} - -impl TimeSeriesReader { - /// Creates a new [`TimeSeriesReader`] from a [`Reader`]. - pub async fn new(reader: Reader) -> Result { - let reader = reader - .into_futures_async_read(..) 
- .await - .context(ObjectStoreSnafu)?; - - Ok(Self { - reader, - buffer: String::new(), - }) - } - - /// Reads the next timeseries from the reader. - pub async fn next_series(&mut self) -> Result> { - self.buffer.clear(); - self.reader - .read_line(&mut self.buffer) - .await - .context(IoSnafu)?; - if self.buffer.is_empty() { - return Ok(None); - } - - let time_series = serde_json::from_str(&self.buffer).context(JsonSnafu)?; - Ok(Some(time_series)) - } -} - /// Prometheus remote write parquet reader. pub struct TimeSeriesParquetReader { path: String,