Compare commits

..

3 Commits

Author SHA1 Message Date
evenyag
3ed33e1aeb feat: add slow log and configure by SLOW_FILE_SCAN_THRESHOLD
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-12-12 10:41:06 +08:00
evenyag
2fcaa5ebc3 feat: divide build_cost to build_part_cost and build_reader_cost
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-12-12 10:41:06 +08:00
evenyag
e2517dec80 feat: collect per file metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-12-12 10:41:06 +08:00
121 changed files with 2222 additions and 6783 deletions

View File

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

View File

@@ -81,7 +81,7 @@ function deploy_greptimedb_cluster() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
-n "$install_namespace"
@@ -119,7 +119,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
--set objectStorage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set objectStorage.s3.region="$AWS_REGION" \

285
Cargo.lock generated
View File

@@ -212,7 +212,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"arrow-schema",
"common-base",
@@ -733,7 +733,7 @@ dependencies = [
[[package]]
name = "auth"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -1383,7 +1383,7 @@ dependencies = [
[[package]]
name = "cache"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"catalog",
"common-error",
@@ -1418,7 +1418,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arrow",
@@ -1763,7 +1763,7 @@ checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675"
[[package]]
name = "cli"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-stream",
"async-trait",
@@ -1816,7 +1816,7 @@ dependencies = [
[[package]]
name = "client"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arc-swap",
@@ -1849,7 +1849,7 @@ dependencies = [
"snafu 0.8.6",
"store-api",
"substrait 0.37.3",
"substrait 1.0.0-beta.3",
"substrait 1.0.0-beta.2",
"tokio",
"tokio-stream",
"tonic 0.13.1",
@@ -1889,7 +1889,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"auth",
@@ -1977,17 +1977,6 @@ dependencies = [
"unicode-width 0.2.1",
]
[[package]]
name = "codespan-reporting"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af491d569909a7e4dee0ad7db7f5341fef5c614d5b8ec8cf765732aba3cff681"
dependencies = [
"serde",
"termcolor",
"unicode-width 0.2.1",
]
[[package]]
name = "colorchoice"
version = "1.0.4"
@@ -2023,7 +2012,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"anymap2",
"async-trait",
@@ -2047,14 +2036,14 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"const_format",
]
[[package]]
name = "common-config"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-base",
"common-error",
@@ -2079,7 +2068,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -2114,7 +2103,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2127,7 +2116,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-macro",
"http 1.3.1",
@@ -2138,7 +2127,7 @@ dependencies = [
[[package]]
name = "common-event-recorder"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -2160,7 +2149,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -2182,7 +2171,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -2242,7 +2231,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2259,7 +2248,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arrow-flight",
@@ -2294,7 +2283,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"common-base",
@@ -2314,7 +2303,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"greptime-proto",
"once_cell",
@@ -2325,7 +2314,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"anyhow",
"common-error",
@@ -2339,22 +2328,9 @@ dependencies = [
"tokio",
]
[[package]]
name = "common-memory-manager"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
"common-telemetry",
"humantime",
"serde",
"snafu 0.8.6",
"tokio",
]
[[package]]
name = "common-meta"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"anymap2",
"api",
@@ -2426,7 +2402,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2435,11 +2411,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
[[package]]
name = "common-pprof"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-error",
"common-macro",
@@ -2451,7 +2427,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-stream",
@@ -2480,7 +2456,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2490,7 +2466,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -2516,7 +2492,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"arc-swap",
"common-base",
@@ -2540,7 +2516,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -2569,7 +2545,7 @@ dependencies = [
[[package]]
name = "common-session"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"serde",
"strum 0.27.1",
@@ -2577,7 +2553,7 @@ dependencies = [
[[package]]
name = "common-sql"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-base",
"common-decimal",
@@ -2595,7 +2571,7 @@ dependencies = [
[[package]]
name = "common-stat"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-base",
"common-runtime",
@@ -2610,7 +2586,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"backtrace",
"common-base",
@@ -2639,7 +2615,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"client",
"common-grpc",
@@ -2652,7 +2628,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"arrow",
"chrono",
@@ -2670,7 +2646,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2681,7 +2657,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-base",
"common-error",
@@ -2704,7 +2680,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"common-telemetry",
"serde",
@@ -3180,68 +3156,6 @@ dependencies = [
"cipher",
]
[[package]]
name = "cxx"
version = "1.0.190"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7620f6cfc4dcca21f2b085b7a890e16c60fd66f560cd69ee60594908dc72ab1"
dependencies = [
"cc",
"cxx-build",
"cxxbridge-cmd",
"cxxbridge-flags",
"cxxbridge-macro",
"foldhash 0.2.0",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.190"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a9bc1a22964ff6a355fbec24cf68266a0ed28f8b84c0864c386474ea3d0e479"
dependencies = [
"cc",
"codespan-reporting 0.13.1",
"indexmap 2.11.4",
"proc-macro2",
"quote",
"scratch",
"syn 2.0.106",
]
[[package]]
name = "cxxbridge-cmd"
version = "1.0.190"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f29a879d35f7906e3c9b77d7a1005a6a0787d330c09dfe4ffb5f617728cb44"
dependencies = [
"clap 4.5.40",
"codespan-reporting 0.13.1",
"indexmap 2.11.4",
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.190"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d67109015f93f683e364085aa6489a5b2118b4a40058482101d699936a7836d6"
[[package]]
name = "cxxbridge-macro"
version = "1.0.190"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d187e019e7b05a1f3e69a8396b70800ee867aa9fc2ab972761173ccee03742df"
dependencies = [
"indexmap 2.11.4",
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "darling"
version = "0.14.4"
@@ -4012,7 +3926,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arrow-flight",
@@ -4076,7 +3990,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"arrow",
"arrow-array",
@@ -4750,7 +4664,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-engine"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -4882,7 +4796,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arrow",
@@ -4951,7 +4865,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 1.0.0-beta.3",
"substrait 1.0.0-beta.2",
"table",
"tokio",
"tonic 0.13.1",
@@ -4989,12 +4903,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
@@ -5012,7 +4920,7 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "frontend"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arc-swap",
@@ -5595,7 +5503,7 @@ checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash 0.1.5",
"foldhash",
]
[[package]]
@@ -6227,7 +6135,7 @@ dependencies = [
[[package]]
name = "index"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6240,7 +6148,6 @@ dependencies = [
"common-telemetry",
"common-test-util",
"criterion 0.4.0",
"datatypes",
"fastbloom",
"fst",
"futures",
@@ -6249,7 +6156,6 @@ dependencies = [
"jieba-rs",
"lazy_static",
"mockall",
"nalgebra",
"pin-project",
"prost 0.13.5",
"puffin",
@@ -6267,7 +6173,6 @@ dependencies = [
"tempfile",
"tokio",
"tokio-util",
"usearch",
"uuid",
]
@@ -7099,15 +7004,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f78c730aaa7d0b9336a299029ea49f9ee53b0ed06e9202e8cb7db9bae7b8c82"
dependencies = [
"cc",
]
[[package]]
name = "linked-hash-map"
version = "0.5.6"
@@ -7168,7 +7064,7 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"chrono",
"common-error",
@@ -7180,7 +7076,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-stream",
"async-trait",
@@ -7421,6 +7317,12 @@ dependencies = [
"digest",
]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "md5"
version = "0.8.0"
@@ -7481,7 +7383,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -7509,7 +7411,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -7609,7 +7511,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"aquamarine",
@@ -7706,7 +7608,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"bytes",
@@ -7731,7 +7633,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"aquamarine",
@@ -7749,7 +7651,6 @@ dependencies = [
"common-function",
"common-grpc",
"common-macro",
"common-memory-manager",
"common-meta",
"common-query",
"common-recordbatch",
@@ -7771,7 +7672,6 @@ dependencies = [
"either",
"futures",
"greptime-proto",
"humantime",
"humantime-serde",
"index",
"itertools 0.14.0",
@@ -8471,7 +8371,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"anyhow",
"bytes",
@@ -8484,6 +8384,7 @@ dependencies = [
"futures",
"humantime-serde",
"lazy_static",
"md5 0.7.0",
"moka",
"opendal",
"prometheus",
@@ -8756,7 +8657,7 @@ dependencies = [
[[package]]
name = "operator"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -8816,7 +8717,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.3",
"substrait 1.0.0-beta.2",
"table",
"tokio",
"tokio-util",
@@ -9102,7 +9003,7 @@ dependencies = [
[[package]]
name = "partition"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -9330,7 +9231,7 @@ dependencies = [
"futures",
"hex",
"lazy-regex",
"md5",
"md5 0.8.0",
"postgres-types",
"rand 0.9.1",
"ring",
@@ -9459,7 +9360,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -9615,7 +9516,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"auth",
"catalog",
@@ -9917,7 +9818,7 @@ dependencies = [
[[package]]
name = "promql"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"async-trait",
@@ -10200,7 +10101,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-compression 0.4.19",
"async-trait",
@@ -10242,7 +10143,7 @@ dependencies = [
[[package]]
name = "query"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -10309,7 +10210,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.3",
"substrait 1.0.0-beta.2",
"table",
"tokio",
"tokio-stream",
@@ -11381,12 +11282,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "scratch"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68f2ec51b097e4c1a75b681a8bec621909b5e91f15bb7b840c4f2f7b01148b2"
[[package]]
name = "scrypt"
version = "0.11.0"
@@ -11651,7 +11546,7 @@ dependencies = [
[[package]]
name = "servers"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -11779,7 +11674,7 @@ dependencies = [
[[package]]
name = "session"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -12113,7 +12008,7 @@ dependencies = [
[[package]]
name = "sql"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arrow-buffer",
@@ -12173,7 +12068,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -12450,7 +12345,7 @@ dependencies = [
[[package]]
name = "standalone"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"catalog",
@@ -12491,7 +12386,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "store-api"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"aquamarine",
@@ -12704,7 +12599,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"async-trait",
"bytes",
@@ -12827,7 +12722,7 @@ dependencies = [
[[package]]
name = "table"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"async-trait",
@@ -13096,7 +12991,7 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "tests-fuzz"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -13140,7 +13035,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
dependencies = [
"api",
"arrow-flight",
@@ -13215,7 +13110,7 @@ dependencies = [
"sqlx",
"standalone",
"store-api",
"substrait 1.0.0-beta.3",
"substrait 1.0.0-beta.2",
"table",
"tempfile",
"time",
@@ -14240,16 +14135,6 @@ version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]]
name = "usearch"
version = "2.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cc9fc5f872a3a4f9081d5f42624d788231b763e1846c829b9968a3755ac884d"
dependencies = [
"cxx",
"cxx-build",
]
[[package]]
name = "utf8-ranges"
version = "1.0.5"
@@ -14389,7 +14274,7 @@ dependencies = [
"ciborium",
"cidr",
"clap 4.5.40",
"codespan-reporting 0.12.0",
"codespan-reporting",
"community-id",
"convert_case 0.7.1",
"crc",

View File

@@ -21,7 +21,6 @@ members = [
"src/common/grpc-expr",
"src/common/macro",
"src/common/mem-prof",
"src/common/memory-manager",
"src/common/meta",
"src/common/options",
"src/common/plugins",
@@ -75,7 +74,7 @@ members = [
resolver = "2"
[workspace.package]
version = "1.0.0-beta.3"
version = "1.0.0-beta.2"
edition = "2024"
license = "Apache-2.0"
@@ -267,7 +266,6 @@ common-grpc = { path = "src/common/grpc" }
common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-memory-manager = { path = "src/common/memory-manager" }
common-meta = { path = "src/common/meta" }
common-options = { path = "src/common/options" }
common-plugins = { path = "src/common/plugins" }

View File

@@ -108,6 +108,9 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
| `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. |
@@ -138,8 +141,6 @@
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "fail" |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
@@ -153,8 +154,6 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).<br/>When enabled, index files are loaded into the write cache during region initialization,<br/>which can improve query performance at the cost of longer startup times. |
| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).<br/>The remaining capacity is used for data (parquet) files.<br/>Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,<br/>1GiB is reserved for index files and 4GiB for data files. |
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling cache on read operations (default: true).<br/>When disabled, cache refilling on read won't happen. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
@@ -487,6 +486,9 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
| `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. |
@@ -519,8 +521,6 @@
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "fail" |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
@@ -534,8 +534,6 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).<br/>When enabled, index files are loaded into the write cache during region initialization,<br/>which can improve query performance at the cost of longer startup times. |
| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).<br/>The remaining capacity is used for data (parquet) files.<br/>Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,<br/>1GiB is reserved for index files and 4GiB for data files. |
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling cache on read operations (default: true).<br/>When disabled, cache refilling on read won't happen. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |

View File

@@ -281,6 +281,18 @@ data_home = "./greptimedb_data"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
## Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage.
#+ enable_read_cache = true
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
## @toml2docs:none-default
@@ -440,15 +452,6 @@ compress_manifest = false
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8
## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
## @toml2docs:none-default="0"
#+ experimental_compaction_memory_limit = "0"
## Behavior when compaction cannot acquire memory from the budget.
## Options: "wait" (default, 10s), "wait(<duration>)", "fail"
## @toml2docs:none-default="wait"
#+ experimental_compaction_on_exhausted = "wait"
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
@@ -504,13 +507,6 @@ preload_index_cache = true
## 1GiB is reserved for index files and 4GiB for data files.
index_cache_percent = 20
## Enable refilling cache on read operations (default: true).
## When disabled, cache refilling on read won't happen.
enable_refill_cache_on_read = true
## Capacity for manifest cache (default: 256MB).
manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -388,6 +388,18 @@ data_home = "./greptimedb_data"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage.
#+ enable_read_cache = true
## Read cache configuration for object storage such as 'S3'. It is enabled by default when using object storage; configuring it explicitly is recommended for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
## @toml2docs:none-default
@@ -534,15 +546,6 @@ compress_manifest = false
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8
## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
## @toml2docs:none-default="0"
#+ experimental_compaction_memory_limit = "0"
## Behavior when compaction cannot acquire memory from the budget.
## Options: "wait" (default, 10s), "wait(<duration>)", "fail"
## @toml2docs:none-default="wait"
#+ experimental_compaction_on_exhausted = "wait"
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
@@ -598,13 +601,6 @@ preload_index_cache = true
## 1GiB is reserved for index files and 4GiB for data files.
index_cache_percent = 20
## Enable refilling cache on read operations (default: true).
## When disabled, cache refilling on read won't happen.
enable_refill_cache_on_read = true
## Capacity for manifest cache (default: 256MB).
manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -428,7 +428,7 @@ pub trait InformationExtension {
}
/// The request to inspect the datanode.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
/// Kind to fetch from datanode.
pub kind: DatanodeInspectKind,

View File

@@ -145,17 +145,6 @@ impl ObjbenchCommand {
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
let max_row_group_uncompressed_size: u64 = parquet_meta
.row_groups()
.iter()
.map(|rg| {
rg.columns()
.iter()
.map(|c| c.uncompressed_size() as u64)
.sum::<u64>()
})
.max()
.unwrap_or(0);
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
@@ -171,7 +160,6 @@ impl ObjbenchCommand {
time_range: Default::default(),
level: 0,
file_size,
max_row_group_uncompressed_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,

View File

@@ -52,7 +52,7 @@ use plugins::frontend::context::{
};
use servers::addrs;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
@@ -256,7 +256,7 @@ impl StartCommand {
if let Some(addr) = &self.rpc_bind_addr {
opts.grpc.bind_addr.clone_from(addr);
opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
opts.grpc.tls = tls_opts.clone();
}
if let Some(addr) = &self.rpc_server_addr {
@@ -291,13 +291,13 @@ impl StartCommand {
if let Some(addr) = &self.mysql_addr {
opts.mysql.enable = true;
opts.mysql.addr.clone_from(addr);
opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
opts.mysql.tls = tls_opts.clone();
}
if let Some(addr) = &self.postgres_addr {
opts.postgres.enable = true;
opts.postgres.addr.clone_from(addr);
opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
opts.postgres.tls = tls_opts;
}
if let Some(enable) = self.influxdb_enable {

View File

@@ -62,7 +62,7 @@ use plugins::frontend::context::{
CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
};
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
@@ -293,20 +293,19 @@ impl StartCommand {
),
}.fail();
}
opts.grpc.bind_addr.clone_from(addr);
opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
opts.grpc.bind_addr.clone_from(addr)
}
if let Some(addr) = &self.mysql_addr {
opts.mysql.enable = true;
opts.mysql.addr.clone_from(addr);
opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
opts.mysql.tls = tls_opts.clone();
}
if let Some(addr) = &self.postgres_addr {
opts.postgres.enable = true;
opts.postgres.addr.clone_from(addr);
opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
opts.postgres.tls = tls_opts;
}
if self.influxdb_enable {
@@ -766,6 +765,7 @@ mod tests {
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
mysql_addr: Some("127.0.0.1:4002".to_string()),
postgres_addr: Some("127.0.0.1:4003".to_string()),
tls_watch: true,
..Default::default()
};
@@ -782,6 +782,8 @@ mod tests {
assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
assert!(opts.mysql.tls.watch);
assert!(opts.postgres.tls.watch);
}
#[test]

View File

@@ -1,20 +0,0 @@
[package]
name = "common-memory-manager"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-error = { workspace = true }
common-macro = { workspace = true }
common-telemetry = { workspace = true }
humantime = { workspace = true }
serde = { workspace = true }
snafu = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
[dev-dependencies]
tokio = { workspace = true, features = ["rt", "macros"] }

View File

@@ -1,53 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::Snafu;
/// Convenience alias binding this crate's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    /// A single request asked for more bytes than the entire configured pool.
    #[snafu(display(
        "Memory limit exceeded: requested {requested_bytes} bytes, limit {limit_bytes} bytes"
    ))]
    MemoryLimitExceeded {
        // Bytes the caller asked for.
        requested_bytes: u64,
        // Total pool size in bytes.
        limit_bytes: u64,
    },
    /// The backing semaphore was closed while a request was waiting on it.
    #[snafu(display("Memory semaphore unexpectedly closed"))]
    MemorySemaphoreClosed,
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
MemorySemaphoreClosed => StatusCode::Unexpected,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -1,138 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fmt, mem};
use common_telemetry::debug;
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
/// Guard representing a slice of reserved memory.
///
/// Dropping the guard returns its permits to the pool (see the `Drop` impl).
pub struct MemoryGuard<M: MemoryMetrics> {
    pub(crate) state: GuardState<M>,
}

/// Internal state of a [`MemoryGuard`].
pub(crate) enum GuardState<M: MemoryMetrics> {
    /// No quota is enforced; the guard holds nothing.
    Unlimited,
    /// The guard owns semaphore permits drawn from a shared quota.
    Limited {
        permit: OwnedSemaphorePermit,
        quota: MemoryQuota<M>,
    },
}
impl<M: MemoryMetrics> MemoryGuard<M> {
    /// Creates a guard for an unlimited pool; holds no permits.
    pub(crate) fn unlimited() -> Self {
        Self {
            state: GuardState::Unlimited,
        }
    }

    /// Creates a guard owning `permit` drawn from `quota`.
    pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota<M>) -> Self {
        Self {
            state: GuardState::Limited { permit, quota },
        }
    }

    /// Returns granted quota in bytes.
    ///
    /// Always 0 for an unlimited guard.
    pub fn granted_bytes(&self) -> u64 {
        match &self.state {
            GuardState::Unlimited => 0,
            GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
        }
    }

    /// Tries to allocate additional memory during task execution.
    ///
    /// On success, merges the new memory into this guard and returns true.
    /// On failure, returns false and leaves this guard unchanged.
    pub fn request_additional(&mut self, bytes: u64) -> bool {
        match &mut self.state {
            GuardState::Unlimited => true,
            GuardState::Limited { permit, quota } => {
                if bytes == 0 {
                    return true;
                }
                let additional_permits = bytes_to_permits(bytes);
                // Cloning the Arc<Semaphore> is required to obtain an owned permit.
                match quota
                    .semaphore
                    .clone()
                    .try_acquire_many_owned(additional_permits)
                {
                    Ok(additional_permit) => {
                        // Folding the new permit into the existing one means a single
                        // release on drop covers both the base and the extra memory.
                        permit.merge(additional_permit);
                        quota.update_in_use_metric();
                        debug!("Allocated additional {} bytes", bytes);
                        true
                    }
                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
                        quota.metrics.inc_rejected("request_additional");
                        false
                    }
                }
            }
        }
    }

    /// Releases a portion of granted memory back to the pool early,
    /// before the guard is dropped.
    ///
    /// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
    pub fn early_release_partial(&mut self, bytes: u64) -> bool {
        match &mut self.state {
            GuardState::Unlimited => true,
            GuardState::Limited { permit, quota } => {
                if bytes == 0 {
                    return true;
                }
                let release_permits = bytes_to_permits(bytes);
                // `split` returns None when asked for more permits than held,
                // which is how over-release is rejected without changing state.
                match permit.split(release_permits as usize) {
                    Some(released_permit) => {
                        let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
                        // Dropping the split permit returns it to the semaphore.
                        drop(released_permit);
                        quota.update_in_use_metric();
                        debug!("Early released {} bytes from memory guard", released_bytes);
                        true
                    }
                    None => false,
                }
            }
        }
    }
}
impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
    /// Returns all granted permits to the pool and refreshes the in-use metric.
    fn drop(&mut self) {
        // Swap the state out so the permit can be consumed by value.
        let previous = mem::replace(&mut self.state, GuardState::Unlimited);
        match previous {
            GuardState::Unlimited => {}
            GuardState::Limited { permit, quota } => {
                let released = permits_to_bytes(permit.num_permits() as u32);
                drop(permit);
                quota.update_in_use_metric();
                debug!("Released memory: {} bytes", released);
            }
        }
    }
}
impl<M: MemoryMetrics> fmt::Debug for MemoryGuard<M> {
    /// Renders the guard as `MemoryGuard { granted_bytes: N }`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("MemoryGuard");
        builder.field("granted_bytes", &self.granted_bytes());
        builder.finish()
    }
}

View File

@@ -1,47 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Generic memory management for resource-constrained operations.
//!
//! This crate provides a reusable memory quota system based on semaphores,
//! allowing different subsystems (compaction, flush, index build, etc.) to
//! share the same allocation logic while using their own metrics.
mod error;
mod guard;
mod manager;
mod policy;
#[cfg(test)]
mod tests;
pub use error::{Error, Result};
pub use guard::MemoryGuard;
pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
/// No-op metrics implementation for testing.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoOpMetrics;

impl MemoryMetrics for NoOpMetrics {
    /// Discards the reported limit.
    #[inline(always)]
    fn set_limit(&self, _bytes: i64) {}

    /// Discards the reported usage.
    #[inline(always)]
    fn set_in_use(&self, _bytes: i64) {}

    /// Discards rejection events.
    #[inline(always)]
    fn inc_rejected(&self, _reason: &str) {}
}

View File

@@ -1,173 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use snafu::ensure;
use tokio::sync::{Semaphore, TryAcquireError};
use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
use crate::guard::MemoryGuard;
/// Minimum bytes controlled by one semaphore permit.
pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB

/// Trait for recording memory usage metrics.
///
/// Implementors receive callbacks from [`MemoryManager`] whenever the
/// configured limit, the in-use amount, or a rejection count changes.
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
    /// Records the configured limit in bytes (0 means unlimited).
    fn set_limit(&self, bytes: i64);
    /// Records the bytes currently held by outstanding guards.
    fn set_in_use(&self, bytes: i64);
    /// Counts a rejected allocation; `reason` identifies the call site
    /// (e.g. "try_acquire", "request_additional").
    fn inc_rejected(&self, reason: &str);
}
/// Generic memory manager for quota-controlled operations.
#[derive(Clone)]
pub struct MemoryManager<M: MemoryMetrics> {
    // `None` means the limit is disabled and every acquisition succeeds.
    quota: Option<MemoryQuota<M>>,
}
/// Shared quota state backing a limited [`MemoryManager`].
#[derive(Clone)]
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
    // Permit pool; one permit covers `PERMIT_GRANULARITY_BYTES`.
    pub(crate) semaphore: Arc<Semaphore>,
    // Total permits the pool was created with.
    pub(crate) limit_permits: u32,
    pub(crate) metrics: M,
}
impl<M: MemoryMetrics> MemoryManager<M> {
    /// Creates a new memory manager with the given limit in bytes.
    /// `limit_bytes = 0` disables the limit.
    pub fn new(limit_bytes: u64, metrics: M) -> Self {
        if limit_bytes == 0 {
            // Unlimited: no quota is created and the metric advertises 0.
            metrics.set_limit(0);
            return Self { quota: None };
        }
        let limit_permits = bytes_to_permits(limit_bytes);
        // Report the limit rounded up to permit granularity, which is what
        // the semaphore actually enforces.
        let limit_aligned_bytes = permits_to_bytes(limit_permits);
        metrics.set_limit(limit_aligned_bytes as i64);
        Self {
            quota: Some(MemoryQuota {
                semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
                limit_permits,
                metrics,
            }),
        }
    }

    /// Returns the configured limit in bytes (0 if unlimited).
    pub fn limit_bytes(&self) -> u64 {
        self.quota
            .as_ref()
            .map(|quota| permits_to_bytes(quota.limit_permits))
            .unwrap_or(0)
    }

    /// Returns currently used bytes.
    pub fn used_bytes(&self) -> u64 {
        self.quota
            .as_ref()
            .map(|quota| permits_to_bytes(quota.used_permits()))
            .unwrap_or(0)
    }

    /// Returns available bytes.
    pub fn available_bytes(&self) -> u64 {
        self.quota
            .as_ref()
            .map(|quota| permits_to_bytes(quota.available_permits_clamped()))
            .unwrap_or(0)
    }

    /// Acquires memory, waiting if necessary until enough is available.
    ///
    /// # Errors
    /// - Returns error if requested bytes exceed the total limit
    /// - Returns error if the semaphore is unexpectedly closed
    pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<M>> {
        match &self.quota {
            None => Ok(MemoryGuard::unlimited()),
            Some(quota) => {
                let permits = bytes_to_permits(bytes);
                // Reject requests that could never be satisfied instead of
                // waiting forever on the semaphore.
                ensure!(
                    permits <= quota.limit_permits,
                    MemoryLimitExceededSnafu {
                        requested_bytes: bytes,
                        limit_bytes: permits_to_bytes(quota.limit_permits),
                    }
                );
                // Suspends until `permits` are available; only fails if the
                // semaphore is closed.
                let permit = quota
                    .semaphore
                    .clone()
                    .acquire_many_owned(permits)
                    .await
                    .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
                quota.update_in_use_metric();
                Ok(MemoryGuard::limited(permit, quota.clone()))
            }
        }
    }

    /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
    pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<M>> {
        match &self.quota {
            None => Some(MemoryGuard::unlimited()),
            Some(quota) => {
                let permits = bytes_to_permits(bytes);
                match quota.semaphore.clone().try_acquire_many_owned(permits) {
                    Ok(permit) => {
                        quota.update_in_use_metric();
                        Some(MemoryGuard::limited(permit, quota.clone()))
                    }
                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
                        // Count the rejection so operators can see pressure.
                        quota.metrics.inc_rejected("try_acquire");
                        None
                    }
                }
            }
        }
    }
}
impl<M: MemoryMetrics> MemoryQuota<M> {
    /// Permits currently free, capped at the configured limit.
    pub(crate) fn available_permits_clamped(&self) -> u32 {
        let available = self.semaphore.available_permits();
        available.min(self.limit_permits as usize) as u32
    }

    /// Permits currently held by outstanding guards.
    pub(crate) fn used_permits(&self) -> u32 {
        let free = self.available_permits_clamped();
        self.limit_permits.saturating_sub(free)
    }

    /// Publishes the current in-use byte count to the metrics sink.
    pub(crate) fn update_in_use_metric(&self) {
        let in_use = permits_to_bytes(self.used_permits());
        self.metrics.set_in_use(in_use as i64);
    }
}
/// Converts a byte count to semaphore permits, rounding up to the permit
/// granularity and clamping to what the semaphore and `u32` can represent.
pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
    // Round up: any partial permit still reserves a whole one.
    let rounded = bytes.saturating_add(PERMIT_GRANULARITY_BYTES - 1);
    let permits = rounded.saturating_div(PERMIT_GRANULARITY_BYTES);
    // Cap at both the semaphore's maximum and u32's range before narrowing.
    let cap = (Semaphore::MAX_PERMITS as u64).min(u32::MAX as u64);
    permits.min(cap) as u32
}
/// Converts a permit count back to the byte amount it represents.
pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
    u64::from(permits).saturating_mul(PERMIT_GRANULARITY_BYTES)
}

View File

@@ -1,83 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use humantime::{format_duration, parse_duration};
use serde::{Deserialize, Serialize};
/// Default wait timeout for memory acquisition.
pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);

/// Defines how to react when memory cannot be acquired immediately.
///
/// Serialized as "wait", "wait(<duration>)", or "fail" (see the
/// `Serialize`/`Deserialize` impls below).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnExhaustedPolicy {
    /// Wait until enough memory is released, bounded by timeout.
    Wait { timeout: Duration },
    /// Fail immediately if memory is not available.
    Fail,
}
impl Default for OnExhaustedPolicy {
fn default() -> Self {
OnExhaustedPolicy::Wait {
timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
}
}
}
impl Serialize for OnExhaustedPolicy {
    /// Serializes as "fail", "wait" (default timeout), or "wait(<duration>)".
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let text = match self {
            OnExhaustedPolicy::Fail => "fail".to_string(),
            OnExhaustedPolicy::Wait { timeout } => {
                // The default timeout round-trips as the bare keyword "wait".
                if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT {
                    "wait".to_string()
                } else {
                    format!("wait({})", format_duration(*timeout))
                }
            }
        };
        serializer.serialize_str(&text)
    }
}
impl<'de> Deserialize<'de> for OnExhaustedPolicy {
    /// Parses "wait", "wait(<duration>)", "fail", or the legacy "skip"
    /// (case-insensitive).
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let raw = String::deserialize(deserializer)?;
        // Matching is done on the lowercased copy; slicing below uses `raw`
        // to keep the duration text as written. ASCII lowercasing preserves
        // byte offsets, so the indices line up between the two strings.
        let lower = raw.to_ascii_lowercase();
        // Accept both "skip" (legacy) and "fail".
        if lower == "skip" || lower == "fail" {
            return Ok(OnExhaustedPolicy::Fail);
        }
        if lower == "wait" {
            return Ok(OnExhaustedPolicy::default());
        }
        if lower.starts_with("wait(") && lower.ends_with(')') {
            // Strip "wait(" and the trailing ')' to get the duration text.
            let inner = &raw[5..raw.len() - 1];
            let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
            return Ok(OnExhaustedPolicy::Wait { timeout });
        }
        Err(serde::de::Error::custom(format!(
            "invalid memory policy: {}, expected wait, wait(<duration>), fail",
            raw
        )))
    }
}

View File

@@ -1,247 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use tokio::time::{Duration, sleep};
use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
// Unlimited manager: grants succeed, but hold no quota.
#[test]
fn test_try_acquire_unlimited() {
    let manager = MemoryManager::new(0, NoOpMetrics);
    let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
    assert_eq!(manager.limit_bytes(), 0);
    assert_eq!(guard.granted_bytes(), 0);
}

// Dropping a guard returns its bytes to the pool.
#[test]
fn test_try_acquire_limited_success_and_release() {
    let bytes = 2 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(bytes, NoOpMetrics);
    {
        let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap();
        assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES);
        assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES);
        drop(guard);
    }
    assert_eq!(manager.used_bytes(), 0);
}

// A request larger than the whole pool is rejected outright.
#[test]
fn test_try_acquire_exceeds_limit() {
    let limit = PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(limit, NoOpMetrics);
    let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES);
    assert!(result.is_none());
}

// acquire() suspends until another guard releases enough memory.
#[tokio::test(flavor = "current_thread")]
async fn test_acquire_blocks_and_unblocks() {
    let bytes = 2 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(bytes, NoOpMetrics);
    let guard = manager.try_acquire(bytes).unwrap();
    // Spawn a task that will block on acquire()
    let waiter = {
        let manager = manager.clone();
        tokio::spawn(async move {
            // This will block until memory is available
            let _guard = manager.acquire(bytes).await.unwrap();
        })
    };
    sleep(Duration::from_millis(10)).await;
    // Release memory - this should unblock the waiter
    drop(guard);
    // Waiter should complete now
    waiter.await.unwrap();
}
// Extra memory can be merged into an existing guard while under the limit.
#[test]
fn test_request_additional_success() {
    let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
    let manager = MemoryManager::new(limit, NoOpMetrics);
    // Acquire base quota (5MB)
    let base = 5 * PERMIT_GRANULARITY_BYTES;
    let mut guard = manager.try_acquire(base).unwrap();
    assert_eq!(guard.granted_bytes(), base);
    assert_eq!(manager.used_bytes(), base);
    // Request additional memory (3MB) - should succeed and merge
    assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
}

// A failed additional request leaves the guard and pool untouched.
#[test]
fn test_request_additional_exceeds_limit() {
    let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
    let manager = MemoryManager::new(limit, NoOpMetrics);
    // Acquire base quota (5MB)
    let base = 5 * PERMIT_GRANULARITY_BYTES;
    let mut guard = manager.try_acquire(base).unwrap();
    // Request additional memory (3MB) - should succeed
    assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
    // Request more (3MB) - should fail (would exceed 10MB limit)
    let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES);
    assert!(!result);
    // Still at 8MB
    assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
}

// Merged memory is released in one step when the guard drops.
#[test]
fn test_request_additional_auto_release_on_guard_drop() {
    let limit = 10 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(limit, NoOpMetrics);
    {
        let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
        // Request additional - memory is merged into guard
        assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
        assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
        // When guard drops, all memory (base + additional) is released together
    }
    // After scope, all memory should be released
    assert_eq!(manager.used_bytes(), 0);
}

#[test]
fn test_request_additional_unlimited() {
    let manager = MemoryManager::new(0, NoOpMetrics); // Unlimited
    let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
    // Should always succeed with unlimited manager
    assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 0);
    assert_eq!(manager.used_bytes(), 0);
}

#[test]
fn test_request_additional_zero_bytes() {
    let limit = 10 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(limit, NoOpMetrics);
    let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
    // Request 0 bytes should succeed without affecting anything
    assert!(guard.request_additional(0));
    assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
}
// Partially released memory becomes available to other callers.
#[test]
fn test_early_release_partial_success() {
    let limit = 10 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(limit, NoOpMetrics);
    let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap();
    assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
    // Release half
    assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
    // Released memory should be available to others
    let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap();
    assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
}

// Over-release is rejected and leaves the guard unchanged.
#[test]
fn test_early_release_partial_exceeds_granted() {
    let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics);
    let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
    // Try to release more than granted - should fail
    assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
}

#[test]
fn test_early_release_partial_unlimited() {
    let manager = MemoryManager::new(0, NoOpMetrics);
    let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap();
    // Unlimited guard - release should succeed (no-op)
    assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 0);
}

// Interleaved grow/shrink operations keep guard and pool accounting in sync.
#[test]
fn test_request_and_early_release_symmetry() {
    let limit = 20 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(limit, NoOpMetrics);
    let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
    // Request additional
    assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
    // Early release some
    assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
    // Request again
    assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
    // Early release again
    assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
    assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
    drop(guard);
    assert_eq!(manager.used_bytes(), 0);
}

#[test]
fn test_small_allocation_rounds_up() {
    // Test that allocations smaller than PERMIT_GRANULARITY_BYTES
    // round up to 1 permit and can use request_additional()
    let limit = 10 * PERMIT_GRANULARITY_BYTES;
    let manager = MemoryManager::new(limit, NoOpMetrics);
    let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB
    assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB
    assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more
    assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
}

#[test]
fn test_acquire_zero_bytes_lazy_allocation() {
    // Test that acquire(0) returns 0 permits but can request_additional() later
    let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics);
    let mut guard = manager.try_acquire(0).unwrap();
    assert_eq!(guard.granted_bytes(), 0); // No permits consumed
    assert_eq!(manager.used_bytes(), 0);
    assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation
    assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
}

View File

@@ -58,14 +58,10 @@ pub fn get_total_memory_bytes() -> i64 {
}
}
/// Get the total CPU cores. The result will be rounded up to the next integer (ceiling).
/// For example, if the total CPU is 1.1 cores (1100 millicores) or 1.5 cores (1500 millicores), the result will be 2.
/// Get the total CPU cores. The result will be rounded to the nearest integer.
/// For example, if the total CPU is 1.5 cores(1500 millicores), the result will be 2.
pub fn get_total_cpu_cores() -> usize {
cpu_cores(get_total_cpu_millicores())
}
fn cpu_cores(cpu_millicores: i64) -> usize {
((cpu_millicores as f64) / 1_000.0).ceil() as usize
((get_total_cpu_millicores() as f64) / 1000.0).round() as usize
}
/// Get the total memory in readable size.
@@ -182,13 +178,6 @@ mod tests {
#[test]
fn test_get_total_cpu_cores() {
assert!(get_total_cpu_cores() > 0);
assert_eq!(cpu_cores(1), 1);
assert_eq!(cpu_cores(100), 1);
assert_eq!(cpu_cores(500), 1);
assert_eq!(cpu_cores(1000), 1);
assert_eq!(cpu_cores(1100), 2);
assert_eq!(cpu_cores(1900), 2);
assert_eq!(cpu_cores(10_000), 10);
}
#[test]

View File

@@ -410,6 +410,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build cache store"))]
BuildCacheStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Not yet implemented: {what}"))]
NotYetImplemented { what: String },
}
@@ -485,6 +493,7 @@ impl ErrorExt for Error {
SerializeJson { .. } => StatusCode::Internal,
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -14,10 +14,15 @@
//! object storage utilities
use common_telemetry::{info, warn};
use std::sync::Arc;
use common_telemetry::info;
use object_store::config::ObjectStorageCacheConfig;
use object_store::factory::new_raw_object_store;
use object_store::layers::LruCacheLayer;
use object_store::services::Fs;
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
use object_store::{ATOMIC_WRITE_DIR, ObjectStore};
use object_store::{ATOMIC_WRITE_DIR, Access, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::ObjectStoreConfig;
@@ -42,58 +47,23 @@ pub(crate) async fn new_object_store_without_cache(
Ok(object_store)
}
/// Cleans up leftover directories created by the old LRU read cache, which
/// has been removed.
///
/// Best-effort: failures are logged as warnings and never abort startup.
fn clean_old_read_cache(store: &ObjectStoreConfig, data_home: &str) {
    // Only object storage backends ever had a local read cache.
    if !store.is_object_storage() {
        return;
    }
    let Some(cache_config) = store.cache_config() else {
        return;
    };
    // Only cleans if read cache was enabled
    if !cache_config.enable_read_cache {
        return;
    }
    // An empty cache_path means the cache lived under the data home.
    let cache_base_dir = if cache_config.cache_path.is_empty() {
        data_home
    } else {
        &cache_config.cache_path
    };
    // Cleans up the old read cache directory
    let old_read_cache_dir = join_dir(cache_base_dir, "cache/object/read");
    info!(
        "Cleaning up old read cache directory: {}",
        old_read_cache_dir
    );
    if let Err(e) = clean_temp_dir(&old_read_cache_dir) {
        warn!(e; "Failed to clean old read cache directory {}", old_read_cache_dir);
    }
    // Cleans up the atomic temp dir used by the cache layer
    let cache_atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
    info!(
        "Cleaning up old cache atomic temp directory: {}",
        cache_atomic_temp_dir
    );
    if let Err(e) = clean_temp_dir(&cache_atomic_temp_dir) {
        warn!(e; "Failed to clean old cache atomic temp directory {}", cache_atomic_temp_dir);
    }
}
pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
// Cleans up old LRU read cache directories.
// TODO: Remove this line after the 1.0 release.
clean_old_read_cache(&store, data_home);
let object_store = new_raw_object_store(&store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
// Enables retry layer for non-fs object storages
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
let object_store = {
// It's safe to unwrap here because we already checked above.
let cache_config = store.cache_config().unwrap();
if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
}
};
// Adds retry layer
with_retry_layers(object_store)
} else {
@@ -103,3 +73,40 @@ pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Resu
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}
/// Builds the local-disk LRU read cache layer for a remote object store.
///
/// Returns `Ok(None)` when the read cache is disabled in `cache_config`.
/// Otherwise creates an [`LruCacheLayer`] backed by a local `Fs` store rooted
/// at the configured cache path (falling back to `data_home` when no path is
/// set) and recovers previously cached files from disk before returning.
async fn build_cache_layer(
    cache_config: &ObjectStorageCacheConfig,
    data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> {
    // No need to build cache layer if read cache is disabled.
    if !cache_config.enable_read_cache {
        return Ok(None);
    }
    // Cache files live under `data_home` unless an explicit path is configured.
    let cache_base_dir = if cache_config.cache_path.is_empty() {
        data_home
    } else {
        &cache_config.cache_path
    };
    // Reset the atomic write dir so stale in-flight files don't survive restarts.
    let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
    clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
    let cache_store = Fs::default()
        .root(cache_base_dir)
        .atomic_write_dir(&atomic_temp_dir)
        .build()
        .context(error::BuildCacheStoreSnafu)?;
    let cache_layer = LruCacheLayer::new(
        Arc::new(cache_store),
        cache_config.cache_capacity.0 as usize,
    )
    .context(error::BuildCacheStoreSnafu)?;
    // Load the existing cache index from disk before serving reads.
    cache_layer.recover_cache(false).await;
    // Fix: log the *effective* cache directory. The previous message printed
    // `cache_config.cache_path`, which is empty when the cache falls back to
    // `data_home`, producing a misleading "path: " log line.
    info!(
        "Enabled local object storage cache, path: {}, capacity: {}.",
        cache_base_dir, cache_config.cache_capacity
    );
    Ok(Some(cache_layer))
}

View File

@@ -33,9 +33,9 @@ use servers::grpc::FlightCompression;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -299,14 +299,6 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -33,8 +33,7 @@ pub use crate::schema::column_schema::{
COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY, VECTOR_INDEX_KEY,
VectorDistanceMetric, VectorIndexEngineType, VectorIndexOptions,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

View File

@@ -46,8 +46,6 @@ pub const FULLTEXT_KEY: &str = "greptime:fulltext";
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skip options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";
/// Key used to store vector index options in arrow field's metadata.
pub const VECTOR_INDEX_KEY: &str = "greptime:vector_index";
/// Keys used in fulltext options
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
@@ -218,53 +216,6 @@ impl ColumnSchema {
self.metadata.contains_key(INVERTED_INDEX_KEY)
}
/// Checks if this column has a vector index.
pub fn is_vector_indexed(&self) -> bool {
match self.vector_index_options() {
Ok(opts) => opts.is_some(),
Err(e) => {
common_telemetry::warn!(
"Failed to deserialize vector_index_options for column '{}': {}",
self.name,
e
);
false
}
}
}
/// Gets the vector index options.
pub fn vector_index_options(&self) -> Result<Option<VectorIndexOptions>> {
match self.metadata.get(VECTOR_INDEX_KEY) {
None => Ok(None),
Some(json) => {
let options =
serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
Ok(Some(options))
}
}
}
/// Sets the vector index options.
pub fn set_vector_index_options(&mut self, options: &VectorIndexOptions) -> Result<()> {
self.metadata.insert(
VECTOR_INDEX_KEY.to_string(),
serde_json::to_string(options).context(error::SerializeSnafu)?,
);
Ok(())
}
/// Removes the vector index options.
pub fn unset_vector_index_options(&mut self) {
self.metadata.remove(VECTOR_INDEX_KEY);
}
/// Sets vector index options and returns self for chaining.
pub fn with_vector_index_options(mut self, options: &VectorIndexOptions) -> Result<Self> {
self.set_vector_index_options(options)?;
Ok(self)
}
/// Set default constraint.
///
/// If a default constraint exists for the column, this method will
@@ -1013,181 +964,6 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
}
}
/// Distance metric for vector similarity search.
///
/// Serialized in lowercase by serde; `"ip"` is additionally accepted as an
/// alias for `InnerProduct` when deserializing. Also encoded to `u8` for blob
/// storage via [`VectorDistanceMetric::as_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
#[serde(rename_all = "lowercase")]
pub enum VectorDistanceMetric {
    /// Squared Euclidean distance (L2^2).
    #[default]
    L2sq,
    /// Cosine distance (1 - cosine similarity).
    Cosine,
    /// Inner product (negative, for maximum inner product search).
    #[serde(alias = "ip")]
    InnerProduct,
}
impl fmt::Display for VectorDistanceMetric {
    /// Formats the metric using its canonical short name (`l2sq`, `cosine`, `ip`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            VectorDistanceMetric::L2sq => "l2sq",
            VectorDistanceMetric::Cosine => "cosine",
            VectorDistanceMetric::InnerProduct => "ip",
        };
        f.write_str(name)
    }
}
impl std::str::FromStr for VectorDistanceMetric {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"l2sq" | "l2" | "euclidean" => Ok(VectorDistanceMetric::L2sq),
"cosine" | "cos" => Ok(VectorDistanceMetric::Cosine),
"inner_product" | "ip" | "dot" => Ok(VectorDistanceMetric::InnerProduct),
_ => Err(format!(
"Unknown distance metric: {}. Expected: l2sq, cosine, or ip",
s
)),
}
}
}
impl VectorDistanceMetric {
    /// Returns the metric as u8 for blob serialization.
    ///
    /// Must remain the inverse of [`Self::try_from_u8`]; these codes are
    /// persisted in blobs and must never be renumbered.
    pub fn as_u8(&self) -> u8 {
        match self {
            Self::L2sq => 0,
            Self::Cosine => 1,
            Self::InnerProduct => 2,
        }
    }

    /// Parses metric from u8 (used when reading blob); `None` for unknown codes.
    pub fn try_from_u8(v: u8) -> Option<Self> {
        let metric = match v {
            0 => Self::L2sq,
            1 => Self::Cosine,
            2 => Self::InnerProduct,
            _ => return None,
        };
        Some(metric)
    }
}
/// Default HNSW connectivity parameter.
const DEFAULT_VECTOR_INDEX_CONNECTIVITY: u32 = 16;
/// Default expansion factor during index construction.
const DEFAULT_VECTOR_INDEX_EXPANSION_ADD: u32 = 128;
/// Default expansion factor during search.
const DEFAULT_VECTOR_INDEX_EXPANSION_SEARCH: u32 = 64;
// Function wrapper for `#[serde(default = "...")]`, which takes a function path.
fn default_vector_index_connectivity() -> u32 {
    DEFAULT_VECTOR_INDEX_CONNECTIVITY
}
// Function wrapper for `#[serde(default = "...")]`, which takes a function path.
fn default_vector_index_expansion_add() -> u32 {
    DEFAULT_VECTOR_INDEX_EXPANSION_ADD
}
// Function wrapper for `#[serde(default = "...")]`, which takes a function path.
fn default_vector_index_expansion_search() -> u32 {
    DEFAULT_VECTOR_INDEX_EXPANSION_SEARCH
}
/// Supported vector index engine types.
///
/// Serialized in lowercase by serde; also encoded to `u8` for blob storage
/// via [`VectorIndexEngineType::as_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "lowercase")]
pub enum VectorIndexEngineType {
    /// USearch HNSW implementation.
    #[default]
    Usearch,
    // Future: Vsag,
}
impl VectorIndexEngineType {
    /// Returns the engine type as u8 for blob serialization.
    ///
    /// Single-variant enum for now; new engines must append new codes and
    /// keep this the inverse of [`Self::try_from_u8`].
    pub fn as_u8(&self) -> u8 {
        match self {
            Self::Usearch => 0,
        }
    }

    /// Parses engine type from u8 (used when reading blob); `None` for unknown codes.
    pub fn try_from_u8(v: u8) -> Option<Self> {
        (v == 0).then_some(Self::Usearch)
    }
}
impl fmt::Display for VectorIndexEngineType {
    /// Formats the engine type using its lowercase name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Usearch => f.write_str("usearch"),
        }
    }
}
impl std::str::FromStr for VectorIndexEngineType {
    type Err = String;

    /// Parses an engine name case-insensitively; only `usearch` is known.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "usearch" => Ok(Self::Usearch),
            _ => Err(format!(
                "Unknown vector index engine: {}. Expected: usearch",
                s
            )),
        }
    }
}
/// Options for vector index (HNSW).
///
/// Every field carries a serde default, so partially specified options
/// deserialize to the same values as [`VectorIndexOptions::default`].
/// Field names are serialized in kebab-case (e.g. `expansion-add`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct VectorIndexOptions {
    /// Vector index engine type (default: usearch).
    #[serde(default)]
    pub engine: VectorIndexEngineType,
    /// Distance metric for similarity search.
    #[serde(default)]
    pub metric: VectorDistanceMetric,
    /// HNSW connectivity parameter (M in the paper).
    /// Higher values improve recall but increase memory usage.
    #[serde(default = "default_vector_index_connectivity")]
    pub connectivity: u32,
    /// Expansion factor during index construction (ef_construction).
    /// Higher values improve index quality but slow down construction.
    #[serde(default = "default_vector_index_expansion_add")]
    pub expansion_add: u32,
    /// Expansion factor during search (ef_search).
    /// Higher values improve recall but slow down search.
    #[serde(default = "default_vector_index_expansion_search")]
    pub expansion_search: u32,
}
impl Default for VectorIndexOptions {
fn default() -> Self {
Self {
engine: VectorIndexEngineType::default(),
metric: VectorDistanceMetric::default(),
connectivity: DEFAULT_VECTOR_INDEX_CONNECTIVITY,
expansion_add: DEFAULT_VECTOR_INDEX_EXPANSION_ADD,
expansion_search: DEFAULT_VECTOR_INDEX_EXPANSION_SEARCH,
}
}
}
impl fmt::Display for VectorIndexOptions {
    /// Renders the options as a `key=value` list for diagnostics.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "engine={}, metric={}", self.engine, self.metric)?;
        write!(
            f,
            ", connectivity={}, expansion_add={}, expansion_search={}",
            self.connectivity, self.expansion_add, self.expansion_search
        )
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -26,10 +26,10 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SinglePartitionScanner, SyncManifestResponse,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner,
SyncManifestResponse,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -163,19 +163,6 @@ impl RegionEngine for FileRegionEngine {
))
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
Err(BoxedError::new(
UnsupportedSnafu {
operation: "copy_region_from",
}
.build(),
))
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}

View File

@@ -110,26 +110,6 @@ impl FrontendClient {
)
}
/// Check if the frontend client is initialized.
///
/// In distributed mode, it is always initialized.
/// In standalone mode, it checks if the database client is set.
pub fn is_initialized(&self) -> bool {
match self {
FrontendClient::Distributed { .. } => true,
FrontendClient::Standalone {
database_client, ..
} => {
let guard = database_client.lock();
if let Ok(guard) = guard {
guard.is_some()
} else {
false
}
}
}
}
pub fn from_meta_client(
meta_client: Arc<MetaClient>,
auth: Option<FlowAuthHeader>,

View File

@@ -301,15 +301,39 @@ impl GrpcQueryHandler for Instance {
mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
ctx: QueryContextRef,
) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
// Resolve table once for the stream
// Clone all necessary data to make it 'static
let catalog_manager = self.catalog_manager().clone();
let plugins = self.plugins.clone();
let inserter = self.inserter.clone();
let table_name = stream.table_name().clone();
let ctx = ctx.clone();
let mut table_ref: Option<TableRef> = None;
let mut table_checked = false;
Box::pin(try_stream! {
plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// Cache for resolved table reference - resolve once and reuse
let table_ref = catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
// Check permissions once for the stream
let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table_ref.clone(), ctx.clone())?;
// Process each request in the stream
while let Some(request_result) = stream.next().await {
let request = request_result.map_err(|e| {
@@ -317,45 +341,11 @@ impl GrpcQueryHandler for Instance {
IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
})?;
// Resolve table and check permissions on first RecordBatch (after schema is received)
if !table_checked {
let table_name = &request.table_name;
plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// Resolve table reference
table_ref = Some(
catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table_name.to_string(),
})?,
);
// Check permissions for the table
let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
table_checked = true;
}
let request_id = request.request_id;
let start = Instant::now();
let rows = inserter
.handle_bulk_insert(
table_ref.clone().unwrap(),
table_ref.clone(),
request.flight_data,
request.record_batch,
request.schema_bytes,

View File

@@ -7,9 +7,6 @@ license.workspace = true
[lints]
workspace = true
[features]
vector_index = ["dep:usearch"]
[dependencies]
async-trait.workspace = true
asynchronous-codec = "0.7.0"
@@ -20,7 +17,6 @@ common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
datatypes.workspace = true
fastbloom = "0.8"
fst.workspace = true
futures.workspace = true
@@ -29,7 +25,6 @@ itertools.workspace = true
jieba-rs = "0.8"
lazy_static.workspace = true
mockall.workspace = true
nalgebra.workspace = true
pin-project.workspace = true
prost.workspace = true
puffin.workspace = true
@@ -44,7 +39,6 @@ tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true
tokio-util.workspace = true
usearch = { version = "2.21", default-features = false, features = ["fp16lib"], optional = true }
uuid.workspace = true
[dev-dependencies]

View File

@@ -22,8 +22,6 @@ pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;
#[cfg(feature = "vector_index")]
pub mod vector;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];

View File

@@ -1,163 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Vector index types and options.
//!
//! This module re-exports types from `datatypes` and provides conversions
//! to USearch types, as well as distance computation functions.
pub use datatypes::schema::{VectorDistanceMetric, VectorIndexOptions};
use nalgebra::DVectorView;
pub use usearch::MetricKind;
/// Converts a VectorDistanceMetric to a USearch MetricKind.
pub fn distance_metric_to_usearch(metric: VectorDistanceMetric) -> MetricKind {
match metric {
VectorDistanceMetric::L2sq => MetricKind::L2sq,
VectorDistanceMetric::Cosine => MetricKind::Cos,
VectorDistanceMetric::InnerProduct => MetricKind::IP,
}
}
/// Computes distance between two vectors using the specified metric.
///
/// Uses SIMD-optimized implementations via nalgebra.
///
/// **Note:** The caller must ensure that the two vectors have the same length
/// and are non-empty. Empty vectors return 0.0 for all metrics.
pub fn compute_distance(v1: &[f32], v2: &[f32], metric: VectorDistanceMetric) -> f32 {
// Empty vectors are degenerate; return 0.0 uniformly across all metrics.
if v1.is_empty() || v2.is_empty() {
return 0.0;
}
match metric {
VectorDistanceMetric::L2sq => l2sq(v1, v2),
VectorDistanceMetric::Cosine => cosine(v1, v2),
VectorDistanceMetric::InnerProduct => -dot(v1, v2),
}
}
/// Calculates the squared L2 distance between two vectors.
fn l2sq(lhs: &[f32], rhs: &[f32]) -> f32 {
    let a = DVectorView::from_slice(lhs, lhs.len());
    let b = DVectorView::from_slice(rhs, rhs.len());
    (a - b).norm_squared()
}
/// Calculates the cosine distance between two vectors.
///
/// Returns a value in `[0.0, 2.0]` where 0.0 means identical direction and 2.0 means
/// opposite direction. For degenerate cases (zero or near-zero magnitude vectors),
/// returns 1.0 (maximum uncertainty) to avoid NaN and ensure safe index operations.
fn cosine(lhs: &[f32], rhs: &[f32]) -> f32 {
    let lhs_vec = DVectorView::from_slice(lhs, lhs.len());
    let rhs_vec = DVectorView::from_slice(rhs, rhs.len());
    let dot_product = lhs_vec.dot(&rhs_vec);
    let lhs_norm = lhs_vec.norm();
    let rhs_norm = rhs_vec.norm();
    // Zero-magnitude vectors have undefined direction; return max distance as safe fallback.
    // Note: a near-zero dot product with valid norms also takes this branch; for
    // (near-)orthogonal vectors the true cosine distance is ~1.0 anyway, so the
    // early return agrees with the full computation below.
    if dot_product.abs() < f32::EPSILON
        || lhs_norm.abs() < f32::EPSILON
        || rhs_norm.abs() < f32::EPSILON
    {
        return 1.0;
    }
    let cos_similar = dot_product / (lhs_norm * rhs_norm);
    let res = 1.0 - cos_similar;
    // Clamp near-zero results to exactly 0.0 to avoid floating-point artifacts.
    if res.abs() < f32::EPSILON { 0.0 } else { res }
}
/// Calculates the dot product between two vectors.
fn dot(lhs: &[f32], rhs: &[f32]) -> f32 {
    DVectorView::from_slice(lhs, lhs.len()).dot(&DVectorView::from_slice(rhs, rhs.len()))
}
#[cfg(test)]
mod tests {
    use super::*;

    // Each metric must map to its matching USearch kind.
    #[test]
    fn test_distance_metric_to_usearch() {
        assert_eq!(
            distance_metric_to_usearch(VectorDistanceMetric::L2sq),
            MetricKind::L2sq
        );
        assert_eq!(
            distance_metric_to_usearch(VectorDistanceMetric::Cosine),
            MetricKind::Cos
        );
        assert_eq!(
            distance_metric_to_usearch(VectorDistanceMetric::InnerProduct),
            MetricKind::IP
        );
    }

    // Defaults must match the documented HNSW parameters (16/128/64).
    #[test]
    fn test_vector_index_options_default() {
        let options = VectorIndexOptions::default();
        assert_eq!(options.metric, VectorDistanceMetric::L2sq);
        assert_eq!(options.connectivity, 16);
        assert_eq!(options.expansion_add, 128);
        assert_eq!(options.expansion_search, 64);
    }

    #[test]
    fn test_compute_distance_l2sq() {
        let v1 = vec![1.0, 2.0, 3.0];
        let v2 = vec![4.0, 5.0, 6.0];
        // L2sq = (4-1)^2 + (5-2)^2 + (6-3)^2 = 9 + 9 + 9 = 27
        let dist = compute_distance(&v1, &v2, VectorDistanceMetric::L2sq);
        assert!((dist - 27.0).abs() < 1e-6);
    }

    #[test]
    fn test_compute_distance_cosine() {
        let v1 = vec![1.0, 0.0, 0.0];
        let v2 = vec![0.0, 1.0, 0.0];
        // Orthogonal vectors have cosine similarity of 0, distance of 1
        let dist = compute_distance(&v1, &v2, VectorDistanceMetric::Cosine);
        assert!((dist - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_compute_distance_inner_product() {
        let v1 = vec![1.0, 2.0, 3.0];
        let v2 = vec![4.0, 5.0, 6.0];
        // Inner product = 1*4 + 2*5 + 3*6 = 4 + 10 + 18 = 32
        // Distance is negated: -32
        let dist = compute_distance(&v1, &v2, VectorDistanceMetric::InnerProduct);
        assert!((dist - (-32.0)).abs() < 1e-6);
    }

    #[test]
    fn test_compute_distance_empty_vectors() {
        // Empty vectors should return 0.0 uniformly for all metrics
        assert_eq!(compute_distance(&[], &[], VectorDistanceMetric::L2sq), 0.0);
        assert_eq!(
            compute_distance(&[], &[], VectorDistanceMetric::Cosine),
            0.0
        );
        assert_eq!(
            compute_distance(&[], &[], VectorDistanceMetric::InnerProduct),
            0.0
        );
    }
}

View File

@@ -291,7 +291,7 @@ pub async fn metasrv_builder(
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use common_meta::kv_backend::rds::PgStore;
use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod};
use deadpool_postgres::Config;
use crate::election::rds::postgres::{ElectionPgClient, PgElection};
use crate::utils::postgres::create_postgres_pool;
@@ -305,16 +305,9 @@ pub async fn metasrv_builder(
let mut cfg = Config::new();
cfg.keepalives = Some(true);
cfg.keepalives_idle = Some(Duration::from_secs(POSTGRES_KEEP_ALIVE_SECS));
cfg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Verified,
});
// Use a dedicated pool for the election client to allow customized session settings.
let pool = create_postgres_pool(
&opts.store_addrs,
Some(cfg.clone()),
opts.backend_tls.clone(),
)
.await?;
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
.await?;
let election_client = ElectionPgClient::new(
pool,
@@ -334,8 +327,8 @@ pub async fn metasrv_builder(
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
.await?;
let pool =
create_postgres_pool(&opts.store_addrs, None, opts.backend_tls.clone()).await?;
let kv_backend = PgStore::with_pg_pool(
pool,
opts.meta_schema_name.as_deref(),

View File

@@ -135,9 +135,6 @@ async fn test_full_gc_workflow() {
);
}
/// Due to https://github.com/rust-lang/rust/issues/100141 can't have Instant early than process start time on non-linux OS
/// This is fine since in real usage instant will always be after process start time
#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_tracker_cleanup() {
init_default_ut_logging();

View File

@@ -43,10 +43,9 @@ pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine,
RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
@@ -376,14 +375,6 @@ impl RegionEngine for MetricEngine {
}
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
todo!()
}
async fn set_region_role_state_gracefully(
&self,
region_id: RegionId,

View File

@@ -30,7 +30,6 @@ common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-memory-manager.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
@@ -49,7 +48,6 @@ dotenv.workspace = true
either.workspace = true
futures.workspace = true
humantime-serde.workspace = true
humantime.workspace = true
index.workspace = true
itertools.workspace = true
greptime-proto.workspace = true

View File

@@ -34,7 +34,6 @@ use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef}
use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
@@ -264,26 +263,6 @@ impl CacheStrategy {
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
    ///
    /// Best effort: under `Compaction` or `Disabled` strategies, or when no
    /// write cache is configured, this silently does nothing.
    pub fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // Only the full caching strategy schedules background downloads.
        if let CacheStrategy::EnableAll(cache_manager) = self
            && let Some(write_cache) = cache_manager.write_cache()
        {
            // Delegates to the file cache's background queue; errors are
            // handled (logged/dropped) inside the file cache.
            write_cache.file_cache().maybe_download_background(
                index_key,
                remote_path,
                remote_store,
                file_size,
            );
        }
    }
}
/// Manages cached data for the engine.

View File

@@ -31,7 +31,7 @@ use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use tokio::sync::mpsc::UnboundedReceiver;
use crate::access_layer::TempFileCleaner;
use crate::cache::{FILE_TYPE, INDEX_TYPE};
@@ -55,17 +55,6 @@ pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;
/// Minimum capacity for each cache (512MB).
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;
/// Channel capacity for background download tasks.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
/// A task to download a file in the background.
struct DownloadTask {
    // Identifies the cache entry (region, file id, file type) to populate.
    index_key: IndexKey,
    // Path of the file in the remote object store.
    remote_path: String,
    // Object store to read the remote file from.
    remote_store: ObjectStore,
    // Size of the remote file in bytes, passed through to the download call.
    file_size: u64,
}
/// Inner struct for FileCache that can be used in spawned tasks.
#[derive(Debug)]
struct FileCacheInner {
@@ -181,8 +170,8 @@ impl FileCacheInner {
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
concurrency: usize,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
let file_type = index_key.file_type;
@@ -195,7 +184,7 @@ impl FileCacheInner {
let reader = remote_store
.reader_with(remote_path)
.concurrent(concurrency)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
@@ -249,14 +238,11 @@ impl FileCacheInner {
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
concurrency: usize,
) -> Result<()> {
if let Err(e) = self
.download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
.download_without_cleaning(index_key, remote_path, remote_store, file_size)
.await
{
error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);
let filename = index_key.to_string();
TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
@@ -265,11 +251,6 @@ impl FileCacheInner {
Ok(())
}
/// Checks if the key is in the file cache.
fn contains_key(&self, key: &IndexKey) -> bool {
self.memory_index(key.file_type).contains_key(key)
}
}
/// A file cache manages files on local store and evict files based
@@ -280,8 +261,6 @@ pub(crate) struct FileCache {
inner: Arc<FileCacheInner>,
/// Capacity of the puffin (index) cache in bytes.
puffin_capacity: u64,
/// Channel for background download tasks. None if background worker is disabled.
download_task_tx: Option<Sender<DownloadTask>>,
}
pub(crate) type FileCacheRef = Arc<FileCache>;
@@ -293,7 +272,6 @@ impl FileCache {
capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
enable_background_worker: bool,
) -> FileCache {
// Validate and use the provided percent or default
let index_percent = index_cache_percent
@@ -328,54 +306,12 @@ impl FileCache {
puffin_index,
});
// Only create channel and spawn worker if background download is enabled
let download_task_tx = if enable_background_worker {
let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
Self::spawn_download_worker(inner.clone(), rx);
Some(tx)
} else {
None
};
FileCache {
inner,
puffin_capacity,
download_task_tx,
}
}
    /// Spawns a background worker to process download tasks.
    ///
    /// The spawned task drains `download_task_rx` until all senders are
    /// dropped (which ends the `while let` loop and stops the worker).
    /// Files already present in the cache are skipped, and individual
    /// download failures are ignored so one bad file cannot stall the queue.
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Check if the file is already in the cache
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }
                // Ignores background download errors.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        1, // Background downloads use concurrency=1
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }
/// Builds a cache for a specific file type.
fn build_cache(
local_store: ObjectStore,
@@ -397,9 +333,11 @@ impl FileCache {
let file_path = cache_file_path(FILE_DIR, *key);
async move {
if let RemovalCause::Replaced = cause {
// The cache is replaced by another file (maybe download again). We don't remove the same
// The cache is replaced by another file. This is unexpected, we don't remove the same
// file but updates the metrics as the file is already replaced by users.
CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
// TODO(yingwen): Don't log warn later.
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
return;
}
@@ -615,7 +553,7 @@ impl FileCache {
/// Checks if the key is in the file cache.
pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
self.inner.contains_key(key)
self.inner.memory_index(key.file_type).contains_key(key)
}
/// Returns the capacity of the puffin (index) cache in bytes.
@@ -638,42 +576,9 @@ impl FileCache {
file_size: u64,
) -> Result<()> {
self.inner
.download(index_key, remote_path, remote_store, file_size, 8) // Foreground uses concurrency=8
.download(index_key, remote_path, remote_store, file_size)
.await
}
    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`) in the background. Errors are logged but not returned.
    ///
    /// This method attempts to send a download task to the background worker.
    /// If the channel is full, the task is silently dropped.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // Do nothing if background worker is disabled (channel is None)
        let Some(tx) = &self.download_task_tx else {
            return;
        };
        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };
        // Try to send the task; if the channel is full, just drop it.
        // `try_send` never blocks, so foreground reads are never delayed by
        // a saturated background queue.
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}
/// Key of file cache index.
@@ -803,7 +708,6 @@ mod tests {
ReadableSize::mb(10),
Some(Duration::from_millis(10)),
None,
true, // enable_background_worker
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
@@ -840,13 +744,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -894,13 +792,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -932,13 +824,7 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
@@ -964,13 +850,7 @@ mod tests {
}
// Recover the cache.
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
// No entry before recovery.
assert!(
cache
@@ -999,13 +879,7 @@ mod tests {
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let file_cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);

View File

@@ -370,22 +370,7 @@ impl ManifestCache {
/// If `check_mtime` is true, only removes directories that have not been modified
/// for at least 1 hour.
fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
let is_empty = Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
if is_empty {
if let Err(e) = std::fs::remove_dir(dir) {
if e.kind() != std::io::ErrorKind::NotFound {
warn!(e; "Failed to remove empty root dir {}", dir.display());
return Err(e);
} else {
warn!("Empty root dir not found before removal {}", dir.display());
}
} else {
info!(
"Removed empty root dir {} from manifest cache",
dir.display()
);
}
}
Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
Ok(())
}
@@ -427,16 +412,11 @@ impl ManifestCache {
let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
if subdir_empty {
if let Err(e) = std::fs::remove_dir(&path) {
if e.kind() != std::io::ErrorKind::NotFound {
warn!(e; "Failed to remove empty directory {}", path.display());
is_empty = false;
} else {
info!(
"Empty directory {} not found before removal",
path.display()
);
}
if let Err(e) = std::fs::remove_dir(&path)
&& e.kind() != std::io::ErrorKind::NotFound
{
warn!(e; "Failed to remove empty directory {}", path.display());
is_empty = false;
} else {
info!(
"Removed empty directory {} from manifest cache",
@@ -591,116 +571,4 @@ mod tests {
cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
);
}
#[tokio::test]
async fn test_clean_empty_dirs_sync_no_mtime_check() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("");
let root = PathBuf::from(dir.path());
// Create a directory structure:
// root/
// empty_dir1/
// empty_dir2/
// empty_subdir/
// non_empty_dir/
// file.txt
// nested/
// empty_subdir1/
// non_empty_subdir/
// file.txt
let empty_dir1 = root.join("empty_dir1");
let empty_dir2 = root.join("empty_dir2");
let empty_subdir = empty_dir2.join("empty_subdir");
let non_empty_dir = root.join("non_empty_dir");
let nested = root.join("nested");
let nested_empty = nested.join("empty_subdir1");
let nested_non_empty = nested.join("non_empty_subdir");
// Create directories
std::fs::create_dir_all(&empty_dir1).unwrap();
std::fs::create_dir_all(&empty_subdir).unwrap();
std::fs::create_dir_all(&non_empty_dir).unwrap();
std::fs::create_dir_all(&nested_empty).unwrap();
std::fs::create_dir_all(&nested_non_empty).unwrap();
// Create files in non-empty directories
std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
std::fs::write(nested_non_empty.join("file.txt"), b"content").unwrap();
// Verify initial state
assert!(empty_dir1.exists());
assert!(empty_dir2.exists());
assert!(empty_subdir.exists());
assert!(non_empty_dir.exists());
assert!(nested.exists());
assert!(nested_empty.exists());
assert!(nested_non_empty.exists());
// Clean empty directories with check_mtime = false
ManifestCache::clean_empty_dirs_sync(&root, false).unwrap();
// Verify empty directories are removed
assert!(!empty_dir1.exists());
assert!(!empty_dir2.exists());
assert!(!empty_subdir.exists());
assert!(!nested_empty.exists());
// Verify non-empty directories still exist
assert!(non_empty_dir.exists());
assert!(non_empty_dir.join("file.txt").exists());
assert!(nested.exists());
assert!(nested_non_empty.exists());
assert!(nested_non_empty.join("file.txt").exists());
}
#[tokio::test]
async fn test_clean_empty_dirs_sync_with_mtime_check() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("");
let root = PathBuf::from(dir.path());
// Create a directory structure with recently created empty directories
// root/
// empty_dir1/
// empty_dir2/
// empty_subdir/
// non_empty_dir/
// file.txt
let empty_dir1 = root.join("empty_dir1");
let empty_dir2 = root.join("empty_dir2");
let empty_subdir = empty_dir2.join("empty_subdir");
let non_empty_dir = root.join("non_empty_dir");
// Create directories
std::fs::create_dir_all(&empty_dir1).unwrap();
std::fs::create_dir_all(&empty_subdir).unwrap();
std::fs::create_dir_all(&non_empty_dir).unwrap();
// Create file in non-empty directory
std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
// Verify initial state
assert!(empty_dir1.exists());
assert!(empty_dir2.exists());
assert!(empty_subdir.exists());
assert!(non_empty_dir.exists());
// Clean empty directories with check_mtime = true
// Since the directories were just created (mtime < 1 hour), they should NOT be removed
ManifestCache::clean_empty_dirs_sync(&root, true).unwrap();
// Verify empty directories are NOT removed (they're too recent)
assert!(empty_dir1.exists());
assert!(empty_dir2.exists());
assert!(empty_subdir.exists());
// Verify non-empty directory still exists
assert!(non_empty_dir.exists());
assert!(non_empty_dir.join("file.txt").exists());
}
}

View File

@@ -63,13 +63,11 @@ pub type WriteCacheRef = Arc<WriteCache>;
impl WriteCache {
/// Create the cache with a `local_store` to cache files and a
/// `object_store_manager` for all object stores.
#[allow(clippy::too_many_arguments)]
pub async fn new(
local_store: ObjectStore,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
enable_background_worker: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
manifest_cache: Option<ManifestCache>,
@@ -81,7 +79,6 @@ impl WriteCache {
cache_capacity,
ttl,
index_cache_percent,
enable_background_worker,
));
file_cache.recover(false, Some(task_receiver)).await;
@@ -95,13 +92,11 @@ impl WriteCache {
}
/// Creates a write cache based on local fs.
#[allow(clippy::too_many_arguments)]
pub async fn new_fs(
cache_dir: &str,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
enable_background_worker: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
manifest_cache_capacity: ReadableSize,
@@ -122,7 +117,6 @@ impl WriteCache {
cache_capacity,
ttl,
index_cache_percent,
enable_background_worker,
puffin_manager_factory,
intermediate_manager,
manifest_cache,

View File

@@ -14,7 +14,6 @@
mod buckets;
pub mod compactor;
pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
@@ -30,7 +29,6 @@ use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
@@ -48,8 +46,7 @@ use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::picker::{CompactionTask, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
@@ -107,15 +104,12 @@ pub(crate) struct CompactionScheduler {
request_sender: Sender<WorkerRequestWithTime>,
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
memory_manager: Arc<CompactionMemoryManager>,
memory_policy: OnExhaustedPolicy,
listener: WorkerListener,
/// Plugins for the compaction scheduler.
plugins: Plugins,
}
impl CompactionScheduler {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
scheduler: SchedulerRef,
request_sender: Sender<WorkerRequestWithTime>,
@@ -123,8 +117,6 @@ impl CompactionScheduler {
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
plugins: Plugins,
memory_manager: Arc<CompactionMemoryManager>,
memory_policy: OnExhaustedPolicy,
) -> Self {
Self {
scheduler,
@@ -132,8 +124,6 @@ impl CompactionScheduler {
request_sender,
cache_manager,
engine_config,
memory_manager,
memory_policy,
listener,
plugins,
}
@@ -439,8 +429,7 @@ impl CompactionScheduler {
};
// Create a local compaction task.
let estimated_bytes = estimate_compaction_bytes(&picker_output);
let local_compaction_task = Box::new(CompactionTaskImpl {
let mut local_compaction_task = Box::new(CompactionTaskImpl {
request_sender,
waiters,
start_time,
@@ -448,27 +437,18 @@ impl CompactionScheduler {
picker_output,
compaction_region,
compactor: Arc::new(DefaultCompactor {}),
memory_manager: self.memory_manager.clone(),
memory_policy: self.memory_policy,
estimated_memory_bytes: estimated_bytes,
});
self.submit_compaction_task(local_compaction_task, region_id)
}
fn submit_compaction_task(
&mut self,
mut task: Box<CompactionTaskImpl>,
region_id: RegionId,
) -> Result<()> {
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
INFLIGHT_COMPACTION_COUNT.inc();
task.run().await;
local_compaction_task.run().await;
INFLIGHT_COMPACTION_COUNT.dec();
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);
// If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(&region_id);
e
})
@@ -778,20 +758,6 @@ fn get_expired_ssts(
.collect()
}
/// Estimates compaction memory as the sum of all input files' maximum row-group
/// uncompressed sizes.
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
picker_output
.outputs
.iter()
.flat_map(|output| output.inputs.iter())
.map(|file: &FileHandle| {
let meta = file.meta_ref();
meta.max_row_group_uncompressed_size
})
.sum()
}
/// Pending compaction request that is supposed to run after current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
@@ -807,10 +773,9 @@ struct PendingCompaction {
mod tests {
use api::v1::region::StrictWindow;
use common_datasource::compression::CompressionType;
use tokio::sync::{Barrier, oneshot};
use tokio::sync::oneshot;
use super::*;
use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
use crate::sst::FormatType;
@@ -1180,39 +1145,4 @@ mod tests {
assert_eq!(result.unwrap(), 0); // is there a better way to check this?
assert_eq!(0, scheduler.region_status.len());
}
#[tokio::test]
async fn test_concurrent_memory_competition() {
let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
// Spawn 3 tasks competing for memory, each trying to acquire 2MB
for _i in 0..3 {
let mgr = manager.clone();
let bar = barrier.clone();
let handle = tokio::spawn(async move {
bar.wait().await; // Synchronize start
mgr.try_acquire(2 * 1024 * 1024)
});
handles.push(handle);
}
let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
// Only 1 should succeed (3MB limit, 2MB request, can only fit one)
let succeeded = results.iter().filter(|r| r.is_some()).count();
let failed = results.iter().filter(|r| r.is_none()).count();
assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
assert_eq!(failed, 2, "Expected 2 tasks to fail");
// Clean up
drop(results);
assert_eq!(manager.used_bytes(), 0);
}
}

View File

@@ -396,7 +396,6 @@ impl DefaultCompactor {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,

View File

@@ -1,50 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_memory_manager::{MemoryGuard, MemoryManager, MemoryMetrics};
use crate::metrics::{
COMPACTION_MEMORY_IN_USE, COMPACTION_MEMORY_LIMIT, COMPACTION_MEMORY_REJECTED,
};
/// Compaction-specific memory metrics implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct CompactionMemoryMetrics;
impl MemoryMetrics for CompactionMemoryMetrics {
fn set_limit(&self, bytes: i64) {
COMPACTION_MEMORY_LIMIT.set(bytes);
}
fn set_in_use(&self, bytes: i64) {
COMPACTION_MEMORY_IN_USE.set(bytes);
}
fn inc_rejected(&self, reason: &str) {
COMPACTION_MEMORY_REJECTED
.with_label_values(&[reason])
.inc();
}
}
/// Compaction memory manager.
pub type CompactionMemoryManager = MemoryManager<CompactionMemoryMetrics>;
/// Compaction memory guard.
pub type CompactionMemoryGuard = MemoryGuard<CompactionMemoryMetrics>;
/// Helper to construct a compaction memory manager without passing metrics explicitly.
pub fn new_compaction_memory_manager(limit_bytes: u64) -> CompactionMemoryManager {
CompactionMemoryManager::new(limit_bytes, CompactionMemoryMetrics)
}

View File

@@ -16,18 +16,16 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Instant;
use common_memory_manager::OnExhaustedPolicy;
use common_telemetry::{error, info, warn};
use itertools::Itertools;
use snafu::ResultExt;
use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu};
use crate::error::CompactRegionSnafu;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionRoleState;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
@@ -54,12 +52,6 @@ pub(crate) struct CompactionTaskImpl {
pub(crate) compactor: Arc<dyn Compactor>,
/// Output of the picker.
pub(crate) picker_output: PickerOutput,
/// Memory manager to acquire memory budget.
pub(crate) memory_manager: Arc<CompactionMemoryManager>,
/// Policy when memory is exhausted.
pub(crate) memory_policy: OnExhaustedPolicy,
/// Estimated memory bytes needed for this compaction.
pub(crate) estimated_memory_bytes: u64,
}
impl Debug for CompactionTaskImpl {
@@ -89,88 +81,6 @@ impl CompactionTaskImpl {
.for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
}
/// Acquires memory budget based on the configured policy.
///
/// Returns an error if memory cannot be acquired according to the policy.
async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
let region_id = self.compaction_region.region_id;
let requested_bytes = self.estimated_memory_bytes;
let limit_bytes = self.memory_manager.limit_bytes();
if limit_bytes > 0 && requested_bytes > limit_bytes {
warn!(
"Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
region_id, requested_bytes, limit_bytes
);
return Err(CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "exceed_limit".to_string(),
}
.build());
}
match self.memory_policy {
OnExhaustedPolicy::Wait {
timeout: wait_timeout,
} => {
let timer = COMPACTION_MEMORY_WAIT.start_timer();
match tokio::time::timeout(
wait_timeout,
self.memory_manager.acquire(requested_bytes),
)
.await
{
Ok(Ok(guard)) => {
timer.observe_duration();
Ok(guard)
}
Ok(Err(e)) => {
timer.observe_duration();
Err(e).with_context(|_| MemoryAcquireFailedSnafu {
region_id,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
})
}
Err(_) => {
timer.observe_duration();
warn!(
"Compaction for region {} waited {:?} for {} bytes but timed out",
region_id, wait_timeout, requested_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => {
// Try to acquire, fail immediately if not available
self.memory_manager
.try_acquire(requested_bytes)
.ok_or_else(|| {
warn!(
"Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)",
region_id, requested_bytes, limit_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "fail".to_string(),
}
.build()
})
}
}
}
/// Remove expired ssts files, update manifest immediately
/// and apply the edit to region version.
///
@@ -312,7 +222,7 @@ impl CompactionTaskImpl {
}
/// Handles compaction failure, notifies all waiters.
pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
fn on_failure(&mut self, err: Arc<error::Error>) {
COMPACTION_FAILURE_COUNT.inc();
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
@@ -339,26 +249,6 @@ impl CompactionTaskImpl {
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
async fn run(&mut self) {
// Acquire memory budget before starting compaction
let _memory_guard = match self.acquire_memory_with_policy().await {
Ok(guard) => guard,
Err(e) => {
error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
let err = Arc::new(e);
self.on_failure(err.clone());
let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
region_id: self.compaction_region.region_id,
err,
});
self.send_to_worker(WorkerRequest::Background {
region_id: self.compaction_region.region_id,
notify,
})
.await;
return;
}
};
let notify = match self.handle_expiration_and_compaction().await {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,

View File

@@ -74,7 +74,6 @@ pub fn new_file_handle_with_size_and_sequence(
),
level,
file_size,
max_row_group_uncompressed_size: file_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,

View File

@@ -20,7 +20,6 @@ use std::time::Duration;
use common_base::memory_limit::MemoryLimit;
use common_base::readable_size::ReadableSize;
use common_memory_manager::OnExhaustedPolicy;
use common_stat::{get_total_cpu_cores, get_total_memory_readable};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
@@ -93,10 +92,6 @@ pub struct MitoConfig {
pub max_background_compactions: usize,
/// Max number of running background purge jobs (default: number of cpu cores).
pub max_background_purges: usize,
/// Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
pub experimental_compaction_memory_limit: MemoryLimit,
/// Behavior when compaction cannot acquire memory from the budget.
pub experimental_compaction_on_exhausted: OnExhaustedPolicy,
// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -131,11 +126,6 @@ pub struct MitoConfig {
/// The remaining capacity is used for data (parquet) files.
/// Must be between 0 and 100 (exclusive).
pub index_cache_percent: u8,
/// Enable background downloading of files to the local cache when accessed during queries (default: true).
/// When enabled, files will be asynchronously downloaded to improve performance for subsequent reads.
pub enable_refill_cache_on_read: bool,
/// Capacity for manifest cache (default: 256MB).
pub manifest_cache_size: ReadableSize,
// Other configs:
/// Buffer size for SST writing.
@@ -188,8 +178,6 @@ impl Default for MitoConfig {
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: get_total_cpu_cores(),
experimental_compaction_memory_limit: MemoryLimit::Unlimited,
experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
@@ -203,8 +191,6 @@ impl Default for MitoConfig {
write_cache_ttl: None,
preload_index_cache: true,
index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
enable_refill_cache_on_read: true,
manifest_cache_size: ReadableSize::mb(256),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,

View File

@@ -71,8 +71,6 @@ mod sync_test;
#[cfg(test)]
mod truncate_test;
#[cfg(test)]
mod copy_region_from_test;
#[cfg(test)]
mod remap_manifests_test;
@@ -105,9 +103,8 @@ use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, MitoCopyRegionFromResponse,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
@@ -122,8 +119,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
self, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu,
Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -424,17 +421,6 @@ impl MitoEngine {
rx.await.context(RecvSnafu)?
}
/// Handles copy region from request.
///
/// This method is only supported for internal use and is not exposed in the trait implementation.
pub async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
) -> Result<MitoCopyRegionFromResponse> {
self.inner.copy_region_from(region_id, request).await
}
#[cfg(test)]
pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
self.find_region(id)
@@ -635,9 +621,7 @@ impl MitoEngine {
}
}
/// Check whether the region edit is valid.
///
/// Only adding or removing files to region is considered valid now.
/// Check whether the region edit is valid. Only adding files to region is considered valid now.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
!edit.files_to_add.is_empty()
&& edit.files_to_remove.is_empty()
@@ -1070,18 +1054,6 @@ impl EngineInner {
Ok(RemapManifestsResponse { new_manifests })
}
async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
) -> Result<MitoCopyRegionFromResponse> {
let (request, receiver) =
WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
self.workers.submit_to_worker(region_id, request).await?;
let response = receiver.await.context(RecvSnafu)??;
Ok(response)
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_follower() {
@@ -1268,19 +1240,6 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
Err(BoxedError::new(
error::UnsupportedOperationSnafu {
err_msg: "copy_region_from is not supported",
}
.build(),
))
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}

View File

@@ -1,361 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::fs;
use std::sync::Arc;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use object_store::layers::mock::{Error as MockError, ErrorKind, MockLayerBuilder};
use store_api::region_engine::{CopyRegionFromRequest, RegionEngine, RegionRole};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
#[tokio::test]
async fn test_engine_copy_region_from() {
common_telemetry::init_default_ut_logging();
test_engine_copy_region_from_with_format(true, true).await;
test_engine_copy_region_from_with_format(true, false).await;
test_engine_copy_region_from_with_format(false, true).await;
test_engine_copy_region_from_with_format(false, false).await;
}
async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index: bool) {
let mut env = TestEnv::with_prefix("copy-region-from").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
// Creates a source region and adds some data
let source_region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
if with_index {
request
.column_metadatas
.iter_mut()
.find(|c| c.column_schema.name == "tag_0")
.unwrap()
.column_schema
.set_inverted_index(true);
}
let column_schemas = rows_schema(&request);
engine
.handle_request(source_region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 42),
};
put_rows(&engine, source_region_id, rows).await;
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Creates a target region and enters staging mode
let target_region_id = RegionId::new(1, 2);
engine
.handle_request(target_region_id, RegionRequest::Create(request))
.await
.unwrap();
common_telemetry::debug!("copy region from");
let resp = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
)
.await
.unwrap();
let manifest = engine
.get_region(target_region_id)
.unwrap()
.manifest_ctx
.manifest()
.await;
assert!(!manifest.files.is_empty());
for meta in manifest.files.values() {
assert_eq!(meta.region_id, target_region_id);
assert_eq!(meta.exists_index(), with_index);
}
let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
let source_region_files = collect_filename_in_dir(&source_region_dir);
let target_region_dir = format!("{}/data/test/1_0000000002", env.data_home().display());
let target_region_files = collect_filename_in_dir(&target_region_dir);
assert_eq!(source_region_files, target_region_files);
if with_index {
let source_region_index_files =
collect_filename_in_dir(&format!("{}/index", source_region_dir));
let target_region_index_files =
collect_filename_in_dir(&format!("{}/index", target_region_dir));
assert_eq!(source_region_index_files, target_region_index_files);
}
common_telemetry::debug!("copy region from again");
let resp2 = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
)
.await
.unwrap();
assert_eq!(resp.copied_file_ids, resp2.copied_file_ids);
}
#[tokio::test]
async fn test_engine_copy_region_failure() {
common_telemetry::init_default_ut_logging();
test_engine_copy_region_failure_with_format(false).await;
test_engine_copy_region_failure_with_format(true).await;
}
/// Verifies that a failed `copy_region_from` leaves both regions intact:
/// the target region directory stays empty and the source region's files are
/// untouched. The copy is forced to fail on the index (`.puffin`) file.
async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
    // Inject a copy failure for any `.puffin` (index) file; other files copy fine.
    let mock_layer = MockLayerBuilder::default()
        .copy_interceptor(Arc::new(|from, _, _args| {
            if from.contains(".puffin") {
                Some(Err(MockError::new(ErrorKind::Unexpected, "mock err")))
            } else {
                None
            }
        }))
        .build()
        .unwrap();
    let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: flat_format,
            ..Default::default()
        })
        .await;
    // Creates a source region and adds some data
    let source_region_id = RegionId::new(1, 1);
    let mut request = CreateRequestBuilder::new().build();
    // Enable an inverted index on `tag_0` so the flush below produces an index
    // (`.puffin`) file for the copy interceptor to fail on.
    request
        .column_metadatas
        .iter_mut()
        .find(|c| c.column_schema.name == "tag_0")
        .unwrap()
        .column_schema
        .set_inverted_index(true);
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(source_region_id, RegionRequest::Create(request.clone()))
        .await
        .unwrap();
    let rows = Rows {
        schema: column_schemas,
        rows: build_rows(0, 42),
    };
    put_rows(&engine, source_region_id, rows).await;
    // Flush so the source region has one SST file and one index file on disk.
    engine
        .handle_request(
            source_region_id,
            RegionRequest::Flush(RegionFlushRequest {
                row_group_size: None,
            }),
        )
        .await
        .unwrap();
    let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
    assert_file_num_in_dir(&source_region_dir, 1);
    assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1);
    // Snapshot source file names to verify they are unchanged after the failure.
    let source_region_files = collect_filename_in_dir(&source_region_dir);
    let source_region_index_files =
        collect_filename_in_dir(&format!("{}/index", source_region_dir));
    // Creates a target region and enters staging mode
    let target_region_id = RegionId::new(1, 2);
    engine
        .handle_request(target_region_id, RegionRequest::Create(request))
        .await
        .unwrap();
    // The copy must fail because the index file copy is intercepted.
    let err = engine
        .copy_region_from(
            target_region_id,
            CopyRegionFromRequest {
                source_region_id,
                parallelism: 1,
            },
        )
        .await
        .unwrap_err();
    assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
    // Check target region directory is empty
    let target_region_dir = format!("{}/data/test/1_0000000002", env.data_home().display());
    assert_file_num_in_dir(&target_region_dir, 0);
    assert!(!fs::exists(format!("{}/index", target_region_dir)).unwrap());
    // Check source region directory is not affected
    let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
    assert_file_num_in_dir(&source_region_dir, 1);
    assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1);
    assert_eq!(
        source_region_files,
        collect_filename_in_dir(&source_region_dir)
    );
    assert_eq!(
        source_region_index_files,
        collect_filename_in_dir(&format!("{}/index", source_region_dir))
    );
}
/// Asserts that `dir` contains exactly `expected_num` regular files
/// (non-recursive; subdirectories are not counted).
fn assert_file_num_in_dir(dir: &str, expected_num: usize) {
    let entries = fs::read_dir(dir)
        .unwrap()
        .map(|entry| entry.unwrap())
        .filter(|entry| entry.metadata().unwrap().is_file())
        .collect::<Vec<_>>();
    assert_eq!(
        entries.len(),
        expected_num,
        "The number of files in the directory should be {}, got: {:?}",
        expected_num,
        entries
    );
}
/// Collects the names of the regular files directly under `dir`, sorted.
fn collect_filename_in_dir(dir: &str) -> Vec<String> {
    let mut files = fs::read_dir(dir)
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap()
        .into_iter()
        .filter(|f| f.metadata().unwrap().is_file())
        // Use `DirEntry::file_name` to get the final path component. The
        // previous `path().to_string_lossy().rsplit("/").last()` was wrong:
        // `rsplit` iterates from the end, so `last()` yields the *first*
        // path component, not the file name.
        .map(|f| f.file_name().to_string_lossy().to_string())
        .collect::<Vec<_>>();
    files.sort_unstable();
    files
}
#[tokio::test]
async fn test_engine_copy_region_invalid_args() {
    common_telemetry::init_default_ut_logging();
    // Exercise both the primary-key and the experimental flat format.
    for flat_format in [false, true] {
        test_engine_copy_region_invalid_args_with_format(flat_format).await;
    }
}
/// Copying from a non-existent region, or from the target region itself,
/// must be rejected with `InvalidArguments`.
async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) {
    let mut env = TestEnv::new().await;
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: flat_format,
            ..Default::default()
        })
        .await;
    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();
    engine
        .handle_request(region_id, RegionRequest::Create(request.clone()))
        .await
        .unwrap();
    // First source id does not exist; second one is the target region itself.
    for source_region_id in [RegionId::new(2, 1), RegionId::new(1, 1)] {
        let err = engine
            .copy_region_from(
                region_id,
                CopyRegionFromRequest {
                    source_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }
}
#[tokio::test]
async fn test_engine_copy_region_unexpected_state() {
    common_telemetry::init_default_ut_logging();
    // Exercise both the primary-key and the experimental flat format.
    for flat_format in [false, true] {
        test_engine_copy_region_unexpected_state_with_format(flat_format).await;
    }
}
/// A region that is not in a writable (leader) state must reject
/// `copy_region_from` with a `RegionState` error.
async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool) {
    let mut env = TestEnv::new().await;
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: flat_format,
            ..Default::default()
        })
        .await;
    let target_region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();
    engine
        .handle_request(target_region_id, RegionRequest::Create(create_request.clone()))
        .await
        .unwrap();
    // Demote the region to a follower so copy-from is no longer allowed.
    engine
        .set_region_role(target_region_id, RegionRole::Follower)
        .unwrap();
    let copy_err = engine
        .copy_region_from(
            target_region_id,
            CopyRegionFromRequest {
                source_region_id: RegionId::new(1, 2),
                parallelism: 1,
            },
        )
        .await
        .unwrap_err();
    assert_matches!(
        copy_err.as_any().downcast_ref::<Error>().unwrap(),
        Error::RegionState { .. }
    )
}

View File

@@ -160,8 +160,6 @@ async fn test_index_build_type_flush() {
#[tokio::test]
async fn test_index_build_type_compact() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("test_index_build_type_compact_").await;
let listener = Arc::new(IndexBuildListener::default());
let engine = env

View File

@@ -19,7 +19,6 @@ use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_memory_manager;
use common_runtime::JoinError;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
@@ -1042,28 +1041,6 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
#[snafu(display(
"Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})",
))]
CompactionMemoryExhausted {
region_id: RegionId,
required_bytes: u64,
limit_bytes: u64,
policy: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))]
MemoryAcquireFailed {
region_id: RegionId,
policy: String,
#[snafu(source)]
source: common_memory_manager::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}",
global,
@@ -1185,18 +1162,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Invalid source and target region, source: {}, target: {}",
source_region_id,
target_region_id
))]
InvalidSourceAndTargetRegion {
source_region_id: RegionId,
target_region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1265,8 +1230,7 @@ impl ErrorExt for Error {
| MissingManifest { .. }
| NoOldManifests { .. }
| MissingPartitionExpr { .. }
| SerializePartitionExpr { .. }
| InvalidSourceAndTargetRegion { .. } => StatusCode::InvalidArguments,
| SerializePartitionExpr { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. }
| Join { .. }
@@ -1359,10 +1323,6 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
MemoryAcquireFailed { source, .. } => source.status_code(),
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
ScanSeries { source, .. } => source.status_code(),

View File

@@ -640,7 +640,6 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
@@ -774,12 +773,7 @@ fn memtable_flat_sources(
let iter = only_range.build_record_batch_iter(None)?;
// Dedup according to append mode and merge mode.
// Even single range may have duplicate rows.
let iter = maybe_dedup_one(
options.append_mode,
options.merge_mode(),
field_column_start,
iter,
);
let iter = maybe_dedup_one(options, field_column_start, iter);
flat_sources.sources.push(FlatSource::Iter(iter));
};
} else {
@@ -847,18 +841,17 @@ fn merge_and_dedup(
Ok(maybe_dedup)
}
pub fn maybe_dedup_one(
append_mode: bool,
merge_mode: MergeMode,
fn maybe_dedup_one(
options: &RegionOptions,
field_column_start: usize,
input_iter: BoxedRecordBatchIterator,
) -> BoxedRecordBatchIterator {
if append_mode {
if options.append_mode {
// No dedup in append mode
input_iter
} else {
// Dedup according to merge mode.
match merge_mode {
match options.merge_mode() {
MergeMode::LastRow => {
Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
}

View File

@@ -157,8 +157,6 @@ impl ManifestObjectStore {
total_manifest_size: Arc<AtomicU64>,
manifest_cache: Option<ManifestCache>,
) -> Self {
common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
let path = util::normalize_dir(path);
let staging_path = {
// Convert "region_dir/manifest/" to "region_dir/staging/manifest/"

View File

@@ -244,7 +244,6 @@ async fn checkpoint_with_different_compression_types() {
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
max_row_group_uncompressed_size: 1024000,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
@@ -310,7 +309,6 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
max_row_group_uncompressed_size: 1024000,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,

View File

@@ -55,8 +55,10 @@ pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
pub use bulk::part::{
BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]

View File

@@ -668,10 +668,10 @@ impl BulkMemtable {
}
/// Iterator builder for bulk range
pub struct BulkRangeIterBuilder {
pub part: BulkPart,
pub context: Arc<BulkIterContext>,
pub sequence: Option<SequenceRange>,
struct BulkRangeIterBuilder {
part: BulkPart,
context: Arc<BulkIterContext>,
sequence: Option<SequenceRange>,
}
impl IterBuilder for BulkRangeIterBuilder {
@@ -1188,6 +1188,7 @@ impl MemtableBuilder for BulkMemtableBuilder {
#[cfg(test)]
mod tests {
use mito_codec::row_converter::build_primary_key_codec;
use super::*;

View File

@@ -974,19 +974,6 @@ impl EncodedBulkPart {
/// Returns a `SstInfo` instance with information derived from this bulk part's metadata
pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
let unit = self.metadata.region_metadata.time_index_type().unit();
let max_row_group_uncompressed_size: u64 = self
.metadata
.parquet_metadata
.row_groups()
.iter()
.map(|rg| {
rg.columns()
.iter()
.map(|c| c.uncompressed_size() as u64)
.sum::<u64>()
})
.max()
.unwrap_or(0);
SstInfo {
file_id,
time_range: (
@@ -994,7 +981,6 @@ impl EncodedBulkPart {
Timestamp::new(self.metadata.max_timestamp, unit),
),
file_size: self.data.len() as u64,
max_row_group_uncompressed_size,
num_rows: self.metadata.num_rows,
num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
file_metadata: Some(self.metadata.parquet_metadata.clone()),
@@ -1211,24 +1197,343 @@ impl BulkPartEncoder {
}
}
/// Converts mutations to a single sorted record batch.
///
/// Rows from all `mutations` are collected, primary keys are encoded with
/// `pk_encoder`, and the rows are sorted by (primary key, timestamp,
/// sequence desc). When `dedup` is true, rows sharing the same primary key
/// and timestamp are deduplicated (see [ArraysSorter::sort]).
///
/// Returns `None` when the mutations contain no rows; otherwise returns the
/// batch together with its min and max raw timestamp values.
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();
    if total_rows == 0 {
        return Ok(None);
    }
    // Pre-size all column builders with the total row count to avoid regrowth.
    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);
    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);
    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();
    // Scratch buffer reused for each encoded primary key.
    let mut pk_buffer = vec![];
    for m in mutations {
        // Skip mutations without rows.
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };
        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(&row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(&field);
            }
        }
    }
    let arrow_schema = to_sst_arrow_schema(metadata);
    // safety: timestamp column must be valid, and values must not be None.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    // Sorting (and optional dedup) is delegated to `ArraysSorter`.
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };
    sorter.sort().map(Some)
}
/// Sorts column arrays built from mutations and assembles them into a
/// [RecordBatch] matching the target arrow schema.
struct ArraysSorter<I> {
    /// Encoded primary key of each row, parallel to the other columns.
    encoded_primary_keys: BinaryArray,
    /// Unit of the time index column, used to read raw i64 timestamp values.
    timestamp_unit: TimeUnit,
    /// Raw timestamp array of the time index column.
    timestamp: ArrayRef,
    /// Sequence number of each row.
    sequence: UInt64Array,
    /// Op type of each row.
    op_type: UInt8Array,
    /// Iterator yielding one arrow array per field column, in field order.
    fields: I,
    /// Whether to keep only one row per (primary key, timestamp) pair.
    dedup: bool,
    /// Schema of the output record batch.
    arrow_schema: SchemaRef,
}
impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Converts arrays to record batch.
    ///
    /// Sorts all rows by (primary key asc, timestamp asc, sequence desc).
    /// When `dedup` is set, keeps only the first row of each
    /// (primary key, timestamp) group — i.e. the one with the largest
    /// sequence. Returns the sorted batch and the (min, max) raw timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());
        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        // Collect (original index, (pk, ts, seq)) tuples so sorting yields a
        // permutation; min/max timestamps are computed on the fly.
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();
        // Sequence compares in reverse so the newest row sorts first within a
        // (pk, timestamp) group.
        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });
        if self.dedup {
            // Dedup by timestamps while ignore sequence.
            // `dedup_by` keeps the first of consecutive equal rows, which is
            // the one with the largest sequence after the sort above.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }
        // Take-indices reordering every column into sorted order; bounds
        // checks are skipped since indices come from `enumerate` above.
        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            // safety: pk must be BinaryArray
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;
        // Column order (fields, timestamp, pk dictionary, sequence, op type)
        // must match `arrow_schema` for `RecordBatch::try_new` below.
        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }
        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;
        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );
        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );
        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}
/// Converts timestamp array to an iter of i64 values.
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    // All four timestamp array types store raw `i64` values, so every arm
    // yields the same concrete slice iterator type.
    macro_rules! iter_values {
        ($array_type:ty) => {
            // safety: timestamp column must be valid.
            timestamp
                .as_any()
                .downcast_ref::<$array_type>()
                .unwrap()
                .values()
                .iter()
        };
    }
    match timestamp_unit {
        TimeUnit::Second => iter_values!(TimestampSecondArray),
        TimeUnit::Millisecond => iter_values!(TimestampMillisecondArray),
        TimeUnit::Microsecond => iter_values!(TimestampMicrosecondArray),
        TimeUnit::Nanosecond => iter_values!(TimestampNanosecondArray),
    }
}
/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
///
/// Consecutive equal values collapse into a single dictionary entry, so the
/// dictionary values are unique as long as the input is sorted.
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(input.len());
    let mut values = BinaryBuilder::new();
    // Key of the most recently appended dictionary value.
    let mut current_key: u32 = 0;
    values.append_value(input.value(0));
    keys.push(current_key);
    for idx in 1..input.len() {
        // Compare with the immediately preceding element. The previous code
        // compared against `input.value(prev)` where `prev` counted distinct
        // values, which produced duplicate dictionary entries for inputs such
        // as [a, a, b, b, c].
        if input.value(idx) != input.value(idx - 1) {
            values.append_value(input.value(idx));
            current_key += 1;
        }
        keys.push(current_key);
    }
    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use api::v1::{Row, SemanticType, WriteHint};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::Float64Array;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::memtable::bulk::context::BulkIterContext;
use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
};
/// Asserts that `binary_array_to_dictionary` turns `input` into a dictionary
/// array with exactly `expected_keys` and `expected_values`.
fn check_binary_array_to_dictionary(
    input: &[&[u8]],
    expected_keys: &[u32],
    expected_values: &[&[u8]],
) {
    let binary_input = BinaryArray::from_iter_values(input.iter());
    let dictionary = binary_array_to_dictionary(&binary_input).unwrap();
    let actual_keys = dictionary
        .keys()
        .iter()
        .map(|k| k.unwrap())
        .collect::<Vec<_>>();
    assert_eq!(expected_keys, actual_keys.as_slice());
    let actual_values = dictionary
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap()
        .iter()
        .map(|v| v.unwrap())
        .collect::<Vec<_>>();
    assert_eq!(expected_values, actual_values.as_slice());
}
#[test]
fn test_binary_array_to_dictionary() {
    // Table-driven cases: (input, expected keys, expected dictionary values).
    let cases: [(&[&[u8]], &[u32], &[&[u8]]); 5] = [
        (&[], &[], &[]),
        (&["a".as_bytes()], &[0], &["a".as_bytes()]),
        (&["a".as_bytes(), "a".as_bytes()], &[0, 0], &["a".as_bytes()]),
        (
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        ),
        (
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        ),
    ];
    for (input, expected_keys, expected_values) in cases {
        check_binary_array_to_dictionary(input, expected_keys, expected_values);
    }
}
struct MutationInput<'a> {
k0: &'a str,
k1: u32,
@@ -1244,6 +1549,232 @@ mod tests {
v1: &'a [Option<f64>],
}
/// Builds mutations from `input`, converts them via
/// `mutations_to_record_batch`, decodes the result and compares primary
/// keys, timestamps and field values against `expected`.
///
/// `expected_timestamp` is the expected (min, max) timestamp pair returned
/// by the conversion; `dedup` is forwarded to `mutations_to_record_batch`.
fn check_mutations_to_record_batches(
    input: &[MutationInput],
    expected: &[BatchOutput],
    expected_timestamp: (i64, i64),
    dedup: bool,
) {
    let metadata = metadata_for_test();
    let mutations = input
        .iter()
        .map(|m| {
            build_key_values_with_ts_seq_values(
                &metadata,
                m.k0.to_string(),
                m.k1,
                m.timestamps.iter().copied(),
                m.v1.iter().copied(),
                m.sequence,
            )
            .mutation
        })
        .collect::<Vec<_>>();
    let total_rows: usize = mutations
        .iter()
        .flat_map(|m| m.rows.iter())
        .map(|r| r.rows.len())
        .sum();
    let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
    let (batch, min_ts, max_ts) =
        mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
            .unwrap()
            .unwrap();
    // Previously `expected_timestamp` was accepted but never checked.
    assert_eq!(
        expected_timestamp,
        (min_ts, max_ts),
        "unexpected (min, max) timestamp"
    );
    let read_format = PrimaryKeyReadFormat::new_with_all_columns(metadata.clone());
    let mut batches = VecDeque::new();
    read_format
        .convert_record_batch(&batch, None, &mut batches)
        .unwrap();
    // Without dedup every input row must survive the conversion.
    if !dedup {
        assert_eq!(
            total_rows,
            batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
        );
    }
    // Decode each batch back into (pk values, timestamps, float field values).
    let batch_values = batches
        .into_iter()
        .map(|b| {
            let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
            let timestamps = b
                .timestamps()
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .iter_data()
                .map(|v| v.unwrap().0.value())
                .collect::<Vec<_>>();
            let float_values = b.fields()[1]
                .data
                .as_any()
                .downcast_ref::<Float64Vector>()
                .unwrap()
                .iter_data()
                .collect::<Vec<_>>();
            (pk_values, timestamps, float_values)
        })
        .collect::<Vec<_>>();
    assert_eq!(expected.len(), batch_values.len());
    for idx in 0..expected.len() {
        assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
        assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
        assert_eq!(expected[idx].v1, &batch_values[idx].2);
    }
}
#[test]
fn test_mutations_to_record_batch() {
    // Single row: trivially one batch, min == max timestamp.
    check_mutations_to_record_batches(
        &[MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[0],
            v1: &[Some(0.1)],
            sequence: 0,
        }],
        &[BatchOutput {
            pk_values: &[Value::String("a".into()), Value::UInt32(0)],
            timestamps: &[0],
            v1: &[Some(0.1)],
        }],
        (0, 0),
        true,
    );
    // Distinct (pk, timestamp) pairs: rows are grouped per primary key and
    // ordered by key, nothing is deduplicated.
    check_mutations_to_record_batches(
        &[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.2)],
                sequence: 1,
            },
            MutationInput {
                k0: "a",
                k1: 1,
                timestamps: &[1],
                v1: &[Some(0.3)],
                sequence: 2,
            },
        ],
        &[
            BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0, 1],
                v1: &[Some(0.1), Some(0.2)],
            },
            BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                timestamps: &[1],
                v1: &[Some(0.3)],
            },
            BatchOutput {
                pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.0)],
            },
        ],
        (0, 1),
        true,
    );
    // Same (pk, timestamp) written twice with dedup on: only the row with the
    // larger sequence (v1 = 0.2) survives.
    check_mutations_to_record_batches(
        &[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ],
        &[
            BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.2)],
            },
            BatchOutput {
                pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.0)],
            },
        ],
        (0, 0),
        true,
    );
    // Same data with dedup off: both duplicate rows are kept, ordered by
    // descending sequence (0.2 before 0.1).
    check_mutations_to_record_batches(
        &[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ],
        &[
            BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0, 0],
                v1: &[Some(0.2), Some(0.1)],
            },
            BatchOutput {
                pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.0)],
            },
        ],
        (0, 0),
        false,
    );
}
fn encode(input: &[MutationInput]) -> EncodedBulkPart {
let metadata = metadata_for_test();
let kvs = input

View File

@@ -157,35 +157,6 @@ lazy_static! {
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
/// Bytes reserved by compaction memory manager.
pub static ref COMPACTION_MEMORY_IN_USE: IntGauge =
register_int_gauge!(
"greptime_mito_compaction_memory_in_use_bytes",
"bytes currently reserved for compaction tasks",
)
.unwrap();
/// Configured compaction memory limit.
pub static ref COMPACTION_MEMORY_LIMIT: IntGauge =
register_int_gauge!(
"greptime_mito_compaction_memory_limit_bytes",
"maximum bytes allowed for compaction tasks",
)
.unwrap();
/// Wait time to obtain compaction memory.
pub static ref COMPACTION_MEMORY_WAIT: Histogram = register_histogram!(
"greptime_mito_compaction_memory_wait_seconds",
"time waiting for compaction memory",
// 0.01s ~ ~10s
exponential_buckets(0.01, 2.0, 10).unwrap(),
).unwrap();
/// Counter of rejected compaction memory allocations.
pub static ref COMPACTION_MEMORY_REJECTED: IntCounterVec =
register_int_counter_vec!(
"greptime_mito_compaction_memory_rejected_total",
"number of compaction tasks rejected due to memory limit",
&[TYPE_LABEL]
).unwrap();
}
// Query metrics.

View File

@@ -14,7 +14,7 @@
//! Utilities for scanners.
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@@ -26,6 +26,7 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder,
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::timestamp::timestamp_array_to_primitive;
use futures::Stream;
use lazy_static::lazy_static;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
@@ -52,6 +53,19 @@ use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
lazy_static! {
/// Threshold for slow file scan warning in milliseconds.
/// Can be configured via SLOW_FILE_SCAN_THRESHOLD_MS environment variable.
/// Defaults to 1000ms (1 second).
static ref SLOW_FILE_SCAN_THRESHOLD: Duration = {
let threshold_ms = std::env::var("SLOW_FILE_SCAN_THRESHOLD_MS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(1000);
Duration::from_millis(threshold_ms)
};
}
/// Per-file scan metrics.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
@@ -206,35 +220,6 @@ pub(crate) struct ScanMetricsSet {
per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
}
/// Wrapper for file metrics that compares by total cost in reverse order.
/// This allows using BinaryHeap as a min-heap for efficient top-K selection.
struct CompareCostReverse<'a> {
total_cost: Duration,
file_id: RegionFileId,
metrics: &'a FileScanMetrics,
}
impl Ord for CompareCostReverse<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse comparison: smaller costs are "greater"
other.total_cost.cmp(&self.total_cost)
}
}
impl PartialOrd for CompareCostReverse<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Eq for CompareCostReverse<'_> {}
impl PartialEq for CompareCostReverse<'_> {
fn eq(&self, other: &Self) -> bool {
self.total_cost == other.total_cost
}
}
impl fmt::Debug for ScanMetricsSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ScanMetricsSet {
@@ -409,44 +394,12 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
}
// Write top file metrics if present and non-empty
// Write per-file metrics if present and non-empty
if let Some(file_metrics) = per_file_metrics
&& !file_metrics.is_empty()
{
// Use min-heap (BinaryHeap with reverse comparison) to keep only top 10
let mut heap = BinaryHeap::new();
for (file_id, metrics) in file_metrics.iter() {
let total_cost =
metrics.build_part_cost + metrics.build_reader_cost + metrics.scan_cost;
if heap.len() < 10 {
// Haven't reached 10 yet, just push
heap.push(CompareCostReverse {
total_cost,
file_id: *file_id,
metrics,
});
} else if let Some(min_entry) = heap.peek() {
// If current cost is higher than the minimum in our top-10, replace it
if total_cost > min_entry.total_cost {
heap.pop();
heap.push(CompareCostReverse {
total_cost,
file_id: *file_id,
metrics,
});
}
}
}
let top_files = heap.into_sorted_vec();
write!(f, ", \"top_file_metrics\": {{")?;
for (i, item) in top_files.iter().enumerate() {
let CompareCostReverse {
total_cost: _,
file_id,
metrics,
} = item;
write!(f, ", \"per_file_metrics\": {{")?;
for (i, (file_id, metrics)) in file_metrics.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
@@ -1107,6 +1060,18 @@ pub(crate) async fn scan_file_ranges(
None
};
// Warn if build_part_cost exceeds threshold
if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
common_telemetry::warn!(
"Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
part_metrics.0.region_id,
file_id,
reader_metrics.build_cost
);
}
Ok(build_file_range_scan_stream(
stream_ctx,
part_metrics,
@@ -1152,6 +1117,18 @@ pub(crate) async fn scan_flat_file_ranges(
None
};
// Warn if build_part_cost exceeds threshold
if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file = stream_ctx.input.file_from_index(index);
let file_id = file.file_id();
common_telemetry::warn!(
"Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
part_metrics.0.region_id,
file_id,
reader_metrics.build_cost
);
}
Ok(build_flat_file_range_scan_stream(
stream_ctx,
part_metrics,
@@ -1195,6 +1172,20 @@ pub fn build_file_range_scan_stream(
if let Source::PruneReader(reader) = source {
let prune_metrics = reader.metrics();
// Warn if build_cost + scan_cost exceeds threshold
let total_cost = build_cost + prune_metrics.scan_cost;
if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file_id = range.file_handle().file_id();
common_telemetry::warn!(
"Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
part_metrics.0.region_id,
file_id,
total_cost,
build_cost,
prune_metrics.scan_cost
);
}
// Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id();
@@ -1262,6 +1253,20 @@ pub fn build_flat_file_range_scan_stream(
let prune_metrics = reader.metrics();
// Warn if build_cost + scan_cost exceeds threshold
let total_cost = build_cost + prune_metrics.scan_cost;
if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
let file_id = range.file_handle().file_id();
common_telemetry::warn!(
"Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
part_metrics.0.region_id,
file_id,
total_cost,
build_cost,
prune_metrics.scan_cost
);
}
// Update per-file metrics if tracking is enabled
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
let file_id = range.file_handle().file_id();

View File

@@ -17,7 +17,6 @@
pub mod catchup;
pub mod opener;
pub mod options;
pub mod utils;
pub(crate) mod version;
use std::collections::hash_map::Entry;
@@ -35,11 +34,9 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::region_request::PathType;
use store_api::sst_entry::ManifestSstEntry;
use store_api::storage::{FileId, RegionId, SequenceNumber};
use store_api::storage::{RegionId, SequenceNumber};
use tokio::sync::RwLockWriteGuard;
pub use utils::*;
use crate::access_layer::AccessLayerRef;
use crate::error::{
@@ -52,7 +49,6 @@ use crate::manifest::action::{
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;
@@ -219,11 +215,6 @@ impl MitoRegion {
self.access_layer.table_dir()
}
/// Returns the path type of the region.
pub(crate) fn path_type(&self) -> PathType {
self.access_layer.path_type()
}
/// Returns whether the region is writable.
pub(crate) fn is_writable(&self) -> bool {
matches!(
@@ -666,16 +657,6 @@ impl MitoRegion {
.collect()
}
/// Returns the file metas of the region by file ids.
pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
let manifest_files = self.manifest_ctx.manifest().await.files.clone();
file_ids
.iter()
.map(|file_id| manifest_files.get(file_id).cloned())
.collect::<Vec<_>>()
}
/// Exit staging mode successfully by merging all staged manifests and making them visible.
pub(crate) async fn exit_staging_on_success(
&self,

View File

@@ -636,6 +636,54 @@ pub fn get_object_store(
}
}
/// A loader for loading metadata from a region dir.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
    // Engine config; supplies the manifest options used to open the manifest.
    config: Arc<MitoConfig>,
    // Resolves the object store that backs a region's storage by name.
    object_store_manager: ObjectStoreManagerRef,
}
impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }
    /// Loads the metadata of the region from the region dir.
    ///
    /// Returns `Ok(None)` when no manifest exists for the region.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self
            .load_manifest(region_dir, &region_options.storage)
            .await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }
    /// Loads the manifest of the region from the region dir.
    ///
    /// `storage` names the object store to resolve via `get_object_store`.
    /// Returns `Ok(None)` when the manifest manager finds no manifest to open.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        storage: &Option<String>,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let object_store = get_object_store(storage, &self.object_store_manager)?;
        let region_manifest_options =
            RegionManifestOptions::new(&self.config, region_dir, &object_store);
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
        else {
            return Ok(None);
        };
        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}
/// Checks whether the recovered region has the same schema as region to create.
pub(crate) fn check_recovered_region(
recovered: &RegionMetadata,

View File

@@ -1,345 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Instant;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::{FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::config::MitoConfig;
use crate::error::{self, InvalidSourceAndTargetRegionSnafu, Result};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::opener::get_object_store;
use crate::region::options::RegionOptions;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::location;
/// A loader for loading metadata from a region dir.
///
/// Reads the region manifest from object storage without opening the region.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
    /// Engine configuration used to build the manifest options.
    config: Arc<MitoConfig>,
    /// Resolves the object store to read from for a given storage name.
    object_store_manager: ObjectStoreManagerRef,
}
impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
    ///
    /// Returns `None` when the region has no manifest yet.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self
            .load_manifest(region_dir, &region_options.storage)
            .await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    ///
    /// Opens the manifest manager on the object store selected by `storage`
    /// (`None` selects the default store); returns `None` when no manifest
    /// exists under `region_dir`.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        storage: &Option<String>,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let object_store = get_object_store(storage, &self.object_store_manager)?;
        let region_manifest_options =
            RegionManifestOptions::new(&self.config, region_dir, &object_store);
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
        else {
            return Ok(None);
        };
        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}
/// A copier for copying files from a region to another region.
#[derive(Debug, Clone)]
pub struct RegionFileCopier {
    /// Access layer that provides the table dir, path type and object store
    /// used to resolve both source and target file paths.
    access_layer: AccessLayerRef,
}
/// A descriptor for a file.
#[derive(Debug, Clone, Copy)]
pub enum FileDescriptor {
    /// An index file.
    Index {
        /// Id of the SST file the index belongs to.
        file_id: FileId,
        /// Version of the index file.
        version: IndexVersion,
        /// File size in bytes.
        size: u64,
    },
    /// A data file.
    Data {
        /// Id of the data file.
        file_id: FileId,
        /// File size in bytes.
        size: u64,
    },
}
impl FileDescriptor {
    /// Returns the size of the described file in bytes.
    pub fn size(&self) -> u64 {
        // Both variants carry a `size` field, so a single or-pattern suffices.
        match self {
            FileDescriptor::Index { size, .. } | FileDescriptor::Data { size, .. } => *size,
        }
    }
}
/// Builds the source and target file paths for a given file descriptor.
///
/// # Arguments
///
/// * `source_region_id`: The ID of the source region.
/// * `target_region_id`: The ID of the target region.
/// * `file_descriptor`: The descriptor (data or index file) to build paths for.
/// * `table_dir`: The table directory that contains both regions.
/// * `path_type`: The path layout used when building file paths.
///
/// # Returns
///
/// A tuple containing the source and target file paths.
fn build_copy_file_paths(
    source_region_id: RegionId,
    target_region_id: RegionId,
    file_descriptor: FileDescriptor,
    table_dir: &str,
    path_type: PathType,
) -> (String, String) {
    match file_descriptor {
        // Index files carry a version, encoded into the puffin file name.
        FileDescriptor::Index {
            file_id, version, ..
        } => (
            location::index_file_path(
                table_dir,
                RegionIndexId::new(RegionFileId::new(source_region_id, file_id), version),
                path_type,
            ),
            location::index_file_path(
                table_dir,
                RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
                path_type,
            ),
        ),
        FileDescriptor::Data { file_id, .. } => (
            location::sst_file_path(
                table_dir,
                RegionFileId::new(source_region_id, file_id),
                path_type,
            ),
            location::sst_file_path(
                table_dir,
                RegionFileId::new(target_region_id, file_id),
                path_type,
            ),
        ),
    }
}
/// Builds the path of a copied file inside the target region, used to delete
/// it when a copy operation has to be rolled back.
fn build_delete_file_path(
    target_region_id: RegionId,
    file_descriptor: FileDescriptor,
    table_dir: &str,
    path_type: PathType,
) -> String {
    match file_descriptor {
        FileDescriptor::Index {
            file_id, version, ..
        } => location::index_file_path(
            table_dir,
            RegionIndexId::new(RegionFileId::new(target_region_id, file_id), version),
            path_type,
        ),
        FileDescriptor::Data { file_id, .. } => location::sst_file_path(
            table_dir,
            RegionFileId::new(target_region_id, file_id),
            path_type,
        ),
    }
}
impl RegionFileCopier {
    /// Creates a copier that resolves paths and the object store via `access_layer`.
    pub fn new(access_layer: AccessLayerRef) -> Self {
        Self { access_layer }
    }

    /// Copies files from a source region to a target region.
    ///
    /// Both regions must belong to the same table. Files are copied by
    /// `parallelism` concurrent tasks; task `i` handles descriptors
    /// `i, i + parallelism, ...`. If any copy fails, all target paths are
    /// deleted (best effort) and the error is returned.
    ///
    /// # Arguments
    ///
    /// * `source_region_id`: The ID of the source region.
    /// * `target_region_id`: The ID of the target region.
    /// * `file_ids`: The IDs of the files to copy.
    /// * `parallelism`: Number of concurrent copy tasks.
    pub async fn copy_files(
        &self,
        source_region_id: RegionId,
        target_region_id: RegionId,
        file_ids: Vec<FileDescriptor>,
        parallelism: usize,
    ) -> Result<()> {
        ensure!(
            source_region_id.table_id() == target_region_id.table_id(),
            InvalidSourceAndTargetRegionSnafu {
                source_region_id,
                target_region_id,
            },
        );
        let table_dir = self.access_layer.table_dir();
        let path_type = self.access_layer.path_type();
        let object_store = self.access_layer.object_store();
        info!(
            "Copying {} files from region {} to region {}",
            file_ids.len(),
            source_region_id,
            target_region_id
        );
        debug!(
            "Copying files: {:?} from region {} to region {}",
            file_ids, source_region_id, target_region_id
        );
        let mut tasks = Vec::with_capacity(parallelism);
        for skip in 0..parallelism {
            // Each task takes every `parallelism`-th descriptor starting at `skip`,
            // partitioning the file list without extra allocation.
            let target_file_ids = file_ids.iter().skip(skip).step_by(parallelism).copied();
            let object_store = object_store.clone();
            tasks.push(async move {
                for file_desc in target_file_ids {
                    let (source_path, target_path) = build_copy_file_paths(
                        source_region_id,
                        target_region_id,
                        file_desc,
                        table_dir,
                        path_type,
                    );
                    let now = Instant::now();
                    object_store
                        .copy(&source_path, &target_path)
                        .await
                        .inspect_err(
                            |e| error!(e; "Failed to copy file {} to {}", source_path, target_path),
                        )
                        .context(error::OpenDalSnafu)?;
                    let file_size = ReadableSize(file_desc.size());
                    info!(
                        "Copied file {} to {}, file size: {}, elapsed: {:?}",
                        source_path,
                        target_path,
                        file_size,
                        now.elapsed(),
                    );
                }
                Ok(())
            });
        }
        if let Err(err) = try_join_all(tasks).await {
            error!(err; "Failed to copy files from region {} to region {}", source_region_id, target_region_id);
            // Best-effort cleanup: delete every target path, whether or not the
            // individual copy completed.
            self.clean_target_region(target_region_id, file_ids).await;
            return Err(err);
        }
        Ok(())
    }

    /// Cleans the copied files from the target region.
    ///
    /// Deletion failures are only logged; this is a best-effort rollback.
    async fn clean_target_region(&self, target_region_id: RegionId, file_ids: Vec<FileDescriptor>) {
        let table_dir = self.access_layer.table_dir();
        let path_type = self.access_layer.path_type();
        let object_store = self.access_layer.object_store();
        let delete_file_path = file_ids
            .into_iter()
            .map(|file_descriptor| {
                build_delete_file_path(target_region_id, file_descriptor, table_dir, path_type)
            })
            .collect::<Vec<_>>();
        debug!(
            "Deleting files: {:?} after failed to copy files to target region {}",
            delete_file_path, target_region_id
        );
        if let Err(err) = object_store.delete_iter(delete_file_path).await {
            error!(err; "Failed to delete files from region {}", target_region_id);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies path construction for both data and index descriptors between
    /// two regions of the same table.
    #[test]
    fn test_build_copy_file_paths() {
        common_telemetry::init_default_ut_logging();
        let file_id = FileId::random();
        let source_region_id = RegionId::new(1, 1);
        let target_region_id = RegionId::new(1, 2);
        // Data file: `{table_dir}/{table}_{region}/{file_id}.parquet`.
        let file_descriptor = FileDescriptor::Data { file_id, size: 100 };
        let table_dir = "/table_dir";
        let path_type = PathType::Bare;
        let (source_path, target_path) = build_copy_file_paths(
            source_region_id,
            target_region_id,
            file_descriptor,
            table_dir,
            path_type,
        );
        assert_eq!(
            source_path,
            format!("/table_dir/1_0000000001/{}.parquet", file_id)
        );
        assert_eq!(
            target_path,
            format!("/table_dir/1_0000000002/{}.parquet", file_id)
        );
        // Index file: version is encoded into the puffin file name.
        let version = 1;
        let file_descriptor = FileDescriptor::Index {
            file_id,
            version,
            size: 100,
        };
        let (source_path, target_path) = build_copy_file_paths(
            source_region_id,
            target_region_id,
            file_descriptor,
            table_dir,
            path_type,
        );
        assert_eq!(
            source_path,
            format!(
                "/table_dir/1_0000000001/index/{}.{}.puffin",
                file_id, version
            )
        );
        assert_eq!(
            target_path,
            format!(
                "/table_dir/1_0000000002/index/{}.{}.puffin",
                file_id, version
            )
        );
    }
}

View File

@@ -425,7 +425,6 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 1024,
max_row_group_uncompressed_size: 1024,
available_indexes: SmallVec::new(),
indexes: Default::default(),
index_file_size: 0,

View File

@@ -33,9 +33,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{
MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::region_request::{
AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
@@ -607,9 +605,6 @@ pub(crate) enum WorkerRequest {
/// Remap manifests request.
RemapManifests(RemapManifestsRequest),
/// Copy region from request.
CopyRegionFrom(CopyRegionFromRequest),
}
impl WorkerRequest {
@@ -818,24 +813,6 @@ impl WorkerRequest {
Ok((WorkerRequest::RemapManifests(request), receiver))
}
/// Converts [CopyRegionFromRequest] from a [CopyRegionFromRequest](store_api::region_engine::CopyRegionFromRequest).
pub(crate) fn try_from_copy_region_from_request(
region_id: RegionId,
store_api::region_engine::CopyRegionFromRequest {
source_region_id,
parallelism,
}: store_api::region_engine::CopyRegionFromRequest,
) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
let (sender, receiver) = oneshot::channel();
let request = CopyRegionFromRequest {
region_id,
source_region_id,
parallelism,
sender,
};
Ok((WorkerRequest::CopyRegionFrom(request), receiver))
}
}
/// DDL request to a region.
@@ -890,8 +867,6 @@ pub(crate) enum BackgroundNotify {
RegionEdit(RegionEditResult),
/// Enter staging result.
EnterStaging(EnterStagingResult),
/// Copy region result.
CopyRegionFromFinished(CopyRegionFromFinished),
}
/// Notifies a flush job is finished.
@@ -1048,16 +1023,6 @@ pub(crate) struct EnterStagingResult {
pub(crate) result: Result<()>,
}
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
/// Region id.
pub(crate) region_id: RegionId,
/// Region edit to apply.
pub(crate) edit: RegionEdit,
/// Result sender.
pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
@@ -1112,18 +1077,6 @@ pub(crate) struct RemapManifestsRequest {
pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>,
}
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
/// The [`RegionId`] of the target region.
pub(crate) region_id: RegionId,
/// The [`RegionId`] of the source region.
pub(crate) source_region_id: RegionId,
/// The parallelism of the copy operation.
pub(crate) parallelism: usize,
/// Result sender.
pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;

View File

@@ -121,7 +121,7 @@ impl FlatSchemaOptions {
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
///
/// # Panics

View File

@@ -180,8 +180,6 @@ pub struct FileMeta {
pub level: Level,
/// Size of the file.
pub file_size: u64,
/// Maximum uncompressed row group size of the file. 0 means unknown.
pub max_row_group_uncompressed_size: u64,
/// Available indexes of the file.
pub available_indexes: IndexTypes,
/// Created indexes of the file for each column.
@@ -250,11 +248,7 @@ impl Debug for FileMeta {
)
})
.field("level", &self.level)
.field("file_size", &ReadableSize(self.file_size))
.field(
"max_row_group_uncompressed_size",
&ReadableSize(self.max_row_group_uncompressed_size),
);
.field("file_size", &ReadableSize(self.file_size));
if !self.available_indexes.is_empty() {
debug_struct
.field("available_indexes", &self.available_indexes)
@@ -658,7 +652,6 @@ mod tests {
time_range: FileTimeRange::default(),
level,
file_size: 0,
max_row_group_uncompressed_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
@@ -710,7 +703,6 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 0,
max_row_group_uncompressed_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,

View File

@@ -251,7 +251,6 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
max_row_group_uncompressed_size: 4096,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
@@ -322,7 +321,6 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
max_row_group_uncompressed_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,

View File

@@ -224,7 +224,6 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
max_row_group_uncompressed_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,

View File

@@ -32,7 +32,6 @@ use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::index::IndexValuesCodec;
use mito_codec::row_converter::CompositeValues;
use object_store::ObjectStore;
use puffin_manager::SstPuffinManager;
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt};
@@ -43,7 +42,7 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{
@@ -77,30 +76,6 @@ pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
/// Triggers background download of an index file to the local cache.
pub(crate) fn trigger_index_background_download(
file_cache: Option<&FileCacheRef>,
file_id: &RegionIndexId,
file_size_hint: Option<u64>,
path_factory: &RegionFilePathFactory,
object_store: &ObjectStore,
) {
if let (Some(file_cache), Some(file_size)) = (file_cache, file_size_hint) {
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
let remote_path = path_factory.build_index_file_path(file_id.file_id);
file_cache.maybe_download_background(
index_key,
remote_path,
object_store.clone(),
file_size,
);
}
}
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
@@ -685,8 +660,8 @@ impl IndexBuildTask {
// TODO(SNC123): optimize index batch
loop {
match parquet_reader.next_batch().await {
Ok(Some(mut batch)) => {
indexer.update(&mut batch).await;
Ok(Some(batch)) => {
indexer.update(&mut batch.clone()).await;
}
Ok(None) => break,
Err(e) => {
@@ -1574,7 +1549,6 @@ mod tests {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
@@ -1649,7 +1623,6 @@ mod tests {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
@@ -1753,7 +1726,6 @@ mod tests {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
@@ -1819,7 +1791,6 @@ mod tests {
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
factory,
intm_manager,
ReadableSize::mb(10),

View File

@@ -45,10 +45,10 @@ use crate::error::{
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
/// Metrics for tracking bloom filter index apply operations.
#[derive(Default, Clone)]
@@ -378,20 +378,12 @@ impl BloomFilterIndexApplier {
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
// Trigger background download if file cache and file size are available
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.object_store,
);
let puffin_manager = self
.puffin_manager_factory
.build(self.object_store.clone(), path_factory)
.build(
self.object_store.clone(),
RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let blob_name = Self::column_blob_name(column_id);

View File

@@ -45,12 +45,12 @@ use crate::error::{
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::puffin_manager::{
PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};
use crate::sst::index::{TYPE_FULLTEXT_INDEX, trigger_index_background_download};
pub mod builder;
@@ -748,20 +748,12 @@ impl IndexSource {
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<SstPuffinReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
// Trigger background download if file cache and file size are available
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.remote_store,
);
let puffin_manager = self
.puffin_manager_factory
.build(self.remote_store.clone(), path_factory)
.build(
self.remote_store.clone(),
RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let reader = puffin_manager

View File

@@ -41,9 +41,9 @@ use crate::error::{
};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::{TYPE_INVERTED_INDEX, trigger_index_background_download};
/// Metrics for tracking inverted index apply operations.
#[derive(Default, Clone)]
@@ -311,20 +311,12 @@ impl InvertedIndexApplier {
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
// Trigger background download if file cache and file size are available
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.store,
);
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone(), path_factory)
.build(
self.store.clone(),
RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
puffin_manager

View File

@@ -76,8 +76,6 @@ pub struct SstInfo {
pub time_range: FileTimeRange,
/// File size in bytes.
pub file_size: u64,
/// Maximum uncompressed row group size in bytes. 0 if unknown.
pub max_row_group_uncompressed_size: u64,
/// Number of rows.
pub num_rows: usize,
/// Number of row groups
@@ -773,7 +771,6 @@ mod tests {
time_range: info.time_range,
level: 0,
file_size: info.file_size,
max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
available_indexes: info.index_metadata.build_available_indexes(),
indexes: info.index_metadata.build_indexes(),
index_file_size: info.index_metadata.file_size,

View File

@@ -269,7 +269,7 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let (parquet_meta, cache_miss) = self
let parquet_meta = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.await?;
// Decodes region metadata.
@@ -326,22 +326,6 @@ impl ParquetReaderBuilder {
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
// Trigger background download if metadata had a cache miss and selection is not empty
if cache_miss && !selection.is_empty() {
use crate::cache::file_cache::{FileType, IndexKey};
let index_key = IndexKey::new(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
FileType::Parquet,
);
self.cache_strategy.maybe_download_background(
index_key,
file_path.clone(),
self.object_store.clone(),
file_size,
);
}
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
@@ -411,13 +395,12 @@ impl ParquetReaderBuilder {
}
/// Reads parquet metadata of specific file.
/// Returns (metadata, cache_miss_flag).
async fn read_parquet_metadata(
&self,
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
) -> Result<(Arc<ParquetMetaData>, bool)> {
) -> Result<Arc<ParquetMetaData>> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
@@ -431,7 +414,7 @@ impl ParquetReaderBuilder {
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
return Ok((metadata, false));
return Ok(metadata);
}
// Cache miss, load metadata directly.
@@ -444,7 +427,7 @@ impl ParquetReaderBuilder {
.put_parquet_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok((metadata, true))
Ok(metadata)
}
/// Computes row groups to read, along with their respective row selections.

View File

@@ -213,23 +213,11 @@ where
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let max_row_group_uncompressed_size: u64 = parquet_metadata
.row_groups()
.iter()
.map(|rg| {
rg.columns()
.iter()
.map(|c| c.uncompressed_size() as u64)
.sum::<u64>()
})
.max()
.unwrap_or(0);
let num_series = stats.series_estimator.finish();
ssts.push(SstInfo {
file_id: self.current_file,
time_range,
file_size,
max_row_group_uncompressed_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),

View File

@@ -655,7 +655,6 @@ impl TestEnv {
capacity,
None,
None,
true, // enable_background_worker
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
None, // manifest_cache
@@ -677,7 +676,6 @@ impl TestEnv {
capacity,
None,
None,
true, // enable_background_worker
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
ReadableSize::mb(0), // manifest_cache_capacity

View File

@@ -18,7 +18,6 @@ use std::sync::{Arc, Mutex};
use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_memory_manager::OnExhaustedPolicy;
use common_test_util::temp_dir::{TempDir, create_temp_dir};
use object_store::ObjectStore;
use object_store::services::Fs;
@@ -29,7 +28,6 @@ use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::cache::CacheManager;
use crate::compaction::CompactionScheduler;
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::FlushScheduler;
@@ -102,8 +100,6 @@ impl SchedulerEnv {
Arc::new(MitoConfig::default()),
WorkerListener::default(),
Plugins::new(),
Arc::new(new_compaction_memory_manager(0)),
OnExhaustedPolicy::default(),
)
}

View File

@@ -123,7 +123,6 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
),
level: 0,
file_size: 0,
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,

View File

@@ -101,7 +101,6 @@ impl VersionControlBuilder {
),
level: 0,
file_size: 0, // We don't care file size.
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
@@ -193,7 +192,6 @@ pub(crate) fn apply_edit(
),
level: 0,
file_size: 0, // We don't care file size.
max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,

View File

@@ -19,7 +19,6 @@ mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_copy_region;
mod handle_create;
mod handle_drop;
mod handle_enter_staging;
@@ -38,10 +37,10 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_stat::get_total_memory_bytes;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
@@ -59,7 +58,6 @@ use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
@@ -207,17 +205,6 @@ impl WorkerGroup {
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let total_memory = get_total_memory_bytes();
let total_memory = if total_memory > 0 {
total_memory as u64
} else {
0
};
let compaction_limit_bytes = config
.experimental_compaction_memory_limit
.resolve(total_memory);
let compaction_memory_manager =
Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
let workers = (0..config.num_workers)
@@ -234,7 +221,6 @@ impl WorkerGroup {
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
compaction_memory_manager: compaction_memory_manager.clone(),
puffin_manager_factory: puffin_manager_factory.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
@@ -395,17 +381,6 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
let total_memory = get_total_memory_bytes();
let total_memory = if total_memory > 0 {
total_memory as u64
} else {
0
};
let compaction_limit_bytes = config
.experimental_compaction_memory_limit
.resolve(total_memory);
let compaction_memory_manager =
Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
let workers = (0..config.num_workers)
.map(|id| {
@@ -421,7 +396,6 @@ impl WorkerGroup {
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
compaction_memory_manager: compaction_memory_manager.clone(),
puffin_manager_factory: puffin_manager_factory.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
@@ -480,10 +454,10 @@ pub async fn write_cache_from_config(
config.write_cache_size,
config.write_cache_ttl,
Some(config.index_cache_percent),
config.enable_refill_cache_on_read,
puffin_manager_factory,
intermediate_manager,
config.manifest_cache_size,
// TODO(yingwen): Enable manifest cache after removing read cache.
ReadableSize(0),
)
.await?;
Ok(Some(Arc::new(cache)))
@@ -508,7 +482,6 @@ struct WorkerStarter<S> {
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
compaction_memory_manager: Arc<CompactionMemoryManager>,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
@@ -561,11 +534,9 @@ impl<S: LogStore> WorkerStarter<S> {
self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
self.config.clone(),
self.config,
self.listener.clone(),
self.plugins.clone(),
self.compaction_memory_manager.clone(),
self.config.experimental_compaction_on_exhausted,
),
stalled_requests: StalledRequests::default(),
listener: self.listener,
@@ -1039,9 +1010,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::RemapManifests(req) => {
self.handle_remap_manifests_request(req);
}
WorkerRequest::CopyRegionFrom(req) => {
self.handle_copy_region_from_request(req);
}
}
}
@@ -1157,9 +1125,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
BackgroundNotify::CopyRegionFromFinished(req) => {
self.handle_copy_region_from_finished(req)
}
}
}

View File

@@ -1,245 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{debug, error, info};
use snafu::OptionExt;
use store_api::region_engine::MitoCopyRegionFromResponse;
use store_api::storage::{FileId, RegionId};
use crate::error::{InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::region::{FileDescriptor, MitoRegionRef, RegionFileCopier, RegionMetadataLoader};
use crate::request::{
BackgroundNotify, CopyRegionFromFinished, CopyRegionFromRequest, WorkerRequest,
};
use crate::sst::location::region_dir_from_table_dir;
use crate::worker::{RegionWorkerLoop, WorkerRequestWithTime};
impl<S> RegionWorkerLoop<S> {
    /// Handles a [`CopyRegionFromRequest`]: validates the request on the worker
    /// thread, then spawns a background task that copies the source region's
    /// files into the target region.
    ///
    /// Validation failures (target not writable, cross-table copy, source ==
    /// target) are reported to the request's `sender` immediately. The spawned
    /// task either replies directly (nothing to copy) or sends a
    /// `CopyRegionFromFinished` notification back to the worker loop so the
    /// resulting edit is applied on the worker thread.
    pub(crate) fn handle_copy_region_from_request(&mut self, request: CopyRegionFromRequest) {
        let region_id = request.region_id;
        let source_region_id = request.source_region_id;
        let sender = request.sender;
        // The target region must exist and currently be writable.
        let region = match self.regions.writable_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                let _ = sender.send(Err(e));
                return;
            }
        };
        // Copying is only allowed between regions of the same table.
        let same_table = source_region_id.table_id() == region_id.table_id();
        if !same_table {
            let _ = sender.send(
                InvalidRequestSnafu {
                    region_id,
                    reason: format!("Source and target regions must be from the same table, source_region_id: {source_region_id}, target_region_id: {region_id}"),
                }
                .fail(),
            );
            return;
        }
        // Copying a region onto itself is rejected.
        if source_region_id == region_id {
            let _ = sender.send(
                InvalidRequestSnafu {
                    region_id,
                    reason: format!("Source and target regions must be different, source_region_id: {source_region_id}, target_region_id: {region_id}"),
                }
                .fail(),
            );
            return;
        }
        let region_metadata_loader =
            RegionMetadataLoader::new(self.config.clone(), self.object_store_manager.clone());
        let worker_sender = self.sender.clone();
        // Run the (potentially slow) file copy off the worker loop; only the
        // final edit application goes back through the worker channel.
        common_runtime::spawn_global(async move {
            let (region_edit, file_ids) = match Self::copy_region_from(
                &region,
                region_metadata_loader,
                source_region_id,
                region_id,
                // Guard against a caller-supplied parallelism of zero.
                request.parallelism.max(1),
            )
            .await
            {
                Ok(region_files) => region_files,
                Err(e) => {
                    let _ = sender.send(Err(e));
                    return;
                }
            };
            match region_edit {
                Some(region_edit) => {
                    // Files were copied: hand the edit back to the worker loop
                    // so it is applied to the region's in-memory state there;
                    // the worker replies to `sender` once done.
                    if let Err(e) = worker_sender
                        .send(WorkerRequestWithTime::new(WorkerRequest::Background {
                            region_id,
                            notify: BackgroundNotify::CopyRegionFromFinished(
                                CopyRegionFromFinished {
                                    region_id,
                                    edit: region_edit,
                                    sender,
                                },
                            ),
                        }))
                        .await
                    {
                        error!(e; "Failed to send copy region from finished notification to worker, region_id: {}", region_id);
                    }
                }
                None => {
                    // Nothing needed copying; answer with the source's file ids.
                    let _ = sender.send(Ok(MitoCopyRegionFromResponse {
                        copied_file_ids: file_ids,
                    }));
                }
            }
        });
    }

    /// Applies the [`RegionEdit`] produced by a finished background copy to the
    /// target region's version control and replies to the original caller.
    pub(crate) fn handle_copy_region_from_finished(&mut self, request: CopyRegionFromFinished) {
        let region_id = request.region_id;
        let sender = request.sender;
        // Re-check writability: the region state may have changed while the
        // background copy task was running.
        let region = match self.regions.writable_region(region_id) {
            Ok(region) => region,
            Err(e) => {
                let _ = sender.send(Err(e));
                return;
            }
        };
        // The copied ids are exactly the files the edit adds.
        let copied_file_ids = request
            .edit
            .files_to_add
            .iter()
            .map(|file_meta| file_meta.file_id)
            .collect();
        region
            .version_control
            .apply_edit(Some(request.edit), &[], region.file_purger.clone());
        let _ = sender.send(Ok(MitoCopyRegionFromResponse { copied_file_ids }));
    }

    /// Returns the region edit and the file ids that were copied from the source region to the target region.
    ///
    /// If no need to copy files, returns (None, file_ids).
    async fn copy_region_from(
        region: &MitoRegionRef,
        region_metadata_loader: RegionMetadataLoader,
        source_region_id: RegionId,
        target_region_id: RegionId,
        parallelism: usize,
    ) -> Result<(Option<RegionEdit>, Vec<FileId>)> {
        let table_dir = region.table_dir();
        let path_type = region.path_type();
        let region_dir = region_dir_from_table_dir(table_dir, source_region_id, path_type);
        info!(
            "Loading source region manifest from region dir: {region_dir}, target region: {target_region_id}"
        );
        let source_region_manifest = region_metadata_loader
            .load_manifest(&region_dir, &region.version().options.storage)
            .await?
            .context(MissingManifestSnafu {
                region_id: source_region_id,
            })?;
        let mut files_to_copy = vec![];
        let target_region_manifest = region.manifest_ctx.manifest().await;
        // All file ids known to the source region, whether copied or not;
        // returned to the caller in both the copy and no-op paths.
        let file_ids = source_region_manifest
            .files
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        debug!(
            "source region files: {:?}, source region id: {}",
            source_region_manifest.files, source_region_id
        );
        // Only copy files the target doesn't already have; the copied metadata
        // is re-owned by the target region.
        for (file_id, file_meta) in &source_region_manifest.files {
            if !target_region_manifest.files.contains_key(file_id) {
                let mut new_file_meta = file_meta.clone();
                new_file_meta.region_id = target_region_id;
                files_to_copy.push(new_file_meta);
            }
        }
        if files_to_copy.is_empty() {
            return Ok((None, file_ids));
        }
        // Expand each file into its data file plus, when present, its index file.
        let file_descriptors = files_to_copy
            .iter()
            .flat_map(|file_meta| {
                if file_meta.exists_index() {
                    let region_index_id = file_meta.index_id();
                    let file_id = region_index_id.file_id.file_id();
                    let version = region_index_id.version;
                    let file_size = file_meta.file_size;
                    let index_file_size = file_meta.index_file_size();
                    vec![
                        FileDescriptor::Data {
                            file_id: file_meta.file_id,
                            size: file_size,
                        },
                        FileDescriptor::Index {
                            file_id,
                            version,
                            size: index_file_size,
                        },
                    ]
                } else {
                    let file_size = file_meta.file_size;
                    vec![FileDescriptor::Data {
                        file_id: file_meta.file_id,
                        size: file_size,
                    }]
                }
            })
            .collect();
        debug!("File descriptors to copy: {:?}", file_descriptors);
        let copier = RegionFileCopier::new(region.access_layer());
        // TODO(weny): ensure the target region is empty.
        copier
            .copy_files(
                source_region_id,
                target_region_id,
                file_descriptors,
                parallelism,
            )
            .await?;
        let edit = RegionEdit {
            files_to_add: files_to_copy,
            files_to_remove: vec![],
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        info!("Applying {edit:?} to region {target_region_id}, reason: CopyRegionFrom");
        // Persist the edit to the manifest first; the in-memory state is
        // updated later by the worker loop (`handle_copy_region_from_finished`).
        let version = region
            .manifest_ctx
            .manifest_manager
            .write()
            .await
            .update(action_list, false)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {target_region_id}, reason: CopyRegionFrom"
        );
        Ok((Some(edit), file_ids))
    }
}

View File

@@ -20,11 +20,13 @@ use common_telemetry::info;
use futures::future::try_join_all;
use partition::expr::PartitionExpr;
use snafu::{OptionExt, ResultExt};
use store_api::region_request::PathType;
use store_api::storage::RegionId;
use crate::error::{FetchManifestsSnafu, InvalidRequestSnafu, MissingManifestSnafu, Result};
use crate::manifest::action::RegionManifest;
use crate::region::{MitoRegionRef, RegionMetadataLoader};
use crate::region::MitoRegionRef;
use crate::region::opener::RegionMetadataLoader;
use crate::remap_manifest::RemapManifest;
use crate::request::RemapManifestsRequest;
use crate::sst::location::region_dir_from_table_dir;
@@ -85,10 +87,10 @@ impl<S> RegionWorkerLoop<S> {
let mut tasks = Vec::with_capacity(input_regions.len());
let region_options = region.version().options.clone();
let table_dir = region.table_dir();
let path_type = region.path_type();
let now = Instant::now();
for input_region in &input_regions {
let region_dir = region_dir_from_table_dir(table_dir, *input_region, path_type);
let region_dir = region_dir_from_table_dir(table_dir, *input_region, PathType::Bare);
let storage = region_options.storage.clone();
let moved_region_metadata_loader = region_metadata_loader.clone();
tasks.push(async move {

View File

@@ -21,6 +21,7 @@ derive_builder = { workspace = true, optional = true }
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.54", features = [
"layers-tracing",

View File

@@ -16,7 +16,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_telemetry::tracing::warn;
use opendal::services::{Azblob, Gcs, Oss, S3};
use serde::{Deserialize, Serialize};
@@ -124,18 +123,11 @@ impl From<&S3Connection> for S3 {
fn from(connection: &S3Connection) -> Self {
let root = util::normalize_dir(&connection.root);
let mut builder = S3::default().root(&root).bucket(&connection.bucket);
if !connection.access_key_id.expose_secret().is_empty()
&& !connection.secret_access_key.expose_secret().is_empty()
{
builder = builder
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
} else {
warn!("No access key id or secret access key provided, using anonymous access");
builder = builder.allow_anonymous().disable_ec2_metadata();
}
let mut builder = S3::default()
.root(&root)
.bucket(&connection.bucket)
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
if let Some(endpoint) = &connection.endpoint {
builder = builder.endpoint(endpoint);

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod lru_cache;
#[cfg(feature = "testing")]
pub mod mock;
pub use lru_cache::*;
pub use opendal::layers::*;
pub use prometheus::build_prometheus_metrics_layer;

View File

@@ -0,0 +1,134 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use opendal::Result;
use opendal::raw::oio::Reader;
use opendal::raw::{
Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite,
};
mod read_cache;
use std::time::Instant;
use common_telemetry::{error, info};
use read_cache::ReadCache;
use crate::layers::lru_cache::read_cache::CacheAwareDeleter;
/// An opendal layer with local LRU file cache supporting.
pub struct LruCacheLayer<C: Access> {
    // The read cache; shared (via clone) with every accessor this layer wraps.
    read_cache: ReadCache<C>,
}

// Manual `Clone`: `#[derive(Clone)]` would add a `C: Clone` bound that
// `Access` implementations don't necessarily satisfy; only the inner
// `ReadCache` handle needs cloning.
impl<C: Access> Clone for LruCacheLayer<C> {
    fn clone(&self) -> Self {
        Self {
            read_cache: self.read_cache.clone(),
        }
    }
}
impl<C: Access> LruCacheLayer<C> {
    /// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
    ///
    /// Always returns `Ok` today; the `Result` is kept for interface stability.
    pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
        let read_cache = ReadCache::new(file_cache, capacity);
        Ok(Self { read_cache })
    }

    /// Recovers cache
    ///
    /// Scans the local file-cache backend and repopulates the in-memory index.
    /// The scan runs on a spawned task; when `sync` is true this call waits
    /// for it to finish, otherwise recovery continues in the background.
    pub async fn recover_cache(&self, sync: bool) {
        let now = Instant::now();
        let moved_read_cache = self.read_cache.clone();
        let handle = tokio::spawn(async move {
            match moved_read_cache.recover_cache().await {
                Ok((entries, bytes)) => info!(
                    "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
                    entries,
                    bytes,
                    now.elapsed()
                ),
                Err(err) => error!(err; "Failed to recover file cache."),
            }
        });
        if sync {
            // Errors are already logged inside the task; the join result is
            // intentionally discarded.
            let _ = handle.await;
        }
    }

    /// Returns true when the local cache contains the specific file
    pub async fn contains_file(&self, path: &str) -> bool {
        self.read_cache.contains_file(path).await
    }

    /// Returns the read cache statistics info `(EntryCount, SizeInBytes)`.
    pub async fn read_cache_stat(&self) -> (u64, u64) {
        self.read_cache.cache_stat().await
    }
}
// Wrapping an inner accessor only attaches a clone of the shared read cache;
// the inner accessor itself is stored untouched.
impl<I: Access, C: Access> Layer<I> for LruCacheLayer<C> {
    type LayeredAccess = LruCacheAccess<I, C>;

    fn layer(&self, inner: I) -> Self::LayeredAccess {
        LruCacheAccess {
            inner,
            read_cache: self.read_cache.clone(),
        }
    }
}
/// Accessor produced by [`LruCacheLayer`]: serves reads through the local
/// cache and keeps cached entries consistent across writes and deletes.
#[derive(Debug)]
pub struct LruCacheAccess<I, C> {
    inner: I,
    read_cache: ReadCache<C>,
}

impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
    type Inner = I;
    type Reader = Reader;
    type Writer = I::Writer;
    type Lister = I::Lister;
    type Deleter = CacheAwareDeleter<C, I::Deleter>;

    fn inner(&self) -> &Self::Inner {
        &self.inner
    }

    // Reads are answered from the local cache, falling back to (and filling
    // from) the inner accessor.
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        self.read_cache
            .read_from_cache(&self.inner, path, args)
            .await
    }

    // A write makes every cached range of this path stale, so the entries are
    // invalidated after the write completes.
    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
        let result = self.inner.write(path, args).await;
        self.read_cache.invalidate_entries_with_prefix(path);
        result
    }

    // Deletes are wrapped so cached entries for each deleted path are dropped.
    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
        self.inner
            .delete()
            .await
            .map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter)))
    }

    // Listing bypasses the cache entirely.
    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
        self.inner.list(path, args).await
    }
}

View File

@@ -0,0 +1,366 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{debug, trace};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use moka::policy::EvictionPolicy;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead, oio};
use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Subdirectory of cached files for read.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const READ_CACHE_DIR: &str = "cache/object/read";
/// Cache value recorded for one read key: either the payload size of a
/// successfully fetched file, or a negative (`NotFound`) entry.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
    // Read success with size
    Success(u32),
    // File not found
    NotFound,
}

impl ReadResult {
    /// Number of bytes this entry occupies in the local file cache;
    /// negative entries weigh nothing.
    fn size_bytes(&self) -> u32 {
        if let ReadResult::Success(size) = self {
            *size
        } else {
            0
        }
    }
}
/// Returns true when the path of the file can be cached.
fn can_cache(path: &str) -> bool {
    // The `_last_checkpoint` marker must always be fetched fresh.
    // TODO(dennis): find a better way
    const UNCACHEABLE_SUFFIX: &str = "_last_checkpoint";
    !path.ends_with(UNCACHEABLE_SUFFIX)
}
/// Generate a unique cache key for the read path and range.
///
/// The key combines the md5 of the path with the HTTP range header form of
/// the requested byte range, so distinct ranges of one file cache separately.
fn read_cache_key(path: &str, args: &OpRead) -> String {
    let digest = md5::compute(path);
    let range = args.range().to_header();
    format!("{READ_CACHE_DIR}/{digest:x}.cache-{range}")
}
/// Root directory (with leading and trailing slash) under which all read
/// cache entries live.
fn read_cache_root() -> String {
    ["/", READ_CACHE_DIR, "/"].concat()
}
/// Key prefix shared by every cached range of `path`; used to invalidate all
/// of a file's entries at once.
fn read_cache_key_prefix(path: &str) -> String {
    let digest = md5::compute(path);
    format!("{READ_CACHE_DIR}/{digest:x}")
}
/// Local read cache for files in object storage
#[derive(Debug)]
pub(crate) struct ReadCache<C> {
    /// Local file cache backend
    file_cache: Arc<C>,
    /// Local memory cache to track local cache files
    mem_cache: Cache<String, ReadResult>,
}

// Manual `Clone`: deriving would require `C: Clone`, but only the `Arc`
// handle and the moka cache handle need cloning.
impl<C> Clone for ReadCache<C> {
    fn clone(&self) -> Self {
        Self {
            file_cache: self.file_cache.clone(),
            mem_cache: self.mem_cache.clone(),
        }
    }
}
impl<C: Access> ReadCache<C> {
    /// Create a [`ReadCache`] with capacity in bytes.
    ///
    /// Entries are weighed by their payload size and evicted LRU; eviction
    /// also removes the backing file from `file_cache` and updates metrics.
    pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
        let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
        let eviction_listener =
            move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
                // Delete the file from local file cache when it's purged from mem_cache.
                OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
                let file_cache_cloned = file_cache_cloned.clone();
                async move {
                    // Only `Success` entries have a backing file; `NotFound`
                    // entries exist solely in memory and weigh zero bytes.
                    if let ReadResult::Success(size) = read_result {
                        OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
                        let result = file_cache_cloned.delete(&read_key).await;
                        debug!(
                            "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
                            read_key, result, cause
                        );
                    }
                }
                .boxed()
            };
        Self {
            file_cache,
            mem_cache: Cache::builder()
                .max_capacity(capacity as u64)
                .eviction_policy(EvictionPolicy::lru())
                .weigher(|_key, value: &ReadResult| -> u32 {
                    // TODO(dennis): add key's length to weight?
                    value.size_bytes()
                })
                .async_eviction_listener(eviction_listener)
                .support_invalidation_closures()
                .build(),
        }
    }

    /// Returns the cache's entry count and total approximate entry size in bytes.
    pub(crate) async fn cache_stat(&self) -> (u64, u64) {
        // Flush moka's pending maintenance so the counters are current.
        self.mem_cache.run_pending_tasks().await;
        (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
    }

    /// Invalidate all cache items belong to the specific path.
    pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
        let prefix = read_cache_key_prefix(path);
        // Safety: always ok when building cache with `support_invalidation_closures`.
        self.mem_cache
            .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
            .ok();
    }

    /// Recover existing cache items from `file_cache` to `mem_cache`.
    /// Return entry count and total approximate entry size in bytes.
    ///
    /// Only regular files are indexed; `NotFound` entries are in-memory only
    /// and therefore never recovered.
    pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
        let op = OperatorBuilder::new(self.file_cache.clone()).finish();
        let cloned_op = op.clone();
        let root = read_cache_root();
        let mut entries = op
            .lister_with(&root)
            .await?
            .map_ok(|entry| async {
                let (path, mut meta) = entry.into_parts();
                // TODO(dennis): Use a better API, see https://github.com/apache/opendal/issues/6522
                if meta.content_length() == 0 {
                    meta = cloned_op.stat(&path).await?;
                }
                Ok((path, meta))
            })
            .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
            .try_collect::<Vec<_>>()
            .await?;
        while let Some((read_key, metadata)) = entries.pop() {
            if !metadata.is_file() {
                continue;
            }
            let size = metadata.content_length();
            OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
            OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
            self.mem_cache
                .insert(read_key.clone(), ReadResult::Success(size as u32))
                .await;
        }
        Ok(self.cache_stat().await)
    }

    /// Returns true when the read cache contains the specific file.
    pub(crate) async fn contains_file(&self, path: &str) -> bool {
        self.mem_cache.run_pending_tasks().await;
        // Require both the in-memory entry and the backing file: either can
        // lag behind the other around evictions.
        self.mem_cache.contains_key(path)
            && self.file_cache.stat(path, OpStat::default()).await.is_ok()
    }

    /// Read from a specific path using the OpRead operation.
    /// It will attempt to retrieve the data from the local cache.
    /// If the data is not found in the local cache,
    /// it will fall back to retrieving it from remote object storage
    /// and cache the result locally.
    pub(crate) async fn read_from_cache<I>(
        &self,
        inner: &I,
        path: &str,
        args: OpRead,
    ) -> Result<(RpRead, Reader)>
    where
        I: Access,
    {
        if !can_cache(path) {
            return inner.read(path, args).await.map(to_output_reader);
        }
        let read_key = read_cache_key(path, &args);
        // `try_get_with` deduplicates concurrent fetches of the same key:
        // only one `read_remote` runs, the others await its result.
        let read_result = self
            .mem_cache
            .try_get_with(
                read_key.clone(),
                self.read_remote(inner, &read_key, path, args.clone()),
            )
            .await
            .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
        match read_result {
            ReadResult::Success(_) => {
                // There is a concurrent issue here, the local cache may be purged
                // while reading, we have to fall back to remote read
                match self.file_cache.read(&read_key, OpRead::default()).await {
                    Ok(ret) => {
                        OBJECT_STORE_LRU_CACHE_HIT
                            .with_label_values(&["success"])
                            .inc();
                        Ok(to_output_reader(ret))
                    }
                    Err(_) => {
                        OBJECT_STORE_LRU_CACHE_MISS.inc();
                        inner.read(path, args).await.map(to_output_reader)
                    }
                }
            }
            ReadResult::NotFound => {
                // Negative cache hit: a previous remote lookup failed with
                // `NotFound`, so answer without touching the remote store.
                OBJECT_STORE_LRU_CACHE_HIT
                    .with_label_values(&["not_found"])
                    .inc();
                Err(OpendalError::new(
                    ErrorKind::NotFound,
                    format!("File not found: {path}"),
                ))
            }
        }
    }

    /// Streams `reader` into the local file cache under `read_key` and returns
    /// the number of bytes written.
    async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
    where
        I: Access,
    {
        let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
        let mut total = 0;
        loop {
            let bytes = reader.read().await?;
            if bytes.is_empty() {
                break;
            }
            total += bytes.len();
            writer.write(bytes).await?;
        }
        // Call `close` to ensure data is written.
        writer.close().await?;
        Ok(total)
    }

    /// Read the file from remote storage. If success, write the content into local cache.
    async fn read_remote<I>(
        &self,
        inner: &I,
        read_key: &str,
        path: &str,
        args: OpRead,
    ) -> Result<ReadResult>
    where
        I: Access,
    {
        // Reaching this method at all means the key wasn't cached.
        OBJECT_STORE_LRU_CACHE_MISS.inc();
        let (_, reader) = inner.read(path, args).await?;
        let result = self.try_write_cache::<I>(reader, read_key).await;
        trace!(
            "Read cache miss for key '{}' and fetch file '{}' from object store",
            read_key, path,
        );
        match result {
            Ok(read_bytes) => {
                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
                OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
                Ok(ReadResult::Success(read_bytes as u32))
            }
            Err(e) if e.kind() == ErrorKind::NotFound => {
                OBJECT_STORE_READ_ERROR
                    .with_label_values(&[e.kind().to_string().as_str()])
                    .inc();
                // Cache the `NotFound` (zero-weight entry) so repeated lookups
                // of a missing file don't hit the remote store again.
                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
                Ok(ReadResult::NotFound)
            }
            Err(e) => {
                OBJECT_STORE_READ_ERROR
                    .with_label_values(&[e.kind().to_string().as_str()])
                    .inc();
                Err(e)
            }
        }
    }
}
/// Deleter wrapper that invalidates the cached entries of every deleted path
/// before forwarding the delete to the underlying deleter.
pub struct CacheAwareDeleter<C, D> {
    cache: ReadCache<C>,
    deleter: D,
}

impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
    pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
        Self { cache, deleter }
    }
}

impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        // Drop all cached ranges of this path, then queue the real delete.
        self.cache.invalidate_entries_with_prefix(path);
        self.deleter.delete(path, args)?;
        Ok(())
    }

    async fn flush(&mut self) -> Result<usize> {
        self.deleter.flush().await
    }
}
/// Boxes a concrete reader into the type-erased `Reader` expected by callers.
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
    let (rp, reader) = input;
    (rp, Box::new(reader))
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_can_cache() {
        // Ordinary data/manifest files are cacheable.
        let cacheable = [
            "test",
            "a/b/c.parquet",
            "1.json",
            "100.checkpoint",
            "test/last_checkpoint",
        ];
        // Files ending with `_last_checkpoint` must never be cached.
        let uncacheable = ["test/__last_checkpoint", "a/b/c/__last_checkpoint"];
        for path in cacheable {
            assert!(can_cache(path), "{path}");
        }
        for path in uncacheable {
            assert!(!can_cache(path), "{path}");
        }
    }
}

View File

@@ -21,14 +21,12 @@ pub use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite, oio,
};
use opendal::raw::{OpCopy, RpCopy};
pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};
pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
pub type MockReaderFactory = Arc<dyn Fn(&str, OpRead, oio::Reader) -> oio::Reader + Send + Sync>;
pub type MockListerFactory = Arc<dyn Fn(&str, OpList, oio::Lister) -> oio::Lister + Send + Sync>;
pub type MockDeleterFactory = Arc<dyn Fn(oio::Deleter) -> oio::Deleter + Send + Sync>;
pub type CopyInterceptor = Arc<dyn Fn(&str, &str, OpCopy) -> Option<Result<RpCopy>> + Send + Sync>;
#[derive(Builder)]
pub struct MockLayer {
@@ -40,8 +38,6 @@ pub struct MockLayer {
lister_factory: Option<MockListerFactory>,
#[builder(setter(strip_option), default)]
deleter_factory: Option<MockDeleterFactory>,
#[builder(setter(strip_option), default)]
copy_interceptor: Option<CopyInterceptor>,
}
impl Clone for MockLayer {
@@ -51,7 +47,6 @@ impl Clone for MockLayer {
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
copy_interceptor: self.copy_interceptor.clone(),
}
}
}
@@ -66,7 +61,6 @@ impl<A: Access> Layer<A> for MockLayer {
reader_factory: self.reader_factory.clone(),
lister_factory: self.lister_factory.clone(),
deleter_factory: self.deleter_factory.clone(),
copy_interceptor: self.copy_interceptor.clone(),
}
}
}
@@ -77,7 +71,6 @@ pub struct MockAccessor<A> {
reader_factory: Option<MockReaderFactory>,
lister_factory: Option<MockListerFactory>,
deleter_factory: Option<MockDeleterFactory>,
copy_interceptor: Option<CopyInterceptor>,
}
impl<A: Debug> Debug for MockAccessor<A> {
@@ -221,16 +214,4 @@ impl<A: Access> LayeredAccess for MockAccessor<A> {
})
}
}
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let Some(copy_interceptor) = self.copy_interceptor.as_ref() else {
return self.inner.copy(from, to, args).await;
};
let Some(result) = copy_interceptor(from, to, args.clone()) else {
return self.inner.copy(from, to, args).await;
};
result
}
}

View File

@@ -13,3 +13,38 @@
// limitations under the License.
//! object-store metrics
//! Defines cache hit/miss counters, read-error counters, and cache size gauges.
use lazy_static::lazy_static;
use prometheus::*;
lazy_static! {
    /// Cache hit counter, no matter what the cache result is.
    /// The `result` label is `success` or `not_found` (negative cache hit).
    pub static ref OBJECT_STORE_LRU_CACHE_HIT: IntCounterVec = register_int_counter_vec!(
        "greptime_object_store_lru_cache_hit",
        "object store lru cache hit",
        &["result"]
    )
    .unwrap();
    /// Cache miss counter
    pub static ref OBJECT_STORE_LRU_CACHE_MISS: IntCounter =
        register_int_counter!("greptime_object_store_lru_cache_miss", "object store lru cache miss")
            .unwrap();
    /// Object store read error counter
    /// The `kind` label carries the opendal `ErrorKind` of the failed read.
    pub static ref OBJECT_STORE_READ_ERROR: IntCounterVec = register_int_counter_vec!(
        "greptime_object_store_read_errors",
        "object store read errors",
        &["kind"]
    )
    .unwrap();
    /// Cache entry number
    pub static ref OBJECT_STORE_LRU_CACHE_ENTRIES: IntGauge =
        register_int_gauge!("greptime_object_store_lru_cache_entries", "object store lru cache entries")
            .unwrap();
    /// Cache size in bytes
    pub static ref OBJECT_STORE_LRU_CACHE_BYTES: IntGauge =
        register_int_gauge!("greptime_object_store_lru_cache_bytes", "object store lru cache bytes")
            .unwrap();
}

View File

@@ -13,15 +13,22 @@
// limitations under the License.
use std::env;
use std::sync::Arc;
use anyhow::Result;
use common_telemetry::info;
use common_test_util::temp_dir::create_temp_dir;
use object_store::ObjectStore;
use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use opendal::EntryMode;
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::oio::{List, Read};
use opendal::raw::{Access, OpList, OpRead};
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, OperatorBuilder};
/// Duplicate of the constant in `src/layers/lru_cache/read_cache.rs`
const READ_CACHE_DIR: &str = "cache/object/read";
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
@@ -224,3 +231,249 @@ async fn test_gcs_backend() -> Result<()> {
}
Ok(())
}
// Exercises CRUD and list operations through a filesystem store wrapped with
// an LRU cache layer, then checks the cache ends up empty.
#[tokio::test]
async fn test_file_backend_with_lru_cache() -> Result<()> {
    common_telemetry::init_default_ut_logging();
    let data_dir = create_temp_dir("test_file_backend_with_lru_cache");
    let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache");
    let builder = Fs::default()
        .root(&data_dir.path().to_string_lossy())
        .atomic_write_dir(&tmp_dir.path().to_string_lossy());
    let store = builder.build().unwrap();
    // Build a filesystem-backed cache layer (32-byte capacity) and run
    // recovery synchronously before attaching it.
    let cache_dir = create_temp_dir("test_file_backend_with_lru_cache");
    let cache_layer = {
        let builder = Fs::default()
            .root(&cache_dir.path().to_string_lossy())
            .atomic_write_dir(&cache_dir.path().to_string_lossy());
        let file_cache = Arc::new(builder.build().unwrap());
        let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
        cache_layer.recover_cache(true).await;
        cache_layer
    };
    let store = OperatorBuilder::new(store)
        .layer(cache_layer.clone())
        .finish();
    test_object_crud(&store).await?;
    test_object_list(&store).await?;
    // Presumably the CRUD/list helpers delete every object they create, so the
    // cache must be empty afterwards — confirm against the helpers if changed.
    assert_eq!(cache_layer.read_cache_stat().await, (0, 0));
    Ok(())
}
/// Asserts that every name in `file_names` is tracked by `cache_layer`,
/// keyed under the `READ_CACHE_DIR` prefix.
async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
    for file_name in file_names {
        let file_path = format!("{READ_CACHE_DIR}/{file_name}");
        assert!(cache_layer.contains_file(&file_path).await, "{file_path:?}");
    }
}
/// Lists the files under `store` and asserts that each listed file appears in
/// `file_names` with the matching content from `file_contents`
/// (order-insensitive; directories are ignored).
async fn assert_cache_files<C: Access>(
    store: &C,
    file_names: &[&str],
    file_contents: &[&str],
) -> Result<()> {
    let (_, mut lister) = store.list("/", OpList::default()).await?;
    let mut objects = vec![];
    while let Some(e) = lister.next().await? {
        if e.mode() == EntryMode::FILE {
            objects.push(e);
        }
    }
    // compare the cache file with the expected cache file; ignore orders
    for o in objects {
        let position = file_names.iter().position(|&x| x == o.path());
        assert!(position.is_some(), "file not found: {}", o.path());
        let position = position.unwrap();
        let (_, mut r) = store.read(o.path(), OpRead::default()).await.unwrap();
        let bs = r.read_all().await.unwrap();
        assert_eq!(
            file_contents[position],
            String::from_utf8(bs.to_vec())?,
            "file content not match: {}",
            o.path()
        );
    }
    Ok(())
}
// End-to-end check of the LRU cache policy: fill, delete-invalidation,
// negative (`NotFound`) caching, eviction on capacity pressure, metrics, and
// recovery from disk after the layer is dropped.
#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
    common_telemetry::init_default_ut_logging();
    // create file storage
    let root_dir = create_temp_dir("test_object_store_cache_policy");
    let store = OperatorBuilder::new(
        Fs::default()
            .root(&root_dir.path().to_string_lossy())
            .atomic_write_dir(&root_dir.path().to_string_lossy())
            .build()
            .unwrap(),
    )
    .finish();
    // create file cache layer
    let cache_dir = create_temp_dir("test_object_store_cache_policy_cache");
    let atomic_temp_dir = create_temp_dir("test_object_store_cache_policy_cache_tmp");
    let builder = Fs::default()
        .root(&cache_dir.path().to_string_lossy())
        .atomic_write_dir(&atomic_temp_dir.path().to_string_lossy());
    let file_cache = Arc::new(builder.build().unwrap());
    let cache_store = file_cache.clone();
    // create operator for cache dir to verify cache file
    // Capacity is 38 bytes — exactly the size of the three entries cached
    // below, so adding more data later forces an LRU eviction.
    let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
    cache_layer.recover_cache(true).await;
    let store = store.layer(cache_layer.clone());
    // create several object handler.
    // write data into object;
    let p1 = "test_file1";
    let p2 = "test_file2";
    store.write(p1, "Hello, object1!").await.unwrap();
    store.write(p2, "Hello, object2!").await.unwrap();
    // Try to read p1 and p2
    let _ = store.read_with(p1).range(0..).await?;
    let _ = store.read(p1).await?;
    let _ = store.read_with(p2).range(0..).await?;
    let _ = store.read_with(p2).range(7..).await?;
    let _ = store.read(p2).await?;
    // Five reads produce only three distinct cache keys (listed below):
    // 15 + 15 + 8 = 38 bytes.
    assert_eq!(cache_layer.read_cache_stat().await, (3, 38));
    assert_cache_files(
        &cache_store,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
        ],
        &["Hello, object1!", "object2!", "Hello, object2!"],
    )
    .await?;
    assert_lru_cache(
        &cache_layer,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
        ],
    )
    .await;
    // Delete p2 file
    store.delete(p2).await.unwrap();
    // Both p2 entries were invalidated by the delete; only p1's remains.
    assert_eq!(cache_layer.read_cache_stat().await, (1, 15));
    assert_cache_files(
        &cache_store,
        &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
        &["Hello, object1!"],
    )
    .await?;
    assert_lru_cache(
        &cache_layer,
        &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
    )
    .await;
    // Read the deleted file without a deterministic range size requires an extra `stat.`
    // Therefore, it won't go into the cache.
    assert!(store.read(p2).await.is_err());
    let p3 = "test_file3";
    store.write(p3, "Hello, object3!").await.unwrap();
    // Try to read p3
    let _ = store.read(p3).await.unwrap();
    let _ = store.read_with(p3).range(0..5).await.unwrap();
    assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
    // However, The real open file happens after the reader is created.
    // The reader will throw an error during the reading
    // instead of returning `NotFound` during the reader creation.
    // NOTE(review): this comment originally claimed "the entry count is 4"
    // because of a p2 `NotFound` cache entry, but both surrounding asserts
    // observe 3 entries — confirm whether the negative entry is created here.
    assert!(store.read_with(p2).range(0..4).await.is_err());
    assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
    assert_cache_files(
        &cache_store,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
        ],
        &["Hello, object1!", "Hello, object3!", "Hello"],
    )
    .await?;
    assert_lru_cache(
        &cache_layer,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
        ],
    )
    .await;
    // try to read p1, p2, p3
    let _ = store.read(p3).await.unwrap();
    let _ = store.read_with(p3).range(0..5).await.unwrap();
    assert!(store.read(p2).await.is_err());
    // Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
    let _ = store.read_with(p1).range(1..15).await.unwrap();
    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
    assert_cache_files(
        &cache_store,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
        ],
        &["ello, object1!", "Hello, object3!", "Hello"],
    )
    .await?;
    assert_lru_cache(
        &cache_layer,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
        ],
    )
    .await;
    // The hit/miss counters must have been exported.
    let metric_text = common_telemetry::dump_metrics().unwrap();
    assert!(metric_text.contains("object_store_lru_cache_hit"));
    assert!(metric_text.contains("object_store_lru_cache_miss"));
    drop(cache_layer);
    // Test recover
    let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
    cache_layer.recover_cache(true).await;
    // The p2 `NotFound` cache will not be recovered
    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
    assert_lru_cache(
        &cache_layer,
        &[
            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
        ],
    )
    .await;
    Ok(())
}

View File

@@ -36,8 +36,8 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::expressions::{CastExpr as PhyCast, Column as PhyColumn};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::{Column, Expr};
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
@@ -180,33 +180,10 @@ impl HistogramFold {
.index_of_column_by_name(None, &self.ts_column)
.unwrap();
let tag_columns = exec_input
.schema()
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if idx == le_column_index || idx == field_column_index || idx == ts_column_index {
None
} else {
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
}
})
.collect::<Vec<_>>();
let mut partition_exprs = tag_columns.clone();
partition_exprs.push(Arc::new(PhyColumn::new(
self.input.schema().field(ts_column_index).name(),
ts_column_index,
)) as _);
let output_schema: SchemaRef = self.output_schema.inner().clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::Hash(
partition_exprs.clone(),
exec_input.output_partitioning().partition_count(),
),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
@@ -215,8 +192,6 @@ impl HistogramFold {
field_column_index,
ts_column_index,
input: exec_input,
tag_columns,
partition_exprs,
quantile: self.quantile.into(),
output_schema,
metric: ExecutionPlanMetricsSet::new(),
@@ -278,9 +253,6 @@ pub struct HistogramFoldExec {
/// Index for field column in the schema of input.
field_column_index: usize,
ts_column_index: usize,
/// Tag columns are all columns except `le`, `field` and `ts` columns.
tag_columns: Vec<Arc<dyn PhysicalExpr>>,
partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
quantile: f64,
metric: ExecutionPlanMetricsSet,
properties: PlanProperties,
@@ -297,10 +269,10 @@ impl ExecutionPlan for HistogramFoldExec {
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
let mut cols = self
.tag_columns
.iter()
.tag_col_exprs()
.into_iter()
.map(|expr| PhysicalSortRequirement {
expr: expr.clone(),
expr,
options: None,
})
.collect::<Vec<PhysicalSortRequirement>>();
@@ -335,7 +307,7 @@ impl ExecutionPlan for HistogramFoldExec {
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::HashPartitioned(self.partition_exprs.clone())]
self.input.required_input_distribution()
}
fn maintains_input_order(&self) -> Vec<bool> {
@@ -352,27 +324,15 @@ impl ExecutionPlan for HistogramFoldExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
assert!(!children.is_empty());
let new_input = children[0].clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(self.output_schema.clone()),
Partitioning::Hash(
self.partition_exprs.clone(),
new_input.output_partitioning().partition_count(),
),
EmissionType::Incremental,
Boundedness::Bounded,
);
Ok(Arc::new(Self {
input: new_input,
input: children[0].clone(),
metric: self.metric.clone(),
le_column_index: self.le_column_index,
ts_column_index: self.ts_column_index,
tag_columns: self.tag_columns.clone(),
partition_exprs: self.partition_exprs.clone(),
quantile: self.quantile,
output_schema: self.output_schema.clone(),
field_column_index: self.field_column_index,
properties,
properties: self.properties.clone(),
}))
}
@@ -434,6 +394,30 @@ impl ExecutionPlan for HistogramFoldExec {
}
}
impl HistogramFoldExec {
/// Return all the [PhysicalExpr] of tag columns in order.
///
/// Tag columns are all columns except `le`, `field` and `ts` columns.
pub fn tag_col_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
self.input
.schema()
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if idx == self.le_column_index
|| idx == self.field_column_index
|| idx == self.ts_column_index
{
None
} else {
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
}
})
.collect()
}
}
impl DisplayAs for HistogramFoldExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
@@ -1067,83 +1051,9 @@ mod test {
quantile: f64,
ts_column_index: usize,
) -> Arc<HistogramFoldExec> {
let input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
)));
let output_schema: SchemaRef = Arc::new(
HistogramFold::convert_schema(&Arc::new(input.schema().to_dfschema().unwrap()), "le")
.unwrap()
.as_arrow()
.clone(),
);
let (tag_columns, partition_exprs, properties) =
build_test_plan_properties(&input, output_schema.clone(), ts_column_index);
Arc::new(HistogramFoldExec {
le_column_index: 1,
field_column_index: 2,
quantile,
ts_column_index,
input,
output_schema,
tag_columns,
partition_exprs,
metric: ExecutionPlanMetricsSet::new(),
properties,
})
}
type PlanPropsResult = (
Vec<Arc<dyn PhysicalExpr>>,
Vec<Arc<dyn PhysicalExpr>>,
PlanProperties,
);
fn build_test_plan_properties(
input: &Arc<dyn ExecutionPlan>,
output_schema: SchemaRef,
ts_column_index: usize,
) -> PlanPropsResult {
let tag_columns = input
.schema()
.fields()
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if idx == 1 || idx == 2 || idx == ts_column_index {
None
} else {
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
}
})
.collect::<Vec<_>>();
let partition_exprs = if tag_columns.is_empty() {
vec![Arc::new(PhyColumn::new(
input.schema().field(ts_column_index).name(),
ts_column_index,
)) as _]
} else {
tag_columns.clone()
};
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::Hash(
partition_exprs.clone(),
input.output_partitioning().partition_count(),
),
EmissionType::Incremental,
Boundedness::Bounded,
);
(tag_columns, partition_exprs, properties)
}
#[tokio::test]
async fn fold_overall() {
let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(prepare_test_data());
let output_schema: SchemaRef = Arc::new(
HistogramFold::convert_schema(
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
@@ -1153,17 +1063,50 @@ mod test {
.as_arrow()
.clone(),
);
let (tag_columns, partition_exprs, properties) =
build_test_plan_properties(&memory_exec, output_schema.clone(), 0);
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Arc::new(HistogramFoldExec {
le_column_index: 1,
field_column_index: 2,
quantile,
ts_column_index,
input: memory_exec,
output_schema,
metric: ExecutionPlanMetricsSet::new(),
properties,
})
}
#[tokio::test]
async fn fold_overall() {
let memory_exec = Arc::new(prepare_test_data());
let output_schema: SchemaRef = Arc::new(
HistogramFold::convert_schema(
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
"le",
)
.unwrap()
.as_arrow()
.clone(),
);
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
let fold_exec = Arc::new(HistogramFoldExec {
le_column_index: 1,
field_column_index: 2,
quantile: 0.4,
ts_column_index: 0,
ts_column_index: 9999, // not exist but doesn't matter
input: memory_exec,
output_schema,
tag_columns,
partition_exprs,
metric: ExecutionPlanMetricsSet::new(),
properties,
});

View File

@@ -365,7 +365,7 @@ impl SeriesNormalizeStream {
Arc::new(ts_column.clone()) as _
} else {
Arc::new(TimestampMillisecondArray::from_iter(
ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
))
};
let mut columns = input.columns().to_vec();
@@ -518,11 +518,11 @@ mod test {
"+---------------------+--------+------+\
\n| timestamp | value | path |\
\n+---------------------+--------+------+\
\n| 1970-01-01T00:01:01 | 0.0 | foo |\
\n| 1970-01-01T00:02:01 | 1.0 | foo |\
\n| 1970-01-01T00:00:01 | 10.0 | foo |\
\n| 1970-01-01T00:00:31 | 100.0 | foo |\
\n| 1970-01-01T00:01:31 | 1000.0 | foo |\
\n| 1970-01-01T00:00:59 | 0.0 | foo |\
\n| 1970-01-01T00:01:59 | 1.0 | foo |\
\n| 1969-12-31T23:59:59 | 10.0 | foo |\
\n| 1970-01-01T00:00:29 | 100.0 | foo |\
\n| 1970-01-01T00:01:29 | 1000.0 | foo |\
\n+---------------------+--------+------+",
);

View File

@@ -41,8 +41,6 @@ use snafu::{Location, ResultExt};
use crate::error::{CatalogSnafu, Result};
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
mod function_alias;
pub struct DfContextProviderAdapter {
engine_state: Arc<QueryEngineState>,
session_state: SessionState,
@@ -149,17 +147,7 @@ impl ContextProvider for DfContextProviderAdapter {
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.engine_state.scalar_function(name).map_or_else(
|| {
self.session_state
.scalar_functions()
.get(name)
.cloned()
.or_else(|| {
function_alias::resolve_scalar(name).and_then(|name| {
self.session_state.scalar_functions().get(name).cloned()
})
})
},
|| self.session_state.scalar_functions().get(name).cloned(),
|func| {
Some(Arc::new(func.provide(FunctionContext {
query_ctx: self.query_ctx.clone(),
@@ -171,17 +159,7 @@ impl ContextProvider for DfContextProviderAdapter {
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.engine_state.aggr_function(name).map_or_else(
|| {
self.session_state
.aggregate_functions()
.get(name)
.cloned()
.or_else(|| {
function_alias::resolve_aggregate(name).and_then(|name| {
self.session_state.aggregate_functions().get(name).cloned()
})
})
},
|| self.session_state.aggregate_functions().get(name).cloned(),
|func| Some(Arc::new(func)),
)
}
@@ -215,14 +193,12 @@ impl ContextProvider for DfContextProviderAdapter {
fn udf_names(&self) -> Vec<String> {
let mut names = self.engine_state.scalar_names();
names.extend(self.session_state.scalar_functions().keys().cloned());
names.extend(function_alias::scalar_alias_names().map(|name| name.to_string()));
names
}
fn udaf_names(&self) -> Vec<String> {
let mut names = self.engine_state.aggr_names();
names.extend(self.session_state.aggregate_functions().keys().cloned());
names.extend(function_alias::aggregate_alias_names().map(|name| name.to_string()));
names
}
@@ -257,14 +233,9 @@ impl ContextProvider for DfContextProviderAdapter {
.table_functions()
.get(name)
.cloned()
.or_else(|| {
function_alias::resolve_scalar(name)
.and_then(|alias| self.session_state.table_functions().get(alias).cloned())
});
let tbl_func = tbl_func.ok_or_else(|| {
DataFusionError::Plan(format!("table function '{name}' not found"))
})?;
.ok_or_else(|| {
DataFusionError::Plan(format!("table function '{name}' not found"))
})?;
let provider = tbl_func.create_table_provider(&args)?;
Ok(provider_as_source(provider))

View File

@@ -1,86 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use once_cell::sync::Lazy;
const SCALAR_ALIASES: &[(&str, &str)] = &[
// SQL compat aliases.
("ucase", "upper"),
("lcase", "lower"),
("ceiling", "ceil"),
("mid", "substr"),
// MySQL's RAND([seed]) accepts an optional seed argument, while DataFusion's `random()`
// does not. We alias the name for `rand()` compatibility, and `rand(seed)` will error
// due to mismatched arity.
("rand", "random"),
];
const AGGREGATE_ALIASES: &[(&str, &str)] = &[
// MySQL compat aliases that don't override existing DataFusion aggregate names.
//
// NOTE: We intentionally do NOT alias `stddev` here, because DataFusion defines `stddev`
// as sample standard deviation while MySQL's `STDDEV` is population standard deviation.
("std", "stddev_pop"),
("variance", "var_pop"),
];
static SCALAR_FUNCTION_ALIAS: Lazy<HashMap<&'static str, &'static str>> =
Lazy::new(|| SCALAR_ALIASES.iter().copied().collect());
static AGGREGATE_FUNCTION_ALIAS: Lazy<HashMap<&'static str, &'static str>> =
Lazy::new(|| AGGREGATE_ALIASES.iter().copied().collect());
pub fn resolve_scalar(name: &str) -> Option<&'static str> {
let name = name.to_ascii_lowercase();
SCALAR_FUNCTION_ALIAS.get(name.as_str()).copied()
}
pub fn resolve_aggregate(name: &str) -> Option<&'static str> {
let name = name.to_ascii_lowercase();
AGGREGATE_FUNCTION_ALIAS.get(name.as_str()).copied()
}
pub fn scalar_alias_names() -> impl Iterator<Item = &'static str> {
SCALAR_ALIASES.iter().map(|(name, _)| *name)
}
pub fn aggregate_alias_names() -> impl Iterator<Item = &'static str> {
AGGREGATE_ALIASES.iter().map(|(name, _)| *name)
}
#[cfg(test)]
mod tests {
use super::{resolve_aggregate, resolve_scalar};
#[test]
fn resolves_scalar_aliases_case_insensitive() {
assert_eq!(resolve_scalar("ucase"), Some("upper"));
assert_eq!(resolve_scalar("UCASE"), Some("upper"));
assert_eq!(resolve_scalar("lcase"), Some("lower"));
assert_eq!(resolve_scalar("ceiling"), Some("ceil"));
assert_eq!(resolve_scalar("MID"), Some("substr"));
assert_eq!(resolve_scalar("RAND"), Some("random"));
assert_eq!(resolve_scalar("not_a_real_alias"), None);
}
#[test]
fn resolves_aggregate_aliases_case_insensitive() {
assert_eq!(resolve_aggregate("std"), Some("stddev_pop"));
assert_eq!(resolve_aggregate("variance"), Some("var_pop"));
assert_eq!(resolve_aggregate("STDDEV"), None);
assert_eq!(resolve_aggregate("not_a_real_alias"), None);
}
}

View File

@@ -87,19 +87,11 @@ impl ParallelizeScan {
&& order_expr.options.descending
{
for ranges in partition_ranges.iter_mut() {
// Primary: end descending (larger end first)
// Secondary: start descending (shorter range first when ends are equal)
ranges.sort_by(|a, b| {
b.end.cmp(&a.end).then_with(|| b.start.cmp(&a.start))
});
ranges.sort_by(|a, b| b.end.cmp(&a.end));
}
} else {
for ranges in partition_ranges.iter_mut() {
// Primary: start ascending (smaller start first)
// Secondary: end ascending (shorter range first when starts are equal)
ranges.sort_by(|a, b| {
a.start.cmp(&b.start).then_with(|| a.end.cmp(&b.end))
});
ranges.sort_by(|a, b| a.start.cmp(&b.start));
}
}

View File

@@ -28,9 +28,9 @@ use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::{
CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};
@@ -125,14 +125,6 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!()
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
unimplemented!()
}
fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
None
}

View File

@@ -110,12 +110,12 @@ impl WindowedSortPhysicalRule {
{
sort_input
} else {
Arc::new(PartSortExec::try_new(
Arc::new(PartSortExec::new(
first_sort_expr.clone(),
sort_exec.fetch(),
scanner_info.partition_ranges.clone(),
sort_input,
)?)
))
};
let windowed_sort_exec = WindowedSortExec::try_new(

File diff suppressed because it is too large Load Diff

View File

@@ -1414,14 +1414,14 @@ impl PromPlanner {
.clone()
.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
None,
),
None,
))
.and(time_index_expr.lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
None,
),
None,
@@ -1437,14 +1437,14 @@ impl PromPlanner {
.clone()
.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(timestamp - offset_duration - lookback_delta - range),
Some(timestamp + offset_duration - lookback_delta - range),
None,
),
None,
))
.and(time_index_expr.clone().lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(timestamp - offset_duration + lookback_delta),
Some(timestamp + offset_duration + lookback_delta),
None,
),
None,

View File

@@ -84,31 +84,23 @@ pub struct WindowedSortExec {
properties: PlanProperties,
}
/// Checks that partition ranges are sorted correctly for the given sort direction.
/// - Descending: sorted by (end DESC, start DESC) - shorter ranges first when ends are equal
/// - Ascending: sorted by (start ASC, end ASC) - shorter ranges first when starts are equal
pub fn check_partition_range_monotonicity(
fn check_partition_range_monotonicity(
ranges: &[Vec<PartitionRange>],
descending: bool,
) -> Result<()> {
let is_valid = ranges.iter().all(|r| {
if descending {
// Primary: end descending, Secondary: start descending (shorter range first)
r.windows(2)
.all(|w| w[0].end > w[1].end || (w[0].end == w[1].end && w[0].start >= w[1].start))
r.windows(2).all(|w| w[0].end >= w[1].end)
} else {
// Primary: start ascending, Secondary: end ascending (shorter range first)
r.windows(2).all(|w| {
w[0].start < w[1].start || (w[0].start == w[1].start && w[0].end <= w[1].end)
})
r.windows(2).all(|w| w[0].start <= w[1].start)
}
});
if !is_valid {
let msg = if descending {
"Input `PartitionRange`s are not sorted by (end DESC, start DESC)"
"Input `PartitionRange`s's upper bound is not monotonic non-increase"
} else {
"Input `PartitionRange`s are not sorted by (start ASC, end ASC)"
"Input `PartitionRange`s's lower bound is not monotonic non-decrease"
};
let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
@@ -2837,9 +2829,8 @@ mod test {
// generate input data
for part_id in 0..rng.usize(0..part_cnt_bound) {
let (start, end) = if descending {
// Use 1..=range_offset_bound to ensure strictly decreasing end values
let end = bound_val
.map(|i| i - rng.i64(1..=range_offset_bound))
.map(|i| i - rng.i64(0..range_offset_bound))
.unwrap_or_else(|| rng.i64(..));
bound_val = Some(end);
let start = end - rng.i64(1..range_size_bound);
@@ -2847,9 +2838,8 @@ mod test {
let end = Timestamp::new(end, unit.into());
(start, end)
} else {
// Use 1..=range_offset_bound to ensure strictly increasing start values
let start = bound_val
.map(|i| i + rng.i64(1..=range_offset_bound))
.map(|i| i + rng.i64(0..range_offset_bound))
.unwrap_or_else(|| rng.i64(..));
bound_val = Some(start);
let end = start + rng.i64(1..range_size_bound);

View File

@@ -23,9 +23,8 @@ use auth::UserProviderRef;
use axum::extract::Request;
use axum::response::IntoResponse;
use axum::routing::Route;
use common_grpc::error::{InvalidConfigFilePathSnafu, Result};
use common_grpc::error::{Error, InvalidConfigFilePathSnafu, Result};
use common_runtime::Runtime;
use common_telemetry::warn;
use otel_arrow_rust::proto::opentelemetry::arrow::v1::arrow_metrics_service_server::ArrowMetricsServiceServer;
use snafu::ResultExt;
use tokio::sync::Mutex;
@@ -196,7 +195,10 @@ impl GrpcServerBuilder {
// tonic does not support watching for tls config changes
// so we don't support it either for now
if tls_option.watch {
warn!("Certificates watch and reloading for gRPC is NOT supported at the moment");
return Err(Error::NotSupported {
feat: "Certificates watch and reloading for gRPC is not supported at the moment"
.to_string(),
});
}
self.tls_config = if tls_option.should_force_tls() {
let cert = std::fs::read_to_string(tls_option.cert_path)

View File

@@ -31,7 +31,6 @@ use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_query::{Output, OutputData};
use common_recordbatch::DfRecordBatch;
use common_telemetry::debug;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use datatypes::arrow::datatypes::SchemaRef;
@@ -234,7 +233,7 @@ impl FlightCraft for GreptimeRequestHandler {
limiter,
)
.await?;
// Ack immediately when stream is created successfully (in Init state)
// Ack to the first schema message when we successfully built the stream.
let _ = tx.send(Ok(DoPutResponse::new(0, 0, 0.0))).await;
self.put_record_batches(stream, tx, query_ctx).await;
@@ -306,83 +305,98 @@ impl PutRecordBatchRequest {
pub struct PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>,
catalog: String,
schema_name: String,
table_name: TableName,
schema: SchemaRef,
schema_bytes: Bytes,
decoder: FlightDecoder,
limiter: Option<RequestMemoryLimiter>,
// Client now lazily sends schema data so we cannot eagerly wait for it.
// Instead, we need to decode while receiving record batches.
state: StreamState,
}
enum StreamState {
Init,
Ready {
table_name: TableName,
schema: SchemaRef,
schema_bytes: Bytes,
decoder: FlightDecoder,
},
}
impl PutRecordBatchRequestStream {
/// Creates a new `PutRecordBatchRequestStream` in Init state.
/// The stream will transition to Ready state when it receives the schema message.
/// Creates a new `PutRecordBatchRequestStream` by waiting for the first message,
/// extracting the table name from the flight descriptor, and decoding the schema.
pub async fn new(
flight_data_stream: Streaming<FlightData>,
mut flight_data_stream: Streaming<FlightData>,
catalog: String,
schema: String,
limiter: Option<RequestMemoryLimiter>,
) -> TonicResult<Self> {
fn extract_table_name(mut descriptor: FlightDescriptor) -> Result<String> {
ensure!(
descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::type == 'Path' only",
}
);
ensure!(
descriptor.path.len() == 1,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::path has only one table name",
}
);
Ok(descriptor.path.remove(0))
}
// Wait for the first message which must be a Schema message
let first_message = flight_data_stream.next().await.ok_or_else(|| {
Status::failed_precondition("flight data stream ended unexpectedly")
})??;
let flight_descriptor = first_message
.flight_descriptor
.as_ref()
.ok_or_else(|| {
Status::failed_precondition("table to put is not found in flight descriptor")
})?
.clone();
let table_name_str = extract_table_name(flight_descriptor)
.map_err(|e| Status::invalid_argument(e.to_string()))?;
let table_name = TableName::new(catalog, schema, table_name_str);
// Decode the first message as schema
let mut decoder = FlightDecoder::default();
let schema_message = decoder
.try_decode(&first_message)
.map_err(|e| Status::invalid_argument(format!("Failed to decode schema: {}", e)))?;
let (schema, schema_bytes) = match schema_message {
Some(FlightMessage::Schema(schema)) => {
let schema_bytes = decoder.schema_bytes().ok_or_else(|| {
Status::internal("decoder should have schema bytes after decoding schema")
})?;
(schema, schema_bytes)
}
_ => {
return Err(Status::failed_precondition(
"first message must be a Schema message",
));
}
};
Ok(Self {
flight_data_stream,
catalog,
schema_name: schema,
table_name,
schema,
schema_bytes,
decoder,
limiter,
state: StreamState::Init,
})
}
/// Returns the table name extracted from the flight descriptor.
/// Returns None if the stream is still in Init state.
pub fn table_name(&self) -> Option<&TableName> {
match &self.state {
StreamState::Init => None,
StreamState::Ready { table_name, .. } => Some(table_name),
}
pub fn table_name(&self) -> &TableName {
&self.table_name
}
/// Returns the Arrow schema decoded from the first flight message.
/// Returns None if the stream is still in Init state.
pub fn schema(&self) -> Option<&SchemaRef> {
match &self.state {
StreamState::Init => None,
StreamState::Ready { schema, .. } => Some(schema),
}
pub fn schema(&self) -> &SchemaRef {
&self.schema
}
/// Returns the raw schema bytes in IPC format.
/// Returns None if the stream is still in Init state.
pub fn schema_bytes(&self) -> Option<&Bytes> {
match &self.state {
StreamState::Init => None,
StreamState::Ready { schema_bytes, .. } => Some(schema_bytes),
}
}
fn extract_table_name(mut descriptor: FlightDescriptor) -> Result<String> {
ensure!(
descriptor.r#type == arrow_flight::flight_descriptor::DescriptorType::Path as i32,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::type == 'Path' only",
}
);
ensure!(
descriptor.path.len() == 1,
InvalidParameterSnafu {
reason: "expect FlightDescriptor::path has only one table name",
}
);
Ok(descriptor.path.remove(0))
pub fn schema_bytes(&self) -> &Bytes {
&self.schema_bytes
}
}
@@ -395,114 +409,48 @@ impl Stream for PutRecordBatchRequestStream {
match poll {
Some(Ok(flight_data)) => {
// Clone limiter once to avoid borrowing issues
let limiter = self.limiter.clone();
match &mut self.state {
StreamState::Init => {
// First message - expecting schema
let flight_descriptor = match flight_data.flight_descriptor.as_ref() {
Some(descriptor) => descriptor.clone(),
None => {
return Poll::Ready(Some(Err(Status::failed_precondition(
"table to put is not found in flight descriptor",
))));
}
};
let table_name_str = match Self::extract_table_name(flight_descriptor) {
Ok(name) => name,
Err(e) => {
return Poll::Ready(Some(Err(Status::invalid_argument(
e.to_string(),
))));
}
};
let table_name = TableName::new(
self.catalog.clone(),
self.schema_name.clone(),
table_name_str,
);
// Decode the schema
let mut decoder = FlightDecoder::default();
let schema_message = decoder.try_decode(&flight_data).map_err(|e| {
Status::invalid_argument(format!("Failed to decode schema: {}", e))
})?;
match schema_message {
Some(FlightMessage::Schema(schema)) => {
let schema_bytes = decoder.schema_bytes().ok_or_else(|| {
Status::internal(
"decoder should have schema bytes after decoding schema",
)
})?;
// Transition to Ready state with all necessary data
self.state = StreamState::Ready {
table_name,
schema,
schema_bytes,
decoder,
};
// Continue to next iteration to process RecordBatch messages
continue;
}
_ => {
return Poll::Ready(Some(Err(Status::failed_precondition(
"first message must be a Schema message",
))));
}
}
// Extract request_id and body_size from FlightData before decoding
let request_id = if !flight_data.app_metadata.is_empty() {
match serde_json::from_slice::<DoPutMetadata>(&flight_data.app_metadata) {
Ok(metadata) => metadata.request_id(),
Err(_) => 0,
}
StreamState::Ready {
table_name,
schema: _,
schema_bytes,
decoder,
} => {
// Extract request_id and body_size from FlightData before decoding
let request_id = if !flight_data.app_metadata.is_empty() {
serde_json::from_slice::<DoPutMetadata>(&flight_data.app_metadata)
.map(|meta| meta.request_id())
.unwrap_or_default()
} else {
0
};
} else {
0
};
// Decode FlightData to RecordBatch
match decoder.try_decode(&flight_data) {
Ok(Some(FlightMessage::RecordBatch(record_batch))) => {
let table_name = table_name.clone();
let schema_bytes = schema_bytes.clone();
return Poll::Ready(Some(
PutRecordBatchRequest::try_new(
table_name,
record_batch,
request_id,
schema_bytes,
flight_data,
limiter.as_ref(),
)
.map_err(|e| Status::invalid_argument(e.to_string())),
));
}
Ok(Some(other)) => {
debug!("Unexpected flight message: {:?}", other);
return Poll::Ready(Some(Err(Status::invalid_argument(
"Expected RecordBatch message, got other message type",
))));
}
Ok(None) => {
// Dictionary batch - processed internally by decoder, continue polling
continue;
}
Err(e) => {
return Poll::Ready(Some(Err(Status::invalid_argument(
format!("Failed to decode RecordBatch: {}", e),
))));
}
}
// Decode FlightData to RecordBatch
match self.decoder.try_decode(&flight_data) {
Ok(Some(FlightMessage::RecordBatch(record_batch))) => {
let limiter = self.limiter.clone();
let table_name = self.table_name.clone();
let schema_bytes = self.schema_bytes.clone();
return Poll::Ready(Some(
PutRecordBatchRequest::try_new(
table_name,
record_batch,
request_id,
schema_bytes,
flight_data,
limiter.as_ref(),
)
.map_err(|e| Status::invalid_argument(e.to_string())),
));
}
Ok(Some(_)) => {
return Poll::Ready(Some(Err(Status::invalid_argument(
"Expected RecordBatch message, got other message type",
))));
}
Ok(None) => {
// Dictionary batch - processed internally by decoder, continue polling
continue;
}
Err(e) => {
return Poll::Ready(Some(Err(Status::invalid_argument(format!(
"Failed to decode RecordBatch: {}",
e
)))));
}
}
}

View File

@@ -91,47 +91,6 @@ impl TlsOption {
tls_option
}
/// Validates the TLS configuration.
///
/// Returns an error if:
/// - TLS mode is enabled (not `Disable`) but `cert_path` or `key_path` is empty
/// - TLS mode is `VerifyCa` or `VerifyFull` but `ca_cert_path` is empty
pub fn validate(&self) -> Result<()> {
if self.mode == TlsMode::Disable {
return Ok(());
}
// When TLS is enabled, cert_path and key_path are required
if self.cert_path.is_empty() {
return Err(crate::error::Error::Internal {
err_msg: format!(
"TLS mode is {:?} but cert_path is not configured",
self.mode
),
});
}
if self.key_path.is_empty() {
return Err(crate::error::Error::Internal {
err_msg: format!("TLS mode is {:?} but key_path is not configured", self.mode),
});
}
// For VerifyCa and VerifyFull modes, ca_cert_path is required for client verification
if matches!(self.mode, TlsMode::VerifyCa | TlsMode::VerifyFull)
&& self.ca_cert_path.is_empty()
{
return Err(crate::error::Error::Internal {
err_msg: format!(
"TLS mode is {:?} but ca_cert_path is not configured",
self.mode
),
});
}
Ok(())
}
pub fn setup(&self) -> Result<Option<ServerConfig>> {
if let TlsMode::Disable = self.mode {
return Ok(None);
@@ -188,13 +147,6 @@ impl TlsOption {
}
}
pub fn merge_tls_option(main: &TlsOption, other: TlsOption) -> TlsOption {
if other.mode != TlsMode::Disable && other.validate().is_ok() {
return other;
}
main.clone()
}
impl TlsConfigLoader<Arc<ServerConfig>> for TlsOption {
type Error = crate::error::Error;
@@ -231,118 +183,6 @@ mod tests {
use crate::install_ring_crypto_provider;
use crate::tls::TlsMode::Disable;
#[test]
fn test_validate_disable_mode() {
    // A disabled TLS config needs no certificate material at all.
    let opt = TlsOption {
        mode: TlsMode::Disable,
        ca_cert_path: String::default(),
        cert_path: String::default(),
        key_path: String::default(),
        watch: false,
    };
    assert!(opt.validate().is_ok());
}
#[test]
fn test_validate_missing_cert_path() {
    // Enabled mode with an empty cert_path must be rejected.
    let opt = TlsOption {
        mode: TlsMode::Require,
        ca_cert_path: String::default(),
        cert_path: String::default(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    let message = opt.validate().unwrap_err().to_string();
    assert!(message.contains("cert_path"));
}
#[test]
fn test_validate_missing_key_path() {
    // Enabled mode with an empty key_path must be rejected.
    let opt = TlsOption {
        mode: TlsMode::Require,
        ca_cert_path: String::default(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: String::default(),
        watch: false,
    };
    let message = opt.validate().unwrap_err().to_string();
    assert!(message.contains("key_path"));
}
#[test]
fn test_validate_require_mode_success() {
    // Require mode only needs the server cert and key; no CA cert required.
    let opt = TlsOption {
        mode: TlsMode::Require,
        ca_cert_path: String::default(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    assert!(opt.validate().is_ok());
}
#[test]
fn test_validate_verify_ca_missing_ca_cert() {
    // VerifyCa additionally requires a CA certificate for client verification.
    let opt = TlsOption {
        mode: TlsMode::VerifyCa,
        ca_cert_path: String::default(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    let message = opt.validate().unwrap_err().to_string();
    assert!(message.contains("ca_cert_path"));
}
#[test]
fn test_validate_verify_full_missing_ca_cert() {
    // VerifyFull additionally requires a CA certificate for client verification.
    let opt = TlsOption {
        mode: TlsMode::VerifyFull,
        ca_cert_path: String::default(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    let message = opt.validate().unwrap_err().to_string();
    assert!(message.contains("ca_cert_path"));
}
#[test]
fn test_validate_verify_ca_success() {
    // VerifyCa with cert, key, and CA cert all present is valid.
    let opt = TlsOption {
        mode: TlsMode::VerifyCa,
        ca_cert_path: "/path/to/ca".to_owned(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    assert!(opt.validate().is_ok());
}
#[test]
fn test_validate_verify_full_success() {
    // VerifyFull with cert, key, and CA cert all present is valid.
    let opt = TlsOption {
        mode: TlsMode::VerifyFull,
        ca_cert_path: "/path/to/ca".to_owned(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    assert!(opt.validate().is_ok());
}
#[test]
fn test_validate_prefer_mode() {
    // Prefer is an enabled mode without client verification: cert + key suffice.
    let opt = TlsOption {
        mode: TlsMode::Prefer,
        ca_cert_path: String::default(),
        cert_path: "/path/to/cert".to_owned(),
        key_path: "/path/to/key".to_owned(),
        watch: false,
    };
    assert!(opt.validate().is_ok());
}
#[test]
fn test_new_tls_option() {
assert_eq!(

Some files were not shown because too many files have changed in this diff Show More