diff --git a/Cargo.lock b/Cargo.lock index 3075cad48a..08711e739b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,8 +317,7 @@ dependencies = [ [[package]] name = "arrow" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df8bb5b0bd64c0b9bc61317fcc480bad0f00e56d3bc32c69a4c8dada4786bae" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-arith 57.0.0", "arrow-array 57.0.0", @@ -352,8 +351,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a640186d3bd30a24cb42264c2dafb30e236a6f50d510e56d40b708c9582491" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -382,8 +380,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219fe420e6800979744c8393b687afb0252b3f8a89b91027d27887b72aa36d31" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "ahash 0.8.12", "arrow-buffer 57.0.0", @@ -412,8 +409,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76885a2697a7edf6b59577f568b456afc94ce0e2edc15b784ce3685b6c3c5c27" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "bytes", "half", @@ -444,8 +440,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "9c9ebb4c987e6b3b236fb4a14b20b34835abfdd80acead3ccf1f9bf399e1f168" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -480,8 +475,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92386159c8d4bce96f8bd396b0642a0d544d471bdc2ef34d631aec80db40a09c" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-cast 57.0.0", @@ -507,8 +501,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727681b95de313b600eddc2a37e736dcb21980a40f640314dcf360e2f36bc89b" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-buffer 57.0.0", "arrow-schema 57.0.0", @@ -520,8 +513,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f70bb56412a007b0cfc116d15f24dda6adeed9611a213852a004cda20085a3b9" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -555,8 +547,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da9ba92e3de170295c98a84e5af22e2b037f0c7b32449445e6c493b5fca27f27" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" 
dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -593,8 +584,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b969b4a421ae83828591c6bf5450bd52e6d489584142845ad6a861f42fe35df8" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -630,8 +620,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141c05298b21d03e88062317a1f1a73f5ba7b6eb041b350015b1cd6aabc0519b" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -673,8 +662,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f3c06a6abad6164508ed283c7a02151515cef3de4b4ff2cebbcaeb85533db2" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -692,8 +680,7 @@ checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" [[package]] name = "arrow-schema" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cfa7a03d1eee2a4d061476e1840ad5c9867a544ca6c4c59256496af5d0a8be5" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "serde", "serde_core", @@ -717,8 +704,7 @@ dependencies = [ [[package]] name = "arrow-select" version = "57.0.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "bafa595babaad59f2455f4957d0f26448fb472722c186739f4fac0823a1bdb47" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "ahash 0.8.12", "arrow-array 57.0.0", @@ -748,8 +734,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32f46457dbbb99f2650ff3ac23e46a929e0ab81db809b02aa5511c258348bef2" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "arrow-array 57.0.0", "arrow-buffer 57.0.0", @@ -4295,6 +4280,7 @@ dependencies = [ "async-trait", "bytes", "cache", + "catalog", "client", "common-base", "common-catalog", @@ -4345,6 +4331,7 @@ dependencies = [ "servers", "session", "snafu 0.8.6", + "sql", "store-api", "table", "tokio", @@ -9394,8 +9381,7 @@ dependencies = [ [[package]] name = "parquet" version = "57.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0f31027ef1af7549f7cec603a9a21dce706d3f8d7c2060a68f43c1773be95a" +source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af" dependencies = [ "ahash 0.8.12", "arrow-array 57.0.0", diff --git a/Cargo.toml b/Cargo.toml index 5dc943d2ba..9dc1cc6bbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -329,6 +329,16 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git" rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" [patch.crates-io] +# on branch "greptime-57" +arrow = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-array = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-buffer = { git 
= "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-cast = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-flight = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-ipc = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-ord = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +arrow-schema = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } +parquet = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" } datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" } datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" } datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" } diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index 3f95c5fe1a..50eb53bc85 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -13,8 +13,11 @@ // limitations under the License. use std::any::Any; +use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use snafu::{FromString, Snafu}; + use crate::status_code::StatusCode; /// Extension to [`Error`](std::error::Error) in std. @@ -116,6 +119,39 @@ impl StackError for Box { } } +/// A simple [Result] whose error is convertible from any [ErrorExt] (which every GreptimeDB +/// error implements). Use this if you are tired of writing `unwrap`s in test code: it lets you +/// use `?` on all GreptimeDB errors.
+pub type WhateverResult = Result; + +#[derive(Snafu)] +#[snafu(display("{inner}"))] +pub struct Whatever { + inner: snafu::Whatever, +} + +impl Debug for Whatever { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl From for Whatever { + fn from(e: E) -> Self { + Self { + inner: FromString::without_source(format!("{e:?}")), + } + } +} + +impl From for Whatever { + fn from(s: String) -> Self { + Self { + inner: FromString::without_source(s), + } + } +} + /// An opaque boxed error based on errors that implement [ErrorExt] trait. pub struct BoxedError { inner: Box, diff --git a/src/common/error/tests/ext.rs b/src/common/error/tests/ext.rs index d22ad96a43..c9e6137729 100644 --- a/src/common/error/tests/ext.rs +++ b/src/common/error/tests/ext.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_error::ext::{ErrorExt, PlainError, StackError}; +use common_error::ext::{ErrorExt, PlainError, StackError, WhateverResult}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, ResultExt, Snafu}; @@ -60,6 +60,34 @@ fn transparent_error() -> Result<(), MyError> { Err(plain_error)? 
} +#[test] +fn test_into_whatever_error() { + fn f(g: fn() -> Result<(), MyError>) -> WhateverResult<()> { + g()?; + Ok(()) + } + + let whatever = f(normal_error).unwrap_err(); + assert_eq!( + normalize_path(&whatever.to_string()), + format!( + r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22 +1: PlainError {{ msg: "", status_code: Unexpected }}"#, + normalize_path(file!()) + ) + ); + + let whatever = f(transparent_error).unwrap_err(); + assert_eq!( + normalize_path(&whatever.to_string()), + format!( + r#"0: , at {}:60:5 +1: PlainError {{ msg: "", status_code: Unexpected }}"#, + normalize_path(file!()) + ) + ); +} + #[test] fn test_output_msg() { let result = normal_error(); diff --git a/src/common/recordbatch/src/ext.rs b/src/common/recordbatch/src/ext.rs new file mode 100644 index 0000000000..9e098dbdce --- /dev/null +++ b/src/common/recordbatch/src/ext.rs @@ -0,0 +1,29 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::arrow::util::pretty; + +pub trait RecordBatchExt { + fn pretty_print(&self) -> String; +} + +impl RecordBatchExt for RecordBatch { + fn pretty_print(&self) -> String { + match pretty::pretty_format_batches(std::slice::from_ref(self)) { + Ok(s) => s.to_string(), + Err(e) => format!("unable to pretty print {self:?}: {e}"), + } + } +} diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 85e0d5c496..6fa65fd92a 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -17,6 +17,7 @@ pub mod adapter; pub mod cursor; pub mod error; +pub mod ext; pub mod filter; pub mod recordbatch; pub mod util; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 5487d05d47..11aa80754c 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -73,6 +73,7 @@ tracing.workspace = true [dev-dependencies] cache.workspace = true +catalog = { workspace = true, features = ["testing"] } client.workspace = true common-meta = { workspace = true, features = ["testing"] } common-query.workspace = true @@ -81,3 +82,4 @@ datafusion-common.workspace = true mito2 = { workspace = true, features = ["test"] } partition.workspace = true session.workspace = true +sql.workspace = true diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 55d2b1796d..b0163ace4a 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -22,7 +22,7 @@ pub mod event_listener; mod greptimedb_telemetry; pub mod heartbeat; pub mod metrics; -mod partition_expr_fetcher; +pub mod partition_expr_fetcher; pub mod region_server; pub mod service; pub mod store; diff --git a/src/datanode/tests/region_read.rs b/src/datanode/tests/region_read.rs new file mode 100644 index 0000000000..9de01b705e --- /dev/null +++ b/src/datanode/tests/region_read.rs @@ -0,0 +1,199 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use api::v1::SemanticType; +use catalog::memory::MemoryCatalogManager; +use common_error::ext::WhateverResult; +use common_meta::key::SchemaMetadataManager; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_query::Output; +use common_query::request::QueryRequest; +use datanode::config::StorageConfig; +use datanode::event_listener::NoopRegionServerEventListener; +use datanode::partition_expr_fetcher::MetaPartitionExprFetcher; +use datanode::region_server::RegionServer; +use datanode::store; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema}; +use datatypes::types::{StructField, StructType}; +use log_store::noop::log_store::NoopLogStore; +use mito2::engine::MitoEngineBuilder; +use mito2::sst::file_ref::FileReferenceManager; +use object_store::manager::ObjectStoreManager; +use query::QueryEngineFactory; +use query::dummy_catalog::{DummyCatalogManager, DummyTableProviderFactory}; +use query::parser::QueryStatement; +use session::context::QueryContext; +use sql::dialect::GreptimeDbDialect; +use sql::parser::ParserContext; +use store_api::metadata::ColumnMetadata; +use store_api::region_request::{PathType, RegionCreateRequest, RegionRequest}; +use store_api::storage::RegionId; +use table::metadata::{TableInfoBuilder, TableMetaBuilder}; +use table::test_util::EmptyTable; + +#[tokio::test] +async fn test_column_field_access_pushdown() -> 
WhateverResult<()> { + common_telemetry::init_default_ut_logging(); + + let column_schemas = + vec![ + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new( + "a", + ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![ + StructField::new("x", ConcreteDataType::string_datatype(), true), + ]))), + true, + ), + ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true), + ]; + + let plan = QueryEngineFactory::new( + MemoryCatalogManager::new_with_table(EmptyTable::from_table_info( + &TableInfoBuilder::new( + "foo", + TableMetaBuilder::empty() + .schema(Arc::new(Schema::new(column_schemas.clone()))) + .primary_key_indices(vec![]) + .next_column_id(3) + .build() + .unwrap(), + ) + .build() + .unwrap(), + )), + None, + None, + None, + None, + true, + Default::default(), + ) + .query_engine() + .planner() + .plan( + &QueryStatement::Sql( + ParserContext::create_with_dialect( + "select a['x'] from foo where b > 0 order by ts limit 1", + &GreptimeDbDialect {}, + Default::default(), + )? 
+ .remove(0), + ), + QueryContext::arc(), + ) + .await?; + + let data_home = "/tmp"; + let config = StorageConfig { + data_home: data_home.to_string(), + ..Default::default() + }; + let kv_backend = Arc::new(MemoryKvBackend::new()); + let registry = cache::build_datanode_cache_registry(kv_backend.clone()); + let mito_engine = Arc::new( + MitoEngineBuilder::new( + data_home, + Default::default(), + Arc::new(NoopLogStore), + Arc::new(ObjectStoreManager::new( + "test", + store::new_object_store(config.store.clone(), data_home).await?, + )), + Arc::new(SchemaMetadataManager::new( + registry.get().unwrap(), + registry.get().unwrap(), + )), + Arc::new(FileReferenceManager::new(None)), + Arc::new(MetaPartitionExprFetcher::new(kv_backend)), + Default::default(), + 0, + ) + .try_build() + .await?, + ); + + let mut server = RegionServer::with_table_provider( + QueryEngineFactory::new( + DummyCatalogManager::arc(), + None, + None, + None, + None, + false, + Default::default(), + ) + .query_engine(), + common_runtime::global_runtime(), + Box::new(NoopRegionServerEventListener), + Arc::new(DummyTableProviderFactory), + 0, + Duration::from_secs(0), + Default::default(), + ); + server.register_engine(mito_engine); + + let region_id = RegionId::new(1024, 0); + server + .handle_request( + region_id, + RegionRequest::Create(RegionCreateRequest { + engine: "mito".to_string(), + column_metadatas: vec![ + ColumnMetadata { + column_schema: column_schemas[0].clone(), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }, + ColumnMetadata { + column_schema: column_schemas[1].clone(), + semantic_type: SemanticType::Field, + column_id: 1, + }, + ColumnMetadata { + column_schema: column_schemas[2].clone(), + semantic_type: SemanticType::Field, + column_id: 2, + }, + ], + primary_key: vec![], + options: HashMap::new(), + table_dir: data_home.to_string(), + path_type: PathType::Bare, + partition_expr_json: None, + }), + ) + .await?; + + let request = QueryRequest { + header: None, + 
region_id, + plan, + }; + let result = server.handle_read(request).await?; + + let output = Output::new_with_stream(result).data.pretty_print().await; + common_telemetry::debug!("{}", output); + Ok(()) +} diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs index 1e9e5dec4d..4e947c1039 100644 --- a/src/mito2/src/memtable/bulk/row_group_reader.rs +++ b/src/mito2/src/memtable/bulk/row_group_reader.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use datatypes::arrow::array::RecordBatch; use datatypes::arrow::error::ArrowError; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection}; +use parquet::arrow::async_reader::ColumnChunkIterator; use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; use parquet::column::page::{PageIterator, PageReader}; use parquet::file::metadata::ParquetMetaData; @@ -30,7 +31,7 @@ use crate::memtable::bulk::context::BulkIterContextRef; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::RowGroupReaderContext; -use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase}; +use crate::sst::parquet::row_group::RowGroupBase; /// Helper for reading specific row group inside Memtable Parquet parts. 
// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since @@ -99,9 +100,9 @@ impl RowGroups for MemtableRowGroupPageFetcher<'_> { } fn column_chunks(&self, i: usize) -> parquet::errors::Result> { - Ok(Box::new(ColumnChunkIterator { - reader: Some(self.column_page_reader(i)), - })) + Ok(Box::new(ColumnChunkIterator::new(Some( + self.column_page_reader(i), + )))) } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 989aeb812b..6346fdfc1a 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -93,30 +93,35 @@ pub struct SstInfo { #[cfg(test)] mod tests { use std::collections::HashSet; + use std::ops::Range; use std::sync::Arc; use api::v1::{OpType, SemanticType}; + use common_error::ext::WhateverResult; use common_function::function::FunctionRef; use common_function::function_factory::ScalarFunctionFactory; use common_function::scalars::matches::MatchesFunction; use common_function::scalars::matches_term::MatchesTermFunction; + use common_recordbatch::ext::RecordBatchExt; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit}; use datatypes::arrow; use datatypes::arrow::array::{ - ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder, - TimestampMillisecondArray, UInt8Array, UInt64Array, + ArrayRef, BinaryDictionaryBuilder, Int32Array, RecordBatch, StringArray, + StringDictionaryBuilder, StructArray, TimestampMillisecondArray, UInt8Array, UInt64Array, }; - use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type}; + use datatypes::arrow::datatypes::{DataType, Field, Fields, Schema, UInt32Type}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions}; + use datatypes::types::{StructField, StructType}; use object_store::ObjectStore; use 
parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::{KeyValue, PageIndexPolicy}; use parquet::file::properties::WriterProperties; + use smallvec::SmallVec; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_request::PathType; @@ -129,6 +134,7 @@ mod tests { use crate::cache::test_util::assert_parquet_metadata_equal; use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::config::IndexConfig; + use crate::read::range::FileRangeBuilder; use crate::read::{BatchBuilder, BatchReader, FlatSource}; use crate::region::options::{IndexOptions, InvertedIndexOptions}; use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId}; @@ -2113,4 +2119,228 @@ mod tests { assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2); assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100); } + + #[tokio::test] + async fn test_parquet_write_and_read_with_nested_data_type() -> WhateverResult<()> { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + + let mut builder = RegionMetadataBuilder::new(REGION_ID); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + // Create a column "a" with nested (struct) data type: + // + // struct< + // "a_a": string, + // "a_": struct< + // "a_b": int32, + // > + // > + // + // There are two leaf columns in the parquet: "a_a" and "a_b", which will be written and read in this test. 
+ .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "a", + ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![ + StructField::new("a_a", ConcreteDataType::string_datatype(), true), + StructField::new( + "a_", + ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![ + StructField::new("a_b", ConcreteDataType::int32_datatype(), true), + ]))), + true, + ), + ]))), + true, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![0]); + let metadata = Arc::new(builder.build()?); + let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + + let create_record_batch = |tag: &str, range: Range| -> WhateverResult { + let num_rows = range.len(); + let mut columns = Vec::new(); + + // Add primary key column (tag_0) as dictionary array + let mut tag_builder = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + tag_builder.append_value(tag); + } + columns.push(Arc::new(tag_builder.finish()) as ArrayRef); + + columns.push(Arc::new(StructArray::new( + Fields::from(vec![ + Field::new("a_a", DataType::Utf8, true), + Field::new_struct( + "a_", + Fields::from(vec![Field::new("a_b", DataType::Int32, true)]), + true, + ), + ]), + vec![ + Arc::new(StringArray::from_iter_values( + range.clone().map(|i| format!("s_{i}")), + )) as ArrayRef, + Arc::new(StructArray::new( + Fields::from(vec![Field::new("a_b", DataType::Int32, true)]), + vec![Arc::new(Int32Array::from_iter_values(range.clone())) as ArrayRef], + None, + )), + ], + None, + ))); + + // Add time index column (ts) + let timestamps: Vec = range.map(|v| v as i64).collect(); + columns.push(Arc::new(TimestampMillisecondArray::from(timestamps))); + + // Add encoded primary key column + let pk = 
new_primary_key(&[tag]); + let mut pk_builder = BinaryDictionaryBuilder::::new(); + for _ in 0..num_rows { + pk_builder.append(&pk).unwrap(); + } + columns.push(Arc::new(pk_builder.finish())); + + // Add sequence column + columns.push(Arc::new(UInt64Array::from_value(1000, num_rows))); + + // Add op_type column + columns.push(Arc::new(UInt8Array::from_value( + OpType::Put as u8, + num_rows, + ))); + + Ok(RecordBatch::try_new(flat_schema.clone(), columns).map_err(|e| e.to_string())?) + }; + let flat_source = new_flat_source_from_record_batches(vec![ + create_record_batch("x", 0..10)?, + create_record_batch("y", 10..20)?, + create_record_batch("z", 20..30)?, + ]); + + let info = ParquetWriter::new_with_object_store( + object_store.clone(), + metadata.clone(), + IndexConfig::default(), + NoopIndexBuilder, + RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare), + &mut Metrics::new(WriteType::Flush), + ) + .await + .write_all_flat( + flat_source, + &WriteOptions { + row_group_size: 16, + ..Default::default() + }, + ) + .await? + .remove(0); + + assert_eq!(30, info.num_rows); + assert!(info.file_size > 0); + assert_eq!( + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(29) + ), + info.time_range + ); + + let (context, selection) = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + create_file_handle_from_sst_info(&info, &metadata), + object_store, + ) + .flat_format(true) + .build_reader_input(&mut ReaderMetrics::default()) + .await?; + + assert_eq!(selection.row_group_count(), 2); + assert_eq!(selection.row_count(), 30); + + let mut record_batches = vec![]; + let mut ranges = SmallVec::<[_; 2]>::new(); + FileRangeBuilder::new(Arc::new(context), selection).build_ranges(-1, &mut ranges); + for range in ranges { + let Some(mut reader) = range.flat_reader(None).await? else { + continue; + }; + while let Some(record_batch) = reader.next_batch()? 
{ + record_batches.push(record_batch); + } + } + + let expects = vec![ + r#" ++-------+----------------------------+-------------------------+------------------------+------------+-----------+ +| tag_0 | a | ts | __primary_key | __sequence | __op_type | ++-------+----------------------------+-------------------------+------------------------+------------+-----------+ +| x | {a_a: s_0, a_: {a_b: 0}} | 1970-01-01T00:00:00 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_1, a_: {a_b: 1}} | 1970-01-01T00:00:00.001 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_2, a_: {a_b: 2}} | 1970-01-01T00:00:00.002 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_3, a_: {a_b: 3}} | 1970-01-01T00:00:00.003 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_4, a_: {a_b: 4}} | 1970-01-01T00:00:00.004 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_5, a_: {a_b: 5}} | 1970-01-01T00:00:00.005 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_6, a_: {a_b: 6}} | 1970-01-01T00:00:00.006 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_7, a_: {a_b: 7}} | 1970-01-01T00:00:00.007 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_8, a_: {a_b: 8}} | 1970-01-01T00:00:00.008 | 0101780000000000000001 | 1000 | 1 | +| x | {a_a: s_9, a_: {a_b: 9}} | 1970-01-01T00:00:00.009 | 0101780000000000000001 | 1000 | 1 | +| y | {a_a: s_10, a_: {a_b: 10}} | 1970-01-01T00:00:00.010 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_11, a_: {a_b: 11}} | 1970-01-01T00:00:00.011 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_12, a_: {a_b: 12}} | 1970-01-01T00:00:00.012 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_13, a_: {a_b: 13}} | 1970-01-01T00:00:00.013 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_14, a_: {a_b: 14}} | 1970-01-01T00:00:00.014 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_15, a_: {a_b: 15}} | 1970-01-01T00:00:00.015 | 0101790000000000000001 | 1000 | 1 | 
++-------+----------------------------+-------------------------+------------------------+------------+-----------+"#, + r#" ++-------+----------------------------+-------------------------+------------------------+------------+-----------+ +| tag_0 | a | ts | __primary_key | __sequence | __op_type | ++-------+----------------------------+-------------------------+------------------------+------------+-----------+ +| y | {a_a: s_16, a_: {a_b: 16}} | 1970-01-01T00:00:00.016 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_17, a_: {a_b: 17}} | 1970-01-01T00:00:00.017 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_18, a_: {a_b: 18}} | 1970-01-01T00:00:00.018 | 0101790000000000000001 | 1000 | 1 | +| y | {a_a: s_19, a_: {a_b: 19}} | 1970-01-01T00:00:00.019 | 0101790000000000000001 | 1000 | 1 | +| z | {a_a: s_20, a_: {a_b: 20}} | 1970-01-01T00:00:00.020 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_21, a_: {a_b: 21}} | 1970-01-01T00:00:00.021 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_22, a_: {a_b: 22}} | 1970-01-01T00:00:00.022 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_23, a_: {a_b: 23}} | 1970-01-01T00:00:00.023 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_24, a_: {a_b: 24}} | 1970-01-01T00:00:00.024 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_25, a_: {a_b: 25}} | 1970-01-01T00:00:00.025 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_26, a_: {a_b: 26}} | 1970-01-01T00:00:00.026 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_27, a_: {a_b: 27}} | 1970-01-01T00:00:00.027 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_28, a_: {a_b: 28}} | 1970-01-01T00:00:00.028 | 01017a0000000000000001 | 1000 | 1 | +| z | {a_a: s_29, a_: {a_b: 29}} | 1970-01-01T00:00:00.029 | 01017a0000000000000001 | 1000 | 1 | ++-------+----------------------------+-------------------------+------------------------+------------+-----------+"#, + ]; + assert_eq!(record_batches.len(), expects.len()); + for (record_batch, expect) in 
record_batches.iter().zip(expects) { + assert_eq!(record_batch.pretty_print(), expect.trim()); + } + Ok(()) + } } diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d6b061e468..94f1217ec5 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -322,6 +322,18 @@ impl FlatReadFormat { return Ok(false); } + if metadata + .schema + .arrow_schema() + .fields() + .iter() + .any(|x| x.data_type().is_nested()) + { + // Definitely not legacy if any data type is nested, because nested data types such as + // "struct" were only introduced in newer versions of GreptimeDB. + return Ok(false); + } + // For flat format, compute expected column number: // all columns + internal columns (pk, sequence, op_type) let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM; diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index b8baf7960f..bd57f3bfb8 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -17,15 +17,15 @@ use std::ops::Range; use std::sync::Arc; -use bytes::{Buf, Bytes}; +use bytes::Bytes; use object_store::ObjectStore; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::{RowGroups, RowSelection}; -use parquet::column::page::{PageIterator, PageReader}; +use parquet::arrow::async_reader::{ColumnChunkData, ColumnChunkIterator}; +use parquet::column::page::PageIterator; use parquet::errors::{ParquetError, Result}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::page_index::offset_index::OffsetIndexMetaData; -use parquet::file::reader::{ChunkReader, Length}; use parquet::file::serialized_reader::SerializedPageReader; use store_api::storage::{FileId, RegionId}; use tokio::task::yield_now; @@ -595,78 +595,8 @@ impl RowGroups for InMemoryRowGroup<'_> { // Creates a page reader to read column at `i`.
let page_reader = self.base.column_reader(i)?; - Ok(Box::new(ColumnChunkIterator { - reader: Some(Ok(Box::new(page_reader))), - })) + Ok(Box::new(ColumnChunkIterator::new(Some(Ok(Box::new( + page_reader, + )))))) } } - -/// An in-memory column chunk -#[derive(Clone)] -pub(crate) enum ColumnChunkData { - /// Column chunk data representing only a subset of data pages - Sparse { - /// Length of the full column chunk - length: usize, - /// Set of data pages included in this sparse chunk. Each element is a tuple - /// of (page offset, page data) - data: Vec<(usize, Bytes)>, - }, - /// Full column chunk and its offset - Dense { offset: usize, data: Bytes }, -} - -impl ColumnChunkData { - fn get(&self, start: u64) -> Result { - match &self { - ColumnChunkData::Sparse { data, .. } => data - .binary_search_by_key(&start, |(offset, _)| *offset as u64) - .map(|idx| data[idx].1.clone()) - .map_err(|_| { - ParquetError::General(format!( - "Invalid offset in sparse column chunk data: {start}" - )) - }), - ColumnChunkData::Dense { offset, data } => { - let start = start as usize - *offset; - Ok(data.slice(start..)) - } - } - } -} - -impl Length for ColumnChunkData { - fn len(&self) -> u64 { - match &self { - ColumnChunkData::Sparse { length, .. } => *length as u64, - ColumnChunkData::Dense { data, .. 
} => data.len() as u64, - } - } -} - -impl ChunkReader for ColumnChunkData { - type T = bytes::buf::Reader; - - fn get_read(&self, start: u64) -> Result { - Ok(self.get(start)?.reader()) - } - - fn get_bytes(&self, start: u64, length: usize) -> Result { - Ok(self.get(start)?.slice(..length)) - } -} - -/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] -pub(crate) struct ColumnChunkIterator { - pub(crate) reader: Option>>, -} - -impl Iterator for ColumnChunkIterator { - type Item = Result>; - - fn next(&mut self) -> Option { - self.reader.take() - } -} - -impl PageIterator for ColumnChunkIterator {} diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index b8496c27de..896fc9ab19 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -33,6 +33,7 @@ use datafusion::physical_plan::filter_pushdown::{ ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, }; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream as DfRecordBatchStream, @@ -438,6 +439,13 @@ impl ExecutionPlan for RegionScanExec { "RegionScanExec" } + fn try_swapping_with_projection( + &self, + _projection: &ProjectionExec, + ) -> DfResult>> { + Ok(None) // Decline the swap instead of panicking. + } + fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase,