Compare commits

...

8 Commits

Author SHA1 Message Date
dennis zhuang
03a28320d6 feat!: enable read cache and write cache when using remote object stores (#5093)
* feat: enable read cache and write cache when using remote object stores

* feat: make read cache be aware of remote store names

* chore: docs

* chore: apply review suggestions

* chore: trim write cache path

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-12-10 04:03:44 +00:00
Lei, HUANG
ce86ba3425 chore: Reduce FETCH_OPTION_TIMEOUT from 10 to 3 seconds in config.rs (#5117)
Reduce FETCH_OPTION_TIMEOUT from 10 to 3 seconds in config.rs
2024-12-09 13:39:18 +00:00
Yingwen
2fcb95f50a fix!: fix regression caused by unbalanced partitions and splitting ranges (#5090)
* feat: assign partition ranges by rows

* feat: balance partition rows

* feat: get upper bound for part nums

* feat: only split in non-compaction seq scan

* fix: parallel scan on multiple sources

* fix: can split check

* feat: scanner prepare by request

* feat: remove scan_parallelism

* docs: update docs

* chore: update comment

* style: fix clippy

* feat: skip merge and dedup if there is only one source

* chore: Revert "feat: skip merge and dedup if there is only one source"

Since memtable won't do dedup jobs

This reverts commit 2fc7a54b11.

* test: avoid compaction in sqlness window sort test

* chore: do not create semaphore if num partitions is enough

* chore: more assertions

* chore: fix typo

* fix: compaction flag not set

* chore: address review comments
2024-12-09 12:50:57 +00:00
ZonaHe
1b642ea6a9 feat: update dashboard to v0.7.1 (#5123)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-09 10:27:35 +00:00
Weny Xu
b35221ccb6 ci: set meta replicas to 1 (#5111) 2024-12-09 07:22:47 +00:00
Zhenchi
bac7e7bac9 refactor: extract implicit conversion helper functions of vector type (#5118)
refactor: extract implicit conversion helper functions of vector

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-09 07:19:00 +00:00
dennis zhuang
903da8f4cb fix: show create table doesn't quote option keys which contains dot (#5108)
* fix: show create table doesn't quote option keys which contains dot

* fix: compile
2024-12-09 03:27:46 +00:00
Ning Sun
c0f498b00c feat: update pgwire to 0.28 (#5113)
* feat: update pgwire to 0.28

* test: update tests
2024-12-09 03:12:11 +00:00
44 changed files with 1143 additions and 711 deletions

View File

@@ -8,7 +8,7 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 3
default: 1
description: "Number of Metasrv replicas"
image-registry:
default: "docker.io"
@@ -58,7 +58,7 @@ runs:
--set image.tag=${{ inputs.image-tag }} \
--set base.podTemplate.main.resources.requests.cpu=50m \
--set base.podTemplate.main.resources.requests.memory=256Mi \
--set base.podTemplate.main.resources.limits.cpu=1000m \
--set base.podTemplate.main.resources.limits.cpu=2000m \
--set base.podTemplate.main.resources.limits.memory=2Gi \
--set frontend.replicas=${{ inputs.frontend-replicas }} \
--set datanode.replicas=${{ inputs.datanode-replicas }} \

Cargo.lock generated
View File

@@ -637,7 +637,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -659,7 +659,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -676,7 +676,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -687,7 +687,7 @@ checksum = "20235b6899dd1cb74a9afac0abf5b4a20c0e500dd6537280f4096e1b9f14da20"
dependencies = [
"async-fs",
"futures-lite",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -774,7 +774,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -873,7 +873,7 @@ dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -1012,7 +1012,7 @@ dependencies = [
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.79",
"syn 2.0.90",
"which",
]
@@ -1031,7 +1031,7 @@ dependencies = [
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -1155,7 +1155,7 @@ dependencies = [
"proc-macro-crate 3.2.0",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
"syn_derive",
]
@@ -1694,7 +1694,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -2189,7 +2189,7 @@ dependencies = [
"quote",
"snafu 0.8.5",
"static_assertions",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -2927,7 +2927,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -2949,7 +2949,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core 0.20.10",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3443,7 +3443,7 @@ checksum = "2cdc8d50f426189eef89dac62fabfa0abb27d5cc008f25bf4156a0203325becc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3454,7 +3454,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3517,7 +3517,7 @@ dependencies = [
"darling 0.20.10",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3547,7 +3547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4abae7035bf79b9877b779505d8cf3749285b80c43941eda66604841889451dc"
dependencies = [
"derive_builder_core 0.20.1",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3567,7 +3567,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
"unicode-xid",
]
@@ -3579,7 +3579,7 @@ checksum = "65f152f4b8559c4da5d574bafc7af85454d706b4c5fe8b530d508cacbb6807ea"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3734,7 +3734,7 @@ dependencies = [
"chrono",
"rust_decimal",
"serde",
"thiserror",
"thiserror 1.0.64",
"time",
"winnow 0.6.20",
]
@@ -3800,7 +3800,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -3812,7 +3812,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -4267,7 +4267,7 @@ checksum = "e99b8b3c28ae0e84b604c75f721c21dc77afb3706076af5e8216d15fd1deaae3"
dependencies = [
"frunk_proc_macro_helpers",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -4279,7 +4279,7 @@ dependencies = [
"frunk_core",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -4291,7 +4291,7 @@ dependencies = [
"frunk_core",
"frunk_proc_macro_helpers",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -4421,7 +4421,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -5028,7 +5028,7 @@ dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -5043,7 +5043,7 @@ dependencies = [
"rust-sitter",
"rust-sitter-tool",
"slotmap",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -5062,7 +5062,7 @@ dependencies = [
"serde",
"serde_json",
"slotmap",
"syn 2.0.79",
"syn 2.0.90",
"webbrowser",
]
@@ -5076,7 +5076,7 @@ dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -5601,7 +5601,7 @@ dependencies = [
"combine",
"jni-sys",
"log",
"thiserror",
"thiserror 1.0.64",
"walkdir",
"windows-sys 0.45.0",
]
@@ -5639,7 +5639,7 @@ dependencies = [
"jsonptr",
"serde",
"serde_json",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -5680,7 +5680,7 @@ dependencies = [
"pest_derive",
"regex",
"serde_json",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -5693,7 +5693,7 @@ dependencies = [
"pest_derive",
"regex",
"serde_json",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -5816,7 +5816,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tokio-util",
"tower",
@@ -5838,7 +5838,7 @@ dependencies = [
"schemars",
"serde",
"serde_json",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -5851,7 +5851,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -5876,7 +5876,7 @@ dependencies = [
"pin-project",
"serde",
"serde_json",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tokio-util",
"tracing",
@@ -5943,7 +5943,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -6327,7 +6327,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "792ba667add2798c6c3e988e630f4eb921b5cbc735044825b7111ef1582c8730"
dependencies = [
"byteorder",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -6432,7 +6432,7 @@ checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677"
dependencies = [
"bytes",
"serde",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -6807,7 +6807,7 @@ dependencies = [
"rustc_version",
"smallvec",
"tagptr",
"thiserror",
"thiserror 1.0.64",
"triomphe",
"uuid",
]
@@ -6898,9 +6898,9 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
"termcolor",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -6916,9 +6916,9 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
"termcolor",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -6948,7 +6948,7 @@ dependencies = [
"serde",
"serde_json",
"socket2 0.5.7",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tokio-rustls 0.24.1",
"tokio-util",
@@ -6991,7 +6991,7 @@ dependencies = [
"sha2",
"smallvec",
"subprocess",
"thiserror",
"thiserror 1.0.64",
"time",
"uuid",
"zstd 0.12.4",
@@ -7031,7 +7031,7 @@ dependencies = [
"sha2",
"smallvec",
"subprocess",
"thiserror",
"thiserror 1.0.64",
"time",
"uuid",
"zstd 0.13.2",
@@ -7090,7 +7090,7 @@ checksum = "254a5372af8fc138e36684761d3c0cdb758a4410e938babcff1c860ce14ddbfc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -7281,7 +7281,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -7539,7 +7539,7 @@ dependencies = [
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
"thiserror 1.0.64",
"urlencoding",
]
@@ -7554,7 +7554,7 @@ dependencies = [
"js-sys",
"once_cell",
"pin-project-lite",
"thiserror",
"thiserror 1.0.64",
"urlencoding",
]
@@ -7572,7 +7572,7 @@ dependencies = [
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.2",
"prost 0.11.9",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tonic 0.9.2",
]
@@ -7629,7 +7629,7 @@ dependencies = [
"ordered-float 4.3.0",
"percent-encoding",
"rand",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tokio-stream",
]
@@ -7652,7 +7652,7 @@ dependencies = [
"percent-encoding",
"rand",
"serde_json",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -8088,7 +8088,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdbef9d1d47087a895abd220ed25eb4ad973a5e26f6a4367b038c25e28dfc2d9"
dependencies = [
"memchr",
"thiserror",
"thiserror 1.0.64",
"ucd-trie",
]
@@ -8112,7 +8112,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -8138,9 +8138,9 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.25.0"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e63bc3945a17010ff93677589c656c5e8fb4183b00bc86360de8e187d2a86cb"
checksum = "c84e671791f3a354f265e55e400be8bb4b6262c1ec04fac4289e710ccf22ab43"
dependencies = [
"async-trait",
"bytes",
@@ -8154,7 +8154,7 @@ dependencies = [
"rand",
"ring 0.17.8",
"rust_decimal",
"thiserror",
"thiserror 2.0.4",
"tokio",
"tokio-rustls 0.26.0",
"tokio-util",
@@ -8224,7 +8224,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -8497,7 +8497,7 @@ dependencies = [
"smallvec",
"symbolic-demangle",
"tempfile",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -8572,7 +8572,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba"
dependencies = [
"proc-macro2",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -8620,9 +8620,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.86"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
dependencies = [
"unicode-ident",
]
@@ -8664,7 +8664,7 @@ dependencies = [
"parking_lot 0.12.3",
"procfs",
"protobuf",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -8768,7 +8768,7 @@ dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
"regex",
"syn 2.0.79",
"syn 2.0.90",
"tempfile",
]
@@ -8814,7 +8814,7 @@ dependencies = [
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -8827,7 +8827,7 @@ dependencies = [
"itertools 0.13.0",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -9010,7 +9010,7 @@ dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -9023,7 +9023,7 @@ dependencies = [
"proc-macro2",
"pyo3-build-config",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -9159,7 +9159,7 @@ dependencies = [
"rustc-hash 2.0.0",
"rustls 0.23.13",
"socket2 0.5.7",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tracing",
]
@@ -9176,7 +9176,7 @@ dependencies = [
"rustc-hash 2.0.0",
"rustls 0.23.13",
"slab",
"thiserror",
"thiserror 1.0.64",
"tinyvec",
"tracing",
]
@@ -9249,7 +9249,7 @@ dependencies = [
"serde",
"serde_repr",
"strum 0.25.0",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -9302,7 +9302,7 @@ checksum = "6c1bb13e2dcfa2232ac6887157aad8d9b3fe4ca57f7c8d4938ff5ea9be742300"
dependencies = [
"clocksource",
"parking_lot 0.12.3",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -9372,7 +9372,7 @@ checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
dependencies = [
"getrandom",
"libredox",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -9392,7 +9392,7 @@ checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -9579,7 +9579,7 @@ dependencies = [
"nix 0.25.1",
"regex",
"tempfile",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -9723,7 +9723,7 @@ dependencies = [
"serde_json",
"sha2",
"stringprep",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -9743,7 +9743,7 @@ dependencies = [
"rsasl",
"rustls 0.23.13",
"snap",
"thiserror",
"thiserror 1.0.64",
"tokio",
"tokio-rustls 0.26.0",
"tracing",
@@ -9787,7 +9787,7 @@ dependencies = [
"regex",
"relative-path",
"rustc_version",
"syn 2.0.79",
"syn 2.0.90",
"unicode-ident",
]
@@ -9799,7 +9799,7 @@ checksum = "b3a8fb4672e840a587a66fc577a5491375df51ddb88f2a2c2a792598c326fe14"
dependencies = [
"quote",
"rand",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -9822,7 +9822,7 @@ dependencies = [
"proc-macro2",
"quote",
"rust-embed-utils",
"syn 2.0.79",
"syn 2.0.90",
"walkdir",
]
@@ -10362,7 +10362,7 @@ dependencies = [
"static_assertions",
"strum 0.24.1",
"strum_macros 0.24.3",
"thiserror",
"thiserror 1.0.64",
"thread_local",
"timsort",
"uname",
@@ -10561,7 +10561,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -10662,7 +10662,7 @@ dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -10740,7 +10740,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -10751,7 +10751,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -10785,7 +10785,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -10806,7 +10806,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -10848,7 +10848,7 @@ dependencies = [
"darling 0.20.10",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -11147,7 +11147,7 @@ checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085"
dependencies = [
"num-bigint",
"num-traits",
"thiserror",
"thiserror 1.0.64",
"time",
]
@@ -11240,7 +11240,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -11383,7 +11383,7 @@ dependencies = [
"prettydiff",
"regex",
"serde_json",
"thiserror",
"thiserror 1.0.64",
"toml 0.5.11",
"walkdir",
]
@@ -11451,7 +11451,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -11461,7 +11461,7 @@ source = "git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -11525,7 +11525,7 @@ dependencies = [
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"thiserror 1.0.64",
"tokio-stream",
"url",
"webpki-roots 0.22.6",
@@ -11753,7 +11753,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -11766,7 +11766,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -11819,7 +11819,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"syn 2.0.79",
"syn 2.0.90",
"typify",
"walkdir",
]
@@ -11840,7 +11840,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"syn 2.0.79",
"syn 2.0.90",
"typify",
"walkdir",
]
@@ -11887,9 +11887,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.79"
version = "2.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31"
dependencies = [
"proc-macro2",
"quote",
@@ -11924,7 +11924,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -12067,7 +12067,7 @@ dependencies = [
"tantivy-stacker",
"tantivy-tokenizer-api",
"tempfile",
"thiserror",
"thiserror 1.0.64",
"time",
"uuid",
"winapi",
@@ -12393,7 +12393,16 @@ version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
dependencies = [
"thiserror-impl",
"thiserror-impl 1.0.64",
]
[[package]]
name = "thiserror"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490"
dependencies = [
"thiserror-impl 2.0.4",
]
[[package]]
@@ -12404,7 +12413,18 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
name = "thiserror-impl"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
@@ -12591,7 +12611,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -12864,7 +12884,7 @@ dependencies = [
"proc-macro2",
"prost-build 0.12.6",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -12981,7 +13001,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror",
"thiserror 1.0.64",
"time",
"tracing-subscriber",
]
@@ -12994,7 +13014,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -13154,7 +13174,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "042342584c5a7a0b833d9fc4e2bdab3f9868ddc6c4b339a1e01451c6720868bc"
dependencies = [
"regex",
"thiserror",
"thiserror 1.0.64",
"tree-sitter",
]
@@ -13185,7 +13205,7 @@ checksum = "ccb3f1376219530a37a809751ecf65aa35fd8b9c1c4ab6d4faf5f6a9eeda2c05"
dependencies = [
"memchr",
"regex",
"thiserror",
"thiserror 1.0.64",
"tree-sitter",
]
@@ -13251,7 +13271,7 @@ checksum = "70b20a22c42c8f1cd23ce5e34f165d4d37038f5b663ad20fb6adbdf029172483"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -13279,8 +13299,8 @@ dependencies = [
"semver",
"serde",
"serde_json",
"syn 2.0.79",
"thiserror",
"syn 2.0.90",
"thiserror 1.0.64",
"unicode-ident",
]
@@ -13297,7 +13317,7 @@ dependencies = [
"serde",
"serde_json",
"serde_tokenstream",
"syn 2.0.79",
"syn 2.0.90",
"typify-impl",
]
@@ -13621,7 +13641,7 @@ checksum = "ee1cd046f83ea2c4e920d6ee9f7c3537ef928d75dce5d84a87c2c5d6b3999a3a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -13737,7 +13757,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
"wasm-bindgen-shared",
]
@@ -13771,7 +13791,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -14288,7 +14308,7 @@ dependencies = [
"geo-types",
"log",
"num-traits",
"thiserror",
"thiserror 1.0.64",
]
[[package]]
@@ -14315,7 +14335,7 @@ dependencies = [
"ring 0.17.8",
"signature",
"spki 0.7.3",
"thiserror",
"thiserror 1.0.64",
"zeroize",
]
@@ -14367,7 +14387,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]
@@ -14387,7 +14407,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
"syn 2.0.90",
]
[[package]]

View File

@@ -93,7 +93,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.<br/>The local file cache directory. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc. It is configured by default when using object storage and is recommended for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string disables the read cache. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -131,12 +131,11 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. It is enabled by default when using object storage and is recommended for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
@@ -421,7 +420,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.<br/>The local file cache directory. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc. It is configured by default when using object storage and is recommended for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string disables the read cache. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -459,12 +458,11 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. It is enabled by default when using object storage and is recommended for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
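
Taken together, the rows above and the config diffs below describe the cache defaults introduced by #5093: the read cache and the experimental write cache now kick in automatically whenever a remote object store is configured. The following is a minimal sketch of a datanode configuration that relies on those defaults; the bucket and root values are placeholders, credentials are omitted, and the `[[region_engine]]`/`[region_engine.mito]` nesting follows the example config files further down, so treat it as illustrative rather than authoritative.

[storage]
type = "S3"
bucket = "my-bucket"   # placeholder
root = "greptimedb"    # placeholder
# Leaving `cache_path` unset enables the read cache under `{data_home}/object_cache/read`
# with a 5GiB default capacity; an empty string would disable the read cache instead.

[[region_engine]]
[region_engine.mito]
# The write cache is switched on automatically for remote object stores, so this line only
# restates the new behavior; the path defaults to `{data_home}/object_cache/write` and the
# capacity to 5GiB.
enable_experimental_write_cache = true
experimental_write_cache_size = "5GiB"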

View File

@@ -294,14 +294,14 @@ data_home = "/tmp/greptimedb/"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.
## The local file cache directory.
## Read cache configuration for object storage such as 'S3' etc. It is configured by default when using object storage and is recommended for better performance.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string disables the read cache.
## @toml2docs:none-default
cache_path = "/path/local_cache"
#+ cache_path = ""
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "1GiB"
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
@@ -476,14 +476,14 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance.
## Whether to enable the experimental write cache. It is enabled by default when using object storage and is recommended for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/write_cache`.
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
experimental_write_cache_size = "1GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
@@ -492,12 +492,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32

View File

@@ -332,14 +332,14 @@ data_home = "/tmp/greptimedb/"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Cache configuration for object storage such as 'S3' etc. It is recommended to configure it when using object storage for better performance.
## The local file cache directory.
## Read cache configuration for object storage such as 'S3' etc. It is configured by default when using object storage and is recommended for better performance.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string disables the read cache.
## @toml2docs:none-default
cache_path = "/path/local_cache"
#+ cache_path = ""
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "1GiB"
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
@@ -514,14 +514,14 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the experimental write cache. It is recommended to enable it when using object storage for better performance.
## Whether to enable the experimental write cache. It is enabled by default when using object storage and is recommended for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/write_cache`.
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
experimental_write_cache_size = "1GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
@@ -530,12 +530,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32

View File

@@ -69,7 +69,6 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
@@ -205,7 +204,6 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
scan_parallelism: 0,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),

View File

@@ -14,6 +14,7 @@
mod convert;
mod distance;
pub(crate) mod impl_conv;
use std::sync::Arc;

View File

@@ -18,18 +18,17 @@ mod l2sq;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::ValueRef;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, Vector, VectorRef};
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
macro_rules! define_distance_function {
($StructName:ident, $display_name:expr, $similarity_method:path) => {
@@ -80,17 +79,17 @@ macro_rules! define_distance_function {
return Ok(result.to_vector());
}
let arg0_const = parse_if_constant_string(arg0)?;
let arg1_const = parse_if_constant_string(arg1)?;
let arg0_const = as_veclit_if_const(arg0)?;
let arg1_const = as_veclit_if_const(arg1)?;
for i in 0..size {
let vec0 = match arg0_const.as_ref() {
Some(a) => Some(Cow::Borrowed(a.as_slice())),
None => as_vector(arg0.get_ref(i))?,
Some(a) => Some(Cow::Borrowed(a.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let vec1 = match arg1_const.as_ref() {
Some(b) => Some(Cow::Borrowed(b.as_slice())),
None => as_vector(arg1.get_ref(i))?,
Some(b) => Some(Cow::Borrowed(b.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};
if let (Some(vec0), Some(vec1)) = (vec0, vec1) {
@@ -129,98 +128,6 @@ define_distance_function!(CosDistanceFunction, "vec_cos_distance", cos::cos);
define_distance_function!(L2SqDistanceFunction, "vec_l2sq_distance", l2sq::l2sq);
define_distance_function!(DotProductFunction, "vec_dot_product", dot::dot);
/// Parse a vector value if the value is a constant string.
fn parse_if_constant_string(arg: &Arc<dyn Vector>) -> Result<Option<Vec<f32>>> {
if !arg.is_const() {
return Ok(None);
}
if arg.data_type() != ConcreteDataType::string_datatype() {
return Ok(None);
}
arg.get_ref(0)
.as_string()
.unwrap() // Safe: checked if it is a string
.map(parse_f32_vector_from_string)
.transpose()
}
/// Convert a value to a vector value.
/// Supported data types are binary and string.
fn as_vector(arg: ValueRef<'_>) -> Result<Option<Cow<'_, [f32]>>> {
match arg.data_type() {
ConcreteDataType::Binary(_) => arg
.as_binary()
.unwrap() // Safe: checked if it is a binary
.map(binary_as_vector)
.transpose(),
ConcreteDataType::String(_) => arg
.as_string()
.unwrap() // Safe: checked if it is a string
.map(|s| Ok(Cow::Owned(parse_f32_vector_from_string(s)?)))
.transpose(),
ConcreteDataType::Null(_) => Ok(None),
_ => InvalidFuncArgsSnafu {
err_msg: format!("Unsupported data type: {:?}", arg.data_type()),
}
.fail(),
}
}
/// Convert a u8 slice to a vector value.
fn binary_as_vector(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
if bytes.len() % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
}
.fail();
}
if cfg!(target_endian = "little") {
Ok(unsafe {
let vec = std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
bytes.len() / std::mem::size_of::<f32>(),
);
Cow::Borrowed(vec)
})
} else {
let v = bytes
.chunks_exact(std::mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect::<Vec<f32>>();
Ok(Cow::Owned(v))
}
}
/// Parse a string to a vector value.
/// Valid inputs are strings like "[1.0, 2.0, 3.0]".
fn parse_f32_vector_from_string(s: &str) -> Result<Vec<f32>> {
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Failed to parse {s} to Vector value: not properly enclosed in brackets"
),
}
.fail();
}
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
return Ok(Vec::new());
}
content
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect::<std::result::Result<_, _>>()
.map_err(|e| {
InvalidFuncArgsSnafu {
err_msg: format!("Failed to parse {s} to Vector value: {e}"),
}
.build()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -456,27 +363,4 @@ mod tests {
assert!(result.is_err());
}
}
#[test]
fn test_parse_vector_from_string() {
let result = parse_f32_vector_from_string("[1.0, 2.0, 3.0]").unwrap();
assert_eq!(result, vec![1.0, 2.0, 3.0]);
let result = parse_f32_vector_from_string("[]").unwrap();
assert_eq!(result, Vec::<f32>::new());
let result = parse_f32_vector_from_string("[1.0, a, 3.0]");
assert!(result.is_err());
}
#[test]
fn test_binary_as_vector() {
let bytes = [0, 0, 128, 63];
let result = binary_as_vector(&bytes).unwrap();
assert_eq!(result.as_ref(), &[1.0]);
let invalid_bytes = [0, 0, 128];
let result = binary_as_vector(&invalid_bytes);
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::ValueRef;
use datatypes::vectors::Vector;
/// Convert a constant string or binary literal to a vector literal.
pub fn as_veclit_if_const(arg: &Arc<dyn Vector>) -> Result<Option<Cow<'_, [f32]>>> {
if !arg.is_const() {
return Ok(None);
}
if arg.data_type() != ConcreteDataType::string_datatype()
&& arg.data_type() != ConcreteDataType::binary_datatype()
{
return Ok(None);
}
as_veclit(arg.get_ref(0))
}
/// Convert a string or binary literal to a vector literal.
pub fn as_veclit(arg: ValueRef<'_>) -> Result<Option<Cow<'_, [f32]>>> {
match arg.data_type() {
ConcreteDataType::Binary(_) => arg
.as_binary()
.unwrap() // Safe: checked if it is a binary
.map(binlit_as_veclit)
.transpose(),
ConcreteDataType::String(_) => arg
.as_string()
.unwrap() // Safe: checked if it is a string
.map(|s| Ok(Cow::Owned(parse_veclit_from_strlit(s)?)))
.transpose(),
ConcreteDataType::Null(_) => Ok(None),
_ => InvalidFuncArgsSnafu {
err_msg: format!("Unsupported data type: {:?}", arg.data_type()),
}
.fail(),
}
}
/// Convert a u8 slice to a vector literal.
pub fn binlit_as_veclit(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
if bytes.len() % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
}
.fail();
}
if cfg!(target_endian = "little") {
Ok(unsafe {
let vec = std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
bytes.len() / std::mem::size_of::<f32>(),
);
Cow::Borrowed(vec)
})
} else {
let v = bytes
.chunks_exact(std::mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect::<Vec<f32>>();
Ok(Cow::Owned(v))
}
}
/// Parse a string literal to a vector literal.
/// Valid inputs are strings like "[1.0, 2.0, 3.0]".
pub fn parse_veclit_from_strlit(s: &str) -> Result<Vec<f32>> {
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Failed to parse {s} to Vector value: not properly enclosed in brackets"
),
}
.fail();
}
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
return Ok(Vec::new());
}
content
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect::<std::result::Result<_, _>>()
.map_err(|e| {
InvalidFuncArgsSnafu {
err_msg: format!("Failed to parse {s} to Vector value: {e}"),
}
.build()
})
}
#[allow(unused)]
/// Convert a vector literal to a binary literal.
pub fn veclit_to_binlit(vec: &[f32]) -> Vec<u8> {
if cfg!(target_endian = "little") {
unsafe {
std::slice::from_raw_parts(vec.as_ptr() as *const u8, std::mem::size_of_val(vec))
.to_vec()
}
} else {
let mut bytes = Vec::with_capacity(std::mem::size_of_val(vec));
for e in vec {
bytes.extend_from_slice(&e.to_le_bytes());
}
bytes
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_veclit_from_strlit() {
let result = parse_veclit_from_strlit("[1.0, 2.0, 3.0]").unwrap();
assert_eq!(result, vec![1.0, 2.0, 3.0]);
let result = parse_veclit_from_strlit("[]").unwrap();
assert_eq!(result, Vec::<f32>::new());
let result = parse_veclit_from_strlit("[1.0, a, 3.0]");
assert!(result.is_err());
}
#[test]
fn test_binlit_as_veclit() {
let vec = &[1.0, 2.0, 3.0];
let bytes = veclit_to_binlit(vec);
let result = binlit_as_veclit(&bytes).unwrap();
assert_eq!(result.as_ref(), vec);
let invalid_bytes = [0, 0, 128];
let result = binlit_as_veclit(&invalid_bytes);
assert!(result.is_err());
}
}

View File

@@ -32,7 +32,7 @@ use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(1);
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
@@ -60,6 +60,11 @@ impl ObjectStoreConfig {
}
}
/// Returns true when it's a remote object storage such as AWS S3 etc.
pub fn is_object_storage(&self) -> bool {
!matches!(self, Self::File(_))
}
/// Returns the object storage configuration name, return the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
@@ -91,6 +96,13 @@ pub struct StorageConfig {
pub providers: Vec<ObjectStoreConfig>,
}
impl StorageConfig {
/// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
pub fn is_object_storage(&self) -> bool {
self.store.is_object_storage()
}
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
@@ -452,6 +464,20 @@ mod tests {
assert_eq!("S3", s3_config.provider_name());
}
#[test]
fn test_is_object_storage() {
let store = ObjectStoreConfig::default();
assert!(!store.is_object_storage());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert!(s3_config.is_object_storage());
let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
assert!(oss_config.is_object_storage());
let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
assert!(gcs_config.is_object_storage());
let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
assert!(azblob_config.is_object_storage());
}
#[test]
fn test_secstr() {
let toml_str = r#"

View File

@@ -428,10 +428,16 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
config: MitoConfig,
mut config: MitoConfig,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
if opts.storage.is_object_storage() {
// Enable the write cache when a remote object store is configured
config.enable_experimental_write_cache = true;
info!("Configured 'enable_experimental_write_cache=true' for mito engine.");
}
let mito_engine = match &opts.wal {
DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(let_chains)]
pub mod alive_keeper;
pub mod config;

View File

@@ -19,21 +19,20 @@ mod fs;
mod gcs;
mod oss;
mod s3;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::{env, path};
use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
use snafu::prelude::*;
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
use crate::error::{self, CreateDirSnafu, Result};
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
@@ -68,7 +67,7 @@ pub(crate) async fn new_object_store_without_cache(
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if store.is_object_storage() {
// Adds retry layer
with_retry_layers(object_store)
} else {
@@ -85,8 +84,8 @@ pub(crate) async fn new_object_store(
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? {
let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
@@ -105,44 +104,72 @@ pub(crate) async fn new_object_store(
async fn build_cache_layer(
store_config: &ObjectStoreConfig,
data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> {
let (cache_path, cache_capacity) = match store_config {
let (name, mut cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache.cache_path.as_ref();
let path = s3_config.cache.cache_path.clone();
let name = &s3_config.name;
let capacity = s3_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache.cache_path.as_ref();
let path = oss_config.cache.cache_path.clone();
let name = &oss_config.name;
let capacity = oss_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
ObjectStoreConfig::Azblob(azblob_config) => {
let path = azblob_config.cache.cache_path.as_ref();
let path = azblob_config.cache.cache_path.clone();
let name = &azblob_config.name;
let capacity = azblob_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
ObjectStoreConfig::Gcs(gcs_config) => {
let path = gcs_config.cache.cache_path.as_ref();
let path = gcs_config.cache.cache_path.clone();
let name = &gcs_config.name;
let capacity = gcs_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
(name, path, capacity)
}
_ => (None, ReadableSize(0)),
_ => unreachable!("Already checked above"),
};
if let Some(path) = cache_path {
// Enable object cache by default
// Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
// if it's not present
if cache_path.is_none() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
let read_cache_path = join_dir(&object_cache_path, "read");
let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
tokio::fs::create_dir_all(Path::new(&read_cache_path))
.await
.context(CreateDirSnafu {
dir: &read_cache_path,
})?;
info!(
"The object storage cache path is not set for '{}', using the default path: '{}'",
name, &read_cache_path
);
cache_path = Some(read_cache_path);
}
if let Some(path) = cache_path.as_ref()
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;

View File

@@ -597,9 +597,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input)
.with_compaction()
.build_reader()
SeqScan::new(scan_input, true)
.build_reader_for_compaction()
.await
}
}

View File

@@ -21,6 +21,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use object_store::util::join_dir;
use object_store::OBJECT_CACHE_DIR;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -30,7 +31,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -46,7 +47,7 @@ const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
/// Fetch option timeout
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(10);
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@@ -96,7 +97,7 @@ pub struct MitoConfig {
pub selector_result_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// File system path for write cache, defaults to `{data_home}/write_cache`.
/// File system path for write cache, defaults to `{data_home}/object_cache/write`.
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
@@ -107,11 +108,6 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
/// Parallelism to scan a region (default: 1/4 of cpu cores).
/// - 0: using the default value (1/4 of cpu cores).
/// - 1: scan in current thread.
/// - n: scan in parallelism n.
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
@@ -153,10 +149,9 @@ impl Default for MitoConfig {
selector_result_cache_size: ReadableSize::mb(512),
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::gb(1),
experimental_write_cache_size: ReadableSize::gb(5),
experimental_write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
index: IndexConfig::default(),
@@ -229,11 +224,6 @@ impl MitoConfig {
);
}
// Use default value if `scan_parallelism` is 0.
if self.scan_parallelism == 0 {
self.scan_parallelism = divide_num_cpus(4);
}
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(
@@ -243,8 +233,9 @@ impl MitoConfig {
}
// Sets write cache path if it is empty.
if self.experimental_write_cache_path.is_empty() {
self.experimental_write_cache_path = join_dir(data_home, "write_cache");
if self.experimental_write_cache_path.trim().is_empty() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
self.experimental_write_cache_path = join_dir(&object_cache_path, "write");
}
self.index.sanitize(data_home, &self.inverted_index)?;

View File

@@ -90,7 +90,7 @@ use crate::error::{
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallelism, ScanRegion, Scanner};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
@@ -171,19 +171,9 @@ impl MitoEngine {
self.scan_region(region_id, request)?.scanner()
}
/// Returns a region scanner to scan the region for `request`.
fn region_scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef> {
let scanner = self.scanner(region_id, request)?;
scanner.region_scanner()
}
/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.handle_query(region_id, request)
self.inner.scan_region(region_id, request)
}
/// Edit region's metadata by [RegionEdit] directly. Use with care.
@@ -423,7 +413,7 @@ impl EngineInner {
}
/// Handles the scan `request` and returns a [ScanRegion].
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self
@@ -433,14 +423,10 @@ impl EngineInner {
let version = region.version();
// Get cache.
let cache_manager = self.workers.cache_manager();
let scan_parallelism = ScanParallelism {
parallelism: self.config.scan_parallelism,
channel_size: self.config.parallel_scan_channel_size,
};
let scan_region =
ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
.with_parallelism(scan_parallelism)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
@@ -538,7 +524,9 @@ impl RegionEngine for MitoEngine {
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
self.scan_region(region_id, request)
.map_err(BoxedError::new)?
.region_scanner()
.map_err(BoxedError::new)
}

View File

@@ -92,7 +92,6 @@ async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
scan_parallelism: 2,
..Default::default()
})
.await;
@@ -176,19 +175,19 @@ async fn test_append_mode_compaction() {
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine with parallelism 1.
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
scan_parallelism: 1,
..Default::default()
},
)

View File

@@ -92,7 +92,6 @@ async fn test_merge_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
scan_parallelism: 2,
..Default::default()
})
.await;
@@ -190,19 +189,19 @@ async fn test_merge_mode_compaction() {
| a | | 13.0 | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine with parallelism 1.
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
scan_parallelism: 1,
..Default::default()
},
)

View File

@@ -37,7 +37,6 @@ async fn scan_in_parallel(
) {
let engine = env
.open_engine(MitoConfig {
scan_parallelism: parallelism,
parallel_scan_channel_size: channel_size,
..Default::default()
})
@@ -57,7 +56,9 @@ async fn scan_in_parallel(
.unwrap();
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let mut scanner = engine.scanner(region_id, request).unwrap();
scanner.set_target_partitions(parallelism);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+

View File

@@ -677,6 +677,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create directory {}", dir))]
CreateDir {
dir: String,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to filter record batch"))]
FilterRecordBatch {
source: common_recordbatch::error::Error,
@@ -955,6 +962,7 @@ impl ErrorExt for Error {
| ComputeVector { .. }
| SerializeField { .. }
| EncodeMemtable { .. }
| CreateDir { .. }
| ReadDataPart { .. }
| CorruptedEntry { .. }
| BuildEntry { .. } => StatusCode::Internal,

View File

@@ -34,6 +34,16 @@ use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
/// Index of the memtable and file.
pub(crate) index: usize,
/// Total number of row groups in this source. 0 if the metadata
/// is unavailable. We use this to split files.
pub(crate) num_row_groups: u64,
}
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
@@ -52,7 +62,7 @@ pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
/// Indices to memtables or files.
indices: SmallVec<[usize; 2]>,
pub(crate) indices: SmallVec<[SourceIndex; 2]>,
/// Indices to memtable/file row groups that this range scans.
pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
/// Estimated number of rows in the range. This can be 0 if the statistics are not available.
@@ -81,12 +91,17 @@ impl RangeMeta {
}
/// Creates a list of ranges from the `input` for seq scan.
pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
/// If `compaction` is true, it doesn't split the ranges.
pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
if compaction {
// We don't split ranges in compaction.
return ranges;
}
maybe_split_ranges_for_seq_scan(ranges)
}
@@ -105,13 +120,13 @@ impl RangeMeta {
}
/// Returns true if the time range of given `meta` overlaps with the time range of this meta.
pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool {
fn overlaps(&self, meta: &RangeMeta) -> bool {
overlaps(&self.time_range, &meta.time_range)
}
/// Merges given `meta` to this meta.
/// It assumes that the time ranges overlap and they don't have the same file or memtable index.
pub(crate) fn merge(&mut self, mut other: RangeMeta) {
fn merge(&mut self, mut other: RangeMeta) {
debug_assert!(self.overlaps(&other));
debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
debug_assert!(self
@@ -130,22 +145,28 @@ impl RangeMeta {
/// Returns true if we can split the range into multiple smaller ranges and
/// still preserve the order for [SeqScan].
pub(crate) fn can_split_preserve_order(&self) -> bool {
// Only one source and multiple row groups.
self.indices.len() == 1 && self.row_group_indices.len() > 1
fn can_split_preserve_order(&self) -> bool {
self.indices.len() == 1 && self.indices[0].num_row_groups > 1
}
/// Splits the range if it can preserve the order.
pub(crate) fn maybe_split(self, output: &mut Vec<RangeMeta>) {
fn maybe_split(self, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() {
let num_row_groups = self.indices[0].num_row_groups;
debug_assert_eq!(1, self.row_group_indices.len());
debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
output.reserve(self.row_group_indices.len());
let num_rows = self.num_rows / self.row_group_indices.len();
let num_rows = self.num_rows / num_row_groups as usize;
// Splits by row group.
for index in self.row_group_indices {
for row_group_index in 0..num_row_groups {
output.push(RangeMeta {
time_range: self.time_range,
indices: self.indices.clone(),
row_group_indices: smallvec![index],
row_group_indices: smallvec![RowGroupIndex {
index: self.indices[0].index,
row_group_index: row_group_index as i64,
}],
num_rows,
});
}
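To make the split rule above concrete: a range is only split when it has exactly one source with multiple row groups, and the row estimate is divided evenly across the produced ranges. A self-contained sketch under those assumptions; the struct below is a simplified stand-in for `RangeMeta`, not the real type:

/// Illustrative stand-in for RangeMeta, reduced to the fields the split rule needs.
#[derive(Debug, PartialEq)]
struct Range {
    sources: usize,      // number of memtables/files in the range
    num_row_groups: u64, // row groups of the single source (when sources == 1)
    num_rows: usize,
}

impl Range {
    /// Split only when there is exactly one source with multiple row groups,
    /// so the order between produced ranges is preserved.
    fn can_split_preserve_order(&self) -> bool {
        self.sources == 1 && self.num_row_groups > 1
    }

    fn maybe_split(self, output: &mut Vec<Range>) {
        if self.can_split_preserve_order() {
            let per_group_rows = self.num_rows / self.num_row_groups as usize;
            for _ in 0..self.num_row_groups {
                output.push(Range {
                    sources: 1,
                    num_row_groups: 1,
                    num_rows: per_group_rows,
                });
            }
        } else {
            output.push(self);
        }
    }
}

fn main() {
    let mut out = Vec::new();
    // One file with 2 row groups and 5 rows splits into two 2-row ranges,
    // matching the `test_split_range` case later in this diff.
    Range { sources: 1, num_row_groups: 2, num_rows: 5 }.maybe_split(&mut out);
    assert_eq!(out.len(), 2);
    assert!(out.iter().all(|r| r.num_rows == 2));
}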
@@ -165,7 +186,10 @@ impl RangeMeta {
let num_rows = stats.num_rows() / stats.num_ranges();
ranges.push(RangeMeta {
time_range,
indices: smallvec![memtable_index],
indices: smallvec![SourceIndex {
index: memtable_index,
num_row_groups: stats.num_ranges() as u64,
}],
row_group_indices: smallvec![RowGroupIndex {
index: memtable_index,
row_group_index: row_group_index as i64,
@@ -199,7 +223,10 @@ impl RangeMeta {
let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
ranges.push(RangeMeta {
time_range: time_range.unwrap_or_else(|| file.time_range()),
indices: smallvec![file_index],
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: file.meta_ref().num_row_groups,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
@@ -212,7 +239,10 @@ impl RangeMeta {
for row_group_index in 0..file.meta_ref().num_row_groups {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: file.meta_ref().num_row_groups,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
@@ -224,7 +254,10 @@ impl RangeMeta {
// If we don't know the number of row groups in advance, scan all row groups.
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: 0,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
@@ -245,7 +278,10 @@ impl RangeMeta {
};
ranges.push(RangeMeta {
time_range,
indices: smallvec![i],
indices: smallvec![SourceIndex {
index: i,
num_row_groups: stats.num_ranges() as u64,
}],
row_group_indices: smallvec![RowGroupIndex {
index: i,
row_group_index: ALL_ROW_GROUPS,
@@ -263,31 +299,18 @@ impl RangeMeta {
// For non append-only mode, each range only contains one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
if file.meta_ref().num_row_groups > 0 {
// All row groups share the same time range.
let row_group_indices = (0..file.meta_ref().num_row_groups)
.map(|row_group_index| RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
})
.collect();
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices,
num_rows: file.meta_ref().num_rows as usize,
});
} else {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: file.meta_ref().num_row_groups,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
}
}
@@ -514,7 +537,10 @@ mod tests {
);
RangeMeta {
time_range,
indices: smallvec![*idx],
indices: smallvec![SourceIndex {
index: *idx,
num_row_groups: 0,
}],
row_group_indices: smallvec![RowGroupIndex {
index: *idx,
row_group_index: 0
@@ -527,7 +553,7 @@ mod tests {
let actual: Vec<_> = output
.iter()
.map(|range| {
let indices = range.indices.to_vec();
let indices = range.indices.iter().map(|index| index.index).collect();
let group_indices: Vec<_> = range
.row_group_indices
.iter()
@@ -578,7 +604,10 @@ mod tests {
fn test_merge_range() {
let mut left = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -593,7 +622,10 @@ mod tests {
};
let right = RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
indices: smallvec![2],
indices: smallvec![SourceIndex {
index: 2,
num_row_groups: 2,
}],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
@@ -612,7 +644,16 @@ mod tests {
left,
RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
indices: smallvec![1, 2],
indices: smallvec![
SourceIndex {
index: 1,
num_row_groups: 2
},
SourceIndex {
index: 2,
num_row_groups: 2
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -640,17 +681,14 @@ mod tests {
fn test_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 1
},
RowGroupIndex {
index: 1,
row_group_index: 2
}
],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: 5,
};
@@ -663,19 +701,25 @@ mod tests {
&[
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
row_group_index: 0
},],
num_rows: 2,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 2
row_group_index: 1
}],
num_rows: 2,
}
@@ -687,7 +731,16 @@ mod tests {
fn test_not_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1, 2],
indices: smallvec![
SourceIndex {
index: 1,
num_row_groups: 1,
},
SourceIndex {
index: 2,
num_row_groups: 1,
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -710,32 +763,50 @@ mod tests {
#[test]
fn test_maybe_split_ranges() {
let ranges = vec![
RangeMeta {
time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
indices: smallvec![SourceIndex {
index: 0,
num_row_groups: 1,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 0
},
RowGroupIndex {
index: 1,
row_group_index: 1
}
],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: ALL_ROW_GROUPS,
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
indices: smallvec![2, 3],
indices: smallvec![
SourceIndex {
index: 2,
num_row_groups: 2,
},
SourceIndex {
index: 3,
num_row_groups: 0,
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,
@@ -745,9 +816,24 @@ mod tests {
assert_eq!(
output,
vec![
RangeMeta {
time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
indices: smallvec![SourceIndex {
index: 0,
num_row_groups: 1,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 0
@@ -756,7 +842,10 @@ mod tests {
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
@@ -765,15 +854,24 @@ mod tests {
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
indices: smallvec![2, 3],
indices: smallvec![
SourceIndex {
index: 2,
num_row_groups: 2
},
SourceIndex {
index: 3,
num_row_groups: 0,
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,

View File

@@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
@@ -68,15 +69,6 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)),
}
}
}
#[cfg(test)]
@@ -104,6 +96,17 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
}
}
/// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
let request = PrepareRequest::default().with_target_partitions(target_partitions);
match self {
Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -165,8 +168,8 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_manager: CacheManagerRef,
/// Parallelism to scan.
parallelism: ScanParallelism,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -188,17 +191,20 @@ impl ScanRegion {
access_layer,
request,
cache_manager,
parallelism: ScanParallelism::default(),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
ignore_inverted_index: false,
ignore_fulltext_index: false,
start_time: None,
}
}
/// Sets parallelism.
/// Sets parallel scan task channel size.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
self.parallelism = parallelism;
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -224,7 +230,7 @@ impl ScanRegion {
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
if self.version.options.append_mode && self.request.series_row_selector.is_none() {
if self.use_unordered_scan() {
// If table is append only and there is no series row selector, we use unordered scan in query.
// We still use seq scan in compaction.
self.unordered_scan().map(Scanner::Unordered)
@@ -233,10 +239,20 @@ impl ScanRegion {
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
if self.use_unordered_scan() {
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
} else {
self.seq_scan().map(|scanner| Box::new(scanner) as _)
}
}
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
Ok(SeqScan::new(input))
Ok(SeqScan::new(input, false))
}
/// Unordered scan.
@@ -248,7 +264,14 @@ impl ScanRegion {
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
Ok(SeqScan::new(input))
Ok(SeqScan::new(input, false))
}
/// Returns true if the region can use unordered scan for current request.
fn use_unordered_scan(&self) -> bool {
// If table is append only and there is no series row selector, we use unordered scan in query.
// We still use seq scan in compaction.
self.version.options.append_mode && self.request.series_row_selector.is_none()
}
/// Creates a scan input.
@@ -314,7 +337,7 @@ impl ScanRegion {
.with_cache(self.cache_manager)
.with_inverted_index_applier(inverted_index_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallelism(self.parallelism)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
@@ -428,15 +451,6 @@ impl ScanRegion {
}
}
/// Config for parallel scan.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct ScanParallelism {
/// Number of tasks expect to spawn to read data.
pub(crate) parallelism: usize,
/// Channel size to send batches. Only takes effect when the parallelism > 1.
pub(crate) channel_size: usize,
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -466,8 +480,8 @@ pub(crate) struct ScanInput {
pub(crate) cache_manager: CacheManagerRef,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Parallelism to scan data.
pub(crate) parallelism: ScanParallelism,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
@@ -496,7 +510,7 @@ impl ScanInput {
files: Vec::new(),
cache_manager: CacheManagerRef::default(),
ignore_file_not_found: false,
parallelism: ScanParallelism::default(),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
inverted_index_applier: None,
fulltext_index_applier: None,
query_start: None,
@@ -549,10 +563,13 @@ impl ScanInput {
self
}
/// Sets scan parallelism.
/// Sets scan task channel size.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
self.parallelism = parallelism;
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -621,12 +638,15 @@ impl ScanInput {
sources: Vec<Source>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<Source>> {
debug_assert!(self.parallelism.parallelism > 1);
if sources.len() <= 1 {
return Ok(sources);
}
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
self.spawn_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
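The hunk above now skips the fan-out entirely when there is at most one source; otherwise each source still gets its own task and a bounded channel sized by `parallel_scan_channel_size`. A minimal standalone sketch of that pattern, assuming `tokio` (with the `macros` and `rt-multi-thread` features) and `tokio-stream` as dependencies, with plain integers standing in for batches:

use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let channel_size = 32; // plays the role of parallel_scan_channel_size
    let sources: Vec<Vec<i32>> = vec![vec![1, 2], vec![3, 4]];

    // Spawn a task per source; each task pushes its items into a bounded channel.
    let streams: Vec<_> = sources
        .into_iter()
        .map(|source| {
            let (sender, receiver) = tokio::sync::mpsc::channel(channel_size);
            tokio::spawn(async move {
                for item in source {
                    // A full channel applies backpressure to this producer task.
                    let _ = sender.send(item).await;
                }
            });
            ReceiverStream::new(receiver)
        })
        .collect();

    // The caller consumes the receiver streams as ordinary sources.
    let mut total = 0;
    for mut stream in streams {
        while let Some(item) = stream.next().await {
            total += item;
        }
    }
    assert_eq!(total, 10);
}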
@@ -761,9 +781,9 @@ pub(crate) struct StreamContext {
impl StreamContext {
/// Creates a new [StreamContext] for [SeqScan].
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
READ_SST_COUNT.observe(input.num_files() as f64);
Self {

View File

@@ -28,7 +28,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
@@ -51,39 +51,27 @@ pub struct SeqScan {
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
/// The scanner is used for compaction.
compaction: bool,
}
impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
// TODO(yingwen): Set permits according to partition num. But we need to support file
// level parallelism.
let parallelism = input.parallelism.parallelism.max(1);
/// Creates a new [SeqScan] with the given input and compaction flag.
/// If `compaction` is true, the scanner will not attempt to split ranges.
pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
compaction: false,
compaction,
}
}
/// Sets the scanner to be used for compaction.
pub(crate) fn with_compaction(mut self) -> Self {
self.compaction = true;
self
}
/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contain all the data. If want
@@ -98,7 +86,12 @@ impl SeqScan {
}
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
assert!(self.compaction);
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
0,
@@ -112,23 +105,20 @@ impl SeqScan {
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
let reader = Self::build_all_merge_reader(
let reader = Self::merge_all_ranges_for_compaction(
&self.stream_ctx,
partition_ranges,
self.semaphore.clone(),
self.compaction,
&part_metrics,
)
.await?;
Ok(Box::new(reader))
}
/// Builds a merge reader that reads all data.
async fn build_all_merge_reader(
/// Builds a merge reader that reads all ranges.
/// Callers MUST not split ranges before calling this method.
async fn merge_all_ranges_for_compaction(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
semaphore: Arc<Semaphore>,
compaction: bool,
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
@@ -140,27 +130,37 @@ impl SeqScan {
build_sources(
stream_ctx,
part_range,
compaction,
true,
part_metrics,
range_builder_list.clone(),
&mut sources,
);
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
common_telemetry::debug!(
"Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
stream_ctx.input.mapper.metadata().region_id,
partition_ranges.len(),
sources.len()
);
Self::build_reader_from_sources(stream_ctx, sources, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Arc<Semaphore>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<BoxedBatchReader> {
if stream_ctx.input.parallelism.parallelism > 1 {
// Read sources in parallel. We always spawn a task so we can control the parallelism
// by the semaphore.
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
}
}
let mut builder = MergeReaderBuilder::from_sources(sources);
@@ -207,10 +207,21 @@ impl SeqScan {
}
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() {
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
// This semaphore is partition level.
// We don't use a global semaphore to avoid one partition waiting for others. The final concurrency
// of tasks usually won't exceed the target partitions by much, as compaction can reduce the number of
// files in a part range.
Some(Arc::new(Semaphore::new(
self.properties.target_partitions() - self.properties.num_partitions() + 1,
)))
} else {
None
};
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range();
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
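A short worked example of the permit arithmetic above, with purely illustrative numbers: for 8 target partitions and 2 actual partitions, each partition builds its own semaphore with 8 - 2 + 1 = 7 permits, so one partition can read at most 7 sources concurrently; if the target does not exceed the actual partition count, no semaphore is created and sources are merged without extra read tasks.

// Illustrative permit computation, not the real SeqScan code.
fn extra_read_permits(target_partitions: usize, num_partitions: usize) -> Option<usize> {
    (target_partitions > num_partitions).then(|| target_partitions - num_partitions + 1)
}

fn main() {
    assert_eq!(extra_read_permits(8, 2), Some(7)); // parallel reads allowed within a partition
    assert_eq!(extra_read_permits(2, 2), None);    // no semaphore: read sources without extra tasks
}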
@@ -325,13 +336,8 @@ impl RegionScanner for SeqScan {
self.scan_partition_impl(partition)
}
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}
@@ -375,6 +381,20 @@ fn build_sources(
) {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
if compaction {
// Compaction expects that input sources have not been split.
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
// It should scan all row groups.
debug_assert_eq!(
-1, row_group_idx.row_group_index,
"Expect {} range scan all row groups, given: {}",
i, row_group_idx.row_group_index,
);
}
}
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {

View File

@@ -27,7 +27,7 @@ use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
@@ -144,7 +144,7 @@ impl UnorderedScan {
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range();
let distinguish_range = self.properties.distinguish_partition_range;
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -231,13 +231,8 @@ impl RegionScanner for UnorderedScan {
self.stream_ctx.input.mapper.output_schema()
}
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}

View File

@@ -25,8 +25,8 @@ mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -50,7 +50,7 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
@@ -373,6 +373,12 @@ async fn write_cache_from_config(
// TODO(yingwen): Remove this and document the config once the write cache is ready.
warn!("Write cache is an experimental feature");
tokio::fs::create_dir_all(Path::new(&config.experimental_write_cache_path))
.await
.context(CreateDirSnafu {
dir: &config.experimental_write_cache_path,
})?;
let cache = WriteCache::new_fs(
&config.experimental_write_cache_path,
object_store_manager,

View File

@@ -24,3 +24,5 @@ pub mod manager;
mod metrics;
pub mod test_util;
pub mod util;
/// The default object cache directory name.
pub const OBJECT_CACHE_DIR: &str = "object_cache";

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BinaryHeap;
use std::sync::Arc;
use common_telemetry::debug;
@@ -93,7 +94,7 @@ impl ParallelizeScan {
// update the partition ranges
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
.with_new_partitions(partition_ranges, expected_partition_num)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
return Ok(Transformed::yes(Arc::new(new_exec)));
}
@@ -109,21 +110,71 @@ impl ParallelizeScan {
/// Distribute [`PartitionRange`]s to each partition.
///
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
/// Currently we assign ranges to partitions according to their row counts, so each partition
/// has a similar number of rows.
/// This method may return fewer partitions than `expected_partition_num` if the number of
/// ranges is smaller than `expected_partition_num`, but it always returns at least one partition.
fn assign_partition_range(
ranges: Vec<PartitionRange>,
mut ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
if ranges.is_empty() {
// Returns a single partition with no range.
return vec![vec![]];
}
if ranges.len() == 1 {
return vec![ranges];
}
// Sort ranges by number of rows in descending order.
ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
// Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available.
let max_rows = ranges[0].num_rows;
let total_rows = ranges.iter().map(|range| range.num_rows).sum::<usize>();
// Computes the partition num from the max row number to avoid unbalanced partitions.
let balanced_partition_num = if max_rows > 0 {
total_rows.div_ceil(max_rows)
} else {
ranges.len()
};
let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];
// round-robin assignment
for (i, range) in ranges.into_iter().enumerate() {
let partition_idx = i % expected_partition_num;
#[derive(Eq, PartialEq)]
struct HeapNode {
num_rows: usize,
partition_idx: usize,
}
impl Ord for HeapNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse for min-heap.
self.num_rows.cmp(&other.num_rows).reverse()
}
}
impl PartialOrd for HeapNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
let mut part_heap =
BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode {
num_rows: 0,
partition_idx,
}));
// Assigns the range to the partition with the smallest number of rows.
for range in ranges {
// Safety: actual_partition_num always > 0.
let mut node = part_heap.pop().unwrap();
let partition_idx = node.partition_idx;
node.num_rows += range.num_rows;
partition_ranges[partition_idx].push(range);
part_heap.push(node);
}
partition_ranges
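The strategy above sorts ranges by row count, caps the partition count so one oversized range cannot dwarf the rest (`total_rows.div_ceil(max_rows)`), and then greedily assigns each range to the currently lightest partition via a min-heap. A compact standalone sketch with ranges reduced to bare row counts (the real code carries full `PartitionRange` values); it reproduces the unbalanced case tested further down in this diff:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Assigns row counts to partitions so each partition ends up with a similar
/// number of rows. Mirrors the heap-based strategy above, simplified.
fn assign(mut rows: Vec<usize>, expected_partitions: usize) -> Vec<Vec<usize>> {
    if rows.is_empty() {
        return vec![vec![]];
    }
    // Largest ranges first.
    rows.sort_unstable_by(|a, b| b.cmp(a));
    let max_rows = rows[0];
    let total: usize = rows.iter().sum();
    // Cap the partition count so a single huge range cannot dwarf the others.
    let balanced = if max_rows > 0 { total.div_ceil(max_rows) } else { rows.len() };
    let actual = expected_partitions.min(balanced).max(1);

    let mut partitions = vec![Vec::new(); actual];
    // Min-heap of (assigned_rows, partition_idx): always fill the lightest partition.
    let mut heap: BinaryHeap<Reverse<(usize, usize)>> =
        (0..actual).map(|i| Reverse((0, i))).collect();
    for r in rows {
        let Reverse((assigned, idx)) = heap.pop().unwrap();
        partitions[idx].push(r);
        heap.push(Reverse((assigned + r, idx)));
    }
    partitions
}

fn main() {
    // Rows from the unbalanced test: one 2500-row range dominates, so only two
    // partitions are produced even though more row-balanced splits exist.
    assert_eq!(
        assign(vec![100, 200, 150, 2500], 2),
        vec![vec![2500], vec![200, 150, 100]]
    );
}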
@@ -172,18 +223,18 @@ mod test {
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
],
vec![
PartitionRange {
@@ -193,10 +244,10 @@ mod test {
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
],
];
@@ -207,10 +258,10 @@ mod test {
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
@@ -218,18 +269,20 @@ mod test {
num_rows: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
}],
vec![
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
];
assert_eq!(result, expected);
@@ -237,4 +290,68 @@ mod test {
let result = ParallelizeScan::assign_partition_range(vec![], 5);
assert_eq!(result.len(), 1);
}
#[test]
fn test_assign_unbalance_partition_range() {
let ranges = vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 2500,
identifier: 4,
},
];
// assign to 2 partitions
let expected_partition_num = 2;
let result =
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 2500,
identifier: 4,
}],
vec![
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
];
assert_eq!(result, expected);
}
}

View File

@@ -215,6 +215,7 @@ pub fn create_table_stmt(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
@@ -258,13 +259,22 @@ mod tests {
let catalog_name = "greptime".to_string();
let regions = vec![0, 1, 2];
let mut options = table::requests::TableOptions {
ttl: Some(Duration::from_secs(30).into()),
..Default::default()
};
let _ = options
.extra_options
.insert("compaction.type".to_string(), "twcs".to_string());
let meta = TableMetaBuilder::default()
.schema(table_schema)
.primary_key_indices(vec![0, 1])
.value_indices(vec![2, 3])
.engine("mito".to_string())
.next_column_id(0)
.options(Default::default())
.options(options)
.created_on(Default::default())
.region_numbers(regions)
.build()
@@ -301,7 +311,10 @@ CREATE TABLE IF NOT EXISTS "system_metrics" (
INVERTED INDEX ("host")
)
ENGINE=mito
"#,
WITH(
'compaction.type' = 'twcs',
ttl = '30s'
)"#,
sql
);
}

View File

@@ -77,7 +77,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" }
opentelemetry-proto.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.25.0", default-features = false, features = ["server-api-ring"] }
pgwire = { version = "0.28.0", default-features = false, features = ["server-api-ring"] }
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }

View File

@@ -1 +1 @@
v0.7.0
v0.7.1

View File

@@ -33,7 +33,7 @@ use ::auth::UserProviderRef;
use derive_builder::Builder;
use pgwire::api::auth::ServerParameterProvider;
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::{ClientInfo, PgWireHandlerFactory};
use pgwire::api::{ClientInfo, PgWireServerHandlers};
pub use server::PostgresServer;
use session::context::Channel;
use session::Session;
@@ -90,11 +90,12 @@ pub(crate) struct MakePostgresServerHandler {
pub(crate) struct PostgresServerHandler(Arc<PostgresServerHandlerInner>);
impl PgWireHandlerFactory for PostgresServerHandler {
impl PgWireServerHandlers for PostgresServerHandler {
type StartupHandler = PostgresServerHandlerInner;
type SimpleQueryHandler = PostgresServerHandlerInner;
type ExtendedQueryHandler = PostgresServerHandlerInner;
type CopyHandler = NoopCopyHandler;
type ErrorHandler = PostgresServerHandlerInner;
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
self.0.clone()
@@ -111,6 +112,10 @@ impl PgWireHandlerFactory for PostgresServerHandler {
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
Arc::new(NoopCopyHandler)
}
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
self.0.clone()
}
}
impl MakePostgresServerHandler {

View File

@@ -177,7 +177,7 @@ impl StartupHandler for PostgresServerHandlerInner {
client.metadata().get(super::METADATA_USER).cloned(),
));
set_client_info(client, &self.session);
auth::finish_authentication(client, self.param_provider.as_ref()).await;
auth::finish_authentication(client, self.param_provider.as_ref()).await?;
}
}
PgWireFrontendMessage::PasswordMessageFamily(pwd) => {
@@ -194,7 +194,7 @@ impl StartupHandler for PostgresServerHandlerInner {
if let Ok(Some(user_info)) = auth_result {
self.session.set_user_info(user_info);
set_client_info(client, &self.session);
auth::finish_authentication(client, self.param_provider.as_ref()).await;
auth::finish_authentication(client, self.param_provider.as_ref()).await?;
} else {
return send_error(
client,

View File

@@ -78,14 +78,16 @@ pub(crate) fn process<'a>(query: &str, query_ctx: QueryContextRef) -> Option<Vec
if START_TRANSACTION_PATTERN.is_match(query) {
set_transaction_warning(query_ctx);
if query.to_lowercase().starts_with("begin") {
Some(vec![Response::Execution(Tag::new("BEGIN"))])
Some(vec![Response::TransactionStart(Tag::new("BEGIN"))])
} else {
Some(vec![Response::Execution(Tag::new("START TRANSACTION"))])
Some(vec![Response::TransactionStart(Tag::new(
"START TRANSACTION",
))])
}
} else if ABORT_TRANSACTION_PATTERN.is_match(query) {
Some(vec![Response::Execution(Tag::new("ROLLBACK"))])
Some(vec![Response::TransactionEnd(Tag::new("ROLLBACK"))])
} else if COMMIT_TRANSACTION_PATTERN.is_match(query) {
Some(vec![Response::Execution(Tag::new("COMMIT"))])
Some(vec![Response::TransactionEnd(Tag::new("COMMIT"))])
} else if let Some(show_var) = SHOW_PATTERN.captures(query) {
let show_var = show_var[1].to_lowercase();
if let Some(value) = VAR_VALUES.get(&show_var.as_ref()) {
@@ -127,7 +129,9 @@ mod test {
use super::*;
fn assert_tag(q: &str, t: &str, query_context: QueryContextRef) {
if let Response::Execution(tag) = process(q, query_context.clone())
if let Response::Execution(tag)
| Response::TransactionStart(tag)
| Response::TransactionEnd(tag) = process(q, query_context.clone())
.unwrap_or_else(|| panic!("fail to match {}", q))
.remove(0)
{

View File

@@ -31,7 +31,7 @@ use pgwire::api::results::{
DataRowEncoder, DescribePortalResponse, DescribeStatementResponse, QueryResponse, Response, Tag,
};
use pgwire::api::stmt::{QueryParser, StoredStatement};
use pgwire::api::{ClientInfo, Type};
use pgwire::api::{ClientInfo, ErrorHandler, Type};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pgwire::messages::PgWireBackendMessage;
use query::query_engine::DescribeResult;
@@ -414,3 +414,12 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
}
}
}
impl ErrorHandler for PostgresServerHandlerInner {
fn on_error<C>(&self, _client: &C, error: &mut PgWireError)
where
C: ClientInfo,
{
debug!("Postgres interface error {}", error)
}
}

View File

@@ -27,7 +27,6 @@ impl ToSqlText for HexOutputBytea<'_> {
where
Self: Sized,
{
out.put_slice(b"\\x");
let _ = self.0.to_sql_text(ty, out);
Ok(IsNull::No)
}

View File

@@ -79,10 +79,18 @@ impl OptionMap {
pub fn kv_pairs(&self) -> Vec<String> {
let mut result = Vec::with_capacity(self.options.len() + self.secrets.len());
for (k, v) in self.options.iter() {
result.push(format!("{k} = '{}'", v.escape_default()));
if k.contains(".") {
result.push(format!("'{k}' = '{}'", v.escape_default()));
} else {
result.push(format!("{k} = '{}'", v.escape_default()));
}
}
for (k, _) in self.secrets.iter() {
result.push(format!("{k} = '******'"));
if k.contains(".") {
result.push(format!("'{k}' = '******'"));
} else {
result.push(format!("{k} = '******'"));
}
}
result
}
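The quoting change above is what makes `SHOW CREATE TABLE` emit parseable `WITH(...)` clauses for dotted keys (see the sqlness expectations later in this diff). A minimal sketch of the rule as a standalone function rather than the real `OptionMap` method:

/// Quotes option keys that contain a dot so the generated SQL stays parseable.
fn render_kv(key: &str, value: &str) -> String {
    if key.contains('.') {
        format!("'{key}' = '{}'", value.escape_default())
    } else {
        format!("{key} = '{}'", value.escape_default())
    }
}

fn main() {
    assert_eq!("'compaction.type' = 'twcs'", render_kv("compaction.type", "twcs"));
    assert_eq!("ttl = '30s'", render_kv("ttl", "30s"));
}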

View File

@@ -206,16 +206,13 @@ pub struct ScannerProperties {
/// Whether to yield an empty batch to distinguish partition ranges.
pub distinguish_partition_range: bool,
/// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
target_partitions: usize,
}
impl ScannerProperties {
/// Initialize partitions with given parallelism for scanner.
pub fn with_parallelism(mut self, parallelism: usize) -> Self {
self.partitions = vec![vec![]; parallelism];
self
}
/// Set append mode for scanner.
/// Sets append mode for scanner.
pub fn with_append_mode(mut self, append_mode: bool) -> Self {
self.append_mode = append_mode;
self
@@ -234,9 +231,24 @@ impl ScannerProperties {
append_mode,
total_rows,
distinguish_partition_range: false,
target_partitions: 0,
}
}
/// Updates the properties with the given [PrepareRequest].
pub fn prepare(&mut self, request: PrepareRequest) {
if let Some(ranges) = request.ranges {
self.partitions = ranges;
}
if let Some(distinguish_partition_range) = request.distinguish_partition_range {
self.distinguish_partition_range = distinguish_partition_range;
}
if let Some(target_partitions) = request.target_partitions {
self.target_partitions = target_partitions;
}
}
/// Returns the number of actual partitions.
pub fn num_partitions(&self) -> usize {
self.partitions.len()
}
@@ -249,8 +261,44 @@ impl ScannerProperties {
self.total_rows
}
pub fn distinguish_partition_range(&self) -> bool {
self.distinguish_partition_range
/// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
pub fn target_partitions(&self) -> usize {
if self.target_partitions == 0 {
self.num_partitions()
} else {
self.target_partitions
}
}
}
/// Request to override the scanner properties.
#[derive(Default)]
pub struct PrepareRequest {
/// Assigned partition ranges.
pub ranges: Option<Vec<Vec<PartitionRange>>>,
/// Distinguishes partition ranges by empty batches.
pub distinguish_partition_range: Option<bool>,
/// The expected number of target partitions.
pub target_partitions: Option<usize>,
}
impl PrepareRequest {
/// Sets the ranges.
pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
self.ranges = Some(ranges);
self
}
/// Sets the distinguish partition range flag.
pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
self.distinguish_partition_range = Some(distinguish_partition_range);
self
}
/// Sets the target partitions.
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = Some(target_partitions);
self
}
}
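A hedged standalone mirror of how `PrepareRequest` is meant to be applied: only the fields a caller sets are overridden, so a later `prepare` call can adjust one property without clobbering ranges assigned earlier. The types below are simplified stand-ins for the real `store_api` ones:

#[derive(Default)]
struct Request {
    ranges: Option<Vec<Vec<u32>>>, // stand-in for Vec<Vec<PartitionRange>>
    distinguish_partition_range: Option<bool>,
    target_partitions: Option<usize>,
}

impl Request {
    fn with_target_partitions(mut self, n: usize) -> Self {
        self.target_partitions = Some(n);
        self
    }
}

#[derive(Default)]
struct Props {
    partitions: Vec<Vec<u32>>,
    distinguish_partition_range: bool,
    target_partitions: usize, // 0 means "fall back to the number of partitions"
}

impl Props {
    // Same shape as ScannerProperties::prepare above: override only what was set.
    fn prepare(&mut self, request: Request) {
        if let Some(ranges) = request.ranges {
            self.partitions = ranges;
        }
        if let Some(flag) = request.distinguish_partition_range {
            self.distinguish_partition_range = flag;
        }
        if let Some(n) = request.target_partitions {
            self.target_partitions = n;
        }
    }
}

fn main() {
    let mut props = Props { partitions: vec![vec![1], vec![2]], ..Default::default() };
    props.prepare(Request::default().with_target_partitions(8));
    assert_eq!(props.partitions.len(), 2); // previously assigned ranges are kept
    assert_eq!(props.target_partitions, 8);
}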
@@ -271,11 +319,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Prepares the scanner with the given partition ranges.
///
/// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError>;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
/// Scans the partition and returns a stream of record batches.
///
@@ -431,9 +475,7 @@ impl SinglePartitionScanner {
Self {
stream: Mutex::new(Some(stream)),
schema,
properties: ScannerProperties::default()
.with_parallelism(1)
.with_append_mode(append_mode),
properties: ScannerProperties::default().with_append_mode(append_mode),
metadata,
}
}
@@ -454,13 +496,8 @@ impl RegionScanner for SinglePartitionScanner {
self.schema.clone()
}
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}

View File

@@ -35,7 +35,7 @@ use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use futures::{Stream, StreamExt};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef};
use crate::table::metrics::StreamMetrics;
@@ -112,6 +112,7 @@ impl RegionScanExec {
pub fn with_new_partitions(
&self,
partitions: Vec<Vec<PartitionRange>>,
target_partitions: usize,
) -> Result<Self, BoxedError> {
if self.is_partition_set {
warn!("Setting partition ranges more than once for RegionScanExec");
@@ -123,8 +124,11 @@ impl RegionScanExec {
{
let mut scanner = self.scanner.lock().unwrap();
let distinguish_partition_range = scanner.properties().distinguish_partition_range();
scanner.prepare(partitions, distinguish_partition_range)?;
scanner.prepare(
PrepareRequest::default()
.with_ranges(partitions)
.with_target_partitions(target_partitions),
)?;
}
Ok(Self {
@@ -141,9 +145,10 @@ impl RegionScanExec {
pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
let mut scanner = self.scanner.lock().unwrap();
let partition_ranges = scanner.properties().partitions.clone();
// set distinguish_partition_range won't fail
let _ = scanner.prepare(partition_ranges, distinguish_partition_range);
let _ = scanner.prepare(
PrepareRequest::default().with_distinguish_partition_range(distinguish_partition_range),
);
}
pub fn time_index(&self) -> String {

View File

@@ -233,6 +233,9 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
if *store_type == StorageType::S3WithCache {
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string());
} else {
// An empty string means disabling.
s3_config.cache.cache_path = Some("".to_string());
}
let mut builder = S3::default()

View File

@@ -917,7 +917,7 @@ compress_manifest = false
auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "1GiB"
experimental_write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false

View File

@@ -22,8 +22,9 @@ mod sql;
mod region_migration;
grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
// region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
sql_tests!(File);
region_migration_tests!(File);

View File

@@ -173,28 +173,28 @@ Affected Rows: 0
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.twcs.time_window = '2h', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_inactive_window_runs' = '6', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.twcs.time_window' = '2h', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
ALTER TABLE ato UNSET 'compaction.twcs.time_window';
@@ -206,27 +206,27 @@ Error: 1004(InvalidArguments), Invalid unset table option request: Invalid set r
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_inactive_window_runs' = '6', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='';
@@ -234,50 +234,50 @@ Affected Rows: 0
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
-- SQLNESS ARG restart=true
SHOW CREATE TABLE ato;
+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.type = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+
+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'compaction.twcs.max_active_window_files' = '2', |
| | 'compaction.twcs.max_active_window_runs' = '6', |
| | 'compaction.twcs.max_inactive_window_files' = '2', |
| | 'compaction.twcs.max_output_file_size' = '500MB', |
| | 'compaction.type' = 'twcs', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+
DROP TABLE ato;
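The hunks above show the quoting fix in action: SHOW CREATE TABLE now wraps option keys that contain a dot (e.g. 'compaction.twcs.time_window') in single quotes, while dot-free keys such as ttl stay bare, so the emitted DDL can be replayed as-is. A minimal round-trip sketch, assuming a running GreptimeDB instance with the mito engine; the table name and option values are illustrative, not taken from the test:

-- hypothetical table; the WITH(...) keys mirror the quoted form shown above
CREATE TABLE IF NOT EXISTS opt_quote_demo (
  i INT NULL,
  j TIMESTAMP(3) NOT NULL,
  TIME INDEX (j),
  PRIMARY KEY (i)
)
ENGINE=mito
WITH(
  'compaction.type' = 'twcs',
  'compaction.twcs.time_window' = '2h'
);

-- dotted option keys are quoted in ALTER as well; setting a key to '' drops it,
-- as the max_inactive_window_runs hunk above demonstrates
ALTER TABLE opt_quote_demo SET 'compaction.twcs.max_active_window_files'='2';
ALTER TABLE opt_quote_demo SET 'compaction.twcs.max_active_window_files'='';

-- the WITH(...) block in the output now quotes every dotted key
SHOW CREATE TABLE opt_quote_demo;

DROP TABLE opt_quote_demo;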
View File
@@ -1,5 +1,5 @@
-- Test without PK, with a windowed sort query.
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
Affected Rows: 0
@@ -69,8 +69,8 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
@@ -101,9 +101,9 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
@@ -113,7 +113,7 @@ DROP TABLE test;
Affected Rows: 0
-- Test with PK, with a windowed sort query.
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
Affected Rows: 0
@@ -183,9 +183,9 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
@@ -216,9 +216,9 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=2 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=2 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=4 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=4 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
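The regenerated plans above go hand in hand with the schema change below: the windowed-sort tables are now created with explicit TWCS options so the files flushed during the test are not compacted into a single SST, which moves the scan from 2 ranges (1 memtable, 1 file) to 4 ranges (1 memtable, 3 files). A sketch of the relevant statements, assuming the rest of the test body (inserts and the flushes between them) is unchanged:

-- allow up to 4 uncompacted inactive-window files so each flush stays a separate range
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX)
WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');

-- windowed sort plan; num_ranges=4 comes from 1 memtable range + 3 file ranges
EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5;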
View File
@@ -1,5 +1,5 @@
-- Test without PK, with a windowed sort query.
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3);
@@ -36,7 +36,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
DROP TABLE test;
-- Test with PK, with a windowed sort query.
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3);
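The .sql sources are updated to match the regenerated .result files above. For the PK table the same options apply; a minimal sketch of the statements whose plan appears in the earlier hunks (the trailing DROP TABLE is assumed, as it is not part of this hunk):

CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX)
WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');

INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3);

-- with a PK, PartSortExec shows up under WindowedSortExec even for the ascending query
EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5;

DROP TABLE test_pk;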