From 8aadd1e59a30835f02ffc8bfdd4272889c71cdb0 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Sun, 9 Mar 2025 23:41:16 +0800
Subject: [PATCH] feat: parquet remote write reader

---
 src/sst-convert/src/lib.rs                 |  2 +-
 src/sst-convert/src/reader.rs              |  2 +-
 src/sst-convert/src/reader/remote_write.rs | 98 +++++++++++++++++++++-
 3 files changed, 97 insertions(+), 5 deletions(-)

diff --git a/src/sst-convert/src/lib.rs b/src/sst-convert/src/lib.rs
index c85a16731f..03c5e6e558 100644
--- a/src/sst-convert/src/lib.rs
+++ b/src/sst-convert/src/lib.rs
@@ -14,7 +14,7 @@
 
 pub mod converter;
 pub mod error;
-mod reader;
+pub mod reader;
 mod table;
 pub mod writer;
 
diff --git a/src/sst-convert/src/reader.rs b/src/sst-convert/src/reader.rs
index e34ff04959..a8dabc9981 100644
--- a/src/sst-convert/src/reader.rs
+++ b/src/sst-convert/src/reader.rs
@@ -36,7 +36,7 @@ use crate::table::TableMetadataHelper;
 use crate::OpenDALParquetReader;
 
 pub(crate) mod parquet;
-mod remote_write;
+pub mod remote_write;
 
 /// Reader and context.
 pub struct ReaderInfo {
diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs
index ddae378816..e6104e1191 100644
--- a/src/sst-convert/src/reader/remote_write.rs
+++ b/src/sst-convert/src/reader/remote_write.rs
@@ -23,13 +23,19 @@ use std::sync::Arc;
 
 use api::prom_store::remote::{Label, TimeSeries};
 use api::v1::OpType;
-use arrow_array::{Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
+use arrow_array::{
+    Array, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
 use datatypes::value::ValueRef;
-use futures::AsyncBufReadExt;
+use futures::{AsyncBufReadExt, StreamExt};
 use metric_engine::row_modifier::TsidGenerator;
+use mito2::error::ReadParquetSnafu;
 use mito2::read::{Batch, BatchBuilder, BatchReader};
 use mito2::row_converter::SparsePrimaryKeyCodec;
 use object_store::{FuturesAsyncReader, ObjectStore, Reader};
+use parquet::arrow::async_reader::ParquetRecordBatchStream;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet_opendal::AsyncReader;
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::ReservedColumnId;
@@ -152,7 +158,7 @@
 }
 
 /// Prometheus remote write NDJSON reader.
-struct TimeSeriesReader {
+pub struct TimeSeriesReader {
     reader: FuturesAsyncReader,
     buffer: String,
 }
@@ -187,6 +193,92 @@ impl TimeSeriesReader {
     }
 }
 
+/// Prometheus remote write parquet reader.
+pub struct TimeSeriesParquetReader {
+    path: String,
+    stream: ParquetRecordBatchStream<AsyncReader>,
+    /// Whether the stream has reached EOF.
+    eof: bool,
+    /// Current string array.
+    array: Option<StringArray>,
+    /// Current row offset in the string array.
+    /// Only valid when `array` is `Some`.
+    row_index: usize,
+}
+
+impl TimeSeriesParquetReader {
+    /// Creates a new instance of `TimeSeriesParquetReader`.
+    pub async fn new(object_store: ObjectStore, path: &str) -> Result<Self> {
+        let reader = object_store
+            .reader_with(path)
+            .await
+            .context(ObjectStoreSnafu)?;
+        let content_len = object_store
+            .stat(path)
+            .await
+            .context(ObjectStoreSnafu)?
+            .content_length();
+        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
+        let stream = ParquetRecordBatchStreamBuilder::new(reader)
+            .await
+            .context(ReadParquetSnafu { path })
+            .context(MitoSnafu)?
+            .build()
+            .context(ReadParquetSnafu { path })
+            .context(MitoSnafu)?;
+
+        Ok(Self {
+            path: path.to_string(),
+            stream,
+            eof: false,
+            array: None,
+            row_index: 0,
+        })
+    }
+
+    /// Reads the next time series from the reader.
+    pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
+        if self.eof {
+            return Ok(None);
+        }
+
+        // Fetch the next record batch if no batch is loaded or the current one is exhausted.
+        let need_fetch = self.array.as_ref().map_or(true, |array| self.row_index >= array.len());
+        if need_fetch {
+            let Some(record_batch) = self
+                .stream
+                .next()
+                .await
+                .transpose()
+                .context(ReadParquetSnafu { path: &self.path })
+                .context(MitoSnafu)?
+            else {
+                self.eof = true;
+                return Ok(None);
+            };
+
+            self.row_index = 0;
+            self.array = Some(
+                record_batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .context(MissingColumnSnafu {
+                        column_name: "remote write json column",
+                    })?
+                    .clone(),
+            );
+        }
+
+        let array = self.array.as_ref().unwrap();
+        let value = array.value(self.row_index);
+        let time_series = serde_json::from_str(value).context(JsonSnafu)?;
+        self.row_index += 1;
+
+        Ok(Some(time_series))
+    }
+}
+
 /// Encoder to encode labels into primary key.
 struct PrimaryKeyEncoder {
     /// Catalog name.
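Note (not part of the patch): a minimal usage sketch of the new `TimeSeriesParquetReader`, assuming a configured `ObjectStore` (OpenDAL operator), the crate path `sst_convert::reader::remote_write`, and the crate's `error::Result` alias; the function name `drain_series` and the printed fields are illustrative only.

    // Sketch only: `store` is any configured ObjectStore; `path` points at a parquet
    // file whose first column holds remote write time series encoded as JSON strings.
    use object_store::ObjectStore;
    use sst_convert::reader::remote_write::TimeSeriesParquetReader;

    async fn drain_series(store: ObjectStore, path: &str) -> sst_convert::error::Result<()> {
        let mut reader = TimeSeriesParquetReader::new(store, path).await?;
        // Each call decodes one Prometheus remote write TimeSeries; None marks EOF.
        while let Some(series) = reader.next_series().await? {
            println!("labels: {}, samples: {}", series.labels.len(), series.samples.len());
        }
        Ok(())
    }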