Compare commits

..

3 Commits

Author SHA1 Message Date
dennis zhuang
a56a00224f feat: impl vector index scan in storage (#7528)
* feat: impl vector index scan in storage

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: fallback to read remote blob when blob not found

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: refactor encoding and decoding and apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: license

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add apply_with_k tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: forgot to align nulls when the vector column is not in the batch

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add test for when the vector column is not in a batch while building

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2026-01-12 08:30:51 +00:00
discord9
6487f14f70 feat: gc schd update repart mapping (#7517)
* feat(gc): batch gc now also handles routing

Signed-off-by: discord9 <discord9@163.com>

typo

Signed-off-by: discord9 <discord9@163.com>

s

Signed-off-by: discord9 <discord9@163.com>

feat: use batch gc procedure

Signed-off-by: discord9 <discord9@163.com>

feat: cross region refs

Signed-off-by: discord9 <discord9@163.com>

feat: clean up repartition

Signed-off-by: discord9 <discord9@163.com>

chore: cleanup

Signed-off-by: discord9 <discord9@163.com>

per review

Signed-off-by: discord9 <discord9@163.com>

test: update mock test

Signed-off-by: discord9 <discord9@163.com>

refactor: rm unused

Signed-off-by: discord9 <discord9@163.com>

refactor: invert related_regions

Signed-off-by: discord9 <discord9@163.com>

clippy

Signed-off-by: discord9 <discord9@163.com>

pcr

Signed-off-by: discord9 <discord9@163.com>

chore: remove unused

Signed-off-by: discord9 <discord9@163.com>

fix: after invert fix

Signed-off-by: discord9 <discord9@163.com>

chore: rm unused

Signed-off-by: discord9 <discord9@163.com>

refactor: eff

Signed-off-by: discord9 <discord9@163.com>

docs: chore

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: missing region

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-12 08:28:34 +00:00
Ruihang Xia
45b4067721 feat: always canonicalize partition expr (#7553)
* feat: always canonicalize partition expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix ut assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-12 07:24:29 +00:00
103 changed files with 3381 additions and 2567 deletions

Cargo.lock (generated, 1551 changed lines): file diff suppressed because it is too large.


@@ -100,13 +100,13 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "57.0", features = ["prettyprint"] }
arrow-array = { version = "57.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.0"
arrow-cast = "57.0"
arrow-flight = "57.0"
arrow-ipc = { version = "57.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.0", features = ["serde"] }
arrow = { version = "56.2", features = ["prettyprint"] }
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-flight = "56.2"
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.2", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -120,39 +120,38 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.7", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10", features = ["case-insensitive"] }
chrono-tz = { version = "0.10.1", features = ["case-insensitive"] }
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
const_format = "0.2"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "51.0"
datafusion-common = "51.0"
datafusion-datasource = "51.0"
datafusion-expr = "51.0"
datafusion-functions = "51.0"
datafusion-functions-aggregate-common = "51.0"
datafusion-optimizer = "51.0"
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc.git", rev = "35f2e04bf81f2ab7b6f86c0450d6a77b7098d43e" }
datafusion-pg-catalog = "0.13"
datafusion-physical-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-sql = "51.0"
datafusion-substrait = "51.0"
datafusion = "50"
datafusion-common = "50"
datafusion-expr = "50"
datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.3"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
datafusion-substrait = "50"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { version = "0.17", features = [
etcd-client = { version = "0.16.1", features = [
"tls",
"tls-roots",
] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69499de6d38d9032101fa8a9e10d375e124ca8d3" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -163,7 +162,7 @@ itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "a73a6b83eeb014645aac527b456816a81bd6b472" }
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "3b7cd33234358b18ece977bf689dc6fb760f29ab" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
moka = "0.12"
@@ -173,7 +172,7 @@ notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.54"
once_cell = "1.18"
opentelemetry-proto = { version = "0.31", features = [
opentelemetry-proto = { version = "0.30", features = [
"gen-tonic",
"metrics",
"trace",
@@ -181,18 +180,18 @@ opentelemetry-proto = { version = "0.31", features = [
"logs",
] }
ordered-float = { version = "4.3", features = ["serde"] }
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5da284414e9b14f678344b51e5292229e4b5f8d2", features = [
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "2d64b7c0fa95642028a8205b36fe9ea0b023ec59", features = [
"server",
] }
parking_lot = "0.12"
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "56.2", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.7.1", features = ["ser"] }
prost = { version = "0.14", features = ["no-recursion-limit"] }
prost-types = "0.14"
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-types = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -223,7 +222,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { version = "0.59.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", default-features = false, features = ["any", "macros", "json", "runtime-tokio-rustls"] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
@@ -234,7 +233,7 @@ tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.14", features = ["tls-ring", "gzip", "zstd"] }
tonic = { version = "0.13", features = ["tls-ring", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
@@ -322,20 +321,19 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-pg-catalog = { git = "https://github.com/GreptimeTeam/datafusion-postgres.git", rev = "74ac8e2806be6de91ff192b97f64735392539d16" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "d7d95a44889e099e32d78e9bad9bc00598faef28" } # on branch v0.59.x
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
[profile.release]
debug = 1


@@ -35,7 +35,6 @@ use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use parquet::file::metadata::{FooterTail, KeyValue};
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
@@ -464,6 +463,7 @@ fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> error::Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
@@ -608,7 +608,7 @@ async fn load_parquet_metadata(
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let footer = FooterTail::try_new(&footer)?;
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let metadata_len = footer.metadata_length() as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());


@@ -27,14 +27,13 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-datasource.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
orc-rust = { version = "0.6.3", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
regex.workspace = true


@@ -14,7 +14,7 @@
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use crate::error::Result;
@@ -24,5 +24,5 @@ pub trait DfRecordBatchEncoder {
#[async_trait]
pub trait ArrowWriterCloser {
async fn close(mut self) -> Result<ParquetMetaData>;
async fn close(mut self) -> Result<FileMetaData>;
}


@@ -40,6 +40,7 @@ use datafusion::datasource::physical_plan::{
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
@@ -302,20 +303,24 @@ where
pub async fn file_to_stream(
store: &ObjectStore,
filename: &str,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
projection: Option<Vec<usize>>,
compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
let df_compression: DfCompressionType = compression_type.into();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection_indices(projection)
.with_file_compression_type(df_compression)
.build();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection(projection)
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source


@@ -440,11 +440,14 @@ mod tests {
.await
.unwrap(),
);
let csv_source = CsvSource::new(schema).with_batch_size(8192);
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
csv_source.clone(),
None,
compression_type,


@@ -347,11 +347,14 @@ mod tests {
.await
.unwrap(),
);
let json_source = JsonSource::new(schema).with_batch_size(8192);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
json_source.clone(),
None,
compression_type,


@@ -18,15 +18,15 @@ use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
@@ -100,11 +100,11 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
partitioned_file: PartitionedFile,
file_meta: FileMeta,
_metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
let path = partitioned_file.path().to_string();
let path = file_meta.location().to_string();
let object_store = self.object_store.clone();
Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
@@ -180,7 +180,7 @@ impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
async fn close(self) -> Result<ParquetMetaData> {
async fn close(self) -> Result<FileMetaData> {
self.close().context(error::EncodeRecordBatchSnafu)
}
}


@@ -67,14 +67,14 @@ impl Test<'_> {
async fn test_json_opener() {
let store = test_store("/");
let schema = test_basic_schema();
let file_source = Arc::new(JsonSource::new(schema)).with_batch_size(test_util::TEST_BATCH_SIZE);
let file_source = Arc::new(JsonSource::new()).with_batch_size(test_util::TEST_BATCH_SIZE);
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
Test {
config: scan_config(None, path, file_source.clone()),
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
@@ -87,7 +87,7 @@ async fn test_json_opener() {
],
},
Test {
config: scan_config(Some(1), path, file_source.clone()),
config: scan_config(schema, Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
@@ -112,11 +112,13 @@ async fn test_csv_opener() {
.display()
.to_string();
let file_source = CsvSource::new(schema).with_batch_size(test_util::TEST_BATCH_SIZE);
let file_source = CsvSource::new(true, b',', b'"')
.with_batch_size(test_util::TEST_BATCH_SIZE)
.with_schema(schema.clone());
let tests = [
Test {
config: scan_config(None, path, file_source.clone()),
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+---------------------+----------+------------+",
@@ -129,7 +131,7 @@ async fn test_csv_opener() {
],
},
Test {
config: scan_config(Some(1), path, file_source.clone()),
config: scan_config(schema, Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+---------------------+----------+------------+",
@@ -156,10 +158,10 @@ async fn test_parquet_exec() {
.display()
.to_string();
let parquet_source = ParquetSource::new(schema)
let parquet_source = ParquetSource::default()
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let config = scan_config(None, path, Arc::new(parquet_source));
let config = scan_config(schema, None, path, Arc::new(parquet_source));
let exec = DataSourceExec::from_data_source(config);
let ctx = SessionContext::new();
@@ -195,11 +197,11 @@ async fn test_orc_opener() {
let store = test_store("/");
let schema = Arc::new(OrcFormat.infer_schema(&store, path).await.unwrap());
let file_source = Arc::new(OrcSource::new(schema.into()));
let file_source = Arc::new(OrcSource::default());
let tests = [
Test {
config: scan_config(None, path, file_source.clone()),
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
@@ -214,7 +216,7 @@ async fn test_orc_opener() {
],
},
Test {
config: scan_config(Some(1), path, file_source.clone()),
config: scan_config(schema.clone(), Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",


@@ -80,6 +80,7 @@ pub fn csv_basic_schema() -> SchemaRef {
}
pub(crate) fn scan_config(
file_schema: SchemaRef,
limit: Option<usize>,
filename: &str,
file_source: Arc<dyn FileSource>,
@@ -88,7 +89,7 @@ pub(crate) fn scan_config(
let filename = &filename.replace('\\', "/");
let file_group = FileGroup::new(vec![PartitionedFile::new(filename.clone(), 4096)]);
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
.with_file_group(file_group)
.with_limit(limit)
.build()
@@ -108,7 +109,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let size = store.read(origin_path).await.unwrap().len();
let config = scan_config(None, origin_path, Arc::new(JsonSource::new(schema)));
let config = scan_config(schema, None, origin_path, Arc::new(JsonSource::new()));
let stream = FileStream::new(
&config,
0,
@@ -150,8 +151,10 @@ pub async fn setup_stream_to_csv_test(
let schema = csv_basic_schema();
let csv_source = CsvSource::new(schema).with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(None, origin_path, csv_source.clone());
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(schema, None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let csv_opener = csv_source.create_file_opener(


@@ -104,8 +104,7 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable,
..
volatility: datafusion_expr::Volatility::Immutable
} if valid_types == &ConcreteDataType::numerics().into_iter().map(|dt| { use datatypes::data_type::DataType; dt.as_arrow_type() }).collect::<Vec<_>>()));
}
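
Several hunks in this compare only add or drop a trailing ".." inside a matches! struct pattern on Signature. As a standalone illustration (plain Rust, not the actual DataFusion types), the sketch below shows why that token tracks the dependency version: a struct pattern must either name every field or end with "..", so a Signature that gains fields in a newer release forces exactly this edit in the tests.

enum Volatility {
    Immutable,
    Volatile,
}

struct Signature {
    volatility: Volatility,
    // a newer crate release may add more fields here, which is what forces
    // the trailing ".." seen on one side of the hunks above
}

fn is_immutable(sig: &Signature) -> bool {
    // ".." tolerates any fields the struct gains later
    matches!(sig, Signature { volatility: Volatility::Immutable, .. })
}

fn main() {
    assert!(is_immutable(&Signature { volatility: Volatility::Immutable }));
    assert!(!is_immutable(&Signature { volatility: Volatility::Volatile }));
}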


@@ -331,8 +331,7 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable,
..
volatility: datafusion_expr::Volatility::Immutable
} if valid_types == &vec![ArrowDataType::Utf8]));
}


@@ -145,8 +145,7 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable,
..
volatility: datafusion_expr::Volatility::Immutable
} if sigs.len() == 2));
}


@@ -341,7 +341,6 @@ impl AggregateUDFImpl for StateWrapper {
name: acc_args.name,
is_distinct: acc_args.is_distinct,
exprs: acc_args.exprs,
expr_fields: &[],
};
self.inner.accumulator(acc_args)?
};


@@ -122,8 +122,7 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable,
..
volatility: Volatility::Immutable
} if sigs.len() == 15),
"{:?}",
f.signature()


@@ -193,8 +193,7 @@ mod tests {
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable,
..
volatility: Volatility::Immutable
} if sigs.len() == 6));
}


@@ -120,8 +120,7 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable,
..
volatility: Volatility::Immutable
} if sigs.len() == 15),
"{:?}",
f.signature()


@@ -25,6 +25,7 @@ use datafusion_common::arrow::array::{
};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
use datatypes::json::JsonStructureSettings;
@@ -518,7 +519,7 @@ impl Default for JsonGetObject {
DataType::LargeBinary,
DataType::BinaryView,
],
vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
STRINGS.to_vec(),
),
}
}


@@ -99,8 +99,7 @@ mod tests {
assert!(matches!(rate.signature(),
Signature {
type_signature: TypeSignature::Uniform(2, valid_types),
volatility: Volatility::Immutable,
..
volatility: Volatility::Immutable
} if valid_types == NUMERICS
));
let values = vec![1.0, 3.0, 6.0];


@@ -19,10 +19,8 @@ use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::types::logical_binary;
use datafusion_expr::{
Coercion, ColumnarValue, ScalarFunctionArgs, Signature, TypeSignatureClass, Volatility,
};
use datafusion_expr::type_coercion::aggregates::BINARYS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datatypes::types::vector_type_value_to_string;
use crate::function::{Function, extract_args};
@@ -37,10 +35,11 @@ pub struct VectorToStringFunction {
impl Default for VectorToStringFunction {
fn default() -> Self {
Self {
signature: Signature::coercible(
vec![Coercion::new_exact(TypeSignatureClass::Native(
logical_binary(),
))],
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
Volatility::Immutable,
),
}


@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -36,12 +36,9 @@ impl Default for ElemAvgFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
),


@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -49,12 +49,9 @@ impl Default for ElemProductFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
),


@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -36,12 +36,9 @@ impl Default for ElemSumFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
),


@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use crate::function::Function;
use crate::scalars::vector::VectorCalculator;
@@ -47,10 +47,10 @@ pub(crate) struct VectorDimFunction {
impl Default for VectorDimFunction {
fn default() -> Self {
Self {
signature: Signature::coercible(
signature: Signature::one_of(
vec![
Coercion::new_exact(TypeSignatureClass::Native(logical_binary())),
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
Volatility::Immutable,
),


@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -50,10 +50,11 @@ pub(crate) struct VectorNormFunction {
impl Default for VectorNormFunction {
fn default() -> Self {
Self {
signature: Signature::coercible(
signature: Signature::one_of(
vec![
Coercion::new_exact(TypeSignatureClass::Native(logical_binary())),
Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
],
Volatility::Immutable,
),
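
The vector UDF files above all swap a Signature::coercible(...) declaration for Signature::one_of(...) over explicit argument-type lists. A minimal sketch of the latter shape, assuming the datafusion_expr public API and with illustrative type lists (the real code uses the STRINGS and BINARYS constants from datafusion_expr::type_coercion::aggregates), could look like:

use arrow::datatypes::DataType;
use datafusion_expr::{Signature, TypeSignature, Volatility};

// Sketch only: accept exactly one argument that is either a string type or a
// binary type; the concrete DataType lists here are illustrative.
fn vector_arg_signature() -> Signature {
    Signature::one_of(
        vec![
            TypeSignature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8]),
            TypeSignature::Uniform(1, vec![DataType::Binary, DataType::LargeBinary]),
            TypeSignature::Uniform(1, vec![DataType::BinaryView]),
        ],
        Volatility::Immutable,
    )
}

fn main() {
    let _sig = vector_arg_signature();
}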


@@ -106,8 +106,7 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable,
..
volatility: datafusion_expr::Volatility::Immutable
} if valid_types == &vec![ArrowDataType::Utf8]));
}


@@ -103,11 +103,10 @@ impl FlightEncoder {
FlightMessage::RecordBatch(record_batch) => {
let (encoded_dictionaries, encoded_batch) = self
.data_gen
.encode(
.encoded_batch(
&record_batch,
&mut self.dictionary_tracker,
&self.write_options,
&mut Default::default(),
)
.expect("DictionaryTracker configured above to not fail on replacement");


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::time::Duration;
@@ -432,11 +432,11 @@ where
pub struct GetFileRefs {
/// List of region IDs to get file references from active FileHandles (in-memory).
pub query_regions: Vec<RegionId>,
/// Mapping from the source region ID (where to read the manifest) to
/// the target region IDs (whose file references to look for).
/// Key: The region ID of the manifest.
/// Value: The list of region IDs to find references for in that manifest.
pub related_regions: HashMap<RegionId, Vec<RegionId>>,
/// Mapping from the src region IDs (whose file references to look for) to
/// the dst region IDs (where to read the manifests).
/// Key: The source region IDs (where files originally came from).
/// Value: The set of destination region IDs (whose manifests need to be read).
pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
}
impl Display for GetFileRefs {
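
The reworded comment above inverts the direction of related_regions: the key is now a source region whose files may be referenced elsewhere, and the value is the set of regions whose manifests have to be read to find those references. A minimal sketch of that inversion, with RegionId stood in by u64 purely for illustration (this is not the actual GreptimeDB conversion code), could look like:

use std::collections::{HashMap, HashSet};

// Stand-in for store_api::storage::RegionId, for illustration only.
type RegionId = u64;

// Turn the old "manifest region -> regions to look for" layout into the new
// "source region -> manifest regions to read" layout described above.
fn invert_related_regions(
    old: HashMap<RegionId, Vec<RegionId>>,
) -> HashMap<RegionId, HashSet<RegionId>> {
    let mut inverted: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
    for (manifest_region, sources) in old {
        for source in sources {
            inverted.entry(source).or_default().insert(manifest_region);
        }
    }
    inverted
}

fn main() {
    let old = HashMap::from([(1, vec![10, 11]), (2, vec![10])]);
    let new = invert_related_regions(old);
    // files of region 10 may be referenced by the manifests of regions 1 and 2
    assert_eq!(new[&10], HashSet::from([1, 2]));
}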


@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true
prost.workspace = true
snafu.workspace = true
tokio.workspace = true


@@ -1145,11 +1145,10 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::List(array) => {
// this is for item type
let datatype = ConcreteDataType::try_from(&array.value_type())?;
let scalar_values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?;
let items = scalar_values
let items = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
.into_iter()
.flat_map(|v| v.unwrap_or_else(|| vec![ScalarValue::Null]))
.flatten()
.map(|x| x.try_into())
.collect::<Result<Vec<Value>>>()?;
Value::List(ListValue::new(items, Arc::new(datatype)))
@@ -2998,7 +2997,6 @@ pub(crate) mod tests {
.unwrap()
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();
assert_eq!(
vs,


@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::borrow::Borrow;
use std::sync::Arc;
use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef, BooleanArray, BooleanBuilder};
@@ -68,8 +69,8 @@ impl From<Vec<Option<bool>>> for BooleanVector {
}
}
impl FromIterator<Option<bool>> for BooleanVector {
fn from_iter<T: IntoIterator<Item = Option<bool>>>(iter: T) -> Self {
impl<Ptr: Borrow<Option<bool>>> FromIterator<Ptr> for BooleanVector {
fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
BooleanVector {
array: BooleanArray::from_iter(iter),
}
@@ -302,7 +303,7 @@ mod tests {
#[test]
fn test_boolean_vector_from_iter() {
let input = vec![Some(false), Some(true), Some(false), Some(true)];
let vec = input.iter().cloned().collect::<BooleanVector>();
let vec = input.iter().collect::<BooleanVector>();
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "Failed at {i}")
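
The FromIterator change above widens the accepted item type from Option<bool> to anything that borrows one, which is what lets the test in the same diff drop its .cloned() call. A self-contained sketch of the same pattern on a toy wrapper (not the real BooleanVector):

use std::borrow::Borrow;

struct BoolVec(Vec<Option<bool>>);

// Accept both owned Option<bool> items and references to them, mirroring the
// Ptr: Borrow<Option<bool>> bound introduced in the diff above.
impl<Ptr: Borrow<Option<bool>>> FromIterator<Ptr> for BoolVec {
    fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
        BoolVec(iter.into_iter().map(|p| *p.borrow()).collect())
    }
}

fn main() {
    let input = vec![Some(false), Some(true), None];
    let from_refs: BoolVec = input.iter().collect(); // no .cloned() needed
    let from_owned: BoolVec = input.into_iter().collect();
    assert_eq!(from_refs.0, from_owned.0);
}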


@@ -170,11 +170,10 @@ impl Helper {
ScalarValue::List(array) => {
let item_type = Arc::new(ConcreteDataType::try_from(&array.value_type())?);
let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 1);
let scalar_values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?;
let values = scalar_values
let values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
.into_iter()
.flat_map(|v| v.unwrap_or_else(|| vec![ScalarValue::Null]))
.flatten()
.map(ScalarValue::try_into)
.collect::<Result<Vec<Value>>>()?;
builder.push(Some(ListValueRef::Ref {


@@ -18,7 +18,6 @@ use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
@@ -35,6 +34,7 @@ use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -45,6 +45,7 @@ const DEFAULT_BATCH_SIZE: usize = 8192;
fn build_record_batch_stream(
scan_plan_config: &ScanPlanConfig,
file_schema: Arc<ArrowSchema>,
limit: Option<usize>,
file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
@@ -54,12 +55,15 @@ fn build_record_batch_stream(
.map(|filename| PartitionedFile::new(filename.clone(), 0))
.collect::<Vec<_>>();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_projection_indices(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_projection(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let store = Arc::new(object_store_opendal::OpendalStore::new(
scan_plan_config.store.clone(),
@@ -85,14 +89,11 @@ fn new_csv_stream(
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(file_schema)
.with_csv_options(options)
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(file_schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, csv_source)
build_record_batch_stream(config, file_schema, limit, csv_source)
}
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
@@ -101,8 +102,8 @@ fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStrea
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = JsonSource::new(file_schema).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
}
fn new_parquet_stream_with_exec_plan(
@@ -125,10 +126,9 @@ fn new_parquet_stream_with_exec_plan(
.collect::<Vec<_>>(),
);
let mut parquet_source = ParquetSource::new(file_schema.clone())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
store.clone(),
)));
let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
DefaultParquetFileReaderFactory::new(store.clone()),
));
// build predicate filter
let filters = filters.to_vec();
@@ -143,12 +143,15 @@ fn new_parquet_stream_with_exec_plan(
parquet_source = parquet_source.with_predicate(filters);
};
let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
.with_file_group(file_group)
.with_projection_indices(projection.cloned())
.with_limit(*limit)
.build();
let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
Arc::new(parquet_source),
)
.with_file_group(file_group)
.with_projection(projection.cloned())
.with_limit(*limit)
.build();
// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
@@ -167,8 +170,8 @@ fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = OrcSource::new(file_schema.into()).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
}
#[derive(Debug, Clone)]


@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(discord9): remove this once gc scheduler is fully merged
#![allow(unused)]
use std::collections::{HashMap, HashSet};
use common_meta::peer::Peer;
@@ -29,6 +26,7 @@ mod options;
mod procedure;
mod scheduler;
mod tracker;
mod util;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;


@@ -12,29 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::datanode::RegionStat;
use common_meta::instruction::{
GcRegions, GetFileRefs, GetFileRefsReply, Instruction, InstructionReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_procedure::{ProcedureManagerRef, ProcedureWithId, watcher};
use common_telemetry::{debug, error, warn};
use common_telemetry::debug;
use snafu::{OptionExt as _, ResultExt as _};
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
use crate::gc::Region2Peers;
use crate::gc::procedure::{BatchGcProcedure, GcRegionProcedure};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::gc::procedure::BatchGcProcedure;
use crate::service::mailbox::MailboxRef;
#[async_trait::async_trait]
pub(crate) trait SchedulerCtx: Send + Sync {
@@ -45,19 +38,9 @@ pub(crate) trait SchedulerCtx: Send + Sync {
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)>;
async fn get_file_references(
&self,
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest>;
async fn gc_regions(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport>;
@@ -100,7 +83,7 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
let dn_stats = self.meta_peer_client.get_all_dn_stat_kvs().await?;
let mut table_to_region_stats: HashMap<TableId, Vec<RegionStat>> = HashMap::new();
for (_dn_id, stats) in dn_stats {
let mut stats = stats.stats;
let stats = stats.stats;
let Some(latest_stat) = stats.iter().max_by_key(|s| s.timestamp_millis).cloned() else {
continue;
@@ -129,142 +112,34 @@ impl SchedulerCtx for DefaultGcSchedulerCtx {
async fn gc_regions(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport> {
self.gc_regions_inner(
peer,
region_ids,
file_refs_manifest,
full_file_listing,
timeout,
)
.await
}
async fn get_file_references(
&self,
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
region_routes: &Region2Peers,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!(
"Getting file references for {} regions",
query_regions.len()
);
// Group regions by datanode to minimize RPC calls
let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
for region_id in query_regions {
if let Some((leader, followers)) = region_routes.get(region_id) {
datanode2query_regions
.entry(leader.clone())
.or_default()
.push(*region_id);
// also need to send for follower regions for file refs in case query is running on follower
for follower in followers {
datanode2query_regions
.entry(follower.clone())
.or_default()
.push(*region_id);
}
} else {
return error::UnexpectedSnafu {
violated: format!(
"region_routes: {region_routes:?} does not contain region_id: {region_id}",
),
}
.fail();
}
}
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
HashMap::new();
for (related_region, queries) in related_regions {
if let Some((leader, followers)) = region_routes.get(&related_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.insert(related_region, queries.clone());
} // since read from manifest, no need to send to followers
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
for (peer, regions) in datanode2query_regions {
let related_regions = datanode2related_regions.remove(&peer).unwrap_or_default();
match self
.send_get_file_refs_instruction(&peer, &regions, related_regions, timeout)
.await
{
Ok(manifest) => {
// TODO(discord9): if other regions provide file refs for one region on other datanode, and no version,
// is it correct to merge manifest_version directly?
// FIXME: follower region how to merge version???
for (region_id, file_refs) in manifest.file_refs {
all_file_refs
.entry(region_id)
.or_default()
.extend(file_refs);
}
// region manifest version should be the smallest one among all peers, so outdated region can be detected
for (region_id, version) in manifest.manifest_version {
let entry = all_manifest_versions.entry(region_id).or_insert(version);
*entry = (*entry).min(version);
}
}
Err(e) => {
warn!(
"Failed to get file refs from datanode {}: {}. Skipping regions on this datanode.",
peer, e
);
// Continue processing other datanodes instead of failing the entire operation
continue;
}
}
}
Ok(FileRefsManifest {
file_refs: all_file_refs,
manifest_version: all_manifest_versions,
})
self.gc_regions_inner(region_ids, full_file_listing, timeout)
.await
}
}
impl DefaultGcSchedulerCtx {
async fn gc_regions_inner(
&self,
peer: Peer,
region_ids: &[RegionId],
file_refs_manifest: &FileRefsManifest,
full_file_listing: bool,
timeout: Duration,
) -> Result<GcReport> {
debug!(
"Sending GC instruction to datanode {} for {} regions (full_file_listing: {})",
peer,
"Sending GC instruction for {} regions (full_file_listing: {})",
region_ids.len(),
full_file_listing
);
let gc_regions = GcRegions {
regions: region_ids.to_vec(),
file_refs_manifest: file_refs_manifest.clone(),
full_file_listing,
};
let procedure = GcRegionProcedure::new(
let procedure = BatchGcProcedure::new(
self.mailbox.clone(),
self.table_metadata_manager.clone(),
self.server_addr.clone(),
peer,
gc_regions,
format!("GC for {} regions", region_ids.len()),
region_ids.to_vec(),
full_file_listing,
timeout,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -285,96 +160,8 @@ impl DefaultGcSchedulerCtx {
),
})?;
let gc_report = GcRegionProcedure::cast_result(res)?;
let gc_report = BatchGcProcedure::cast_result(res)?;
Ok(gc_report)
}
/// TODO(discord9): add support to read manifest of related regions for file refs too
/// (now it's only reading active FileHandles)
async fn send_get_file_refs_instruction(
&self,
peer: &Peer,
query_regions: &[RegionId],
related_regions: HashMap<RegionId, Vec<RegionId>>,
timeout: Duration,
) -> Result<FileRefsManifest> {
debug!(
"Sending GetFileRefs instruction to datanode {} for {} regions",
peer,
query_regions.len()
);
let instruction = Instruction::GetFileRefs(GetFileRefs {
query_regions: query_regions.to_vec(),
related_regions,
});
let reply = self
.send_instruction(peer, instruction, "Get file references", timeout)
.await?;
let InstructionReply::GetFileRefs(GetFileRefsReply {
file_refs_manifest,
success,
error,
}) = reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: format!("{:?}", reply),
reason: "Unexpected reply of the GetFileRefs instruction",
}
.fail();
};
if !success {
return error::UnexpectedSnafu {
violated: format!(
"Failed to get file references from datanode {}: {:?}",
peer, error
),
}
.fail();
}
Ok(file_refs_manifest)
}
async fn send_instruction(
&self,
peer: &Peer,
instruction: Instruction,
description: &str,
timeout: Duration,
) -> Result<InstructionReply> {
let msg = MailboxMessage::json_message(
&format!("{}: {}", description, instruction),
&format!("Metasrv@{}", self.server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let mailbox_rx = self
.mailbox
.send(&Channel::Datanode(peer.id), msg, timeout)
.await?;
match mailbox_rx.await {
Ok(reply_msg) => {
let reply = HeartbeatMailbox::json_reply(&reply_msg)?;
Ok(reply)
}
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, description, e
);
Err(e)
}
}
}
}
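
Taken together, the removals above shrink the scheduler-side surface: get_file_references and the per-peer mailbox plumbing move into BatchGcProcedure, so the context only has to schedule GC batches. Roughly, with argument and error types simplified (this is a sketch, not the exact GreptimeDB trait), the remaining gc_regions surface looks like:

use std::time::Duration;

// Simplified stand-ins for the GreptimeDB types, for illustration only.
type RegionId = u64;
struct GcReport;
type Result<T> = std::result::Result<T, String>;

// Approximate shape of the slimmed-down scheduler context after this change:
// peer routing and file-reference collection now happen inside the batch GC
// procedure, so callers pass only region ids, a listing mode, and a timeout.
#[async_trait::async_trait]
trait SchedulerCtx: Send + Sync {
    async fn gc_regions(
        &self,
        region_ids: &[RegionId],
        full_file_listing: bool,
        timeout: Duration,
    ) -> Result<GcReport>;
}

fn main() {}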


@@ -15,24 +15,17 @@
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_telemetry::{debug, error, info, warn};
use futures::StreamExt;
use itertools::Itertools;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use store_api::storage::{GcReport, RegionId};
use table::metadata::TableId;
use tokio::time::sleep;
use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::scheduler::{GcJobReport, GcScheduler};
use crate::gc::tracker::RegionGcInfo;
use crate::region;
pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
impl GcScheduler {
/// Iterate through all region stats, find region that might need gc, and send gc instruction to
@@ -61,6 +54,8 @@ impl GcScheduler {
.aggregate_candidates_by_datanode(per_table_candidates)
.await?;
// TODO(discord9): add deleted regions from repartition mapping
if datanode_to_candidates.is_empty() {
info!("No valid datanode candidates found, skipping GC cycle");
return Ok(Default::default());
@@ -83,17 +78,6 @@ impl GcScheduler {
Ok(report)
}
/// Find related regions that might share files with the candidate regions.
/// Currently returns the same regions since repartition is not implemented yet.
/// TODO(discord9): When repartition is implemented, this should also find src/dst regions
/// that might share files with the candidate regions.
pub(crate) async fn find_related_regions(
&self,
candidate_region_ids: &[RegionId],
) -> Result<HashMap<RegionId, Vec<RegionId>>> {
Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
}
/// Aggregate GC candidates by their corresponding datanode peer.
pub(crate) async fn aggregate_candidates_by_datanode(
&self,
@@ -210,28 +194,11 @@ impl GcScheduler {
let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
let all_related_regions = self.find_related_regions(&all_region_ids).await?;
let (region_to_peer, _) = self
.discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec())
.await?;
// Step 1: Get file references for all regions on this datanode
let file_refs_manifest = self
.ctx
.get_file_references(
&all_region_ids,
all_related_regions,
&region_to_peer,
self.config.mailbox_timeout,
)
.await?;
// Step 2: Create a single GcRegionProcedure for all regions on this datanode
// Step 2: Run GC for all regions on this datanode in a single batch
let (gc_report, fully_listed_regions) = {
// Partition regions into full listing and fast listing in a single pass
let mut batch_full_listing_decisions =
let batch_full_listing_decisions =
self.batch_should_use_full_listing(&all_region_ids).await;
let need_full_list_regions = batch_full_listing_decisions
@@ -242,7 +209,7 @@ impl GcScheduler {
},
)
.collect_vec();
let mut fast_list_regions = batch_full_listing_decisions
let fast_list_regions = batch_full_listing_decisions
.iter()
.filter_map(
|(&region_id, &need_full)| {
@@ -257,13 +224,7 @@ impl GcScheduler {
if !fast_list_regions.is_empty() {
match self
.ctx
.gc_regions(
peer.clone(),
&fast_list_regions,
&file_refs_manifest,
false,
self.config.mailbox_timeout,
)
.gc_regions(&fast_list_regions, false, self.config.mailbox_timeout)
.await
{
Ok(report) => combined_report.merge(report),
@@ -284,13 +245,7 @@ impl GcScheduler {
if !need_full_list_regions.is_empty() {
match self
.ctx
.gc_regions(
peer.clone(),
&need_full_list_regions,
&file_refs_manifest,
true,
self.config.mailbox_timeout,
)
.gc_regions(&need_full_list_regions, true, self.config.mailbox_timeout)
.await
{
Ok(report) => combined_report.merge(report),
@@ -330,98 +285,6 @@ impl GcScheduler {
Ok(gc_report)
}
/// Discover datanodes for the given regions(and it's related regions) by fetching table routes in batches.
/// Returns mappings from region to peer(leader, Vec<followers>) and peer to regions.
async fn discover_datanodes_for_regions(
&self,
regions: &[RegionId],
) -> Result<(Region2Peers, Peer2Regions)> {
let all_related_regions = self
.find_related_regions(regions)
.await?
.into_iter()
.flat_map(|(k, mut v)| {
v.push(k);
v
})
.collect_vec();
let mut region_to_peer = HashMap::new();
let mut peer_to_regions = HashMap::new();
// Group regions by table ID for batch processing
let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
for region_id in all_related_regions {
let table_id = region_id.table_id();
table_to_regions
.entry(table_id)
.or_default()
.push(region_id);
}
// Process each table's regions together for efficiency
for (table_id, table_regions) in table_to_regions {
match self.ctx.get_table_route(table_id).await {
Ok((_phy_table_id, table_route)) => {
self.get_table_regions_peer(
&table_route,
&table_regions,
&mut region_to_peer,
&mut peer_to_regions,
);
}
Err(e) => {
// Continue with other tables instead of failing completely
// TODO(discord9): consider failing here instead
warn!(
"Failed to get table route for table {}: {}, skipping its regions",
table_id, e
);
continue;
}
}
}
Ok((region_to_peer, peer_to_regions))
}
/// Process regions for a single table to find their current leader peers.
fn get_table_regions_peer(
&self,
table_route: &PhysicalTableRouteValue,
table_regions: &[RegionId],
region_to_peer: &mut Region2Peers,
peer_to_regions: &mut Peer2Regions,
) {
for &region_id in table_regions {
let mut found = false;
// Find the region in the table route
for region_route in &table_route.region_routes {
if region_route.region.id == region_id
&& let Some(leader_peer) = &region_route.leader_peer
{
region_to_peer.insert(
region_id,
(leader_peer.clone(), region_route.follower_peers.clone()),
);
peer_to_regions
.entry(leader_peer.clone())
.or_default()
.insert(region_id);
found = true;
break;
}
}
if !found {
warn!(
"Failed to find region {} in table route or no leader peer found",
region_id,
);
}
}
}
async fn batch_should_use_full_listing(
&self,
region_ids: &[RegionId],


@@ -36,10 +36,9 @@ use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use tokio::sync::mpsc::Sender;
use crate::error::{Result, UnexpectedSnafu};
use crate::error::Result;
use crate::gc::candidate::GcCandidate;
use crate::gc::ctx::SchedulerCtx;
use crate::gc::handler::Region2Peers;
use crate::gc::options::GcSchedulerOptions;
use crate::gc::scheduler::{Event, GcScheduler};
@@ -67,12 +66,10 @@ pub struct MockSchedulerCtx {
pub gc_reports: Arc<Mutex<HashMap<RegionId, GcReport>>>,
pub candidates: Arc<Mutex<Option<HashMap<TableId, Vec<GcCandidate>>>>>,
pub get_table_to_region_stats_calls: Arc<Mutex<usize>>,
pub get_file_references_calls: Arc<Mutex<usize>>,
pub gc_regions_calls: Arc<Mutex<usize>>,
// Error injection fields for testing
pub get_table_to_region_stats_error: Arc<Mutex<Option<crate::error::Error>>>,
pub get_table_route_error: Arc<Mutex<Option<crate::error::Error>>>,
pub get_file_references_error: Arc<Mutex<Option<crate::error::Error>>>,
pub gc_regions_error: Arc<Mutex<Option<crate::error::Error>>>,
// Retry testing fields
pub gc_regions_retry_count: Arc<Mutex<HashMap<RegionId, usize>>>,
@@ -119,57 +116,12 @@ impl MockSchedulerCtx {
*self.get_table_route_error.lock().unwrap() = Some(error);
}
/// Set an error to be returned by `get_file_references`
#[allow(dead_code)]
pub fn with_get_file_references_error(self, error: crate::error::Error) -> Self {
*self.get_file_references_error.lock().unwrap() = Some(error);
self
}
/// Set an error to be returned by `gc_regions`
pub fn with_gc_regions_error(self, error: crate::error::Error) -> Self {
*self.gc_regions_error.lock().unwrap() = Some(error);
self
}
/// Set a sequence of errors to be returned by `gc_regions` for retry testing
pub fn set_gc_regions_error_sequence(&self, errors: Vec<crate::error::Error>) {
*self.gc_regions_error_sequence.lock().unwrap() = errors;
}
/// Set success after a specific number of retries for a region
pub fn set_gc_regions_success_after_retries(&self, region_id: RegionId, retries: usize) {
self.gc_regions_success_after_retries
.lock()
.unwrap()
.insert(region_id, retries);
}
/// Get the retry count for a specific region
pub fn get_retry_count(&self, region_id: RegionId) -> usize {
self.gc_regions_retry_count
.lock()
.unwrap()
.get(&region_id)
.copied()
.unwrap_or(0)
}
/// Reset all retry tracking
pub fn reset_retry_tracking(&self) {
*self.gc_regions_retry_count.lock().unwrap() = HashMap::new();
*self.gc_regions_error_sequence.lock().unwrap() = Vec::new();
*self.gc_regions_success_after_retries.lock().unwrap() = HashMap::new();
}
/// Set an error to be returned for a specific region
pub fn set_gc_regions_error_for_region(&self, region_id: RegionId, error: crate::error::Error) {
self.gc_regions_per_region_errors
.lock()
.unwrap()
.insert(region_id, error);
}
/// Clear per-region errors
#[allow(unused)]
pub fn clear_gc_regions_per_region_errors(&self) {
@@ -213,39 +165,9 @@ impl SchedulerCtx for MockSchedulerCtx {
.unwrap_or_else(|| (table_id, PhysicalTableRouteValue::default())))
}
async fn get_file_references(
&self,
query_regions: &[RegionId],
_related_regions: HashMap<RegionId, Vec<RegionId>>,
region_to_peer: &Region2Peers,
_timeout: Duration,
) -> Result<FileRefsManifest> {
*self.get_file_references_calls.lock().unwrap() += 1;
// Check if we should return an injected error
if let Some(error) = self.get_file_references_error.lock().unwrap().take() {
return Err(error);
}
if query_regions
.iter()
.any(|region_id| !region_to_peer.contains_key(region_id))
{
UnexpectedSnafu {
violated: format!(
"region_to_peer{region_to_peer:?} does not contain all region_ids requested: {:?}",
query_regions
),
}.fail()?;
}
Ok(self.file_refs.lock().unwrap().clone().unwrap_or_default())
}
async fn gc_regions(
&self,
_peer: Peer,
region_ids: &[RegionId],
_file_refs_manifest: &FileRefsManifest,
_full_file_listing: bool,
_timeout: Duration,
) -> Result<GcReport> {


@@ -152,7 +152,6 @@ async fn test_handle_tick() {
);
assert_eq!(*ctx.get_table_to_region_stats_calls.lock().unwrap(), 1);
assert_eq!(*ctx.get_file_references_calls.lock().unwrap(), 1);
assert_eq!(*ctx.gc_regions_calls.lock().unwrap(), 1);
let tracker = scheduler.region_gc_tracker.lock().await;


@@ -64,6 +64,7 @@ async fn test_gc_regions_failure_handling() {
region_id,
HashSet::from([FileRef::new(region_id, FileId::random(), None)]),
)]),
cross_region_refs: HashMap::new(),
};
let ctx = Arc::new(
@@ -121,10 +122,6 @@ async fn test_gc_regions_failure_handling() {
1,
"Expected 1 call to get_table_to_region_stats"
);
assert!(
*ctx.get_file_references_calls.lock().unwrap() >= 1,
"Expected at least 1 call to get_file_references"
);
assert!(
*ctx.gc_regions_calls.lock().unwrap() >= 1,
"Expected at least 1 call to gc_regions"
@@ -206,13 +203,6 @@ async fn test_get_file_references_failure() {
datanode_report.deleted_files[&region_id].is_empty(),
"Should have empty deleted files due to file refs failure"
);
// Should still attempt to get file references (may be called multiple times due to retry logic)
assert!(
*ctx.get_file_references_calls.lock().unwrap() >= 1,
"Expected at least 1 call to get_file_references, got {}",
*ctx.get_file_references_calls.lock().unwrap()
);
}
#[tokio::test]
@@ -255,42 +245,22 @@ async fn test_get_table_route_failure() {
last_tracker_cleanup: Arc::new(tokio::sync::Mutex::new(Instant::now())),
};
// Get candidates first
let stats = &ctx
.table_to_region_stats
.lock()
.unwrap()
.clone()
.unwrap_or_default();
let candidates = scheduler.select_gc_candidates(stats).await.unwrap();
// Test the full workflow to trigger table route failure during aggregation
// The table route failure should cause the entire GC cycle to fail
let result = scheduler.handle_tick().await;
// Convert table-based candidates to datanode-based candidates
let datanode_to_candidates = HashMap::from([(
Peer::new(1, ""),
candidates
.into_iter()
.flat_map(|(table_id, candidates)| candidates.into_iter().map(move |c| (table_id, c)))
.collect(),
)]);
// This should handle table route failure gracefully
let report = scheduler
.parallel_process_datanodes(datanode_to_candidates)
.await;
// Should process the datanode but handle route error gracefully
assert_eq!(
report.per_datanode_reports.len(),
0,
"Expected 0 datanode report"
);
assert_eq!(
report.failed_datanodes.len(),
1,
"Expected 1 failed datanodes (route error handled gracefully)"
);
// The table route failure should be propagated as an error
assert!(
report.failed_datanodes.contains_key(&1),
"Failed datanodes should contain the datanode with route error"
result.is_err(),
"Expected table route failure to propagate as error"
);
// Verify the error message contains our simulated failure
let error = result.unwrap_err();
let error_msg = format!("{}", error);
assert!(
error_msg.contains("Simulated table route failure for testing"),
"Error message should contain our simulated failure: {}",
error_msg
);
}


@@ -123,11 +123,6 @@ async fn test_full_gc_workflow() {
1,
"Expected 1 call to get_table_to_region_stats"
);
assert_eq!(
*ctx.get_file_references_calls.lock().unwrap(),
1,
"Expected 1 call to get_file_references"
);
assert_eq!(
*ctx.gc_regions_calls.lock().unwrap(),
1,


@@ -19,6 +19,8 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::RegionLock;
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
@@ -26,14 +28,16 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{error, info, warn};
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
use table::metadata::TableId;
use crate::error::{self, Result, SerializeToJsonSnafu};
use crate::gc::Region2Peers;
use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
use crate::gc::util::table_route_to_region;
use crate::gc::{Peer2Regions, Region2Peers};
use crate::handler::HeartbeatMailbox;
use crate::service::mailbox::{Channel, MailboxRef};
@@ -146,56 +150,73 @@ async fn send_gc_regions(
}
}
/// TODO(discord9): another procedure which does both get file refs and gc regions.
pub struct GcRegionProcedure {
/// Procedure that fetches file references and then performs batch GC for multiple regions;
/// it holds locks on all of those regions for the whole procedure.
pub struct BatchGcProcedure {
mailbox: MailboxRef,
data: GcRegionData,
table_metadata_manager: TableMetadataManagerRef,
data: BatchGcData,
}
#[derive(Serialize, Deserialize)]
pub struct GcRegionData {
pub struct BatchGcData {
state: State,
/// Meta server address
server_addr: String,
peer: Peer,
gc_regions: GcRegions,
description: String,
/// The regions to be GC-ed
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
/// Related regions (e.g., for shared files after repartition).
/// The source regions (where the files originally came from) are the keys;
/// the destination regions (where the files are currently stored) are the values.
related_regions: HashMap<RegionId, HashSet<RegionId>>,
/// Acquired file references (Populated in Acquiring state)
file_refs: FileRefsManifest,
/// mailbox timeout duration
timeout: Duration,
gc_report: Option<GcReport>,
}
impl GcRegionProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::GcRegionProcedure";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
/// Initial state
Start,
/// Fetching file references from datanodes
Acquiring,
/// Sending GC instruction to the target datanode
Gcing,
/// Updating region repartition info in kvbackend after GC based on the GC result
UpdateRepartition,
}
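// Expected progression, per the variant docs: Start -> Acquiring -> Gcing -> UpdateRepartition -> done.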
impl BatchGcProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
pub fn new(
mailbox: MailboxRef,
table_metadata_manager: TableMetadataManagerRef,
server_addr: String,
peer: Peer,
gc_regions: GcRegions,
description: String,
regions: Vec<RegionId>,
full_file_listing: bool,
timeout: Duration,
) -> Self {
Self {
mailbox,
data: GcRegionData {
peer,
table_metadata_manager,
data: BatchGcData {
state: State::Start,
server_addr,
gc_regions,
description,
regions,
full_file_listing,
timeout,
region_routes: HashMap::new(),
related_regions: HashMap::new(),
file_refs: FileRefsManifest::default(),
gc_report: None,
},
}
}
async fn send_gc_instr(&self) -> Result<GcReport> {
send_gc_regions(
&self.mailbox,
&self.data.peer,
self.data.gc_regions.clone(),
&self.data.server_addr,
self.data.timeout,
&self.data.description,
)
.await
}
pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
error::UnexpectedSnafu {
@@ -207,111 +228,129 @@ impl GcRegionProcedure {
.build()
})
}
}
#[async_trait::async_trait]
impl Procedure for GcRegionProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
// Send the GC instruction to the datanode. This procedure only handles lock & send;
// results or other kinds of errors are reported back via the oneshot channel.
let reply = self
.send_gc_instr()
async fn get_table_route(
&self,
table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
self.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.map_err(ProcedureError::external)?;
Ok(Status::done_with_output(reply))
.context(TableMetadataManagerSnafu)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
/// Read-lock all regions involved in this GC procedure so that region migration
/// can't happen during GC and cause race conditions.
///
/// Only the regions are read-locked, not the catalog/schema, so this can run concurrently
/// with other procedures (e.g. drop database/table).
/// TODO(discord9): integration test to verify this
fn lock_key(&self) -> LockKey {
let lock_key: Vec<_> = self
.data
.gc_regions
.regions
.iter()
.sorted() // sort to have a deterministic lock order
.map(|id| RegionLock::Read(*id).into())
.collect();
LockKey::new(lock_key)
}
}
/// Procedure to perform get file refs then batch GC for multiple regions; it should only be used by the admin
/// function for triggering manual GC, as it holds locks on all of those regions for too long during the procedure.
pub struct BatchGcProcedure {
mailbox: MailboxRef,
data: BatchGcData,
}
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
state: State,
server_addr: String,
/// The regions to be GC-ed
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
/// Related regions (e.g., for shared files). Map: RegionId -> List of related RegionIds.
related_regions: HashMap<RegionId, Vec<RegionId>>,
/// Acquired file references (Populated in Acquiring state)
file_refs: FileRefsManifest,
/// mailbox timeout duration
timeout: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
/// Initial state
Start,
/// Fetching file references from datanodes
Acquiring,
/// Sending GC instruction to the target datanode
Gcing,
}
impl BatchGcProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
pub fn new(
mailbox: MailboxRef,
server_addr: String,
regions: Vec<RegionId>,
full_file_listing: bool,
region_routes: Region2Peers,
related_regions: HashMap<RegionId, Vec<RegionId>>,
timeout: Duration,
) -> Self {
Self {
mailbox,
data: BatchGcData {
state: State::Start,
server_addr,
regions,
full_file_listing,
region_routes,
related_regions,
file_refs: FileRefsManifest::default(),
timeout,
},
/// Returns the related regions for the given regions.
/// The returned map uses the source regions (where the files originally came from) as keys,
/// and the destination regions (where the files are currently stored) as values.
/// If a region is not found in the repartition manager, it is still present in the returned map
/// as a key with an empty value.
async fn find_related_regions(
&self,
regions: &[RegionId],
) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
let repart_mgr = self.table_metadata_manager.table_repart_manager();
let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
for src_region in regions {
// TODO(discord9): batch get
if let Some(dst_regions) = repart_mgr
.get_dst_regions(*src_region)
.await
.context(KvBackendSnafu)?
{
related_regions.insert(*src_region, dst_regions.into_iter().collect());
} else {
related_regions.insert(*src_region, Default::default());
}
}
Ok(related_regions)
}
/// Clean up region repartition info in the kvbackend after GC,
/// according to the cross-region references in the `FileRefsManifest`.
async fn cleanup_region_repartition(&self) -> Result<()> {
for (src_region, dst_regions) in self.data.file_refs.cross_region_refs.iter() {
// TODO(discord9): batch update
self.table_metadata_manager
.table_repart_manager()
.update_mappings(*src_region, &dst_regions.iter().cloned().collect_vec())
.await
.context(KvBackendSnafu)?;
}
Ok(())
}
/// Discover region routes for the given regions.
async fn discover_route_for_regions(
&self,
regions: &[RegionId],
) -> Result<(Region2Peers, Peer2Regions)> {
let mut region_to_peer = HashMap::new();
let mut peer_to_regions = HashMap::new();
// Group regions by table ID for batch processing
let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
for region_id in regions {
let table_id = region_id.table_id();
table_to_regions
.entry(table_id)
.or_default()
.push(*region_id);
}
// Process each table's regions together for efficiency
for (table_id, table_regions) in table_to_regions {
match self.get_table_route(table_id).await {
Ok((_phy_table_id, table_route)) => {
table_route_to_region(
&table_route,
&table_regions,
&mut region_to_peer,
&mut peer_to_regions,
);
}
Err(e) => {
// Continue with other tables instead of failing completely
// TODO(discord9): consider failing here instead
warn!(
"Failed to get table route for table {}: {}, skipping its regions",
table_id, e
);
continue;
}
}
}
Ok((region_to_peer, peer_to_regions))
}
/// Set region routes and related regions for GC procedure
async fn set_routes_and_related_regions(&mut self) -> Result<()> {
let related_regions = self.find_related_regions(&self.data.regions).await?;
self.data.related_regions = related_regions.clone();
// Discover routes for all regions involved in GC, including both the
// primary GC regions and their related regions.
let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
regions_set.extend(related_regions.keys().cloned());
regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
let regions_to_discover = regions_set.into_iter().collect_vec();
let (region_to_peer, _) = self
.discover_route_for_regions(&regions_to_discover)
.await?;
self.data.region_routes = region_to_peer;
Ok(())
}
/// Get file references from all datanodes that host the regions
async fn get_file_references(&self) -> Result<FileRefsManifest> {
use std::collections::{HashMap, HashSet};
async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
self.set_routes_and_related_regions().await?;
let query_regions = &self.data.regions;
let related_regions = &self.data.related_regions;
@@ -344,20 +383,25 @@ impl BatchGcProcedure {
}
}
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, Vec<RegionId>>> =
let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
HashMap::new();
for (related_region, queries) in related_regions {
if let Some((leader, _followers)) = region_routes.get(related_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.insert(*related_region, queries.clone());
} // since read from manifest, no need to send to followers
for (src_region, dst_regions) in related_regions {
for dst_region in dst_regions {
if let Some((leader, _followers)) = region_routes.get(dst_region) {
datanode2related_regions
.entry(leader.clone())
.or_default()
.entry(*src_region)
.or_default()
.insert(*dst_region);
} // since read from manifest, no need to send to followers
}
}
// Send GetFileRefs instructions to each datanode
let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
let mut all_manifest_versions = HashMap::new();
let mut all_cross_region_refs = HashMap::new();
for (peer, regions) in datanode2query_regions {
let related_regions_for_peer =
@@ -400,17 +444,25 @@ impl BatchGcProcedure {
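// Keep the smallest manifest version reported for each region across datanode replies.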
let entry = all_manifest_versions.entry(region_id).or_insert(version);
*entry = (*entry).min(version);
}
for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
let entry = all_cross_region_refs
.entry(region_id)
.or_insert_with(HashSet::new);
entry.extend(related_region_ids);
}
}
Ok(FileRefsManifest {
file_refs: all_file_refs,
manifest_version: all_manifest_versions,
cross_region_refs: all_cross_region_refs,
})
}
/// Send GC instructions to all datanodes that host the regions and return the merged GC report;
/// regions that still need retry are logged with a warning.
async fn send_gc_instructions(&self) -> Result<Vec<RegionId>> {
async fn send_gc_instructions(&self) -> Result<GcReport> {
let regions = &self.data.regions;
let region_routes = &self.data.region_routes;
let file_refs = &self.data.file_refs;
@@ -418,6 +470,7 @@ impl BatchGcProcedure {
// Group regions by datanode
let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
let mut all_report = GcReport::default();
for region_id in regions {
if let Some((leader, _followers)) = region_routes.get(region_id) {
@@ -469,10 +522,15 @@ impl BatchGcProcedure {
peer, success, need_retry
);
}
all_need_retry.extend(report.need_retry_regions);
all_need_retry.extend(report.need_retry_regions.clone());
all_report.merge(report);
}
Ok(all_need_retry.into_iter().collect())
if !all_need_retry.is_empty() {
warn!("Regions need retry after batch GC: {:?}", all_need_retry);
}
Ok(all_report)
}
}
@@ -507,12 +565,10 @@ impl Procedure for BatchGcProcedure {
// Send GC instructions to all datanodes
// TODO(discord9): handle need-retry regions
match self.send_gc_instructions().await {
Ok(_) => {
info!(
"Batch GC completed successfully for regions {:?}",
self.data.regions
);
Ok(Status::done())
Ok(report) => {
self.data.state = State::UpdateRepartition;
self.data.gc_report = Some(report);
Ok(Status::executing(false))
}
Err(e) => {
error!("Failed to send GC instructions: {}", e);
@@ -520,6 +576,29 @@ impl Procedure for BatchGcProcedure {
}
}
}
State::UpdateRepartition => match self.cleanup_region_repartition().await {
Ok(()) => {
info!(
"Cleanup region repartition info completed successfully for regions {:?}",
self.data.regions
);
info!(
"Batch GC completed successfully for regions {:?}",
self.data.regions
);
let Some(report) = self.data.gc_report.take() else {
return common_procedure::error::UnexpectedSnafu {
err_msg: "GC report should be present after GC completion".to_string(),
}
.fail();
};
Ok(Status::done_with_output(report))
}
Err(e) => {
error!("Failed to cleanup region repartition info: {}", e);
Err(ProcedureError::external(e))
}
},
}
}


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
@@ -38,29 +38,6 @@ pub struct GcJobReport {
pub per_datanode_reports: HashMap<DatanodeId, GcReport>,
pub failed_datanodes: HashMap<DatanodeId, Vec<Error>>,
}
impl GcJobReport {
pub fn merge(&mut self, mut other: GcJobReport) {
// merge per_datanode_reports&failed_datanodes
for (dn_id, report) in other.per_datanode_reports {
let self_report = self.per_datanode_reports.entry(dn_id).or_default();
self_report.merge(report);
}
let all_failed_dn_ids = self
.failed_datanodes
.keys()
.cloned()
.chain(other.failed_datanodes.keys().cloned())
.collect::<HashSet<_>>();
for dn_id in all_failed_dn_ids {
let entry = self.failed_datanodes.entry(dn_id).or_default();
if let Some(other_errors) = other.failed_datanodes.remove(&dn_id) {
entry.extend(other_errors);
}
}
self.failed_datanodes
.retain(|dn_id, _| !self.per_datanode_reports.contains_key(dn_id));
}
}
/// [`Event`] represents various types of events that can be processed by the gc ticker.
///


@@ -30,15 +30,6 @@ pub(crate) struct RegionGcInfo {
pub(crate) last_full_listing_time: Option<Instant>,
}
impl RegionGcInfo {
pub(crate) fn new(last_gc_time: Instant) -> Self {
Self {
last_gc_time,
last_full_listing_time: None,
}
}
}
/// Tracks the last GC time for regions to implement cooldown.
pub(crate) type RegionGcTracker = HashMap<RegionId, RegionGcInfo>;
@@ -46,7 +37,7 @@ impl GcScheduler {
/// Clean up stale entries from the region GC tracker if enough time has passed.
/// This removes entries for regions that no longer exist in the current table routes.
pub(crate) async fn cleanup_tracker_if_needed(&self) -> Result<()> {
let mut last_cleanup = *self.last_tracker_cleanup.lock().await;
let last_cleanup = *self.last_tracker_cleanup.lock().await;
let now = Instant::now();
// Check if enough time has passed since last cleanup
@@ -85,25 +76,6 @@ impl GcScheduler {
Ok(())
}
/// Determine if full file listing should be used for a region based on the last full listing time.
pub(crate) async fn should_use_full_listing(&self, region_id: RegionId) -> bool {
let gc_tracker = self.region_gc_tracker.lock().await;
let now = Instant::now();
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
let elapsed = now.saturating_duration_since(last_full_listing);
elapsed >= self.config.full_file_listing_interval
} else {
// Never did full listing for this region, do it now
true
}
} else {
// First time GC for this region, do full listing
true
}
}
pub(crate) async fn update_full_listing_time(
&self,
region_id: RegionId,


@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_telemetry::warn;
use store_api::storage::RegionId;
use crate::gc::{Peer2Regions, Region2Peers};
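/// For each region in `table_regions`, looks up its route in `table_route` and records the leader
/// (plus followers) into `region_to_peer` and the reverse peer-to-regions mapping.
/// Regions without a route or leader peer are skipped with a warning.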
pub fn table_route_to_region(
table_route: &PhysicalTableRouteValue,
table_regions: &[RegionId],
region_to_peer: &mut Region2Peers,
peer_to_regions: &mut Peer2Regions,
) {
for &region_id in table_regions {
let mut found = false;
// Find the region in the table route
for region_route in &table_route.region_routes {
if region_route.region.id == region_id
&& let Some(leader_peer) = &region_route.leader_peer
{
region_to_peer.insert(
region_id,
(leader_peer.clone(), region_route.follower_peers.clone()),
);
peer_to_regions
.entry(leader_peer.clone())
.or_default()
.insert(region_id);
found = true;
break;
}
}
if !found {
warn!(
"Failed to find region {} in table route or no leader peer found",
region_id,
);
}
}
}


@@ -42,6 +42,8 @@ use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelect
use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
#[cfg(feature = "vector_index")]
use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
@@ -247,6 +249,16 @@ impl CacheStrategy {
}
}
/// Calls [CacheManager::vector_index_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
#[cfg(feature = "vector_index")]
pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::puffin_metadata_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
@@ -303,6 +315,9 @@ pub struct CacheManager {
inverted_index_cache: Option<InvertedIndexCacheRef>,
/// Cache for bloom filter index.
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
/// Cache for vector index.
#[cfg(feature = "vector_index")]
vector_index_cache: Option<VectorIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
@@ -434,6 +449,11 @@ impl CacheManager {
cache.invalidate_file(file_id.file_id());
}
#[cfg(feature = "vector_index")]
if let Some(cache) = &self.vector_index_cache {
cache.invalidate_file(file_id.file_id());
}
if let Some(cache) = &self.puffin_metadata_cache {
cache.remove(&file_id.to_string());
}
@@ -486,6 +506,11 @@ impl CacheManager {
self.bloom_filter_index_cache.as_ref()
}
#[cfg(feature = "vector_index")]
pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
self.vector_index_cache.as_ref()
}
pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
self.puffin_metadata_cache.as_ref()
}
@@ -646,6 +671,9 @@ impl CacheManagerBuilder {
self.index_content_size,
self.index_content_page_size,
);
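// The vector index cache reuses the index content cache size as its capacity and is disabled when that size is zero.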
#[cfg(feature = "vector_index")]
let vector_index_cache = (self.index_content_size != 0)
.then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
let index_result_cache = (self.index_result_cache_size != 0)
.then(|| IndexResultCache::new(self.index_result_cache_size));
let puffin_metadata_cache =
@@ -672,6 +700,8 @@ impl CacheManagerBuilder {
write_cache: self.write_cache,
inverted_index_cache: Some(Arc::new(inverted_index_cache)),
bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
#[cfg(feature = "vector_index")]
vector_index_cache,
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
index_result_cache,

View File

@@ -16,13 +16,11 @@
use std::mem;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{
FileMetaData, KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex,
RowGroupMetaData,
FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData,
};
use parquet::file::page_index::column_index::ColumnIndexMetaData as Index;
use parquet::file::page_index::offset_index::PageLocation;
use parquet::file::page_index::index::Index;
use parquet::format::{ColumnOrder, KeyValue, PageLocation};
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
/// Returns estimated size of [ParquetMetaData].


@@ -15,6 +15,8 @@
pub mod bloom_filter_index;
pub mod inverted_index;
pub mod result_cache;
#[cfg(feature = "vector_index")]
pub mod vector_index;
use std::future::Future;
use std::hash::Hash;


@@ -0,0 +1,137 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use roaring::RoaringBitmap;
use store_api::storage::{ColumnId, FileId, IndexVersion, VectorDistanceMetric, VectorIndexEngine};
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION};
const VECTOR_INDEX_CACHE_TYPE: &str = "vector_index";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct VectorIndexCacheKey {
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
}
impl VectorIndexCacheKey {
pub fn new(file_id: FileId, index_version: IndexVersion, column_id: ColumnId) -> Self {
Self {
file_id,
index_version,
column_id,
}
}
}
pub struct CachedVectorIndex {
pub engine: Box<dyn VectorIndexEngine>,
pub null_bitmap: RoaringBitmap,
pub size_bytes: usize,
pub dimensions: u32,
pub metric: VectorDistanceMetric,
pub total_rows: u64,
pub indexed_rows: u64,
}
impl CachedVectorIndex {
pub fn new(
engine: Box<dyn VectorIndexEngine>,
null_bitmap: RoaringBitmap,
dimensions: u32,
metric: VectorDistanceMetric,
total_rows: u64,
indexed_rows: u64,
) -> Self {
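// Estimate the cache weight: engine memory plus the serialized null bitmap plus the struct itself.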
let size_bytes =
engine.memory_usage() + null_bitmap.serialized_size() + std::mem::size_of::<Self>();
Self {
engine,
null_bitmap,
size_bytes,
dimensions,
metric,
total_rows,
indexed_rows,
}
}
}
impl std::fmt::Debug for CachedVectorIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CachedVectorIndex")
.field("size_bytes", &self.size_bytes)
.field("dimensions", &self.dimensions)
.field("metric", &self.metric)
.field("total_rows", &self.total_rows)
.field("indexed_rows", &self.indexed_rows)
.field("null_bitmap_len", &self.null_bitmap.len())
.finish()
}
}
pub struct VectorIndexCache {
inner: Cache<VectorIndexCacheKey, Arc<CachedVectorIndex>>,
}
pub type VectorIndexCacheRef = Arc<VectorIndexCache>;
impl VectorIndexCache {
pub fn new(capacity: u64) -> Self {
fn to_str(cause: RemovalCause) -> &'static str {
match cause {
RemovalCause::Expired => "expired",
RemovalCause::Explicit => "explicit",
RemovalCause::Replaced => "replaced",
RemovalCause::Size => "size",
}
}
let inner = Cache::builder()
.max_capacity(capacity)
.weigher(|_k, v: &Arc<CachedVectorIndex>| v.size_bytes as u32)
.eviction_listener(|_k, v, cause| {
CACHE_BYTES
.with_label_values(&[VECTOR_INDEX_CACHE_TYPE])
.sub(v.size_bytes as i64);
CACHE_EVICTION
.with_label_values(&[VECTOR_INDEX_CACHE_TYPE, to_str(cause)])
.inc();
})
.build();
Self { inner }
}
pub fn get(&self, key: &VectorIndexCacheKey) -> Option<Arc<CachedVectorIndex>> {
self.inner.get(key)
}
pub fn insert(&self, key: VectorIndexCacheKey, value: Arc<CachedVectorIndex>) {
CACHE_BYTES
.with_label_values(&[VECTOR_INDEX_CACHE_TYPE])
.add(value.size_bytes as i64);
self.inner.insert(key, value);
}
pub fn invalidate_file(&self, file_id: FileId) {
let _ = self
.inner
.invalidate_entries_if(move |k, _| k.file_id == file_id);
}
}


@@ -81,7 +81,7 @@ mod apply_staging_manifest_test;
mod puffin_index;
use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
@@ -303,7 +303,7 @@ impl MitoEngine {
pub async fn get_snapshot_of_file_refs(
&self,
file_handle_regions: impl IntoIterator<Item = RegionId>,
manifest_regions: HashMap<RegionId, Vec<RegionId>>,
related_regions: HashMap<RegionId, HashSet<RegionId>>,
) -> Result<FileRefsManifest> {
let file_ref_mgr = self.file_ref_manager();
@@ -315,15 +315,30 @@ impl MitoEngine {
.filter_map(|region_id| self.find_region(region_id))
.collect();
let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
.into_iter()
.filter_map(|(related_region, queries)| {
self.find_region(related_region).map(|r| (r, queries))
})
.collect();
let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
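// Invert the src -> {dst} mapping into dst -> {src} so each destination region's manifest
// is scanned once for all of its source regions.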
let dst2src = related_regions
.into_iter()
.flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
.fold(
HashMap::<RegionId, HashSet<RegionId>>::new(),
|mut acc, (k, v)| {
let entry = acc.entry(k).or_default();
entry.insert(v);
acc
},
);
let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
for (dst_region, srcs) in dst2src {
let Some(dst_region) = self.find_region(dst_region) else {
continue;
};
dst_region_to_src_regions.push((dst_region, srcs));
}
dst_region_to_src_regions
};
file_ref_mgr
.get_snapshot_of_file_refs(query_regions, related_regions)
.get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
.await
}


@@ -601,6 +601,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound {
column: String,
@@ -644,6 +652,14 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "vector_index")]
#[snafu(display("Failed to apply vector index: {}", reason))]
ApplyVectorIndex {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to push index value"))]
PushIndexValue {
source: index::inverted_index::error::Error,
@@ -1268,6 +1284,7 @@ impl ErrorExt for Error {
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| ConvertMetaData { .. }
| DecodeWal { .. }
| ComputeArrow { .. }
| BiErrors { .. }
@@ -1315,6 +1332,8 @@ impl ErrorExt for Error {
| PushIndexValue { source, .. }
| ApplyInvertedIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
#[cfg(feature = "vector_index")]
ApplyVectorIndex { .. } => StatusCode::Internal,
PuffinReadBlob { source, .. }
| PuffinAddBlob { source, .. }
| PuffinInitStager { source, .. }


@@ -134,6 +134,7 @@ async fn test_gc_worker_basic_truncate() {
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
@@ -232,6 +233,7 @@ async fn test_gc_worker_truncate_with_ref() {
)]
.into(),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
let report = gc_worker.run().await.unwrap();
@@ -313,6 +315,7 @@ async fn test_gc_worker_basic_compact() {
let file_ref_manifest = FileRefsManifest {
file_refs: Default::default(),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;
@@ -399,6 +402,7 @@ async fn test_gc_worker_compact_with_ref() {
.collect(),
)]),
manifest_version: [(region_id, version)].into(),
cross_region_refs: HashMap::new(),
};
let gc_worker = create_gc_worker(&engine, regions, &file_ref_manifest, true).await;


@@ -71,6 +71,7 @@ use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
@@ -1196,7 +1197,7 @@ impl BulkPartEncoder {
metrics.num_rows += total_rows;
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(file_metadata);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
let num_series = series_estimator.finish();
Ok(Some(EncodedBulkPart {
@@ -1231,7 +1232,7 @@ impl BulkPartEncoder {
};
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(file_metadata);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
Ok(Some(EncodedBulkPart {
data: buf,


@@ -70,11 +70,15 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -498,6 +502,16 @@ impl ScanRegion {
self.build_fulltext_index_applier(&non_field_filters),
self.build_fulltext_index_applier(&field_filters),
];
#[cfg(feature = "vector_index")]
let vector_index_applier = self.build_vector_index_applier();
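// Use the requested k directly when there are no extra filters; otherwise over-fetch by
// VECTOR_INDEX_OVERFETCH_MULTIPLIER, presumably to compensate for rows removed by later filtering.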
#[cfg(feature = "vector_index")]
let vector_index_k = self.request.vector_search.as_ref().map(|search| {
if self.request.filters.is_empty() {
search.k
} else {
search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
}
});
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
if flat_format {
@@ -523,6 +537,10 @@ impl ScanRegion {
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_flat_format(flat_format);
#[cfg(feature = "vector_index")]
let input = input
.with_vector_index_applier(vector_index_applier)
.with_vector_index_k(vector_index_k);
#[cfg(feature = "enterprise")]
let input = if let Some(provider) = self.extension_range_provider {
@@ -667,6 +685,31 @@ impl ScanRegion {
.flatten()
.map(Arc::new)
}
/// Build the vector index applier from vector search request.
#[cfg(feature = "vector_index")]
fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
let vector_search = self.request.vector_search.as_ref()?;
let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
let applier = VectorIndexApplier::new(
self.access_layer.table_dir().to_string(),
self.access_layer.path_type(),
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
vector_search.column_id,
vector_search.query_vector.clone(),
vector_search.metric,
)
.with_file_cache(file_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.with_vector_index_cache(vector_index_cache);
Some(Arc::new(applier))
}
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
@@ -708,6 +751,12 @@ pub struct ScanInput {
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
/// Vector index applier for KNN search.
#[cfg(feature = "vector_index")]
pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
/// Over-fetched k for vector index scan.
#[cfg(feature = "vector_index")]
pub(crate) vector_index_k: Option<usize>,
/// Start time of the query.
pub(crate) query_start: Option<Instant>,
/// The region is using append mode.
@@ -747,6 +796,10 @@ impl ScanInput {
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
#[cfg(feature = "vector_index")]
vector_index_applier: None,
#[cfg(feature = "vector_index")]
vector_index_k: None,
query_start: None,
append_mode: false,
filter_deleted: true,
@@ -853,6 +906,25 @@ impl ScanInput {
self
}
/// Sets vector index applier for KNN search.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn with_vector_index_applier(
mut self,
applier: Option<VectorIndexApplierRef>,
) -> Self {
self.vector_index_applier = applier;
self
}
/// Sets over-fetched k for vector index scan.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
self.vector_index_k = k;
self
}
/// Sets start time of the query.
#[must_use]
pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
@@ -988,7 +1060,7 @@ impl ScanInput {
let predicate = self.predicate_for_file(file);
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let decode_pk_values = !self.compaction && self.mapper.has_tags();
let res = self
let reader = self
.access_layer
.read_sst(file.clone())
.predicate(predicate)
@@ -996,7 +1068,15 @@ impl ScanInput {
.cache(self.cache_strategy.clone())
.inverted_index_appliers(self.inverted_index_appliers.clone())
.bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone())
.fulltext_index_appliers(self.fulltext_index_appliers.clone());
#[cfg(feature = "vector_index")]
let reader = {
let mut reader = reader;
reader =
reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
reader
};
let res = reader
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.compaction(self.compaction)


@@ -151,6 +151,8 @@ pub(crate) struct ScanMetricsSet {
rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
rg_bloom_filtered: usize,
/// Number of row groups filtered by vector index.
rg_vector_filtered: usize,
/// Number of rows in row group before filtering.
rows_before_filter: usize,
/// Number of rows in row group filtered by fulltext index.
@@ -159,6 +161,8 @@ pub(crate) struct ScanMetricsSet {
rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
rows_bloom_filtered: usize,
/// Number of rows filtered by vector index.
rows_vector_filtered: usize,
/// Number of rows filtered by precise filter.
rows_precise_filtered: usize,
/// Number of record batches read from SST.
@@ -255,10 +259,12 @@ impl fmt::Debug for ScanMetricsSet {
rg_inverted_filtered,
rg_minmax_filtered,
rg_bloom_filtered,
rg_vector_filtered,
rows_before_filter,
rows_fulltext_filtered,
rows_inverted_filtered,
rows_bloom_filtered,
rows_vector_filtered,
rows_precise_filtered,
num_sst_record_batches,
num_sst_batches,
@@ -320,6 +326,9 @@ impl fmt::Debug for ScanMetricsSet {
if *rg_bloom_filtered > 0 {
write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
}
if *rg_vector_filtered > 0 {
write!(f, ", \"rg_vector_filtered\":{rg_vector_filtered}")?;
}
if *rows_fulltext_filtered > 0 {
write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
}
@@ -329,6 +338,9 @@ impl fmt::Debug for ScanMetricsSet {
if *rows_bloom_filtered > 0 {
write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
}
if *rows_vector_filtered > 0 {
write!(f, ", \"rows_vector_filtered\":{rows_vector_filtered}")?;
}
if *rows_precise_filtered > 0 {
write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
}
@@ -500,10 +512,12 @@ impl ScanMetricsSet {
rg_inverted_filtered,
rg_minmax_filtered,
rg_bloom_filtered,
rg_vector_filtered,
rows_total,
rows_fulltext_filtered,
rows_inverted_filtered,
rows_bloom_filtered,
rows_vector_filtered,
rows_precise_filtered,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
@@ -525,11 +539,13 @@ impl ScanMetricsSet {
self.rg_inverted_filtered += *rg_inverted_filtered;
self.rg_minmax_filtered += *rg_minmax_filtered;
self.rg_bloom_filtered += *rg_bloom_filtered;
self.rg_vector_filtered += *rg_vector_filtered;
self.rows_before_filter += *rows_total;
self.rows_fulltext_filtered += *rows_fulltext_filtered;
self.rows_inverted_filtered += *rows_inverted_filtered;
self.rows_bloom_filtered += *rows_bloom_filtered;
self.rows_vector_filtered += *rows_vector_filtered;
self.rows_precise_filtered += *rows_precise_filtered;
self.num_sst_record_batches += *num_record_batches;
@@ -631,6 +647,10 @@ impl ScanMetricsSet {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);
#[cfg(feature = "vector_index")]
READ_ROW_GROUPS_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rg_vector_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
@@ -647,6 +667,10 @@ impl ScanMetricsSet {
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
#[cfg(feature = "vector_index")]
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rows_vector_filtered as u64);
}
}


@@ -344,6 +344,12 @@ impl FileMeta {
.contains(&IndexType::BloomFilterIndex)
}
/// Returns true if the file has a vector index.
#[cfg(feature = "vector_index")]
pub fn vector_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::VectorIndex)
}
pub fn index_file_size(&self) -> u64 {
self.index_file_size
}


@@ -87,7 +87,7 @@ impl FileReferenceManager {
pub(crate) async fn get_snapshot_of_file_refs(
&self,
query_regions_for_mem: Vec<MitoRegionRef>,
related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)>,
) -> Result<FileRefsManifest> {
let mut ref_files = HashMap::new();
// get from in memory file handles
@@ -99,12 +99,17 @@ impl FileReferenceManager {
let mut manifest_version = HashMap::new();
let mut cross_region_refs = HashMap::new();
// get file refs from related regions' manifests
for (related_region, queries) in &related_regions_in_manifest {
let queries = queries.iter().cloned().collect::<HashSet<_>>();
let manifest = related_region.manifest_ctx.manifest().await;
for (dst_region, src_regions) in &dst_region_to_src_regions {
let manifest = dst_region.manifest_ctx.manifest().await;
for meta in manifest.files.values() {
if queries.contains(&meta.region_id) {
if src_regions.contains(&meta.region_id) {
cross_region_refs
.entry(meta.region_id)
.or_insert_with(HashSet::new)
.insert(dst_region.region_id());
// Since GC can't run concurrently with repartition (both the query regions and the related
// regions are read-locked), there is no need to worry about staging manifests from repartition here.
@@ -119,7 +124,7 @@ impl FileReferenceManager {
}
}
// not sure if related region's manifest version is needed, but record it for now.
manifest_version.insert(related_region.region_id(), manifest.manifest_version);
manifest_version.insert(dst_region.region_id(), manifest.manifest_version);
}
for r in &query_regions_for_mem {
@@ -138,6 +143,7 @@ impl FileReferenceManager {
Ok(FileRefsManifest {
file_refs: ref_files,
manifest_version,
cross_region_refs,
})
}


@@ -0,0 +1,650 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Vector index applier for KNN search.
use std::sync::Arc;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::vector::distance_metric_to_usearch;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use roaring::RoaringBitmap;
use snafu::ResultExt;
use store_api::storage::{ColumnId, VectorDistanceMetric};
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::vector_index::{
CachedVectorIndex, VectorIndexCacheKey, VectorIndexCacheRef,
};
use crate::error::{ApplyVectorIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::sst::file::RegionIndexId;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::trigger_index_background_download;
use crate::sst::index::vector_index::creator::VectorIndexConfig;
use crate::sst::index::vector_index::format::VectorIndexBlobHeader;
use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine};
/// Result of applying vector index.
#[derive(Debug)]
pub struct VectorIndexApplyOutput {
/// Row offsets in the SST file.
pub row_offsets: Vec<u64>,
}
/// Vector index applier for KNN search against SST blobs.
pub struct VectorIndexApplier {
table_dir: String,
path_type: store_api::region_request::PathType,
object_store: object_store::ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
file_cache: Option<FileCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
vector_index_cache: Option<VectorIndexCacheRef>,
column_id: ColumnId,
query_vector: Vec<f32>,
metric: VectorDistanceMetric,
}
pub type VectorIndexApplierRef = Arc<VectorIndexApplier>;
impl VectorIndexApplier {
pub fn new(
table_dir: String,
path_type: store_api::region_request::PathType,
object_store: object_store::ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
column_id: ColumnId,
query_vector: Vec<f32>,
metric: VectorDistanceMetric,
) -> Self {
Self {
table_dir,
path_type,
object_store,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
vector_index_cache: None,
column_id,
query_vector,
metric,
}
}
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}
pub fn with_vector_index_cache(mut self, cache: Option<VectorIndexCacheRef>) -> Self {
self.vector_index_cache = cache;
self
}
/// Applies vector index to the file and returns candidates.
///
/// This method loads the vector index blob (from cache or remote), runs
/// a KNN search against the indexed vectors, and maps the HNSW keys back
/// to row offsets in the SST file. It returns only row offsets; callers
/// are responsible for any higher-level ordering or limit enforcement.
pub async fn apply_with_k(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
k: usize,
) -> Result<VectorIndexApplyOutput> {
if k == 0 {
return Ok(VectorIndexApplyOutput {
row_offsets: Vec::new(),
});
}
let index = self.load_or_read_index(file_id, file_size_hint).await?;
let Some(index) = index else {
return Ok(VectorIndexApplyOutput {
row_offsets: Vec::new(),
});
};
if self.query_vector.len() != index.dimensions as usize {
return ApplyVectorIndexSnafu {
reason: format!(
"Query vector dimension {} does not match index dimension {}",
self.query_vector.len(),
index.dimensions
),
}
.fail();
}
if self.metric != index.metric {
return ApplyVectorIndexSnafu {
reason: format!(
"Query metric {} does not match index metric {}",
self.metric, index.metric
),
}
.fail();
}
if index.indexed_rows == 0 {
return Ok(VectorIndexApplyOutput {
row_offsets: Vec::new(),
});
}
let matches = index
.engine
.search(&self.query_vector, k.min(index.indexed_rows as usize))
.map_err(|e| {
ApplyVectorIndexSnafu {
reason: e.to_string(),
}
.build()
})?;
let row_offsets = map_hnsw_keys_to_row_offsets(
&index.null_bitmap,
index.total_rows,
index.indexed_rows,
matches.keys,
)?;
Ok(VectorIndexApplyOutput { row_offsets })
}
async fn load_or_read_index(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<Arc<CachedVectorIndex>>> {
let cache_key =
VectorIndexCacheKey::new(file_id.file_id(), file_id.version, self.column_id);
if let Some(cache) = &self.vector_index_cache
&& let Some(cached) = cache.get(&cache_key)
{
return Ok(Some(cached));
}
let reader = match self.cached_blob_reader(file_id, file_size_hint).await {
Ok(Some(reader)) => reader,
Ok(None) => self.remote_blob_reader(file_id, file_size_hint).await?,
Err(err) => {
if is_blob_not_found(&err) {
self.remote_blob_reader(file_id, file_size_hint).await?
} else {
warn!(err; "Failed to read cached vector index blob, fallback to remote");
self.remote_blob_reader(file_id, file_size_hint).await?
}
}
};
let blob_data = read_all_blob(reader, file_size_hint).await?;
if blob_data.is_empty() {
return Ok(None);
}
let cached = Arc::new(parse_vector_index_blob(&blob_data)?);
if let Some(cache) = &self.vector_index_cache {
cache.insert(cache_key, cached.clone());
}
Ok(Some(cached))
}
async fn cached_blob_reader(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
}
let puffin_manager = self.puffin_manager_factory.build(
file_cache.local_store(),
WriteCachePathProvider::new(file_cache.clone()),
);
let blob_name = column_blob_name(self.column_id);
let reader = puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&blob_name)
.await
.context(PuffinReadBlobSnafu)?
.reader()
.await
.context(PuffinBuildReaderSnafu)?;
Ok(Some(reader))
}
async fn remote_blob_reader(
&self,
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.object_store,
);
let puffin_manager = self
.puffin_manager_factory
.build(self.object_store.clone(), path_factory)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let blob_name = column_blob_name(self.column_id);
puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(&blob_name)
.await
.context(PuffinReadBlobSnafu)?
.reader()
.await
.context(PuffinBuildReaderSnafu)
}
}
fn column_blob_name(column_id: ColumnId) -> String {
format!("{INDEX_BLOB_TYPE}-{}", column_id)
}
fn is_blob_not_found(err: &crate::error::Error) -> bool {
matches!(
err,
crate::error::Error::PuffinReadBlob {
source: puffin::error::Error::BlobNotFound { .. },
..
}
)
}
async fn read_all_blob(reader: BlobReader, file_size_hint: Option<u64>) -> Result<Vec<u8>> {
let metadata = reader.metadata().await.map_err(|e| {
ApplyVectorIndexSnafu {
reason: format!("Failed to read vector index metadata: {}", e),
}
.build()
})?;
if let Some(limit) = file_size_hint
&& metadata.content_length > limit
{
return ApplyVectorIndexSnafu {
reason: format!(
"Vector index blob size {} exceeds file size hint {}",
metadata.content_length, limit
),
}
.fail();
}
let bytes = reader.read(0..metadata.content_length).await.map_err(|e| {
ApplyVectorIndexSnafu {
reason: format!("Failed to read vector index data: {}", e),
}
.build()
})?;
Ok(bytes.to_vec())
}
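/// Parses a vector index blob into its header, null bitmap, and HNSW engine payload.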
fn parse_vector_index_blob(data: &[u8]) -> Result<CachedVectorIndex> {
let (header, mut offset) = VectorIndexBlobHeader::decode(data).map_err(|e| {
ApplyVectorIndexSnafu {
reason: e.to_string(),
}
.build()
})?;
let null_bitmap_len = header.null_bitmap_len as usize;
if data.len() < offset + null_bitmap_len {
return ApplyVectorIndexSnafu {
reason: "Vector index blob truncated while reading null bitmap".to_string(),
}
.fail();
}
let null_bitmap_bytes = &data[offset..offset + null_bitmap_len];
offset += null_bitmap_len;
let null_bitmap = RoaringBitmap::deserialize_from(null_bitmap_bytes).map_err(|e| {
ApplyVectorIndexSnafu {
reason: format!("Failed to deserialize null bitmap: {}", e),
}
.build()
})?;
let index_bytes = &data[offset..];
let config = VectorIndexConfig {
engine: header.engine_type,
dim: header.dim as usize,
metric: distance_metric_to_usearch(header.metric),
distance_metric: header.metric,
connectivity: header.connectivity as usize,
expansion_add: header.expansion_add as usize,
expansion_search: header.expansion_search as usize,
};
let engine = engine::load_engine(header.engine_type, &config, index_bytes).map_err(|e| {
ApplyVectorIndexSnafu {
reason: e.to_string(),
}
.build()
})?;
Ok(CachedVectorIndex::new(
engine,
null_bitmap,
header.dim,
header.metric,
header.total_rows,
header.indexed_rows,
))
}
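/// Maps the HNSW keys returned by a search back to row offsets in the SST,
/// accounting for rows that were skipped as nulls during indexing.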
fn map_hnsw_keys_to_row_offsets(
null_bitmap: &RoaringBitmap,
total_rows: u64,
indexed_rows: u64,
keys: Vec<u64>,
) -> Result<Vec<u64>> {
if total_rows == 0 {
return Ok(Vec::new());
}
let total_rows_u32 = u32::try_from(total_rows).map_err(|_| {
ApplyVectorIndexSnafu {
reason: format!("Total rows {} exceeds u32::MAX", total_rows),
}
.build()
})?;
let mut row_offsets = Vec::with_capacity(keys.len());
for key in keys {
let offset = hnsw_key_to_row_offset(null_bitmap, total_rows_u32, indexed_rows, key)?;
row_offsets.push(offset as u64);
}
Ok(row_offsets)
}
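/// Maps a single HNSW key (the dense ordinal of a non-null row) to its row offset by
/// binary searching for the smallest row whose non-null prefix count exceeds the key.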
fn hnsw_key_to_row_offset(
null_bitmap: &RoaringBitmap,
total_rows: u32,
indexed_rows: u64,
key: u64,
) -> Result<u32> {
if total_rows == 0 {
return ApplyVectorIndexSnafu {
reason: "Total rows is zero".to_string(),
}
.fail();
}
if key >= indexed_rows {
return ApplyVectorIndexSnafu {
reason: format!("HNSW key {} exceeds indexed rows {}", key, indexed_rows),
}
.fail();
}
if null_bitmap.is_empty() {
return Ok(key as u32);
}
let mut left: u32 = 0;
let mut right: u32 = total_rows - 1;
while left <= right {
let mid = left + (right - left) / 2;
let nulls_before = null_bitmap.rank(mid);
let non_nulls = (mid as u64 + 1).saturating_sub(nulls_before);
if non_nulls > key {
if mid == 0 {
break;
}
right = mid - 1;
} else {
left = mid + 1;
}
}
if left >= total_rows {
return ApplyVectorIndexSnafu {
reason: "Failed to map HNSW key to row offset".to_string(),
}
.fail();
}
Ok(left)
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::TempDir;
use futures::io::Cursor;
use object_store::ObjectStore;
use object_store::services::Memory;
use puffin::puffin_manager::PuffinWriter;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, VectorDistanceMetric, VectorIndexEngineType};
use super::*;
use crate::access_layer::RegionFilePathFactory;
use crate::sst::file::RegionFileId;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::vector_index::creator::VectorIndexConfig;
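/// Writes `blob` into an in-memory puffin file and builds an applier over it for tests.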
async fn build_applier_with_blob(
blob: Vec<u8>,
column_id: ColumnId,
query_vector: Vec<f32>,
metric: VectorDistanceMetric,
) -> (TempDir, VectorIndexApplier, RegionIndexId, u64) {
let (dir, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_vector_index_applier_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = RegionFileId::new(0.into(), FileId::random());
let index_id = RegionIndexId::new(file_id, 0);
let table_dir = "table_dir".to_string();
let puffin_manager = puffin_manager_factory.build(
object_store.clone(),
RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
);
let mut writer = puffin_manager.writer(&index_id).await.unwrap();
let blob_name = column_blob_name(column_id);
let _bytes_written = writer
.put_blob(
blob_name.as_str(),
Cursor::new(blob),
Default::default(),
Default::default(),
)
.await
.unwrap();
let file_size = writer.finish().await.unwrap();
let applier = VectorIndexApplier::new(
table_dir,
PathType::Bare,
object_store,
puffin_manager_factory,
column_id,
query_vector,
metric,
);
(dir, applier, index_id, file_size)
}
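/// Builds a raw vector index blob (header + null bitmap + serialized engine) from the given vectors.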
fn build_blob_with_vectors(
config: &VectorIndexConfig,
vectors: Vec<(u64, Vec<f32>)>,
null_bitmap: &RoaringBitmap,
total_rows: u64,
indexed_rows: u64,
) -> Vec<u8> {
let mut engine = engine::create_engine(config.engine, config).unwrap();
for (key, vector) in vectors {
engine.add(key, &vector).unwrap();
}
let index_size = engine.serialized_length();
let mut index_bytes = vec![0u8; index_size];
engine.save_to_buffer(&mut index_bytes).unwrap();
let mut null_bitmap_bytes = Vec::new();
null_bitmap.serialize_into(&mut null_bitmap_bytes).unwrap();
let header = VectorIndexBlobHeader::new(
config.engine,
config.dim as u32,
config.distance_metric,
config.connectivity as u16,
config.expansion_add as u16,
config.expansion_search as u16,
total_rows,
indexed_rows,
null_bitmap_bytes.len() as u32,
)
.unwrap();
let mut blob = Vec::new();
header.encode_into(&mut blob);
blob.extend_from_slice(&null_bitmap_bytes);
blob.extend_from_slice(&index_bytes);
blob
}
#[test]
fn test_hnsw_key_to_row_offset_with_nulls() {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(1);
bitmap.insert(3);
assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 0).unwrap(), 0);
assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 1).unwrap(), 2);
assert_eq!(hnsw_key_to_row_offset(&bitmap, 6, 4, 2).unwrap(), 4);
}
#[test]
fn test_hnsw_key_to_row_offset_without_nulls() {
let bitmap = RoaringBitmap::new();
assert_eq!(hnsw_key_to_row_offset(&bitmap, 4, 4, 3).unwrap(), 3);
}
#[test]
fn test_hnsw_key_to_row_offset_out_of_range() {
let bitmap = RoaringBitmap::new();
assert!(hnsw_key_to_row_offset(&bitmap, 4, 4, 4).is_err());
}
#[test]
fn test_map_hnsw_keys_to_row_offsets_multiple_keys() {
let bitmap = RoaringBitmap::new();
let offsets = map_hnsw_keys_to_row_offsets(&bitmap, 4, 4, vec![0, 2, 3]).unwrap();
assert_eq!(offsets, vec![0, 2, 3]);
}
#[tokio::test]
async fn test_apply_with_k_returns_offsets() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 2,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let mut null_bitmap = RoaringBitmap::new();
null_bitmap.insert(1);
let blob = build_blob_with_vectors(
&config,
vec![(0, vec![1.0, 0.0]), (1, vec![0.0, 1.0])],
&null_bitmap,
3,
2,
);
let (_dir, applier, index_id, size_bytes) =
build_applier_with_blob(blob, 1, vec![1.0, 0.0], VectorDistanceMetric::L2sq).await;
let output = applier
.apply_with_k(index_id, Some(size_bytes), 2)
.await
.unwrap();
assert_eq!(output.row_offsets, vec![0, 2]);
}
#[tokio::test]
async fn test_apply_with_k_dimension_mismatch() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 2,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let null_bitmap = RoaringBitmap::new();
let blob = build_blob_with_vectors(&config, vec![(0, vec![1.0, 0.0])], &null_bitmap, 1, 1);
let (_dir, applier, index_id, size_bytes) =
build_applier_with_blob(blob, 1, vec![1.0, 0.0, 0.0], VectorDistanceMetric::L2sq).await;
let res = applier.apply_with_k(index_id, Some(size_bytes), 1).await;
assert!(res.is_err());
}
#[tokio::test]
async fn test_apply_with_k_empty_blob() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 1,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let null_bitmap = RoaringBitmap::new();
let blob = build_blob_with_vectors(&config, Vec::new(), &null_bitmap, 0, 0);
let (_dir, applier, index_id, size_bytes) =
build_applier_with_blob(blob, 1, vec![1.0], VectorDistanceMetric::L2sq).await;
let output = applier
.apply_with_k(index_id, Some(size_bytes), 1)
.await
.unwrap();
assert!(output.row_offsets.is_empty());
}
}

View File

@@ -44,6 +44,9 @@ use crate::sst::index::intermediate::{
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::vector_index::format::{
VECTOR_INDEX_BLOB_HEADER_SIZE, VectorIndexBlobHeader,
};
use crate::sst::index::vector_index::util::bytes_to_f32_slice;
use crate::sst::index::vector_index::{INDEX_BLOB_TYPE, engine};
@@ -323,6 +326,7 @@ impl VectorIndexer {
for (col_id, creator) in &mut self.creators {
let Some(values) = batch.field_col_value(*col_id) else {
creator.add_nulls(n);
continue;
};
@@ -561,36 +565,12 @@ impl VectorIndexer {
creator.save_to_buffer(&mut index_bytes)?;
// Header size: version(1) + engine(1) + dim(4) + metric(1) +
// connectivity(2) + expansion_add(2) + expansion_search(2) +
// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes
/// Size of the vector index blob header in bytes.
/// Header format: version(1) + engine(1) + dim(4) + metric(1) +
/// connectivity(2) + expansion_add(2) + expansion_search(2) +
/// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes
const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33;
// connectivity(2) + expansion_add(2) + expansion_search(2) +
// total_rows(8) + indexed_rows(8) + bitmap_len(4) = 33 bytes.
let total_size =
VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len();
let mut blob_data = Vec::with_capacity(total_size);
// Write version (1 byte)
blob_data.push(1u8);
// Write engine type (1 byte)
blob_data.push(creator.engine_type().as_u8());
// Write dimension (4 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes());
// Write metric (1 byte)
blob_data.push(creator.metric().as_u8());
// Write connectivity/M (2 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes());
// Write expansion_add/ef_construction (2 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes());
// Write expansion_search/ef_search (2 bytes, little-endian)
blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes());
// Write total_rows (8 bytes, little-endian)
blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes());
// Write indexed_rows (8 bytes, little-endian)
blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes());
// Write NULL bitmap length (4 bytes, little-endian)
let bitmap_len: u32 = null_bitmap_bytes.len().try_into().map_err(|_| {
VectorIndexBuildSnafu {
reason: format!(
@@ -601,7 +581,24 @@ impl VectorIndexer {
}
.build()
})?;
blob_data.extend_from_slice(&bitmap_len.to_le_bytes());
let header = VectorIndexBlobHeader::new(
creator.engine_type(),
creator.config.dim as u32,
creator.metric(),
creator.config.connectivity as u16,
creator.config.expansion_add as u16,
creator.config.expansion_search as u16,
creator.current_row_offset,
creator.next_hnsw_key,
bitmap_len,
)
.map_err(|e| {
VectorIndexFinishSnafu {
reason: e.to_string(),
}
.build()
})?;
header.encode_into(&mut blob_data);
// Write NULL bitmap
blob_data.extend_from_slice(&null_bitmap_bytes);
// Write vector index
@@ -686,7 +683,78 @@ impl VectorIndexer {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{TimestampMillisecondVector, UInt8Vector, UInt64Vector};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ColumnId, FileId, RegionId};
use super::*;
use crate::read::BatchColumn;
fn mock_region_metadata_with_vector() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("tag", ConcreteDataType::int64_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("vec", ConcreteDataType::vector_datatype(2), true),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_u64",
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 4,
})
.primary_key(vec![2]);
Arc::new(builder.build().unwrap())
}
fn new_batch_missing_vector_column(column_id: ColumnId, rows: usize) -> Batch {
let fields = vec![(0, SortField::new(ConcreteDataType::int64_datatype()))];
let codec = DensePrimaryKeyCodec::with_fields(fields);
let primary_key = codec.encode([ValueRef::Int64(1)].into_iter()).unwrap();
let field = BatchColumn {
column_id,
data: Arc::new(UInt64Vector::from_iter_values(0..rows as u64)),
};
Batch::new(
primary_key,
Arc::new(TimestampMillisecondVector::from_values(
(0..rows).map(|i| i as i64).collect::<Vec<_>>(),
)),
Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(0, rows))),
Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(1, rows))),
vec![field],
)
.unwrap()
}
#[test]
fn test_vector_index_creator() {
@@ -837,23 +905,24 @@ mod tests {
let mut index_bytes = vec![0u8; index_size];
creator.save_to_buffer(&mut index_bytes).unwrap();
// Header: 33 bytes
let header_size = 33;
let total_size = header_size + null_bitmap_bytes.len() + index_bytes.len();
let total_size =
VECTOR_INDEX_BLOB_HEADER_SIZE + null_bitmap_bytes.len() + index_bytes.len();
let mut blob_data = Vec::with_capacity(total_size);
// Write header fields
blob_data.push(1u8); // version
blob_data.push(creator.engine_type().as_u8()); // engine type
blob_data.extend_from_slice(&(creator.config.dim as u32).to_le_bytes()); // dimension
blob_data.push(creator.metric().as_u8()); // metric
blob_data.extend_from_slice(&(creator.config.connectivity as u16).to_le_bytes());
blob_data.extend_from_slice(&(creator.config.expansion_add as u16).to_le_bytes());
blob_data.extend_from_slice(&(creator.config.expansion_search as u16).to_le_bytes());
blob_data.extend_from_slice(&creator.current_row_offset.to_le_bytes()); // total_rows
blob_data.extend_from_slice(&creator.next_hnsw_key.to_le_bytes()); // indexed_rows
let bitmap_len: u32 = null_bitmap_bytes.len().try_into().unwrap();
blob_data.extend_from_slice(&bitmap_len.to_le_bytes());
let header = VectorIndexBlobHeader::new(
creator.engine_type(),
creator.config.dim as u32,
creator.metric(),
creator.config.connectivity as u16,
creator.config.expansion_add as u16,
creator.config.expansion_search as u16,
creator.current_row_offset,
creator.next_hnsw_key,
bitmap_len,
)
.unwrap();
header.encode_into(&mut blob_data);
blob_data.extend_from_slice(&null_bitmap_bytes);
blob_data.extend_from_slice(&index_bytes);
@@ -861,60 +930,62 @@ mod tests {
assert_eq!(blob_data.len(), total_size);
// Parse header and verify values
assert_eq!(blob_data[0], 1); // version
assert_eq!(blob_data[1], VectorIndexEngineType::Usearch.as_u8()); // engine
let dim = u32::from_le_bytes([blob_data[2], blob_data[3], blob_data[4], blob_data[5]]);
assert_eq!(dim, 4);
let metric = blob_data[6];
let (decoded, header_size) = VectorIndexBlobHeader::decode(&blob_data).unwrap();
assert_eq!(header_size, VECTOR_INDEX_BLOB_HEADER_SIZE);
assert_eq!(decoded.engine_type, VectorIndexEngineType::Usearch);
assert_eq!(decoded.dim, 4);
assert_eq!(
metric,
datatypes::schema::VectorDistanceMetric::L2sq.as_u8()
decoded.metric,
datatypes::schema::VectorDistanceMetric::L2sq
);
let connectivity = u16::from_le_bytes([blob_data[7], blob_data[8]]);
assert_eq!(connectivity, 24);
let expansion_add = u16::from_le_bytes([blob_data[9], blob_data[10]]);
assert_eq!(expansion_add, 200);
let expansion_search = u16::from_le_bytes([blob_data[11], blob_data[12]]);
assert_eq!(expansion_search, 100);
let total_rows = u64::from_le_bytes([
blob_data[13],
blob_data[14],
blob_data[15],
blob_data[16],
blob_data[17],
blob_data[18],
blob_data[19],
blob_data[20],
]);
assert_eq!(total_rows, 5);
let indexed_rows = u64::from_le_bytes([
blob_data[21],
blob_data[22],
blob_data[23],
blob_data[24],
blob_data[25],
blob_data[26],
blob_data[27],
blob_data[28],
]);
assert_eq!(indexed_rows, 3);
let null_bitmap_len =
u32::from_le_bytes([blob_data[29], blob_data[30], blob_data[31], blob_data[32]]);
assert_eq!(null_bitmap_len as usize, null_bitmap_bytes.len());
assert_eq!(decoded.connectivity, 24);
assert_eq!(decoded.expansion_add, 200);
assert_eq!(decoded.expansion_search, 100);
assert_eq!(decoded.total_rows, 5);
assert_eq!(decoded.indexed_rows, 3);
assert_eq!(decoded.null_bitmap_len as usize, null_bitmap_bytes.len());
// Verify null bitmap can be deserialized
let null_bitmap_data = &blob_data[header_size..header_size + null_bitmap_len as usize];
let null_bitmap_data =
&blob_data[header_size..header_size + decoded.null_bitmap_len as usize];
let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap();
assert_eq!(restored_bitmap.len(), 2); // 2 nulls
assert!(restored_bitmap.contains(1));
assert!(restored_bitmap.contains(3));
}
#[tokio::test]
async fn test_vector_index_missing_column_as_nulls() {
let tempdir = common_test_util::temp_dir::create_temp_dir(
"test_vector_index_missing_column_as_nulls_",
);
let intm_mgr = IntermediateManager::init_fs(tempdir.path().to_string_lossy())
.await
.unwrap();
let region_metadata = mock_region_metadata_with_vector();
let mut vector_index_options = HashMap::new();
vector_index_options.insert(3, VectorIndexOptions::default());
let mut indexer = VectorIndexer::new(
FileId::random(),
&region_metadata,
intm_mgr,
None,
&vector_index_options,
)
.unwrap()
.unwrap();
let mut batch = new_batch_missing_vector_column(4, 3);
indexer.update(&mut batch).await.unwrap();
let creator = indexer.creators.get(&3).unwrap();
assert_eq!(creator.size(), 0);
assert_eq!(creator.current_row_offset, 3);
assert_eq!(creator.null_bitmap.len(), 3);
for idx in 0..3 {
assert!(creator.null_bitmap.contains(idx as u32));
}
}
}

View File

@@ -0,0 +1,324 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Vector index blob format helpers.
use std::fmt;
#[cfg(test)]
use datatypes::schema::VectorDistanceMetric as SchemaVectorDistanceMetric;
#[cfg(test)]
use index::vector::distance_metric_to_usearch;
use store_api::storage::{VectorDistanceMetric, VectorIndexEngineType};
pub(crate) const VECTOR_INDEX_BLOB_VERSION: u8 = 1;
pub(crate) const VECTOR_INDEX_BLOB_HEADER_SIZE: usize = 33;
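/// Fixed-size header stored at the beginning of every vector index blob.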
#[derive(Debug, Clone, Copy)]
pub(crate) struct VectorIndexBlobHeader {
pub engine_type: VectorIndexEngineType,
pub dim: u32,
pub metric: VectorDistanceMetric,
pub connectivity: u16,
pub expansion_add: u16,
pub expansion_search: u16,
pub total_rows: u64,
pub indexed_rows: u64,
pub null_bitmap_len: u32,
}
impl VectorIndexBlobHeader {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
engine_type: VectorIndexEngineType,
dim: u32,
metric: VectorDistanceMetric,
connectivity: u16,
expansion_add: u16,
expansion_search: u16,
total_rows: u64,
indexed_rows: u64,
null_bitmap_len: u32,
) -> Result<Self, VectorIndexBlobFormatError> {
if total_rows < indexed_rows {
return Err(VectorIndexBlobFormatError::InvalidRowCounts {
total: total_rows,
indexed: indexed_rows,
});
}
if total_rows > u64::from(u32::MAX) || indexed_rows > u64::from(u32::MAX) {
return Err(VectorIndexBlobFormatError::RowsExceedU32 {
total: total_rows,
indexed: indexed_rows,
});
}
Ok(Self {
engine_type,
dim,
metric,
connectivity,
expansion_add,
expansion_search,
total_rows,
indexed_rows,
null_bitmap_len,
})
}
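/// Appends the header fields to `buf` in the fixed little-endian layout.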
pub(crate) fn encode_into(&self, buf: &mut Vec<u8>) {
buf.push(VECTOR_INDEX_BLOB_VERSION);
buf.push(self.engine_type.as_u8());
buf.extend_from_slice(&self.dim.to_le_bytes());
buf.push(self.metric.as_u8());
buf.extend_from_slice(&self.connectivity.to_le_bytes());
buf.extend_from_slice(&self.expansion_add.to_le_bytes());
buf.extend_from_slice(&self.expansion_search.to_le_bytes());
buf.extend_from_slice(&self.total_rows.to_le_bytes());
buf.extend_from_slice(&self.indexed_rows.to_le_bytes());
buf.extend_from_slice(&self.null_bitmap_len.to_le_bytes());
}
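/// Decodes a header from `data`, returning the header together with the number of bytes consumed.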
pub(crate) fn decode(data: &[u8]) -> Result<(Self, usize), VectorIndexBlobFormatError> {
if data.len() < VECTOR_INDEX_BLOB_HEADER_SIZE {
return Err(VectorIndexBlobFormatError::Truncated("header"));
}
let mut offset = 0;
let version = read_u8(data, &mut offset)?;
if version != VECTOR_INDEX_BLOB_VERSION {
return Err(VectorIndexBlobFormatError::UnsupportedVersion(version));
}
let engine_type = VectorIndexEngineType::try_from_u8(read_u8(data, &mut offset)?)
.ok_or_else(|| VectorIndexBlobFormatError::UnknownEngine(data[offset - 1]))?;
let dim = read_u32(data, &mut offset)?;
let metric = VectorDistanceMetric::try_from_u8(read_u8(data, &mut offset)?)
.ok_or_else(|| VectorIndexBlobFormatError::UnknownMetric(data[offset - 1]))?;
let connectivity = read_u16(data, &mut offset)?;
let expansion_add = read_u16(data, &mut offset)?;
let expansion_search = read_u16(data, &mut offset)?;
let total_rows = read_u64(data, &mut offset)?;
let indexed_rows = read_u64(data, &mut offset)?;
let null_bitmap_len = read_u32(data, &mut offset)?;
let header = VectorIndexBlobHeader::new(
engine_type,
dim,
metric,
connectivity,
expansion_add,
expansion_search,
total_rows,
indexed_rows,
null_bitmap_len,
)?;
Ok((header, offset))
}
}
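/// Errors produced while encoding or decoding the vector index blob format.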
#[derive(Debug)]
pub(crate) enum VectorIndexBlobFormatError {
Truncated(&'static str),
UnsupportedVersion(u8),
UnknownEngine(u8),
UnknownMetric(u8),
InvalidRowCounts { total: u64, indexed: u64 },
RowsExceedU32 { total: u64, indexed: u64 },
}
impl fmt::Display for VectorIndexBlobFormatError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Truncated(label) => {
write!(f, "Vector index blob truncated while reading {}", label)
}
Self::UnsupportedVersion(version) => {
write!(f, "Unsupported vector index version {}", version)
}
Self::UnknownEngine(value) => write!(f, "Unknown vector index engine type {}", value),
Self::UnknownMetric(value) => write!(f, "Unknown vector index metric {}", value),
Self::InvalidRowCounts { total, indexed } => {
write!(
f,
"Total rows {} is smaller than indexed rows {}",
total, indexed
)
}
Self::RowsExceedU32 { total, indexed } => {
write!(
f,
"Vector index rows exceed u32::MAX (total={}, indexed={})",
total, indexed
)
}
}
}
}
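/// Reads `N` bytes at `*offset` and advances the offset, or returns a truncation error.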
fn read_exact<const N: usize>(
data: &[u8],
offset: &mut usize,
label: &'static str,
) -> Result<[u8; N], VectorIndexBlobFormatError> {
if *offset + N > data.len() {
return Err(VectorIndexBlobFormatError::Truncated(label));
}
let mut buf = [0u8; N];
buf.copy_from_slice(&data[*offset..*offset + N]);
*offset += N;
Ok(buf)
}
fn read_u8(data: &[u8], offset: &mut usize) -> Result<u8, VectorIndexBlobFormatError> {
Ok(read_exact::<1>(data, offset, "u8")?[0])
}
fn read_u16(data: &[u8], offset: &mut usize) -> Result<u16, VectorIndexBlobFormatError> {
Ok(u16::from_le_bytes(read_exact::<2>(data, offset, "u16")?))
}
fn read_u32(data: &[u8], offset: &mut usize) -> Result<u32, VectorIndexBlobFormatError> {
Ok(u32::from_le_bytes(read_exact::<4>(data, offset, "u32")?))
}
fn read_u64(data: &[u8], offset: &mut usize) -> Result<u64, VectorIndexBlobFormatError> {
Ok(u64::from_le_bytes(read_exact::<8>(data, offset, "u64")?))
}
#[cfg(test)]
mod tests {
use roaring::RoaringBitmap;
use store_api::storage::VectorIndexEngineType;
use super::*;
use crate::sst::index::vector_index::creator::VectorIndexConfig;
use crate::sst::index::vector_index::engine;
#[test]
fn test_vector_index_blob_header_roundtrip() {
let header = VectorIndexBlobHeader::new(
VectorIndexEngineType::Usearch,
4,
VectorDistanceMetric::L2sq,
24,
200,
100,
5,
3,
16,
)
.unwrap();
let mut bytes = Vec::new();
header.encode_into(&mut bytes);
let (decoded, offset) = VectorIndexBlobHeader::decode(&bytes).unwrap();
assert_eq!(offset, VECTOR_INDEX_BLOB_HEADER_SIZE);
assert_eq!(decoded.engine_type, header.engine_type);
assert_eq!(decoded.dim, header.dim);
assert_eq!(decoded.metric, header.metric);
assert_eq!(decoded.connectivity, header.connectivity);
assert_eq!(decoded.expansion_add, header.expansion_add);
assert_eq!(decoded.expansion_search, header.expansion_search);
assert_eq!(decoded.total_rows, header.total_rows);
assert_eq!(decoded.indexed_rows, header.indexed_rows);
assert_eq!(decoded.null_bitmap_len, header.null_bitmap_len);
}
#[test]
fn test_vector_index_blob_header_invalid_version() {
let mut blob = vec![0u8; VECTOR_INDEX_BLOB_HEADER_SIZE];
blob[0] = 2;
assert!(VectorIndexBlobHeader::decode(&blob).is_err());
}
#[test]
fn test_vector_index_blob_header_truncated() {
let blob = vec![0u8; VECTOR_INDEX_BLOB_HEADER_SIZE - 1];
assert!(VectorIndexBlobHeader::decode(&blob).is_err());
}
#[test]
fn test_vector_index_blob_parse_roundtrip() {
let config = VectorIndexConfig {
engine: VectorIndexEngineType::Usearch,
dim: 2,
metric: distance_metric_to_usearch(VectorDistanceMetric::L2sq),
distance_metric: VectorDistanceMetric::L2sq,
connectivity: 16,
expansion_add: 128,
expansion_search: 64,
};
let mut engine = engine::create_engine(config.engine, &config).unwrap();
engine.add(0, &[0.0, 1.0]).unwrap();
let index_size = engine.serialized_length();
let mut index_bytes = vec![0u8; index_size];
engine.save_to_buffer(&mut index_bytes).unwrap();
let null_bitmap = RoaringBitmap::new();
let mut null_bitmap_bytes = Vec::new();
null_bitmap.serialize_into(&mut null_bitmap_bytes).unwrap();
let header = VectorIndexBlobHeader::new(
config.engine,
config.dim as u32,
VectorDistanceMetric::L2sq,
config.connectivity as u16,
config.expansion_add as u16,
config.expansion_search as u16,
1,
1,
null_bitmap_bytes.len() as u32,
)
.unwrap();
let mut blob = Vec::new();
header.encode_into(&mut blob);
blob.extend_from_slice(&null_bitmap_bytes);
blob.extend_from_slice(&index_bytes);
let (decoded, offset) = VectorIndexBlobHeader::decode(&blob).unwrap();
let null_bitmap_len = decoded.null_bitmap_len as usize;
let null_bitmap_data = &blob[offset..offset + null_bitmap_len];
let restored_bitmap = RoaringBitmap::deserialize_from(null_bitmap_data).unwrap();
assert_eq!(decoded.metric, VectorDistanceMetric::L2sq);
assert_eq!(decoded.total_rows, 1);
assert_eq!(decoded.indexed_rows, 1);
assert_eq!(restored_bitmap.len(), 0);
}
#[test]
fn test_vector_index_blob_header_format_matches_creator() {
let header = VectorIndexBlobHeader::new(
VectorIndexEngineType::Usearch,
4,
VectorDistanceMetric::L2sq,
24,
200,
100,
5,
3,
2,
)
.unwrap();
let mut bytes = Vec::new();
header.encode_into(&mut bytes);
let (decoded, header_size) = VectorIndexBlobHeader::decode(&bytes).unwrap();
assert_eq!(header_size, VECTOR_INDEX_BLOB_HEADER_SIZE);
assert_eq!(decoded.metric, SchemaVectorDistanceMetric::L2sq);
assert_eq!(decoded.total_rows, 5);
assert_eq!(decoded.indexed_rows, 3);
assert_eq!(decoded.null_bitmap_len, 2);
}
}

View File

@@ -14,8 +14,10 @@
//! Vector index module for HNSW-based approximate nearest neighbor search.
pub(crate) mod applier;
pub(crate) mod creator;
pub(crate) mod engine;
pub(crate) mod format;
pub(crate) mod util;
/// The blob type identifier for vector index in puffin files.

View File

@@ -13,11 +13,82 @@
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use common_telemetry::trace;
use object_store::ObjectStore;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
use parquet::schema::types::{SchemaDescriptor, from_thrift};
use snafu::ResultExt;
use crate::error;
use crate::error::Result;
// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [format::FileMetaData] to [ParquetMetaData]
pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
for rg in t_file_metadata.row_groups {
row_groups.push(
RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
.context(error::ConvertMetaDataSnafu)?,
);
}
let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
schema_desc_ptr,
column_orders,
);
// There may be a problem due to the missing column_index and offset_index
// if we enable the page index in the future.
Ok(ParquetMetaData::new(file_metadata, row_groups))
}
// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
t_column_orders: Option<Vec<format::ColumnOrder>>,
schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
match t_column_orders {
Some(orders) => {
// Should always be the case
assert_eq!(
orders.len(),
schema_descr.num_columns(),
"Column order length mismatch"
);
let mut res = Vec::with_capacity(schema_descr.num_columns());
for (i, column) in schema_descr.columns().iter().enumerate() {
match orders[i] {
format::ColumnOrder::TYPEORDER(_) => {
let sort_order = ColumnOrder::get_sort_order(
column.logical_type(),
column.converted_type(),
column.physical_type(),
);
res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
}
}
}
Some(res)
}
None => None,
}
}
const FETCH_PARALLELISM: usize = 8;
pub(crate) const MERGE_GAP: usize = 512 * 1024;

View File

@@ -14,6 +14,8 @@
//! Parquet reader.
#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -31,7 +33,8 @@ use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::{KeyValue, ParquetMetaData};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
@@ -40,6 +43,8 @@ use table::predicate::Predicate;
use crate::cache::CacheStrategy;
use crate::cache::index::result_cache::PredicateKey;
#[cfg(feature = "vector_index")]
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
ReadParquetSnafu, Result,
@@ -60,6 +65,8 @@ use crate::sst::index::fulltext_index::applier::{
use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete,
};
@@ -73,6 +80,7 @@ use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
const INDEX_TYPE_BLOOM: &str = "bloom filter";
const INDEX_TYPE_VECTOR: &str = "vector";
macro_rules! handle_index_error {
($err:expr, $file_handle:expr, $index_type:expr) => {
@@ -116,6 +124,12 @@ pub struct ParquetReaderBuilder {
inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
/// Vector index applier for KNN search.
#[cfg(feature = "vector_index")]
vector_index_applier: Option<VectorIndexApplierRef>,
/// Over-fetched k for vector index scan.
#[cfg(feature = "vector_index")]
vector_index_k: Option<usize>,
/// Expected metadata of the region while reading the SST.
/// This is usually the latest metadata of the region. The reader use
/// it get the correct column id of a column by name.
@@ -149,6 +163,10 @@ impl ParquetReaderBuilder {
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
fulltext_index_appliers: [None, None],
#[cfg(feature = "vector_index")]
vector_index_applier: None,
#[cfg(feature = "vector_index")]
vector_index_k: None,
expected_metadata: None,
flat_format: false,
compaction: false,
@@ -210,6 +228,19 @@ impl ParquetReaderBuilder {
self
}
/// Attaches the vector index applier to the builder.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn vector_index_applier(
mut self,
applier: Option<VectorIndexApplierRef>,
k: Option<usize>,
) -> Self {
self.vector_index_applier = applier;
self.vector_index_k = k;
self
}
/// Attaches the expected metadata to the builder.
#[must_use]
pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
@@ -571,6 +602,19 @@ impl ParquetReaderBuilder {
)
.await;
}
#[cfg(feature = "vector_index")]
{
self.prune_row_groups_by_vector_index(
row_group_size,
num_row_groups,
&mut output,
metrics,
)
.await;
if output.is_empty() {
return output;
}
}
output
}
@@ -798,6 +842,48 @@ impl ParquetReaderBuilder {
pruned
}
/// Prunes row groups by vector index results.
#[cfg(feature = "vector_index")]
async fn prune_row_groups_by_vector_index(
&self,
row_group_size: usize,
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) {
let Some(applier) = &self.vector_index_applier else {
return;
};
let Some(k) = self.vector_index_k else {
return;
};
if !self.file_handle.meta_ref().vector_index_available() {
return;
}
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = applier
.apply_with_k(self.file_handle.index_id(), Some(file_size_hint), k)
.await;
let row_ids = match apply_res {
Ok(res) => res.row_offsets,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
return;
}
};
let selection = match vector_selection_from_offsets(row_ids, row_group_size, num_row_groups)
{
Ok(selection) => selection,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_VECTOR);
return;
}
};
apply_selection_and_update_metrics(output, &selection, metrics, INDEX_TYPE_VECTOR);
}
async fn prune_row_groups_by_fulltext_bloom(
&self,
row_group_size: usize,
@@ -982,6 +1068,29 @@ fn apply_selection_and_update_metrics(
*output = intersection;
}
#[cfg(feature = "vector_index")]
fn vector_selection_from_offsets(
row_offsets: Vec<u64>,
row_group_size: usize,
num_row_groups: usize,
) -> Result<RowGroupSelection> {
let mut row_ids = BTreeSet::new();
for offset in row_offsets {
let row_id = u32::try_from(offset).map_err(|_| {
ApplyVectorIndexSnafu {
reason: format!("Row offset {} exceeds u32::MAX", offset),
}
.build()
})?;
row_ids.insert(row_id);
}
Ok(RowGroupSelection::from_row_ids(
row_ids,
row_group_size,
num_row_groups,
))
}
fn all_required_row_groups_searched(
required_row_groups: &RowGroupSelection,
cached_row_groups: &RowGroupSelection,
@@ -1007,6 +1116,8 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
pub(crate) rg_bloom_filtered: usize,
/// Number of row groups filtered by vector index.
pub(crate) rg_vector_filtered: usize,
/// Number of rows in row group before filtering.
pub(crate) rows_total: usize,
@@ -1016,6 +1127,8 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by vector index.
pub(crate) rows_vector_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
@@ -1035,11 +1148,13 @@ impl ReaderFilterMetrics {
self.rg_inverted_filtered += other.rg_inverted_filtered;
self.rg_minmax_filtered += other.rg_minmax_filtered;
self.rg_bloom_filtered += other.rg_bloom_filtered;
self.rg_vector_filtered += other.rg_vector_filtered;
self.rows_total += other.rows_total;
self.rows_fulltext_filtered += other.rows_fulltext_filtered;
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_vector_filtered += other.rows_vector_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
// Merge optional applier metrics
@@ -1077,6 +1192,9 @@ impl ReaderFilterMetrics {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rg_vector_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
@@ -1093,6 +1211,9 @@ impl ReaderFilterMetrics {
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["vector_index_filtered"])
.inc_by(self.rows_vector_filtered as u64);
}
fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
@@ -1109,11 +1230,67 @@ impl ReaderFilterMetrics {
self.rg_bloom_filtered += row_group_count;
self.rows_bloom_filtered += row_count;
}
INDEX_TYPE_VECTOR => {
self.rg_vector_filtered += row_group_count;
self.rows_vector_filtered += row_count;
}
_ => {}
}
}
}
#[cfg(all(test, feature = "vector_index"))]
mod tests {
use super::*;
#[test]
fn test_vector_selection_from_offsets() {
let row_group_size = 4;
let num_row_groups = 3;
let selection =
vector_selection_from_offsets(vec![0, 1, 5, 9], row_group_size, num_row_groups)
.unwrap();
assert_eq!(selection.row_group_count(), 3);
assert_eq!(selection.row_count(), 4);
assert!(selection.contains_non_empty_row_group(0));
assert!(selection.contains_non_empty_row_group(1));
assert!(selection.contains_non_empty_row_group(2));
}
#[test]
fn test_vector_selection_from_offsets_out_of_range() {
let row_group_size = 4;
let num_row_groups = 2;
let selection = vector_selection_from_offsets(
vec![0, 7, u64::from(u32::MAX) + 1],
row_group_size,
num_row_groups,
);
assert!(selection.is_err());
}
#[test]
fn test_vector_selection_updates_metrics() {
let row_group_size = 4;
let total_rows = 8;
let mut output = RowGroupSelection::new(row_group_size, total_rows);
let selection = vector_selection_from_offsets(vec![1], row_group_size, 2).unwrap();
let mut metrics = ReaderFilterMetrics::default();
apply_selection_and_update_metrics(
&mut output,
&selection,
&mut metrics,
INDEX_TYPE_VECTOR,
);
assert_eq!(metrics.rg_vector_filtered, 1);
assert_eq!(metrics.rows_vector_filtered, 7);
assert_eq!(output.row_count(), 1);
}
}
/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {

View File

@@ -55,6 +55,7 @@ use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
@@ -204,12 +205,14 @@ where
}
current_writer.flush().await.context(WriteParquetSnafu)?;
let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let max_row_group_uncompressed_size: u64 = parquet_metadata
.row_groups()
.iter()

View File

@@ -33,7 +33,6 @@ use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_common::config::CsvOptions;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
@@ -215,15 +214,13 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(schema.clone())
.with_csv_options(options)
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
let stream = file_to_stream(
object_store,
path,
schema.clone(),
csv_source,
Some(projection),
format.compression_type,
@@ -250,11 +247,13 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
let json_source =
JsonSource::new(schema.clone()).with_batch_size(DEFAULT_BATCH_SIZE);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
let stream = file_to_stream(
object_store,
path,
schema.clone(),
json_source,
Some(projection),
format.compression_type,

View File

@@ -2208,6 +2208,7 @@ mod test {
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use sqlparser::parser::Parser;
use super::*;
use crate::expr_helper;
@@ -2225,6 +2226,39 @@ mod test {
assert!(!NAME_PATTERN_REG.is_match("#"));
}
#[test]
fn test_partition_expr_equivalence_with_swapped_operands() {
let column_name = "device_id".to_string();
let column_name_and_type =
HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
let timezone = Timezone::from_tz_string("UTC").unwrap();
let dialect = GreptimeDbDialect {};
let mut parser = Parser::new(&dialect)
.try_with_sql("device_id < 100")
.unwrap();
let expr_left = parser.parse_expr().unwrap();
let mut parser = Parser::new(&dialect)
.try_with_sql("100 > device_id")
.unwrap();
let expr_right = parser.parse_expr().unwrap();
let partition_left =
convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
let partition_right =
convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();
assert_eq!(partition_left, partition_right);
assert!([partition_left.clone()].contains(&partition_right));
let mut physical_partition_exprs = vec![partition_left];
let mut logical_partition_exprs = vec![partition_right];
physical_partition_exprs.sort_unstable();
logical_partition_exprs.sort_unstable();
assert_eq!(physical_partition_exprs, logical_partition_exprs);
}
#[tokio::test]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_parse_partitions() {

View File

@@ -185,6 +185,19 @@ impl RestrictedOp {
Self::Or => ParserBinaryOperator::Or,
}
}
fn invert_for_swap(&self) -> Self {
match self {
Self::Eq => Self::Eq,
Self::NotEq => Self::NotEq,
Self::Lt => Self::Gt,
Self::LtEq => Self::GtEq,
Self::Gt => Self::Lt,
Self::GtEq => Self::LtEq,
Self::And => Self::And,
Self::Or => Self::Or,
}
}
}
impl Display for RestrictedOp {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@@ -208,6 +221,32 @@ impl PartitionExpr {
op,
rhs: Box::new(rhs),
}
.canonicalize()
}
/// Canonicalize to `Column op Value` form when possible for consistent equality checks.
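/// For example, `100 > device_id` is rewritten as `device_id < 100`.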
pub fn canonicalize(self) -> Self {
let lhs = Self::canonicalize_operand(*self.lhs);
let rhs = Self::canonicalize_operand(*self.rhs);
let mut expr = Self {
lhs: Box::new(lhs),
op: self.op,
rhs: Box::new(rhs),
};
if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
std::mem::swap(&mut expr.lhs, &mut expr.rhs);
expr.op = expr.op.invert_for_swap();
}
expr
}
fn canonicalize_operand(operand: Operand) -> Operand {
match operand {
Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
other => other,
}
}
/// Convert [Self] back to sqlparser's [Expr]
@@ -354,7 +393,7 @@ impl PartitionExpr {
let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
match bound {
PartitionBound::Expr(expr) => Ok(Some(expr)),
PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
_ => Ok(None),
}
}
@@ -494,7 +533,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"Int64(10) < a OR a IS NULL"
"a > Int64(10) OR a IS NULL"
);
// Test Gt with column on LHS
@@ -519,7 +558,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"Int64(10) > a OR a IS NULL"
"a < Int64(10) OR a IS NULL"
);
// Test GtEq with column on LHS

View File

@@ -255,9 +255,9 @@ fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfRe
match format {
AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()),
AnalyzeFormat::TEXT => Ok(metrics.to_string()),
format => Err(DataFusionError::NotImplemented(format!(
"AnalyzeFormat {format}",
))),
AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented(
"GRAPHVIZ format is not supported for metrics output".to_string(),
)),
}
}

View File

@@ -543,8 +543,7 @@ impl QueryEngine for DatafusionQueryEngine {
}
// configure execution options
state.config_mut().options_mut().execution.time_zone =
Some(query_ctx.timezone().to_string());
state.config_mut().options_mut().execution.time_zone = query_ctx.timezone().to_string();
// usually it's impossible to have both `set variable` set by sql client and
// hint in header by grpc client, so only need to deal with them separately

View File

@@ -123,7 +123,7 @@ tokio-rustls.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tokio-util.workspace = true
tonic.workspace = true
tonic-reflection = "0.14"
tonic-reflection = "0.13"
tower = { workspace = true, features = ["full"] }
tower-http = { version = "0.6", features = ["full"] }
tracing.workspace = true

View File

@@ -715,7 +715,7 @@ fn replace_params_with_values(
if let Some(Some(t)) = param_types.get(&format_placeholder(i + 1)) {
let value = helper::convert_value(param, t)?;
values.push(value.into());
values.push(value);
}
}
@@ -744,7 +744,7 @@ fn replace_params_with_exprs(
if let Some(Some(t)) = param_types.get(&format_placeholder(i + 1)) {
let value = helper::convert_expr_to_scalar_value(param, t)?;
values.push(value.into());
values.push(value);
}
}

View File

@@ -13,10 +13,8 @@
// limitations under the License.
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use arrow_schema::Field;
use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use common_sql::convert::sql_value_to_value;
@@ -89,8 +87,8 @@ pub fn fix_placeholder_types(plan: &mut LogicalPlan) -> Result<()> {
let give_placeholder_types = |mut e: datafusion_expr::Expr| {
if let datafusion_expr::Expr::Cast(cast) = &mut e {
if let datafusion_expr::Expr::Placeholder(ph) = &mut *cast.expr {
if ph.field.is_none() {
ph.field = Some(Arc::new(Field::new("", cast.data_type.clone(), true)));
if ph.data_type.is_none() {
ph.data_type = Some(cast.data_type.clone());
common_telemetry::debug!(
"give placeholder type {:?} to {:?}",
cast.data_type,

View File

@@ -324,12 +324,11 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
}
let output = if let Some(plan) = &sql_plan.plan {
let values = parameters_to_scalar_values(plan, portal)?;
let plan = plan
.clone()
.replace_params_with_values(&ParamValues::List(
values.into_iter().map(Into::into).collect(),
))
.replace_params_with_values(&ParamValues::List(parameters_to_scalar_values(
plan, portal,
)?))
.context(DataFusionSnafu)
.map_err(convert_err)?;
self.query_handler

View File

@@ -225,7 +225,7 @@ impl QueryContext {
/// Create a new datafusion's ConfigOptions instance based on the current QueryContext.
pub fn create_config_options(&self) -> ConfigOptions {
let mut config = ConfigOptions::default();
config.execution.time_zone = Some(self.timezone().to_string());
config.execution.time_zone = self.timezone().to_string();
config
}

View File

@@ -271,7 +271,7 @@ pub fn sql_data_type_to_concrete_data_type(
})?
.map(|t| ConcreteDataType::timestamp_datatype(t.unit()))
.unwrap_or(ConcreteDataType::timestamp_millisecond_datatype())),
SqlDataType::Interval { .. } => Ok(ConcreteDataType::interval_month_day_nano_datatype()),
SqlDataType::Interval => Ok(ConcreteDataType::interval_month_day_nano_datatype()),
SqlDataType::Decimal(exact_info) => match exact_info {
ExactNumberInfo::None => Ok(ConcreteDataType::decimal128_default_datatype()),
// refer to https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
@@ -333,8 +333,8 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
ConcreteDataType::Int8(_) => Ok(SqlDataType::TinyInt(None)),
ConcreteDataType::UInt8(_) => Ok(SqlDataType::TinyIntUnsigned(None)),
ConcreteDataType::String(_) => Ok(SqlDataType::String(None)),
ConcreteDataType::Float32(_) => Ok(SqlDataType::Float(ExactNumberInfo::Precision(4))),
ConcreteDataType::Float64(_) => Ok(SqlDataType::Float(ExactNumberInfo::Precision(8))),
ConcreteDataType::Float32(_) => Ok(SqlDataType::Float(None)),
ConcreteDataType::Float64(_) => Ok(SqlDataType::Double(ExactNumberInfo::None)),
ConcreteDataType::Boolean(_) => Ok(SqlDataType::Boolean),
ConcreteDataType::Date(_) => Ok(SqlDataType::Date),
ConcreteDataType::Timestamp(ts_type) => Ok(SqlDataType::Timestamp(
@@ -345,13 +345,10 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
Some(time_type.precision()),
TimezoneInfo::None,
)),
ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval {
fields: None,
precision: None,
}),
ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval),
ConcreteDataType::Binary(_) => Ok(SqlDataType::Varbinary(None)),
ConcreteDataType::Decimal128(d) => Ok(SqlDataType::Decimal(
ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as i64),
ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as u64),
)),
ConcreteDataType::Json(_) => Ok(SqlDataType::JSON),
ConcreteDataType::Vector(v) => Ok(SqlDataType::Custom(
@@ -415,7 +412,7 @@ mod tests {
ConcreteDataType::string_datatype(),
);
check_type(
SqlDataType::Float(ExactNumberInfo::Precision(4)),
SqlDataType::Float(None),
ConcreteDataType::float32_datatype(),
);
check_type(
@@ -453,10 +450,7 @@ mod tests {
ConcreteDataType::timestamp_microsecond_datatype(),
);
check_type(
SqlDataType::Interval {
fields: None,
precision: None,
},
SqlDataType::Interval,
ConcreteDataType::interval_month_day_nano_datatype(),
);
check_type(SqlDataType::JSON, ConcreteDataType::json_datatype());

View File

@@ -114,7 +114,7 @@ impl TransformRule for ExpandIntervalTransformRule {
kind,
format,
} => {
if matches!(data_type, DataType::Interval { .. }) {
if DataType::Interval == *data_type {
match &**cast_exp {
Expr::Value(ValueWithSpan {
value: Value::SingleQuotedString(value),
@@ -129,7 +129,7 @@ impl TransformRule for ExpandIntervalTransformRule {
*expr = Expr::Cast {
kind: kind.clone(),
expr: single_quoted_string_expr(interval_value),
data_type: data_type.clone(),
data_type: DataType::Interval,
format: std::mem::take(format),
}
}
@@ -392,10 +392,7 @@ mod tests {
let mut cast_to_interval_expr = Expr::Cast {
expr: single_quoted_string_expr("3y2mon".to_string()),
data_type: DataType::Interval {
fields: None,
precision: None,
},
data_type: DataType::Interval,
format: None,
kind: sqlparser::ast::CastKind::Cast,
};
@@ -410,10 +407,7 @@ mod tests {
expr: Box::new(Expr::Value(
Value::SingleQuotedString("3 years 2 months".to_string()).into()
)),
data_type: DataType::Interval {
fields: None,
precision: None,
},
data_type: DataType::Interval,
format: None,
}
);

View File

@@ -178,10 +178,10 @@ pub(crate) fn get_type_by_alias(data_type: &DataType) -> Option<DataType> {
DataType::UInt16 => Some(DataType::SmallIntUnsigned(None)),
DataType::UInt32 => Some(DataType::IntUnsigned(None)),
DataType::UInt64 => Some(DataType::BigIntUnsigned(None)),
DataType::Float4 => Some(DataType::Float(ExactNumberInfo::Precision(4))),
DataType::Float4 => Some(DataType::Float(None)),
DataType::Float8 => Some(DataType::Double(ExactNumberInfo::None)),
DataType::Float32 => Some(DataType::Float(ExactNumberInfo::Precision(4))),
DataType::Float64 => Some(DataType::Float(ExactNumberInfo::Precision(8))),
DataType::Float32 => Some(DataType::Float(None)),
DataType::Float64 => Some(DataType::Double(ExactNumberInfo::None)),
DataType::Bool => Some(DataType::Boolean),
DataType::Datetime(_) => Some(DataType::Timestamp(Some(6), TimezoneInfo::None)),
_ => None,
@@ -222,10 +222,10 @@ pub(crate) fn get_data_type_by_alias_name(name: &str) -> Option<DataType> {
"UINT16" => Some(DataType::SmallIntUnsigned(None)),
"UINT32" => Some(DataType::IntUnsigned(None)),
"UINT64" => Some(DataType::BigIntUnsigned(None)),
"FLOAT4" => Some(DataType::Float(ExactNumberInfo::Precision(4))),
"FLOAT4" => Some(DataType::Float(None)),
"FLOAT8" => Some(DataType::Double(ExactNumberInfo::None)),
"FLOAT32" => Some(DataType::Float(ExactNumberInfo::Precision(4))),
"FLOAT64" => Some(DataType::Float(ExactNumberInfo::Precision(8))),
"FLOAT32" => Some(DataType::Float(None)),
"FLOAT64" => Some(DataType::Double(ExactNumberInfo::None)),
// String type alias
"TINYTEXT" | "MEDIUMTEXT" | "LONGTEXT" => Some(DataType::Text),
_ => None,
@@ -256,7 +256,7 @@ mod tests {
);
assert_eq!(
get_data_type_by_alias_name("float32"),
Some(DataType::Float(ExactNumberInfo::Precision(4)))
Some(DataType::Float(None))
);
assert_eq!(
get_data_type_by_alias_name("float8"),
@@ -264,7 +264,7 @@ mod tests {
);
assert_eq!(
get_data_type_by_alias_name("float4"),
Some(DataType::Float(ExactNumberInfo::Precision(4)))
Some(DataType::Float(None))
);
assert_eq!(
get_data_type_by_alias_name("int8"),

View File

@@ -95,8 +95,15 @@ impl FileRef {
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileRefsManifest {
pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
/// Manifest version when this manifest is read for it's files
/// Manifest version when this manifest is read for its files
pub manifest_version: HashMap<RegionId, ManifestVersion>,
/// Cross-region file ownership mapping.
///
/// Key is the source/original region id (before repartition); value is the set of
/// target/destination region ids (after repartition) that currently hold files
/// originally coming from that source region.
///
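/// For example, if a source region `A` is repartitioned into target regions `B` and `C`,
/// the entry `A -> {B, C}` records that `B` and `C` may still hold files originally written by `A`.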
pub cross_region_refs: HashMap<RegionId, HashSet<RegionId>>,
}
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -179,6 +186,8 @@ mod tests {
.insert(r1, [FileRef::new(r1, FileId::random(), None)].into());
manifest.manifest_version.insert(r0, 10);
manifest.manifest_version.insert(r1, 20);
manifest.cross_region_refs.insert(r0, [r1].into());
manifest.cross_region_refs.insert(r1, [r0].into());
let json = serde_json::to_string(&manifest).unwrap();
let parsed: FileRefsManifest = serde_json::from_str(&json).unwrap();

View File

@@ -6,6 +6,7 @@ license.workspace = true
[features]
dashboard = []
vector_index = []
[lints]
workspace = true

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
@@ -105,8 +105,10 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG
#[tokio::test]
async fn test_gc_basic_different_store() {
let _ = dotenv::dotenv();
common_telemetry::init_default_ut_logging();
let store_type = StorageType::build_storage_types_based_on_env();
info!("store type: {:?}", store_type);
for store in store_type {
if store == StorageType::File {
continue; // no point in testing gc on fs storage
@@ -190,17 +192,16 @@ async fn test_gc_basic(store_type: &StorageType) {
assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
// Step 5: Get table route information for GC procedure
let (region_routes, regions) =
let (_region_routes, regions) =
get_table_route(metasrv.table_metadata_manager(), table_id).await;
// Step 6: Create and execute BatchGcProcedure
let procedure = BatchGcProcedure::new(
metasrv.mailbox().clone(),
metasrv.table_metadata_manager().clone(),
metasrv.options().grpc.server_addr.clone(),
regions.clone(),
false, // full_file_listing
region_routes,
HashMap::new(), // related_regions (empty for this simple test)
false, // full_file_listing
Duration::from_secs(10), // timeout
);


@@ -1379,6 +1379,19 @@ providers = []"#,
)
};
let vector_index_config = if cfg!(feature = "vector_index") {
r#"
[region_engine.mito.vector_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
"#
} else {
"\n"
};
let expected_toml_str = format!(
r#"
enable_telemetry = true
@@ -1545,14 +1558,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
[region_engine.mito.vector_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
[region_engine.mito.memtable]
{vector_index_config}[region_engine.mito.memtable]
type = "time_series"
[region_engine.mito.gc]


@@ -2,103 +2,103 @@
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | CooperativeExec |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | CooperativeExec |
| | RepartitionExec: partitioning=REDACTED
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number desc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC] |
| | CooperativeExec |
| | SortExec: expr=[number@0 DESC], preserve_partitioning=[true] |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC] |
| | CooperativeExec |
| | SortExec: expr=[number@0 DESC], preserve_partitioning=[true] |
| | RepartitionExec: partitioning=REDACTED
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number asc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST] |
| | CooperativeExec |
| | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true] |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST] |
| | CooperativeExec |
| | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | RepartitionExec: partitioning=REDACTED
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number desc limit 10;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC], fetch=10 |
| | CooperativeExec |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC], fetch=10 |
| | CooperativeExec |
| | RepartitionExec: partitioning=REDACTED
| | SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number asc limit 10;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST], fetch=10 |
| | CooperativeExec |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST], fetch=10 |
| | CooperativeExec |
| | RepartitionExec: partitioning=REDACTED
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


@@ -38,7 +38,7 @@ Affected Rows: 3
SELECT AVG(i), AVG(1), AVG(DISTINCT i), AVG(NULL) FROM integers;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'avg' user-defined coercion failed with "Error during planning: Avg does not support inputs of type Null." No function matches the given name and argument types 'avg(Null)'. You might need to add explicit type casts.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'avg' user-defined coercion failed with "Error during planning: The function \"avg\" does not support inputs of type Null." No function matches the given name and argument types 'avg(Null)'. You might need to add explicit type casts.
Candidate functions:
avg(UserDefined)


@@ -1,19 +1,19 @@
--- date_add ---
SELECT date_add('2023-12-06 07:39:46.222'::TIMESTAMP_MS, INTERVAL '5 day');
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
SELECT date_add('2023-12-06 07:39:46.222'::TIMESTAMP_MS, '5 day');
+-------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),Utf8("5 day")) |
+-------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+-------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),Utf8("5 day")) |
+----------------------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+----------------------------------------------------------------------------------------------------------+
SELECT date_add('2023-12-06'::DATE, INTERVAL '3 month 5 day');
@@ -34,19 +34,19 @@ SELECT date_add('2023-12-06'::DATE, '3 month 5 day');
--- date_sub ---
SELECT date_sub('2023-12-06 07:39:46.222'::TIMESTAMP_MS, INTERVAL '5 day');
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
SELECT date_sub('2023-12-06 07:39:46.222'::TIMESTAMP_MS, '5 day');
+-------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),Utf8("5 day")) |
+-------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+-------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),Utf8("5 day")) |
+----------------------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+----------------------------------------------------------------------------------------------------------+
SELECT date_sub('2023-12-06'::DATE, INTERVAL '3 month 5 day');
@@ -67,28 +67,28 @@ SELECT date_sub('2023-12-06'::DATE, '3 month 5 day');
--- date_format ---
SELECT date_format('2023-12-06 07:39:46.222'::TIMESTAMP_MS, '%Y-%m-%d %H:%M:%S:%3f');
+--------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+--------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+--------------------------------------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+-----------------------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+-----------------------------------------------------------------------------------------------------------------------------+
SELECT date_format('2023-12-06 07:39:46.222'::TIMESTAMP_S, '%Y-%m-%d %H:%M:%S:%3f');
+-------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(s)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+-------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:000 |
+-------------------------------------------------------------------------------------------------------------+
+------------------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Second, None)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+------------------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:000 |
+------------------------------------------------------------------------------------------------------------------------+
--- datetime not supported yet ---
SELECT date_format('2023-12-06 07:39:46.222'::DATETIME, '%Y-%m-%d %H:%M:%S:%3f');
+--------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(µs)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+--------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+--------------------------------------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Microsecond, None)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+-----------------------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+-----------------------------------------------------------------------------------------------------------------------------+
SELECT date_format('2023-12-06'::DATE, '%m-%d');


@@ -335,11 +335,11 @@ FROM cell_cte;
SELECT json_encode_path(37.76938, -122.3889, 1728083375::TimestampSecond);
+-----------------------------------------------------------------------------------------------------------+
| json_encode_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(s)"))) |
+-----------------------------------------------------------------------------------------------------------+
| [[-122.3889,37.76938]] |
+-----------------------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------------------+
| json_encode_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)"))) |
+----------------------------------------------------------------------------------------------------------------------+
| [[-122.3889,37.76938]] |
+----------------------------------------------------------------------------------------------------------------------+
SELECT json_encode_path(lat, lon, ts)
FROM(
@@ -360,11 +360,11 @@ FROM(
SELECT UNNEST(geo_path(37.76938, -122.3889, 1728083375::TimestampSecond));
+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
| __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(s)")))).lat | __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(s)")))).lng |
+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
| [37.76938] | [-122.3889] |
+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
| __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)")))).lat | __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)")))).lng |
+----------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
| [37.76938] | [-122.3889] |
+----------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
SELECT UNNEST(geo_path(lat, lon, ts))
FROM(


@@ -22,9 +22,9 @@ select GREATEST('2000-02-11'::Date, '2020-12-30'::Date);
select GREATEST('2021-07-01 00:00:00'::Timestamp, '2024-07-01 00:00:00'::Timestamp);
+---------------------------------------------------------------------------------------------------------------------------------------+
| greatest(arrow_cast(Utf8("2021-07-01 00:00:00"),Utf8("Timestamp(ms)")),arrow_cast(Utf8("2024-07-01 00:00:00"),Utf8("Timestamp(ms)"))) |
+---------------------------------------------------------------------------------------------------------------------------------------+
| 2024-07-01T00:00:00 |
+---------------------------------------------------------------------------------------------------------------------------------------+
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| greatest(arrow_cast(Utf8("2021-07-01 00:00:00"),Utf8("Timestamp(Millisecond, None)")),arrow_cast(Utf8("2024-07-01 00:00:00"),Utf8("Timestamp(Millisecond, None)"))) |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2024-07-01T00:00:00 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+


@@ -1,6 +1,6 @@
-- Migrated from DuckDB test: test/sql/join/ complex condition tests
-- Tests complex join conditions and predicates
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, "region" VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, region VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
Affected Rows: 0


@@ -1,7 +1,7 @@
-- Migrated from DuckDB test: test/sql/join/ complex condition tests
-- Tests complex join conditions and predicates
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, "region" VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, region VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE customer_accounts(account_id INTEGER, account_name VARCHAR, region VARCHAR, rep_id INTEGER, revenue INTEGER, ts TIMESTAMP TIME INDEX);


@@ -3,7 +3,7 @@ CREATE TABLE events_push(event_id INTEGER, user_id INTEGER, event_type VARCHAR,
Affected Rows: 0
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, "region" VARCHAR, ts TIMESTAMP TIME INDEX);
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, region VARCHAR, ts TIMESTAMP TIME INDEX);
Affected Rows: 0


@@ -2,7 +2,7 @@
CREATE TABLE events_push(event_id INTEGER, user_id INTEGER, event_type VARCHAR, "value" INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, "region" VARCHAR, ts TIMESTAMP TIME INDEX);
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, region VARCHAR, ts TIMESTAMP TIME INDEX);
INSERT INTO events_push VALUES (1, 100, 'click', 1, 1000), (2, 100, 'view', 2, 2000), (3, 200, 'click', 1, 3000), (4, 300, 'purchase', 5, 4000);


@@ -45,11 +45,11 @@ Error: 3000(PlanQuery), Failed to plan SQL: No field named a.
SELECT a FROM test LIMIT SUM(42);
Error: 1001(Unsupported), This feature is not implemented: Unsupported LIMIT expression: Some(AggregateFunction(AggregateFunction { func: AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable, parameter_names: None } } }, params: AggregateFunctionParams { args: [Literal(Int64(42), None)], distinct: false, filter: None, order_by: [], null_treatment: None } }))
Error: 1001(Unsupported), This feature is not implemented: Unsupported LIMIT expression: Some(AggregateFunction(AggregateFunction { func: AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable } } }, params: AggregateFunctionParams { args: [Literal(Int64(42), None)], distinct: false, filter: None, order_by: [], null_treatment: None } }))
SELECT a FROM test LIMIT row_number() OVER ();
Error: 3001(EngineExecuteQuery), This feature is not implemented: Unsupported LIMIT expression: Some(Cast(Cast { expr: WindowFunction(WindowFunction { fun: WindowUDF(WindowUDF { inner: RowNumber { signature: Signature { type_signature: Nullary, volatility: Immutable, parameter_names: None } } }), params: WindowFunctionParams { args: [], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, filter: None, null_treatment: None, distinct: false } }), data_type: Int64 }))
Error: 3001(EngineExecuteQuery), This feature is not implemented: Unsupported LIMIT expression: Some(Cast(Cast { expr: WindowFunction(WindowFunction { fun: WindowUDF(WindowUDF { inner: RowNumber { signature: Signature { type_signature: Nullary, volatility: Immutable } } }), params: WindowFunctionParams { args: [], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, filter: None, null_treatment: None, distinct: false } }), data_type: Int64 }))
CREATE TABLE test2 (a STRING, ts TIMESTAMP TIME INDEX);
@@ -102,23 +102,23 @@ SELECT * FROM integers LIMIT 4;
SELECT * FROM integers as int LIMIT (SELECT MIN(integers.i) FROM integers);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(ms)
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(Millisecond, None)
SELECT * FROM integers as int OFFSET (SELECT MIN(integers.i) FROM integers);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected OFFSET to be an integer or null, but got Timestamp(ms)
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected OFFSET to be an integer or null, but got Timestamp(Millisecond, None)
SELECT * FROM integers as int LIMIT (SELECT MAX(integers.i) FROM integers) OFFSET (SELECT MIN(integers.i) FROM integers);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(ms)
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(Millisecond, None)
SELECT * FROM integers as int LIMIT (SELECT max(integers.i) FROM integers where i > 5);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Cannot infer common argument type for comparison operation Timestamp(ms) > Int64
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Cannot infer common argument type for comparison operation Timestamp(Millisecond, None) > Int64
SELECT * FROM integers as int LIMIT (SELECT max(integers.i) FROM integers where i > 5);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Cannot infer common argument type for comparison operation Timestamp(ms) > Int64
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Cannot infer common argument type for comparison operation Timestamp(Millisecond, None) > Int64
SELECT * FROM integers as int LIMIT (SELECT NULL);


@@ -732,7 +732,7 @@ CREATE TABLE IF NOT EXISTS node_network_transmit_bytes_total (
host STRING NULL,
job STRING NULL,
node STRING NULL,
"region" STRING NULL,
region STRING NULL,
src_port STRING NULL,
src STRING NULL,
src_namespace STRING NULL,


@@ -349,7 +349,7 @@ CREATE TABLE IF NOT EXISTS node_network_transmit_bytes_total (
host STRING NULL,
job STRING NULL,
node STRING NULL,
"region" STRING NULL,
region STRING NULL,
src_port STRING NULL,
src STRING NULL,
src_namespace STRING NULL,


@@ -45,19 +45,19 @@ SELECT
FROM
ngx_access_log);
+---------------+-------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | Projection: TimestampNanosecond(NOW, None) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
+---------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | Projection: TimestampNanosecond(NOW, Some("+00:00")) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------+
| | |
+---------------+-----------------------------------------------------------------------------------+
DROP TABLE ngx_access_log;


@@ -477,28 +477,28 @@ SELECT
FROM tql_base
ORDER BY ts;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: tql_base.ts ASC NULLS LAST |
| | Projection: tql_base.ts, tql_base.val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS prev_value |
| | WindowAggr: windowExpr=[[lag(tql_base.val, Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: tql_base |
| | Projection: metric.ts AS ts, metric.val AS val |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | ProjectionExec: expr=[ts@0 as ts, val@1 as val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as prev_value] |
| | BoundedWindowAggExec: wdw=[lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Float64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] |
| | SortPreservingMergeExec: [ts@0 ASC NULLS LAST] |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: tql_base.ts ASC NULLS LAST |
| | Projection: tql_base.ts, tql_base.val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS prev_value |
| | WindowAggr: windowExpr=[[lag(tql_base.val, Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | SubqueryAlias: tql_base |
| | Projection: metric.ts AS ts, metric.val AS val |
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
| | PromSeriesDivide: tags=[] |
| | Filter: metric.ts >= TimestampMillisecond(-300000, None) AND metric.ts <= TimestampMillisecond(340000, None) |
| | TableScan: metric |
| | ]] |
| physical_plan | ProjectionExec: expr=[ts@0 as ts, val@1 as val, lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as prev_value] |
| | BoundedWindowAggExec: wdw=[lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { name: "lag(tql_base.val,Int64(1)) ORDER BY [tql_base.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] |
| | SortPreservingMergeExec: [ts@0 ASC NULLS LAST] |
| | SortExec: expr=[ts@0 ASC NULLS LAST], preserve_REDACTED
| | CooperativeExec |
| | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- TQL CTE with HAVING clause
WITH tql_grouped(ts, host, cpu) AS (


@@ -74,7 +74,7 @@ SELECT arrow_typeof(INTERVAL '1 month');
-- INTERVAL + TIME CONSTANT
SELECT current_time() + INTERVAL '1 hour';
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot coerce arithmetic expression Time64(ns) + Interval(MonthDayNano) to valid types
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot coerce arithmetic expression Time64(Nanosecond) + Interval(MonthDayNano) to valid types
-- table with interval type test
-- breaking change from #5422 table do not support interval type will raise an error
@@ -185,67 +185,67 @@ SELECT INTERVAL '2h' + INTERVAL 'P3Y3M700DT133H17M36.789S';
select '2022-01-01T00:00:01'::timestamp + '1 days'::interval;
+--------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) + Utf8("1 days") |
+--------------------------------------------------------------------------------+
| 2022-01-02T00:00:01 |
+--------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) + Utf8("1 days") |
+-----------------------------------------------------------------------------------------------+
| 2022-01-02T00:00:01 |
+-----------------------------------------------------------------------------------------------+
select '2022-01-01T00:00:01'::timestamp + '2 days'::interval;
+--------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) + Utf8("2 days") |
+--------------------------------------------------------------------------------+
| 2022-01-03T00:00:01 |
+--------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) + Utf8("2 days") |
+-----------------------------------------------------------------------------------------------+
| 2022-01-03T00:00:01 |
+-----------------------------------------------------------------------------------------------+
select '2022-01-01T00:00:01'::timestamp - '1 days'::interval;
+--------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) - Utf8("1 days") |
+--------------------------------------------------------------------------------+
| 2021-12-31T00:00:01 |
+--------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) - Utf8("1 days") |
+-----------------------------------------------------------------------------------------------+
| 2021-12-31T00:00:01 |
+-----------------------------------------------------------------------------------------------+
select '2022-01-01T00:00:01'::timestamp - '2 days'::interval;
+--------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) - Utf8("2 days") |
+--------------------------------------------------------------------------------+
| 2021-12-30T00:00:01 |
+--------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) - Utf8("2 days") |
+-----------------------------------------------------------------------------------------------+
| 2021-12-30T00:00:01 |
+-----------------------------------------------------------------------------------------------+
select '2022-01-01T00:00:01'::timestamp + '1 month'::interval;
+---------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) + Utf8("1 month") |
+---------------------------------------------------------------------------------+
| 2022-02-01T00:00:01 |
+---------------------------------------------------------------------------------+
+------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) + Utf8("1 month") |
+------------------------------------------------------------------------------------------------+
| 2022-02-01T00:00:01 |
+------------------------------------------------------------------------------------------------+
select '2022-01-01T00:00:01'::timestamp + '2 months'::interval;
+----------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) + Utf8("2 months") |
+----------------------------------------------------------------------------------+
| 2022-03-01T00:00:01 |
+----------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) + Utf8("2 months") |
+-------------------------------------------------------------------------------------------------+
| 2022-03-01T00:00:01 |
+-------------------------------------------------------------------------------------------------+
select '2022-01-01T00:00:01'::timestamp + '1 year'::interval;
+--------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(ms)")) + Utf8("1 year") |
+--------------------------------------------------------------------------------+
| 2023-01-01T00:00:01 |
+--------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2022-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) + Utf8("1 year") |
+-----------------------------------------------------------------------------------------------+
| 2023-01-01T00:00:01 |
+-----------------------------------------------------------------------------------------------+
select '2023-01-01T00:00:01'::timestamp + '2 years'::interval;
+---------------------------------------------------------------------------------+
| arrow_cast(Utf8("2023-01-01T00:00:01"),Utf8("Timestamp(ms)")) + Utf8("2 years") |
+---------------------------------------------------------------------------------+
| 2025-01-01T00:00:01 |
+---------------------------------------------------------------------------------+
+------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2023-01-01T00:00:01"),Utf8("Timestamp(Millisecond, None)")) + Utf8("2 years") |
+------------------------------------------------------------------------------------------------+
| 2025-01-01T00:00:01 |
+------------------------------------------------------------------------------------------------+
-- DATE + INTERVAL
SELECT DATE '2000-10-30' + '1 days'::interval;


@@ -75,31 +75,31 @@ SELECT MAX(t) FROM timestamp;
SELECT SUM(t) FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'sum' user-defined coercion failed with "Execution error: Sum not supported for Timestamp(ms)" No function matches the given name and argument types 'sum(Timestamp(ms))'. You might need to add explicit type casts.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'sum' user-defined coercion failed with "Execution error: Sum not supported for Timestamp(Millisecond, None)" No function matches the given name and argument types 'sum(Timestamp(Millisecond, None))'. You might need to add explicit type casts.
Candidate functions:
sum(UserDefined)
SELECT AVG(t) FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'avg' user-defined coercion failed with "Error during planning: Avg does not support inputs of type Timestamp(ms)." No function matches the given name and argument types 'avg(Timestamp(ms))'. You might need to add explicit type casts.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'avg' user-defined coercion failed with "Error during planning: The function \"avg\" does not support inputs of type Timestamp(Millisecond, None)." No function matches the given name and argument types 'avg(Timestamp(Millisecond, None))'. You might need to add explicit type casts.
Candidate functions:
avg(UserDefined)
SELECT t+t FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(ms) + Timestamp(ms): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(ms) + Timestamp(ms)
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(Millisecond, None) + Timestamp(Millisecond, None): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(Millisecond, None) + Timestamp(Millisecond, None)
SELECT t*t FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(ms) * Timestamp(ms): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(ms) * Timestamp(ms)
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(Millisecond, None) * Timestamp(Millisecond, None): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(Millisecond, None) * Timestamp(Millisecond, None)
SELECT t/t FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(ms) / Timestamp(ms): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(ms) / Timestamp(ms)
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(Millisecond, None) / Timestamp(Millisecond, None): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(Millisecond, None) / Timestamp(Millisecond, None)
SELECT t%t FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(ms) % Timestamp(ms): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(ms) % Timestamp(ms)
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Cannot get result type for temporal operation Timestamp(Millisecond, None) % Timestamp(Millisecond, None): Invalid argument error: Invalid timestamp arithmetic operation: Timestamp(Millisecond, None) % Timestamp(Millisecond, None)
SELECT t-t FROM timestamp;
@@ -166,11 +166,11 @@ SELECT t::TIME FROM timestamp WHERE EXTRACT(YEAR from t)=2007 ORDER BY 1;
SELECT (DATE '1992-01-01')::TIMESTAMP;
+------------------------------------------------------+
| arrow_cast(Utf8("1992-01-01"),Utf8("Timestamp(ms)")) |
+------------------------------------------------------+
| 1992-01-01T00:00:00 |
+------------------------------------------------------+
+---------------------------------------------------------------------+
| arrow_cast(Utf8("1992-01-01"),Utf8("Timestamp(Millisecond, None)")) |
+---------------------------------------------------------------------+
| 1992-01-01T00:00:00 |
+---------------------------------------------------------------------+
SELECT TIMESTAMP '2008-01-01 00:00:01.5'::VARCHAR;
@@ -195,9 +195,9 @@ Affected Rows: 0
-- SQLNESS PROTOCOL POSTGRES
SELECT '2025-04-14 20:42:19.021000'::TIMESTAMP - '2025-04-14 20:42:18.103000'::TIMESTAMP;
+---------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2025-04-14 20:42:19.021000"),Utf8("Timestamp(ms)")) - arrow_cast(Utf8("2025-04-14 20:42:18.103000"),Utf8("Timestamp(ms)")) |
+---------------------------------------------------------------------------------------------------------------------------------------------+
| 00:00:00.918000 |
+---------------------------------------------------------------------------------------------------------------------------------------------+
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2025-04-14 20:42:19.021000"),Utf8("Timestamp(Millisecond, None)")) - arrow_cast(Utf8("2025-04-14 20:42:18.103000"),Utf8("Timestamp(Millisecond, None)")) |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 00:00:00.918000 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


@@ -33,59 +33,59 @@ SELECT EXTRACT(MICROSECONDS FROM sec), EXTRACT(MICROSECONDS FROM msec), EXTRACT(
-- any other precision is rounded up (e.g. 1/2 -> 3, 4/5 -> 6, 7/8 -> 9)
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(0);
+------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(s)")) |
+------------------------------------------------------------------------+
| 2020-01-01T01:23:45 |
+------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(Second, None)")) |
+-----------------------------------------------------------------------------------+
| 2020-01-01T01:23:45 |
+-----------------------------------------------------------------------------------+
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(3);
+-------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(ms)")) |
+-------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123 |
+-------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(Millisecond, None)")) |
+----------------------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123 |
+----------------------------------------------------------------------------------------+
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(6);
+-------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(µs)")) |
+-------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123456 |
+-------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(Microsecond, None)")) |
+----------------------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123456 |
+----------------------------------------------------------------------------------------+
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(9);
+-------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(ns)")) |
+-------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123456789 |
+-------------------------------------------------------------------------+
+---------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.123456789"),Utf8("Timestamp(Nanosecond, None)")) |
+---------------------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123456789 |
+---------------------------------------------------------------------------------------+
SELECT TIMESTAMP '2020-01-01 01:23:45.12'::TIMESTAMP(3);
+------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.12"),Utf8("Timestamp(ms)")) |
+------------------------------------------------------------------+
| 2020-01-01T01:23:45.120 |
+------------------------------------------------------------------+
+---------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.12"),Utf8("Timestamp(Millisecond, None)")) |
+---------------------------------------------------------------------------------+
| 2020-01-01T01:23:45.120 |
+---------------------------------------------------------------------------------+
SELECT TIMESTAMP '2020-01-01 01:23:45.12345'::TIMESTAMP(6);
+---------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.12345"),Utf8("Timestamp(µs)")) |
+---------------------------------------------------------------------+
| 2020-01-01T01:23:45.123450 |
+---------------------------------------------------------------------+
+------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.12345"),Utf8("Timestamp(Microsecond, None)")) |
+------------------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123450 |
+------------------------------------------------------------------------------------+
SELECT TIMESTAMP '2020-01-01 01:23:45.12345678'::TIMESTAMP(9);
+------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.12345678"),Utf8("Timestamp(ns)")) |
+------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123456780 |
+------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2020-01-01 01:23:45.12345678"),Utf8("Timestamp(Nanosecond, None)")) |
+--------------------------------------------------------------------------------------+
| 2020-01-01T01:23:45.123456780 |
+--------------------------------------------------------------------------------------+
DROP TABLE ts_precision;
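A compact, self-contained sketch of the precision casts exercised above, using the same literal as the test; the commented values are the expected results recorded in this file.

-- TIMESTAMP(0), (3), (6), (9) correspond to second, millisecond, microsecond and
-- nanosecond precision; extra fractional digits are dropped, as in the results above.
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(0); -- 2020-01-01T01:23:45
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(3); -- 2020-01-01T01:23:45.123
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(6); -- 2020-01-01T01:23:45.123456
SELECT TIMESTAMP '2020-01-01 01:23:45.123456789'::TIMESTAMP(9); -- 2020-01-01T01:23:45.123456789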


@@ -25,11 +25,11 @@ SELECT extract(YEAR from sec),extract( YEAR from milli),extract(YEAR from nano)
SELECT nano::TIMESTAMP, milli::TIMESTAMP,sec::TIMESTAMP from timestamp;
+--------------------------------------------------+---------------------------------------------------+-------------------------------------------------+
| arrow_cast(timestamp.nano,Utf8("Timestamp(ms)")) | arrow_cast(timestamp.milli,Utf8("Timestamp(ms)")) | arrow_cast(timestamp.sec,Utf8("Timestamp(ms)")) |
+--------------------------------------------------+---------------------------------------------------+-------------------------------------------------+
| 2008-01-01T00:00:01.889 | 2008-01-01T00:00:01.594 | 2008-01-01T00:00:01 |
+--------------------------------------------------+---------------------------------------------------+-------------------------------------------------+
+-----------------------------------------------------------------+------------------------------------------------------------------+----------------------------------------------------------------+
| arrow_cast(timestamp.nano,Utf8("Timestamp(Millisecond, None)")) | arrow_cast(timestamp.milli,Utf8("Timestamp(Millisecond, None)")) | arrow_cast(timestamp.sec,Utf8("Timestamp(Millisecond, None)")) |
+-----------------------------------------------------------------+------------------------------------------------------------------+----------------------------------------------------------------+
| 2008-01-01T00:00:01.889 | 2008-01-01T00:00:01.594 | 2008-01-01T00:00:01 |
+-----------------------------------------------------------------+------------------------------------------------------------------+----------------------------------------------------------------+
SELECT micro::TIMESTAMP_S as m1, micro::TIMESTAMP_MS as m2,micro::TIMESTAMP_NS as m3 from timestamp;
@@ -113,63 +113,63 @@ select nano::TIME from timestamp;
select sec::TIMESTAMP_MS from timestamp;
+-------------------------------------------------+
| arrow_cast(timestamp.sec,Utf8("Timestamp(ms)")) |
+-------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:51 |
| 2008-01-01T00:00:11 |
+-------------------------------------------------+
+----------------------------------------------------------------+
| arrow_cast(timestamp.sec,Utf8("Timestamp(Millisecond, None)")) |
+----------------------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:51 |
| 2008-01-01T00:00:11 |
+----------------------------------------------------------------+
select sec::TIMESTAMP_NS from timestamp;
+-------------------------------------------------+
| arrow_cast(timestamp.sec,Utf8("Timestamp(ns)")) |
+-------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:51 |
| 2008-01-01T00:00:11 |
+-------------------------------------------------+
+---------------------------------------------------------------+
| arrow_cast(timestamp.sec,Utf8("Timestamp(Nanosecond, None)")) |
+---------------------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:51 |
| 2008-01-01T00:00:11 |
+---------------------------------------------------------------+
select milli::TIMESTAMP_SEC from timestamp;
+--------------------------------------------------+
| arrow_cast(timestamp.milli,Utf8("Timestamp(s)")) |
+--------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
+--------------------------------------------------+
+-------------------------------------------------------------+
| arrow_cast(timestamp.milli,Utf8("Timestamp(Second, None)")) |
+-------------------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
+-------------------------------------------------------------+
select milli::TIMESTAMP_NS from timestamp;
+---------------------------------------------------+
| arrow_cast(timestamp.milli,Utf8("Timestamp(ns)")) |
+---------------------------------------------------+
| 2008-01-01T00:00:01.594 |
| 2008-01-01T00:00:01.894 |
| 2008-01-01T00:00:01.794 |
+---------------------------------------------------+
+-----------------------------------------------------------------+
| arrow_cast(timestamp.milli,Utf8("Timestamp(Nanosecond, None)")) |
+-----------------------------------------------------------------+
| 2008-01-01T00:00:01.594 |
| 2008-01-01T00:00:01.894 |
| 2008-01-01T00:00:01.794 |
+-----------------------------------------------------------------+
select nano::TIMESTAMP_SEC from timestamp;
+-------------------------------------------------+
| arrow_cast(timestamp.nano,Utf8("Timestamp(s)")) |
+-------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
+-------------------------------------------------+
+------------------------------------------------------------+
| arrow_cast(timestamp.nano,Utf8("Timestamp(Second, None)")) |
+------------------------------------------------------------+
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
| 2008-01-01T00:00:01 |
+------------------------------------------------------------+
select nano::TIMESTAMP_MS from timestamp;
+--------------------------------------------------+
| arrow_cast(timestamp.nano,Utf8("Timestamp(ms)")) |
+--------------------------------------------------+
| 2008-01-01T00:00:01.889 |
| 2008-01-01T00:00:01.999 |
| 2008-01-01T00:00:01.899 |
+--------------------------------------------------+
+-----------------------------------------------------------------+
| arrow_cast(timestamp.nano,Utf8("Timestamp(Millisecond, None)")) |
+-----------------------------------------------------------------+
| 2008-01-01T00:00:01.889 |
| 2008-01-01T00:00:01.999 |
| 2008-01-01T00:00:01.899 |
+-----------------------------------------------------------------+
select sec from timestamp order by sec;
@@ -276,99 +276,99 @@ select timestamp.nano from timestamp inner join timestamp_two on (timestamp.nan
select '2008-01-01 00:00:11'::TIMESTAMP_US = '2008-01-01 00:00:11'::TIMESTAMP_MS;
+-------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ms)")) |
+-------------------------------------------------------------------------------------------------------------------------------+
| true |
+-------------------------------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Microsecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Millisecond, None)")) |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11'::TIMESTAMP_US = '2008-01-01 00:00:11'::TIMESTAMP_NS;
+-------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ns)")) |
+-------------------------------------------------------------------------------------------------------------------------------+
| true |
+-------------------------------------------------------------------------------------------------------------------------------+
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Microsecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Nanosecond, None)")) |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11'::TIMESTAMP_US = '2008-01-01 00:00:11'::TIMESTAMP_S;
+------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+------------------------------------------------------------------------------------------------------------------------------+
| true |
+------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Microsecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Second, None)")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_US = '2008-01-01 00:00:11'::TIMESTAMP_MS;
+---------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ms)")) |
+---------------------------------------------------------------------------------------------------------------------------------+
| false |
+---------------------------------------------------------------------------------------------------------------------------------+
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Microsecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Millisecond, None)")) |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| false |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_US = '2008-01-01 00:00:11'::TIMESTAMP_NS;
+---------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ns)")) |
+---------------------------------------------------------------------------------------------------------------------------------+
| false |
+---------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Microsecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Nanosecond, None)")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| false |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_US = '2008-01-01 00:00:11.1'::TIMESTAMP_S;
+----------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(µs)")) = arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(s)")) |
+----------------------------------------------------------------------------------------------------------------------------------+
| true |
+----------------------------------------------------------------------------------------------------------------------------------+
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Microsecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Second, None)")) |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_NS;
+---------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(ms)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ns)")) |
+---------------------------------------------------------------------------------------------------------------------------------+
| false |
+---------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Millisecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Nanosecond, None)")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
| false |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_S;
+--------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(ms)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+--------------------------------------------------------------------------------------------------------------------------------+
| true |
+--------------------------------------------------------------------------------------------------------------------------------+
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Millisecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Second, None)")) |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11.1'::TIMESTAMP_NS = '2008-01-01 00:00:11'::TIMESTAMP_S;
+--------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(ns)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+--------------------------------------------------------------------------------------------------------------------------------+
| true |
+--------------------------------------------------------------------------------------------------------------------------------+
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11.1"),Utf8("Timestamp(Nanosecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Second, None)")) |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_NS;
+-------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ms)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ns)")) |
+-------------------------------------------------------------------------------------------------------------------------------+
| true |
+-------------------------------------------------------------------------------------------------------------------------------+
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Millisecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Nanosecond, None)")) |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_S;
+------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ms)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+------------------------------------------------------------------------------------------------------------------------------+
| true |
+------------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Millisecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Second, None)")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
select '2008-01-01 00:00:11'::TIMESTAMP_NS = '2008-01-01 00:00:11'::TIMESTAMP_S;
+------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(ns)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(s)")) |
+------------------------------------------------------------------------------------------------------------------------------+
| true |
+------------------------------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Nanosecond, None)")) = arrow_cast(Utf8("2008-01-01 00:00:11"),Utf8("Timestamp(Second, None)")) |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| true |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE timestamp;
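A brief sketch of the mixed-precision equality cases above, with literals and results copied from the recorded expectations; each side is planned as a cast to its own timestamp unit, as the result-column headers show.

-- Recorded as false in the expectations above (the .1 fraction differs from 11).
select '2008-01-01 00:00:11.1'::TIMESTAMP_US = '2008-01-01 00:00:11'::TIMESTAMP_MS; -- false
-- Recorded as true above: comparison against the seconds-precision cast succeeds.
select '2008-01-01 00:00:11.1'::TIMESTAMP_MS = '2008-01-01 00:00:11'::TIMESTAMP_S;  -- true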


@@ -8,25 +8,25 @@ select timestamptz '2021-11-15 02:30:00';
select '2021-11-15 02:30:00'::TIMESTAMP::TIMESTAMPTZ;
+-------------------------------------------------------------------------------------------------+
| arrow_cast(arrow_cast(Utf8("2021-11-15 02:30:00"),Utf8("Timestamp(ms)")),Utf8("Timestamp(ms)")) |
+-------------------------------------------------------------------------------------------------+
| 2021-11-15T02:30:00 |
+-------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------------------------------------+
| arrow_cast(arrow_cast(Utf8("2021-11-15 02:30:00"),Utf8("Timestamp(Millisecond, None)")),Utf8("Timestamp(Millisecond, None)")) |
+-------------------------------------------------------------------------------------------------------------------------------+
| 2021-11-15T02:30:00 |
+-------------------------------------------------------------------------------------------------------------------------------+
SELECT '2021-04-29 10:50:09-05'::TIMESTAMPTZ::DATE;
+------------------------------------------------------------------+
| arrow_cast(Utf8("2021-04-29 10:50:09-05"),Utf8("Timestamp(ms)")) |
+------------------------------------------------------------------+
| 2021-04-29 |
+------------------------------------------------------------------+
+---------------------------------------------------------------------------------+
| arrow_cast(Utf8("2021-04-29 10:50:09-05"),Utf8("Timestamp(Millisecond, None)")) |
+---------------------------------------------------------------------------------+
| 2021-04-29 |
+---------------------------------------------------------------------------------+
SELECT '2021-04-29 10:50:09-05'::TIMESTAMPTZ::TIME;
+------------------------------------------------------------------+
| arrow_cast(Utf8("2021-04-29 10:50:09-05"),Utf8("Timestamp(ms)")) |
+------------------------------------------------------------------+
| 15:50:09 |
+------------------------------------------------------------------+
+---------------------------------------------------------------------------------+
| arrow_cast(Utf8("2021-04-29 10:50:09-05"),Utf8("Timestamp(Millisecond, None)")) |
+---------------------------------------------------------------------------------+
| 15:50:09 |
+---------------------------------------------------------------------------------+
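And a small sketch of the TIMESTAMPTZ casts above; the commented values are the expected results recorded in this file (the -05 offset shifts the extracted time to 15:50:09).

-- Cast a timestamp-with-offset literal down to DATE and TIME.
SELECT '2021-04-29 10:50:09-05'::TIMESTAMPTZ::DATE; -- 2021-04-29
SELECT '2021-04-29 10:50:09-05'::TIMESTAMPTZ::TIME; -- 15:50:09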

Some files were not shown because too many files have changed in this diff.