diff --git a/Cargo.lock b/Cargo.lock
index 1d6ed173c1..0d734d0a8d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6698,7 +6698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
  "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -11231,6 +11231,7 @@ dependencies = [
  "common-base",
  "common-catalog",
  "common-config",
+ "common-datasource",
  "common-error",
  "common-frontend",
  "common-grpc",
@@ -11276,9 +11277,11 @@ dependencies = [
  "metric-engine",
  "mime_guess",
  "mito-codec",
+ "mito2",
  "mysql_async",
  "notify",
  "object-pool",
+ "object-store",
  "once_cell",
  "openmetrics-parser",
  "opensrv-mysql",
@@ -11286,6 +11289,7 @@ dependencies = [
  "operator",
  "otel-arrow-rust",
  "parking_lot 0.12.3",
+ "parquet",
  "partition",
  "permutation",
  "pgwire",
@@ -13861,12 +13865,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
 
 [[package]]
 name = "uuid"
-version = "1.10.0"
+version = "1.17.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
+checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
 dependencies = [
- "getrandom 0.2.15",
- "rand 0.8.5",
+ "getrandom 0.3.2",
+ "js-sys",
+ "rand 0.9.0",
  "serde",
  "wasm-bindgen",
 ]
diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs
index 5d24b1cdf4..72e94c7f36 100644
--- a/src/common/datasource/src/lib.rs
+++ b/src/common/datasource/src/lib.rs
@@ -21,6 +21,7 @@ pub mod error;
 pub mod file_format;
 pub mod lister;
 pub mod object_store;
+pub mod parquet_writer;
 pub mod share_buffer;
 #[cfg(test)]
 pub mod test_util;
diff --git a/src/common/datasource/src/parquet_writer.rs b/src/common/datasource/src/parquet_writer.rs
new file mode 100644
index 0000000000..088d17fb83
--- /dev/null
+++ b/src/common/datasource/src/parquet_writer.rs
@@ -0,0 +1,52 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use object_store::Writer;
+use parquet::arrow::async_writer::AsyncFileWriter;
+use parquet::errors::ParquetError;
+
+/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
+pub struct AsyncWriter {
+    inner: Writer,
+}
+
+impl AsyncWriter {
+    /// Creates an [`AsyncWriter`] from the given [`Writer`].
+    pub fn new(writer: Writer) -> Self {
+        Self { inner: writer }
+    }
+}
+
+impl AsyncFileWriter for AsyncWriter {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .write(bs)
+                .await
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+
+    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
+        Box::pin(async move {
+            self.inner
+                .close()
+                .await
+                .map(|_| ())
+                .map_err(|err| ParquetError::External(Box::new(err)))
+        })
+    }
+}
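A minimal usage sketch of the bridge above, not part of the patch: it assumes the workspace `object-store` crate re-exports opendal's `services` module and `Operator` (as `ObjectStore`), and the in-memory backend, path, and one-column schema are purely illustrative.

```rust
use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use common_datasource::parquet_writer::AsyncWriter;
use object_store::services::Memory;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;

async fn write_one_batch() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical in-memory store; any opendal backend works the same way.
    let store = ObjectStore::new(Memory::default())?.finish();
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    // `store.writer` yields the opendal `Writer` that `AsyncWriter` wraps.
    let writer = store.writer("example.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(AsyncWriter::new(writer), schema, None)?;
    writer.write(&batch).await?;
    // `close` drives `AsyncFileWriter::complete`, which closes the opendal writer.
    writer.close().await?;
    Ok(())
}
```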
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 93cf797dae..266daf696b 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -41,7 +41,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
 /// Default batch size to read parquet files.
 pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
 /// Default row group size for parquet files.
-pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
+pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
 
 /// Parquet write options.
 #[derive(Debug)]
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 5861624d1e..cedc379a14 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -79,6 +79,10 @@ mime_guess = "2.0"
 mito-codec.workspace = true
 notify.workspace = true
 object-pool = "0.5"
+object-store.workspace = true
+common-datasource.workspace = true
+mito2.workspace = true
+parquet.workspace = true
 once_cell.workspace = true
 openmetrics-parser = "0.4"
 operator.workspace = true
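The visibility change above exists so code outside mito2 can build writer properties consistent with mito2's own SSTs. A sketch of that usage, mirroring what the new access layer (below) configures; the helper name is illustrative:

```rust
use mito2::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::WriterProperties;

/// Writer properties matching the access layer's configuration:
/// default-level ZSTD compression, plain encoding, mito2's row group size.
fn sst_writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::default()))
        .set_encoding(Encoding::PLAIN)
        .set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
        .build()
}
```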
diff --git a/src/servers/src/access_layer.rs b/src/servers/src/access_layer.rs
new file mode 100644
index 0000000000..b586634f8e
--- /dev/null
+++ b/src/servers/src/access_layer.rs
@@ -0,0 +1,215 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::array::{
+    Array, PrimitiveArray, RecordBatch, TimestampMicrosecondArray, TimestampMillisecondArray,
+    TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::Int64Type;
+use arrow_schema::TimeUnit;
+use common_datasource::parquet_writer::AsyncWriter;
+use common_time::range::TimestampRange;
+use datafusion::parquet::arrow::AsyncArrowWriter;
+use datatypes::timestamp::TimestampSecond;
+use mito2::sst::file::{FileId, FileMeta};
+use mito2::sst::parquet::{DEFAULT_ROW_GROUP_SIZE, PARQUET_METADATA_KEY};
+use object_store::config::ObjectStoreConfig;
+use object_store::util::join_dir;
+use object_store::ObjectStore;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::metadata::KeyValue;
+use parquet::file::properties::WriterProperties;
+use snafu::{OptionExt, ResultExt};
+use store_api::metadata::RegionMetadataRef;
+use store_api::metric_engine_consts::DATA_REGION_SUBDIR;
+use store_api::storage::RegionId;
+use table::metadata::{TableId, TableInfoRef};
+
+use crate::error;
+
+type AsyncParquetWriter = AsyncArrowWriter<AsyncWriter>;
+
+pub struct AccessLayerFactory {
+    object_store: ObjectStore,
+}
+
+impl AccessLayerFactory {
+    async fn new(config: &ObjectStoreConfig) -> error::Result<Self> {
+        let object_store = object_store::factory::new_raw_object_store(config, "")
+            .await
+            .context(error::ObjectStoreSnafu)?;
+        Ok(Self { object_store })
+    }
+
+    async fn create_sst_writer(
+        &self,
+        catalog: &str,
+        schema: &str,
+        region_metadata: RegionMetadataRef,
+    ) -> error::Result<ParquetWriter> {
+        let region_dir = build_data_region_dir(catalog, schema, region_metadata.region_id);
+        let file_id = FileId::random();
+        let file_path = join_dir(&region_dir, &file_id.as_parquet());
+        let writer = self
+            .object_store
+            .writer(&file_path)
+            .await
+            .context(error::OpendalSnafu)?;
+        let schema = region_metadata.schema.arrow_schema().clone();
+
+        let key_value_meta = KeyValue::new(
+            PARQUET_METADATA_KEY.to_string(),
+            region_metadata.to_json().unwrap(),
+        );
+
+        let props = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value_meta]))
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_encoding(Encoding::PLAIN)
+            .set_max_row_group_size(DEFAULT_ROW_GROUP_SIZE)
+            .build();
+
+        let writer = AsyncParquetWriter::try_new(AsyncWriter::new(writer), schema, Some(props))
+            .context(error::ParquetSnafu)?;
+        Ok(ParquetWriter {
+            region_id: region_metadata.region_id,
+            file_id,
+            region_metadata,
+            writer,
+            timestamp_range: None,
+        })
+    }
+}
+
+pub struct ParquetWriter {
+    region_id: RegionId,
+    file_id: FileId,
+    region_metadata: RegionMetadataRef,
+    writer: AsyncParquetWriter,
+    timestamp_range: Option<(i64, i64)>,
+}
+
+impl ParquetWriter {
+    pub async fn write_record_batch(
+        &mut self,
+        batch: &RecordBatch,
+        timestamp_range: Option<(i64, i64)>,
+    ) -> error::Result<()> {
+        self.writer
+            .write(&batch)
+            .await
+            .context(error::ParquetSnafu)?;
+
+        let (batch_min, batch_max) =
+            get_or_calculate_timestamp_range(timestamp_range, batch, &self.region_metadata)?;
+
+        if let Some((min, max)) = &mut self.timestamp_range {
+            *min = (*min).min(batch_min);
+            *max = (*max).max(batch_max);
+        } else {
+            self.timestamp_range = Some((batch_min, batch_max));
+        };
+        Ok(())
+    }
+
+    pub async fn finish(&mut self) -> error::Result<FileMeta> {
+        let (min, max) = self.timestamp_range.unwrap();
+        let timestamp_type = self
+            .region_metadata
+            .time_index_column()
+            .column_schema
+            .data_type
+            .as_timestamp()
+            .unwrap();
+        let min_ts = timestamp_type.create_timestamp(min);
+        let max_ts = timestamp_type.create_timestamp(max);
+        let file_meta = self.writer.finish().await.context(error::ParquetSnafu)?;
+        let meta = FileMeta {
+            region_id: self.region_id,
+            file_id: self.file_id,
+            time_range: (min_ts, max_ts),
+            level: 0,
+            file_size: self.writer.bytes_written() as u64,
+            available_indexes: Default::default(),
+            index_file_size: 0,
+            num_rows: file_meta.num_rows as u64,
+            num_row_groups: file_meta.row_groups.len() as u64,
+            sequence: None, // todo(hl): use flushed sequence here.
+        };
+        Ok(meta)
+    }
+}
+
+/// Builds the data region subdir for metric physical tables.
+fn build_data_region_dir(catalog: &str, schema: &str, physical_region_id: RegionId) -> String {
+    let storage_path = common_meta::ddl::utils::region_storage_path(&catalog, &schema);
+    join_dir(
+        &store_api::path_utils::region_dir(&storage_path, physical_region_id),
+        DATA_REGION_SUBDIR,
+    )
+}
+
+fn get_or_calculate_timestamp_range(
+    timestamp_range: Option<(i64, i64)>,
+    rb: &RecordBatch,
+    region_metadata: &RegionMetadataRef,
+) -> error::Result<(i64, i64)> {
+    if let Some(range) = timestamp_range {
+        return Ok(range);
+    };
+
+    let ts = rb
+        .column_by_name(&region_metadata.time_index_column().column_schema.name)
+        .expect("column not found");
+    let arrow::datatypes::DataType::Timestamp(unit, _) = ts.data_type() else {
+        unreachable!("expected timestamp types");
+    };
+    let primitives: PrimitiveArray<Int64Type> = match unit {
+        TimeUnit::Second => ts
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .unwrap()
+            .reinterpret_cast(),
+        TimeUnit::Millisecond => ts
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap()
+            .reinterpret_cast(),
+        TimeUnit::Microsecond => ts
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap()
+            .reinterpret_cast(),
+        TimeUnit::Nanosecond => ts
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap()
+            .reinterpret_cast(),
+    };
+
+    let min = arrow::compute::min(&primitives).unwrap();
+    let max = arrow::compute::max(&primitives).unwrap();
+    Ok((min, max))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_build_data_region_dir_basic() {
+        let result = build_data_region_dir("greptime", "public", RegionId::new(1024, 0));
+        assert_eq!(&result, "data/greptime/public/1024/1024_0000000000/data/");
+    }
+}
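The SSTs written above embed the region metadata JSON under `PARQUET_METADATA_KEY`, following mito2's convention. A hedged sketch of reading it back with parquet's synchronous reader; the helper and file handle are illustrative, not part of the patch:

```rust
use mito2::sst::parquet::PARQUET_METADATA_KEY;
use parquet::file::reader::{FileReader, SerializedFileReader};

/// Returns the JSON-encoded region metadata embedded in an SST, if any.
fn region_metadata_json(file: std::fs::File) -> Option<String> {
    let reader = SerializedFileReader::new(file).ok()?;
    reader
        .metadata()
        .file_metadata()
        .key_value_metadata()?
        .iter()
        .find(|kv| kv.key == PARQUET_METADATA_KEY)
        .and_then(|kv| kv.value.clone())
}
```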
diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index 30b5e09a18..fde5dba616 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -28,6 +28,7 @@ use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use itertools::Itertools;
 use metric_engine::row_modifier::{RowModifier, RowsIter};
+use mito2::sst::file::FileTimeRange;
 use mito_codec::row_converter::SparsePrimaryKeyCodec;
 use operator::schema_helper::{
     ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
@@ -252,13 +253,16 @@ impl MetricsBatchBuilder {
                 .or(current_schema.as_deref())
                 .unwrap_or(DEFAULT_SCHEMA_NAME);
             // Look up physical region metadata by schema and table name
-            let schema_metadata = physical_region_metadata.get(schema)
-                .context(error::TableNotFoundSnafu {
-                    catalog,
-                    schema,
-                    table: logical_table_name,
-                })?;
-            let (logical_table_id, physical_table) = schema_metadata.get(logical_table_name)
+            let schema_metadata =
+                physical_region_metadata
+                    .get(schema)
+                    .context(error::TableNotFoundSnafu {
+                        catalog,
+                        schema,
+                        table: logical_table_name,
+                    })?;
+            let (logical_table_id, physical_table) = schema_metadata
+                .get(logical_table_name)
                 .context(error::TableNotFoundSnafu {
                     catalog,
                     schema,
@@ -269,24 +273,22 @@ impl MetricsBatchBuilder {
                 .builders
                 .entry(physical_table.region_id.table_id())
                 .or_insert_with(|| Self::create_sparse_encoder(&physical_table));
-            encoder.append_rows(
-                *logical_table_id,
-                std::mem::take(table),
-            )?;
+            encoder.append_rows(*logical_table_id, std::mem::take(table))?;
             }
         }
         Ok(())
     }
 
     /// Finishes current record batch builder and returns record batches grouped by physical table id.
-    pub(crate) fn finish(self) -> error::Result<HashMap<TableId, RecordBatch>> {
-        self.builders
-            .into_iter()
-            .map(|(physical_table_id, encoder)| {
-                let rb = encoder.finish()?;
-                Ok((physical_table_id, rb))
-            })
-            .collect::<error::Result<HashMap<_, _>>>()
+    pub(crate) fn finish(self) -> error::Result<HashMap<TableId, (RecordBatch, (i64, i64))>> {
+        let mut table_batches = HashMap::with_capacity(self.builders.len());
+        for (physical_table_id, mut encoder) in self.builders {
+            let rb = encoder.finish()?;
+            if let Some(v) = rb {
+                table_batches.insert(physical_table_id, v);
+            }
+        }
+        Ok(table_batches)
     }
 
     /// Creates Encoder that converts Rows into RecordBatch with primary key encoded.
@@ -306,6 +308,7 @@ struct BatchEncoder {
     timestamps: Vec<i64>,
     value: Vec<f64>,
     pk_codec: SparsePrimaryKeyCodec,
+    timestamp_range: Option<(i64, i64)>,
 }
 
 impl BatchEncoder {
@@ -316,6 +319,7 @@
             timestamps: Vec::with_capacity(16),
             value: Vec::with_capacity(16),
             pk_codec: SparsePrimaryKeyCodec::schemaless(),
+            timestamp_range: None,
         }
     }
 
@@ -389,6 +393,12 @@
             return error::InvalidTimestampValueTypeSnafu.fail();
         };
         self.timestamps.push(*ts);
+        if let Some((min, max)) = &mut self.timestamp_range {
+            *min = (*min).min(*ts);
+            *max = (*max).max(*ts);
+        } else {
+            self.timestamp_range = Some((*ts, *ts));
+        }
 
         // safety: field values cannot be null in prom remote write
         let ValueData::F64Value(val) = row.value_at(1).value_data.as_ref().unwrap() else {
@@ -405,7 +415,10 @@
         Ok(())
     }
 
-    fn finish(mut self) -> error::Result<RecordBatch> {
+    fn finish(mut self) -> error::Result<Option<(RecordBatch, (i64, i64))>> {
+        if self.timestamps.is_empty() {
+            return Ok(None);
+        }
         let num_rows = self.timestamps.len();
         let value = Float64Array::from(self.value);
         let timestamp = TimestampMillisecondArray::from(self.timestamps);
@@ -421,8 +434,9 @@
         let value = compute::take(&value, &indices, None).context(error::ArrowSnafu)?;
         let ts = compute::take(&timestamp, &indices, None).context(error::ArrowSnafu)?;
         let pk = compute::take(&pk, &indices, None).context(error::ArrowSnafu)?;
-        arrow::array::RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
-            .context(error::ArrowSnafu)
+        let rb = RecordBatch::try_new(Self::schema(), vec![value, ts, pk, sequence, op_type])
+            .context(error::ArrowSnafu)?;
+        Ok(Some((rb, self.timestamp_range.unwrap())))
     }
 }
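Both `BatchEncoder` above and `ParquetWriter` in the access layer maintain the same running `(min, max)` bound over an `Option<(i64, i64)>`. A standalone sketch of that pattern for clarity; the helper is illustrative, not code from the patch:

```rust
/// Extends a running (min, max) range with one more timestamp value.
fn extend_range(range: &mut Option<(i64, i64)>, ts: i64) {
    match range {
        Some((min, max)) => {
            *min = (*min).min(ts);
            *max = (*max).max(ts);
        }
        // First value seen becomes both the minimum and the maximum.
        None => *range = Some((ts, ts)),
    }
}

#[test]
fn test_extend_range() {
    let mut range = None;
    for ts in [3_i64, 1, 2] {
        extend_range(&mut range, ts);
    }
    assert_eq!(range, Some((1, 3)));
}
```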
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index e5aaa72960..76706df2a8 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -659,6 +659,29 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to operate object store"))]
+    Opendal {
+        #[snafu(source)]
+        error: object_store::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to operate object store"))]
+    ObjectStore {
+        source: object_store::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to operate parquet file"))]
+    Parquet {
+        #[snafu(source)]
+        error: parquet::errors::ParquetError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -788,6 +811,9 @@ impl ErrorExt for Error {
             InvalidTimestampValueType { .. } | InvalidFieldValueType { .. } => {
                 StatusCode::Unexpected
            }
+            ObjectStore { source, .. } => source.status_code(),
+            Parquet { .. } => StatusCode::Internal,
+            Opendal { .. } => StatusCode::Internal,
         }
     }
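The three new variants follow the crate's snafu conventions: a `#[snafu(source)]` field generates a context selector that wraps the underlying error, so call sites read like the sketch below (hypothetical helper; the `OpendalSnafu` selector name is derived from the `Opendal` variant above, as the access layer code uses it).

```rust
use snafu::ResultExt;

use crate::error::{self, Result};

/// Opens an object-store writer, mapping the opendal error into
/// `Error::Opendal` via the generated `OpendalSnafu` selector.
async fn open_writer(
    store: &object_store::ObjectStore,
    path: &str,
) -> Result<object_store::Writer> {
    store.writer(path).await.context(error::OpendalSnafu)
}
```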
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 4b14c9fba5..3ace718418 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -21,6 +21,7 @@
 use datafusion_expr::LogicalPlan;
 use datatypes::schema::Schema;
 
+mod access_layer;
 pub mod addrs;
 #[allow(dead_code)]
 mod batch_builder;