Compare commits

...

36 Commits

Author SHA1 Message Date
Zhenchi
a6571d3392 chore: bump version to 0.10.0 (#5040)
* chore: bump version to 0.10.0

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix sqlness version regex

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-22 03:56:25 +00:00
Yohan Wal
1255638e84 refactor: unify mysql execute through cli and protocol (#5038)
refactor: mysql execute
2024-11-22 03:55:09 +00:00
Yohan Wal
1578c004b0 fix: prepare param mismatch (#5025)
* fix: prepare param mismatch

* test: clear state

* fix: minus 1
2024-11-22 02:31:53 +00:00
Yohan Wal
5f8d849981 feat: alter database ttl (#5035)
* feat: alter database ttl

* fix: make clippy happy

* feat: add unset database option

* fix: happy ci

* fix: happy clippy

* chore: fmt toml

* fix: fix header

* refactor: introduce `AlterDatabaseKind`

* chore: apply suggestions from CR

* refactor: add unset database option support

* test: add unit tests

* test: add sqlness tests

* feat: invalidate schema name value cache

* Apply suggestions from code review

* chore: fmt

* chore: update error messages

* test: add more test cases

* test: add more test cases

* Apply suggestions from code review

* chore: apply suggestions from CR

---------

Co-authored-by: WenyXu <wenymedia@gmail.com>
2024-11-21 12:41:41 +00:00
Lei, HUANG
3029b47a89 fix: find latest window (#5037)
* fix: find latest window

* more test files
2024-11-21 04:56:03 +00:00
Weny Xu
14d997e2d1 feat: add unset table options support (#5034)
* feat: add unset table options support

* test: add tests

* chore: update greptime-proto

* chore: add comments
2024-11-21 03:52:56 +00:00
Zhenchi
0aab68c23b feat(vector): add conversion between vector and string (#5029)
* feat(vector): add conversion between vector and string

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix sqlness

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-20 08:42:00 +00:00
Weny Xu
027284ed1b chore(cli): set default timeout for cli commands (#5021)
* chore(cli): set default timeout for cli commands

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: update comments

* fix: treats `None` as `0s` to disable server-side default timeout

* chore: update comments
2024-11-20 07:36:17 +00:00
Ruihang Xia
6a958e2c36 feat: reimplement limit in PartSort to reduce memory footprint (#5018)
* feat: support windowed sort with where condition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix split logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify fuzz test to reflect logic change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: handle sort that won't preserve partitioning

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test case and add more cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* install topk

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tests: add test for limit

* add debug assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
2024-11-20 07:11:49 +00:00
Zhenchi
db345c92df feat(vector): remove simsimd and use nalgebra instead (#5027)
* feat(vector): remove `simsimd` and use `nalgebra` instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* keep thing simple

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-20 06:57:10 +00:00
Yohan Wal
55ced9aa71 refactor: split up different stmts (#4997)
* refactor: set and unset

* chore: error message

* fix: reset Cargo.lock

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* Apply suggestions from code review

Co-authored-by: Weny Xu <wenymedia@gmail.com>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-11-20 06:02:51 +00:00
discord9
3633f25d0c feat: also shut down gracefully on sigterm on unix (#5023)
* feat: support SIGTERM on unix

* chore: log

* fix: Result type
2024-11-19 15:20:33 +00:00
Yingwen
63bbfd04c7 fix: prune memtable/files range independently in each partition (#4998)
* feat: prune in each partition

* chore: change pick log to trace

* chore: add in progress partition scan to metrics

* feat: seqscan support pruning in partition

* chore: remove commented codes
2024-11-19 12:43:30 +00:00
Zhenchi
2f260d8b27 fix: android build failed due to simsimd (#5019)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-19 11:21:58 +00:00
discord9
4d8fe29ea8 feat: CREATE OR REPLACE FLOW (#5001)
* feat: Replace flow

* refactor: better show create flow & tests: better check

* tests: sqlness result update

* tests: unit test for update

* refactor: cmp with raw bytes

* refactor: rename

* refactor: per review
2024-11-19 08:44:57 +00:00
Zhenchi
dbb3f2d98d fix: inverted index constraint to be case-insensitive (#5020)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-19 08:30:20 +00:00
ZonaHe
9926e3bc78 feat: update dashboard to v0.6.1 (#5017)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2024-11-19 07:28:10 +00:00
dennis zhuang
0dd02e93cf feat: make greatest support timestamp and datetime types (#5005)
* feat: make greatest support timestamp and datetime types

* chore: style

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* refactor: greatest with gt_time_types macro

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2024-11-19 07:01:24 +00:00
Zhenchi
73e6bf399d test: reduce round precision to avoid platform diff (#5013)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-18 16:37:15 +00:00
discord9
4402f638cd fix: distinct respect in range (#5015)
* fix: distinct respect in range

* tests: sqlness

* chore: newline
2024-11-18 12:11:07 +00:00
shuiyisong
c199604ece feat: Loki remote write (#4941)
* chore: add debug loki remote write url

* chore: add decode snappy

* chore: format output

* feature: impl loki remote write

* fix: special labels deserialize

* chore: move result to folder

* chore: finish todo in loki write

* test: loki write

* chore: fix cr issue

* chore: fix cr issue

* chore: fix cr issue

* chore: update pre-commit config

* chore: fix cr issue

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
2024-11-18 08:39:17 +00:00
Yohan Wal
2b72e66536 test: subquery test migrated from duckdb (#4985)
* test: subquery test migrated from duckdb

* test: update test

* test: skip unsupported features and add sources
2024-11-18 08:37:06 +00:00
Weny Xu
7c135c0ef9 feat: introduce DynamicTimeoutLayer (#5006)
* feat: introduce `DynamicTimeoutLayer`

* test: add unit test

* chore: apply suggestions from CR

* feat: add timeout option for cli
2024-11-18 07:10:40 +00:00
Weny Xu
9289265f54 fix: correct unset_maintenance_mode behavior (#5009) 2024-11-18 06:39:13 +00:00
Lanqing Yang
485782af51 fix: ensure Create Or Replace and If Not Exists cannot coexist in create view (#5003)
ensure Create Or Replace and If Not Exists cannot coexist in create view statement
2024-11-17 07:08:30 +00:00
Weny Xu
4b263ef1cc fix: obsolete wal entries while opening a migrated region (#4993)
* fix: delete obsolete wal entries while opening a migrated region

* chore: add logs

* chore: rust fmt

* fix: fix fuzz test
2024-11-15 10:55:22 +00:00
Weny Xu
08f59008cc refactor: introduce MaintenanceModeManager (#4994)
* refactor: introduce MaintenanceModeManager

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2024-11-15 07:15:22 +00:00
Yohan Wal
a2852affeb chore: rename change to modify (#5000)
* chore: rename change to modify

* chore: proto rev
2024-11-15 06:58:26 +00:00
Lanqing Yang
cdba7b442f feat: implement statement/execution timeout session variable (#4792)
* support set and show on statement/execution timeout session variables.

* implement statement timeout for mysql read, and postgres queries

* add mysql test with max execution time
2024-11-15 06:19:39 +00:00
Lin Yihai
42bf7e9965 refactor: Avoid wrapping Option for CacheManagerRef (#4996) 2024-11-14 13:37:02 +00:00
discord9
a70b4d7eba chore: update greptime-proto to e1070a (#4992)
* chore: update proto to e1070a

* fix: update proto usage
2024-11-14 11:59:00 +00:00
Zhenchi
408013c22b feat: add distance functions (#4987)
* feat: add distance functions

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: f64 instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* tiny adjust

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-14 10:18:58 +00:00
zyy17
22c8a7656b chore: update cluster dashboard (#4995) 2024-11-14 10:18:50 +00:00
discord9
35898f0b2e test: more sqlness tests for flow (#4988)
* tests: more flow testcase

* tests(WIP): more tests

* tests: more flow tests

* test: weird regex for sqlness

* refactor: put blog & example into two files
2024-11-14 07:40:14 +00:00
zyy17
1101e98651 refactor(grafana): update cluster dashboard (#4980) 2024-11-14 07:12:30 +00:00
zyy17
0089cf1b4f fix: run install.sh error (#4989)
* fix: use '/bin/sh' shebang and remove `function` keyword

* ci: check install.sh in nightly CI
2024-11-13 21:54:24 +00:00
192 changed files with 14907 additions and 5831 deletions

.github/scripts/check-install-script.sh vendored Executable file
View File

@@ -0,0 +1,14 @@
#!/bin/sh
set -e
# Get the latest version of github.com/GreptimeTeam/greptimedb
VERSION=$(curl -s https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest | jq -r '.tag_name')
echo "Downloading the latest version: $VERSION"
# Download the install script
curl -fsSL https://raw.githubusercontent.com/greptimeteam/greptimedb/main/scripts/install.sh | sh -s $VERSION
# Execute the `greptime` command
./greptime --version

View File

@@ -22,6 +22,10 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check install.sh
run: ./.github/scripts/check-install-script.sh
- name: Run sqlness test
uses: ./.github/actions/sqlness-test
with:

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nightly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.10.0
NEXT_RELEASE_VERSION: v0.11.0
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:

View File

@@ -17,6 +17,6 @@ repos:
- id: fmt
- id: clippy
args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
stages: [push]
stages: [pre-push]
- id: cargo-check
args: ["--workspace", "--all-targets", "--all-features"]

Cargo.lock generated
View File

@@ -208,7 +208,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"common-base",
"common-decimal",
@@ -769,7 +769,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -1041,7 +1041,7 @@ dependencies = [
"bitflags 2.6.0",
"cexpr",
"clang-sys",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
@@ -1379,7 +1379,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"catalog",
"common-error",
@@ -1387,7 +1387,7 @@ dependencies = [
"common-meta",
"moka",
"snafu 0.8.5",
"substrait 0.9.5",
"substrait 0.10.0",
]
[[package]]
@@ -1414,7 +1414,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arrow",
@@ -1753,7 +1753,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "client"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arc-swap",
@@ -1782,8 +1782,8 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.10.0",
"substrait 0.37.3",
"substrait 0.9.5",
"tokio",
"tokio-stream",
"tonic 0.11.0",
@@ -1823,7 +1823,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"auth",
@@ -1859,6 +1859,7 @@ dependencies = [
"frontend",
"futures",
"human-panic",
"humantime",
"lazy_static",
"meta-client",
"meta-srv",
@@ -1881,7 +1882,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.9.5",
"substrait 0.10.0",
"table",
"temp-env",
"tempfile",
@@ -1927,7 +1928,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"anymap2",
"async-trait",
@@ -1948,7 +1949,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"chrono",
"common-error",
@@ -1959,7 +1960,7 @@ dependencies = [
[[package]]
name = "common-config"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"common-base",
"common-error",
@@ -1982,7 +1983,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -2019,7 +2020,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2032,7 +2033,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"snafu 0.8.5",
"strum 0.25.0",
@@ -2041,7 +2042,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -2056,9 +2057,10 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"approx 0.5.1",
"arc-swap",
"async-trait",
"common-base",
@@ -2079,6 +2081,7 @@ dependencies = [
"geohash",
"h3o",
"jsonb",
"nalgebra 0.33.2",
"num",
"num-traits",
"once_cell",
@@ -2099,7 +2102,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2116,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arrow-flight",
@@ -2142,7 +2145,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"common-base",
@@ -2161,7 +2164,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2175,7 +2178,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"common-error",
"common-macro",
@@ -2188,7 +2191,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"anymap2",
"api",
@@ -2245,7 +2248,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2254,11 +2257,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.9.5"
version = "0.10.0"
[[package]]
name = "common-pprof"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"common-error",
"common-macro",
@@ -2270,7 +2273,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2297,7 +2300,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2305,7 +2308,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -2331,7 +2334,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2350,7 +2353,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2380,7 +2383,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"atty",
"backtrace",
@@ -2408,7 +2411,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"client",
"common-query",
@@ -2420,7 +2423,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"arrow",
"chrono",
@@ -2436,7 +2439,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"build-data",
"const_format",
@@ -2447,7 +2450,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"common-base",
"common-error",
@@ -3256,7 +3259,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arrow-flight",
@@ -3306,7 +3309,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.9.5",
"substrait 0.10.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3315,7 +3318,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"arrow",
"arrow-array",
@@ -3933,7 +3936,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -4050,7 +4053,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arrow",
@@ -4107,7 +4110,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.9.5",
"substrait 0.10.0",
"table",
"tokio",
"tonic 0.11.0",
@@ -4169,7 +4172,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arc-swap",
@@ -4594,7 +4597,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=75c5fb569183bb3d0fa1023df9c2214df722b9b1#75c5fb569183bb3d0fa1023df9c2214df722b9b1"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4"
dependencies = [
"prost 0.12.6",
"serde",
@@ -5090,7 +5093,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.4.10",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
@@ -5309,7 +5312,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6080,7 +6083,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -6153,7 +6156,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-store"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-stream",
"async-trait",
@@ -6191,6 +6194,16 @@ dependencies = [
"uuid",
]
[[package]]
name = "loki-api"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "674883a98273598ac3aad4301724c56734bea90574c5033af067e8f9fb5eb399"
dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
]
[[package]]
name = "lrlex"
version = "0.13.7"
@@ -6473,7 +6486,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -6500,7 +6513,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -6579,7 +6592,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"aquamarine",
@@ -6682,7 +6695,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"aquamarine",
@@ -7070,13 +7083,29 @@ checksum = "d506eb7e08d6329505faa8a3a00a5dcc6de9f76e0c77e4b75763ae3c770831ff"
dependencies = [
"approx 0.5.1",
"matrixmultiply",
"nalgebra-macros",
"nalgebra-macros 0.1.0",
"num-complex",
"num-rational",
"num-traits",
"rand",
"rand_distr",
"simba",
"simba 0.6.0",
"typenum",
]
[[package]]
name = "nalgebra"
version = "0.33.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26aecdf64b707efd1310e3544d709c5c0ac61c13756046aaaba41be5c4f66a3b"
dependencies = [
"approx 0.5.1",
"matrixmultiply",
"nalgebra-macros 0.2.2",
"num-complex",
"num-rational",
"num-traits",
"simba 0.9.0",
"typenum",
]
@@ -7091,6 +7120,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "nalgebra-macros"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "254a5372af8fc138e36684761d3c0cdb758a4410e938babcff1c860ce14ddbfc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
]
[[package]]
name = "named_pipe"
version = "0.4.1"
@@ -7419,7 +7459,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"anyhow",
"bytes",
@@ -7710,9 +7750,10 @@ dependencies = [
[[package]]
name = "operator"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-stream",
"async-trait",
"catalog",
"chrono",
@@ -7756,7 +7797,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.9.5",
"substrait 0.10.0",
"table",
"tokio",
"tokio-util",
@@ -8006,7 +8047,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -8307,7 +8348,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8470,7 +8511,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"auth",
"common-base",
@@ -8744,7 +8785,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8822,7 +8863,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.10.5",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
@@ -8874,7 +8915,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.10.5",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.79",
@@ -8982,7 +9023,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9036,7 +9077,7 @@ dependencies = [
"indoc",
"libc",
"memoffset 0.9.1",
"parking_lot 0.11.2",
"parking_lot 0.12.3",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
@@ -9106,7 +9147,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9173,7 +9214,7 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.9.5",
"substrait 0.10.0",
"table",
"tokio",
"tokio-stream",
@@ -10636,7 +10677,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arc-swap",
@@ -10930,7 +10971,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"ahash 0.8.11",
"aide",
@@ -10974,17 +11015,21 @@ dependencies = [
"datatypes",
"derive_builder 0.12.0",
"futures",
"futures-util",
"hashbrown 0.14.5",
"headers 0.3.9",
"hostname",
"http 0.2.12",
"http-body 0.4.6",
"humantime",
"humantime-serde",
"hyper 0.14.30",
"influxdb_line_protocol",
"itertools 0.10.5",
"json5",
"jsonb",
"lazy_static",
"loki-api",
"mime_guess",
"mysql_async",
"notify",
@@ -11041,7 +11086,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arc-swap",
@@ -11160,6 +11205,19 @@ dependencies = [
"wide",
]
[[package]]
name = "simba"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3a386a501cd104797982c15ae17aafe8b9261315b5d07e3ec803f2ea26be0fa"
dependencies = [
"approx 0.5.1",
"num-complex",
"num-traits",
"paste",
"wide",
]
[[package]]
name = "simdutf8"
version = "0.1.5"
@@ -11374,7 +11432,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"chrono",
@@ -11393,6 +11451,7 @@ dependencies = [
"datafusion-sql",
"datatypes",
"hex",
"humantime",
"iso8601",
"itertools 0.10.5",
"jsonb",
@@ -11402,6 +11461,7 @@ dependencies = [
"snafu 0.8.5",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"sqlparser_derive 0.1.1",
"store-api",
"table",
]
@@ -11435,7 +11495,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11638,7 +11698,7 @@ checksum = "b35a062dbadac17a42e0fc64c27f419b25d6fae98572eb43c8814c9e873d7721"
dependencies = [
"approx 0.5.1",
"lazy_static",
"nalgebra",
"nalgebra 0.29.0",
"num-traits",
"rand",
]
@@ -11655,7 +11715,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"aquamarine",
@@ -11826,7 +11886,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"async-trait",
"bytes",
@@ -12025,7 +12085,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"async-trait",
@@ -12291,7 +12351,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -12333,7 +12393,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.9.5"
version = "0.10.0"
dependencies = [
"api",
"arrow-flight",
@@ -12373,6 +12433,7 @@ dependencies = [
"futures-util",
"hex",
"itertools 0.10.5",
"loki-api",
"meta-client",
"meta-srv",
"moka",
@@ -12396,7 +12457,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.9.5",
"substrait 0.10.0",
"table",
"tempfile",
"time",
@@ -13981,7 +14042,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]
[[package]]

View File

@@ -66,7 +66,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.9.5"
version = "0.10.0"
edition = "2021"
license = "Apache-2.0"
@@ -122,7 +122,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "75c5fb569183bb3d0fa1023df9c2214df722b9b1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
humantime = "2.1"
humantime-serde = "1.1"

View File

@@ -5,6 +5,13 @@ GreptimeDB's official Grafana dashboard.
Status note: we are still working on this config. It's expected to change frequently over the coming days. Please feel free to submit your feedback and/or contributions to this dashboard 🤗
If you use the Helm [chart](https://github.com/GreptimeTeam/helm-charts) to deploy a GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
The standalone GreptimeDB instance will collect metrics from your cluster, and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).
# How to use
## `greptimedb.json`

File diff suppressed because it is too large.

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/sh
set -ue
@@ -15,7 +15,7 @@ GITHUB_ORG=GreptimeTeam
GITHUB_REPO=greptimedb
BIN=greptime
function get_os_type() {
get_os_type() {
os_type="$(uname -s)"
case "$os_type" in
@@ -31,7 +31,7 @@ function get_os_type() {
esac
}
function get_arch_type() {
get_arch_type() {
arch_type="$(uname -m)"
case "$arch_type" in
@@ -53,7 +53,7 @@ function get_arch_type() {
esac
}
function download_artifact() {
download_artifact() {
if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
# Use the latest stable released version.
# GitHub API reference: https://docs.github.com/en/rest/releases/releases?apiVersion=2022-11-28#get-the-latest-release.

View File

@@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::AlterTable(_)) => "ddl.alter_table",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
None => "ddl.empty",
}
}

View File

@@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
.context(TableMetadataManagerSnafu)?
// information_schema is not available from this
// table_metadata_manager and we return None
.map(|schema_opts| format!("{schema_opts}"))
.map(|schema_opts| format!("{}", schema_opts.into_inner()))
} else {
None
};

View File

@@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
QueryRequest, RequestHeader,
};
use arrow_flight::Ticket;
@@ -211,9 +211,9 @@ impl Database {
.await
}
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
expr: Some(DdlExpr::AlterTable(expr)),
}))
.await
}

View File

@@ -53,6 +53,7 @@ flow.workspace = true
frontend = { workspace = true, default-features = false }
futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
meta-srv.workspace = true

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use humantime::format_duration;
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
@@ -26,10 +30,16 @@ pub(crate) struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,
timeout: Duration,
}
impl DatabaseClient {
pub fn new(addr: String, catalog: String, auth_basic: Option<String>) -> Self {
pub fn new(
addr: String,
catalog: String,
auth_basic: Option<String>,
timeout: Duration,
) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Some(format!("basic {}", encoded))
@@ -41,6 +51,7 @@ impl DatabaseClient {
addr,
catalog,
auth_header,
timeout,
}
}
@@ -63,6 +74,11 @@ impl DatabaseClient {
request = request.header("Authorization", auth);
}
request = request.header(
GREPTIME_DB_HEADER_TIMEOUT,
format_duration(self.timeout).to_string(),
);
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
})?;
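
For reference, a minimal standalone sketch of the humantime round-trip this hunk relies on; it illustrates how a `None` timeout becomes `0s` (as the comment in the hunk above says), and is not the project's code:

use std::time::Duration;

fn main() {
    // humantime converts between human-readable strings and Duration.
    let parsed = humantime::parse_duration("30s").unwrap();
    assert_eq!(parsed, Duration::from_secs(30));

    // Treating `None` as `0s` (i.e. Duration::default()) yields the header
    // value that disables the server-side default timeout.
    let timeout: Option<Duration> = None;
    let header = humantime::format_duration(timeout.unwrap_or_default()).to_string();
    assert_eq!(header, "0s");
}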

View File

@@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
@@ -83,14 +84,26 @@ pub struct ExportCommand {
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ExportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
let database_client =
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);
Ok(Instance::new(
Box::new(Export {

View File

@@ -14,6 +14,7 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
@@ -68,13 +69,25 @@ pub struct ImportCommand {
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ImportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
let database_client =
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);
Ok(Instance::new(
Box::new(Import {

View File

@@ -43,6 +43,31 @@ lazy_static::lazy_static! {
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
}
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
#[cfg(unix)]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, shutting down");
}
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down");
}
}
Ok(())
}
/// wait for the close signal, for non-unix platform it's ctrl-c
#[cfg(not(unix))]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
tokio::signal::ctrl_c().await
}
#[async_trait]
pub trait App: Send {
fn name(&self) -> &str;
@@ -69,9 +94,9 @@ pub trait App: Send {
self.start().await?;
if self.wait_signal() {
if let Err(e) = tokio::signal::ctrl_c().await {
error!(e; "Failed to listen for ctrl-c signal");
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
if let Err(e) = start_wait_for_close_signal().await {
error!(e; "Failed to listen for close signal");
// It's unusual to fail to listen for close signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}

View File

@@ -33,6 +33,7 @@ geo-types = { version = "0.7", optional = true }
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
jsonb.workspace = true
nalgebra = "0.33"
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
@@ -49,6 +50,7 @@ table.workspace = true
wkt = { version = "0.11", optional = true }
[dev-dependencies]
approx = "0.5"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true

View File

@@ -27,6 +27,7 @@ use crate::scalars::matches::MatchesFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;
@@ -120,6 +121,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Json related functions
JsonFunction::register(&function_registry);
// Vector related functions
VectorFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);

View File

@@ -21,6 +21,7 @@ pub mod json;
pub mod matches;
pub mod math;
pub mod numpy;
pub mod vector;
#[cfg(test)]
pub(crate) mod test;

View File

@@ -22,8 +22,12 @@ use datafusion::arrow::compute::kernels::cmp::gt;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::compute::cast;
use datatypes::arrow::compute::kernels::zip;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Date32Type};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Date32Type, Date64Type, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
@@ -34,13 +38,47 @@ pub struct GreatestFunction;
const NAME: &str = "greatest";
macro_rules! gt_time_types {
($ty: ident, $columns:expr) => {{
let column1 = $columns[0].to_arrow_array();
let column2 = $columns[1].to_arrow_array();
let column1 = column1.as_primitive::<$ty>();
let column2 = column2.as_primitive::<$ty>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
}};
}
impl Function for GreatestFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::date_datatype())
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
ensure!(
input_types.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
input_types.len()
)
}
);
match &input_types[0] {
ConcreteDataType::String(_) => Ok(ConcreteDataType::datetime_datatype()),
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
ConcreteDataType::DateTime(_) => Ok(ConcreteDataType::datetime_datatype()),
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: input_types,
}
.fail(),
}
}
fn signature(&self) -> Signature {
@@ -49,6 +87,11 @@ impl Function for GreatestFunction {
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
],
Volatility::Immutable,
)
@@ -66,27 +109,32 @@ impl Function for GreatestFunction {
);
match columns[0].data_type() {
ConcreteDataType::String(_) => {
let column1 = cast(&columns[0].to_arrow_array(), &ArrowDataType::Date32)
// Treats string as `DateTime` type.
let column1 = cast(&columns[0].to_arrow_array(), &ArrowDataType::Date64)
.context(ArrowComputeSnafu)?;
let column1 = column1.as_primitive::<Date32Type>();
let column2 = cast(&columns[1].to_arrow_array(), &ArrowDataType::Date32)
let column1 = column1.as_primitive::<Date64Type>();
let column2 = cast(&columns[1].to_arrow_array(), &ArrowDataType::Date64)
.context(ArrowComputeSnafu)?;
let column2 = column2.as_primitive::<Date32Type>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => {
let column1 = columns[0].to_arrow_array();
let column1 = column1.as_primitive::<Date32Type>();
let column2 = columns[1].to_arrow_array();
let column2 = column2.as_primitive::<Date32Type>();
let column2 = column2.as_primitive::<Date64Type>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
ConcreteDataType::DateTime(_) => gt_time_types!(Date64Type, columns),
ConcreteDataType::Timestamp(ts_type) => match ts_type {
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
TimestampType::Millisecond(_) => {
gt_time_types!(TimestampMillisecondType, columns)
}
TimestampType::Microsecond(_) => {
gt_time_types!(TimestampMicrosecondType, columns)
}
TimestampType::Nanosecond(_) => {
gt_time_types!(TimestampNanosecondType, columns)
}
},
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
@@ -106,19 +154,31 @@ impl fmt::Display for GreatestFunction {
mod tests {
use std::sync::Arc;
use common_time::Date;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::DateType;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Timestamp};
use datatypes::types::{
DateTimeType, DateType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use datatypes::value::Value;
use datatypes::vectors::{DateVector, StringVector, Vector};
use datatypes::vectors::{
DateTimeVector, DateVector, StringVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, Vector,
};
use paste::paste;
use super::*;
#[test]
fn test_greatest_takes_string_vector() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[]).unwrap(),
ConcreteDataType::Date(DateType)
function
.return_type(&[
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype()
])
.unwrap(),
ConcreteDataType::DateTime(DateTimeType)
);
let columns = vec![
Arc::new(StringVector::from(vec![
@@ -132,15 +192,15 @@ mod tests {
];
let result = function.eval(FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
let result = result.as_any().downcast_ref::<DateTimeVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str_utc("2001-02-01").unwrap())
Value::DateTime(DateTime::from_str("2001-02-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str_utc("2012-12-23").unwrap())
Value::DateTime(DateTime::from_str("2012-12-23 00:00:00", None).unwrap())
);
}
@@ -148,9 +208,15 @@ mod tests {
fn test_greatest_takes_date_vector() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[]).unwrap(),
function
.return_type(&[
ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype()
])
.unwrap(),
ConcreteDataType::Date(DateType)
);
let columns = vec![
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
@@ -168,4 +234,81 @@ mod tests {
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
);
}
#[test]
fn test_greatest_takes_datetime_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::datetime_datatype(),
ConcreteDataType::datetime_datatype()
])
.unwrap(),
ConcreteDataType::DateTime(DateTimeType)
);
let columns = vec![
Arc::new(DateTimeVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateTimeVector::from_slice(vec![0, 1])) as _,
];
let result = function.eval(FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<DateTimeVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::DateTime(DateTime::from_str("1970-01-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::DateTime(DateTime::from_str("1970-01-01 00:00:00.002", None).unwrap())
);
}
macro_rules! test_timestamp {
($type: expr,$unit: ident) => {
paste! {
#[test]
fn [<test_greatest_takes_ $unit:lower _vector>]() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[$type, $type]).unwrap(),
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
);
let columns = vec![
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
];
let result = function.eval(FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
);
}
}
}
}
test_timestamp!(
ConcreteDataType::timestamp_nanosecond_datatype(),
Nanosecond
);
test_timestamp!(
ConcreteDataType::timestamp_microsecond_datatype(),
Microsecond
);
test_timestamp!(
ConcreteDataType::timestamp_millisecond_datatype(),
Millisecond
);
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
}
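
As context, a minimal standalone sketch of the `gt` + `zip` pattern that `gt_time_types!` expands to, written against the arrow-rs kernels this hunk imports (an illustration under assumed arrow-rs API, not the project's code):

use arrow::array::{Array, Date32Array};
use arrow::compute::kernels::cmp::gt;
use arrow::compute::kernels::zip::zip;

fn main() {
    let a = Date32Array::from(vec![-1, 2]);
    let b = Date32Array::from(vec![0, 1]);
    // Boolean mask: true where a > b.
    let mask = gt(&a, &b).unwrap();
    // Elementwise maximum: take from `a` where the mask is true, else from `b`.
    let max = zip(&mask, &a, &b).unwrap();
    let max = max.as_any().downcast_ref::<Date32Array>().unwrap();
    assert_eq!(max.value(0), 0); // max(-1, 0)
    assert_eq!(max.value(1), 2); // max(2, 1)
}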

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod convert;
mod distance;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
// conversion
registry.register(Arc::new(convert::ParseVectorFunction));
registry.register(Arc::new(convert::VectorToStringFunction));
// distance
registry.register(Arc::new(distance::CosDistanceFunction));
registry.register(Arc::new(distance::DotProductFunction));
registry.register(Arc::new(distance::L2SqDistanceFunction));
}
}

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod parse_vector;
mod vector_to_string;
pub use parse_vector::ParseVectorFunction;
pub use vector_to_string::VectorToStringFunction;

View File

@@ -0,0 +1,160 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, InvalidVectorStringSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::parse_string_to_vector_type_value;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
const NAME: &str = "parse_vec";
#[derive(Debug, Clone, Default)]
pub struct ParseVectorFunction;
impl Function for ParseVectorFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let column = &columns[0];
let size = column.len();
let mut result = BinaryVectorBuilder::with_capacity(size);
for i in 0..size {
let value = column.get(i).as_string();
if let Some(value) = value {
let res = parse_string_to_vector_type_value(&value, None)
.context(InvalidVectorStringSnafu { vec_str: &value })?;
result.push(Some(&res));
} else {
result.push_null();
}
}
Ok(result.to_vector())
}
}
impl Display for ParseVectorFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_parse_vector() {
let func = ParseVectorFunction;
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
]));
let result = func.eval(FunctionContext::default(), &[input]).unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 3);
assert_eq!(
result.get(0),
Value::Binary(Bytes::from(
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
))
);
assert_eq!(
result.get(1),
Value::Binary(Bytes::from(
[4.0f32, 5.0, 6.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
))
);
assert!(result.get(2).is_null());
}
#[test]
fn test_parse_vector_error() {
let func = ParseVectorFunction;
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input]);
assert!(result.is_err());
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("7.0,8.0,9.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input]);
assert!(result.is_err());
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,hello,9.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input]);
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::vector_type_value_to_string;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
const NAME: &str = "vec_to_string";
#[derive(Debug, Clone, Default)]
pub struct VectorToStringFunction;
impl Function for VectorToStringFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::binary_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let column = &columns[0];
let size = column.len();
let mut result = StringVectorBuilder::with_capacity(size);
for i in 0..size {
let value = column.get(i);
match value {
Value::Binary(bytes) => {
let len = bytes.len();
if len % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", len),
}
.fail();
}
let dim = len / std::mem::size_of::<f32>();
// Safety: `dim` is calculated from the length of `bytes` and is guaranteed to be valid
let res = vector_type_value_to_string(&bytes, dim as _).unwrap();
result.push(Some(&res));
}
Value::Null => {
result.push_null();
}
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid value type: {:?}", value.data_type()),
}
.fail();
}
}
}
Ok(result.to_vector())
}
}
impl Display for VectorToStringFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use datatypes::vectors::BinaryVectorBuilder;
use super::*;
#[test]
fn test_vector_to_string() {
let func = VectorToStringFunction;
let mut builder = BinaryVectorBuilder::with_capacity(3);
builder.push(Some(
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<_>>()
.as_slice(),
));
builder.push(Some(
[4.0f32, 5.0, 6.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<_>>()
.as_slice(),
));
builder.push_null();
let vector = builder.to_vector();
let result = func.eval(FunctionContext::default(), &[vector]).unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result.get(0), Value::String("[1,2,3]".to_string().into()));
assert_eq!(result.get(1), Value::String("[4,5,6]".to_string().into()));
assert_eq!(result.get(2), Value::Null);
}
}
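
For context, a minimal sketch of the binary layout both conversion functions assume: a vector value is the concatenation of its `f32` elements in little-endian byte order, exactly what the tests above build with `to_le_bytes`. These helpers are illustrative only, not the project's API:

fn encode(v: &[f32]) -> Vec<u8> {
    // Concatenate the little-endian bytes of each f32 element.
    v.iter().flat_map(|e| e.to_le_bytes()).collect()
}

fn decode(bytes: &[u8]) -> Vec<f32> {
    // Inverse direction: read the bytes back four at a time.
    bytes
        .chunks_exact(std::mem::size_of::<f32>())
        .map(|c| f32::from_le_bytes(c.try_into().unwrap()))
        .collect()
}

fn main() {
    let bytes = encode(&[1.0, 2.0, 3.0]);
    assert_eq!(bytes.len(), 12); // 3 elements x 4 bytes
    assert_eq!(decode(&bytes), vec![1.0, 2.0, 3.0]);
}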

View File

@@ -0,0 +1,482 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod cos;
mod dot;
mod l2sq;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::ValueRef;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, Vector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
macro_rules! define_distance_function {
($StructName:ident, $display_name:expr, $similarity_method:path) => {
/// A function that calculates the distance between two vectors.
#[derive(Debug, Clone, Default)]
pub struct $StructName;
impl Function for $StructName {
fn name(&self) -> &str {
$display_name
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let size = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(size);
if size == 0 {
return Ok(result.to_vector());
}
let arg0_const = parse_if_constant_string(arg0)?;
let arg1_const = parse_if_constant_string(arg1)?;
for i in 0..size {
let vec0 = match arg0_const.as_ref() {
Some(a) => Some(Cow::Borrowed(a.as_slice())),
None => as_vector(arg0.get_ref(i))?,
};
let vec1 = match arg1_const.as_ref() {
Some(b) => Some(Cow::Borrowed(b.as_slice())),
None => as_vector(arg1.get_ref(i))?,
};
if let (Some(vec0), Some(vec1)) = (vec0, vec1) {
ensure!(
vec0.len() == vec1.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the vectors must match to calculate distance, have: {} vs {}",
vec0.len(),
vec1.len()
),
}
);
// The lengths of the two vectors were checked to match just above
let d = $similarity_method(vec0.as_ref(), vec1.as_ref());
result.push(Some(d));
} else {
result.push_null();
}
}
return Ok(result.to_vector());
}
}
impl Display for $StructName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", $display_name.to_ascii_uppercase())
}
}
}
}
define_distance_function!(CosDistanceFunction, "vec_cos_distance", cos::cos);
define_distance_function!(L2SqDistanceFunction, "vec_l2sq_distance", l2sq::l2sq);
define_distance_function!(DotProductFunction, "vec_dot_product", dot::dot);
/// Parse a vector value if the value is a constant string.
fn parse_if_constant_string(arg: &Arc<dyn Vector>) -> Result<Option<Vec<f32>>> {
if !arg.is_const() {
return Ok(None);
}
if arg.data_type() != ConcreteDataType::string_datatype() {
return Ok(None);
}
arg.get_ref(0)
.as_string()
.unwrap() // Safe: checked if it is a string
.map(parse_f32_vector_from_string)
.transpose()
}
/// Convert a value to a vector value.
/// Supported data types are binary and string.
fn as_vector(arg: ValueRef<'_>) -> Result<Option<Cow<'_, [f32]>>> {
match arg.data_type() {
ConcreteDataType::Binary(_) => arg
.as_binary()
.unwrap() // Safe: checked if it is a binary
.map(binary_as_vector)
.transpose(),
ConcreteDataType::String(_) => arg
.as_string()
.unwrap() // Safe: checked if it is a string
.map(|s| Ok(Cow::Owned(parse_f32_vector_from_string(s)?)))
.transpose(),
ConcreteDataType::Null(_) => Ok(None),
_ => InvalidFuncArgsSnafu {
err_msg: format!("Unsupported data type: {:?}", arg.data_type()),
}
.fail(),
}
}
/// Convert a u8 slice to a vector value.
fn binary_as_vector(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
if bytes.len() % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
}
.fail();
}
if cfg!(target_endian = "little") {
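// Note: this zero-copy reinterpretation additionally assumes the byte buffer is
// 4-byte aligned; calling `from_raw_parts` with a misaligned `*const f32` pointer
// would be undefined behavior.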
Ok(unsafe {
let vec = std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
bytes.len() / std::mem::size_of::<f32>(),
);
Cow::Borrowed(vec)
})
} else {
let v = bytes
.chunks_exact(std::mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect::<Vec<f32>>();
Ok(Cow::Owned(v))
}
}
/// Parse a string to a vector value.
/// Valid inputs are strings like "[1.0, 2.0, 3.0]".
fn parse_f32_vector_from_string(s: &str) -> Result<Vec<f32>> {
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Failed to parse {s} to Vector value: not properly enclosed in brackets"
),
}
.fail();
}
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
return Ok(Vec::new());
}
content
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect::<std::result::Result<_, _>>()
.map_err(|e| {
InvalidFuncArgsSnafu {
err_msg: format!("Failed to parse {s} to Vector value: {e}"),
}
.build()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{BinaryVector, ConstantVector, StringVector};
use super::*;
#[test]
fn test_distance_string_string() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[1.0, 0.0]"),
None,
Some("[1.0, 0.0]"),
])) as VectorRef;
let vec2 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[0.0, 1.0]"),
Some("[0.0, 1.0]"),
None,
])) as VectorRef;
let result = func
.eval(FunctionContext::default(), &[vec1.clone(), vec2.clone()])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(FunctionContext::default(), &[vec2, vec1])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_distance_binary_binary() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(BinaryVector::from(vec![
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 128, 63, 0, 0, 0, 0]),
None,
Some(vec![0, 0, 128, 63, 0, 0, 0, 0]),
])) as VectorRef;
let vec2 = Arc::new(BinaryVector::from(vec![
// [0.0, 1.0]
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
None,
])) as VectorRef;
let result = func
.eval(FunctionContext::default(), &[vec1.clone(), vec2.clone()])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(FunctionContext::default(), &[vec2, vec1])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_distance_string_binary() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[1.0, 0.0]"),
None,
Some("[1.0, 0.0]"),
])) as VectorRef;
let vec2 = Arc::new(BinaryVector::from(vec![
// [0.0, 1.0]
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
None,
])) as VectorRef;
let result = func
.eval(FunctionContext::default(), &[vec1.clone(), vec2.clone()])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(FunctionContext::default(), &[vec2, vec1])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_distance_const_string() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let const_str = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec!["[0.0, 1.0]"])),
4,
));
let vec1 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[1.0, 0.0]"),
None,
Some("[1.0, 0.0]"),
])) as VectorRef;
let vec2 = Arc::new(BinaryVector::from(vec![
// [0.0, 1.0]
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
None,
])) as VectorRef;
let result = func
.eval(
FunctionContext::default(),
&[const_str.clone(), vec1.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(!result.get(3).is_null());
let result = func
.eval(
FunctionContext::default(),
&[vec1.clone(), const_str.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(!result.get(3).is_null());
let result = func
.eval(
FunctionContext::default(),
&[const_str.clone(), vec2.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(!result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(
FunctionContext::default(),
&[vec2.clone(), const_str.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(!result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_invalid_vector_length() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(StringVector::from(vec!["[1.0]"])) as VectorRef;
let vec2 = Arc::new(StringVector::from(vec!["[1.0, 1.0]"])) as VectorRef;
let result = func.eval(FunctionContext::default(), &[vec1, vec2]);
assert!(result.is_err());
let vec1 = Arc::new(BinaryVector::from(vec![vec![0, 0, 128, 63]])) as VectorRef;
let vec2 =
Arc::new(BinaryVector::from(vec![vec![0, 0, 128, 63, 0, 0, 0, 64]])) as VectorRef;
let result = func.eval(FunctionContext::default(), &[vec1, vec2]);
assert!(result.is_err());
}
}
#[test]
fn test_parse_vector_from_string() {
let result = parse_f32_vector_from_string("[1.0, 2.0, 3.0]").unwrap();
assert_eq!(result, vec![1.0, 2.0, 3.0]);
let result = parse_f32_vector_from_string("[]").unwrap();
assert_eq!(result, Vec::<f32>::new());
let result = parse_f32_vector_from_string("[1.0, a, 3.0]");
assert!(result.is_err());
}
#[test]
fn test_binary_as_vector() {
let bytes = [0, 0, 128, 63];
let result = binary_as_vector(&bytes).unwrap();
assert_eq!(result.as_ref(), &[1.0]);
let invalid_bytes = [0, 0, 128];
let result = binary_as_vector(&invalid_bytes);
assert!(result.is_err());
}
}
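For orientation, the three metrics behave quite differently on the same pair of vectors. A minimal sketch, assuming the `cos`, `dot`, and `l2sq` modules above are in scope:

fn compare_metrics() {
    let a = [1.0f32, 2.0, 3.0];
    let b = [4.0f32, 5.0, 6.0];
    // Dot product: 1*4 + 2*5 + 3*6 = 32 (a similarity; larger means closer).
    assert!((dot::dot(&a, &b) - 32.0).abs() < 1e-3);
    // Squared L2: (1-4)^2 + (2-5)^2 + (3-6)^2 = 27 (a distance; smaller means closer).
    assert!((l2sq::l2sq(&a, &b) - 27.0).abs() < 1e-3);
    // Cosine distance: 1 - 32 / (sqrt(14) * sqrt(77)) ~= 0.025.
    assert!((cos::cos(&a, &b) - 0.025).abs() < 1e-2);
}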


@@ -0,0 +1,87 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use nalgebra::DVectorView;
/// Calculates the cos distance between two vectors.
///
/// **Note:** The two vectors must have the same length.
pub fn cos(lhs: &[f32], rhs: &[f32]) -> f32 {
let lhs_vec = DVectorView::from_slice(lhs, lhs.len());
let rhs_vec = DVectorView::from_slice(rhs, rhs.len());
let dot_product = lhs_vec.dot(&rhs_vec);
let lhs_norm = lhs_vec.norm();
let rhs_norm = rhs_vec.norm();
if dot_product.abs() < f32::EPSILON
|| lhs_norm.abs() < f32::EPSILON
|| rhs_norm.abs() < f32::EPSILON
{
return 1.0;
}
let cos_similar = dot_product / (lhs_norm * rhs_norm);
let res = 1.0 - cos_similar;
if res.abs() < f32::EPSILON {
0.0
} else {
res
}
}
#[cfg(test)]
mod tests {
use approx::assert_relative_eq;
use super::*;
#[test]
fn test_cos_scalar() {
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.025, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.04, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(cos(&lhs, &rhs), 1.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(cos(&lhs, &rhs), 1.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(cos(&lhs, &rhs), 1.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.04, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.0, epsilon = 1e-2);
}
}
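Checking the 0.025 expectation from the test above by hand, with $a = (1,2,3)$ and $b = (4,5,6)$:

$$\cos\theta = \frac{\langle a,b\rangle}{\lVert a\rVert\,\lVert b\rVert} = \frac{32}{\sqrt{14}\,\sqrt{77}} \approx 0.9746, \qquad d_{\cos}(a,b) = 1 - \cos\theta \approx 0.0254.$$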


@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use nalgebra::DVectorView;
/// Calculates the dot product between two vectors.
///
/// **Note:** The two vectors must have the same length.
pub fn dot(lhs: &[f32], rhs: &[f32]) -> f32 {
let lhs = DVectorView::from_slice(lhs, lhs.len());
let rhs = DVectorView::from_slice(rhs, rhs.len());
lhs.dot(&rhs)
}
#[cfg(test)]
mod tests {
use approx::assert_relative_eq;
use super::*;
#[test]
fn test_dot_scalar() {
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(dot(&lhs, &rhs), 14.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(dot(&lhs, &rhs), 32.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(dot(&lhs, &rhs), 50.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(dot(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(dot(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(dot(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(dot(&lhs, &rhs), 50.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(dot(&lhs, &rhs), 122.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(dot(&lhs, &rhs), 194.0, epsilon = 1e-2);
}
}
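Worked instance of the cases above: $\langle a,b\rangle = \sum_i a_i b_i$, so for $a = b = (1,2,3)$ the expected value is $1 + 4 + 9 = 14$, and for $a = (1,2,3)$, $b = (4,5,6)$ it is $4 + 10 + 18 = 32$.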


@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use nalgebra::DVectorView;
/// Calculates the squared L2 distance between two vectors.
///
/// **Note:** The two vectors must have the same length.
pub fn l2sq(lhs: &[f32], rhs: &[f32]) -> f32 {
let lhs = DVectorView::from_slice(lhs, lhs.len());
let rhs = DVectorView::from_slice(rhs, rhs.len());
(lhs - rhs).norm_squared()
}
#[cfg(test)]
mod tests {
use approx::assert_relative_eq;
use super::*;
#[test]
fn test_l2sq_scalar() {
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 27.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 108.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 14.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 77.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 194.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 108.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 27.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 0.0, epsilon = 1e-2);
}
}
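Worked instance: $\lVert a-b\rVert^2 = \sum_i (a_i - b_i)^2$, so for $a = (1,2,3)$ and $b = (4,5,6)$ the expected value is $3^2 + 3^2 + 3^2 = 27$. Skipping the final square root keeps the metric cheap while preserving the neighbor ordering, which is why the squared form is exposed.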


@@ -14,30 +14,30 @@
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::as_fulltext_option;
use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, Analyzer, ChangeColumnTypes,
CreateTableExpr, DropColumns, RenameTable, SemanticType,
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::region_request::ChangeOption;
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, ChangeColumnTypeRequest};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest};
use crate::error::{
InvalidChangeFulltextOptionRequestSnafu, InvalidChangeTableOptionRequestSnafu,
InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
InvalidColumnDefSnafu, InvalidSetFulltextOptionRequestSnafu, InvalidSetTableOptionRequestSnafu,
InvalidUnsetTableOptionRequestSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
UnknownLocationTypeSnafu,
};
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
@@ -68,25 +68,25 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
columns: add_column_requests,
}
}
Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types,
Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types,
}) => {
let change_column_type_requests = change_column_types
let modify_column_type_requests = modify_column_types
.into_iter()
.map(|cct| {
let target_type =
ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
.into();
Ok(ChangeColumnTypeRequest {
Ok(ModifyColumnTypeRequest {
column_name: cct.column_name,
target_type,
})
})
.collect::<Result<Vec<_>>>()?;
AlterKind::ChangeColumnTypes {
columns: change_column_type_requests,
AlterKind::ModifyColumnTypes {
columns: modify_column_type_requests,
}
}
Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
@@ -95,26 +95,37 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
Kind::RenameTable(RenameTable { new_table_name }) => {
AlterKind::RenameTable { new_table_name }
}
Kind::ChangeTableOptions(api::v1::ChangeTableOptions {
change_table_options,
}) => AlterKind::ChangeTableOptions {
options: change_table_options
.iter()
.map(ChangeOption::try_from)
.collect::<std::result::Result<Vec<_>, _>>()
.context(InvalidChangeTableOptionRequestSnafu)?,
},
Kind::ChangeColumnFulltext(c) => AlterKind::ChangeColumnFulltext {
Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
AlterKind::SetTableOptions {
options: table_options
.iter()
.map(SetRegionOption::try_from)
.collect::<std::result::Result<Vec<_>, _>>()
.context(InvalidSetTableOptionRequestSnafu)?,
}
}
Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
AlterKind::UnsetTableOptions {
keys: keys
.iter()
.map(|key| UnsetRegionOption::try_from(key.as_str()))
.collect::<std::result::Result<Vec<_>, _>>()
.context(InvalidUnsetTableOptionRequestSnafu)?,
}
}
Kind::SetColumnFulltext(c) => AlterKind::SetColumnFulltext {
column_name: c.column_name,
options: FulltextOptions {
enable: c.enable,
analyzer: as_fulltext_option(
Analyzer::try_from(c.analyzer)
.context(InvalidChangeFulltextOptionRequestSnafu)?,
Analyzer::try_from(c.analyzer).context(InvalidSetFulltextOptionRequestSnafu)?,
),
case_sensitive: c.case_sensitive,
},
},
Kind::UnsetColumnFulltext(c) => AlterKind::UnsetColumnFulltext {
column_name: c.column_name,
},
};
let request = AlterTableRequest {
@@ -183,7 +194,7 @@ fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation
#[cfg(test)]
mod tests {
use api::v1::{
AddColumn, AddColumns, ChangeColumnType, ColumnDataType, ColumnDef, DropColumn,
AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
SemanticType,
};
use datatypes::prelude::ConcreteDataType;
@@ -192,7 +203,7 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
@@ -233,7 +244,7 @@ mod tests {
#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
@@ -309,14 +320,14 @@ mod tests {
}
#[test]
fn test_change_column_type_expr() {
let expr = AlterExpr {
fn test_modify_column_type_expr() {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
kind: Some(Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: "mem_usage".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
@@ -329,22 +340,22 @@ mod tests {
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);
let mut change_column_types = match alter_request.alter_kind {
AlterKind::ChangeColumnTypes { columns } => columns,
let mut modify_column_types = match alter_request.alter_kind {
AlterKind::ModifyColumnTypes { columns } => columns,
_ => unreachable!(),
};
let change_column_type = change_column_types.pop().unwrap();
assert_eq!("mem_usage", change_column_type.column_name);
let modify_column_type = modify_column_types.pop().unwrap();
assert_eq!("mem_usage", modify_column_type.column_name);
assert_eq!(
ConcreteDataType::string_datatype(),
change_column_type.target_type
modify_column_type.target_type
);
}
#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),

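To make the `ChangeTableOptions` → `SetTableOptions`/`UnsetTableOptions` split concrete, here is a minimal sketch of the two new expression shapes (field values are illustrative; this assumes the prost-generated `Default` impls):

use api::v1::alter_table_expr::Kind;
use api::v1::{AlterTableExpr, SetTableOptions, UnsetTableOptions};

fn example_set_and_unset() -> (AlterTableExpr, AlterTableExpr) {
    // Set the `ttl` table option to "1d".
    let set = AlterTableExpr {
        table_name: "monitor".to_string(),
        kind: Some(Kind::SetTableOptions(SetTableOptions {
            table_options: vec![api::v1::Option {
                key: "ttl".to_string(),
                value: "1d".to_string(),
            }],
        })),
        ..Default::default()
    };
    // Unset (remove) the `ttl` table option.
    let unset = AlterTableExpr {
        table_name: "monitor".to_string(),
        kind: Some(Kind::UnsetTableOptions(UnsetTableOptions {
            keys: vec!["ttl".to_string()],
        })),
        ..Default::default()
    };
    (set, unset)
}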

@@ -120,14 +120,20 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid change table option request"))]
InvalidChangeTableOptionRequest {
#[snafu(display("Invalid set table option request"))]
InvalidSetTableOptionRequest {
#[snafu(source)]
error: MetadataError,
},
#[snafu(display("Invalid change fulltext option request"))]
InvalidChangeFulltextOptionRequest {
#[snafu(display("Invalid unset table option request"))]
InvalidUnsetTableOptionRequest {
#[snafu(source)]
error: MetadataError,
},
#[snafu(display("Invalid set fulltext option request"))]
InvalidSetFulltextOptionRequest {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
@@ -156,8 +162,9 @@ impl ErrorExt for Error {
Error::UnknownColumnDataType { .. } | Error::InvalidFulltextColumnType { .. } => {
StatusCode::InvalidArguments
}
Error::InvalidChangeTableOptionRequest { .. }
| Error::InvalidChangeFulltextOptionRequest { .. } => StatusCode::InvalidArguments,
Error::InvalidSetTableOptionRequest { .. }
| Error::InvalidUnsetTableOptionRequest { .. }
| Error::InvalidSetFulltextOptionRequest { .. } => StatusCode::InvalidArguments,
}
}


@@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;


@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::rpc::ddl::UnsetDatabaseOption::{self};
use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
use crate::ClusterId;
pub struct AlterDatabaseProcedure {
pub context: DdlContext,
pub data: AlterDatabaseData,
}
fn build_new_schema_value(
mut value: SchemaNameValue,
alter_kind: &AlterDatabaseKind,
) -> Result<SchemaNameValue> {
match alter_kind {
AlterDatabaseKind::SetDatabaseOptions(options) => {
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
}
}
}
}
AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
for key in keys.0.iter() {
match key {
UnsetDatabaseOption::Ttl => value.ttl = None,
}
}
}
}
Ok(value)
}
impl AlterDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
pub fn new(
cluster_id: ClusterId,
task: AlterDatabaseTask,
context: DdlContext,
) -> Result<Self> {
Ok(Self {
context,
data: AlterDatabaseData::new(task, cluster_id)?,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
pub async fn on_prepare(&mut self) -> Result<Status> {
let value = self
.context
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
.await?;
ensure!(
value.is_some(),
SchemaNotFoundSnafu {
table_schema: self.data.schema(),
}
);
self.data.schema_value = value;
self.data.state = AlterDatabaseState::UpdateMetadata;
Ok(Status::executing(true))
}
pub async fn on_update_metadata(&mut self) -> Result<Status> {
let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
// Safety: schema_value is not None.
let current_schema_value = self.data.schema_value.as_ref().unwrap();
let new_schema_value = build_new_schema_value(
current_schema_value.get_inner_ref().clone(),
&self.data.kind,
)?;
self.context
.table_metadata_manager
.schema_manager()
.update(schema_name, current_schema_value, &new_schema_value)
.await?;
info!("Updated database metadata for schema {schema_name}");
self.data.state = AlterDatabaseState::InvalidateSchemaCache;
Ok(Status::executing(true))
}
pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
let cache_invalidator = &self.context.cache_invalidator;
cache_invalidator
.invalidate(
&Context::default(),
&[CacheIdent::SchemaName(SchemaName {
catalog_name: self.data.catalog().to_string(),
schema_name: self.data.schema().to_string(),
})],
)
.await?;
Ok(Status::done())
}
}
#[async_trait]
impl Procedure for AlterDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
AlterDatabaseState::Prepare => self.on_prepare().await,
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
}
.map_err(handle_retry_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog = self.data.catalog();
let schema = self.data.schema();
let lock_key = vec![
CatalogLock::Read(catalog).into(),
SchemaLock::write(catalog, schema).into(),
];
LockKey::new(lock_key)
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterDatabaseState {
Prepare,
UpdateMetadata,
InvalidateSchemaCache,
}
/// The data of alter database procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterDatabaseData {
cluster_id: ClusterId,
state: AlterDatabaseState,
kind: AlterDatabaseKind,
catalog_name: String,
schema_name: String,
schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
}
impl AlterDatabaseData {
pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Result<Self> {
Ok(Self {
cluster_id,
state: AlterDatabaseState::Prepare,
kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
catalog_name: task.alter_expr.catalog_name,
schema_name: task.alter_expr.schema_name,
schema_value: None,
})
}
pub fn catalog(&self) -> &str {
&self.catalog_name
}
pub fn schema(&self) -> &str {
&self.schema_name
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::ddl::alter_database::build_new_schema_value;
use crate::key::schema_name::SchemaNameValue;
use crate::rpc::ddl::{
AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
UnsetDatabaseOptions,
};
#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
UnsetDatabaseOption::Ttl,
]));
let new_schema_value =
build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
assert_eq!(new_schema_value.ttl, None);
}
}
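One subtlety in `build_new_schema_value` above: `SET ttl = 0` and `UNSET ttl` converge to the same stored state, since a zero TTL is treated as "disable" rather than persisted. A condensed sketch of just that rule:

use std::time::Duration;

// A zero duration clears the TTL instead of storing `Some(0s)`, so a later read
// cannot distinguish it from an explicit unset.
fn apply_ttl_option(set_to: Duration) -> Option<Duration> {
    if set_to.is_zero() {
        None
    } else {
        Some(set_to)
    }
}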


@@ -14,7 +14,7 @@
use std::collections::HashSet;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use snafu::{ensure, OptionExt};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;


@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,


@@ -19,7 +19,7 @@ mod update_metadata;
use std::vec;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
use common_error::ext::ErrorExt;


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use common_catalog::format_full_table_name;
use snafu::ensure;


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
@@ -91,7 +91,7 @@ fn create_proto_alter_kind(
add_columns,
})))
}
Kind::ChangeColumnTypes(x) => Ok(Some(alter_request::Kind::ChangeColumnTypes(x.clone()))),
Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
Kind::DropColumns(x) => {
let drop_columns = x
.drop_columns
@@ -106,9 +106,11 @@ fn create_proto_alter_kind(
})))
}
Kind::RenameTable(_) => Ok(None),
Kind::ChangeTableOptions(v) => Ok(Some(alter_request::Kind::ChangeTableOptions(v.clone()))),
Kind::ChangeColumnFulltext(v) => {
Ok(Some(alter_request::Kind::ChangeColumnFulltext(v.clone())))
Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
Kind::SetColumnFulltext(v) => Ok(Some(alter_request::Kind::SetColumnFulltext(v.clone()))),
Kind::UnsetColumnFulltext(v) => {
Ok(Some(alter_request::Kind::UnsetColumnFulltext(v.clone())))
}
}
}
@@ -119,12 +121,12 @@ mod tests {
use std::sync::Arc;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::RegionColumnDef;
use api::v1::{
region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ChangeColumnType,
ChangeColumnTypes, ColumnDataType, ColumnDef as PbColumnDef, SemanticType,
region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use store_api::storage::{RegionId, TableId};
@@ -213,7 +215,7 @@ mod tests {
prepare_ddl_context().await;
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
@@ -280,12 +282,12 @@ mod tests {
prepare_ddl_context().await;
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
kind: Some(Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: "cpu".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
@@ -306,9 +308,9 @@ mod tests {
assert_eq!(alter_region_request.schema_version, 1);
assert_eq!(
alter_region_request.kind,
Some(region::alter_request::Kind::ChangeColumnTypes(
ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
Some(region::alter_request::Kind::ModifyColumnTypes(
ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: "cpu".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,


@@ -52,9 +52,11 @@ impl AlterTableProcedure {
new_info.name = new_table_name.to_string();
}
AlterKind::DropColumns { .. }
| AlterKind::ChangeColumnTypes { .. }
| AlterKind::ChangeTableOptions { .. }
| AlterKind::ChangeColumnFulltext { .. } => {}
| AlterKind::ModifyColumnTypes { .. }
| AlterKind::SetTableOptions { .. }
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetColumnFulltext { .. }
| AlterKind::UnsetColumnFulltext { .. } => {}
}
Ok(new_info)


@@ -28,6 +28,7 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -43,7 +44,7 @@ use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{FlowId, FlowPartitionId};
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::{CreateFlowTask, QueryContext};
@@ -75,6 +76,7 @@ impl CreateFlowProcedure {
source_table_ids: vec![],
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
},
}
}
@@ -90,6 +92,7 @@ impl CreateFlowProcedure {
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
let create_if_not_exists = self.data.task.create_if_not_exists;
let or_replace = self.data.task.or_replace;
let flow_name_value = self
.context
@@ -98,16 +101,56 @@ impl CreateFlowProcedure {
.get(catalog_name, flow_name)
.await?;
if create_if_not_exists && or_replace {
// This combination is forbidden because its exact semantics would be ambiguous
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
}
.fail();
}
if let Some(value) = flow_name_value {
ensure!(
create_if_not_exists,
create_if_not_exists || or_replace,
error::FlowAlreadyExistsSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
let flow_id = value.flow_id();
return Ok(Status::done_with_output(flow_id));
if create_if_not_exists {
info!("Flow already exists, flow_id: {}", flow_id);
return Ok(Status::done_with_output(flow_id));
}
let flow_id = value.flow_id();
let peers = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);
let flow_info_value = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await?;
ensure!(
flow_info_value.is_some(),
error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
self.data.prev_flow_info_value = flow_info_value;
}
// Ensures sink table doesn't exist.
@@ -128,7 +171,9 @@ impl CreateFlowProcedure {
}
self.collect_source_tables().await?;
self.allocate_flow_id().await?;
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
Ok(Status::executing(true))
@@ -153,7 +198,10 @@ impl CreateFlowProcedure {
.map_err(add_peer_context_if_needed(peer.clone()))
});
}
info!(
"Creating flow({:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers
);
join_all(create_flow)
.await
.into_iter()
@@ -170,18 +218,29 @@ impl CreateFlowProcedure {
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
let (flow_info, flow_routes) = (&self.data).into();
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
&& self.data.task.or_replace
{
self.context
.flow_metadata_manager
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
} else {
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
}
self.data.state = CreateFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}
async fn on_broadcast(&mut self) -> Result<Status> {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let ctx = Context {
@@ -192,10 +251,13 @@ impl CreateFlowProcedure {
.cache_invalidator
.invalidate(
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
})],
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.await?;
@@ -270,6 +332,9 @@ pub struct CreateFlowData {
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
pub(crate) query_context: QueryContext,
/// Used to verify that the previous value is consistent when the flow metadata needs to be updated.
/// Only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
}
impl From<&CreateFlowData> for CreateRequest {
@@ -284,8 +349,9 @@ impl From<&CreateFlowData> for CreateRequest {
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
// Always set to true to ensure idempotency in case of retry
create_if_not_exists: true,
or_replace: value.task.or_replace,
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),

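The prepare step above resolves a flow-name collision in one of three ways, depending on the two flags. A condensed sketch of the decision (simplified types, not the actual procedure code):

enum Outcome {
    /// `IF NOT EXISTS`: finish successfully, returning the existing flow id.
    DoneWithExistingId,
    /// `OR REPLACE`: keep the existing flow id and update its metadata in place.
    ReplaceExisting,
    /// Reject the request.
    Error,
}

fn on_existing_flow(create_if_not_exists: bool, or_replace: bool) -> Outcome {
    match (create_if_not_exists, or_replace) {
        // Both flags together are rejected up front: the combined semantics are ambiguous.
        (true, true) => Outcome::Error,
        (true, false) => Outcome::DoneWithExistingId,
        (false, true) => Outcome::ReplaceExisting,
        // Neither flag: the name collision is a FlowAlreadyExists error.
        (false, false) => Outcome::Error,
    }
}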

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterExpr, ColumnDef, RenameTable};
use api::v1::alter_table_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterTableExpr, ColumnDef, RenameTable};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use derive_builder::Builder;
@@ -32,7 +32,7 @@ pub struct TestAlterTableExpr {
new_table_name: Option<String>,
}
impl From<TestAlterTableExpr> for AlterExpr {
impl From<TestAlterTableExpr> for AlterTableExpr {
fn from(value: TestAlterTableExpr) -> Self {
if let Some(new_table_name) = value.new_table_name {
Self {


@@ -16,11 +16,11 @@ use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{region_request, RegionRequest};
use api::v1::{
AddColumn, AddColumns, AlterExpr, ChangeTableOption, ChangeTableOptions, ColumnDataType,
ColumnDef as PbColumnDef, DropColumn, DropColumns, SemanticType,
AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
DropColumns, SemanticType, SetTableOptions,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
@@ -133,7 +133,7 @@ async fn test_on_submit_alter_request() {
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
@@ -219,7 +219,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
@@ -316,7 +316,7 @@ async fn test_on_update_metadata_add_columns() {
.unwrap();
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
@@ -385,12 +385,12 @@ async fn test_on_update_table_options() {
.unwrap();
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::ChangeTableOptions(ChangeTableOptions {
change_table_options: vec![ChangeTableOption {
kind: Some(Kind::SetTableOptions(SetTableOptions {
table_options: vec![api::v1::Option {
key: TTL_KEY.to_string(),
value: "1d".to_string(),
}],


@@ -24,6 +24,7 @@ use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::TableId;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
@@ -47,12 +48,13 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, TruncateTable,
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask,
DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
@@ -129,6 +131,7 @@ impl DdlManager {
CreateFlowProcedure,
AlterTableProcedure,
AlterLogicalTablesProcedure,
AlterDatabaseProcedure,
DropTableProcedure,
DropFlowProcedure,
TruncateTableProcedure,
@@ -294,6 +297,18 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
pub async fn submit_alter_database(
&self,
cluster_id: ClusterId,
alter_database_task: AlterDatabaseTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = AlterDatabaseProcedure::new(cluster_id, alter_database_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a create flow task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_flow_task(
@@ -593,6 +608,28 @@ async fn handle_drop_database_task(
})
}
async fn handle_alter_database_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
alter_database_task: AlterDatabaseTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_alter_database(cluster_id, alter_database_task.clone())
.await?;
let procedure_id = id.to_string();
info!(
"Database {}.{} is altered via procedure_id {id:?}",
alter_database_task.catalog(),
alter_database_task.schema()
);
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}
async fn handle_drop_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
@@ -655,10 +692,17 @@ async fn handle_create_flow_task(
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
if !create_flow_task.or_replace {
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
} else {
info!(
"Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
}
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
@@ -772,6 +816,9 @@ impl ProcedureExecutor for DdlManager {
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}
AlterDatabase(alter_database_task) => {
handle_alter_database_task(self, cluster_id, alter_database_task).await
}
CreateFlow(create_flow_task) => {
handle_create_flow_task(
self,


@@ -593,6 +593,21 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
InvalidSetDatabaseOption {
key: String,
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid unset database option, key: {}", key))]
InvalidUnsetDatabaseOption {
key: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
MismatchPrefix {
prefix: String,
@@ -730,7 +745,9 @@ impl ErrorExt for Error {
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| MismatchPrefix { .. }
| TlsConfig { .. } => StatusCode::InvalidArguments,
| TlsConfig { .. }
| InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,


@@ -90,6 +90,7 @@
pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod maintenance;
pub mod node_address;
mod schema_metadata_manager;
pub mod schema_name;
@@ -564,13 +565,13 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_create_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
err_msg: "Reads the empty table info in comparing operation of creating table metadata",
})?
.into_inner();
let remote_view_info = on_create_view_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty view info during the create view info",
err_msg: "Reads the empty view info in comparing operation of creating view metadata",
})?
.into_inner();
@@ -643,13 +644,13 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_create_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
err_msg: "Reads the empty table info in comparing operation of creating table metadata",
})?
.into_inner();
let remote_table_route = on_create_table_route_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
err_msg: "Reads the empty table route in comparing operation of creating table metadata",
})?
.into_inner();
@@ -730,13 +731,13 @@ impl TableMetadataManager {
for on_failure in on_failures {
let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
err_msg: "Reads the empty table info in comparing operation of creating table metadata",
})?
.into_inner();
let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
err_msg: "Reads the empty table route in comparing operation of creating table metadata",
})?
.into_inner();
@@ -914,7 +915,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_update_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the rename table metadata",
err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
})?
.into_inner();
@@ -960,7 +961,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_update_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
err_msg: "Reads the empty table info in comparing operation of the updating table info",
})?
.into_inner();
@@ -1011,7 +1012,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_view_info = on_update_view_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty view info during the updating view info",
err_msg: "Reads the empty view info in comparing operation of the updating view info",
})?
.into_inner();
@@ -1068,7 +1069,7 @@ impl TableMetadataManager {
for on_failure in on_failures {
let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
err_msg: "Reads the empty table info in comparing operation of the updating table info",
})?
.into_inner();
@@ -1120,7 +1121,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_route = on_update_table_route_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the updating table route",
err_msg: "Reads the empty table route in comparing operation of the updating table route",
})?
.into_inner();
@@ -1172,7 +1173,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_route = on_update_table_route_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the updating leader region status",
err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
})?
.into_inner();
@@ -1260,7 +1261,8 @@ impl_metadata_value! {
FlowNameValue,
FlowRouteValue,
TableFlowValue,
NodeAddressValue
NodeAddressValue,
SchemaNameValue
}
impl_optional_metadata_value! {


@@ -38,7 +38,7 @@ use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowId, MetadataKey};
use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
@@ -197,7 +197,7 @@ impl FlowMetadataManager {
on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow name during the creating flow, flow_id: {flow_id}"
"Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
),
}
})?;
@@ -220,7 +220,7 @@ impl FlowMetadataManager {
let remote_flow =
on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow during the creating flow, flow_id: {flow_id}"
"Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
),
})?;
let op_name = "creating flow";
@@ -230,6 +230,102 @@ impl FlowMetadataManager {
Ok(())
}
/// Updates the metadata for a flow, returning an error if the old metadata does not exist.
pub async fn update_flow_metadata(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
new_flow_info: &FlowInfoValue,
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
self.flow_name_manager.build_update_txn(
&new_flow_info.catalog_name,
&new_flow_info.flow_name,
flow_id,
)?;
let (create_flow_txn, on_create_flow_failure) =
self.flow_info_manager
.build_update_txn(flow_id, current_flow_info, new_flow_info)?;
let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes.clone())?;
let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
flow_id,
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
.collect(),
new_flow_info.source_table_ids(),
)?;
let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
create_flow_txn,
create_flow_routes_txn,
create_flownode_flow_txn,
create_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
new_flow_info.catalog_name,
new_flow_info.flow_name,
flow_id,
txn.max_operations()
);
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_flow_flow_name =
on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
),
}
})?;
if remote_flow_flow_name.flow_id() != flow_id {
info!(
"Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
new_flow_info.catalog_name,
new_flow_info.flow_name,
flow_id,
remote_flow_flow_name.flow_id()
);
return error::UnexpectedSnafu {
err_msg: format!(
"Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
remote_flow_flow_name.flow_id(),
flow_id,
new_flow_info.catalog_name,
new_flow_info.flow_name,
),
}.fail();
}
let remote_flow =
on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
),
})?;
let op_name = "updating flow";
ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
}
Ok(())
}
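A minimal caller-side sketch of the contract above, mirroring the unit test further below (names are from this diff; `flow_value` is the value the flow was created with, and field access on `FlowInfoValue` is assumed to be visible to the caller). In production the previous value would instead come from the `FlowInfoManager::get_raw` accessor added below, so the compared bytes match exactly what is stored:
let new_flow_info = {
    let mut v = flow_value.clone();
    v.raw_sql = "SELECT 2".to_string(); // hypothetical new definition
    v
};
flow_metadata_manager
    .update_flow_metadata(
        flow_id,
        &DeserializedValueWithBytes::from_inner(flow_value),
        &new_flow_info,
        flow_routes,
    )
    .await?;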
fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
let source_table_ids = flow_value.source_table_ids();
let mut keys =
@@ -560,4 +656,222 @@ mod tests {
// Ensures all keys are deleted
assert!(mem_kv.is_empty())
}
#[tokio::test]
async fn test_update_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value = test_flow_info_value(
"flow",
[(0, 1u64), (1, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
let new_flow_value = {
let mut tmp = flow_value.clone();
tmp.raw_sql = "new".to_string();
tmp
};
// Now update the flow with the new value
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&new_flow_value,
flow_routes.clone(),
)
.await
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
routes,
vec![
(
FlowRouteKey::new(flow_id, 1),
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
FlowRouteKey::new(flow_id, 2),
FlowRouteValue {
peer: Peer::empty(2),
},
),
]
);
assert_eq!(got, new_flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 1, flow_id, 1),
TableFlowValue {
peer: Peer::empty(1)
}
),
(
TableFlowKey::new(table_id, 2, flow_id, 2),
TableFlowValue {
peer: Peer::empty(2)
}
)
]
);
}
}
#[tokio::test]
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// update again with same flow id
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes.clone(),
)
.await
.unwrap();
// update again with wrong flow id, expected error
let err = flow_metadata_manager
.update_flow_metadata(
flow_id + 1,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes,
)
.await
.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });
assert!(err
.to_string()
.contains("Reads different flow id when updating flow"));
}
#[tokio::test]
async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// Builds a different value to act as a stale `current` value.
let another_sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "another_sink_table".to_string(),
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
};
let err = flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes.clone(),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("Reads the different value"),
"error: {:?}",
err
);
}
}

View File

@@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;
@@ -196,6 +196,19 @@ impl FlowInfoManager {
.transpose()
}
/// Returns the [FlowInfoValue] with original bytes of specified `flow_id`.
pub async fn get_raw(
&self,
flow_id: FlowId,
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
let key = FlowInfoKey::new(flow_id).to_bytes();
self.kv_backend
.get(&key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
/// Builds a create flow transaction.
/// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
@@ -215,6 +228,36 @@ impl FlowInfoManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
/// Builds an update flow transaction.
/// It is expected that `__flow/info/{flow_id}` IS ALREADY occupied and equal to `current_flow_value`.
/// The new value may equal the current one, so a replace succeeds even when nothing changes.
/// Otherwise, the transaction retrieves the existing value and fails.
pub(crate) fn build_update_txn(
&self,
flow_id: FlowId,
current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
new_flow_value: &FlowInfoValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
)> {
let key = FlowInfoKey::new(flow_id).to_bytes();
let raw_value = new_flow_value.try_as_raw_value()?;
let prev_value = current_flow_value.get_raw_bytes();
let txn = Txn::new()
.when(vec![
Compare::new(key.clone(), CompareOp::NotEqual, None),
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);
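// Reading the guard above (informally): the two comparisons are ANDed, so the
// put in `and_then` runs only when the key exists (NotEqual None) AND still
// holds the exact bytes we previously read (Equal prev_value). On failure, the
// `or_else` Get fetches whatever is stored so the caller's decode callback can
// report the conflicting value.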
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
}
#[cfg(test)]

View File

@@ -26,7 +26,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
@@ -237,6 +237,37 @@ impl FlowNameManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
/// Builds an update flow name transaction, which changes neither the name nor the id; it only checks that they stay the same.
/// It is expected that `__flow/name/{catalog}/{flow_name}` IS ALREADY occupied,
/// and that both the flow name and the flow id match.
/// Otherwise, the transaction retrieves the existing value (and fails).
pub fn build_update_txn(
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let raw_value = flow_flow_name_value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::new(
raw_key.clone(),
CompareOp::Equal,
Some(raw_value),
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
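// Note: unlike the flow-info txn, there is no `and_then` branch here. An
// update must not change the name -> id mapping, so success is purely the
// equality check; the `or_else` Get only surfaces the conflicting value for
// the error path.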
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}
#[cfg(test)]

View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::error::Result;
use crate::key::MAINTENANCE_KEY;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
pub type MaintenanceModeManagerRef = Arc<MaintenanceModeManager>;
/// The maintenance mode manager.
///
/// Used to enable or disable maintenance mode.
#[derive(Clone)]
pub struct MaintenanceModeManager {
kv_backend: KvBackendRef,
}
impl MaintenanceModeManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
Ok(())
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await?;
Ok(())
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::maintenance::MaintenanceModeManager;
use crate::kv_backend::memory::MemoryKvBackend;
#[tokio::test]
async fn test_maintenance_mode_manager() {
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(Arc::new(
MemoryKvBackend::new(),
)));
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.set_maintenance_mode()
.await
.unwrap();
assert!(maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
}
}

View File

@@ -75,7 +75,10 @@ impl SchemaMetadataManager {
&table_info.table_info.catalog_name,
&table_info.table_info.schema_name,
);
self.schema_manager.get(key).await
self.schema_manager
.get(key)
.await
.map(|v| v.map(|v| v.into_inner()))
}
#[cfg(any(test, feature = "testing"))]

View File

@@ -21,10 +21,14 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures::stream::BoxStream;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use super::txn_helper::TxnOpGetResponseSet;
use super::DeserializedValueWithBytes;
use crate::ensure_values;
use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
use crate::key::{MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
@@ -171,6 +175,8 @@ pub struct SchemaManager {
kv_backend: KvBackendRef,
}
pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
impl SchemaManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
@@ -204,11 +210,15 @@ impl SchemaManager {
self.kv_backend.exists(&raw_key).await
}
pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
pub async fn get(
&self,
schema: SchemaNameKey<'_>,
) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
let raw_key = schema.to_bytes();
let value = self.kv_backend.get(&raw_key).await?;
value
.and_then(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()).transpose())
self.kv_backend
.get(&raw_key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
@@ -220,6 +230,54 @@ impl SchemaManager {
Ok(())
}
pub(crate) fn build_update_txn(
&self,
schema: SchemaNameKey<'_>,
current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
new_schema_value: &SchemaNameValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
)> {
let raw_key = schema.to_bytes();
let raw_value = current_schema_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
/// Updates a [SchemaNameKey].
pub async fn update(
&self,
schema: SchemaNameKey<'_>,
current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
new_schema_value: &SchemaNameValue,
) -> Result<()> {
let (txn, on_failure) =
self.build_update_txn(schema, current_schema_value, new_schema_value)?;
let mut r = self.kv_backend.txn(txn).await?;
if !r.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_schema_value = on_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg:
"Reads the empty schema name value in comparing operation of updating schema name value",
})?
.into_inner();
let op_name = "the updating schema name value";
ensure_values!(&remote_schema_value, new_schema_value, op_name);
}
Ok(())
}
/// Returns a schema stream, it lists all schemas belong to the target `catalog`.
pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
let start_key = SchemaNameKey::range_start_key(catalog);
@@ -306,4 +364,42 @@ mod tests {
assert!(!manager.exists(wrong_schema_key).await.unwrap());
}
#[tokio::test]
async fn test_update_schema_value() {
let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
manager.create(schema_key, None, false).await.unwrap();
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
};
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();
// Update with the same value, should be ok
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(40)),
};
let incorrect_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(20)),
}
.try_as_raw_value()
.unwrap();
let incorrect_schema_value =
DeserializedValueWithBytes::from_inner_slice(&incorrect_schema_value).unwrap();
manager
.update(schema_key, &incorrect_schema_value, &new_schema_value)
.await
.unwrap_err();
}
}

View File

@@ -14,25 +14,29 @@
use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
DropViewTask as PbDropViewTask, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
QueryContext as PbQueryContext, TruncateTableExpr,
AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
use humantime_serde::re::humantime;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
@@ -42,7 +46,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::error::{self, Result};
use crate::error::{self, InvalidSetDatabaseOptionSnafu, InvalidUnsetDatabaseOptionSnafu, Result};
use crate::key::FlowId;
/// DDL tasks
@@ -57,6 +61,7 @@ pub enum DdlTask {
AlterLogicalTables(Vec<AlterTableTask>),
CreateDatabase(CreateDatabaseTask),
DropDatabase(DropDatabaseTask),
AlterDatabase(AlterDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
CreateView(CreateViewTask),
@@ -99,7 +104,7 @@ impl DdlTask {
}
/// Creates a [`DdlTask`] to alter several logical tables.
pub fn new_alter_logical_tables(table_data: Vec<AlterExpr>) -> Self {
pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
DdlTask::AlterLogicalTables(
table_data
.into_iter()
@@ -149,8 +154,13 @@ impl DdlTask {
})
}
/// Creates a [`DdlTask`] to alter a database.
pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
}
/// Creates a [`DdlTask`] to alter a table.
pub fn new_alter_table(alter_table: AlterExpr) -> Self {
pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
DdlTask::AlterTable(AlterTableTask { alter_table })
}
@@ -223,6 +233,9 @@ impl TryFrom<Task> for DdlTask {
Task::DropDatabaseTask(drop_database) => {
Ok(DdlTask::DropDatabase(drop_database.try_into()?))
}
Task::AlterDatabaseTask(alter_database) => {
Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
}
Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
@@ -272,6 +285,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
}
DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
@@ -680,7 +694,8 @@ impl<'de> Deserialize<'de> for CreateTableTask {
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
pub alter_table: AlterExpr,
// TODO(CookiePieWw): Replace proto struct with user-defined struct
pub alter_table: AlterTableExpr,
}
impl AlterTableTask {
@@ -932,6 +947,125 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
pub alter_expr: AlterDatabaseExpr,
}
impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
type Error = error::Error;
fn try_from(task: AlterDatabaseTask) -> Result<Self> {
Ok(PbAlterDatabaseTask {
task: Some(task.alter_expr),
})
}
}
impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
type Error = error::Error;
fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
err_msg: "expected alter database",
})?;
Ok(AlterDatabaseTask { alter_expr })
}
}
impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
type Error = error::Error;
fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
match pb {
PbAlterDatabaseKind::SetDatabaseOptions(options) => {
Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
options
.set_database_options
.into_iter()
.map(SetDatabaseOption::try_from)
.collect::<Result<Vec<_>>>()?,
)))
}
PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
options
.keys
.iter()
.map(|key| UnsetDatabaseOption::try_from(key.as_str()))
.collect::<Result<Vec<_>>>()?,
)),
),
}
}
}
const TTL_KEY: &str = "ttl";
impl TryFrom<PbOption> for SetDatabaseOption {
type Error = error::Error;
fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?
};
Ok(SetDatabaseOption::Ttl(ttl))
}
_ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
}
}
}
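As a quick illustration of the conversion above (a sketch, assuming `PbOption` is the aliased `api::v1::Option` imported earlier and that the value follows humantime's duration grammar):
let opt = SetDatabaseOption::try_from(PbOption {
    key: "TTL".to_string(),   // keys are matched case-insensitively
    value: "7d".to_string(),  // humantime syntax, e.g. "7d", "90m", "1h 30m"
})
.unwrap();
assert_eq!(opt, SetDatabaseOption::Ttl(Duration::from_secs(7 * 24 * 3600)));
// An empty value is treated as a zero TTL rather than an error:
let zero = SetDatabaseOption::try_from(PbOption {
    key: "ttl".to_string(),
    value: String::new(),
})
.unwrap();
assert_eq!(zero, SetDatabaseOption::Ttl(Duration::from_secs(0)));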
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
Ttl(Duration),
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
Ttl,
}
impl TryFrom<&str> for UnsetDatabaseOption {
type Error = error::Error;
fn try_from(key: &str) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
_ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
}
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
SetDatabaseOptions(SetDatabaseOptions),
UnsetDatabaseOptions(UnsetDatabaseOptions),
}
impl AlterDatabaseTask {
pub fn catalog(&self) -> &str {
&self.alter_expr.catalog_name
}
pub fn schema(&self) -> &str {
&self.alter_expr.schema_name
}
}
/// Create flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
@@ -1118,7 +1252,7 @@ impl From<QueryContext> for PbQueryContext {
mod tests {
use std::sync::Arc;
use api::v1::{AlterExpr, ColumnDef, CreateTableExpr, SemanticType};
use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::ConcreteDataType;
@@ -1146,7 +1280,7 @@ mod tests {
#[test]
fn test_basic_ser_de_alter_table_task() {
let task = AlterTableTask {
alter_table: AlterExpr::default(),
alter_table: AlterTableExpr::default(),
};
let output = serde_json::to_vec(&task).unwrap();

View File

@@ -245,6 +245,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid vector string: {}", vec_str))]
InvalidVectorString {
vec_str: String,
source: DataTypeError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -273,7 +281,8 @@ impl ErrorExt for Error {
| Error::IntoVector { source, .. }
| Error::FromScalarValue { source, .. }
| Error::ConvertArrowSchema { source, .. }
| Error::FromArrowArray { source, .. } => source.status_code(),
| Error::FromArrowArray { source, .. }
| Error::InvalidVectorString { source, .. } => source.status_code(),
Error::MissingTableMutationHandler { .. }
| Error::MissingProcedureServiceHandler { .. }

View File

@@ -20,6 +20,7 @@ pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio.workspace = true
[dev-dependencies]
tokio.workspace = true

View File

@@ -161,6 +161,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Stream timeout"))]
StreamTimeout {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::time::error::Elapsed,
},
}
impl ErrorExt for Error {
@@ -190,6 +197,8 @@ impl ErrorExt for Error {
Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
source.status_code()
}
Error::StreamTimeout { .. } => StatusCode::Cancelled,
}
}

View File

@@ -74,6 +74,10 @@ impl HandlerContext {
// Ignores flush request
if !writable {
warn!(
"Region: {region_id} is not writable, flush_timeout: {:?}",
flush_timeout
);
return self.downgrade_to_follower_gracefully(region_id).await;
}

View File

@@ -85,15 +85,12 @@ pub fn vector_type_value_to_string(val: &[u8], dim: u32) -> Result<String> {
return Ok("[]".to_string());
}
let elements = unsafe {
std::slice::from_raw_parts(
val.as_ptr() as *const f32,
val.len() / std::mem::size_of::<f32>(),
)
};
let elements = val
.chunks_exact(std::mem::size_of::<f32>())
.map(|e| f32::from_le_bytes(e.try_into().unwrap()));
let mut s = String::from("[");
for (i, e) in elements.iter().enumerate() {
for (i, e) in elements.enumerate() {
if i > 0 {
s.push(',');
}
@@ -105,7 +102,7 @@ pub fn vector_type_value_to_string(val: &[u8], dim: u32) -> Result<String> {
/// Parses a string to a vector type value
/// Valid input format: "[1.0,2.0,3.0]", "[1.0, 2.0, 3.0]"
pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
pub fn parse_string_to_vector_type_value(s: &str, dim: Option<u32>) -> Result<Vec<u8>> {
// Trim the brackets
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
@@ -118,7 +115,7 @@ pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
if dim != 0 {
if dim.map_or(false, |d| d != 0) {
return InvalidVectorSnafu {
msg: format!("Failed to parse {s} to Vector value: wrong dimension"),
}
@@ -142,7 +139,7 @@ pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
.collect::<Result<Vec<f32>>>()?;
// Check dimension
if elements.len() != dim as usize {
if dim.map_or(false, |d| d as usize != elements.len()) {
return InvalidVectorSnafu {
msg: format!("Failed to parse {s} to Vector value: wrong dimension"),
}
@@ -150,12 +147,19 @@ pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
}
// Convert Vec<f32> to Vec<u8>
let bytes = unsafe {
std::slice::from_raw_parts(
elements.as_ptr() as *const u8,
elements.len() * std::mem::size_of::<f32>(),
)
.to_vec()
let bytes = if cfg!(target_endian = "little") {
unsafe {
std::slice::from_raw_parts(
elements.as_ptr() as *const u8,
elements.len() * std::mem::size_of::<f32>(),
)
.to_vec()
}
} else {
elements
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
};
Ok(bytes)
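The branch above pins the stored byte order to little-endian on every target: on little-endian hosts the unsafe reinterpretation is a zero-copy fast path, while big-endian hosts serialize each f32 explicitly; `vector_type_value_to_string` decodes with `from_le_bytes` either way. A quick round-trip sketch of that invariant:
let bytes = parse_string_to_vector_type_value("[1.0,2.0,3.0]", Some(3)).unwrap();
assert_eq!(bytes.len(), 3 * std::mem::size_of::<f32>());
// Whatever the host's endianness, the payload is a sequence of LE f32s:
assert_eq!(f32::from_le_bytes(bytes[0..4].try_into().unwrap()), 1.0);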
@@ -176,7 +180,7 @@ mod tests {
];
for (s, expected) in cases.iter() {
let val = parse_string_to_vector_type_value(s, dim).unwrap();
let val = parse_string_to_vector_type_value(s, Some(dim)).unwrap();
let s = vector_type_value_to_string(&val, dim).unwrap();
assert_eq!(s, *expected);
}
@@ -184,7 +188,7 @@ mod tests {
let dim = 0;
let cases = [("[]", "[]"), ("[ ]", "[]"), ("[ ]", "[]")];
for (s, expected) in cases.iter() {
let val = parse_string_to_vector_type_value(s, dim).unwrap();
let val = parse_string_to_vector_type_value(s, Some(dim)).unwrap();
let s = vector_type_value_to_string(&val, dim).unwrap();
assert_eq!(s, *expected);
}
@@ -207,15 +211,15 @@ mod tests {
fn test_parse_string_to_vector_type_value_not_properly_enclosed_in_brackets() {
let dim = 3;
let s = "1.0,2.0,3.0";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
let s = "[1.0,2.0,3.0";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
let s = "1.0,2.0,3.0]";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
}
@@ -223,7 +227,7 @@ mod tests {
fn test_parse_string_to_vector_type_value_wrong_dimension() {
let dim = 3;
let s = "[1.0,2.0]";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
}
@@ -231,7 +235,7 @@ mod tests {
fn test_parse_string_to_vector_type_value_elements_are_not_all_float32() {
let dim = 3;
let s = "[1.0,2.0,ah]";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
}
}

View File

@@ -1089,7 +1089,7 @@ macro_rules! impl_as_for_value_ref {
};
}
impl ValueRef<'_> {
impl<'a> ValueRef<'a> {
define_data_type_func!(ValueRef);
/// Returns true if this is null.
@@ -1098,12 +1098,12 @@ impl ValueRef<'_> {
}
/// Cast itself to binary slice.
pub fn as_binary(&self) -> Result<Option<&[u8]>> {
pub fn as_binary(&self) -> Result<Option<&'a [u8]>> {
impl_as_for_value_ref!(self, Binary)
}
/// Cast itself to string slice.
pub fn as_string(&self) -> Result<Option<&str>> {
pub fn as_string(&self) -> Result<Option<&'a str>> {
impl_as_for_value_ref!(self, String)
}
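The effect of moving the lifetime parameter onto the impl is that the returned slices borrow from the underlying data `'a` rather than from the `&self` borrow, so they may outlive the `ValueRef` handle itself. A small sketch (assuming the `ValueRef::String(&'a str)` variant these macros match on):
fn extract<'a>(v: ValueRef<'a>) -> Option<&'a str> {
    // `v` is dropped when this function returns, but the slice it yields
    // remains valid: it borrows the underlying data, not `v`.
    v.as_string().ok().flatten()
}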

View File

@@ -80,7 +80,7 @@ impl BinaryVector {
let v = if let Some(binary) = binary {
let bytes_size = dim as usize * std::mem::size_of::<f32>();
if let Ok(s) = String::from_utf8(binary.to_vec()) {
let v = parse_string_to_vector_type_value(&s, dim)?;
let v = parse_string_to_vector_type_value(&s, Some(dim))?;
Some(v)
} else if binary.len() == dim as usize * std::mem::size_of::<f32>() {
Some(binary.to_vec())

View File

@@ -50,7 +50,10 @@ use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu,
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
@@ -299,7 +302,7 @@ impl FlowWorkerManager {
)]);
}
if row.len() != proto_schema.len() {
InternalSnafu {
UnexpectedSnafu {
reason: format!(
"Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
proto_schema.len(),
@@ -673,6 +676,21 @@ impl FlowWorkerManager {
}
}
/// The arguments to create a flow in [`FlowWorkerManager`].
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {
pub flow_id: FlowId,
pub sink_table_name: TableName,
pub source_table_ids: Vec<TableId>,
pub create_if_not_exists: bool,
pub or_replace: bool,
pub expire_after: Option<i64>,
pub comment: Option<String>,
pub sql: String,
pub flow_options: HashMap<String, String>,
pub query_ctx: Option<QueryContext>,
}
/// Create&Remove flow
impl FlowWorkerManager {
/// Removes a flow by its id
@@ -694,18 +712,48 @@ impl FlowWorkerManager {
/// 1. parse the query into a typed plan (and optionally parse the `expire_after` expression)
/// 2. render source/sink with the output table id and the input table ids in use
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exists: bool,
expire_after: Option<i64>,
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment,
sql,
flow_options,
query_ctx,
} = args;
let already_exist = {
let mut flag = false;
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
};
match (create_if_not_exists, or_replace, already_exist) {
// do replace
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
self.remove_flow(flow_id).await?;
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// do nothing if exists
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// create if not exists
(_, _, false) => (),
}
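// Decision table for the match above:
//   create_if_not_exists | or_replace | already_exist | action
//   ---------------------+------------+---------------+------------------------------
//            *           |    true    |     true      | replace: remove, then create
//          false         |   false    |     true      | error: flow already exists
//          true          |   false    |     true      | no-op, returns Ok(None)
//            *           |     *      |     false     | fall through and create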
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
@@ -717,7 +765,7 @@ impl FlowWorkerManager {
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in source_table_ids {
for source in &source_table_ids {
node_ctx
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
.await?;
@@ -726,7 +774,7 @@ impl FlowWorkerManager {
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
.await?;
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it

View File

@@ -28,7 +28,7 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::adapter::FlowWorkerManager;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -57,6 +57,7 @@ impl Flownode for FlowWorkerManager {
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
@@ -65,20 +66,19 @@ impl Flownode for FlowWorkerManager {
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let ret = self
.create_flow(
task_id.id as u64,
sink_table_name,
&source_table_ids,
create_if_not_exists,
expire_after,
Some(comment),
sql,
flow_options,
query_ctx,
)
.await
.map_err(to_meta_err)?;
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql,
flow_options,
query_ctx,
};
let ret = self.create_flow(args).await.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret

View File

@@ -48,7 +48,7 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::FlowWorkerManagerRef;
use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu,
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -355,23 +355,26 @@ impl FlownodeBuilder {
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];
manager
.create_flow(
flow_id as _,
sink_table_name,
info.source_table_ids(),
true,
info.expire_after(),
Some(info.comment().clone()),
info.raw_sql().clone(),
info.options().clone(),
Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
)
.await?;
let args = CreateFlowArgs {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// Because recovery only happens on restart, the flow cannot already exist, so `create_if_not_exists` and `or_replace` could take arbitrary values.
// For consistency, and to make sure the recovery actually happens, we set both to true
// (which is fine: the check that disallows both being true lives on metasrv, and we have already passed it).
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
};
manager.create_flow(args).await?;
}
Ok(cnt)

View File

@@ -492,6 +492,7 @@ pub fn check_permission(
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::AlterDatabase(_)
| Statement::DropFlow(_)
| Statement::Use(_) => {}
Statement::ShowCreateDatabase(stmt) => {
@@ -516,7 +517,7 @@ pub fn check_permission(
Statement::CreateView(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Statement::Alter(stmt) => {
Statement::AlterTable(stmt) => {
validate_param(stmt.table_name(), query_ctx)?;
}
// set/show variable now only alter/show variable in session

View File

@@ -115,7 +115,14 @@ impl GrpcQueryHandler for Instance {
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::Alter(expr) => {
DdlExpr::AlterDatabase(expr) => {
let _ = self
.statement_executor
.alter_database_inner(expr, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::AlterTable(expr) => {
self.statement_executor
.alter_table_inner(expr, ctx.clone())
.await?
@@ -195,11 +202,11 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
}
match ddl_expr {
Expr::CreateDatabase(_) => { /* do nothing*/ }
Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
Expr::CreateTable(expr) => {
check_and_fill!(expr);
}
Expr::Alter(expr) => {
Expr::AlterTable(expr) => {
check_and_fill!(expr);
}
Expr::DropTable(expr) => {

View File

@@ -622,6 +622,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Maintenance mode manager error"))]
MaintenanceModeManager {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Keyvalue backend error"))]
KvBackend {
source: common_meta::error::Error,
@@ -815,6 +822,7 @@ impl ErrorExt for Error {
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::MaintenanceModeManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),

View File

@@ -26,6 +26,7 @@ use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
@@ -340,6 +341,7 @@ pub struct Metasrv {
procedure_executor: ProcedureExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
@@ -572,6 +574,10 @@ impl Metasrv {
&self.table_metadata_manager
}
pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
&self.maintenance_mode_manager
}
pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
&self.memory_region_keeper
}

View File

@@ -27,6 +27,7 @@ use common_meta::ddl::{
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::maintenance::MaintenanceModeManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -195,6 +196,7 @@ impl MetasrvBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -304,7 +306,7 @@ impl MetasrvBuilder {
selector_ctx.clone(),
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),
);
@@ -375,6 +377,7 @@ impl MetasrvBuilder {
procedure_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
maintenance_mode_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),
meta_peer_client,

View File

@@ -165,6 +165,7 @@ impl DowngradeLeaderRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Downgrade region reply: {:?}", reply);
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists,

View File

@@ -21,6 +21,7 @@ use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, Simple
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -144,6 +145,7 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),

View File

@@ -19,8 +19,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::{ClusterId, DatanodeId};
@@ -216,8 +215,8 @@ pub struct RegionSupervisor {
selector: SelectorRef,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
// TODO(weny): find a better way
kv_backend: KvBackendRef,
/// The maintenance mode manager.
maintenance_mode_manager: MaintenanceModeManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
}
@@ -288,7 +287,7 @@ impl RegionSupervisor {
selector_context: SelectorContext,
selector: SelectorRef,
region_migration_manager: RegionMigrationManagerRef,
kv_backend: KvBackendRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
peer_lookup: PeerLookupServiceRef,
) -> Self {
Self {
@@ -297,7 +296,7 @@ impl RegionSupervisor {
selector_context,
selector,
region_migration_manager,
kv_backend,
maintenance_mode_manager,
peer_lookup,
}
}
@@ -346,7 +345,7 @@ impl RegionSupervisor {
if regions.is_empty() {
return;
}
match self.is_maintenance_mode().await {
match self.is_maintenance_mode_enabled().await {
Ok(false) => {}
Ok(true) => {
info!("Maintenance mode is enabled, skip failover");
@@ -382,11 +381,11 @@ impl RegionSupervisor {
}
}
pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
self.maintenance_mode_manager
.maintenance_mode()
.await
.context(error::KvBackendSnafu)
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(
@@ -479,6 +478,7 @@ pub(crate) mod tests {
use std::time::Duration;
use common_meta::ddl::RegionFailureDetectorController;
use common_meta::key::maintenance;
use common_meta::peer::Peer;
use common_meta::test_util::NoopPeerLookupService;
use common_time::util::current_time_millis;
@@ -505,7 +505,8 @@ pub(crate) mod tests {
env.procedure_manager().clone(),
context_factory,
));
let kv_backend = env.kv_backend();
let maintenance_mode_manager =
Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
let peer_lookup = Arc::new(NoopPeerLookupService);
let (tx, rx) = RegionSupervisor::channel();
@@ -516,7 +517,7 @@ pub(crate) mod tests {
selector_context,
selector,
region_migration_manager,
kv_backend,
maintenance_mode_manager,
peer_lookup,
),
tx,

View File

@@ -57,7 +57,7 @@ pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = router.route(
"/maintenance",
maintenance::MaintenanceHandler {
kv_backend: metasrv.kv_backend().clone(),
manager: metasrv.maintenance_mode_manager().clone(),
},
);
let router = Router::nest("/admin", router);

View File

@@ -14,31 +14,29 @@
use std::collections::HashMap;
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::PutRequest;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::{
InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu,
UnsupportedSnafu,
InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu,
ParseBoolSnafu, UnsupportedSnafu,
};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct MaintenanceHandler {
pub kv_backend: KvBackendRef,
pub manager: MaintenanceModeManagerRef,
}
impl MaintenanceHandler {
async fn get_maintenance(&self) -> crate::Result<Response<String>> {
let enabled = self
.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
.manager
.maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
let response = if enabled {
"Maintenance mode is enabled"
} else {
@@ -63,21 +61,16 @@ impl MaintenanceHandler {
})?;
let response = if enable {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend
.put(req.clone())
self.manager
.set_maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
"Maintenance mode enabled"
} else {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
self.manager
.unset_maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
"Maintenance mode disabled"
};

View File

@@ -501,7 +501,7 @@ mod tests {
// Read metadata from write cache
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
.cache(Some(cache_manager.clone()));
.cache(cache_manager.clone());
let reader = builder.build().await.unwrap();
// Check parquet metadata

View File

@@ -562,7 +562,7 @@ pub struct SerializedCompactionOutput {
struct CompactionSstReaderBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: Option<CacheManagerRef>,
cache: CacheManagerRef,
inputs: &'a [FileHandle],
append_mode: bool,
filter_deleted: bool,

View File

@@ -295,7 +295,7 @@ impl Compactor for DefaultCompactor {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: Some(cache_manager.clone()),
cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,

View File

@@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use common_telemetry::{debug, info};
use common_telemetry::{info, trace};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
@@ -114,7 +114,7 @@ impl TwcsPicker {
// Files in window exceeds file num limit
vec![enforce_file_num(&files.files, max_files)]
} else {
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
continue;
};
@@ -353,10 +353,10 @@ fn find_latest_window_in_seconds<'a>(
let mut latest_timestamp = None;
for f in files {
let (_, end) = f.time_range();
if let Some(latest) = latest_timestamp
&& end > latest
{
latest_timestamp = Some(end);
if let Some(latest) = latest_timestamp {
if end > latest {
latest_timestamp = Some(end);
}
} else {
latest_timestamp = Some(end);
}
@@ -406,6 +406,18 @@ mod tests {
)
.unwrap()
);
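// Why (i64::MAX / 3600000 + 1) * 3600: timestamps are in milliseconds and
// windows in seconds, and i64::MAX ms is not an exact multiple of one hour
// (3_600_000 ms), so the latest window end is aligned up to the next hour
// boundary, expressed in seconds. Computing that alignment naively in
// milliseconds would overflow i64; that is the edge this case exercises with
// the i64::MIN/i64::MAX file bounds below.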
assert_eq!(
Some((i64::MAX / 3600000 + 1) * 3600),
find_latest_window_in_seconds(
[
new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
new_file_handle(FileId::random(), 0, 1000, 0)
]
.iter(),
3600
)
);
}
#[test]

View File

@@ -438,16 +438,12 @@ impl EngineInner {
channel_size: self.config.parallel_scan_channel_size,
};
let scan_region = ScanRegion::new(
version,
region.access_layer.clone(),
request,
Some(cache_manager),
)
.with_parallelism(scan_parallelism)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
let scan_region =
ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
.with_parallelism(scan_parallelism)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
Ok(scan_region)
}

View File

@@ -71,7 +71,7 @@ fn add_tag1() -> RegionAlterRequest {
fn alter_column_fulltext_options() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::ChangeColumnFulltext {
kind: AlterKind::SetColumnFulltext {
column_name: "tag_0".to_string(),
options: FulltextOptions {
enable: true,

View File

@@ -26,6 +26,8 @@ pub const FLUSH_REASON: &str = "reason";
pub const FILE_TYPE_LABEL: &str = "file_type";
/// Region worker id label.
pub const WORKER_LABEL: &str = "worker";
/// Partition label.
pub const PARTITION_LABEL: &str = "partition";
lazy_static! {
/// Global write buffer size in bytes.
@@ -134,6 +136,13 @@ lazy_static! {
)
.unwrap();
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
/// Number of in-progress scan per partition.
pub static ref IN_PROGRESS_SCAN: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_in_progress_scan",
"mito in progress scan per partition",
&[TYPE_LABEL, PARTITION_LABEL]
)
.unwrap();
/// Counter of rows read from different source.
pub static ref READ_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();

View File

@@ -85,7 +85,7 @@ impl RowGroupLastRowCachedReader {
pub(crate) fn new(
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
row_group_reader: RowGroupReader,
) -> Self {
let key = SelectorResultKey {
@@ -94,9 +94,6 @@ impl RowGroupLastRowCachedReader {
selector: TimeSeriesRowSelector::LastRow,
};
let Some(cache_manager) = cache_manager else {
return Self::new_miss(key, row_group_reader, None);
};
if let Some(value) = cache_manager.get_selector_result(&key) {
let schema_matches = value.projection
== row_group_reader

View File

@@ -171,7 +171,7 @@ impl ProjectionMapper {
pub(crate) fn convert(
&self,
batch: &Batch,
cache_manager: Option<&CacheManager>,
cache_manager: &CacheManager,
) -> common_recordbatch::error::Result<RecordBatch> {
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
debug_assert!(self
@@ -204,15 +204,12 @@ impl ProjectionMapper {
match index {
BatchIndex::Tag(idx) => {
let value = &pk_values[*idx];
let vector = match cache_manager {
Some(cache) => repeated_vector_with_cache(
&column_schema.data_type,
value,
num_rows,
cache,
)?,
None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
};
let vector = repeated_vector_with_cache(
&column_schema.data_type,
value,
num_rows,
cache_manager,
)?;
columns.push(vector);
}
BatchIndex::Timestamp => {
@@ -360,7 +357,7 @@ mod tests {
// With vector cache.
let cache = CacheManager::builder().vector_cache_size(1024).build();
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let record_batch = mapper.convert(&batch, &cache).unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts | k0 | k1 | v0 | v1 |
@@ -380,7 +377,7 @@ mod tests {
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none());
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let record_batch = mapper.convert(&batch, &cache).unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}
@@ -401,7 +398,8 @@ mod tests {
);
let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
let record_batch = mapper.convert(&batch, None).unwrap();
let cache = CacheManager::builder().vector_cache_size(1024).build();
let record_batch = mapper.convert(&batch, &cache).unwrap();
let expect = "\
+----+----+
| v1 | k0 |

View File

@@ -14,15 +14,22 @@
//! Structs for partition ranges.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use common_time::Timestamp;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;
use crate::cache::CacheManager;
use crate::memtable::MemtableRef;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::read::scan_region::ScanInput;
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
@@ -90,7 +97,7 @@ impl RangeMeta {
Self::push_unordered_file_ranges(
input.memtables.len(),
&input.files,
input.cache_manager.as_deref(),
&input.cache_manager,
&mut ranges,
);
@@ -172,16 +179,15 @@ impl RangeMeta {
fn push_unordered_file_ranges(
num_memtables: usize,
files: &[FileHandle],
cache: Option<&CacheManager>,
cache: &CacheManager,
ranges: &mut Vec<RangeMeta>,
) {
// For append mode, we can parallelize reading row groups.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
// Get parquet meta from the cache.
let parquet_meta = cache.and_then(|c| {
c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
});
let parquet_meta =
cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
if let Some(parquet_meta) = parquet_meta {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
@@ -335,6 +341,160 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
new_ranges
}
/// Builder to create file ranges.
#[derive(Default)]
pub(crate) struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
/// Row selections for each row group to read.
/// It skips the row group if it is not in the map.
row_groups: BTreeMap<usize, Option<RowSelection>>,
}
impl FileRangeBuilder {
/// Builds a file range builder from context and row groups.
pub(crate) fn new(
context: FileRangeContextRef,
row_groups: BTreeMap<usize, Option<RowSelection>>,
) -> Self {
Self {
context: Some(context),
row_groups,
}
}
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
let Some(context) = self.context.clone() else {
return;
};
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(row_selection) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(FileRange::new(
context,
row_group_index,
row_selection.clone(),
));
} else {
// Scans all row groups.
ranges.extend(
self.row_groups
.iter()
.map(|(row_group_index, row_selection)| {
FileRange::new(context.clone(), *row_group_index, row_selection.clone())
}),
);
}
}
}
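
For orientation, a standalone sketch of the `i64` index convention above (`ALL_ROW_GROUPS = -1` selects every row group; plain stand-in types instead of the crate's `FileRange`):

use std::collections::BTreeMap;

const ALL_ROW_GROUPS: i64 = -1;

// Stand-in for a file range: (row group index, optional row selection).
type Range = (usize, Option<&'static str>);

fn build_ranges(groups: &BTreeMap<usize, Option<&'static str>>, index: i64) -> Vec<Range> {
    if index >= 0 {
        // A row group absent from the map was pruned: nothing to read.
        groups
            .get(&(index as usize))
            .map(|sel| vec![(index as usize, *sel)])
            .unwrap_or_default()
    } else {
        // Negative index: scan all remaining row groups.
        groups.iter().map(|(i, sel)| (*i, *sel)).collect()
    }
}

fn main() {
    let groups = BTreeMap::from([(0, None), (2, Some("rows 10..20"))]);
    assert!(build_ranges(&groups, 1).is_empty()); // row group 1 was pruned
    assert_eq!(build_ranges(&groups, ALL_ROW_GROUPS).len(), 2);
}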
/// Builder to create mem ranges.
pub(crate) struct MemRangeBuilder {
/// Ranges of a memtable.
row_groups: BTreeMap<usize, MemtableRange>,
}
impl MemRangeBuilder {
/// Builds a mem range builder from row groups.
pub(crate) fn new(row_groups: BTreeMap<usize, MemtableRange>) -> Self {
Self { row_groups }
}
/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(range) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(range.clone());
} else {
ranges.extend(self.row_groups.values().cloned());
}
}
}
/// List to manage the builders that create file ranges.
/// Each scan partition should have its own list. The mutexes inside this list allow moving
/// the list to different streams in the same partition.
pub(crate) struct RangeBuilderList {
num_memtables: usize,
mem_builders: Mutex<Vec<Option<MemRangeBuilder>>>,
file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
}
impl RangeBuilderList {
/// Creates a new [RangeBuilderList] with the given number of memtables and files.
pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
let mem_builders = (0..num_memtables).map(|_| None).collect();
let file_builders = (0..num_files).map(|_| None).collect();
Self {
num_memtables,
mem_builders: Mutex::new(mem_builders),
file_builders: Mutex::new(file_builders),
}
}
/// Builds file ranges to read the row group at `index`.
pub(crate) async fn build_file_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
reader_metrics: &mut ReaderMetrics,
) -> Result<SmallVec<[FileRange; 2]>> {
let mut ranges = SmallVec::new();
let file_index = index.index - self.num_memtables;
let builder_opt = self.get_file_builder(file_index);
match builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
builder.build_ranges(index.row_group_index, &mut ranges);
self.set_file_builder(file_index, Arc::new(builder));
}
}
Ok(ranges)
}
/// Builds mem ranges to read the row group at `index`.
pub(crate) fn build_mem_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
) -> SmallVec<[MemtableRange; 2]> {
let mut ranges = SmallVec::new();
let mut mem_builders = self.mem_builders.lock().unwrap();
match &mut mem_builders[index.index] {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_memtable(index.index);
builder.build_ranges(index.row_group_index, &mut ranges);
mem_builders[index.index] = Some(builder);
}
}
ranges
}
fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
let file_builders = self.file_builders.lock().unwrap();
file_builders[index].clone()
}
fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
let mut file_builders = self.file_builders.lock().unwrap();
file_builders[index] = Some(builder);
}
}
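
The index arithmetic above implies the routing: a `RowGroupIndex.index` below `num_memtables` addresses a memtable, anything above it addresses a file (`file_index = index.index - num_memtables`), and each source gets one lazily built, shared builder. A reduced sketch of that slot cache, with a `String` standing in for `FileRangeBuilder`; note it holds the lock across the build for brevity, whereas `build_file_ranges` above releases it so the async pruning can be awaited:

use std::sync::{Arc, Mutex};

struct LazySlots {
    num_memtables: usize,
    file_builders: Mutex<Vec<Option<Arc<String>>>>, // stand-in for FileRangeBuilder slots
}

impl LazySlots {
    fn new(num_memtables: usize, num_files: usize) -> Self {
        Self {
            num_memtables,
            file_builders: Mutex::new(vec![None; num_files]),
        }
    }

    fn file_builder(&self, index: usize, build: impl FnOnce() -> String) -> Arc<String> {
        // Indices below num_memtables address memtables; files come after.
        let file_index = index - self.num_memtables;
        let mut slots = self.file_builders.lock().unwrap();
        // The first caller pays the build cost; later callers reuse the Arc.
        slots[file_index]
            .get_or_insert_with(|| Arc::new(build()))
            .clone()
    }
}

fn main() {
    let slots = LazySlots::new(2, 3); // 2 memtables, 3 files
    let a = slots.file_builder(2, || "builder for file 0".to_string());
    let b = slots.file_builder(2, || unreachable!("built only once"));
    assert!(Arc::ptr_eq(&a, &b));
}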
#[cfg(test)]
mod tests {
use common_time::timestamp::TimeUnit;

View File

@@ -14,9 +14,9 @@
//! Scans a region according to the scan request.
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::fmt;
use std::sync::{Arc, Mutex as StdMutex};
use std::sync::Arc;
use std::time::Instant;
use common_error::ext::BoxedError;
@@ -24,23 +24,21 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_expr::utils::expr_to_columns;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{RangeMeta, RowGroupIndex};
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
@@ -51,7 +49,6 @@ use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBui
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
use crate::sst::parquet::reader::ReaderMetrics;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
@@ -167,7 +164,7 @@ pub(crate) struct ScanRegion {
/// Scan request.
request: ScanRequest,
/// Cache.
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
/// Parallelism to scan.
parallelism: ScanParallelism,
/// Whether to ignore inverted index.
@@ -184,7 +181,7 @@ impl ScanRegion {
version: VersionRef,
access_layer: AccessLayerRef,
request: ScanRequest,
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
) -> ScanRegion {
ScanRegion {
version,
@@ -381,17 +378,12 @@ impl ScanRegion {
}
let file_cache = || -> Option<FileCacheRef> {
let cache_manager = self.cache_manager.as_ref()?;
let write_cache = cache_manager.write_cache()?;
let write_cache = self.cache_manager.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
let index_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.index_cache())
.cloned();
let index_cache = self.cache_manager.index_cache().cloned();
InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
@@ -471,7 +463,7 @@ pub(crate) struct ScanInput {
/// Handles to SST files to scan.
pub(crate) files: Vec<FileHandle>,
/// Cache.
pub(crate) cache_manager: Option<CacheManagerRef>,
pub(crate) cache_manager: CacheManagerRef,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Parallelism to scan data.
@@ -502,7 +494,7 @@ impl ScanInput {
predicate: None,
memtables: Vec::new(),
files: Vec::new(),
cache_manager: None,
cache_manager: CacheManagerRef::default(),
ignore_file_not_found: false,
parallelism: ScanParallelism::default(),
inverted_index_applier: None,
@@ -545,7 +537,7 @@ impl ScanInput {
/// Sets cache for this query.
#[must_use]
pub(crate) fn with_cache(mut self, cache: Option<CacheManagerRef>) -> Self {
pub(crate) fn with_cache(mut self, cache: CacheManagerRef) -> Self {
self.cache_manager = cache;
self
}
@@ -644,14 +636,14 @@ impl ScanInput {
}
/// Prunes a memtable to scan and returns the builder to build readers.
fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
let memtable = &self.memtables[mem_index];
let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone());
MemRangeBuilder { row_groups }
MemRangeBuilder::new(row_groups)
}
/// Prunes a file to scan and returns the builder to build readers.
async fn prune_file(
pub(crate) async fn prune_file(
&self,
file_index: usize,
reader_metrics: &mut ReaderMetrics,
@@ -692,10 +684,7 @@ impl ScanInput {
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
Ok(FileRangeBuilder {
context: Some(Arc::new(file_range_ctx)),
row_groups,
})
Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups))
}
/// Scans the input source in another task and sends batches to the sender.
@@ -764,8 +753,6 @@ pub(crate) struct StreamContext {
pub(crate) input: ScanInput,
/// Metadata for partition ranges.
pub(crate) ranges: Vec<RangeMeta>,
/// Lists of range builders.
range_builders: RangeBuilderList,
// Metrics:
/// The start time of the query.
@@ -778,12 +765,10 @@ impl StreamContext {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files());
Self {
input,
ranges,
range_builders,
query_start,
}
}
@@ -793,12 +778,10 @@ impl StreamContext {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::unordered_scan_ranges(&input);
READ_SST_COUNT.observe(input.num_files() as f64);
let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files());
Self {
input,
ranges,
range_builders,
query_start,
}
}
@@ -808,27 +791,6 @@ impl StreamContext {
self.input.num_memtables() > index.index
}
/// Creates file ranges to scan.
pub(crate) async fn build_file_ranges(
&self,
index: RowGroupIndex,
reader_metrics: &mut ReaderMetrics,
) -> Result<SmallVec<[FileRange; 2]>> {
let mut ranges = SmallVec::new();
self.range_builders
.build_file_ranges(&self.input, index, &mut ranges, reader_metrics)
.await?;
Ok(ranges)
}
/// Creates memtable ranges to scan.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
let mut ranges = SmallVec::new();
self.range_builders
.build_mem_ranges(&self.input, index, &mut ranges);
ranges
}
/// Retrieves the partition ranges.
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
self.ranges
@@ -864,125 +826,3 @@ impl StreamContext {
Ok(())
}
}
/// List to manage the builders that create file ranges.
struct RangeBuilderList {
mem_builders: Vec<StdMutex<Option<MemRangeBuilder>>>,
file_builders: Vec<Mutex<Option<FileRangeBuilder>>>,
}
impl RangeBuilderList {
/// Creates a new [RangeBuilderList] with the given number of memtables and files.
fn new(num_memtables: usize, num_files: usize) -> Self {
let mem_builders = (0..num_memtables).map(|_| StdMutex::new(None)).collect();
let file_builders = (0..num_files).map(|_| Mutex::new(None)).collect();
Self {
mem_builders,
file_builders,
}
}
/// Builds file ranges to read the row group at `index`.
async fn build_file_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut SmallVec<[FileRange; 2]>,
reader_metrics: &mut ReaderMetrics,
) -> Result<()> {
let file_index = index.index - self.mem_builders.len();
let mut builder_opt = self.file_builders[file_index].lock().await;
match &mut *builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
builder.build_ranges(index.row_group_index, ranges);
*builder_opt = Some(builder);
}
}
Ok(())
}
/// Builds mem ranges to read the row group at `index`.
fn build_mem_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut SmallVec<[MemtableRange; 2]>,
) {
let mut builder_opt = self.mem_builders[index.index].lock().unwrap();
match &mut *builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, ranges),
None => {
let builder = input.prune_memtable(index.index);
builder.build_ranges(index.row_group_index, ranges);
*builder_opt = Some(builder);
}
}
}
}
/// Builder to create file ranges.
#[derive(Default)]
struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
/// Row selections for each row group to read.
/// It skips the row group if it is not in the map.
row_groups: BTreeMap<usize, Option<RowSelection>>,
}
impl FileRangeBuilder {
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
let Some(context) = self.context.clone() else {
return;
};
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(row_selection) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(FileRange::new(
context,
row_group_index,
row_selection.clone(),
));
} else {
// Scans all row groups.
ranges.extend(
self.row_groups
.iter()
.map(|(row_group_index, row_selection)| {
FileRange::new(context.clone(), *row_group_index, row_selection.clone())
}),
);
}
}
}
/// Builder to create mem ranges.
struct MemRangeBuilder {
/// Ranges of a memtable.
row_groups: BTreeMap<usize, MemtableRange>,
}
impl MemRangeBuilder {
/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(range) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(range.clone());
} else {
ranges.extend(self.row_groups.values().cloned());
}
}
}

View File

@@ -20,10 +20,12 @@ use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::debug;
use futures::Stream;
use prometheus::IntGauge;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::read::range::RowGroupIndex;
use crate::metrics::IN_PROGRESS_SCAN;
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
@@ -41,6 +43,7 @@ struct PartitionMetricsInner {
first_poll: Duration,
metrics: ScannerMetrics,
reader_metrics: ReaderMetrics,
in_progress_scan: IntGauge,
}
impl PartitionMetricsInner {
@@ -56,6 +59,7 @@ impl Drop for PartitionMetricsInner {
fn drop(&mut self) {
self.on_finish();
self.metrics.observe_metrics();
self.in_progress_scan.dec();
debug!(
"{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
@@ -76,6 +80,8 @@ impl PartitionMetrics {
query_start: Instant,
metrics: ScannerMetrics,
) -> Self {
let partition_str = partition.to_string();
let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
let inner = PartitionMetricsInner {
region_id,
partition,
@@ -84,6 +90,7 @@ impl PartitionMetrics {
first_poll: Duration::default(),
metrics,
reader_metrics: ReaderMetrics::default(),
in_progress_scan,
};
Self(Arc::new(Mutex::new(inner)))
}
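
The gauge bookkeeping here is the usual RAII pattern: raise the per-partition gauge when the metrics are created and lower it in Drop (the diff shows the dec(); the matching inc() is assumed to sit next to the with_label_values call). A generic standalone version of the same guard:

use prometheus::IntGauge;

/// Keeps a gauge raised for exactly the guard's lifetime.
struct InProgressGuard {
    gauge: IntGauge,
}

impl InProgressGuard {
    fn new(gauge: IntGauge) -> Self {
        gauge.inc();
        Self { gauge }
    }
}

impl Drop for InProgressGuard {
    fn drop(&mut self) {
        self.gauge.dec();
    }
}

fn main() {
    let gauge = IntGauge::new("in_progress_scan", "scans in flight").unwrap();
    {
        let _guard = InProgressGuard::new(gauge.clone());
        assert_eq!(gauge.get(), 1);
    } // guard dropped, gauge lowered
    assert_eq!(gauge.get(), 0);
}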
@@ -130,9 +137,10 @@ pub(crate) fn scan_mem_ranges(
part_metrics: PartitionMetrics,
index: RowGroupIndex,
time_range: FileTimeRange,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = stream_ctx.build_mem_ranges(index);
let ranges = range_builder_list.build_mem_ranges(&stream_ctx.input, index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
@@ -153,17 +161,18 @@ pub(crate) fn scan_file_ranges(
part_metrics: PartitionMetrics,
index: RowGroupIndex,
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let mut reader_metrics = ReaderMetrics::default();
let ranges = stream_ctx
.build_file_ranges(index, &mut reader_metrics)
.await?;
let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
part_metrics.inc_num_file_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(None).await?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(mut batch) = source.next_batch().await? {

View File

@@ -36,6 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
@@ -131,12 +132,17 @@ impl SeqScan {
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
for part_range in partition_ranges {
build_sources(
stream_ctx,
part_range,
compaction,
part_metrics,
range_builder_list.clone(),
&mut sources,
);
}
@@ -219,17 +225,28 @@ impl SeqScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
build_sources(&stream_ctx, &part_range, compaction, &part_metrics, &mut sources);
build_sources(
&stream_ctx,
&part_range,
compaction,
&part_metrics,
range_builder_list.clone(),
&mut sources,
);
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cache = stream_ctx.input.cache_manager.as_deref();
let cache = &stream_ctx.input.cache_manager;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
@@ -353,6 +370,7 @@ fn build_sources(
part_range: &PartitionRange,
compaction: bool,
part_metrics: &PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
sources: &mut Vec<Source>,
) {
// Gets range meta.
@@ -365,6 +383,7 @@ fn build_sources(
part_metrics.clone(),
*index,
range_meta.time_range,
range_builder_list.clone(),
);
Box::pin(stream) as _
} else {
@@ -373,8 +392,13 @@ fn build_sources(
} else {
"seq_scan_files"
};
let stream =
scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type);
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
);
Box::pin(stream) as _
};
sources.push(Source::Stream(stream));

View File

@@ -30,6 +30,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{Batch, ScannerMetrics};
@@ -84,18 +85,31 @@ impl UnorderedScan {
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
part_metrics: PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
range_builder_list.clone(),
);
for await batch in stream {
yield batch;
}
} else {
let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
"unordered_scan_files",
range_builder_list.clone(),
);
for await batch in stream {
yield batch;
}
@@ -135,7 +149,11 @@ impl UnorderedScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let cache = stream_ctx.input.cache_manager.as_deref();
let cache = &stream_ctx.input.cache_manager;
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans each part.
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
@@ -149,6 +167,7 @@ impl UnorderedScan {
stream_ctx.clone(),
part_range.identifier,
part_metrics.clone(),
range_builder_list.clone(),
);
for await batch in stream {
let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;

View File

@@ -195,11 +195,11 @@ mod tests {
.unwrap();
// Enable page cache.
let cache = Some(Arc::new(
let cache = Arc::new(
CacheManager::builder()
.page_cache_size(64 * 1024 * 1024)
.build(),
));
);
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.cache(cache.clone());
for _ in 0..3 {
@@ -219,15 +219,15 @@ mod tests {
// Doesn't have compressed page cached.
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
assert!(cache.get_pages(&page_key).is_none());
// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
assert!(cache.get_pages(&page_key).is_some());
}
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
assert!(cache.get_pages(&page_key).is_none());
}
#[tokio::test]

View File

@@ -82,7 +82,7 @@ pub struct ParquetReaderBuilder {
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
/// Manager that caches SST data.
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
@@ -106,7 +106,7 @@ impl ParquetReaderBuilder {
predicate: None,
time_range: None,
projection: None,
cache_manager: None,
cache_manager: CacheManagerRef::default(),
inverted_index_applier: None,
fulltext_index_applier: None,
expected_metadata: None,
@@ -138,7 +138,7 @@ impl ParquetReaderBuilder {
/// Attaches the cache to the builder.
#[must_use]
pub fn cache(mut self, cache: Option<CacheManagerRef>) -> ParquetReaderBuilder {
pub fn cache(mut self, cache: CacheManagerRef) -> ParquetReaderBuilder {
self.cache_manager = cache;
self
}
@@ -313,10 +313,12 @@ impl ParquetReaderBuilder {
let region_id = self.file_handle.region_id();
let file_id = self.file_handle.file_id();
// Tries to get from global cache.
if let Some(manager) = &self.cache_manager {
if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await {
return Ok(metadata);
}
if let Some(metadata) = self
.cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
{
return Ok(metadata);
}
// Cache miss, load metadata directly.
@@ -324,13 +326,11 @@ impl ParquetReaderBuilder {
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
if let Some(cache) = &self.cache_manager {
cache.put_parquet_meta_data(
self.file_handle.region_id(),
self.file_handle.file_id(),
metadata.clone(),
);
}
self.cache_manager.put_parquet_meta_data(
self.file_handle.region_id(),
self.file_handle.file_id(),
metadata.clone(),
);
Ok(metadata)
}
@@ -846,7 +846,7 @@ pub(crate) struct RowGroupReaderBuilder {
/// Field levels to read.
field_levels: FieldLevels,
/// Cache.
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
}
impl RowGroupReaderBuilder {
@@ -864,7 +864,7 @@ impl RowGroupReaderBuilder {
&self.parquet_meta
}
pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
pub(crate) fn cache_manager(&self) -> &CacheManagerRef {
&self.cache_manager
}

View File

@@ -48,7 +48,7 @@ pub struct InMemoryRowGroup<'a> {
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
/// Row group level cached pages for each column.
///
/// These pages are uncompressed pages of a row group.
@@ -69,7 +69,7 @@ impl<'a> InMemoryRowGroup<'a> {
file_id: FileId,
parquet_meta: &'a ParquetMetaData,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
cache_manager: CacheManagerRef,
file_path: &'a str,
object_store: ObjectStore,
) -> Self {
@@ -208,19 +208,18 @@ impl<'a> InMemoryRowGroup<'a> {
};
let column = self.metadata.column(idx);
if let Some(cache) = &self.cache_manager {
if !cache_uncompressed_pages(column) {
// For columns that have multiple uncompressed pages, we only cache the compressed page
// to save memory.
let page_key = PageKey::new_compressed(
self.region_id,
self.file_id,
self.row_group_idx,
idx,
);
cache
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
}
if !cache_uncompressed_pages(column) {
// For columns that have multiple uncompressed pages, we only cache the compressed page
// to save memory.
let page_key = PageKey::new_compressed(
self.region_id,
self.file_id,
self.row_group_idx,
idx,
);
self.cache_manager
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
}
*chunk = Some(Arc::new(ColumnChunkData::Dense {
@@ -242,9 +241,6 @@ impl<'a> InMemoryRowGroup<'a> {
.enumerate()
.filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
.for_each(|(idx, chunk)| {
let Some(cache) = &self.cache_manager else {
return;
};
let column = self.metadata.column(idx);
if cache_uncompressed_pages(column) {
// Fetches uncompressed pages for the row group.
@@ -254,7 +250,7 @@ impl<'a> InMemoryRowGroup<'a> {
self.row_group_idx,
idx,
);
self.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
self.column_uncompressed_pages[idx] = self.cache_manager.get_pages(&page_key);
} else {
// Fetches the compressed page from the cache.
let page_key = PageKey::new_compressed(
@@ -264,7 +260,7 @@ impl<'a> InMemoryRowGroup<'a> {
idx,
);
*chunk = cache.get_pages(&page_key).map(|page_value| {
*chunk = self.cache_manager.get_pages(&page_key).map(|page_value| {
Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data: page_value.compressed.clone(),
@@ -300,7 +296,7 @@ impl<'a> InMemoryRowGroup<'a> {
key: IndexKey,
ranges: &[Range<u64>],
) -> Option<Vec<Bytes>> {
if let Some(cache) = self.cache_manager.as_ref()?.write_cache() {
if let Some(cache) = self.cache_manager.write_cache() {
return cache.file_cache().read_ranges(key, ranges).await;
}
None
@@ -331,10 +327,6 @@ impl<'a> InMemoryRowGroup<'a> {
}
};
let Some(cache) = &self.cache_manager else {
return Ok(Box::new(page_reader));
};
let column = self.metadata.column(i);
if cache_uncompressed_pages(column) {
// This column use row group level page cache.
@@ -343,7 +335,7 @@ impl<'a> InMemoryRowGroup<'a> {
let page_value = Arc::new(PageValue::new_row_group(pages));
let page_key =
PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
cache.put_pages(page_key, page_value.clone());
self.cache_manager.put_pages(page_key, page_value.clone());
return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
}
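
Net effect of the branches in this file, as a summary sketch (not crate API): columns for which cache_uncompressed_pages(column) holds are cached as a whole uncompressed row group, everything else falls back to the compressed column chunk, trading repeated decompression for lower memory use.

/// Illustrative summary of the two page-cache tiers toggled above.
enum PageCachePolicy {
    /// Whole uncompressed row group cached: no decode on a hit, more memory held.
    UncompressedRowGroup,
    /// Only the compressed chunk cached: cheaper to hold, pages are
    /// decompressed again on every read.
    CompressedChunk,
}

fn policy(cache_uncompressed_pages: bool) -> PageCachePolicy {
    if cache_uncompressed_pages {
        PageCachePolicy::UncompressedRowGroup
    } else {
        PageCachePolicy::CompressedChunk
    }
}

fn main() {
    assert!(matches!(policy(true), PageCachePolicy::UncompressedRowGroup));
    assert!(matches!(policy(false), PageCachePolicy::CompressedChunk));
}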

View File

@@ -22,11 +22,11 @@ use common_telemetry::{debug, info};
use humantime_serde::re::humantime;
use snafu::ResultExt;
use store_api::metadata::{
InvalidRegionOptionChangeRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
RegionMetadataRef,
};
use store_api::mito_engine_options;
use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest};
use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
use store_api::storage::RegionId;
use crate::error::{
@@ -58,12 +58,29 @@ impl<S> RegionWorkerLoop<S> {
let version = region.version();
// fast path for memory state changes like options.
if let AlterKind::ChangeRegionOptions { options } = request.kind {
match self.handle_alter_region_options(region, version, options) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
match request.kind {
AlterKind::SetRegionOptions { options } => {
match self.handle_alter_region_options(region, version, options) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
}
return;
AlterKind::UnsetRegionOptions { keys } => {
// Converts the keys to SetRegionOption.
//
// It passes an empty string to signal that the option should be unset (reset to its default)
match self.handle_alter_region_options(
region,
version,
keys.iter().map(Into::into).collect(),
) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
}
_ => {}
}
if version.metadata.schema_version != request.schema_version {
@@ -162,12 +179,12 @@ impl<S> RegionWorkerLoop<S> {
&mut self,
region: MitoRegionRef,
version: VersionRef,
options: Vec<ChangeOption>,
options: Vec<SetRegionOption>,
) -> std::result::Result<(), MetadataError> {
let mut current_options = version.options.clone();
for option in options {
match option {
ChangeOption::TTL(new_ttl) => {
SetRegionOption::TTL(new_ttl) => {
info!(
"Update region ttl: {}, previous: {:?} new: {:?}",
region.region_id, current_options.ttl, new_ttl
@@ -178,9 +195,9 @@ impl<S> RegionWorkerLoop<S> {
current_options.ttl = Some(new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
SetRegionOption::Twsc(key, value) => {
let Twcs(options) = &mut current_options.compaction;
change_twcs_options(
set_twcs_options(
options,
&TwcsOptions::default(),
&key,
@@ -213,7 +230,7 @@ fn metadata_after_alteration(
Ok(Arc::new(new_meta))
}
fn change_twcs_options(
fn set_twcs_options(
options: &mut TwcsOptions,
default_option: &TwcsOptions,
key: &str,
@@ -245,30 +262,30 @@ fn change_twcs_options(
options.max_inactive_window_files = files;
}
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
let size =
if value.is_empty() {
default_option.max_output_file_size
} else {
Some(ReadableSize::from_str(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
let size = if value.is_empty() {
default_option.max_output_file_size
} else {
Some(
ReadableSize::from_str(value)
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
)
};
log_option_update(region_id, key, options.max_output_file_size, size);
options.max_output_file_size = size;
}
mito_engine_options::TWCS_TIME_WINDOW => {
let window =
if value.is_empty() {
default_option.time_window
} else {
Some(humantime::parse_duration(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
let window = if value.is_empty() {
default_option.time_window
} else {
Some(
humantime::parse_duration(value)
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?,
)
};
log_option_update(region_id, key, options.time_window, window);
options.time_window = window;
}
_ => return InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
_ => return InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
}
Ok(())
}
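
Both duration options above follow the convention the UnsetRegionOptions path relies on: an empty value resets the option to its default, anything else must parse. The same parse-or-default logic in isolation, using the humantime crate this file already imports (the function name is illustrative):

use std::time::Duration;

/// Empty string = reset to the default; otherwise the value must parse.
fn set_or_unset_window(
    value: &str,
    default: Option<Duration>,
) -> Result<Option<Duration>, humantime::DurationError> {
    if value.is_empty() {
        Ok(default)
    } else {
        humantime::parse_duration(value).map(Some)
    }
}

fn main() {
    assert_eq!(set_or_unset_window("", None).unwrap(), None);
    assert_eq!(
        set_or_unset_window("2h", None).unwrap(),
        Some(Duration::from_secs(7200))
    );
    assert!(set_or_unset_window("not-a-duration", None).is_err());
}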
@@ -283,7 +300,7 @@ fn parse_usize_with_default(
} else {
value
.parse::<usize>()
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())
}
}

View File

@@ -49,7 +49,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Utilizes short-circuit evaluation.
let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? {
let manifest_version = region.manifest_ctx.manifest_version().await;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}");
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}");
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
@@ -111,6 +112,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
} else {
warn!("Skips to replay memtable for region: {}", region.region_id);
let flushed_entry_id = region.version_control.current().last_entry_id;
let on_region_opened = self.wal.on_region_opened();
on_region_opened(region_id, flushed_entry_id, &region.provider).await?;
}
if request.set_writable {

View File

@@ -12,6 +12,7 @@ workspace = true
[dependencies]
api.workspace = true
async-stream.workspace = true
async-trait = "0.1"
catalog.workspace = true
chrono.workspace = true

View File

@@ -23,6 +23,7 @@ use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};
use table::metadata::TableType;
use tokio::time::error::Elapsed;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -777,6 +778,14 @@ pub enum Error {
location: Location,
json: String,
},
#[snafu(display("Canceling statement due to statement timeout"))]
StatementTimeout {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: Elapsed,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -924,6 +933,7 @@ impl ErrorExt for Error {
Error::BuildRecordBatch { source, .. } => source.status_code(),
Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal,
Error::StatementTimeout { .. } => StatusCode::Cancelled,
}
}

View File

@@ -15,13 +15,15 @@
use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
use api::v1::alter_table_expr::Kind as AlterTableKind;
use api::v1::column_def::options_from_column_schema;
use api::v1::{
AddColumn, AddColumns, AlterExpr, Analyzer, ChangeColumnFulltext, ChangeColumnType,
ChangeColumnTypes, ChangeTableOptions, ColumnDataType, ColumnDataTypeExtension, CreateFlowExpr,
CreateTableExpr, CreateViewExpr, DropColumn, DropColumns, ExpireAfter, RenameTable,
SemanticType, TableName,
AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
DropColumns, ExpireAfter, ModifyColumnType, ModifyColumnTypes, RenameTable, SemanticType,
SetColumnFulltext, SetDatabaseOptions, SetTableOptions, TableName, UnsetColumnFulltext,
UnsetDatabaseOptions, UnsetTableOptions,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
@@ -37,7 +39,9 @@ use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::ColumnOption;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::alter::{
AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
};
use sql::statements::create::{
Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
};
@@ -472,10 +476,10 @@ pub fn column_schemas_to_defs(
.collect()
}
pub(crate) fn to_alter_expr(
pub(crate) fn to_alter_table_expr(
alter_table: AlterTable,
query_ctx: &QueryContextRef,
) -> Result<AlterExpr> {
) -> Result<AlterTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name(), query_ctx)
.map_err(BoxedError::new)
@@ -491,7 +495,7 @@ pub(crate) fn to_alter_expr(
AlterTableOperation::AddColumn {
column_def,
location,
} => Kind::AddColumns(AddColumns {
} => AlterTableKind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(&column_def, Some(&query_ctx.timezone()))
@@ -501,7 +505,7 @@ pub(crate) fn to_alter_expr(
location: location.as_ref().map(From::from),
}],
}),
AlterTableOperation::ChangeColumnType {
AlterTableOperation::ModifyColumnType {
column_name,
target_type,
} => {
@@ -510,31 +514,36 @@ pub(crate) fn to_alter_expr(
let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
.map(|w| w.to_parts())
.context(ColumnDataTypeSnafu)?;
Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: column_name.value,
target_type: target_type as i32,
target_type_extension,
}],
})
}
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
AlterTableOperation::DropColumn { name } => AlterTableKind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: name.value.to_string(),
}],
}),
AlterTableOperation::RenameTable { new_table_name } => Kind::RenameTable(RenameTable {
new_table_name: new_table_name.to_string(),
}),
AlterTableOperation::ChangeTableOptions { options } => {
Kind::ChangeTableOptions(ChangeTableOptions {
change_table_options: options.into_iter().map(Into::into).collect(),
AlterTableOperation::RenameTable { new_table_name } => {
AlterTableKind::RenameTable(RenameTable {
new_table_name: new_table_name.to_string(),
})
}
AlterTableOperation::ChangeColumnFulltext {
AlterTableOperation::SetTableOptions { options } => {
AlterTableKind::SetTableOptions(SetTableOptions {
table_options: options.into_iter().map(Into::into).collect(),
})
}
AlterTableOperation::UnsetTableOptions { keys } => {
AlterTableKind::UnsetTableOptions(UnsetTableOptions { keys })
}
AlterTableOperation::SetColumnFulltext {
column_name,
options,
} => Kind::ChangeColumnFulltext(ChangeColumnFulltext {
} => AlterTableKind::SetColumnFulltext(SetColumnFulltext {
column_name: column_name.value,
enable: options.enable,
analyzer: match options.analyzer {
@@ -543,9 +552,14 @@ pub(crate) fn to_alter_expr(
},
case_sensitive: options.case_sensitive,
}),
AlterTableOperation::UnsetColumnFulltext { column_name } => {
AlterTableKind::UnsetColumnFulltext(UnsetColumnFulltext {
column_name: column_name.value,
})
}
};
Ok(AlterExpr {
Ok(AlterTableExpr {
catalog_name,
schema_name,
table_name,
@@ -553,6 +567,33 @@ pub(crate) fn to_alter_expr(
})
}
/// Try to cast the `[AlterDatabase]` statement into gRPC `[AlterDatabaseExpr]`.
pub fn to_alter_database_expr(
alter_database: AlterDatabase,
query_ctx: &QueryContextRef,
) -> Result<AlterDatabaseExpr> {
let catalog = query_ctx.current_catalog();
let schema = alter_database.database_name;
let kind = match alter_database.alter_operation {
AlterDatabaseOperation::SetDatabaseOption { options } => {
let options = options.into_iter().map(Into::into).collect();
AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
set_database_options: options,
})
}
AlterDatabaseOperation::UnsetDatabaseOption { keys } => {
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys })
}
};
Ok(AlterDatabaseExpr {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
kind: Some(kind),
})
}
/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`.
pub fn to_create_view_expr(
stmt: CreateView,
@@ -648,6 +689,7 @@ pub fn to_create_flow_task_expr(
#[cfg(test)]
mod tests {
use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
use datatypes::value::Value;
use session::context::{QueryContext, QueryContextBuilder};
use sql::dialect::GreptimeDbDialect;
@@ -753,6 +795,55 @@ mod tests {
#[test]
fn test_to_alter_expr() {
let sql = "ALTER DATABASE greptime SET key1='value1', key2='value2';";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::AlterDatabase(alter_database) = stmt else {
unreachable!()
};
let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();
let AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions {
set_database_options,
}) = kind
else {
unreachable!()
};
assert_eq!(2, set_database_options.len());
assert_eq!("key1", set_database_options[0].key);
assert_eq!("value1", set_database_options[0].value);
assert_eq!("key2", set_database_options[1].key);
assert_eq!("value2", set_database_options[1].value);
let sql = "ALTER DATABASE greptime UNSET key1, key2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::AlterDatabase(alter_database) = stmt else {
unreachable!()
};
let expr = to_alter_database_expr(alter_database, &QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();
let AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions { keys }) = kind else {
unreachable!()
};
assert_eq!(2, keys.len());
assert!(keys.contains(&"key1".to_string()));
assert!(keys.contains(&"key2".to_string()));
let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
@@ -760,15 +851,15 @@ mod tests {
.pop()
.unwrap();
let Statement::Alter(alter_table) = stmt else {
let Statement::AlterTable(alter_table) = stmt else {
unreachable!()
};
// query context with system timezone UTC.
let expr = to_alter_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();
let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
unreachable!()
};
@@ -786,10 +877,10 @@ mod tests {
.timezone(Timezone::from_tz_string("+08:00").unwrap())
.build()
.into();
let expr = to_alter_expr(alter_table, &ctx).unwrap();
let expr = to_alter_table_expr(alter_table, &ctx).unwrap();
let kind = expr.kind.unwrap();
let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
let AlterTableKind::AddColumns(AddColumns { add_columns, .. }) = kind else {
unreachable!()
};
@@ -803,7 +894,7 @@ mod tests {
}
#[test]
fn test_to_alter_change_column_type_expr() {
fn test_to_alter_modify_column_type_expr() {
let sql = "ALTER TABLE monitor MODIFY COLUMN mem_usage STRING;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
@@ -811,30 +902,30 @@ mod tests {
.pop()
.unwrap();
let Statement::Alter(alter_table) = stmt else {
let Statement::AlterTable(alter_table) = stmt else {
unreachable!()
};
// query context with system timezone UTC.
let expr = to_alter_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
let expr = to_alter_table_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();
let Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types,
let AlterTableKind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types,
}) = kind
else {
unreachable!()
};
assert_eq!(1, change_column_types.len());
let change_column_type = &change_column_types[0];
assert_eq!(1, modify_column_types.len());
let modify_column_type = &modify_column_types[0];
assert_eq!("mem_usage", change_column_type.column_name);
assert_eq!("mem_usage", modify_column_type.column_name);
assert_eq!(
ColumnDataType::String as i32,
change_column_type.target_type
modify_column_type.target_type
);
assert!(change_column_type.target_type_extension.is_none());
assert!(modify_column_type.target_type_extension.is_none());
}
fn new_test_table_names() -> Vec<TableName> {

View File

@@ -15,11 +15,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
AlterExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest,
RowInsertRequests, SemanticType,
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
@@ -692,7 +692,7 @@ impl Inserter {
req: &RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
) -> Result<Option<AlterExpr>> {
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let table_name = table.table_info().name.clone();
@@ -705,7 +705,7 @@ impl Inserter {
return Ok(None);
};
Ok(Some(AlterExpr {
Ok(Some(AlterTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),

View File

@@ -24,11 +24,14 @@ mod show;
mod tql;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use async_stream::stream;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use client::RecordBatches;
use client::{OutputData, RecordBatches};
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
@@ -39,15 +42,19 @@ use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use futures::stream::{Stream, StreamExt};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::QueryEngineRef;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use set::set_query_timeout;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::set_variables::SetVariables;
@@ -63,8 +70,8 @@ use table::TableRef;
use self::set::{set_bytea_output, set_datestyle, set_timezone, validate_client_encoding};
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UpgradeCatalogManagerRefSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, StatementTimeoutSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UpgradeCatalogManagerRefSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
@@ -217,7 +224,12 @@ impl StatementExecutor {
)
.await
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::AlterDatabase(alter_database) => {
self.alter_database(alter_database, query_ctx).await
}
Statement::DropTable(stmt) => {
let mut table_names = Vec::with_capacity(stmt.table_names().len());
for table_name_stmt in stmt.table_names() {
@@ -275,6 +287,7 @@ impl StatementExecutor {
.context(SchemaNotFoundSnafu {
schema_info: &database,
})?
.into_inner()
.into();
self.show_create_database(&database, opts.into()).await
@@ -338,6 +351,28 @@ impl StatementExecutor {
"DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
"CLIENT_ENCODING" => validate_client_encoding(set_var)?,
"MAX_EXECUTION_TIME" => match query_ctx.channel() {
Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
Channel::Postgres => {
query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
}
_ => {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail()
}
},
"STATEMENT_TIMEOUT" => {
if query_ctx.channel() == Channel::Postgres {
set_query_timeout(set_var.value, query_ctx)?
} else {
return NotSupportedSnafu {
feat: format!("Unsupported set variable {}", var_name),
}
.fail();
}
}
_ => {
// for postgres, we give unknown SET statements a warning with
// success; this is to prevent the SET call from becoming a blocker
@@ -387,8 +422,19 @@ impl StatementExecutor {
#[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(&stmt, query_ctx.clone()).await?;
self.exec_plan(plan, query_ctx).await
let timeout = derive_timeout(&stmt, &query_ctx);
match timeout {
Some(timeout) => {
let start = tokio::time::Instant::now();
let output = tokio::time::timeout(timeout, self.plan_exec_inner(stmt, query_ctx))
.await
.context(StatementTimeoutSnafu)?;
// compute remaining timeout
let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
Ok(attach_timeout(output?, remaining_timeout))
}
None => self.plan_exec_inner(stmt, query_ctx).await,
}
}
async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
@@ -405,6 +451,49 @@ impl StatementExecutor {
table_name: table_ref.to_string(),
})
}
async fn plan_exec_inner(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<Output> {
let plan = self.plan(&stmt, query_ctx.clone()).await?;
self.exec_plan(plan, query_ctx).await
}
}
fn attach_timeout(output: Output, mut timeout: Duration) -> Output {
match output.data {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
OutputData::Stream(mut stream) => {
let schema = stream.schema();
let s = Box::pin(stream! {
let start = tokio::time::Instant::now();
while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.context(StreamTimeoutSnafu)? {
yield item;
timeout = timeout.checked_sub(tokio::time::Instant::now() - start).unwrap_or(Duration::ZERO);
}
}) as Pin<Box<dyn Stream<Item = _> + Send>>;
let stream = RecordBatchStreamWrapper {
schema,
stream: s,
output_ordering: None,
metrics: Default::default(),
};
Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
}
}
}
/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
/// For MySQL, it applies only to read-only statements.
fn derive_timeout(stmt: &QueryStatement, query_ctx: &QueryContextRef) -> Option<Duration> {
let query_timeout = query_ctx.query_timeout()?;
match (query_ctx.channel(), stmt) {
(Channel::Mysql, QueryStatement::Sql(Statement::Query(_)))
| (Channel::Postgres, QueryStatement::Sql(_)) => Some(query_timeout),
(_, _) => None,
}
}
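
Taken together: derive_timeout picks the budget (per this diff, MySQL clients set it with SET MAX_EXECUTION_TIME and only Query statements are covered, while Postgres clients set STATEMENT_TIMEOUT and every SQL statement is covered), plan_exec charges planning and execution against it, and attach_timeout spends whatever remains on the result stream. The budget-carrying idea in a minimal standalone form:

use std::time::Duration;
use tokio::time::{sleep, timeout, Instant};

/// Runs several awaits against one shared deadline budget; each await
/// consumes from the budget and the leftover is carried forward.
async fn run_with_budget(mut budget: Duration) -> Result<(), &'static str> {
    for step in 0..3 {
        let start = Instant::now();
        timeout(budget, sleep(Duration::from_millis(10)))
            .await
            .map_err(|_| "statement timeout")?;
        budget = budget.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
        println!("step {step}: {budget:?} left");
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    run_with_budget(Duration::from_millis(100)).await.unwrap();
}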
fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {

View File

@@ -17,7 +17,9 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr};
use api::v1::{
column_def, AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{is_readonly_schema, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -26,7 +28,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
@@ -51,7 +53,7 @@ use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::alter::{AlterDatabase, AlterTable};
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
@@ -71,10 +73,10 @@ use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
InvalidPartitionSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_factory;
use crate::statement::show::create_partitions_stmt;
@@ -125,7 +127,8 @@ impl StatementExecutor {
schema: &schema,
})
.await
.context(TableMetadataManagerSnafu)?;
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner());
let quote_style = ctx.quote_style();
let mut create_stmt =
@@ -465,6 +468,12 @@ impl StatementExecutor {
expr: CreateViewExpr,
ctx: QueryContextRef,
) -> Result<TableRef> {
ensure! {
!(expr.create_if_not_exists & expr.or_replace),
InvalidSqlSnafu {
err_msg: "syntax error Create Or Replace and If Not Exist cannot be used together",
}
};
let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
let schema_exists = self
@@ -717,7 +726,7 @@ impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(
&self,
alter_table_exprs: Vec<AlterExpr>,
alter_table_exprs: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
@@ -879,7 +888,7 @@ impl StatementExecutor {
&self,
table_id: TableId,
table_info: Arc<TableInfo>,
expr: AlterExpr,
expr: AlterTableExpr,
) -> Result<()> {
let request: AlterTableRequest = common_grpc_expr::alter_expr_to_request(table_id, expr)
.context(AlterExprToRequestSnafu)?;
@@ -915,14 +924,14 @@ impl StatementExecutor {
alter_table: AlterTable,
query_context: QueryContextRef,
) -> Result<Output> {
let expr = expr_factory::to_alter_expr(alter_table, &query_context)?;
let expr = expr_factory::to_alter_table_expr(alter_table, &query_context)?;
self.alter_table_inner(expr, query_context).await
}
#[tracing::instrument(skip_all)]
pub async fn alter_table_inner(
&self,
expr: AlterExpr,
expr: AlterTableExpr,
query_context: QueryContextRef,
) -> Result<Output> {
ensure!(
@@ -1035,6 +1044,58 @@ impl StatementExecutor {
Ok(Output::new_with_affected_rows(0))
}
#[tracing::instrument(skip_all)]
pub async fn alter_database(
&self,
alter_expr: AlterDatabase,
query_context: QueryContextRef,
) -> Result<Output> {
let alter_expr = expr_factory::to_alter_database_expr(alter_expr, &query_context)?;
self.alter_database_inner(alter_expr, query_context).await
}
#[tracing::instrument(skip_all)]
pub async fn alter_database_inner(
&self,
alter_expr: AlterDatabaseExpr,
query_context: QueryContextRef,
) -> Result<Output> {
ensure!(
!is_readonly_schema(&alter_expr.schema_name),
SchemaReadOnlySnafu {
name: query_context.current_schema().clone()
}
);
let exists = self
.catalog_manager
.schema_exists(&alter_expr.catalog_name, &alter_expr.schema_name, None)
.await
.context(CatalogSnafu)?;
ensure!(
exists,
SchemaNotFoundSnafu {
schema_info: alter_expr.schema_name,
}
);
let cache_ident = [CacheIdent::SchemaName(SchemaName {
catalog_name: alter_expr.catalog_name.clone(),
schema_name: alter_expr.schema_name.clone(),
})];
self.alter_database_procedure(alter_expr, query_context)
.await?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &cache_ident)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
}
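// A minimal sketch of how this entry point might be exercised (hypothetical
// SQL and call site; the real dispatch happens elsewhere in the statement
// executor): a statement such as `ALTER DATABASE my_db SET ttl = '7d'` is
// parsed into an `AlterDatabase`, then:
//
// executor.alter_database(parsed_stmt, query_context).await?;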
async fn create_table_procedure(
&self,
create_table: CreateTableExpr,
@@ -1073,7 +1134,7 @@ impl StatementExecutor {
async fn alter_logical_tables_procedure(
&self,
tables_data: Vec<AlterExpr>,
tables_data: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
@@ -1129,6 +1190,22 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}
async fn alter_database_procedure(
&self,
alter_expr: AlterDatabaseExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_database(alter_expr),
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
}
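// Mirrors the table-level DDL paths above: wrap the expression in a
// `SubmitDdlTaskRequest` and submit it to the procedure executor.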
async fn truncate_table_procedure(
&self,
table_name: &TableName,


@@ -12,7 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_time::Timezone;
use lazy_static::lazy_static;
use regex::Regex;
use session::context::Channel::Postgres;
use session::context::QueryContextRef;
use session::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle};
use snafu::{ensure, OptionExt, ResultExt};
@@ -21,6 +26,15 @@ use sql::statements::set_variables::SetVariables;
use crate::error::{InvalidConfigValueSnafu, InvalidSqlSnafu, NotSupportedSnafu, Result};
lazy_static! {
// Regex rules:
// The string must start with a number (one or more digits).
// The number must be followed by one of the valid time units (ms, s, min, h, d).
// The string must end immediately after the unit, meaning there can be no extra
// characters or spaces after the valid time specification.
static ref PG_TIME_INPUT_REGEX: Regex = Regex::new(r"^(\d+)(ms|s|min|h|d)$").unwrap();
}
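// What the pattern accepts, as a quick standalone sketch (assumes only the
// `regex` crate; mirrors the rules described above):
//
// let re = regex::Regex::new(r"^(\d+)(ms|s|min|h|d)$").unwrap();
// assert!(re.is_match("50ms"));
// assert!(!re.is_match("50 ms")); // embedded space: rejected
// let caps = re.captures("2s").unwrap();
// assert_eq!((&caps[1], &caps[2]), ("2", "s"));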
pub fn set_timezone(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
let tz_expr = exprs.first().context(NotSupportedSnafu {
feat: "No timezone find in set variable statement",
@@ -177,3 +191,96 @@ pub fn set_datestyle(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
.set_pg_datetime_style(style.unwrap_or(old_style), order.unwrap_or(older_order));
Ok(())
}
pub fn set_query_timeout(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
let timeout_expr = exprs.first().context(NotSupportedSnafu {
feat: "No timeout value find in set query timeout statement",
})?;
match timeout_expr {
Expr::Value(Value::Number(timeout, _)) => {
match timeout.parse::<u64>() {
Ok(timeout) => ctx.set_query_timeout(Duration::from_millis(timeout)),
Err(_) => {
return NotSupportedSnafu {
feat: format!("Invalid timeout expr {} in set variable statement", timeout),
}
.fail()
}
}
Ok(())
}
// Postgres supports time units, e.g. SET STATEMENT_TIMEOUT = '50ms';
Expr::Value(Value::SingleQuotedString(timeout))
| Expr::Value(Value::DoubleQuotedString(timeout)) => {
if ctx.channel() != Postgres {
return NotSupportedSnafu {
feat: format!("Invalid timeout expr {} in set variable statement", timeout),
}
.fail();
}
let timeout = parse_pg_query_timeout_input(timeout)?;
ctx.set_query_timeout(Duration::from_millis(timeout));
Ok(())
}
expr => NotSupportedSnafu {
feat: format!(
"Unsupported timeout expr {} in set variable statement",
expr
),
}
.fail(),
}
}
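// Note: in the numeric branch above, a bare number is interpreted as
// milliseconds (`Duration::from_millis`), matching Postgres's default unit
// for `statement_timeout`.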
// Supports the time units ms, s, min, h, and d for the Postgres protocol.
// https://www.postgresql.org/docs/8.4/config-setting.html#:~:text=Valid%20memory%20units%20are%20kB,%2C%20and%20d%20(days).
fn parse_pg_query_timeout_input(input: &str) -> Result<u64> {
match input.parse::<u64>() {
Ok(timeout) => Ok(timeout),
Err(_) => {
if let Some(captures) = PG_TIME_INPUT_REGEX.captures(input) {
let value = captures[1].parse::<u64>().expect("regex failed");
let unit = &captures[2];
match unit {
"ms" => Ok(value),
"s" => Ok(value * 1000),
"min" => Ok(value * 60 * 1000),
"h" => Ok(value * 60 * 60 * 1000),
"d" => Ok(value * 24 * 60 * 60 * 1000),
_ => unreachable!("regex failed"),
}
} else {
NotSupportedSnafu {
feat: format!(
"Unsupported timeout expr {} in set variable statement",
input
),
}
.fail()
}
}
}
}
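// Worked conversions, all normalizing to milliseconds:
// "12ms" -> 12
// "2s"   -> 2 * 1000            = 2_000
// "1min" -> 60 * 1000           = 60_000
// "1h"   -> 60 * 60 * 1000      = 3_600_000
// "1d"   -> 24 * 60 * 60 * 1000 = 86_400_000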
#[cfg(test)]
mod test {
use crate::statement::set::parse_pg_query_timeout_input;
#[test]
fn test_parse_pg_query_timeout_input() {
assert!(parse_pg_query_timeout_input("").is_err());
assert!(parse_pg_query_timeout_input(" 50 ms").is_err());
assert!(parse_pg_query_timeout_input("5s 1ms").is_err());
assert!(parse_pg_query_timeout_input("3a").is_err());
assert!(parse_pg_query_timeout_input("1.5min").is_err());
assert!(parse_pg_query_timeout_input("ms").is_err());
assert!(parse_pg_query_timeout_input("a").is_err());
assert!(parse_pg_query_timeout_input("-1").is_err());
assert_eq!(50, parse_pg_query_timeout_input("50").unwrap());
assert_eq!(12, parse_pg_query_timeout_input("12ms").unwrap());
assert_eq!(2000, parse_pg_query_timeout_input("2s").unwrap());
assert_eq!(60000, parse_pg_query_timeout_input("1min").unwrap());
}
}


@@ -127,7 +127,8 @@ impl StatementExecutor {
schema: &table_name.schema_name,
})
.await
.context(TableMetadataManagerSnafu)?;
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner());
let partitions = self
.partition_manager


@@ -16,6 +16,7 @@
#![feature(int_roundings)]
#![feature(trait_upcasting)]
#![feature(try_blocks)]
#![feature(stmt_expr_attributes)]
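// `stmt_expr_attributes` is the nightly feature that allows attributes on
// statements and expressions (e.g. `#[allow(...)]` directly on an expression).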
mod analyze;
pub mod dataframe;

Some files were not shown because too many files have changed in this diff.