chore: remove unused codes

This commit is contained in:
evenyag
2025-03-17 15:20:42 +08:00
parent bc9614e22c
commit d5760a7348
4 changed files with 8 additions and 65 deletions

2
Cargo.lock generated
View File

@@ -11280,8 +11280,6 @@ dependencies = [
"parquet",
"parquet_opendal",
"prost 0.13.3",
"serde",
"serde_json",
"snafu 0.8.5",
"store-api",
"table",

View File

@@ -26,8 +26,6 @@ object-store.workspace = true
parquet.workspace = true
parquet_opendal = "0.3.0"
prost.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
table.workspace = true

View File

@@ -31,22 +31,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("I/O error"))]
Io {
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("JSON error"))]
Json {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Missing __name__ label"))]
MissingMetricName {
#[snafu(implicit)]
@@ -110,8 +94,6 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ObjectStore { .. } => StatusCode::StorageUnavailable,
Error::Io { .. } => StatusCode::StorageUnavailable,
Error::Json { .. } => StatusCode::InvalidArguments,
Error::MissingMetricName { .. } => StatusCode::InvalidArguments,
Error::MissingTable { .. } => StatusCode::TableNotFound,
Error::MissingColumn { .. } => StatusCode::TableColumnNotFound,

View File

@@ -14,9 +14,10 @@
//! Prometheus remote write support.
//!
//! The Prometheus remote write format is a Parquet representation of the Prometheus remote write protocol.
//! - Each Line contains a single timeseries.
//! - Each series only occurs once.
//! Prometheus remote write protocol in parquet format.
//! - Each row contains a protobuf binary representation of a single timeseries.
//! - Each series only occurs once in the file.
//! - Each timeseries must has a the `__name__` label.
use std::collections::HashMap;
use std::sync::Arc;
@@ -27,12 +28,12 @@ use arrow_array::{
Array, BinaryArray, Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::value::ValueRef;
use futures::{AsyncBufReadExt, StreamExt};
use futures::StreamExt;
use metric_engine::row_modifier::TsidGenerator;
use mito2::error::ReadParquetSnafu;
use mito2::read::{Batch, BatchBuilder, BatchReader};
use mito2::row_converter::SparsePrimaryKeyCodec;
use object_store::{FuturesAsyncReader, ObjectStore, Reader};
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet_opendal::AsyncReader;
@@ -44,8 +45,8 @@ use store_api::storage::{ColumnId, SequenceNumber};
use table::metadata::TableId;
use crate::error::{
DecodeSnafu, IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu,
MitoSnafu, ObjectStoreSnafu, Result,
DecodeSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu,
ObjectStoreSnafu, Result,
};
use crate::table::TableMetadataHelper;
@@ -157,42 +158,6 @@ impl SeriesConverter {
}
}
/// Prometheus remote write NDJSON reader.
pub struct TimeSeriesReader {
reader: FuturesAsyncReader,
buffer: String,
}
impl TimeSeriesReader {
/// Creates a new [`TimeSeriesReader`] from a [`Reader`].
pub async fn new(reader: Reader) -> Result<Self> {
let reader = reader
.into_futures_async_read(..)
.await
.context(ObjectStoreSnafu)?;
Ok(Self {
reader,
buffer: String::new(),
})
}
/// Reads the next timeseries from the reader.
pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
self.buffer.clear();
self.reader
.read_line(&mut self.buffer)
.await
.context(IoSnafu)?;
if self.buffer.is_empty() {
return Ok(None);
}
let time_series = serde_json::from_str(&self.buffer).context(JsonSnafu)?;
Ok(Some(time_series))
}
}
/// Prometheus remote write parquet reader.
pub struct TimeSeriesParquetReader {
path: String,