feat: use pb in parquet

evenyag
2025-03-10 14:40:29 +08:00
parent dd9d13e7df
commit c80a73bc20
4 changed files with 30 additions and 18 deletions
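In effect, this change switches the per-row payload stored in Parquet from an NDJSON string to prost-encoded protobuf bytes. A minimal round-trip sketch, assuming the prost-generated TimeSeries/Label/Sample types mirror the Prometheus remote write proto (the sample values below are made up):

```rust
use api::prom_store::remote::{Label, Sample, TimeSeries};
use prost::Message;

fn main() {
    // Made-up series; field names follow the Prometheus remote write proto.
    let series = TimeSeries {
        labels: vec![Label {
            name: "__name__".to_string(),
            value: "http_requests_total".to_string(),
        }],
        samples: vec![Sample {
            value: 1.0,
            timestamp: 1_700_000_000_000,
        }],
        ..Default::default()
    };

    // Write side: protobuf bytes instead of a JSON line.
    let bytes = series.encode_to_vec();
    // Read side: what this commit switches the Parquet reader to.
    let decoded = TimeSeries::decode(bytes.as_slice()).expect("valid protobuf");
    assert_eq!(series, decoded);
}
```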

Cargo.lock (generated)

@@ -11279,6 +11279,7 @@ dependencies = [
  "object-store",
  "parquet",
  "parquet_opendal",
+ "prost 0.13.3",
  "serde",
  "serde_json",
  "snafu 0.8.5",


@@ -25,6 +25,7 @@ mito2.workspace = true
 object-store.workspace = true
 parquet.workspace = true
 parquet_opendal = "0.3.0"
+prost.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 snafu.workspace = true


@@ -94,6 +94,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Decode error"))]
+    Decode {
+        #[snafu(source)]
+        error: prost::DecodeError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -111,6 +119,7 @@ impl ErrorExt for Error {
             Error::Datanode { source, .. } => source.status_code(),
             Error::Meta { source, .. } => source.status_code(),
             Error::MetaClient { source, .. } => source.status_code(),
+            Error::Decode { .. } => StatusCode::InvalidArguments,
         }
     }


@@ -12,9 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//! Prometheus remote write JSON support.
+//! Prometheus remote write support.
 //!
-//! The Prometheus remote write JSON format is a NDJSON representation of the Prometheus remote write protocol.
+//! The Prometheus remote write format is a Parquet representation of the Prometheus remote write protocol.
 //! - Each Line contains a single timeseries.
 //! - Each series only occurs once.
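The write side implied by the updated doc comment would look roughly like this; the column name and helper are illustrative, not taken from this diff:

```rust
use std::sync::Arc;

use api::prom_store::remote::TimeSeries;
use arrow_array::{ArrayRef, BinaryArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use prost::Message;

// Illustrative layout: a single binary column with one prost-encoded
// TimeSeries per row, i.e. each row holds a single timeseries.
fn to_record_batch(series: &[TimeSeries]) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "time_series",
        DataType::Binary,
        false,
    )]));
    let encoded: Vec<Vec<u8>> = series.iter().map(|ts| ts.encode_to_vec()).collect();
    let array: ArrayRef = Arc::new(BinaryArray::from_iter_values(
        encoded.iter().map(|buf| buf.as_slice()),
    ));
    RecordBatch::try_new(schema, vec![array]).expect("array matches the single-column schema")
}
```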
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use api::prom_store::remote::{Label, TimeSeries};
 use api::v1::OpType;
 use arrow_array::{
-    Array, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+    Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
 };
 use datatypes::value::ValueRef;
 use futures::{AsyncBufReadExt, StreamExt};
@@ -36,6 +36,7 @@ use object_store::{FuturesAsyncReader, ObjectStore, Reader};
 use parquet::arrow::async_reader::ParquetRecordBatchStream;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet_opendal::AsyncReader;
+use prost::Message;
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::ReservedColumnId;
@@ -43,8 +44,8 @@ use store_api::storage::{ColumnId, SequenceNumber};
 use table::metadata::TableId;

 use crate::error::{
-    IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu,
-    ObjectStoreSnafu, Result,
+    DecodeSnafu, IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu,
+    MitoSnafu, ObjectStoreSnafu, Result,
 };
 use crate::table::TableMetadataHelper;
@@ -198,8 +199,8 @@ pub struct TimeSeriesParquetReader {
     stream: ParquetRecordBatchStream<AsyncReader>,
     /// Is the stream EOF.
     eof: bool,
-    /// Current string array.
-    array: Option<StringArray>,
+    /// Current binary array.
+    array: Option<BinaryArray>,
     /// Current row in the record batch.
     /// Only valid when the record batch is Some.
     row_index: usize,
@@ -260,21 +261,21 @@ impl TimeSeriesParquetReader {
                 self.eof = true;
                 return Ok(None);
             };
-            self.array = Some(
-                record_batch
-                    .column(0)
-                    .as_any()
-                    .downcast_ref::<StringArray>()
-                    .context(MissingColumnSnafu {
-                        column_name: "remote write json column",
-                    })?
-                    .clone(),
-            );
+            let array = record_batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .context(MissingColumnSnafu {
+                    column_name: "remote write json column",
+                })?
+                .clone();
+            assert!(!array.is_empty());
+            self.array = Some(array);
         }

         let array = self.array.as_ref().unwrap();
         let value = array.value(self.row_index);
-        let time_series = serde_json::from_str(value).context(JsonSnafu)?;
+        let time_series = TimeSeries::decode(value).context(DecodeSnafu)?;
         self.row_index += 1;

         Ok(Some(time_series))
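For completeness, the read side reduced to its essentials; a standalone helper for illustration, whereas the actual reader above keeps the array and a row cursor across calls:

```rust
use api::prom_store::remote::TimeSeries;
use arrow_array::{Array, BinaryArray};
use prost::Message;

// Illustrative: decode every row of the binary column back into TimeSeries.
fn decode_column(array: &BinaryArray) -> Result<Vec<TimeSeries>, prost::DecodeError> {
    (0..array.len())
        .map(|row| TimeSeries::decode(array.value(row)))
        .collect()
}
```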