feat: sort time series

This commit is contained in:
evenyag
2025-03-08 21:56:25 +08:00
parent 8796ddaf31
commit a57e263e5a
9 changed files with 358 additions and 9 deletions

5
Cargo.lock generated
View File

@@ -11225,15 +11225,20 @@ dependencies = [
"async-trait",
"catalog",
"common-error",
"common-macro",
"common-meta",
"common-recordbatch",
"datatypes",
"futures",
"futures-util",
"meta-client",
"metric-engine",
"mito2",
"object-store",
"parquet",
"parquet_opendal",
"serde",
"serde_json",
"snafu 0.8.5",
"store-api",
"table",

View File

@@ -59,7 +59,7 @@ pub mod engine;
pub mod error;
mod metadata_region;
mod metrics;
mod row_modifier;
pub mod row_modifier;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005;
///
/// - For [`PrimaryKeyEncoding::Dense`] encoding,
/// it adds two columns(`__table_id`, `__tsid`) to the row.
pub struct RowModifier {
pub(crate) struct RowModifier {
codec: SparsePrimaryKeyCodec,
}
@@ -52,7 +52,7 @@ impl RowModifier {
}
/// Modify rows with the given primary key encoding.
pub fn modify_rows(
pub(crate) fn modify_rows(
&self,
iter: RowsIter,
table_id: TableId,
@@ -145,16 +145,14 @@ impl RowModifier {
/// Fills internal columns of a row with table name and a hash of tag values.
fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) {
let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
let mut hasher = TsidGenerator::default();
for (name, value) in iter.primary_keys_with_name() {
// The type is checked before. So only null is ignored.
if let Some(ValueData::StringValue(string)) = &value.value_data {
name.hash(&mut hasher);
string.hash(&mut hasher);
hasher.write_label(name, string);
}
}
// TSID is 64 bits, simply truncate the 128 bits hash
let (hash, _) = hasher.finish128();
let hash = hasher.finish();
(
ValueData::U32Value(table_id).into(),
@@ -163,6 +161,34 @@ impl RowModifier {
}
}
/// Generator of time series ids (TSIDs).
///
/// Holds a 128-bit `mur3` hasher that accumulates label pairs; the TSID is
/// derived from the hasher state once all labels have been written.
pub struct TsidGenerator {
// Hasher state covering every label pair written so far.
hasher: mur3::Hasher128,
}
impl Default for TsidGenerator {
    /// Creates a generator whose hasher is seeded with `TSID_HASH_SEED`.
    fn default() -> Self {
        let hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED);
        Self { hasher }
    }
}
impl TsidGenerator {
    /// Feeds one label pair into the generator: the name first, then the value.
    pub fn write_label(&mut self, name: &str, value: &str) {
        for part in [name, value] {
            part.hash(&mut self.hasher);
        }
    }

    /// Returns the TSID for the labels written so far.
    pub fn finish(&mut self) -> u64 {
        // The hasher produces a 128-bit result as a pair of `u64`s. A TSID is
        // 64 bits, so only the first element is kept and the rest is dropped.
        self.hasher.finish128().0
    }
}
/// Index of a value.
#[derive(Debug, Clone, Copy)]
struct ValueIndex {

View File

@@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
/// A codec for sparse key of metrics.
/// It requires that the input primary key columns be sorted by column name in lexicographical order.
/// It encodes the column id of the physical region.
#[derive(Clone, Debug)]
pub struct SparsePrimaryKeyCodec {
inner: Arc<SparsePrimaryKeyCodecInner>,

View File

@@ -10,15 +10,20 @@ arrow-array.workspace = true
async-trait.workspace = true
catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-recordbatch.workspace = true
datatypes.workspace = true
futures-util.workspace = true
meta-client.workspace = true
futures.workspace = true
metric-engine.workspace = true
mito2.workspace = true
object-store.workspace = true
parquet.workspace = true
parquet_opendal = "0.3.0"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
table.workspace = true

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Errors for SST conversion.
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
/// Errors that can occur during SST conversion.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
/// An object store operation failed.
#[snafu(display("Object store error"))]
ObjectStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
/// A standard I/O operation failed.
#[snafu(display("I/O error"))]
Io {
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
/// `serde_json` reported an error.
#[snafu(display("JSON error"))]
Json {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
/// A timeseries carries no `__name__` label.
#[snafu(display("Missing __name__ label"))]
MissingMetricName {
#[snafu(implicit)]
location: Location,
},
/// A table could not be found.
#[snafu(display("Table not found"))]
MissingTable {
#[snafu(implicit)]
location: Location,
},
/// An error propagated from the `mito2` crate.
#[snafu(display("Mito error"))]
Mito {
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},
}
/// Result type whose error defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
/// Reports every variant as [`StatusCode::Unexpected`].
// NOTE(review): no variant is mapped to a more specific status code yet;
// confirm whether callers need finer-grained codes.
fn status_code(&self) -> StatusCode {
StatusCode::Unexpected
}
/// Exposes the error as `&dyn Any`.
fn as_any(&self) -> &dyn std::any::Any {
self
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod error;
mod reader;
mod table;
pub mod writer;

View File

@@ -12,4 +12,233 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Prometheus remote write format support.
//! Prometheus remote write JSON support.
//!
//! The Prometheus remote write JSON format is an NDJSON representation of the Prometheus remote write protocol.
//! - Each line contains a single timeseries.
//! - Each series only occurs once.
use std::collections::HashMap;
use api::prom_store::remote::{Label, TimeSeries};
use datatypes::value::ValueRef;
use futures::AsyncBufReadExt;
use metric_engine::row_modifier::TsidGenerator;
use mito2::read::{Batch, BatchReader};
use mito2::row_converter::SparsePrimaryKeyCodec;
use object_store::{FuturesAsyncReader, ObjectStore, Reader};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use table::metadata::TableId;
use crate::error::{
IoSnafu, JsonSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, ObjectStoreSnafu,
Result,
};
use crate::table::TableMetadataHelper;
const METRIC_NAME_LABEL: &str = "__name__";
/// A reader that reads a remote write file, sorts the timeseries it contains
/// and outputs them in primary key order.
struct RemoteWriteReader {
/// Timeseries sorted by primary key; each entry pairs the encoded key with
/// its timeseries.
series: Vec<(Vec<u8>, TimeSeries)>,
/// Current index in the series.
index: usize,
}
impl RemoteWriteReader {
    /// Creates a new [`RemoteWriteReader`] from an object store.
    ///
    /// Reads every timeseries from the file at `path`, encodes a primary key
    /// for each one and sorts them by that key before returning.
    ///
    /// # Errors
    /// Propagates failures from opening or reading the file, parsing a line,
    /// or encoding a primary key.
    pub async fn open(
        operator: ObjectStore,
        path: &str,
        catalog: String,
        schema: String,
        metadata: RegionMetadataRef,
        table_helper: TableMetadataHelper,
    ) -> Result<Self> {
        // The codec is built from a reference to the metadata, so create it
        // before `metadata` is moved into the encoder.
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let mut sorter = TimeSeriesSorter::new(PrimaryKeyEncoder {
            catalog,
            schema,
            metadata,
            table_helper,
            table_ids: HashMap::new(),
            codec,
        });

        let raw_reader = operator.reader(path).await.context(ObjectStoreSnafu)?;
        let mut series_reader = TimeSeriesReader::new(raw_reader).await?;
        loop {
            match series_reader.next_series().await? {
                Some(time_series) => sorter.push(time_series).await?,
                None => break,
            }
        }

        Ok(Self {
            series: sorter.sort(),
            index: 0,
        })
    }
}
#[async_trait::async_trait]
impl BatchReader for RemoteWriteReader {
/// Returns the next batch.
///
/// Not implemented yet: the body is `todo!()`, so calling this panics.
async fn next_batch(&mut self) -> mito2::error::Result<Option<Batch>> {
todo!()
}
}
/// Prometheus remote write NDJSON reader.
///
/// Reads the input one line at a time and parses each line as a timeseries.
struct TimeSeriesReader {
/// Underlying reader the lines are pulled from.
reader: FuturesAsyncReader,
/// Line buffer reused across reads.
buffer: String,
}
impl TimeSeriesReader {
    /// Creates a new [`TimeSeriesReader`] from a [`Reader`].
    ///
    /// # Errors
    /// Returns an `ObjectStore` error if the reader cannot be converted into
    /// an async reader over the whole object.
    pub async fn new(reader: Reader) -> Result<Self> {
        let reader = reader
            .into_futures_async_read(..)
            .await
            .context(ObjectStoreSnafu)?;

        Ok(Self {
            reader,
            buffer: String::new(),
        })
    }

    /// Reads the next timeseries from the reader.
    ///
    /// Blank and whitespace-only lines are skipped instead of being handed to
    /// the JSON parser, so stray empty lines do not abort the read.
    ///
    /// Returns `Ok(None)` once the end of the input is reached.
    ///
    /// # Errors
    /// Returns an `Io` error if reading a line fails and a `Json` error if a
    /// non-blank line is not a valid timeseries.
    pub async fn next_series(&mut self) -> Result<Option<TimeSeries>> {
        loop {
            self.buffer.clear();
            let bytes_read = self
                .reader
                .read_line(&mut self.buffer)
                .await
                .context(IoSnafu)?;
            // Zero bytes read means end of input; a blank line still yields
            // at least the newline byte.
            if bytes_read == 0 {
                return Ok(None);
            }

            let line = self.buffer.trim();
            if line.is_empty() {
                continue;
            }

            let time_series = serde_json::from_str(line).context(JsonSnafu)?;
            return Ok(Some(time_series));
        }
    }
}
/// Encoder that turns the labels of a timeseries into an encoded primary key.
struct PrimaryKeyEncoder {
/// Catalog name.
catalog: String,
/// Schema name.
schema: String,
/// The metadata of the physical region.
metadata: RegionMetadataRef,
/// Helper to get table metadata.
table_helper: TableMetadataHelper,
/// Cache mapping a table name to its table id.
table_ids: HashMap<String, TableId>,
/// Codec that encodes the primary key.
codec: SparsePrimaryKeyCodec,
}
impl PrimaryKeyEncoder {
    /// Encodes the primary key for the given labels into `key_buf`.
    ///
    /// The labels are sorted by name in place if they are not sorted already.
    /// `key_buf` is cleared and then filled with the table id, the tsid and
    /// every label (except `__name__`) that has a column in the region
    /// metadata.
    ///
    /// The table is resolved from the *value* of the `__name__` label, i.e.
    /// the metric name, and the resulting table id is cached per metric name.
    ///
    /// # Errors
    /// - `MissingMetricName` if there is no `__name__` label.
    /// - `MissingTable` if the table lookup yields nothing.
    /// - `Mito` if the codec fails to encode the key.
    async fn encode_primary_key(
        &mut self,
        labels: &mut Vec<Label>,
        key_buf: &mut Vec<u8>,
    ) -> Result<()> {
        if !labels.is_sorted_by(|left, right| left.name <= right.name) {
            labels.sort_unstable_by(|left, right| left.name.cmp(&right.name));
        }

        // Gets the table id from the metric name, which is the value of the
        // `__name__` label. The label's name is always the literal
        // "__name__" and must not be used as the table name or cache key.
        let name_label = labels
            .iter()
            .find(|label| label.name == METRIC_NAME_LABEL)
            .context(MissingMetricNameSnafu)?;
        let table_id = match self.table_ids.get(&name_label.value) {
            Some(id) => *id,
            None => {
                let table_info = self
                    .table_helper
                    .get_table(&self.catalog, &self.schema, &name_label.value)
                    .await
                    .context(MissingTableSnafu)?;
                let id = table_info.table_info.ident.table_id;
                self.table_ids.insert(name_label.value.clone(), id);
                id
            }
        };

        // Computes the tsid from every label except the metric name.
        let mut generator = TsidGenerator::default();
        for label in &*labels {
            if label.name != METRIC_NAME_LABEL {
                generator.write_label(&label.name, &label.value);
            }
        }
        let tsid = generator.finish();

        // Internal columns go first, followed by the label columns.
        key_buf.clear();
        let internal_columns = [
            (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
            (ReservedColumnId::tsid(), ValueRef::UInt64(tsid)),
        ];
        self.codec
            .encode_to_vec(internal_columns.into_iter(), key_buf)
            .context(MitoSnafu)?;

        // NOTE(review): labels without a column in the region metadata are
        // dropped from the key here but still contribute to the tsid above;
        // confirm this is intended.
        let label_iter = labels.iter().filter_map(|label| {
            if label.name == METRIC_NAME_LABEL {
                return None;
            }
            let column_id = self.metadata.column_by_name(&label.name)?.column_id;
            Some((column_id, ValueRef::String(&label.value)))
        });
        self.codec
            .encode_to_vec(label_iter, key_buf)
            .context(MitoSnafu)?;

        Ok(())
    }
}
/// Prometheus timeseries sorter.
///
/// Buffers timeseries together with their encoded primary keys and sorts
/// them by that key.
struct TimeSeriesSorter {
/// Buffered `(primary key, timeseries)` pairs waiting to be sorted.
series: Vec<(Vec<u8>, TimeSeries)>,
/// Encoder that produces the primary key of each timeseries.
encoder: PrimaryKeyEncoder,
}
impl TimeSeriesSorter {
    /// Builds an empty sorter that encodes keys with `encoder`.
    fn new(encoder: PrimaryKeyEncoder) -> Self {
        let series = Vec::new();
        Self { series, encoder }
    }

    /// Encodes the primary key of `series` and buffers the pair for sorting.
    ///
    /// # Errors
    /// Propagates any error from encoding the primary key; the timeseries is
    /// not buffered in that case.
    async fn push(&mut self, mut series: TimeSeries) -> Result<()> {
        let mut primary_key = Vec::new();
        let labels = &mut series.labels;
        self.encoder
            .encode_primary_key(labels, &mut primary_key)
            .await?;
        self.series.push((primary_key, series));

        Ok(())
    }

    /// Sorts the buffered timeseries by primary key.
    ///
    /// Returns the sorted `(primary key, timeseries)` pairs and leaves the
    /// sorter empty.
    fn sort(&mut self) -> Vec<(Vec<u8>, TimeSeries)> {
        let mut sorted = std::mem::take(&mut self.series);
        sorted.sort_unstable_by(|(left_key, _), (right_key, _)| left_key.cmp(right_key));
        sorted
    }
}

View File

@@ -22,6 +22,7 @@ use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use meta_client::{MetaClientOptions, MetaClientType};
/// Helper for looking up table metadata.
///
/// Wraps a [`TableMetadataManagerRef`]; deriving `Clone` lets the helper be
/// handed to multiple owners.
#[derive(Clone)]
pub struct TableMetadataHelper {
// Manager the lookups are delegated to.
table_metadata_manager: TableMetadataManagerRef,
}