feat: parquet remote write reader

evenyag
2025-03-09 23:41:16 +08:00
parent cbd58291da
commit 8aadd1e59a
3 changed files with 97 additions and 5 deletions


@@ -14,7 +14,7 @@
 pub mod converter;
 pub mod error;
-mod reader;
+pub mod reader;
 mod table;
 pub mod writer;


@@ -36,7 +36,7 @@ use crate::table::TableMetadataHelper;
 use crate::OpenDALParquetReader;

 pub(crate) mod parquet;
-mod remote_write;
+pub mod remote_write;

 /// Reader and context.
 pub struct ReaderInfo {


@@ -23,13 +23,19 @@ use std::sync::Arc;
 use api::prom_store::remote::{Label, TimeSeries};
 use api::v1::OpType;
-use arrow_array::{Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
+use arrow_array::{
+    Array, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
 use datatypes::value::ValueRef;
-use futures::AsyncBufReadExt;
+use futures::{AsyncBufReadExt, StreamExt};
 use metric_engine::row_modifier::TsidGenerator;
+use mito2::error::ReadParquetSnafu;
 use mito2::read::{Batch, BatchBuilder, BatchReader};
 use mito2::row_converter::SparsePrimaryKeyCodec;
 use object_store::{FuturesAsyncReader, ObjectStore, Reader};
+use parquet::arrow::async_reader::ParquetRecordBatchStream;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet_opendal::AsyncReader;
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::ReservedColumnId;
@@ -152,7 +158,7 @@ impl SeriesConverter {
 }

 /// Prometheus remote write NDJSON reader.
-struct TimeSeriesReader {
+pub struct TimeSeriesReader {
     reader: FuturesAsyncReader,
     buffer: String,
 }
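The bodies of `TimeSeriesReader`'s methods are elided from this diff. As a rough, hypothetical sketch (not the commit's code), a line-oriented reader of this shape could pull one `TimeSeries` per NDJSON line, assuming `TimeSeries` deserializes via serde_json the same way the parquet reader below uses it; the function name and signature are illustrative only:

use api::prom_store::remote::TimeSeries;
use futures::io::AsyncBufReadExt;

/// Hypothetical helper: reads the next NDJSON-encoded time series.
/// `reader` stands in for the struct's buffered `FuturesAsyncReader`.
async fn next_line_series<R: futures::io::AsyncBufRead + Unpin>(
    reader: &mut R,
    buffer: &mut String,
) -> std::io::Result<Option<TimeSeries>> {
    buffer.clear();
    // `read_line` returns Ok(0) once the underlying reader hits EOF.
    if reader.read_line(buffer).await? == 0 {
        return Ok(None);
    }
    // serde_json errors are mapped to io::Error for this sketch only;
    // the real module uses snafu error contexts instead.
    let series = serde_json::from_str(buffer.trim_end())
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    Ok(Some(series))
}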
@@ -187,6 +193,92 @@ impl TimeSeriesReader {
     }
 }
+
+/// Prometheus remote write parquet reader.
+pub struct TimeSeriesParquetReader {
+    path: String,
+    stream: ParquetRecordBatchStream<AsyncReader>,
+    /// Whether the stream has reached EOF.
+    eof: bool,
+    /// Current string array from the latest record batch.
+    array: Option<StringArray>,
+    /// Current row in the array.
+    /// Only valid when `array` is `Some`.
+    row_index: usize,
+}
+
+impl TimeSeriesParquetReader {
+    /// Creates a new instance of `TimeSeriesParquetReader`.
+    pub async fn new(object_store: ObjectStore, path: &str) -> Result<Self> {
+        let reader = object_store
+            .reader_with(path)
+            .await
+            .context(ObjectStoreSnafu)?;
+        let content_len = object_store
+            .stat(path)
+            .await
+            .context(ObjectStoreSnafu)?
+            .content_length();
+        let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
+        let stream = ParquetRecordBatchStreamBuilder::new(reader)
+            .await
+            .context(ReadParquetSnafu { path })
+            .context(MitoSnafu)?
+            .build()
+            .context(ReadParquetSnafu { path })
+            .context(MitoSnafu)?;
+
+        Ok(Self {
+            path: path.to_string(),
+            stream,
+            eof: false,
+            array: None,
+            row_index: 0,
+        })
+    }
+
+    /// Reads the next time series from the reader.
+    pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
+        if self.eof {
+            return Ok(None);
+        }
+
+        // Fetch the next record batch when nothing is buffered yet or the
+        // current array is exhausted.
+        if self
+            .array
+            .as_ref()
+            .map_or(true, |array| self.row_index >= array.len())
+        {
+            let Some(record_batch) = self
+                .stream
+                .next()
+                .await
+                .transpose()
+                .context(ReadParquetSnafu { path: &self.path })
+                .context(MitoSnafu)?
+            else {
+                self.eof = true;
+                return Ok(None);
+            };
+            self.row_index = 0;
+            self.array = Some(
+                record_batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .context(MissingColumnSnafu {
+                        column_name: "remote write json column",
+                    })?
+                    .clone(),
+            );
+        }
+
+        let array = self.array.as_ref().unwrap();
+        let value = array.value(self.row_index);
+        let time_series = serde_json::from_str(value).context(JsonSnafu)?;
+        self.row_index += 1;
+        Ok(Some(time_series))
+    }
+}

 /// Encoder to encode labels into primary key.
 struct PrimaryKeyEncoder {
     /// Catalog name.
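With the reader above, draining a file end to end is a short loop. The sketch below is hypothetical usage, not part of the commit: `dump_series` and the object path are made up, the function is assumed to live in this module so `Result` resolves, and the `object_store` handle is assumed to be constructed elsewhere (for example from an Fs or S3 backend).

// Hypothetical usage of `TimeSeriesParquetReader`; path and function
// name are illustrative only.
async fn dump_series(object_store: ObjectStore) -> Result<()> {
    let mut reader = TimeSeriesParquetReader::new(object_store, "data/series.parquet").await?;
    // Each row of column 0 holds one JSON-encoded Prometheus time series;
    // `next_series` returns Ok(None) once the stream is exhausted.
    while let Some(series) = reader.next_series().await? {
        println!(
            "time series with {} labels and {} samples",
            series.labels.len(),
            series.samples.len()
        );
    }
    Ok(())
}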