Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-01-27 16:57:42 +08:00
parent 235eb39e5b
commit 64e99739d6
14 changed files with 589 additions and 117 deletions

50
Cargo.lock generated
View File

@@ -317,8 +317,7 @@ dependencies = [
[[package]]
name = "arrow"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4df8bb5b0bd64c0b9bc61317fcc480bad0f00e56d3bc32c69a4c8dada4786bae"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-arith 57.0.0",
"arrow-array 57.0.0",
@@ -352,8 +351,7 @@ dependencies = [
[[package]]
name = "arrow-arith"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a640186d3bd30a24cb42264c2dafb30e236a6f50d510e56d40b708c9582491"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -382,8 +380,7 @@ dependencies = [
[[package]]
name = "arrow-array"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "219fe420e6800979744c8393b687afb0252b3f8a89b91027d27887b72aa36d31"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"ahash 0.8.12",
"arrow-buffer 57.0.0",
@@ -412,8 +409,7 @@ dependencies = [
[[package]]
name = "arrow-buffer"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76885a2697a7edf6b59577f568b456afc94ce0e2edc15b784ce3685b6c3c5c27"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"bytes",
"half",
@@ -444,8 +440,7 @@ dependencies = [
[[package]]
name = "arrow-cast"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9ebb4c987e6b3b236fb4a14b20b34835abfdd80acead3ccf1f9bf399e1f168"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -480,8 +475,7 @@ dependencies = [
[[package]]
name = "arrow-csv"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92386159c8d4bce96f8bd396b0642a0d544d471bdc2ef34d631aec80db40a09c"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-cast 57.0.0",
@@ -507,8 +501,7 @@ dependencies = [
[[package]]
name = "arrow-data"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "727681b95de313b600eddc2a37e736dcb21980a40f640314dcf360e2f36bc89b"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-buffer 57.0.0",
"arrow-schema 57.0.0",
@@ -520,8 +513,7 @@ dependencies = [
[[package]]
name = "arrow-flight"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f70bb56412a007b0cfc116d15f24dda6adeed9611a213852a004cda20085a3b9"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -555,8 +547,7 @@ dependencies = [
[[package]]
name = "arrow-ipc"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da9ba92e3de170295c98a84e5af22e2b037f0c7b32449445e6c493b5fca27f27"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -593,8 +584,7 @@ dependencies = [
[[package]]
name = "arrow-json"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b969b4a421ae83828591c6bf5450bd52e6d489584142845ad6a861f42fe35df8"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -630,8 +620,7 @@ dependencies = [
[[package]]
name = "arrow-ord"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "141c05298b21d03e88062317a1f1a73f5ba7b6eb041b350015b1cd6aabc0519b"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -673,8 +662,7 @@ dependencies = [
[[package]]
name = "arrow-row"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f3c06a6abad6164508ed283c7a02151515cef3de4b4ff2cebbcaeb85533db2"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -692,8 +680,7 @@ checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe"
[[package]]
name = "arrow-schema"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cfa7a03d1eee2a4d061476e1840ad5c9867a544ca6c4c59256496af5d0a8be5"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"serde",
"serde_core",
@@ -717,8 +704,7 @@ dependencies = [
[[package]]
name = "arrow-select"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bafa595babaad59f2455f4957d0f26448fb472722c186739f4fac0823a1bdb47"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"ahash 0.8.12",
"arrow-array 57.0.0",
@@ -748,8 +734,7 @@ dependencies = [
[[package]]
name = "arrow-string"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32f46457dbbb99f2650ff3ac23e46a929e0ab81db809b02aa5511c258348bef2"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"arrow-array 57.0.0",
"arrow-buffer 57.0.0",
@@ -4295,6 +4280,7 @@ dependencies = [
"async-trait",
"bytes",
"cache",
"catalog",
"client",
"common-base",
"common-catalog",
@@ -4345,6 +4331,7 @@ dependencies = [
"servers",
"session",
"snafu 0.8.6",
"sql",
"store-api",
"table",
"tokio",
@@ -9394,8 +9381,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "57.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a0f31027ef1af7549f7cec603a9a21dce706d3f8d7c2060a68f43c1773be95a"
source = "git+https://github.com/GreptimeTeam/arrow-rs.git?rev=ea8d41eaa336cd85b1116a32918eec652017f8af#ea8d41eaa336cd85b1116a32918eec652017f8af"
dependencies = [
"ahash 0.8.12",
"arrow-array 57.0.0",

View File

@@ -329,6 +329,16 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
# on branch "greptime-57"
arrow = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-array = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-buffer = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-cast = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-flight = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-ipc = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-ord = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
arrow-schema = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
parquet = { git = "https://github.com/GreptimeTeam/arrow-rs.git", rev = "ea8d41eaa336cd85b1116a32918eec652017f8af" }
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }

View File

@@ -13,8 +13,11 @@
// limitations under the License.
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use snafu::{FromString, Snafu};
use crate::status_code::StatusCode;
/// Extension to [`Error`](std::error::Error) in std.
@@ -116,6 +119,39 @@ impl<T: StackError> StackError for Box<T> {
}
}
/// A simple [Result] whose error is convertible from any [ErrorExt] (which every GreptimeDB
/// error implements). Use this if you are tired of writing `unwrap`s in test code: it lets you
/// use `?` on all GreptimeDB errors.
pub type WhateverResult<T> = Result<T, Whatever>;
// Catch-all error type behind `WhateverResult`. It wraps a `snafu::Whatever`, and its `Display`
// output is exactly the wrapped error's `Display` output (see the `display` attribute below).
#[derive(Snafu)]
#[snafu(display("{inner}"))]
pub struct Whatever {
// The wrapped snafu error that carries the message.
inner: snafu::Whatever,
}
impl Debug for Whatever {
    /// Renders the `Display` form of the wrapped error, so `unwrap` failures and failed tests
    /// show the readable message rather than a struct dump.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}", self.inner))
    }
}
impl<E: ErrorExt> From<E> for Whatever {
fn from(e: E) -> Self {
Self {
inner: FromString::without_source(format!("{e:?}")),
}
}
}
impl From<String> for Whatever {
fn from(s: String) -> Self {
Self {
inner: FromString::without_source(s),
}
}
}
/// An opaque boxed error based on errors that implement [ErrorExt] trait.
pub struct BoxedError {
inner: Box<dyn crate::ext::ErrorExt + Send + Sync>,

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_error::ext::{ErrorExt, PlainError, StackError};
use common_error::ext::{ErrorExt, PlainError, StackError, WhateverResult};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, ResultExt, Snafu};
@@ -60,6 +60,34 @@ fn transparent_error() -> Result<(), MyError> {
Err(plain_error)?
}
/// Checks that `?` converts a `MyError` into a `Whatever` whose `Display` output is the
/// `Debug` rendering of the original error.
#[test]
fn test_into_whatever_error() {
    // Runs `fallible` and lets `?` perform the `MyError` -> `Whatever` conversion.
    fn propagate(fallible: fn() -> Result<(), MyError>) -> WhateverResult<()> {
        Ok(fallible()?)
    }

    let from_normal = propagate(normal_error).unwrap_err();
    let rendered = normalize_path(&from_normal.to_string());
    let expected = format!(
        r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
        normalize_path(file!())
    );
    assert_eq!(rendered, expected);

    let from_transparent = propagate(transparent_error).unwrap_err();
    let rendered = normalize_path(&from_transparent.to_string());
    let expected = format!(
        r#"0: <transparent>, at {}:60:5
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
        normalize_path(file!())
    );
    assert_eq!(rendered, expected);
}
#[test]
fn test_output_msg() {
let result = normal_error();

View File

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::util::pretty;
/// Extension methods for record batches.
pub trait RecordBatchExt {
/// Renders `self` as a human-readable string.
fn pretty_print(&self) -> String;
}
impl RecordBatchExt for RecordBatch {
    /// Formats this single batch with Arrow's pretty printer.
    ///
    /// Never fails: if Arrow cannot format the batch, the returned string describes the failure
    /// and embeds the batch's `Debug` representation instead.
    fn pretty_print(&self) -> String {
        pretty::pretty_format_batches(std::slice::from_ref(self))
            .map(|table| table.to_string())
            .unwrap_or_else(|e| format!("unable to pretty print {self:?}: {e}"))
    }
}

View File

@@ -17,6 +17,7 @@
pub mod adapter;
pub mod cursor;
pub mod error;
pub mod ext;
pub mod filter;
pub mod recordbatch;
pub mod util;

View File

@@ -73,6 +73,7 @@ tracing.workspace = true
[dev-dependencies]
cache.workspace = true
catalog = { workspace = true, features = ["testing"] }
client.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-query.workspace = true
@@ -81,3 +82,4 @@ datafusion-common.workspace = true
mito2 = { workspace = true, features = ["test"] }
partition.workspace = true
session.workspace = true
sql.workspace = true

View File

@@ -22,7 +22,7 @@ pub mod event_listener;
mod greptimedb_telemetry;
pub mod heartbeat;
pub mod metrics;
mod partition_expr_fetcher;
pub mod partition_expr_fetcher;
pub mod region_server;
pub mod service;
pub mod store;

View File

@@ -0,0 +1,199 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::SemanticType;
use catalog::memory::MemoryCatalogManager;
use common_error::ext::WhateverResult;
use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_query::Output;
use common_query::request::QueryRequest;
use datanode::config::StorageConfig;
use datanode::event_listener::NoopRegionServerEventListener;
use datanode::partition_expr_fetcher::MetaPartitionExprFetcher;
use datanode::region_server::RegionServer;
use datanode::store;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::types::{StructField, StructType};
use log_store::noop::log_store::NoopLogStore;
use mito2::engine::MitoEngineBuilder;
use mito2::sst::file_ref::FileReferenceManager;
use object_store::manager::ObjectStoreManager;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, DummyTableProviderFactory};
use query::parser::QueryStatement;
use session::context::QueryContext;
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{PathType, RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::test_util::EmptyTable;
/// Smoke test: plans a query that reads a struct field (`a['x']`) together with a filter, an
/// ordering and a limit, then runs that plan against a freshly created mito region through
/// `RegionServer::handle_read`.
///
/// The query output is only logged at debug level; nothing is asserted about it, so the test
/// passes as long as no step returns an error.
// NOTE(review): consider asserting on the plan and/or the output so regressions are caught.
#[tokio::test]
async fn test_column_field_access_pushdown() -> WhateverResult<()> {
common_telemetry::init_default_ut_logging();
// Table schema: time index `ts`, a nullable struct column `a` with one string field `x`,
// and a nullable int32 column `b`.
let column_schemas =
vec![
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(
"a",
ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![
StructField::new("x", ConcreteDataType::string_datatype(), true),
]))),
true,
),
ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true),
];
// Plan the SQL against an in-memory catalog that contains only an empty table "foo" with the
// schema above.
let plan = QueryEngineFactory::new(
MemoryCatalogManager::new_with_table(EmptyTable::from_table_info(
&TableInfoBuilder::new(
"foo",
TableMetaBuilder::empty()
.schema(Arc::new(Schema::new(column_schemas.clone())))
.primary_key_indices(vec![])
.next_column_id(3)
.build()
.unwrap(),
)
.build()
.unwrap(),
)),
None,
None,
None,
None,
true,
Default::default(),
)
.query_engine()
.planner()
.plan(
&QueryStatement::Sql(
ParserContext::create_with_dialect(
"select a['x'] from foo where b > 0 order by ts limit 1",
&GreptimeDbDialect {},
Default::default(),
)?
// The SQL text holds a single statement; take it out of the parsed list.
.remove(0),
),
QueryContext::arc(),
)
.await?;
// NOTE(review): the data home is the shared, hard-coded "/tmp" directory and nothing in this
// test removes what gets written there — presumably a per-test temporary directory would be
// safer for parallel or repeated runs; confirm.
let data_home = "/tmp";
let config = StorageConfig {
data_home: data_home.to_string(),
..Default::default()
};
let kv_backend = Arc::new(MemoryKvBackend::new());
let registry = cache::build_datanode_cache_registry(kv_backend.clone());
// Build a mito engine backed by a no-op log store and an object store created from the
// default store config.
let mito_engine = Arc::new(
MitoEngineBuilder::new(
data_home,
Default::default(),
Arc::new(NoopLogStore),
Arc::new(ObjectStoreManager::new(
"test",
store::new_object_store(config.store.clone(), data_home).await?,
)),
Arc::new(SchemaMetadataManager::new(
registry.get().unwrap(),
registry.get().unwrap(),
)),
Arc::new(FileReferenceManager::new(None)),
Arc::new(MetaPartitionExprFetcher::new(kv_backend)),
Default::default(),
0,
)
.try_build()
.await?,
);
// The region server gets its own query engine (over a dummy catalog) and the mito engine
// registered as its only engine.
let mut server = RegionServer::with_table_provider(
QueryEngineFactory::new(
DummyCatalogManager::arc(),
None,
None,
None,
None,
false,
Default::default(),
)
.query_engine(),
common_runtime::global_runtime(),
Box::new(NoopRegionServerEventListener),
Arc::new(DummyTableProviderFactory),
0,
Duration::from_secs(0),
Default::default(),
);
server.register_engine(mito_engine);
let region_id = RegionId::new(1024, 0);
// Create the region with the same three columns, using column ids 0..=2 in schema order.
server
.handle_request(
region_id,
RegionRequest::Create(RegionCreateRequest {
engine: "mito".to_string(),
column_metadatas: vec![
ColumnMetadata {
column_schema: column_schemas[0].clone(),
semantic_type: SemanticType::Timestamp,
column_id: 0,
},
ColumnMetadata {
column_schema: column_schemas[1].clone(),
semantic_type: SemanticType::Field,
column_id: 1,
},
ColumnMetadata {
column_schema: column_schemas[2].clone(),
semantic_type: SemanticType::Field,
column_id: 2,
},
],
primary_key: vec![],
options: HashMap::new(),
table_dir: data_home.to_string(),
path_type: PathType::Bare,
partition_expr_json: None,
}),
)
.await?;
// Send the previously built plan to the region as a read request.
let request = QueryRequest {
header: None,
region_id,
plan,
};
let result = server.handle_read(request).await?;
let output = Output::new_with_stream(result).data.pretty_print().await;
common_telemetry::debug!("{}", output);
Ok(())
}

View File

@@ -19,6 +19,7 @@ use bytes::Bytes;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
use parquet::arrow::async_reader::ColumnChunkIterator;
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::column::page::{PageIterator, PageReader};
use parquet::file::metadata::ParquetMetaData;
@@ -30,7 +31,7 @@ use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::RowGroupReaderContext;
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
use crate::sst::parquet::row_group::RowGroupBase;
/// Helper for reading specific row group inside Memtable Parquet parts.
// This is similar to [mito2::sst::parquet::row_group::InMemoryRowGroup] since
@@ -99,9 +100,9 @@ impl RowGroups for MemtableRowGroupPageFetcher<'_> {
}
fn column_chunks(&self, i: usize) -> parquet::errors::Result<Box<dyn PageIterator>> {
Ok(Box::new(ColumnChunkIterator {
reader: Some(self.column_page_reader(i)),
}))
Ok(Box::new(ColumnChunkIterator::new(Some(
self.column_page_reader(i),
))))
}
}

View File

@@ -93,30 +93,35 @@ pub struct SstInfo {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::ops::Range;
use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use common_error::ext::WhateverResult;
use common_function::function::FunctionRef;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::scalars::matches::MatchesFunction;
use common_function::scalars::matches_term::MatchesTermFunction;
use common_recordbatch::ext::RecordBatchExt;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
use datatypes::arrow;
use datatypes::arrow::array::{
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
TimestampMillisecondArray, UInt8Array, UInt64Array,
ArrayRef, BinaryDictionaryBuilder, Int32Array, RecordBatch, StringArray,
StringDictionaryBuilder, StructArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
};
use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
use datatypes::arrow::datatypes::{DataType, Field, Fields, Schema, UInt32Type};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
use datatypes::types::{StructField, StructType};
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::{KeyValue, PageIndexPolicy};
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_request::PathType;
@@ -129,6 +134,7 @@ mod tests {
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
use crate::read::range::FileRangeBuilder;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
@@ -2113,4 +2119,228 @@ mod tests {
assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
}
/// Round-trips a nested (struct) column through the flat-format parquet writer and reader.
///
/// Writes three record batches of 10 rows each (tags "x", "y", "z"; values 0..30) with a row
/// group size of 16, checks the reported SST info, then reads the file back range by range and
/// compares the pretty-printed batches against the expected tables.
#[tokio::test]
async fn test_parquet_write_and_read_with_nested_data_type() -> WhateverResult<()> {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let mut builder = RegionMetadataBuilder::new(REGION_ID);
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 0,
})
// Create a column "a" with nested (struct) data type:
//
// struct<
// "a_a": string,
// "a_": struct<
// "a_b": int32,
// >
// >
//
// There are two leaf columns in the parquet: "a_a" and "a_b", which will be written and read in this test.
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"a",
ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![
StructField::new("a_a", ConcreteDataType::string_datatype(), true),
StructField::new(
"a_",
ConcreteDataType::struct_datatype(StructType::new(Arc::new(vec![
StructField::new("a_b", ConcreteDataType::int32_datatype(), true),
]))),
true,
),
]))),
true,
),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![0]);
let metadata = Arc::new(builder.build()?);
let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
// Builds one flat-format batch: every row carries the same `tag`, and `range` supplies both
// the nested values ("s_{i}" / i) and the millisecond timestamps.
let create_record_batch = |tag: &str, range: Range<i32>| -> WhateverResult<RecordBatch> {
let num_rows = range.len();
let mut columns = Vec::new();
// Add primary key column (tag_0) as dictionary array
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
tag_builder.append_value(tag);
}
columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
// Add the nested struct column (a): { a_a: "s_{i}", a_: { a_b: i } }
columns.push(Arc::new(StructArray::new(
Fields::from(vec![
Field::new("a_a", DataType::Utf8, true),
Field::new_struct(
"a_",
Fields::from(vec![Field::new("a_b", DataType::Int32, true)]),
true,
),
]),
vec![
Arc::new(StringArray::from_iter_values(
range.clone().map(|i| format!("s_{i}")),
)) as ArrayRef,
Arc::new(StructArray::new(
Fields::from(vec![Field::new("a_b", DataType::Int32, true)]),
vec![Arc::new(Int32Array::from_iter_values(range.clone())) as ArrayRef],
None,
)),
],
None,
)));
// Add time index column (ts)
let timestamps: Vec<i64> = range.map(|v| v as i64).collect();
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
// Add encoded primary key column
let pk = new_primary_key(&[tag]);
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
pk_builder.append(&pk).unwrap();
}
columns.push(Arc::new(pk_builder.finish()));
// Add sequence column
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
// Add op_type column
columns.push(Arc::new(UInt8Array::from_value(
OpType::Put as u8,
num_rows,
)));
// The Arrow error is stringified so that `?` can turn it into a `Whatever`.
Ok(RecordBatch::try_new(flat_schema.clone(), columns).map_err(|e| e.to_string())?)
};
let flat_source = new_flat_source_from_record_batches(vec![
create_record_batch("x", 0..10)?,
create_record_batch("y", 10..20)?,
create_record_batch("z", 20..30)?,
]);
let info = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare),
&mut Metrics::new(WriteType::Flush),
)
.await
.write_all_flat(
flat_source,
&WriteOptions {
// 30 rows with 16 rows per row group: the reader below expects 2 row groups.
row_group_size: 16,
..Default::default()
},
)
.await?
.remove(0);
assert_eq!(30, info.num_rows);
assert!(info.file_size > 0);
assert_eq!(
(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(29)
),
info.time_range
);
let (context, selection) = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
create_file_handle_from_sst_info(&info, &metadata),
object_store,
)
.flat_format(true)
.build_reader_input(&mut ReaderMetrics::default())
.await?;
assert_eq!(selection.row_group_count(), 2);
assert_eq!(selection.row_count(), 30);
let mut record_batches = vec![];
let mut ranges = SmallVec::<[_; 2]>::new();
// NOTE(review): the meaning of the `-1` argument is not visible here — confirm against
// `FileRangeBuilder::build_ranges`.
FileRangeBuilder::new(Arc::new(context), selection).build_ranges(-1, &mut ranges);
// Drain every range's reader; ranges that yield no reader are skipped.
for range in ranges {
let Some(mut reader) = range.flat_reader(None).await? else {
continue;
};
while let Some(record_batch) = reader.next_batch()? {
record_batches.push(record_batch);
}
}
// One expected table per record batch read back (16 rows, then 14 rows).
let expects = vec![
r#"
+-------+----------------------------+-------------------------+------------------------+------------+-----------+
| tag_0 | a | ts | __primary_key | __sequence | __op_type |
+-------+----------------------------+-------------------------+------------------------+------------+-----------+
| x | {a_a: s_0, a_: {a_b: 0}} | 1970-01-01T00:00:00 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_1, a_: {a_b: 1}} | 1970-01-01T00:00:00.001 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_2, a_: {a_b: 2}} | 1970-01-01T00:00:00.002 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_3, a_: {a_b: 3}} | 1970-01-01T00:00:00.003 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_4, a_: {a_b: 4}} | 1970-01-01T00:00:00.004 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_5, a_: {a_b: 5}} | 1970-01-01T00:00:00.005 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_6, a_: {a_b: 6}} | 1970-01-01T00:00:00.006 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_7, a_: {a_b: 7}} | 1970-01-01T00:00:00.007 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_8, a_: {a_b: 8}} | 1970-01-01T00:00:00.008 | 0101780000000000000001 | 1000 | 1 |
| x | {a_a: s_9, a_: {a_b: 9}} | 1970-01-01T00:00:00.009 | 0101780000000000000001 | 1000 | 1 |
| y | {a_a: s_10, a_: {a_b: 10}} | 1970-01-01T00:00:00.010 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_11, a_: {a_b: 11}} | 1970-01-01T00:00:00.011 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_12, a_: {a_b: 12}} | 1970-01-01T00:00:00.012 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_13, a_: {a_b: 13}} | 1970-01-01T00:00:00.013 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_14, a_: {a_b: 14}} | 1970-01-01T00:00:00.014 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_15, a_: {a_b: 15}} | 1970-01-01T00:00:00.015 | 0101790000000000000001 | 1000 | 1 |
+-------+----------------------------+-------------------------+------------------------+------------+-----------+"#,
r#"
+-------+----------------------------+-------------------------+------------------------+------------+-----------+
| tag_0 | a | ts | __primary_key | __sequence | __op_type |
+-------+----------------------------+-------------------------+------------------------+------------+-----------+
| y | {a_a: s_16, a_: {a_b: 16}} | 1970-01-01T00:00:00.016 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_17, a_: {a_b: 17}} | 1970-01-01T00:00:00.017 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_18, a_: {a_b: 18}} | 1970-01-01T00:00:00.018 | 0101790000000000000001 | 1000 | 1 |
| y | {a_a: s_19, a_: {a_b: 19}} | 1970-01-01T00:00:00.019 | 0101790000000000000001 | 1000 | 1 |
| z | {a_a: s_20, a_: {a_b: 20}} | 1970-01-01T00:00:00.020 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_21, a_: {a_b: 21}} | 1970-01-01T00:00:00.021 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_22, a_: {a_b: 22}} | 1970-01-01T00:00:00.022 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_23, a_: {a_b: 23}} | 1970-01-01T00:00:00.023 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_24, a_: {a_b: 24}} | 1970-01-01T00:00:00.024 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_25, a_: {a_b: 25}} | 1970-01-01T00:00:00.025 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_26, a_: {a_b: 26}} | 1970-01-01T00:00:00.026 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_27, a_: {a_b: 27}} | 1970-01-01T00:00:00.027 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_28, a_: {a_b: 28}} | 1970-01-01T00:00:00.028 | 01017a0000000000000001 | 1000 | 1 |
| z | {a_a: s_29, a_: {a_b: 29}} | 1970-01-01T00:00:00.029 | 01017a0000000000000001 | 1000 | 1 |
+-------+----------------------------+-------------------------+------------------------+------------+-----------+"#,
];
assert_eq!(record_batches.len(), expects.len());
for (record_batch, expect) in record_batches.iter().zip(expects) {
// The raw strings start with a newline for readability; trim it before comparing.
assert_eq!(record_batch.pretty_print(), expect.trim());
}
Ok(())
}
}

View File

@@ -322,6 +322,18 @@ impl FlatReadFormat {
return Ok(false);
}
if metadata
.schema
.arrow_schema()
.fields()
.iter()
.any(|x| x.data_type().is_nested())
{
// Definitely not legacy if any data type is nested, because nested data types
// (like "struct") were only introduced in newer versions of GreptimeDB.
return Ok(false);
}
// For flat format, compute expected column number:
// all columns + internal columns (pk, sequence, op_type)
let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

View File

@@ -17,15 +17,15 @@
use std::ops::Range;
use std::sync::Arc;
use bytes::{Buf, Bytes};
use bytes::Bytes;
use object_store::ObjectStore;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::column::page::{PageIterator, PageReader};
use parquet::arrow::async_reader::{ColumnChunkData, ColumnChunkIterator};
use parquet::column::page::PageIterator;
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::{FileId, RegionId};
use tokio::task::yield_now;
@@ -595,78 +595,8 @@ impl RowGroups for InMemoryRowGroup<'_> {
// Creates a page reader to read column at `i`.
let page_reader = self.base.column_reader(i)?;
Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(Box::new(page_reader))),
}))
Ok(Box::new(ColumnChunkIterator::new(Some(Ok(Box::new(
page_reader,
))))))
}
}
/// An in-memory column chunk
#[derive(Clone)]
pub(crate) enum ColumnChunkData {
/// Column chunk data representing only a subset of data pages
Sparse {
/// Length of the full column chunk
length: usize,
/// Set of data pages included in this sparse chunk. Each element is a tuple
/// of (page offset, page data). Lookups binary-search this vector by page
/// offset, so it must be sorted by offset.
data: Vec<(usize, Bytes)>,
},
/// Full column chunk and its offset. `offset` is the position at which `data`
/// starts; it is subtracted from the requested start position before slicing.
Dense { offset: usize, data: Bytes },
}
impl ColumnChunkData {
    /// Returns the bytes of this chunk beginning at position `start`.
    ///
    /// For a dense chunk this is everything from `start` (relative to the chunk's offset) to the
    /// end of the data. For a sparse chunk, `start` must be exactly the offset of one of the
    /// stored pages, and that page's data is returned; any other value is an error.
    fn get(&self, start: u64) -> Result<Bytes> {
        match self {
            Self::Dense { offset, data } => {
                let relative = start as usize - *offset;
                Ok(data.slice(relative..))
            }
            Self::Sparse { data: pages, .. } => {
                match pages.binary_search_by_key(&start, |(page_offset, _)| *page_offset as u64) {
                    Ok(index) => Ok(pages[index].1.clone()),
                    Err(_) => Err(ParquetError::General(format!(
                        "Invalid offset in sparse column chunk data: {start}"
                    ))),
                }
            }
        }
    }
}
impl Length for ColumnChunkData {
    /// Reports the recorded full-chunk length for a sparse chunk, or the size of the held data
    /// for a dense chunk.
    fn len(&self) -> u64 {
        let bytes = match self {
            Self::Dense { data, .. } => data.len(),
            Self::Sparse { length, .. } => *length,
        };
        bytes as u64
    }
}
impl ChunkReader for ColumnChunkData {
    type T = bytes::buf::Reader<Bytes>;

    /// Returns a reader over the bytes starting at `start`.
    fn get_read(&self, start: u64) -> Result<Self::T> {
        self.get(start).map(|chunk| chunk.reader())
    }

    /// Returns the first `length` bytes of the data starting at `start`.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let tail = self.get(start)?;
        Ok(tail.slice(..length))
    }
}
/// A [`PageIterator`] over exactly one column chunk: it hands out its single [`PageReader`]
/// (or the error produced while creating it) once, and is exhausted afterwards.
pub(crate) struct ColumnChunkIterator {
    pub(crate) reader: Option<Result<Box<dyn PageReader>>>,
}

impl Iterator for ColumnChunkIterator {
    type Item = Result<Box<dyn PageReader>>;

    /// Moves the stored reader out, leaving `None` behind for every later call.
    fn next(&mut self) -> Option<Self::Item> {
        std::mem::take(&mut self.reader)
    }
}

impl PageIterator for ColumnChunkIterator {}

View File

@@ -33,6 +33,7 @@ use datafusion::physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
};
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
RecordBatchStream as DfRecordBatchStream,
@@ -438,6 +439,13 @@ impl ExecutionPlan for RegionScanExec {
"RegionScanExec"
}
/// Declines to swap this scan with a parent [`ProjectionExec`].
///
/// Returning `Ok(None)` reports that no swap was performed, so the caller keeps the
/// projection above this node. The previous `todo!()` body panicked as soon as this method
/// was called.
fn try_swapping_with_projection(
    &self,
    _projection: &ProjectionExec,
) -> DfResult<Option<Arc<dyn ExecutionPlan>>> {
    // TODO: push the projection (e.g. nested field access) down into the region scan and
    // return the rewritten plan here.
    Ok(None)
}
fn handle_child_pushdown_result(
&self,
_phase: FilterPushdownPhase,