diff --git a/Cargo.lock b/Cargo.lock index 262457400b..e3fc0afd89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11225,15 +11225,20 @@ dependencies = [ "async-trait", "catalog", "common-error", + "common-macro", "common-meta", "common-recordbatch", "datatypes", + "futures", "futures-util", "meta-client", + "metric-engine", "mito2", "object-store", "parquet", "parquet_opendal", + "serde", + "serde_json", "snafu 0.8.5", "store-api", "table", diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 7c6fe5c9f4..597e8f5897 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -59,7 +59,7 @@ pub mod engine; pub mod error; mod metadata_region; mod metrics; -mod row_modifier; +pub mod row_modifier; #[cfg(test)] mod test_util; mod utils; diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs index 5fcf3c0d23..2af296cb95 100644 --- a/src/metric-engine/src/row_modifier.rs +++ b/src/metric-engine/src/row_modifier.rs @@ -40,7 +40,7 @@ const TSID_HASH_SEED: u32 = 846793005; /// /// - For [`PrimaryKeyEncoding::Dense`] encoding, /// it adds two columns(`__table_id`, `__tsid`) to the row. -pub struct RowModifier { +pub(crate) struct RowModifier { codec: SparsePrimaryKeyCodec, } @@ -52,7 +52,7 @@ impl RowModifier { } /// Modify rows with the given primary key encoding. - pub fn modify_rows( + pub(crate) fn modify_rows( &self, iter: RowsIter, table_id: TableId, @@ -145,16 +145,14 @@ impl RowModifier { /// Fills internal columns of a row with table name and a hash of tag values. fn fill_internal_columns(&self, table_id: TableId, iter: &RowIter<'_>) -> (Value, Value) { - let mut hasher = mur3::Hasher128::with_seed(TSID_HASH_SEED); + let mut hasher = TsidGenerator::default(); for (name, value) in iter.primary_keys_with_name() { // The type is checked before. So only null is ignored. 
if let Some(ValueData::StringValue(string)) = &value.value_data { - name.hash(&mut hasher); - string.hash(&mut hasher); + hasher.write_label(name, string); } } - // TSID is 64 bits, simply truncate the 128 bits hash - let (hash, _) = hasher.finish128(); + let hash = hasher.finish(); ( ValueData::U32Value(table_id).into(), @@ -163,6 +161,34 @@ impl RowModifier { } } +/// TSID generator: hashes label name/value pairs into a 64-bit TSID. +pub struct TsidGenerator { + hasher: mur3::Hasher128, +} + +impl Default for TsidGenerator { + fn default() -> Self { + Self { + hasher: mur3::Hasher128::with_seed(TSID_HASH_SEED), + } + } +} + +impl TsidGenerator { + /// Writes a label pair to the generator. + pub fn write_label(&mut self, name: &str, value: &str) { + name.hash(&mut self.hasher); + value.hash(&mut self.hasher); + } + + /// Generates a new TSID. + pub fn finish(&mut self) -> u64 { + // TSID is 64 bits, so simply truncate the 128-bit hash. + let (hash, _) = self.hasher.finish128(); + hash + } +} + /// Index of a value. #[derive(Debug, Clone, Copy)] struct ValueIndex { diff --git a/src/mito2/src/row_converter/sparse.rs b/src/mito2/src/row_converter/sparse.rs index 92c69acb9b..38f5268974 100644 --- a/src/mito2/src/row_converter/sparse.rs +++ b/src/mito2/src/row_converter/sparse.rs @@ -33,6 +33,8 @@ use crate::row_converter::dense::SortField; use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter}; /// A codec for sparse key of metrics. +/// It requires that the input primary key columns be sorted by column name in lexicographical order. +/// It encodes the column id of the physical region. 
#[derive(Clone, Debug)] pub struct SparsePrimaryKeyCodec { inner: Arc, diff --git a/src/sst-convert/Cargo.toml b/src/sst-convert/Cargo.toml index c5d1673308..7a6db63d28 100644 --- a/src/sst-convert/Cargo.toml +++ b/src/sst-convert/Cargo.toml @@ -10,15 +10,20 @@ arrow-array.workspace = true async-trait.workspace = true catalog.workspace = true common-error.workspace = true +common-macro.workspace = true common-meta.workspace = true common-recordbatch.workspace = true datatypes.workspace = true futures-util.workspace = true meta-client.workspace = true +futures.workspace = true +metric-engine.workspace = true mito2.workspace = true object-store.workspace = true parquet.workspace = true parquet_opendal = "0.3.0" +serde.workspace = true +serde_json.workspace = true snafu.workspace = true store-api.workspace = true table.workspace = true diff --git a/src/sst-convert/src/error.rs b/src/sst-convert/src/error.rs new file mode 100644 index 0000000000..441ad8346a --- /dev/null +++ b/src/sst-convert/src/error.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Errors for SST conversion. 
+ +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("Object store error"))] + ObjectStore { + #[snafu(source)] + error: object_store::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("I/O error"))] + Io { + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("JSON error"))] + Json { + #[snafu(source)] + error: serde_json::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Missing __name__ label"))] + MissingMetricName { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Table not found"))] + MissingTable { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Mito error"))] + Mito { + source: mito2::error::Error, + #[snafu(implicit)] + location: Location, + }, +} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + StatusCode::Unexpected + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} diff --git a/src/sst-convert/src/lib.rs b/src/sst-convert/src/lib.rs index e4ccd8195f..b4b588ecc6 100644 --- a/src/sst-convert/src/lib.rs +++ b/src/sst-convert/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod error; mod reader; mod table; pub mod writer; diff --git a/src/sst-convert/src/reader/remote_write.rs b/src/sst-convert/src/reader/remote_write.rs index 5f83cfee4e..db14ac6fd5 100644 --- a/src/sst-convert/src/reader/remote_write.rs +++ b/src/sst-convert/src/reader/remote_write.rs @@ -12,4 +12,233 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Prometheus remote write format support. +//! 
Prometheus remote write JSON support. +//! +//! The Prometheus remote write JSON format is an NDJSON representation of the Prometheus remote write protocol. +//! - Each line contains a single timeseries. +//! - Each series only occurs once. + +use std::collections::HashMap; + +use api::prom_store::remote::{Label, TimeSeries}; +use datatypes::value::ValueRef; +use futures::AsyncBufReadExt; +use metric_engine::row_modifier::TsidGenerator; +use mito2::read::{Batch, BatchReader}; +use mito2::row_converter::SparsePrimaryKeyCodec; +use object_store::{FuturesAsyncReader, ObjectStore, Reader}; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::consts::ReservedColumnId; +use table::metadata::TableId; + +use crate::error::{ + IoSnafu, JsonSnafu, MissingMetricNameSnafu, MissingTableSnafu, MitoSnafu, ObjectStoreSnafu, + Result, +}; +use crate::table::TableMetadataHelper; + +const METRIC_NAME_LABEL: &str = "__name__"; + +/// A reader that reads a remote write file, then sorts and outputs timeseries in primary key order. +struct RemoteWriteReader { + /// Timeseries sorted by primary key. + series: Vec<(Vec, TimeSeries)>, + /// Current index in the series. + index: usize, +} + +impl RemoteWriteReader { + /// Creates a new [`RemoteWriteReader`] from an object store. + /// It reads and sorts timeseries. + pub async fn open( + operator: ObjectStore, + path: &str, + catalog: String, + schema: String, + metadata: RegionMetadataRef, + table_helper: TableMetadataHelper, + ) -> Result { + let codec = SparsePrimaryKeyCodec::new(&metadata); + let encoder = PrimaryKeyEncoder { + catalog, + schema, + metadata, + table_helper, + table_ids: HashMap::new(), + codec, + }; + let mut sorter = TimeSeriesSorter::new(encoder); + + let reader = operator.reader(path).await.context(ObjectStoreSnafu)?; + let mut reader = TimeSeriesReader::new(reader).await?; + while let Some(series) = reader.next_series().await? 
{ + sorter.push(series).await?; + } + let series = sorter.sort(); + + Ok(Self { series, index: 0 }) + } +} + +#[async_trait::async_trait] +impl BatchReader for RemoteWriteReader { + async fn next_batch(&mut self) -> mito2::error::Result> { + todo!() + } +} + +/// Prometheus remote write NDJSON reader. +struct TimeSeriesReader { + reader: FuturesAsyncReader, + buffer: String, +} + +impl TimeSeriesReader { + /// Creates a new [`TimeSeriesReader`] from a [`Reader`]. + pub async fn new(reader: Reader) -> Result { + let reader = reader + .into_futures_async_read(..) + .await + .context(ObjectStoreSnafu)?; + + Ok(Self { + reader, + buffer: String::new(), + }) + } + + /// Reads the next timeseries from the reader. + pub async fn next_series(&mut self) -> Result> { + self.buffer.clear(); + self.reader + .read_line(&mut self.buffer) + .await + .context(IoSnafu)?; + if self.buffer.is_empty() { + return Ok(None); + } + + let time_series = serde_json::from_str(&self.buffer).context(JsonSnafu)?; + Ok(Some(time_series)) + } +} + +/// Encoder to encode labels into primary key. +struct PrimaryKeyEncoder { + /// Catalog name. + catalog: String, + /// Schema name. + schema: String, + /// The metadata of the physical region. + metadata: RegionMetadataRef, + /// Helper to get table metadata. + table_helper: TableMetadataHelper, + /// Cached table name to table id. + table_ids: HashMap, + /// Primary key encoder. + codec: SparsePrimaryKeyCodec, +} + +impl PrimaryKeyEncoder { + /// Encodes the primary key for the given labels. + /// It'll sort the labels by name before encoding. + async fn encode_primary_key( + &mut self, + labels: &mut Vec