poc/create-alter-for-metrics:

### Add Parquet Writer and Access Layer

 - **`Cargo.lock`, `Cargo.toml`**: Updated dependencies to include `parquet`, `object-store`, `common-datasource`, and `mito2`.
 - **`parquet_writer.rs`**: Introduced a new module providing `AsyncWriter`, which bridges an opendal `Writer` with parquet's `AsyncFileWriter` so Parquet files can be written asynchronously to object storage.
 - **`access_layer.rs`**: Added a new access layer for writing Parquet files: `AccessLayerFactory` creates writers on top of an object store, and `ParquetWriter` writes record batches, tracks their timestamp range, and produces a `FileMeta` on finish (a usage sketch follows this list).
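
A minimal sketch of how these pieces are expected to fit together, assuming an `ObjectStore` handle, a target path, an Arrow schema, and a `RecordBatch` are already in scope (the function and variable names below are illustrative, not part of this change):

```rust
// Hedged sketch: adapt an opendal writer to parquet's AsyncFileWriter via the
// new AsyncWriter, then stream a RecordBatch through AsyncArrowWriter.
use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use common_datasource::parquet_writer::AsyncWriter;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;

async fn write_one_batch(
    object_store: &ObjectStore,
    file_path: &str,
    schema: SchemaRef,
    batch: &RecordBatch,
) -> Result<(), Box<dyn std::error::Error>> {
    // Open an opendal writer for the target path.
    let store_writer = object_store.writer(file_path).await?;
    // Bridge it to parquet's async writer; no custom WriterProperties here.
    let mut writer = AsyncArrowWriter::try_new(AsyncWriter::new(store_writer), schema, None)?;
    writer.write(batch).await?;
    // Flush buffered row groups and write the parquet footer.
    writer.close().await?;
    Ok(())
}
```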

 ### Enhance Batch Builder

 - **`batch_builder.rs`**: `BatchEncoder` now tracks the minimum and maximum timestamps while appending rows; `finish` returns `None` for empty batches, and otherwise the `RecordBatch` together with its `(min, max)` timestamp range (see the sketch below).
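
For orientation, a hedged sketch of how a caller might consume the new return shape; `builder` (a `MetricsBatchBuilder`) and `writer` (a `ParquetWriter` from the new access layer) are assumed to exist in the surrounding code:

```rust
// Hedged sketch: each physical table id now maps to the encoded RecordBatch
// plus the (min, max) timestamps observed while encoding its rows.
let table_batches = builder.finish()?;
for (physical_table_id, (batch, (min_ts, max_ts))) in table_batches {
    // Passing the precomputed range lets the writer avoid re-scanning the
    // timestamp column of the batch.
    writer
        .write_record_batch(&batch, Some((min_ts, max_ts)))
        .await?;
    let _ = physical_table_id; // e.g. used to route the batch to a per-table writer
}
```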

 ### Update Error Handling

 - **`error.rs`**: Added `Opendal`, `ObjectStore`, and `Parquet` error variants for object store and Parquet operations (see the sketch below).
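
These variants are intended to be attached with snafu's `ResultExt`, mirroring the access-layer code in this commit (`object_store`, `file_path`, and `arrow_writer` below are assumed to be in scope):

```rust
// Hedged sketch: opendal failures map to Error::Opendal, parquet failures to
// Error::Parquet, via snafu context selectors.
use snafu::ResultExt;

let store_writer = object_store
    .writer(&file_path)
    .await
    .context(error::OpendalSnafu)?;

let file_meta = arrow_writer.finish().await.context(error::ParquetSnafu)?;
```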

 ### Modify Mito2 Parquet Constants

 - **`parquet.rs`**: Changed `DEFAULT_ROW_GROUP_SIZE` from `pub(crate)` to `pub` so other crates can reuse mito2's default row-group size (see the sketch below).
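
The constant is consumed outside mito2 when building writer properties; a minimal sketch of that use:

```rust
// Hedged sketch: reuse mito2's default row-group size from another crate.
use mito2::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use parquet::file::properties::WriterProperties;

fn default_props() -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
        .build()
}
```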

 ### Add Tests

 - **`access_layer.rs`**: Added a basic test for `build_data_region_dir`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG
Date:   2025-06-26 09:17:53 +00:00
Parent: 699406ae32
Commit: 68409e28ea
9 changed files with 346 additions and 28 deletions

Cargo.lock

@@ -6698,7 +6698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -11231,6 +11231,7 @@ dependencies = [
"common-base",
"common-catalog",
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-grpc",
@@ -11276,9 +11277,11 @@ dependencies = [
"metric-engine",
"mime_guess",
"mito-codec",
"mito2",
"mysql_async",
"notify",
"object-pool",
"object-store",
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
@@ -11286,6 +11289,7 @@ dependencies = [
"operator",
"otel-arrow-rust",
"parking_lot 0.12.3",
"parquet",
"partition",
"permutation",
"pgwire",
@@ -13861,12 +13865,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.10.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
dependencies = [
"getrandom 0.2.15",
"rand 0.8.5",
"getrandom 0.3.2",
"js-sys",
"rand 0.9.0",
"serde",
"wasm-bindgen",
]


@@ -21,6 +21,7 @@ pub mod error;
pub mod file_format;
pub mod lister;
pub mod object_store;
pub mod parquet_writer;
pub mod share_buffer;
#[cfg(test)]
pub mod test_util;


@@ -0,0 +1,52 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::Writer;
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::errors::ParquetError;
/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
pub struct AsyncWriter {
inner: Writer,
}
impl AsyncWriter {
/// Creates an [`AsyncWriter`] from the given [`Writer`].
pub fn new(writer: Writer) -> Self {
Self { inner: writer }
}
}
impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
}


@@ -41,7 +41,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
-pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
+pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
/// Parquet write options.
#[derive(Debug)]


@@ -79,6 +79,10 @@ mime_guess = "2.0"
mito-codec.workspace = true
notify.workspace = true
object-pool = "0.5"
object-store.workspace = true
common-datasource.workspace = true
mito2.workspace = true
parquet.workspace = true
once_cell.workspace = true
openmetrics-parser = "0.4"
operator.workspace = true


@@ -0,0 +1,215 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::{
Array, PrimitiveArray, RecordBatch, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::datatypes::Int64Type;
use arrow_schema::TimeUnit;
use common_datasource::parquet_writer::AsyncWriter;
use common_time::range::TimestampRange;
use datafusion::parquet::arrow::AsyncArrowWriter;
use datatypes::timestamp::TimestampSecond;
use mito2::sst::file::{FileId, FileMeta};
use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY};
use object_store::config::ObjectStoreConfig;
use object_store::util::join_dir;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
use store_api::storage::RegionId;
use table::metadata::{TableId, TableInfoRef};
use crate::error;
type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
pub struct AccessLayerFactory {
object_store: ObjectStore,
}
impl AccessLayerFactory {
async fn new(config: &ObjectStoreConfig) -> error::Result<AccessLayerFactory> {
let object_store = object_store::factory::new_raw_object_store(config, "")
.await
.context(error::ObjectStoreSnafu)?;
Ok(Self { object_store })
}
async fn create_sst_writer(
&self,
catalog: &str,
schema: &str,
region_metadata: RegionMetadataRef,
) -> error::Result<ParquetWriter> {
let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id);
let file_id = FileId::random();
let file_path = join_dir(&region_dir, &file_id.as_parquet());
let writer = self
.object_store
.writer(&file_path)
.await
.context(error::OpendalSnafu)?;
let schema = region_metadata.schema.arrow_schema().clone();
let key_value_meta = KeyValue::new(
PARQUET_METADATA_KEY.to_string(),
region_metadata.to_json().unwrap(),
);
let props = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
.build();
let writer = AsyncParquetWriter::try_new(AsyncWriter::new(writer), schema, Some(props))
.context(error::ParquetSnafu)?;
Ok(ParquetWriter {
region_id: region_metadata.region_id,
file_id,
region_metadata,
writer,
timestamp_range: None,
})
}
}
pub struct ParquetWriter {
region_id: RegionId,
file_id: FileId,
region_metadata: RegionMetadataRef,
writer: AsyncParquetWriter,
timestamp_range: Option<(i64, i64)>,
}
impl ParquetWriter {
pub async fn write_record_batch(
&mut self,
batch: &RecordBatch,
timestamp_range: Option<(i64, i64)>,
) -> error::Result<()> {
self.writer
.write(&batch)
.await
.context(error::ParquetSnafu)?;
let (batch_min, batch_max) =
get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;
if let Some((min, max)) = &mut self.timestamp_range {
*min = (*min).min(batch_min);
*max = (*max).max(batch_max);
} else {
self.timestamp_range = Some((batch_min, batch_max));
};
Ok(())
}
pub async fn finish(&mut self) -> error::Result<FileMeta> {
let (min, max) = self.timestamp_range.unwrap();
let timestamp_type = self
.region_metadata
.time_index_column()
.column_schema
.data_type
.as_timestamp()
.unwrap();
let min_ts = timestamp_type.create_timestamp(min);
let max_ts = timestamp_type.create_timestamp(max);
let file_meta = self.writer.finish().await.context(error::ParquetSnafu)?;
let meta = FileMeta {
region_id: self.region_id,
file_id: self.file_id,
time_range: (min_ts, max_ts),
level: 0,
file_size: self.writer.bytes_written() as u64,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: file_meta.num_rows as u64,
num_row_groups: file_meta.row_groups.len() as u64,
sequence: None, //todo(hl): use flushed sequence here.
};
Ok(meta)
}
}
/// Builds the data region subdir for metric physical tables.
fn build_data_region_dir(catalog: &str, schema: &str, physical_region_id: RegionId) -> String {
let storage_path = common_meta::ddl::utils::region_storage_path(&catalog, &schema);
join_dir(
&store_api::path_utils::region_dir(&storage_path, physical_region_id),
DATA_REGION_SUBDIR,
)
}
fn get_or_calculate_timestamp_range(
timestamp_range: Option<(i64, i64)>,
rb: &RecordBatch,
region_metadata: &RegionMetadataRef,
) -> error::Result<(i64, i64)> {
if let Some(range) = timestamp_range {
return Ok(range);
};
let ts = rb
.column_by_name(&region_metadata.time_index_column().column_schema.name)
.expect("column not found");
let arrow::datatypes::DataType::Timestamp(unit, _) = ts.data_type() else {
unreachable!("expected timestamp types");
};
let primitives: PrimitiveArray<Int64Type> = match unit {
TimeUnit::Second => ts
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast(),
TimeUnit::Millisecond => ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast(),
TimeUnit::Microsecond => ts
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast(),
TimeUnit::Nanosecond => ts
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast(),
};
let min = arrow::compute::min(&primitives).unwrap();
let max = arrow::compute::max(&primitives).unwrap();
Ok((min, max))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_data_region_dir_basic() {
let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0));
assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/");
}
}


@@ -28,6 +28,7 @@ use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use itertools::Itertools;
use metric_engine::row_modifier::{RowModifier, RowsIter};
use mito2::sst::file::FileTimeRange;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use operator::schema_helper::{
ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
@@ -252,13 +253,16 @@ impl MetricsBatchBuilder {
.or(current_schema.as_deref())
.unwrap_or(DEFAULT_SCHEMA_NAME);
// Look up physical region metadata by schema and table name
-let schema_metadata = physical_region_metadata.get(schema)
-.context(error::TableNotFoundSnafu {
-catalog,
-schema,
-table: logical_table_name,
-})?;
-let (logical_table_id, physical_table) = schema_metadata.get(logical_table_name)
+let schema_metadata =
+physical_region_metadata
+.get(schema)
+.context(error::TableNotFoundSnafu {
+catalog,
+schema,
+table: logical_table_name,
+})?;
+let (logical_table_id, physical_table) = schema_metadata
+.get(logical_table_name)
.context(error::TableNotFoundSnafu {
catalog,
schema,
@@ -269,24 +273,22 @@ impl MetricsBatchBuilder {
.builders
.entry(physical_table.region_id.table_id())
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
-encoder.append_rows(
-*logical_table_id,
-std::mem::take(table),
-)?;
+encoder.append_rows(*logical_table_id, std::mem::take(table))?;
}
}
Ok(())
}
/// Finishes current record batch builder and returns record batches grouped by physical table id.
-pub(crate) fn finish(self) -> error::Result<HashMap<TableId, RecordBatch>> {
-self.builders
-.into_iter()
-.map(|(physical_table_id, encoder)| {
-let rb = encoder.finish()?;
-Ok((physical_table_id, rb))
-})
-.collect::<error::Result<HashMap<_, _>>>()
+pub(crate) fn finish(self) -> error::Result<HashMap<TableId, (RecordBatch, (i64, i64))>> {
+let mut table_batches = HashMap::with_capacity(self.builders.len());
+for (physical_table_id, mut encoder) in self.builders {
+let rb = encoder.finish()?;
+if let Some(v) = rb {
+table_batches.insert(physical_table_id, v);
+}
+}
+Ok(table_batches)
}
/// Creates Encoder that converts Rows into RecordBatch with primary key encoded.
@@ -306,6 +308,7 @@ struct BatchEncoder {
timestamps: Vec<i64>,
value: Vec<f64>,
pk_codec: SparsePrimaryKeyCodec,
timestamp_range: Option<(i64, i64)>,
}
impl BatchEncoder {
@@ -316,6 +319,7 @@ impl BatchEncoder {
timestamps: Vec::with_capacity(16),
value: Vec::with_capacity(16),
pk_codec: SparsePrimaryKeyCodec::schemaless(),
timestamp_range: None,
}
}
@@ -389,6 +393,12 @@ impl BatchEncoder {
return error::InvalidTimestampValueTypeSnafu.fail();
};
self.timestamps.push(*ts);
if let Some((min, max)) = &mut self.timestamp_range {
*min = (*min).min(*ts);
*max = (*max).max(*ts);
} else {
self.timestamp_range = Some((*ts, *ts));
}
// safety: field values cannot be null in prom remote write
let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
@@ -405,7 +415,10 @@ impl BatchEncoder {
Ok(())
}
-fn finish(mut self) -> error::Result<arrow::record_batch::RecordBatch> {
+fn finish(mut self) -> error::Result<Option<(RecordBatch, (i64, i64))>> {
+if self.timestamps.is_empty() {
+return Ok(None);
+}
let num_rows = self.timestamps.len();
let value = Float64Array::from(self.value);
let timestamp = TimestampMillisecondArray::from(self.timestamps);
@@ -421,8 +434,9 @@ impl BatchEncoder {
let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
-arrow::array::RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
-.context(error::ArrowSnafu)
+let rb = RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
+.context(error::ArrowSnafu)?;
+Ok(Some((rb, self.timestamp_range.unwrap())))
}
}


@@ -659,6 +659,29 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to operate object store"))]
Opendal {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to operate object store"))]
ObjectStore {
source: object_store::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to operate object store"))]
Parquet {
#[snafu(source)]
error: parquet::errors::ParquetError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -788,6 +811,9 @@ impl ErrorExt for Error {
InvalidTimestampValueType { .. } | InvalidFieldValueType { .. } => {
StatusCode::Unexpected
}
ObjectStore { source, .. } => source.status_code(),
Parquet { .. } => StatusCode::Internal,
Opendal { .. } => StatusCode::Internal,
}
}


@@ -21,6 +21,7 @@
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
mod access_layer;
pub mod addrs;
#[allow(dead_code)]
mod batch_builder;