feat: series to batch

This commit is contained in:
evenyag
2025-03-09 15:07:53 +08:00
parent 77dee84a75
commit 6ad186a13e
2 changed files with 93 additions and 12 deletions

View File

@@ -53,8 +53,16 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table not found"))]
#[snafu(display("Table not found: {}", table_name))]
MissingTable {
table_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column not found: {}", column_name))]
MissingColumn {
column_name: String,
#[snafu(implicit)]
location: Location,
},
@@ -71,7 +79,15 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
StatusCode::Unexpected
match self {
Error::ObjectStore { .. } => StatusCode::StorageUnavailable,
Error::Io { .. } => StatusCode::StorageUnavailable,
Error::Json { .. } => StatusCode::InvalidArguments,
Error::MissingMetricName { .. } => StatusCode::InvalidArguments,
Error::MissingTable { .. } => StatusCode::TableNotFound,
Error::MissingColumn { .. } => StatusCode::TableColumnNotFound,
Error::Mito { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn std::any::Any {

View File

@@ -19,33 +19,39 @@
//! - Each series only occurs once.
use std::collections::HashMap;
use std::sync::Arc;
use api::prom_store::remote::{Label, TimeSeries};
use api::v1::OpType;
use arrow_array::{Float64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::value::ValueRef;
use futures::AsyncBufReadExt;
use metric_engine::row_modifier::TsidGenerator;
use mito2::read::{Batch, BatchReader};
use mito2::read::{Batch, BatchBuilder, BatchReader};
use mito2::row_converter::SparsePrimaryKeyCodec;
use object_store::{FuturesAsyncReader, ObjectStore, Reader};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, SequenceNumber};
use table::metadata::TableId;
use crate::error::{
IoSnafu, JsonSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, ObjectStoreSnafu,
Result,
IoSnafu, JsonSnafu, MissingColumnSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu,
ObjectStoreSnafu, Result,
};
use crate::table::TableMetadataHelper;
const METRIC_NAME_LABEL: &str = "__name__";
const GREPTIME_VALUE: &str = "greptime_value";
/// A reader that reads remote write file, sorts and outputs timeseries in the primary key order.
struct RemoteWriteReader {
/// Timeseries sorted by primary key.
/// Timeseries sorted by primary key in reverse order.
/// So we can pop the series.
series: Vec<(Vec<u8>, TimeSeries)>,
/// Current index in the series.
index: usize,
/// Converter for converting timeseries to batches.
converter: SeriesConverter,
}
impl RemoteWriteReader {
@@ -60,6 +66,12 @@ impl RemoteWriteReader {
table_helper: TableMetadataHelper,
) -> Result<Self> {
let codec = SparsePrimaryKeyCodec::new(&metadata);
let value_id = metadata
.column_by_name(GREPTIME_VALUE)
.context(MissingColumnSnafu {
column_name: GREPTIME_VALUE,
})?
.column_id;
let encoder = PrimaryKeyEncoder {
catalog,
schema,
@@ -68,6 +80,10 @@ impl RemoteWriteReader {
table_ids: HashMap::new(),
codec,
};
let converter = SeriesConverter {
value_id,
override_sequence: None,
};
let mut sorter = TimeSeriesSorter::new(encoder);
let reader = operator.reader(path).await.context(ObjectStoreSnafu)?;
@@ -75,16 +91,63 @@ impl RemoteWriteReader {
while let Some(series) = reader.next_series().await? {
sorter.push(series).await?;
}
let series = sorter.sort();
let mut series = sorter.sort();
series.reverse();
Ok(Self { series, index: 0 })
Ok(Self { series, converter })
}
/// Sets the sequence number of the batch.
/// Otherwise, the sequence number will be 0.
pub fn with_sequence(mut self, sequence: SequenceNumber) -> Self {
self.converter.override_sequence = Some(sequence);
self
}
fn next_series(&mut self) -> Option<(Vec<u8>, TimeSeries)> {
self.series.pop()
}
}
#[async_trait::async_trait]
impl BatchReader for RemoteWriteReader {
async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
todo!()
let Some((pk, series)) = self.next_series() else {
return Ok(None);
};
self.converter.convert(pk, series).map(Some)
}
}
struct SeriesConverter {
/// Column id for the value field.
value_id: ColumnId,
/// Sequence number of the batch.
override_sequence: Option<SequenceNumber>,
}
impl SeriesConverter {
/// Builds a batch from a primary key and a time series.
fn convert(&self, primary_key: Vec<u8>, series: TimeSeries) -> mito2::error::Result<Batch> {
let num_rows = series.samples.len();
let op_types = vec![OpType::Put as u8; num_rows];
// TODO(yingwen): Should we use 0 or 1 as default?
let sequences = vec![self.override_sequence.unwrap_or(0); num_rows];
let mut timestamps = Vec::with_capacity(num_rows);
let mut values = Vec::with_capacity(num_rows);
for sample in series.samples {
timestamps.push(sample.timestamp);
values.push(sample.value);
}
let mut builder = BatchBuilder::new(primary_key);
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from(timestamps)))?
.sequences_array(Arc::new(UInt64Array::from(sequences)))?
.op_types_array(Arc::new(UInt8Array::from(op_types)))?
.push_field_array(self.value_id, Arc::new(Float64Array::from(values)))?;
builder.build()
}
}
@@ -164,7 +227,9 @@ impl PrimaryKeyEncoder {
.table_helper
.get_table(&self.catalog, &self.schema, &name_label.name)
.await
.context(MissingTableSnafu)?;
.context(MissingTableSnafu {
table_name: &name_label.name,
})?;
let id = table_info.table_info.ident.table_id;
self.table_ids.insert(name_label.name.clone(), id);