Compare commits

..

43 Commits

Author SHA1 Message Date
Zhenchi
21bcd0fc81 chore: bump version to v0.10.2
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-26 21:04:16 +08:00
Lei, HUANG
49db50af81 feat: use cache kv manager for SchemaMetadataManager (#5053)
* feat: add cache for schema options

* fix/use-cache-kv-manager: Add cache invalidation handling to Datanode's heartbeat task

 • Implement InvalidateSchemaCacheHandler in heartbeat.rs to handle cache invalidation instructions.
 • Update HeartbeatTask constructor to accept cached_kv_backend and pass it to InvalidateSchemaCacheHandler.
 • Modify DatanodeBuilder to clone cached_kv_backend when creating schema_metadata_manager.
 • Refactor MetasrvCacheInvalidator in cache_invalidator.rs to reuse MailboxMessage for broadcasting to different channels.

* fix: only remove schema related cache entries

* chore: add more tests

* fix/use-cache-kv-manager: Moved InvalidateSchemaCacheHandler to a separate module

 • Extracted InvalidateSchemaCacheHandler and associated tests into a new file cache_invalidator.rs
 • Removed async_trait and CacheInvalidator related code from heartbeat.rs
 • Added cache_invalidator module declaration in handler.rs

* fix: unit tests

* fix/use-cache-kv-manager:
 Standardize TODO comment format in CachedKvBackend txn method

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

* Update src/datanode/src/heartbeat/handler/cache_invalidator.rs

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2024-11-26 21:04:16 +08:00
Yingwen
8c804f6eeb fix: pass series row selector to file range reader (#5054) 2024-11-26 21:04:16 +08:00
Lei, HUANG
e060280ddc fix(metric-engine): set ttl also on opening metadata regions (#5051)
* fix/metric-metadata-region-options: Remove APPEND_MODE_KEY and refactor TTL option handling in MetricEngineInner

* fix/metric-metadata-region-options: Refactor metadata region options into a shared function

 • Extract metadata region options into region_options_for_metadata_region function
 • Replace inline options map with a call to the new shared function in both create.rs and open.rs files

* fix: exclude typos

* fix/metric-metadata-region-options:
 Refactor metadata region options to accept original options and remove APPEND_MODE_KEY
2024-11-26 21:04:16 +08:00
LFC
0787c5da66 refactor: expose configs for http clients used in object store (#5041) 2024-11-26 21:04:16 +08:00
Zhenchi
1cd6abb61f chore: bump version to v0.10.1 (#5048)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-25 03:20:54 +00:00
Ruihang Xia
e3927ea6f7 fix: prevent metadata region from inheriting database ttl (#5044)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-25 02:13:11 +00:00
Zhenchi
a6571d3392 chore: bump version to 0.10.0 (#5040)
* chore: bump version to 0.10.0

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix sqlness version regex

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-22 03:56:25 +00:00
Yohan Wal
1255638e84 refactor: unify mysql execute through cli and protocol (#5038)
refactor: mysql execute
2024-11-22 03:55:09 +00:00
Yohan Wal
1578c004b0 fix: prepare param mismatch (#5025)
* fix: prepare param mismatch

* test: clear state

* fix: minus 1
2024-11-22 02:31:53 +00:00
Yohan Wal
5f8d849981 feat: alter database ttl (#5035)
* feat: alter databaset ttl

* fix: make clippy happy

* feat: add unset database option

* fix: happy ci

* fix: happy clippy

* chore: fmt toml

* fix: fix header

* refactor: introduce `AlterDatabaseKind`

* chore: apply suggestions from CR

* refactor: add unset database option support

* test: add unit tests

* test: add sqlness tests

* feat: invalidate schema name value cache

* Apply suggestions from code review

* chore: fmt

* chore: update error messages

* test: add more test cases

* test: add more test cases

* Apply suggestions from code review

* chore: apply suggestions from CR

---------

Co-authored-by: WenyXu <wenymedia@gmail.com>
2024-11-21 12:41:41 +00:00
Lei, HUANG
3029b47a89 fix: find latest window (#5037)
* fix: find latest window

* more test files
2024-11-21 04:56:03 +00:00
Weny Xu
14d997e2d1 feat: add unset table options support (#5034)
* feat: add unset table options support

* test: add tests

* chore: update greptime-proto

* chore: add comments
2024-11-21 03:52:56 +00:00
Zhenchi
0aab68c23b feat(vector): add conversion between vector and string (#5029)
* feat(vector): add conversion between vector and string

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix sqlness

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-20 08:42:00 +00:00
Weny Xu
027284ed1b chore(cli): set default timeout for cli commands (#5021)
* chore(cli): set default timeout for cli commands

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: update comments

* fix: treats `None` as `0s` to disable server-side default timeout

* chore: update comments
2024-11-20 07:36:17 +00:00
Ruihang Xia
6a958e2c36 feat: reimplement limit in PartSort to reduce memory footprint (#5018)
* feat: support windowed sort with where condition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix split logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify fuzz test to reflect logic change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: handle sort that wont preserving partition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test case and add more cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* install topk

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tests: add test for limit

* add debug assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
2024-11-20 07:11:49 +00:00
Zhenchi
db345c92df feat(vector): remove simsimd and use nalgebra instead (#5027)
* feat(vector): remove `simsimd` and use `nalgebra` instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* keep thing simple

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-20 06:57:10 +00:00
Yohan Wal
55ced9aa71 refactor: split up different stmts (#4997)
* refactor: set and unset

* chore: error message

* fix: reset Cargo.lock

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* Apply suggestions from code review

Co-authored-by: Weny Xu <wenymedia@gmail.com>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-11-20 06:02:51 +00:00
discord9
3633f25d0c feat: also shutdown gracefully on sigterm on unix (#5023)
* feat: support SIGTERM on unix

* chore: log

* fix: Result type
2024-11-19 15:20:33 +00:00
Yingwen
63bbfd04c7 fix: prune memtable/files range independently in each partition (#4998)
* feat: prune in each partition

* chore: change pick log to trace

* chore: add in progress partition scan to metrics

* feat: seqscan support pruning in partition

* chore: remove commented codes
2024-11-19 12:43:30 +00:00
Zhenchi
2f260d8b27 fix: android build failed due to simsimd (#5019)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-19 11:21:58 +00:00
discord9
4d8fe29ea8 feat: CREATE OR REPLACE FLOW (#5001)
* feat: Replace flow

* refactor: better show create flow&tests: better check

* tests: sqlness result update

* tests: unit test for update

* refactor: cmp with raw bytes

* refactor: rename

* refactor: per review
2024-11-19 08:44:57 +00:00
Zhenchi
dbb3f2d98d fix: inverted index constraint to be case-insensitive (#5020)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-19 08:30:20 +00:00
ZonaHe
9926e3bc78 feat: update dashboard to v0.6.1 (#5017)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2024-11-19 07:28:10 +00:00
dennis zhuang
0dd02e93cf feat: make greatest supports timestamp and datetime types (#5005)
* feat: make greatest supports timestamp and datetime types

* chore: style

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* refactor: greatest with gt_time_types macro

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2024-11-19 07:01:24 +00:00
Zhenchi
73e6bf399d test: reduce round precision to avoid platform diff (#5013)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-18 16:37:15 +00:00
discord9
4402f638cd fix: distinct respect in range (#5015)
* fix: distinct respect in range

* tests: sqlness

* chore: newline
2024-11-18 12:11:07 +00:00
shuiyisong
c199604ece feat: Loki remote write (#4941)
* chore: add debug loki remote write url

* chore: add decode snappy

* chore: format output

* feature: impl loki remote write

* fix: special labels deserialize

* chore: move result to folder

* chore: finish todo in loki write

* test: loki write

* chore: fix cr issue

* chore: fix cr issue

* chore: fix cr issue

* chore: update pre-commit config

* chore: fix cr issue

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
2024-11-18 08:39:17 +00:00
Yohan Wal
2b72e66536 test: subquery test migrated from duckdb (#4985)
* test: subquery test migrated from duckdb

* test: update test

* test: skip unsupported features and add sources
2024-11-18 08:37:06 +00:00
Weny Xu
7c135c0ef9 feat: introduce DynamicTimeoutLayer (#5006)
* feat: introduce `DynamicTimeoutLayer`

* test: add unit test

* chore: apply suggestions from CR

* feat: add timeout option for cli
2024-11-18 07:10:40 +00:00
Weny Xu
9289265f54 fix: correct unset_maintenance_mode behavior (#5009) 2024-11-18 06:39:13 +00:00
Lanqing Yang
485782af51 fix: ensure Create Or Replace and If Not Exist cannot coexist in create view (#5003)
ensure Create Or Replace and If Not Exist cannot coexist in create view statement
2024-11-17 07:08:30 +00:00
Weny Xu
4b263ef1cc fix: obsolete wal entires while opening a migrated region (#4993)
* fix: delete obsolete wal entrie while opening a migrated region

* chore: add logs

* chore: rust fmt

* fix: fix fuzz test
2024-11-15 10:55:22 +00:00
Weny Xu
08f59008cc refactor: introduce MaintenanceModeManager (#4994)
* refactor: introduce MaintenanceModeManager

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2024-11-15 07:15:22 +00:00
Yohan Wal
a2852affeb chore: rename change to modify (#5000)
* chore: rename change to modify

* chore: proto rev
2024-11-15 06:58:26 +00:00
Lanqing Yang
cdba7b442f feat: implement statement/execution timeout session variable (#4792)
* support set and show on statement/execution timeout session variables.

* implement statement timeout for mysql read, and postgres queries

* add mysql test with max execution time
2024-11-15 06:19:39 +00:00
Lin Yihai
42bf7e9965 refactor: Avoid wrapping Option for CacheManagerRef (#4996) 2024-11-14 13:37:02 +00:00
discord9
a70b4d7eba chore: update greptime-proto to e1070a (#4992)
* chore: update protot to e1070a

* fix: update proto usage
2024-11-14 11:59:00 +00:00
Zhenchi
408013c22b feat: add distance functions (#4987)
* feat: add distance functions

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: f64 instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* tiny adjust

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-11-14 10:18:58 +00:00
zyy17
22c8a7656b chore: update cluster dashboard (#4995) 2024-11-14 10:18:50 +00:00
discord9
35898f0b2e test: more sqlness tests for flow (#4988)
* tests: more flow testcase

* tests(WIP): more tests

* tests: more flow tests

* test: wired regex for sqlness

* refactor: put blog&example to two files
2024-11-14 07:40:14 +00:00
zyy17
1101e98651 refactor(grafana): update cluster dashboard (#4980) 2024-11-14 07:12:30 +00:00
zyy17
0089cf1b4f fix: run install.sh error (#4989)
* fix: use '/bin/sh' shebang and remove function key word

* ci: check install.sh in nightly CI
2024-11-13 21:54:24 +00:00
218 changed files with 15318 additions and 5931 deletions

14
.github/scripts/check-install-script.sh vendored Executable file
View File

@@ -0,0 +1,14 @@
#!/bin/sh
set -e
# Get the latest version of github.com/GreptimeTeam/greptimedb
VERSION=$(curl -s https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest | jq -r '.tag_name')
echo "Downloading the latest version: $VERSION"
# Download the install script
curl -fsSL https://raw.githubusercontent.com/greptimeteam/greptimedb/main/scripts/install.sh | sh -s $VERSION
# Execute the `greptime` command
./greptime --version

View File

@@ -22,6 +22,10 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check install.sh
run: ./.github/scripts/check-install-script.sh
- name: Run sqlness test
uses: ./.github/actions/sqlness-test
with:

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.10.0
NEXT_RELEASE_VERSION: v0.11.0
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:

View File

@@ -17,6 +17,6 @@ repos:
- id: fmt
- id: clippy
args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
stages: [push]
stages: [pre-push]
- id: cargo-check
args: ["--workspace", "--all-targets", "--all-features"]

223
Cargo.lock generated
View File

@@ -208,7 +208,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"common-base",
"common-decimal",
@@ -769,7 +769,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -1041,7 +1041,7 @@ dependencies = [
"bitflags 2.6.0",
"cexpr",
"clang-sys",
"itertools 0.10.5",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
@@ -1379,7 +1379,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"catalog",
"common-error",
@@ -1387,7 +1387,7 @@ dependencies = [
"common-meta",
"moka",
"snafu 0.8.5",
"substrait 0.9.5",
"substrait 0.10.2",
]
[[package]]
@@ -1414,7 +1414,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arrow",
@@ -1753,7 +1753,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "client"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -1782,8 +1782,8 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.10.2",
"substrait 0.37.3",
"substrait 0.9.5",
"tokio",
"tokio-stream",
"tonic 0.11.0",
@@ -1823,7 +1823,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"auth",
@@ -1859,6 +1859,7 @@ dependencies = [
"frontend",
"futures",
"human-panic",
"humantime",
"lazy_static",
"meta-client",
"meta-srv",
@@ -1881,7 +1882,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.9.5",
"substrait 0.10.2",
"table",
"temp-env",
"tempfile",
@@ -1927,7 +1928,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"anymap2",
"async-trait",
@@ -1948,7 +1949,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"chrono",
"common-error",
@@ -1959,7 +1960,7 @@ dependencies = [
[[package]]
name = "common-config"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"common-base",
"common-error",
@@ -1982,7 +1983,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -2019,7 +2020,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2032,7 +2033,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"snafu 0.8.5",
"strum 0.25.0",
@@ -2041,7 +2042,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -2056,9 +2057,10 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"approx 0.5.1",
"arc-swap",
"async-trait",
"common-base",
@@ -2079,6 +2081,7 @@ dependencies = [
"geohash",
"h3o",
"jsonb",
"nalgebra 0.33.2",
"num",
"num-traits",
"once_cell",
@@ -2099,7 +2102,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2116,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arrow-flight",
@@ -2142,7 +2145,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"common-base",
@@ -2161,7 +2164,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"arc-swap",
"common-query",
@@ -2175,7 +2178,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"common-error",
"common-macro",
@@ -2188,7 +2191,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"anymap2",
"api",
@@ -2245,7 +2248,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2254,11 +2257,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.9.5"
version = "0.10.2"
[[package]]
name = "common-pprof"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"common-error",
"common-macro",
@@ -2270,7 +2273,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-stream",
"async-trait",
@@ -2297,7 +2300,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2305,7 +2308,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -2331,7 +2334,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"arc-swap",
"common-error",
@@ -2350,7 +2353,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2380,7 +2383,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"atty",
"backtrace",
@@ -2408,7 +2411,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"client",
"common-query",
@@ -2420,7 +2423,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"arrow",
"chrono",
@@ -2436,7 +2439,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"build-data",
"const_format",
@@ -2447,7 +2450,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"common-base",
"common-error",
@@ -3256,7 +3259,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arrow-flight",
@@ -3306,7 +3309,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.9.5",
"substrait 0.10.2",
"table",
"tokio",
"toml 0.8.19",
@@ -3315,7 +3318,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"arrow",
"arrow-array",
@@ -3933,7 +3936,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -4050,7 +4053,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arrow",
@@ -4107,7 +4110,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.9.5",
"substrait 0.10.2",
"table",
"tokio",
"tonic 0.11.0",
@@ -4169,7 +4172,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -4594,7 +4597,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=75c5fb569183bb3d0fa1023df9c2214df722b9b1#75c5fb569183bb3d0fa1023df9c2214df722b9b1"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4"
dependencies = [
"prost 0.12.6",
"serde",
@@ -5090,7 +5093,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.4.10",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
@@ -5309,7 +5312,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6080,7 +6083,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -6153,7 +6156,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-store"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-stream",
"async-trait",
@@ -6191,6 +6194,16 @@ dependencies = [
"uuid",
]
[[package]]
name = "loki-api"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "674883a98273598ac3aad4301724c56734bea90574c5033af067e8f9fb5eb399"
dependencies = [
"prost 0.12.6",
"prost-types 0.12.6",
]
[[package]]
name = "lrlex"
version = "0.13.7"
@@ -6473,7 +6486,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -6500,7 +6513,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -6579,7 +6592,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"aquamarine",
@@ -6682,7 +6695,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"aquamarine",
@@ -7070,13 +7083,29 @@ checksum = "d506eb7e08d6329505faa8a3a00a5dcc6de9f76e0c77e4b75763ae3c770831ff"
dependencies = [
"approx 0.5.1",
"matrixmultiply",
"nalgebra-macros",
"nalgebra-macros 0.1.0",
"num-complex",
"num-rational",
"num-traits",
"rand",
"rand_distr",
"simba",
"simba 0.6.0",
"typenum",
]
[[package]]
name = "nalgebra"
version = "0.33.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26aecdf64b707efd1310e3544d709c5c0ac61c13756046aaaba41be5c4f66a3b"
dependencies = [
"approx 0.5.1",
"matrixmultiply",
"nalgebra-macros 0.2.2",
"num-complex",
"num-rational",
"num-traits",
"simba 0.9.0",
"typenum",
]
@@ -7091,6 +7120,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "nalgebra-macros"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "254a5372af8fc138e36684761d3c0cdb758a4410e938babcff1c860ce14ddbfc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.79",
]
[[package]]
name = "named_pipe"
version = "0.4.1"
@@ -7419,7 +7459,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"anyhow",
"bytes",
@@ -7710,9 +7750,10 @@ dependencies = [
[[package]]
name = "operator"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-stream",
"async-trait",
"catalog",
"chrono",
@@ -7756,7 +7797,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.9.5",
"substrait 0.10.2",
"table",
"tokio",
"tokio-util",
@@ -8006,7 +8047,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -8307,7 +8348,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8470,7 +8511,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"auth",
"common-base",
@@ -8744,7 +8785,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8822,7 +8863,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.10.5",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
@@ -8874,7 +8915,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.10.5",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.79",
@@ -8982,7 +9023,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9036,7 +9077,7 @@ dependencies = [
"indoc",
"libc",
"memoffset 0.9.1",
"parking_lot 0.11.2",
"parking_lot 0.12.3",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",
@@ -9106,7 +9147,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9173,7 +9214,7 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.9.5",
"substrait 0.10.2",
"table",
"tokio",
"tokio-stream",
@@ -10636,7 +10677,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -10930,7 +10971,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"ahash 0.8.11",
"aide",
@@ -10974,17 +11015,21 @@ dependencies = [
"datatypes",
"derive_builder 0.12.0",
"futures",
"futures-util",
"hashbrown 0.14.5",
"headers 0.3.9",
"hostname",
"http 0.2.12",
"http-body 0.4.6",
"humantime",
"humantime-serde",
"hyper 0.14.30",
"influxdb_line_protocol",
"itertools 0.10.5",
"json5",
"jsonb",
"lazy_static",
"loki-api",
"mime_guess",
"mysql_async",
"notify",
@@ -11041,7 +11086,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arc-swap",
@@ -11160,6 +11205,19 @@ dependencies = [
"wide",
]
[[package]]
name = "simba"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3a386a501cd104797982c15ae17aafe8b9261315b5d07e3ec803f2ea26be0fa"
dependencies = [
"approx 0.5.1",
"num-complex",
"num-traits",
"paste",
"wide",
]
[[package]]
name = "simdutf8"
version = "0.1.5"
@@ -11374,7 +11432,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"chrono",
@@ -11393,6 +11451,7 @@ dependencies = [
"datafusion-sql",
"datatypes",
"hex",
"humantime",
"iso8601",
"itertools 0.10.5",
"jsonb",
@@ -11402,6 +11461,7 @@ dependencies = [
"snafu 0.8.5",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"sqlparser_derive 0.1.1",
"store-api",
"table",
]
@@ -11435,7 +11495,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11638,7 +11698,7 @@ checksum = "b35a062dbadac17a42e0fc64c27f419b25d6fae98572eb43c8814c9e873d7721"
dependencies = [
"approx 0.5.1",
"lazy_static",
"nalgebra",
"nalgebra 0.29.0",
"num-traits",
"rand",
]
@@ -11655,7 +11715,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"aquamarine",
@@ -11826,7 +11886,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"async-trait",
"bytes",
@@ -12025,7 +12085,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"async-trait",
@@ -12291,7 +12351,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -12333,7 +12393,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.9.5"
version = "0.10.2"
dependencies = [
"api",
"arrow-flight",
@@ -12373,6 +12433,7 @@ dependencies = [
"futures-util",
"hex",
"itertools 0.10.5",
"loki-api",
"meta-client",
"meta-srv",
"moka",
@@ -12396,7 +12457,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.9.5",
"substrait 0.10.2",
"table",
"tempfile",
"time",
@@ -13981,7 +14042,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]
[[package]]

View File

@@ -66,7 +66,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.9.5"
version = "0.10.2"
edition = "2021"
license = "Apache-2.0"
@@ -122,7 +122,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "75c5fb569183bb3d0fa1023df9c2214df722b9b1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
humantime = "2.1"
humantime-serde = "1.1"

View File

@@ -109,6 +109,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -432,6 +437,11 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |

View File

@@ -375,6 +375,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options
# [[storage.providers]]
# name = "S3"

View File

@@ -413,6 +413,23 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default
region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options
# [[storage.providers]]
# name = "S3"

View File

@@ -49,7 +49,6 @@ RUN yum install -y epel-release \
WORKDIR /greptime
COPY --from=builder /out/target/${OUTPUT_DIR}/greptime /greptime/bin/
COPY --from=builder /out/target/${OUTPUT_DIR}/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /greptime/bin/
ENV PATH /greptime/bin/:$PATH
ENTRYPOINT ["greptime"]

View File

@@ -57,9 +57,6 @@ RUN python3 -m pip install -r /etc/greptime/requirements.txt
WORKDIR /greptime
COPY --from=builder /out/target/${OUTPUT_DIR}/greptime /greptime/bin/
COPY --from=builder /out/target/${OUTPUT_DIR}/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /greptime/bin/
RUN chmod +x /greptime/bin/jeprof
ENV PATH /greptime/bin/:$PATH
ENTRYPOINT ["greptime"]

View File

@@ -4,18 +4,6 @@ This crate provides an easy approach to dump memory profiling info.
## Prerequisites
### jemalloc
jeprof is already compiled in the target directory of GreptimeDB. You can find the binary and use it.
```
# find jeprof binary
find . -name 'jeprof'
# add executable permission
chmod +x <path_to_jeprof>
```
The path is usually under `./target/${PROFILE}/build/tikv-jemalloc-sys-${HASH}/out/build/bin/jeprof`.
The default version of jemalloc installed from the package manager may not have the `--collapsed` option.
You may need to check the whether the `jeprof` version is >= `5.3.0` if you want to install it from the package manager.
```bash
# for macOS
brew install jemalloc

View File

@@ -5,6 +5,13 @@ GreptimeDB's official Grafana dashboard.
Status notify: we are still working on this config. It's expected to change frequently in the recent days. Please feel free to submit your feedback and/or contribution to this dashboard 🤗
If you use Helm [chart](https://github.com/GreptimeTeam/helm-charts) to deploy GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
The standalone GreptimeDB instance will collect metrics from your cluster and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).
# How to use
## `greptimedb.json`

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/sh
set -ue
@@ -15,7 +15,7 @@ GITHUB_ORG=GreptimeTeam
GITHUB_REPO=greptimedb
BIN=greptime
function get_os_type() {
get_os_type() {
os_type="$(uname -s)"
case "$os_type" in
@@ -31,7 +31,7 @@ function get_os_type() {
esac
}
function get_arch_type() {
get_arch_type() {
arch_type="$(uname -m)"
case "$arch_type" in
@@ -53,7 +53,7 @@ function get_arch_type() {
esac
}
function download_artifact() {
download_artifact() {
if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
# Use the latest stable released version.
# GitHub API reference: https://docs.github.com/en/rest/releases/releases?apiVersion=2022-11-28#get-the-latest-release.

View File

@@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::AlterTable(_)) => "ddl.alter_table",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
None => "ddl.empty",
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
mod client;
mod manager;

View File

@@ -22,6 +22,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
pub struct CachedMetaKvBackendBuilder {
pub struct CachedKvBackendBuilder {
cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>,
cache_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
inner: KvBackendRef,
}
impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
impl CachedKvBackendBuilder {
pub fn new(inner: KvBackendRef) -> Self {
Self {
cache_max_capacity: None,
cache_ttl: None,
cache_tti: None,
meta_client,
inner,
}
}
@@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder {
self
}
pub fn build(self) -> CachedMetaKvBackend {
pub fn build(self) -> CachedKvBackend {
let cache_max_capacity = self
.cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
@@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder {
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let kv_backend = self.inner;
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);
CachedMetaKvBackend {
CachedKvBackend {
kv_backend,
cache,
name,
@@ -112,19 +110,29 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
/// information. Note: If you read other information, you may read expired data, which depends on
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
pub struct CachedKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}
impl TxnService for CachedMetaKvBackend {
#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
type Error = Error;
async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
self.kv_backend.txn(txn).await
}
fn max_txn_ops(&self) -> usize {
self.kv_backend.max_txn_ops()
}
}
#[async_trait::async_trait]
impl KvBackend for CachedMetaKvBackend {
impl KvBackend for CachedKvBackend {
fn name(&self) -> &str {
&self.name
}
@@ -305,7 +313,7 @@ impl KvBackend for CachedMetaKvBackend {
}
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
impl KvCacheInvalidator for CachedKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version();
self.cache.invalidate(key).await;
@@ -313,7 +321,7 @@ impl KvCacheInvalidator for CachedMetaKvBackend {
}
}
impl CachedMetaKvBackend {
impl CachedKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
@@ -466,7 +474,7 @@ mod tests {
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
use super::CachedMetaKvBackend;
use super::CachedKvBackend;
#[derive(Default)]
pub struct SimpleKvBackend {
@@ -540,7 +548,7 @@ mod tests {
async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
let cached_kv = CachedKvBackend::wrap(simple_kv);
add_some_vals(&cached_kv).await;

View File

@@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
.context(TableMetadataManagerSnafu)?
// information_schema is not available from this
// table_metadata_manager and we return None
.map(|schema_opts| format!("{schema_opts}"))
.map(|schema_opts| format!("{}", schema_opts.into_inner()))
} else {
None
};

View File

@@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
QueryRequest, RequestHeader,
};
use arrow_flight::Ticket;
@@ -211,9 +211,9 @@ impl Database {
.await
}
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
expr: Some(DdlExpr::AlterTable(expr)),
}))
.await
}

View File

@@ -53,6 +53,7 @@ flow.workspace = true
frontend = { workspace = true, default-features = false }
futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
meta-srv.workspace = true

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use humantime::format_duration;
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
@@ -26,10 +30,16 @@ pub(crate) struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,
timeout: Duration,
}
impl DatabaseClient {
pub fn new(addr: String, catalog: String, auth_basic: Option<String>) -> Self {
pub fn new(
addr: String,
catalog: String,
auth_basic: Option<String>,
timeout: Duration,
) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Some(format!("basic {}", encoded))
@@ -41,6 +51,7 @@ impl DatabaseClient {
addr,
catalog,
auth_header,
timeout,
}
}
@@ -63,6 +74,11 @@ impl DatabaseClient {
request = request.header("Authorization", auth);
}
request = request.header(
GREPTIME_DB_HEADER_TIMEOUT,
format_duration(self.timeout).to_string(),
);
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
})?;

View File

@@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
@@ -83,14 +84,26 @@ pub struct ExportCommand {
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ExportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
let database_client =
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);
Ok(Instance::new(
Box::new(Export {

View File

@@ -14,6 +14,7 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
@@ -68,13 +69,25 @@ pub struct ImportCommand {
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ImportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
let database_client =
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
let database_client = DatabaseClient::new(
self.addr.clone(),
catalog.clone(),
self.auth_basic.clone(),
// Treats `None` as `0s` to disable server-side default timeout.
self.timeout.unwrap_or_default(),
);
Ok(Instance::new(
Box::new(Import {

View File

@@ -21,13 +21,14 @@ use cache::{
TABLE_ROUTE_CACHE_NAME,
};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
@@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let cached_meta_backend = Arc::new(
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
);
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -246,11 +246,12 @@ impl StartCommand {
let cache_tti = meta_config.metadata_cache_tti;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -293,11 +293,12 @@ impl StartCommand {
.context(MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend =
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry

View File

@@ -43,6 +43,31 @@ lazy_static::lazy_static! {
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
}
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
#[cfg(unix)]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, shutting down");
}
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down");
}
}
Ok(())
}
/// wait for the close signal, for non-unix platform it's ctrl-c
#[cfg(not(unix))]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
tokio::signal::ctrl_c().await
}
#[async_trait]
pub trait App: Send {
fn name(&self) -> &str;
@@ -69,9 +94,9 @@ pub trait App: Send {
self.start().await?;
if self.wait_signal() {
if let Err(e) = tokio::signal::ctrl_c().await {
error!(e; "Failed to listen for ctrl-c signal");
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
if let Err(e) = start_wait_for_close_signal().await {
error!(e; "Failed to listen for close signal");
// It's unusual to fail to listen for close signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}

View File

@@ -33,6 +33,7 @@ geo-types = { version = "0.7", optional = true }
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
jsonb.workspace = true
nalgebra = "0.33"
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
@@ -49,6 +50,7 @@ table.workspace = true
wkt = { version = "0.11", optional = true }
[dev-dependencies]
approx = "0.5"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true

View File

@@ -27,6 +27,7 @@ use crate::scalars::matches::MatchesFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::vector::VectorFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;
@@ -120,6 +121,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Json related functions
JsonFunction::register(&function_registry);
// Vector related functions
VectorFunction::register(&function_registry);
// Geo functions
#[cfg(feature = "geo")]
crate::scalars::geo::GeoFunctions::register(&function_registry);

View File

@@ -21,6 +21,7 @@ pub mod json;
pub mod matches;
pub mod math;
pub mod numpy;
pub mod vector;
#[cfg(test)]
pub(crate) mod test;

View File

@@ -22,8 +22,12 @@ use datafusion::arrow::compute::kernels::cmp::gt;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::compute::cast;
use datatypes::arrow::compute::kernels::zip;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Date32Type};
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Date32Type, Date64Type, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::TimestampType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::{ensure, ResultExt};
@@ -34,13 +38,47 @@ pub struct GreatestFunction;
const NAME: &str = "greatest";
macro_rules! gt_time_types {
($ty: ident, $columns:expr) => {{
let column1 = $columns[0].to_arrow_array();
let column2 = $columns[1].to_arrow_array();
let column1 = column1.as_primitive::<$ty>();
let column2 = column2.as_primitive::<$ty>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result = zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)
}};
}
impl Function for GreatestFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::date_datatype())
fn return_type(&self, input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
ensure!(
input_types.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
input_types.len()
)
}
);
match &input_types[0] {
ConcreteDataType::String(_) => Ok(ConcreteDataType::datetime_datatype()),
ConcreteDataType::Date(_) => Ok(ConcreteDataType::date_datatype()),
ConcreteDataType::DateTime(_) => Ok(ConcreteDataType::datetime_datatype()),
ConcreteDataType::Timestamp(ts_type) => Ok(ConcreteDataType::Timestamp(*ts_type)),
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: input_types,
}
.fail(),
}
}
fn signature(&self) -> Signature {
@@ -49,6 +87,11 @@ impl Function for GreatestFunction {
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_nanosecond_datatype(),
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
],
Volatility::Immutable,
)
@@ -66,27 +109,32 @@ impl Function for GreatestFunction {
);
match columns[0].data_type() {
ConcreteDataType::String(_) => {
let column1 = cast(&columns[0].to_arrow_array(), &ArrowDataType::Date32)
// Treats string as `DateTime` type.
let column1 = cast(&columns[0].to_arrow_array(), &ArrowDataType::Date64)
.context(ArrowComputeSnafu)?;
let column1 = column1.as_primitive::<Date32Type>();
let column2 = cast(&columns[1].to_arrow_array(), &ArrowDataType::Date32)
let column1 = column1.as_primitive::<Date64Type>();
let column2 = cast(&columns[1].to_arrow_array(), &ArrowDataType::Date64)
.context(ArrowComputeSnafu)?;
let column2 = column2.as_primitive::<Date32Type>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => {
let column1 = columns[0].to_arrow_array();
let column1 = column1.as_primitive::<Date32Type>();
let column2 = columns[1].to_arrow_array();
let column2 = column2.as_primitive::<Date32Type>();
let column2 = column2.as_primitive::<Date64Type>();
let boolean_array = gt(&column1, &column2).context(ArrowComputeSnafu)?;
let result =
zip::zip(&boolean_array, &column1, &column2).context(ArrowComputeSnafu)?;
Ok(Helper::try_into_vector(&result).context(error::FromArrowArraySnafu)?)
}
ConcreteDataType::Date(_) => gt_time_types!(Date32Type, columns),
ConcreteDataType::DateTime(_) => gt_time_types!(Date64Type, columns),
ConcreteDataType::Timestamp(ts_type) => match ts_type {
TimestampType::Second(_) => gt_time_types!(TimestampSecondType, columns),
TimestampType::Millisecond(_) => {
gt_time_types!(TimestampMillisecondType, columns)
}
TimestampType::Microsecond(_) => {
gt_time_types!(TimestampMicrosecondType, columns)
}
TimestampType::Nanosecond(_) => {
gt_time_types!(TimestampNanosecondType, columns)
}
},
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
@@ -106,19 +154,31 @@ impl fmt::Display for GreatestFunction {
mod tests {
use std::sync::Arc;
use common_time::Date;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::DateType;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Timestamp};
use datatypes::types::{
DateTimeType, DateType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use datatypes::value::Value;
use datatypes::vectors::{DateVector, StringVector, Vector};
use datatypes::vectors::{
DateTimeVector, DateVector, StringVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, Vector,
};
use paste::paste;
use super::*;
#[test]
fn test_greatest_takes_string_vector() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[]).unwrap(),
ConcreteDataType::Date(DateType)
function
.return_type(&[
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype()
])
.unwrap(),
ConcreteDataType::DateTime(DateTimeType)
);
let columns = vec![
Arc::new(StringVector::from(vec![
@@ -132,15 +192,15 @@ mod tests {
];
let result = function.eval(FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<DateVector>().unwrap();
let result = result.as_any().downcast_ref::<DateTimeVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Date(Date::from_str_utc("2001-02-01").unwrap())
Value::DateTime(DateTime::from_str("2001-02-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::Date(Date::from_str_utc("2012-12-23").unwrap())
Value::DateTime(DateTime::from_str("2012-12-23 00:00:00", None).unwrap())
);
}
@@ -148,9 +208,15 @@ mod tests {
fn test_greatest_takes_date_vector() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[]).unwrap(),
function
.return_type(&[
ConcreteDataType::date_datatype(),
ConcreteDataType::date_datatype()
])
.unwrap(),
ConcreteDataType::Date(DateType)
);
let columns = vec![
Arc::new(DateVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateVector::from_slice(vec![0, 1])) as _,
@@ -168,4 +234,81 @@ mod tests {
Value::Date(Date::from_str_utc("1970-01-03").unwrap())
);
}
#[test]
fn test_greatest_takes_datetime_vector() {
let function = GreatestFunction;
assert_eq!(
function
.return_type(&[
ConcreteDataType::datetime_datatype(),
ConcreteDataType::datetime_datatype()
])
.unwrap(),
ConcreteDataType::DateTime(DateTimeType)
);
let columns = vec![
Arc::new(DateTimeVector::from_slice(vec![-1, 2])) as _,
Arc::new(DateTimeVector::from_slice(vec![0, 1])) as _,
];
let result = function.eval(FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<DateTimeVector>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::DateTime(DateTime::from_str("1970-01-01 00:00:00", None).unwrap())
);
assert_eq!(
result.get(1),
Value::DateTime(DateTime::from_str("1970-01-01 00:00:00.002", None).unwrap())
);
}
macro_rules! test_timestamp {
($type: expr,$unit: ident) => {
paste! {
#[test]
fn [<test_greatest_takes_ $unit:lower _vector>]() {
let function = GreatestFunction;
assert_eq!(
function.return_type(&[$type, $type]).unwrap(),
ConcreteDataType::Timestamp(TimestampType::$unit([<Timestamp $unit Type>]))
);
let columns = vec![
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![-1, 2])) as _,
Arc::new([<Timestamp $unit Vector>]::from_slice(vec![0, 1])) as _,
];
let result = function.eval(FunctionContext::default(), &columns).unwrap();
let result = result.as_any().downcast_ref::<[<Timestamp $unit Vector>]>().unwrap();
assert_eq!(result.len(), 2);
assert_eq!(
result.get(0),
Value::Timestamp(Timestamp::new(0, TimeUnit::$unit))
);
assert_eq!(
result.get(1),
Value::Timestamp(Timestamp::new(2, TimeUnit::$unit))
);
}
}
}
}
test_timestamp!(
ConcreteDataType::timestamp_nanosecond_datatype(),
Nanosecond
);
test_timestamp!(
ConcreteDataType::timestamp_microsecond_datatype(),
Microsecond
);
test_timestamp!(
ConcreteDataType::timestamp_millisecond_datatype(),
Millisecond
);
test_timestamp!(ConcreteDataType::timestamp_second_datatype(), Second);
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod convert;
mod distance;
use std::sync::Arc;
use crate::function_registry::FunctionRegistry;
pub(crate) struct VectorFunction;
impl VectorFunction {
pub fn register(registry: &FunctionRegistry) {
// conversion
registry.register(Arc::new(convert::ParseVectorFunction));
registry.register(Arc::new(convert::VectorToStringFunction));
// distance
registry.register(Arc::new(distance::CosDistanceFunction));
registry.register(Arc::new(distance::DotProductFunction));
registry.register(Arc::new(distance::L2SqDistanceFunction));
}
}

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod parse_vector;
mod vector_to_string;
pub use parse_vector::ParseVectorFunction;
pub use vector_to_string::VectorToStringFunction;

View File

@@ -0,0 +1,160 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, InvalidVectorStringSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::parse_string_to_vector_type_value;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
const NAME: &str = "parse_vec";
#[derive(Debug, Clone, Default)]
pub struct ParseVectorFunction;
impl Function for ParseVectorFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let column = &columns[0];
let size = column.len();
let mut result = BinaryVectorBuilder::with_capacity(size);
for i in 0..size {
let value = column.get(i).as_string();
if let Some(value) = value {
let res = parse_string_to_vector_type_value(&value, None)
.context(InvalidVectorStringSnafu { vec_str: &value })?;
result.push(Some(&res));
} else {
result.push_null();
}
}
Ok(result.to_vector())
}
}
impl Display for ParseVectorFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_base::bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_parse_vector() {
let func = ParseVectorFunction;
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
]));
let result = func.eval(FunctionContext::default(), &[input]).unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 3);
assert_eq!(
result.get(0),
Value::Binary(Bytes::from(
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
))
);
assert_eq!(
result.get(1),
Value::Binary(Bytes::from(
[4.0f32, 5.0, 6.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
))
);
assert!(result.get(2).is_null());
}
#[test]
fn test_parse_vector_error() {
let func = ParseVectorFunction;
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input]);
assert!(result.is_err());
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("7.0,8.0,9.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input]);
assert!(result.is_err());
let input = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,hello,9.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input]);
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,139 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::vector_type_value_to_string;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
const NAME: &str = "vec_to_string";
#[derive(Debug, Clone, Default)]
pub struct VectorToStringFunction;
impl Function for VectorToStringFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::exact(
vec![ConcreteDataType::binary_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let column = &columns[0];
let size = column.len();
let mut result = StringVectorBuilder::with_capacity(size);
for i in 0..size {
let value = column.get(i);
match value {
Value::Binary(bytes) => {
let len = bytes.len();
if len % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", len),
}
.fail();
}
let dim = len / std::mem::size_of::<f32>();
// Safety: `dim` is calculated from the length of `bytes` and is guaranteed to be valid
let res = vector_type_value_to_string(&bytes, dim as _).unwrap();
result.push(Some(&res));
}
Value::Null => {
result.push_null();
}
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid value type: {:?}", value.data_type()),
}
.fail();
}
}
}
Ok(result.to_vector())
}
}
impl Display for VectorToStringFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use datatypes::vectors::BinaryVectorBuilder;
use super::*;
#[test]
fn test_vector_to_string() {
let func = VectorToStringFunction;
let mut builder = BinaryVectorBuilder::with_capacity(3);
builder.push(Some(
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<_>>()
.as_slice(),
));
builder.push(Some(
[4.0f32, 5.0, 6.0]
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<_>>()
.as_slice(),
));
builder.push_null();
let vector = builder.to_vector();
let result = func.eval(FunctionContext::default(), &[vector]).unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result.get(0), Value::String("[1,2,3]".to_string().into()));
assert_eq!(result.get(1), Value::String("[4,5,6]".to_string().into()));
assert_eq!(result.get(2), Value::Null);
}
}

View File

@@ -0,0 +1,482 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod cos;
mod dot;
mod l2sq;
use std::borrow::Cow;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::ValueRef;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, Vector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
macro_rules! define_distance_function {
($StructName:ident, $display_name:expr, $similarity_method:path) => {
/// A function calculates the distance between two vectors.
#[derive(Debug, Clone, Default)]
pub struct $StructName;
impl Function for $StructName {
fn name(&self) -> &str {
$display_name
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let size = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(size);
if size == 0 {
return Ok(result.to_vector());
}
let arg0_const = parse_if_constant_string(arg0)?;
let arg1_const = parse_if_constant_string(arg1)?;
for i in 0..size {
let vec0 = match arg0_const.as_ref() {
Some(a) => Some(Cow::Borrowed(a.as_slice())),
None => as_vector(arg0.get_ref(i))?,
};
let vec1 = match arg1_const.as_ref() {
Some(b) => Some(Cow::Borrowed(b.as_slice())),
None => as_vector(arg1.get_ref(i))?,
};
if let (Some(vec0), Some(vec1)) = (vec0, vec1) {
ensure!(
vec0.len() == vec1.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the vectors must match to calculate distance, have: {} vs {}",
vec0.len(),
vec1.len()
),
}
);
// Checked if the length of the vectors match
let d = $similarity_method(vec0.as_ref(), vec1.as_ref());
result.push(Some(d));
} else {
result.push_null();
}
}
return Ok(result.to_vector());
}
}
impl Display for $StructName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", $display_name.to_ascii_uppercase())
}
}
}
}
define_distance_function!(CosDistanceFunction, "vec_cos_distance", cos::cos);
define_distance_function!(L2SqDistanceFunction, "vec_l2sq_distance", l2sq::l2sq);
define_distance_function!(DotProductFunction, "vec_dot_product", dot::dot);
/// Parse a vector value if the value is a constant string.
fn parse_if_constant_string(arg: &Arc<dyn Vector>) -> Result<Option<Vec<f32>>> {
if !arg.is_const() {
return Ok(None);
}
if arg.data_type() != ConcreteDataType::string_datatype() {
return Ok(None);
}
arg.get_ref(0)
.as_string()
.unwrap() // Safe: checked if it is a string
.map(parse_f32_vector_from_string)
.transpose()
}
/// Convert a value to a vector value.
/// Supported data types are binary and string.
fn as_vector(arg: ValueRef<'_>) -> Result<Option<Cow<'_, [f32]>>> {
match arg.data_type() {
ConcreteDataType::Binary(_) => arg
.as_binary()
.unwrap() // Safe: checked if it is a binary
.map(binary_as_vector)
.transpose(),
ConcreteDataType::String(_) => arg
.as_string()
.unwrap() // Safe: checked if it is a string
.map(|s| Ok(Cow::Owned(parse_f32_vector_from_string(s)?)))
.transpose(),
ConcreteDataType::Null(_) => Ok(None),
_ => InvalidFuncArgsSnafu {
err_msg: format!("Unsupported data type: {:?}", arg.data_type()),
}
.fail(),
}
}
/// Convert a u8 slice to a vector value.
fn binary_as_vector(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
if bytes.len() % std::mem::size_of::<f32>() != 0 {
return InvalidFuncArgsSnafu {
err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
}
.fail();
}
if cfg!(target_endian = "little") {
Ok(unsafe {
let vec = std::slice::from_raw_parts(
bytes.as_ptr() as *const f32,
bytes.len() / std::mem::size_of::<f32>(),
);
Cow::Borrowed(vec)
})
} else {
let v = bytes
.chunks_exact(std::mem::size_of::<f32>())
.map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
.collect::<Vec<f32>>();
Ok(Cow::Owned(v))
}
}
/// Parse a string to a vector value.
/// Valid inputs are strings like "[1.0, 2.0, 3.0]".
fn parse_f32_vector_from_string(s: &str) -> Result<Vec<f32>> {
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
return InvalidFuncArgsSnafu {
err_msg: format!(
"Failed to parse {s} to Vector value: not properly enclosed in brackets"
),
}
.fail();
}
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
return Ok(Vec::new());
}
content
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect::<std::result::Result<_, _>>()
.map_err(|e| {
InvalidFuncArgsSnafu {
err_msg: format!("Failed to parse {s} to Vector value: {e}"),
}
.build()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{BinaryVector, ConstantVector, StringVector};
use super::*;
#[test]
fn test_distance_string_string() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[1.0, 0.0]"),
None,
Some("[1.0, 0.0]"),
])) as VectorRef;
let vec2 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[0.0, 1.0]"),
Some("[0.0, 1.0]"),
None,
])) as VectorRef;
let result = func
.eval(FunctionContext::default(), &[vec1.clone(), vec2.clone()])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(FunctionContext::default(), &[vec2, vec1])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_distance_binary_binary() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(BinaryVector::from(vec![
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 128, 63, 0, 0, 0, 0]),
None,
Some(vec![0, 0, 128, 63, 0, 0, 0, 0]),
])) as VectorRef;
let vec2 = Arc::new(BinaryVector::from(vec![
// [0.0, 1.0]
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
None,
])) as VectorRef;
let result = func
.eval(FunctionContext::default(), &[vec1.clone(), vec2.clone()])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(FunctionContext::default(), &[vec2, vec1])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_distance_string_binary() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[1.0, 0.0]"),
None,
Some("[1.0, 0.0]"),
])) as VectorRef;
let vec2 = Arc::new(BinaryVector::from(vec![
// [0.0, 1.0]
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
None,
])) as VectorRef;
let result = func
.eval(FunctionContext::default(), &[vec1.clone(), vec2.clone()])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(FunctionContext::default(), &[vec2, vec1])
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_distance_const_string() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let const_str = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec!["[0.0, 1.0]"])),
4,
));
let vec1 = Arc::new(StringVector::from(vec![
Some("[0.0, 1.0]"),
Some("[1.0, 0.0]"),
None,
Some("[1.0, 0.0]"),
])) as VectorRef;
let vec2 = Arc::new(BinaryVector::from(vec![
// [0.0, 1.0]
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
Some(vec![0, 0, 0, 0, 0, 0, 128, 63]),
None,
])) as VectorRef;
let result = func
.eval(
FunctionContext::default(),
&[const_str.clone(), vec1.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(!result.get(3).is_null());
let result = func
.eval(
FunctionContext::default(),
&[vec1.clone(), const_str.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(result.get(2).is_null());
assert!(!result.get(3).is_null());
let result = func
.eval(
FunctionContext::default(),
&[const_str.clone(), vec2.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(!result.get(2).is_null());
assert!(result.get(3).is_null());
let result = func
.eval(
FunctionContext::default(),
&[vec2.clone(), const_str.clone()],
)
.unwrap();
assert!(!result.get(0).is_null());
assert!(!result.get(1).is_null());
assert!(!result.get(2).is_null());
assert!(result.get(3).is_null());
}
}
#[test]
fn test_invalid_vector_length() {
let funcs = [
Box::new(CosDistanceFunction {}) as Box<dyn Function>,
Box::new(L2SqDistanceFunction {}) as Box<dyn Function>,
Box::new(DotProductFunction {}) as Box<dyn Function>,
];
for func in funcs {
let vec1 = Arc::new(StringVector::from(vec!["[1.0]"])) as VectorRef;
let vec2 = Arc::new(StringVector::from(vec!["[1.0, 1.0]"])) as VectorRef;
let result = func.eval(FunctionContext::default(), &[vec1, vec2]);
assert!(result.is_err());
let vec1 = Arc::new(BinaryVector::from(vec![vec![0, 0, 128, 63]])) as VectorRef;
let vec2 =
Arc::new(BinaryVector::from(vec![vec![0, 0, 128, 63, 0, 0, 0, 64]])) as VectorRef;
let result = func.eval(FunctionContext::default(), &[vec1, vec2]);
assert!(result.is_err());
}
}
#[test]
fn test_parse_vector_from_string() {
let result = parse_f32_vector_from_string("[1.0, 2.0, 3.0]").unwrap();
assert_eq!(result, vec![1.0, 2.0, 3.0]);
let result = parse_f32_vector_from_string("[]").unwrap();
assert_eq!(result, Vec::<f32>::new());
let result = parse_f32_vector_from_string("[1.0, a, 3.0]");
assert!(result.is_err());
}
#[test]
fn test_binary_as_vector() {
let bytes = [0, 0, 128, 63];
let result = binary_as_vector(&bytes).unwrap();
assert_eq!(result.as_ref(), &[1.0]);
let invalid_bytes = [0, 0, 128];
let result = binary_as_vector(&invalid_bytes);
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,87 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use nalgebra::DVectorView;
/// Calculates the cos distance between two vectors.
///
/// **Note:** Must ensure that the length of the two vectors are the same.
pub fn cos(lhs: &[f32], rhs: &[f32]) -> f32 {
let lhs_vec = DVectorView::from_slice(lhs, lhs.len());
let rhs_vec = DVectorView::from_slice(rhs, rhs.len());
let dot_product = lhs_vec.dot(&rhs_vec);
let lhs_norm = lhs_vec.norm();
let rhs_norm = rhs_vec.norm();
if dot_product.abs() < f32::EPSILON
|| lhs_norm.abs() < f32::EPSILON
|| rhs_norm.abs() < f32::EPSILON
{
return 1.0;
}
let cos_similar = dot_product / (lhs_norm * rhs_norm);
let res = 1.0 - cos_similar;
if res.abs() < f32::EPSILON {
0.0
} else {
res
}
}
#[cfg(test)]
mod tests {
use approx::assert_relative_eq;
use super::*;
#[test]
fn test_cos_scalar() {
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.025, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.04, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(cos(&lhs, &rhs), 1.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(cos(&lhs, &rhs), 1.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(cos(&lhs, &rhs), 1.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.04, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(cos(&lhs, &rhs), 0.0, epsilon = 1e-2);
}
}

View File

@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use nalgebra::DVectorView;
/// Calculates the dot product between two vectors.
///
/// **Note:** Must ensure that the length of the two vectors are the same.
pub fn dot(lhs: &[f32], rhs: &[f32]) -> f32 {
let lhs = DVectorView::from_slice(lhs, lhs.len());
let rhs = DVectorView::from_slice(rhs, rhs.len());
lhs.dot(&rhs)
}
#[cfg(test)]
mod tests {
use approx::assert_relative_eq;
use super::*;
#[test]
fn test_dot_scalar() {
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(dot(&lhs, &rhs), 14.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(dot(&lhs, &rhs), 32.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(dot(&lhs, &rhs), 50.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(dot(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(dot(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(dot(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(dot(&lhs, &rhs), 50.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(dot(&lhs, &rhs), 122.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(dot(&lhs, &rhs), 194.0, epsilon = 1e-2);
}
}

View File

@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use nalgebra::DVectorView;
/// Calculates the squared L2 distance between two vectors.
///
/// **Note:** Must ensure that the length of the two vectors are the same.
pub fn l2sq(lhs: &[f32], rhs: &[f32]) -> f32 {
let lhs = DVectorView::from_slice(lhs, lhs.len());
let rhs = DVectorView::from_slice(rhs, rhs.len());
(lhs - rhs).norm_squared()
}
#[cfg(test)]
mod tests {
use approx::assert_relative_eq;
use super::*;
#[test]
fn test_l2sq_scalar() {
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 0.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 27.0, epsilon = 1e-2);
let lhs = vec![1.0, 2.0, 3.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 108.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 14.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 77.0, epsilon = 1e-2);
let lhs = vec![0.0, 0.0, 0.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 194.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![1.0, 2.0, 3.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 108.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![4.0, 5.0, 6.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 27.0, epsilon = 1e-2);
let lhs = vec![7.0, 8.0, 9.0];
let rhs = vec![7.0, 8.0, 9.0];
assert_relative_eq!(l2sq(&lhs, &rhs), 0.0, epsilon = 1e-2);
}
}

View File

@@ -14,30 +14,30 @@
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::as_fulltext_option;
use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, Analyzer, ChangeColumnTypes,
CreateTableExpr, DropColumns, RenameTable, SemanticType,
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::region_request::ChangeOption;
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, ChangeColumnTypeRequest};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, ModifyColumnTypeRequest};
use crate::error::{
InvalidChangeFulltextOptionRequestSnafu, InvalidChangeTableOptionRequestSnafu,
InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
InvalidColumnDefSnafu, InvalidSetFulltextOptionRequestSnafu, InvalidSetTableOptionRequestSnafu,
InvalidUnsetTableOptionRequestSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result,
UnknownLocationTypeSnafu,
};
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
@@ -68,25 +68,25 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
columns: add_column_requests,
}
}
Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types,
Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types,
}) => {
let change_column_type_requests = change_column_types
let modify_column_type_requests = modify_column_types
.into_iter()
.map(|cct| {
let target_type =
ColumnDataTypeWrapper::new(cct.target_type(), cct.target_type_extension)
.into();
Ok(ChangeColumnTypeRequest {
Ok(ModifyColumnTypeRequest {
column_name: cct.column_name,
target_type,
})
})
.collect::<Result<Vec<_>>>()?;
AlterKind::ChangeColumnTypes {
columns: change_column_type_requests,
AlterKind::ModifyColumnTypes {
columns: modify_column_type_requests,
}
}
Kind::DropColumns(DropColumns { drop_columns }) => AlterKind::DropColumns {
@@ -95,26 +95,37 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<Alter
Kind::RenameTable(RenameTable { new_table_name }) => {
AlterKind::RenameTable { new_table_name }
}
Kind::ChangeTableOptions(api::v1::ChangeTableOptions {
change_table_options,
}) => AlterKind::ChangeTableOptions {
options: change_table_options
.iter()
.map(ChangeOption::try_from)
.collect::<std::result::Result<Vec<_>, _>>()
.context(InvalidChangeTableOptionRequestSnafu)?,
},
Kind::ChangeColumnFulltext(c) => AlterKind::ChangeColumnFulltext {
Kind::SetTableOptions(api::v1::SetTableOptions { table_options }) => {
AlterKind::SetTableOptions {
options: table_options
.iter()
.map(SetRegionOption::try_from)
.collect::<std::result::Result<Vec<_>, _>>()
.context(InvalidSetTableOptionRequestSnafu)?,
}
}
Kind::UnsetTableOptions(api::v1::UnsetTableOptions { keys }) => {
AlterKind::UnsetTableOptions {
keys: keys
.iter()
.map(|key| UnsetRegionOption::try_from(key.as_str()))
.collect::<std::result::Result<Vec<_>, _>>()
.context(InvalidUnsetTableOptionRequestSnafu)?,
}
}
Kind::SetColumnFulltext(c) => AlterKind::SetColumnFulltext {
column_name: c.column_name,
options: FulltextOptions {
enable: c.enable,
analyzer: as_fulltext_option(
Analyzer::try_from(c.analyzer)
.context(InvalidChangeFulltextOptionRequestSnafu)?,
Analyzer::try_from(c.analyzer).context(InvalidSetFulltextOptionRequestSnafu)?,
),
case_sensitive: c.case_sensitive,
},
},
Kind::UnsetColumnFulltext(c) => AlterKind::UnsetColumnFulltext {
column_name: c.column_name,
},
};
let request = AlterTableRequest {
@@ -183,7 +194,7 @@ fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation
#[cfg(test)]
mod tests {
use api::v1::{
AddColumn, AddColumns, ChangeColumnType, ColumnDataType, ColumnDef, DropColumn,
AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn, ModifyColumnType,
SemanticType,
};
use datatypes::prelude::ConcreteDataType;
@@ -192,7 +203,7 @@ mod tests {
#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
@@ -233,7 +244,7 @@ mod tests {
#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
@@ -309,14 +320,14 @@ mod tests {
}
#[test]
fn test_change_column_type_expr() {
let expr = AlterExpr {
fn test_modify_column_type_expr() {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
kind: Some(Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: "mem_usage".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
@@ -329,22 +340,22 @@ mod tests {
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);
let mut change_column_types = match alter_request.alter_kind {
AlterKind::ChangeColumnTypes { columns } => columns,
let mut modify_column_types = match alter_request.alter_kind {
AlterKind::ModifyColumnTypes { columns } => columns,
_ => unreachable!(),
};
let change_column_type = change_column_types.pop().unwrap();
assert_eq!("mem_usage", change_column_type.column_name);
let modify_column_type = modify_column_types.pop().unwrap();
assert_eq!("mem_usage", modify_column_type.column_name);
assert_eq!(
ConcreteDataType::string_datatype(),
change_column_type.target_type
modify_column_type.target_type
);
}
#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),

View File

@@ -120,14 +120,20 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid change table option request"))]
InvalidChangeTableOptionRequest {
#[snafu(display("Invalid set table option request"))]
InvalidSetTableOptionRequest {
#[snafu(source)]
error: MetadataError,
},
#[snafu(display("Invalid change fulltext option request"))]
InvalidChangeFulltextOptionRequest {
#[snafu(display("Invalid unset table option request"))]
InvalidUnsetTableOptionRequest {
#[snafu(source)]
error: MetadataError,
},
#[snafu(display("Invalid set fulltext option request"))]
InvalidSetFulltextOptionRequest {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
@@ -156,8 +162,9 @@ impl ErrorExt for Error {
Error::UnknownColumnDataType { .. } | Error::InvalidFulltextColumnType { .. } => {
StatusCode::InvalidArguments
}
Error::InvalidChangeTableOptionRequest { .. }
| Error::InvalidChangeFulltextOptionRequest { .. } => StatusCode::InvalidArguments,
Error::InvalidSetTableOptionRequest { .. }
| Error::InvalidUnsetTableOptionRequest { .. }
| Error::InvalidSetFulltextOptionRequest { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;

View File

@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::rpc::ddl::UnsetDatabaseOption::{self};
use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
use crate::ClusterId;
pub struct AlterDatabaseProcedure {
pub context: DdlContext,
pub data: AlterDatabaseData,
}
fn build_new_schema_value(
mut value: SchemaNameValue,
alter_kind: &AlterDatabaseKind,
) -> Result<SchemaNameValue> {
match alter_kind {
AlterDatabaseKind::SetDatabaseOptions(options) => {
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
}
}
}
}
AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
for key in keys.0.iter() {
match key {
UnsetDatabaseOption::Ttl => value.ttl = None,
}
}
}
}
Ok(value)
}
impl AlterDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";
pub fn new(
cluster_id: ClusterId,
task: AlterDatabaseTask,
context: DdlContext,
) -> Result<Self> {
Ok(Self {
context,
data: AlterDatabaseData::new(task, cluster_id)?,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
pub async fn on_prepare(&mut self) -> Result<Status> {
let value = self
.context
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
.await?;
ensure!(
value.is_some(),
SchemaNotFoundSnafu {
table_schema: self.data.schema(),
}
);
self.data.schema_value = value;
self.data.state = AlterDatabaseState::UpdateMetadata;
Ok(Status::executing(true))
}
pub async fn on_update_metadata(&mut self) -> Result<Status> {
let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());
// Safety: schema_value is not None.
let current_schema_value = self.data.schema_value.as_ref().unwrap();
let new_schema_value = build_new_schema_value(
current_schema_value.get_inner_ref().clone(),
&self.data.kind,
)?;
self.context
.table_metadata_manager
.schema_manager()
.update(schema_name, current_schema_value, &new_schema_value)
.await?;
info!("Updated database metadata for schema {schema_name}");
self.data.state = AlterDatabaseState::InvalidateSchemaCache;
Ok(Status::executing(true))
}
pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
let cache_invalidator = &self.context.cache_invalidator;
cache_invalidator
.invalidate(
&Context::default(),
&[CacheIdent::SchemaName(SchemaName {
catalog_name: self.data.catalog().to_string(),
schema_name: self.data.schema().to_string(),
})],
)
.await?;
Ok(Status::done())
}
}
#[async_trait]
impl Procedure for AlterDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
AlterDatabaseState::Prepare => self.on_prepare().await,
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
}
.map_err(handle_retry_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog = self.data.catalog();
let schema = self.data.schema();
let lock_key = vec![
CatalogLock::Read(catalog).into(),
SchemaLock::write(catalog, schema).into(),
];
LockKey::new(lock_key)
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterDatabaseState {
Prepare,
UpdateMetadata,
InvalidateSchemaCache,
}
/// The data of alter database procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterDatabaseData {
cluster_id: ClusterId,
state: AlterDatabaseState,
kind: AlterDatabaseKind,
catalog_name: String,
schema_name: String,
schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
}
impl AlterDatabaseData {
pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Result<Self> {
Ok(Self {
cluster_id,
state: AlterDatabaseState::Prepare,
kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
catalog_name: task.alter_expr.catalog_name,
schema_name: task.alter_expr.schema_name,
schema_value: None,
})
}
pub fn catalog(&self) -> &str {
&self.catalog_name
}
pub fn schema(&self) -> &str {
&self.schema_name
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::ddl::alter_database::build_new_schema_value;
use crate::key::schema_name::SchemaNameValue;
use crate::rpc::ddl::{
AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
UnsetDatabaseOptions,
};
#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));
let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
UnsetDatabaseOption::Ttl,
]));
let new_schema_value =
build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
assert_eq!(new_schema_value.ttl, None);
}
}

View File

@@ -14,7 +14,7 @@
use std::collections::HashSet;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use snafu::{ensure, OptionExt};
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,

View File

@@ -19,7 +19,7 @@ mod update_metadata;
use std::vec;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
use common_error::ext::ErrorExt;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use common_catalog::format_full_table_name;
use snafu::ensure;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
alter_request, AddColumn, AddColumns, AlterRequest, DropColumn, DropColumns, RegionColumnDef,
@@ -91,7 +91,7 @@ fn create_proto_alter_kind(
add_columns,
})))
}
Kind::ChangeColumnTypes(x) => Ok(Some(alter_request::Kind::ChangeColumnTypes(x.clone()))),
Kind::ModifyColumnTypes(x) => Ok(Some(alter_request::Kind::ModifyColumnTypes(x.clone()))),
Kind::DropColumns(x) => {
let drop_columns = x
.drop_columns
@@ -106,9 +106,11 @@ fn create_proto_alter_kind(
})))
}
Kind::RenameTable(_) => Ok(None),
Kind::ChangeTableOptions(v) => Ok(Some(alter_request::Kind::ChangeTableOptions(v.clone()))),
Kind::ChangeColumnFulltext(v) => {
Ok(Some(alter_request::Kind::ChangeColumnFulltext(v.clone())))
Kind::SetTableOptions(v) => Ok(Some(alter_request::Kind::SetTableOptions(v.clone()))),
Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
Kind::SetColumnFulltext(v) => Ok(Some(alter_request::Kind::SetColumnFulltext(v.clone()))),
Kind::UnsetColumnFulltext(v) => {
Ok(Some(alter_request::Kind::UnsetColumnFulltext(v.clone())))
}
}
}
@@ -119,12 +121,12 @@ mod tests {
use std::sync::Arc;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::RegionColumnDef;
use api::v1::{
region, AddColumn, AddColumnLocation, AddColumns, AlterExpr, ChangeColumnType,
ChangeColumnTypes, ColumnDataType, ColumnDef as PbColumnDef, SemanticType,
region, AddColumn, AddColumnLocation, AddColumns, AlterTableExpr, ColumnDataType,
ColumnDef as PbColumnDef, ModifyColumnType, ModifyColumnTypes, SemanticType,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use store_api::storage::{RegionId, TableId};
@@ -213,7 +215,7 @@ mod tests {
prepare_ddl_context().await;
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
@@ -280,12 +282,12 @@ mod tests {
prepare_ddl_context().await;
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
kind: Some(Kind::ChangeColumnTypes(ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: "cpu".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,
@@ -306,9 +308,9 @@ mod tests {
assert_eq!(alter_region_request.schema_version, 1);
assert_eq!(
alter_region_request.kind,
Some(region::alter_request::Kind::ChangeColumnTypes(
ChangeColumnTypes {
change_column_types: vec![ChangeColumnType {
Some(region::alter_request::Kind::ModifyColumnTypes(
ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: "cpu".to_string(),
target_type: ColumnDataType::String as i32,
target_type_extension: None,

View File

@@ -52,9 +52,11 @@ impl AlterTableProcedure {
new_info.name = new_table_name.to_string();
}
AlterKind::DropColumns { .. }
| AlterKind::ChangeColumnTypes { .. }
| AlterKind::ChangeTableOptions { .. }
| AlterKind::ChangeColumnFulltext { .. } => {}
| AlterKind::ModifyColumnTypes { .. }
| AlterKind::SetTableOptions { .. }
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetColumnFulltext { .. }
| AlterKind::UnsetColumnFulltext { .. } => {}
}
Ok(new_info)

View File

@@ -28,6 +28,7 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -43,7 +44,7 @@ use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{FlowId, FlowPartitionId};
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::{CreateFlowTask, QueryContext};
@@ -75,6 +76,7 @@ impl CreateFlowProcedure {
source_table_ids: vec![],
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
},
}
}
@@ -90,6 +92,7 @@ impl CreateFlowProcedure {
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
let create_if_not_exists = self.data.task.create_if_not_exists;
let or_replace = self.data.task.or_replace;
let flow_name_value = self
.context
@@ -98,16 +101,56 @@ impl CreateFlowProcedure {
.get(catalog_name, flow_name)
.await?;
if create_if_not_exists && or_replace {
// this is forbidden because not clear what does that mean exactly
return error::UnsupportedSnafu {
operation: "Create flow with both `IF NOT EXISTS` and `OR REPLACE`".to_string(),
}
.fail();
}
if let Some(value) = flow_name_value {
ensure!(
create_if_not_exists,
create_if_not_exists || or_replace,
error::FlowAlreadyExistsSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
let flow_id = value.flow_id();
return Ok(Status::done_with_output(flow_id));
if create_if_not_exists {
info!("Flow already exists, flow_id: {}", flow_id);
return Ok(Status::done_with_output(flow_id));
}
let flow_id = value.flow_id();
let peers = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);
let flow_info_value = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await?;
ensure!(
flow_info_value.is_some(),
error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
self.data.prev_flow_info_value = flow_info_value;
}
// Ensures sink table doesn't exist.
@@ -128,7 +171,9 @@ impl CreateFlowProcedure {
}
self.collect_source_tables().await?;
self.allocate_flow_id().await?;
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
Ok(Status::executing(true))
@@ -153,7 +198,10 @@ impl CreateFlowProcedure {
.map_err(add_peer_context_if_needed(peer.clone()))
});
}
info!(
"Creating flow({:?}) on flownodes with peers={:?}",
self.data.flow_id, self.data.peers
);
join_all(create_flow)
.await
.into_iter()
@@ -170,18 +218,29 @@ impl CreateFlowProcedure {
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
let (flow_info, flow_routes) = (&self.data).into();
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
if let Some(prev_flow_value) = self.data.prev_flow_info_value.as_ref()
&& self.data.task.or_replace
{
self.context
.flow_metadata_manager
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
} else {
self.context
.flow_metadata_manager
.create_flow_metadata(flow_id, flow_info, flow_routes)
.await?;
info!("Created flow metadata for flow {flow_id}");
}
self.data.state = CreateFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}
async fn on_broadcast(&mut self) -> Result<Status> {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let ctx = Context {
@@ -192,10 +251,13 @@ impl CreateFlowProcedure {
.cache_invalidator
.invalidate(
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
})],
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.await?;
@@ -270,6 +332,9 @@ pub struct CreateFlowData {
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
pub(crate) query_context: QueryContext,
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
}
impl From<&CreateFlowData> for CreateRequest {
@@ -284,8 +349,9 @@ impl From<&CreateFlowData> for CreateRequest {
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
// Always be true to ensure idempotent in case of retry
create_if_not_exists: true,
or_replace: value.task.or_replace,
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterExpr, ColumnDef, RenameTable};
use api::v1::alter_table_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterTableExpr, ColumnDef, RenameTable};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use derive_builder::Builder;
@@ -32,7 +32,7 @@ pub struct TestAlterTableExpr {
new_table_name: Option<String>,
}
impl From<TestAlterTableExpr> for AlterExpr {
impl From<TestAlterTableExpr> for AlterTableExpr {
fn from(value: TestAlterTableExpr) -> Self {
if let Some(new_table_name) = value.new_table_name {
Self {

View File

@@ -16,11 +16,11 @@ use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{region_request, RegionRequest};
use api::v1::{
AddColumn, AddColumns, AlterExpr, ChangeTableOption, ChangeTableOptions, ColumnDataType,
ColumnDef as PbColumnDef, DropColumn, DropColumns, SemanticType,
AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef as PbColumnDef, DropColumn,
DropColumns, SemanticType, SetTableOptions,
};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
@@ -133,7 +133,7 @@ async fn test_on_submit_alter_request() {
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
@@ -219,7 +219,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
@@ -316,7 +316,7 @@ async fn test_on_update_metadata_add_columns() {
.unwrap();
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
@@ -385,12 +385,12 @@ async fn test_on_update_table_options() {
.unwrap();
let task = AlterTableTask {
alter_table: AlterExpr {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::ChangeTableOptions(ChangeTableOptions {
change_table_options: vec![ChangeTableOption {
kind: Some(Kind::SetTableOptions(SetTableOptions {
table_options: vec![api::v1::Option {
key: TTL_KEY.to_string(),
value: "1d".to_string(),
}],

View File

@@ -24,6 +24,7 @@ use derive_builder::Builder;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::TableId;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
@@ -47,12 +48,13 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, TruncateTable,
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask,
DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
@@ -129,6 +131,7 @@ impl DdlManager {
CreateFlowProcedure,
AlterTableProcedure,
AlterLogicalTablesProcedure,
AlterDatabaseProcedure,
DropTableProcedure,
DropFlowProcedure,
TruncateTableProcedure,
@@ -294,6 +297,18 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
pub async fn submit_alter_database(
&self,
cluster_id: ClusterId,
alter_database_task: AlterDatabaseTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = AlterDatabaseProcedure::new(cluster_id, alter_database_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a create flow task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_flow_task(
@@ -593,6 +608,28 @@ async fn handle_drop_database_task(
})
}
async fn handle_alter_database_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
alter_database_task: AlterDatabaseTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_alter_database(cluster_id, alter_database_task.clone())
.await?;
let procedure_id = id.to_string();
info!(
"Database {}.{} is altered via procedure_id {id:?}",
alter_database_task.catalog(),
alter_database_task.schema()
);
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}
async fn handle_drop_flow_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
@@ -655,10 +692,17 @@ async fn handle_create_flow_task(
procedure_id: &procedure_id,
err_msg: "downcast to `u32`",
})?);
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
if !create_flow_task.or_replace {
info!(
"Flow {}.{}({flow_id}) is created via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
} else {
info!(
"Flow {}.{}({flow_id}) is replaced via procedure_id {id:?}",
create_flow_task.catalog_name, create_flow_task.flow_name,
);
}
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
@@ -772,6 +816,9 @@ impl ProcedureExecutor for DdlManager {
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, cluster_id, drop_database_task).await
}
AlterDatabase(alter_database_task) => {
handle_alter_database_task(self, cluster_id, alter_database_task).await
}
CreateFlow(create_flow_task) => {
handle_create_flow_task(
self,

View File

@@ -593,6 +593,21 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
InvalidSetDatabaseOption {
key: String,
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid unset database option, key: {}", key))]
InvalidUnsetDatabaseOption {
key: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
MismatchPrefix {
prefix: String,
@@ -730,7 +745,9 @@ impl ErrorExt for Error {
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| MismatchPrefix { .. }
| TlsConfig { .. } => StatusCode::InvalidArguments,
| TlsConfig { .. }
| InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,

View File

@@ -90,6 +90,7 @@
pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod maintenance;
pub mod node_address;
mod schema_metadata_manager;
pub mod schema_name;
@@ -564,13 +565,13 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_create_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
err_msg: "Reads the empty table info in comparing operation of creating table metadata",
})?
.into_inner();
let remote_view_info = on_create_view_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty view info during the create view info",
err_msg: "Reads the empty view info in comparing operation of creating view metadata",
})?
.into_inner();
@@ -643,13 +644,13 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_create_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
err_msg: "Reads the empty table info in comparing operation of creating table metadata",
})?
.into_inner();
let remote_table_route = on_create_table_route_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
err_msg: "Reads the empty table route in comparing operation of creating table metadata",
})?
.into_inner();
@@ -730,13 +731,13 @@ impl TableMetadataManager {
for on_failure in on_failures {
let remote_table_info = (on_failure.on_create_table_info_failure)(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the create table metadata",
err_msg: "Reads the empty table info in comparing operation of creating table metadata",
})?
.into_inner();
let remote_table_route = (on_failure.on_create_table_route_failure)(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the create table metadata",
err_msg: "Reads the empty table route in comparing operation of creating table metadata",
})?
.into_inner();
@@ -914,7 +915,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_update_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the rename table metadata",
err_msg: "Reads the empty table info in comparing operation of the rename table metadata",
})?
.into_inner();
@@ -960,7 +961,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_info = on_update_table_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
err_msg: "Reads the empty table info in comparing operation of the updating table info",
})?
.into_inner();
@@ -1011,7 +1012,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_view_info = on_update_view_info_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty view info during the updating view info",
err_msg: "Reads the empty view info in comparing operation of the updating view info",
})?
.into_inner();
@@ -1068,7 +1069,7 @@ impl TableMetadataManager {
for on_failure in on_failures {
let remote_table_info = (on_failure.on_update_table_info_failure)(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table info during the updating table info",
err_msg: "Reads the empty table info in comparing operation of the updating table info",
})?
.into_inner();
@@ -1120,7 +1121,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_route = on_update_table_route_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the updating table route",
err_msg: "Reads the empty table route in comparing operation of the updating table route",
})?
.into_inner();
@@ -1172,7 +1173,7 @@ impl TableMetadataManager {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_table_route = on_update_table_route_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg: "Reads the empty table route during the updating leader region status",
err_msg: "Reads the empty table route in comparing operation of the updating leader region status",
})?
.into_inner();
@@ -1260,7 +1261,8 @@ impl_metadata_value! {
FlowNameValue,
FlowRouteValue,
TableFlowValue,
NodeAddressValue
NodeAddressValue,
SchemaNameValue
}
impl_optional_metadata_value! {

View File

@@ -38,7 +38,7 @@ use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowId, MetadataKey};
use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
@@ -197,7 +197,7 @@ impl FlowMetadataManager {
on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow name during the creating flow, flow_id: {flow_id}"
"Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
),
}
})?;
@@ -220,7 +220,7 @@ impl FlowMetadataManager {
let remote_flow =
on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow during the creating flow, flow_id: {flow_id}"
"Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
),
})?;
let op_name = "creating flow";
@@ -230,6 +230,102 @@ impl FlowMetadataManager {
Ok(())
}
/// Update metadata for flow and returns an error if old metadata IS NOT exists.
pub async fn update_flow_metadata(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
new_flow_info: &FlowInfoValue,
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
self.flow_name_manager.build_update_txn(
&new_flow_info.catalog_name,
&new_flow_info.flow_name,
flow_id,
)?;
let (create_flow_txn, on_create_flow_failure) =
self.flow_info_manager
.build_update_txn(flow_id, current_flow_info, new_flow_info)?;
let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes.clone())?;
let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
flow_id,
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
.collect(),
new_flow_info.source_table_ids(),
)?;
let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
create_flow_txn,
create_flow_routes_txn,
create_flownode_flow_txn,
create_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
new_flow_info.catalog_name,
new_flow_info.flow_name,
flow_id,
txn.max_operations()
);
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_flow_flow_name =
on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
),
}
})?;
if remote_flow_flow_name.flow_id() != flow_id {
info!(
"Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
new_flow_info.catalog_name,
new_flow_info.flow_name,
flow_id,
remote_flow_flow_name.flow_id()
);
return error::UnexpectedSnafu {
err_msg: format!(
"Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
remote_flow_flow_name.flow_id(),
flow_id,
new_flow_info.catalog_name,
new_flow_info.flow_name,
),
}.fail();
}
let remote_flow =
on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
),
})?;
let op_name = "updating flow";
ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
}
Ok(())
}
fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
let source_table_ids = flow_value.source_table_ids();
let mut keys =
@@ -560,4 +656,222 @@ mod tests {
// Ensures all keys are deleted
assert!(mem_kv.is_empty())
}
#[tokio::test]
async fn test_update_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value = test_flow_info_value(
"flow",
[(0, 1u64), (1, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
let new_flow_value = {
let mut tmp = flow_value.clone();
tmp.raw_sql = "new".to_string();
tmp
};
// Update flow instead
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&new_flow_value,
flow_routes.clone(),
)
.await
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
routes,
vec![
(
FlowRouteKey::new(flow_id, 1),
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
FlowRouteKey::new(flow_id, 2),
FlowRouteValue {
peer: Peer::empty(2),
},
),
]
);
assert_eq!(got, new_flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 1, flow_id, 1),
TableFlowValue {
peer: Peer::empty(1)
}
),
(
TableFlowKey::new(table_id, 2, flow_id, 2),
TableFlowValue {
peer: Peer::empty(2)
}
)
]
);
}
}
#[tokio::test]
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// update again with same flow id
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes.clone(),
)
.await
.unwrap();
// update again with wrong flow id, expected error
let err = flow_metadata_manager
.update_flow_metadata(
flow_id + 1,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes,
)
.await
.unwrap_err();
assert_matches!(err, error::Error::Unexpected { .. });
assert!(err
.to_string()
.contains("Reads different flow id when updating flow"));
}
#[tokio::test]
async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_routes = vec![
(
1u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
2,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
// Creates again.
let another_sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "another_sink_table".to_string(),
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
};
let err = flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&flow_value,
flow_routes.clone(),
)
.await
.unwrap_err();
assert!(
err.to_string().contains("Reads the different value"),
"error: {:?}",
err
);
}
}

View File

@@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;
@@ -196,6 +196,19 @@ impl FlowInfoManager {
.transpose()
}
/// Returns the [FlowInfoValue] with original bytes of specified `flow_id`.
pub async fn get_raw(
&self,
flow_id: FlowId,
) -> Result<Option<DeserializedValueWithBytes<FlowInfoValue>>> {
let key = FlowInfoKey::new(flow_id).to_bytes();
self.kv_backend
.get(&key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
/// Builds a create flow transaction.
/// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
@@ -215,6 +228,36 @@ impl FlowInfoManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
/// Builds a update flow transaction.
/// It is expected that the `__flow/info/{flow_id}` IS ALREADY occupied and equal to `prev_flow_value`,
/// but the new value can be the same, so to allow replace operation to happen even when the value is the same.
/// Otherwise, the transaction will retrieve existing value and fail.
pub(crate) fn build_update_txn(
&self,
flow_id: FlowId,
current_flow_value: &DeserializedValueWithBytes<FlowInfoValue>,
new_flow_value: &FlowInfoValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowInfoDecodeResult,
)> {
let key = FlowInfoKey::new(flow_id).to_bytes();
let raw_value = new_flow_value.try_as_raw_value()?;
let prev_value = current_flow_value.get_raw_bytes();
let txn = Txn::new()
.when(vec![
Compare::new(key.clone(), CompareOp::NotEqual, None),
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
}
#[cfg(test)]

View File

@@ -26,7 +26,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
@@ -237,6 +237,37 @@ impl FlowNameManager {
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
/// Builds a update flow name transaction. Which doesn't change either the name or id, just checking if they are the same.
/// It's expected that the `__flow/name/{catalog}/{flow_name}` IS already occupied,
/// and both flow name and flow id is the same.
/// Otherwise, the transaction will retrieve existing value(and fail).
pub fn build_update_txn(
&self,
catalog_name: &str,
flow_name: &str,
flow_id: FlowId,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> FlowNameDecodeResult,
)> {
let key = FlowNameKey::new(catalog_name, flow_name);
let raw_key = key.to_bytes();
let flow_flow_name_value = FlowNameValue::new(flow_id);
let raw_value = flow_flow_name_value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::new(
raw_key.clone(),
CompareOp::Equal,
Some(raw_value),
)])
.or_else(vec![TxnOp::Get(raw_key.clone())]);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
}
#[cfg(test)]

View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::error::Result;
use crate::key::MAINTENANCE_KEY;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
pub type MaintenanceModeManagerRef = Arc<MaintenanceModeManager>;
/// The maintenance mode manager.
///
/// Used to enable or disable maintenance mode.
#[derive(Clone)]
pub struct MaintenanceModeManager {
kv_backend: KvBackendRef,
}
impl MaintenanceModeManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
Ok(())
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await?;
Ok(())
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::maintenance::MaintenanceModeManager;
use crate::kv_backend::memory::MemoryKvBackend;
#[tokio::test]
async fn test_maintenance_mode_manager() {
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(Arc::new(
MemoryKvBackend::new(),
)));
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.set_maintenance_mode()
.await
.unwrap();
assert!(maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
}
}

View File

@@ -75,7 +75,10 @@ impl SchemaMetadataManager {
&table_info.table_info.catalog_name,
&table_info.table_info.schema_name,
);
self.schema_manager.get(key).await
self.schema_manager
.get(key)
.await
.map(|v| v.map(|v| v.into_inner()))
}
#[cfg(any(test, feature = "testing"))]

View File

@@ -21,10 +21,14 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures::stream::BoxStream;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use super::txn_helper::TxnOpGetResponseSet;
use super::DeserializedValueWithBytes;
use crate::ensure_values;
use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
use crate::key::{MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
@@ -171,6 +175,8 @@ pub struct SchemaManager {
kv_backend: KvBackendRef,
}
pub type SchemaNameDecodeResult = Result<Option<DeserializedValueWithBytes<SchemaNameValue>>>;
impl SchemaManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
@@ -204,11 +210,15 @@ impl SchemaManager {
self.kv_backend.exists(&raw_key).await
}
pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
pub async fn get(
&self,
schema: SchemaNameKey<'_>,
) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
let raw_key = schema.to_bytes();
let value = self.kv_backend.get(&raw_key).await?;
value
.and_then(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()).transpose())
self.kv_backend
.get(&raw_key)
.await?
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}
@@ -220,6 +230,54 @@ impl SchemaManager {
Ok(())
}
pub(crate) fn build_update_txn(
&self,
schema: SchemaNameKey<'_>,
current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
new_schema_value: &SchemaNameValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> SchemaNameDecodeResult,
)> {
let raw_key = schema.to_bytes();
let raw_value = current_schema_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_schema_value.try_as_raw_value()?;
let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
))
}
/// Updates a [SchemaNameKey].
pub async fn update(
&self,
schema: SchemaNameKey<'_>,
current_schema_value: &DeserializedValueWithBytes<SchemaNameValue>,
new_schema_value: &SchemaNameValue,
) -> Result<()> {
let (txn, on_failure) =
self.build_update_txn(schema, current_schema_value, new_schema_value)?;
let mut r = self.kv_backend.txn(txn).await?;
if !r.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut r.responses);
let remote_schema_value = on_failure(&mut set)?
.context(error::UnexpectedSnafu {
err_msg:
"Reads the empty schema name value in comparing operation of updating schema name value",
})?
.into_inner();
let op_name = "the updating schema name value";
ensure_values!(&remote_schema_value, new_schema_value, op_name);
}
Ok(())
}
/// Returns a schema stream, it lists all schemas belong to the target `catalog`.
pub fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
let start_key = SchemaNameKey::range_start_key(catalog);
@@ -306,4 +364,42 @@ mod tests {
assert!(!manager.exists(wrong_schema_key).await.unwrap());
}
#[tokio::test]
async fn test_update_schema_value() {
let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
manager.create(schema_key, None, false).await.unwrap();
let current_schema_value = manager.get(schema_key).await.unwrap().unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
};
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();
// Update with the same value, should be ok
manager
.update(schema_key, &current_schema_value, &new_schema_value)
.await
.unwrap();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(40)),
};
let incorrect_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(20)),
}
.try_as_raw_value()
.unwrap();
let incorrect_schema_value =
DeserializedValueWithBytes::from_inner_slice(&incorrect_schema_value).unwrap();
manager
.update(schema_key, &incorrect_schema_value, &new_schema_value)
.await
.unwrap_err();
}
}

View File

@@ -14,25 +14,29 @@
use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
DropViewTask as PbDropViewTask, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
QueryContext as PbQueryContext, TruncateTableExpr,
AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
use humantime_serde::re::humantime;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnNull};
@@ -42,7 +46,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::error::{self, Result};
use crate::error::{self, InvalidSetDatabaseOptionSnafu, InvalidUnsetDatabaseOptionSnafu, Result};
use crate::key::FlowId;
/// DDL tasks
@@ -57,6 +61,7 @@ pub enum DdlTask {
AlterLogicalTables(Vec<AlterTableTask>),
CreateDatabase(CreateDatabaseTask),
DropDatabase(DropDatabaseTask),
AlterDatabase(AlterDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
CreateView(CreateViewTask),
@@ -99,7 +104,7 @@ impl DdlTask {
}
/// Creates a [`DdlTask`] to alter several logical tables.
pub fn new_alter_logical_tables(table_data: Vec<AlterExpr>) -> Self {
pub fn new_alter_logical_tables(table_data: Vec<AlterTableExpr>) -> Self {
DdlTask::AlterLogicalTables(
table_data
.into_iter()
@@ -149,8 +154,13 @@ impl DdlTask {
})
}
/// Creates a [`DdlTask`] to alter a database.
pub fn new_alter_database(alter_expr: AlterDatabaseExpr) -> Self {
DdlTask::AlterDatabase(AlterDatabaseTask { alter_expr })
}
/// Creates a [`DdlTask`] to alter a table.
pub fn new_alter_table(alter_table: AlterExpr) -> Self {
pub fn new_alter_table(alter_table: AlterTableExpr) -> Self {
DdlTask::AlterTable(AlterTableTask { alter_table })
}
@@ -223,6 +233,9 @@ impl TryFrom<Task> for DdlTask {
Task::DropDatabaseTask(drop_database) => {
Ok(DdlTask::DropDatabase(drop_database.try_into()?))
}
Task::AlterDatabaseTask(alter_database) => {
Ok(DdlTask::AlterDatabase(alter_database.try_into()?))
}
Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
@@ -272,6 +285,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
}
DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?),
DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
DdlTask::AlterDatabase(task) => Task::AlterDatabaseTask(task.try_into()?),
DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
@@ -680,7 +694,8 @@ impl<'de> Deserialize<'de> for CreateTableTask {
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
pub alter_table: AlterExpr,
// TODO(CookiePieWw): Replace proto struct with user-defined struct
pub alter_table: AlterTableExpr,
}
impl AlterTableTask {
@@ -932,6 +947,125 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct AlterDatabaseTask {
pub alter_expr: AlterDatabaseExpr,
}
impl TryFrom<AlterDatabaseTask> for PbAlterDatabaseTask {
type Error = error::Error;
fn try_from(task: AlterDatabaseTask) -> Result<Self> {
Ok(PbAlterDatabaseTask {
task: Some(task.alter_expr),
})
}
}
impl TryFrom<PbAlterDatabaseTask> for AlterDatabaseTask {
type Error = error::Error;
fn try_from(pb: PbAlterDatabaseTask) -> Result<Self> {
let alter_expr = pb.task.context(error::InvalidProtoMsgSnafu {
err_msg: "expected alter database",
})?;
Ok(AlterDatabaseTask { alter_expr })
}
}
impl TryFrom<PbAlterDatabaseKind> for AlterDatabaseKind {
type Error = error::Error;
fn try_from(pb: PbAlterDatabaseKind) -> Result<Self> {
match pb {
PbAlterDatabaseKind::SetDatabaseOptions(options) => {
Ok(AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(
options
.set_database_options
.into_iter()
.map(SetDatabaseOption::try_from)
.collect::<Result<Vec<_>>>()?,
)))
}
PbAlterDatabaseKind::UnsetDatabaseOptions(options) => Ok(
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(
options
.keys
.iter()
.map(|key| UnsetDatabaseOption::try_from(key.as_str()))
.collect::<Result<Vec<_>>>()?,
)),
),
}
}
}
const TTL_KEY: &str = "ttl";
impl TryFrom<PbOption> for SetDatabaseOption {
type Error = error::Error;
fn try_from(PbOption { key, value }: PbOption) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(&value)
.map_err(|_| InvalidSetDatabaseOptionSnafu { key, value }.build())?
};
Ok(SetDatabaseOption::Ttl(ttl))
}
_ => InvalidSetDatabaseOptionSnafu { key, value }.fail(),
}
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetDatabaseOption {
Ttl(Duration),
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetDatabaseOption {
Ttl,
}
impl TryFrom<&str> for UnsetDatabaseOption {
type Error = error::Error;
fn try_from(key: &str) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => Ok(UnsetDatabaseOption::Ttl),
_ => InvalidUnsetDatabaseOptionSnafu { key }.fail(),
}
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SetDatabaseOptions(pub Vec<SetDatabaseOption>);
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct UnsetDatabaseOptions(pub Vec<UnsetDatabaseOption>);
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub enum AlterDatabaseKind {
SetDatabaseOptions(SetDatabaseOptions),
UnsetDatabaseOptions(UnsetDatabaseOptions),
}
impl AlterDatabaseTask {
pub fn catalog(&self) -> &str {
&self.alter_expr.catalog_name
}
pub fn schema(&self) -> &str {
&self.alter_expr.catalog_name
}
}
/// Create flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlowTask {
@@ -1118,7 +1252,7 @@ impl From<QueryContext> for PbQueryContext {
mod tests {
use std::sync::Arc;
use api::v1::{AlterExpr, ColumnDef, CreateTableExpr, SemanticType};
use api::v1::{AlterTableExpr, ColumnDef, CreateTableExpr, SemanticType};
use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder};
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::storage::ConcreteDataType;
@@ -1146,7 +1280,7 @@ mod tests {
#[test]
fn test_basic_ser_de_alter_table_task() {
let task = AlterTableTask {
alter_table: AlterExpr::default(),
alter_table: AlterTableExpr::default(),
};
let output = serde_json::to_vec(&task).unwrap();

View File

@@ -245,6 +245,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid vector string: {}", vec_str))]
InvalidVectorString {
vec_str: String,
source: DataTypeError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -273,7 +281,8 @@ impl ErrorExt for Error {
| Error::IntoVector { source, .. }
| Error::FromScalarValue { source, .. }
| Error::ConvertArrowSchema { source, .. }
| Error::FromArrowArray { source, .. } => source.status_code(),
| Error::FromArrowArray { source, .. }
| Error::InvalidVectorString { source, .. } => source.status_code(),
Error::MissingTableMutationHandler { .. }
| Error::MissingProcedureServiceHandler { .. }

View File

@@ -20,6 +20,7 @@ pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio.workspace = true
[dev-dependencies]
tokio.workspace = true

View File

@@ -161,6 +161,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Stream timeout"))]
StreamTimeout {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::time::error::Elapsed,
},
}
impl ErrorExt for Error {
@@ -190,6 +197,8 @@ impl ErrorExt for Error {
Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
source.status_code()
}
Error::StreamTimeout { .. } => StatusCode::Cancelled,
}
}

View File

@@ -14,6 +14,8 @@
//! Datanode configurations
use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
@@ -112,6 +114,38 @@ pub struct ObjectStorageCacheConfig {
pub cache_capacity: Option<ReadableSize>,
}
/// The http client options to the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum idle connection per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,
/// The timeout for only the connect phase of a http client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,
/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,
/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
@@ -126,6 +160,7 @@ pub struct S3Config {
pub region: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
@@ -138,6 +173,7 @@ impl PartialEq for S3Config {
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -154,6 +190,7 @@ pub struct OssConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
@@ -165,6 +202,7 @@ impl PartialEq for OssConfig {
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -182,6 +220,7 @@ pub struct AzblobConfig {
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
@@ -194,6 +233,7 @@ impl PartialEq for AzblobConfig {
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -211,6 +251,7 @@ pub struct GcsConfig {
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
@@ -223,6 +264,7 @@ impl PartialEq for GcsConfig {
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
@@ -237,6 +279,7 @@ impl Default for S3Config {
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -251,6 +294,7 @@ impl Default for OssConfig {
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -266,6 +310,7 @@ impl Default for AzblobConfig {
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
@@ -281,6 +326,7 @@ impl Default for GcsConfig {
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}

View File

@@ -18,6 +18,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::CachedKvBackendBuilder;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
@@ -208,7 +209,10 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build());
let schema_metadata_manager =
Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone()));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
@@ -239,7 +243,15 @@ impl DatanodeBuilder {
}
let heartbeat_task = if let Some(meta_client) = meta_client {
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cached_kv_backend,
)
.await?,
)
} else {
None
};

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use catalog::kvbackend::CachedKvBackend;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
@@ -39,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
@@ -70,6 +72,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
cache_kv_backend: Arc<CachedKvBackend>,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
@@ -79,6 +82,7 @@ impl HeartbeatTask {
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)),
]));
Ok(Self {

View File

@@ -24,6 +24,7 @@ use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;
pub(crate) mod cache_invalidator;
mod close_region;
mod downgrade_region;
mod open_region;
@@ -134,7 +135,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
}
});
Ok(HandleControl::Done)
Ok(HandleControl::Continue)
}
}
@@ -285,7 +286,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -340,7 +341,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -373,7 +374,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -420,7 +421,7 @@ mod tests {
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
@@ -442,7 +443,7 @@ mod tests {
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
assert_matches!(control, HandleControl::Done);
assert_matches!(control, HandleControl::Continue);
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

View File

@@ -0,0 +1,167 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Schema cache invalidator handler
use std::sync::Arc;
use async_trait::async_trait;
use catalog::kvbackend::CachedKvBackend;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::MetadataKey;
use common_telemetry::debug;
#[derive(Clone)]
pub(crate) struct InvalidateSchemaCacheHandler {
cached_kv_backend: Arc<CachedKvBackend>,
}
#[async_trait]
impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateCaches(_)))
)
}
async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'")
};
debug!(
"InvalidateSchemaCacheHandler: invalidating caches: {:?}",
caches
);
for cache in caches {
let CacheIdent::SchemaName(schema_name) = cache else {
continue;
};
let key: SchemaNameKey = (&schema_name).into();
let key_bytes = key.to_bytes();
// invalidate cache
self.cached_kv_backend.invalidate_key(&key_bytes).await;
}
Ok(HandleControl::Done)
}
}
impl InvalidateSchemaCacheHandler {
pub fn new(cached_kv_backend: Arc<CachedKvBackend>) -> Self {
Self { cached_kv_backend }
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::CachedKvBackendBuilder;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use common_meta::key::{MetadataKey, SchemaMetadataManager};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::PutRequest;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
#[tokio::test]
async fn test_invalidate_schema_cache_handler() {
let inner_kv = Arc::new(MemoryKvBackend::default());
let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build());
let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone());
let schema_name = "test_schema";
let catalog_name = "test_catalog";
schema_metadata_manager
.register_region_table_info(
1,
"test_table",
schema_name,
catalog_name,
Some(SchemaNameValue {
ttl: Some(Duration::from_secs(1)),
}),
)
.await;
schema_metadata_manager
.get_schema_options_by_table_id(1)
.await
.unwrap();
let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(3)),
}
.try_as_raw_value()
.unwrap();
inner_kv
.put(PutRequest {
key: schema_key.clone(),
value: new_schema_value,
prev_kv: false,
})
.await
.unwrap();
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateSchemaCacheHandler::new(cached_kv),
)]));
let (tx, _) = tokio::sync::mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
// removes a valid key
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((
MessageMeta::new_test(1, "hi", "foo", "bar"),
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
})]),
));
executor.handle(ctx).await.unwrap();
assert_eq!(
Some(Duration::from_secs(3)),
SchemaNameValue::try_from_raw_value(
&inner_kv.get(&schema_key).await.unwrap().unwrap().value
)
.unwrap()
.unwrap()
.ttl
);
}
}

View File

@@ -74,6 +74,10 @@ impl HandlerContext {
// Ignores flush request
if !writable {
warn!(
"Region: {region_id} is not writable, flush_timeout: {:?}",
flush_timeout
);
return self.downgrade_to_follower_gracefully(region_id).await;
}

View File

@@ -32,7 +32,7 @@ use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_raw_object_store(
@@ -177,7 +177,7 @@ pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
Ok(())
}
pub(crate) fn build_http_client() -> Result<HttpClient> {
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
let http_builder = {
let mut builder = reqwest::ClientBuilder::new();
@@ -186,25 +186,28 @@ pub(crate) fn build_http_client() -> Result<HttpClient> {
let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(usize::MAX);
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_max_idle_per_host as usize);
builder = builder.pool_max_idle_per_host(pool_max_idle_per_host);
// Connect timeout default to 30s.
let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30);
builder = builder.connect_timeout(Duration::from_secs(connect_timeout));
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_CONNECT_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.connect_timeout);
builder = builder.connect_timeout(connect_timeout);
// Pool connection idle timeout default to 90s.
let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(90);
.and_then(|v| v.parse::<u64>().ok().map(Duration::from_secs))
.inspect(|_| warn!("'_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT' might be deprecated in the future. Please set it in the config file instead."))
.unwrap_or(config.pool_idle_timeout);
builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout));
builder = builder.pool_idle_timeout(idle_timeout);
builder
builder.timeout(config.timeout)
};
HttpClient::build(http_builder).context(error::InitBackendSnafu)

View File

@@ -30,13 +30,15 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);
let client = build_http_client(&azblob_config.http_client)?;
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);

View File

@@ -29,6 +29,8 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
gcs_config.bucket, &root
);
let client = build_http_client(&gcs_config.http_client);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
@@ -36,7 +38,7 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(build_http_client()?);
.http_client(client?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -29,13 +29,15 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
oss_config.bucket, &root
);
let client = build_http_client(&oss_config.http_client)?;
let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?

View File

@@ -30,12 +30,14 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
s3_config.bucket, &root
);
let client = build_http_client(&s3_config.http_client)?;
let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(build_http_client()?);
.http_client(client);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());

View File

@@ -85,15 +85,12 @@ pub fn vector_type_value_to_string(val: &[u8], dim: u32) -> Result<String> {
return Ok("[]".to_string());
}
let elements = unsafe {
std::slice::from_raw_parts(
val.as_ptr() as *const f32,
val.len() / std::mem::size_of::<f32>(),
)
};
let elements = val
.chunks_exact(std::mem::size_of::<f32>())
.map(|e| f32::from_le_bytes(e.try_into().unwrap()));
let mut s = String::from("[");
for (i, e) in elements.iter().enumerate() {
for (i, e) in elements.enumerate() {
if i > 0 {
s.push(',');
}
@@ -105,7 +102,7 @@ pub fn vector_type_value_to_string(val: &[u8], dim: u32) -> Result<String> {
/// Parses a string to a vector type value
/// Valid input format: "[1.0,2.0,3.0]", "[1.0, 2.0, 3.0]"
pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
pub fn parse_string_to_vector_type_value(s: &str, dim: Option<u32>) -> Result<Vec<u8>> {
// Trim the brackets
let trimmed = s.trim();
if !trimmed.starts_with('[') || !trimmed.ends_with(']') {
@@ -118,7 +115,7 @@ pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
let content = trimmed[1..trimmed.len() - 1].trim();
if content.is_empty() {
if dim != 0 {
if dim.map_or(false, |d| d != 0) {
return InvalidVectorSnafu {
msg: format!("Failed to parse {s} to Vector value: wrong dimension"),
}
@@ -142,7 +139,7 @@ pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
.collect::<Result<Vec<f32>>>()?;
// Check dimension
if elements.len() != dim as usize {
if dim.map_or(false, |d| d as usize != elements.len()) {
return InvalidVectorSnafu {
msg: format!("Failed to parse {s} to Vector value: wrong dimension"),
}
@@ -150,12 +147,19 @@ pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result<Vec<u8>> {
}
// Convert Vec<f32> to Vec<u8>
let bytes = unsafe {
std::slice::from_raw_parts(
elements.as_ptr() as *const u8,
elements.len() * std::mem::size_of::<f32>(),
)
.to_vec()
let bytes = if cfg!(target_endian = "little") {
unsafe {
std::slice::from_raw_parts(
elements.as_ptr() as *const u8,
elements.len() * std::mem::size_of::<f32>(),
)
.to_vec()
}
} else {
elements
.iter()
.flat_map(|e| e.to_le_bytes())
.collect::<Vec<u8>>()
};
Ok(bytes)
@@ -176,7 +180,7 @@ mod tests {
];
for (s, expected) in cases.iter() {
let val = parse_string_to_vector_type_value(s, dim).unwrap();
let val = parse_string_to_vector_type_value(s, Some(dim)).unwrap();
let s = vector_type_value_to_string(&val, dim).unwrap();
assert_eq!(s, *expected);
}
@@ -184,7 +188,7 @@ mod tests {
let dim = 0;
let cases = [("[]", "[]"), ("[ ]", "[]"), ("[ ]", "[]")];
for (s, expected) in cases.iter() {
let val = parse_string_to_vector_type_value(s, dim).unwrap();
let val = parse_string_to_vector_type_value(s, Some(dim)).unwrap();
let s = vector_type_value_to_string(&val, dim).unwrap();
assert_eq!(s, *expected);
}
@@ -207,15 +211,15 @@ mod tests {
fn test_parse_string_to_vector_type_value_not_properly_enclosed_in_brackets() {
let dim = 3;
let s = "1.0,2.0,3.0";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
let s = "[1.0,2.0,3.0";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
let s = "1.0,2.0,3.0]";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
}
@@ -223,7 +227,7 @@ mod tests {
fn test_parse_string_to_vector_type_value_wrong_dimension() {
let dim = 3;
let s = "[1.0,2.0]";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
}
@@ -231,7 +235,7 @@ mod tests {
fn test_parse_string_to_vector_type_value_elements_are_not_all_float32() {
let dim = 3;
let s = "[1.0,2.0,ah]";
let res = parse_string_to_vector_type_value(s, dim);
let res = parse_string_to_vector_type_value(s, Some(dim));
assert!(res.is_err());
}
}

View File

@@ -1089,7 +1089,7 @@ macro_rules! impl_as_for_value_ref {
};
}
impl ValueRef<'_> {
impl<'a> ValueRef<'a> {
define_data_type_func!(ValueRef);
/// Returns true if this is null.
@@ -1098,12 +1098,12 @@ impl ValueRef<'_> {
}
/// Cast itself to binary slice.
pub fn as_binary(&self) -> Result<Option<&[u8]>> {
pub fn as_binary(&self) -> Result<Option<&'a [u8]>> {
impl_as_for_value_ref!(self, Binary)
}
/// Cast itself to string slice.
pub fn as_string(&self) -> Result<Option<&str>> {
pub fn as_string(&self) -> Result<Option<&'a str>> {
impl_as_for_value_ref!(self, String)
}

View File

@@ -80,7 +80,7 @@ impl BinaryVector {
let v = if let Some(binary) = binary {
let bytes_size = dim as usize * std::mem::size_of::<f32>();
if let Ok(s) = String::from_utf8(binary.to_vec()) {
let v = parse_string_to_vector_type_value(&s, dim)?;
let v = parse_string_to_vector_type_value(&s, Some(dim))?;
Some(v)
} else if binary.len() == dim as usize * std::mem::size_of::<f32>() {
Some(binary.to_vec())

View File

@@ -50,7 +50,10 @@ use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu,
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
@@ -299,7 +302,7 @@ impl FlowWorkerManager {
)]);
}
if row.len() != proto_schema.len() {
InternalSnafu {
UnexpectedSnafu {
reason: format!(
"Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}",
proto_schema.len(),
@@ -673,6 +676,21 @@ impl FlowWorkerManager {
}
}
/// The arguments to create a flow in [`FlowWorkerManager`].
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {
pub flow_id: FlowId,
pub sink_table_name: TableName,
pub source_table_ids: Vec<TableId>,
pub create_if_not_exists: bool,
pub or_replace: bool,
pub expire_after: Option<i64>,
pub comment: Option<String>,
pub sql: String,
pub flow_options: HashMap<String, String>,
pub query_ctx: Option<QueryContext>,
}
/// Create&Remove flow
impl FlowWorkerManager {
/// remove a flow by it's id
@@ -694,18 +712,48 @@ impl FlowWorkerManager {
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exists: bool,
expire_after: Option<i64>,
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
pub async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
let CreateFlowArgs {
flow_id,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment,
sql,
flow_options,
query_ctx,
} = args;
let already_exist = {
let mut flag = false;
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
};
match (create_if_not_exists, or_replace, already_exist) {
// do replace
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
self.remove_flow(flow_id).await?;
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// do nothing if exists
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// create if not exists
(_, _, false) => (),
}
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
@@ -717,7 +765,7 @@ impl FlowWorkerManager {
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in source_table_ids {
for source in &source_table_ids {
node_ctx
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
.await?;
@@ -726,7 +774,7 @@ impl FlowWorkerManager {
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
.await?;
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
node_ctx.register_task_src_sink(flow_id, &source_table_ids, sink_table_name.clone());
node_ctx.query_context = query_ctx.map(Arc::new);
// construct a active dataflow state with it

View File

@@ -28,7 +28,7 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::adapter::FlowWorkerManager;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -57,6 +57,7 @@ impl Flownode for FlowWorkerManager {
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
@@ -65,20 +66,19 @@ impl Flownode for FlowWorkerManager {
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let ret = self
.create_flow(
task_id.id as u64,
sink_table_name,
&source_table_ids,
create_if_not_exists,
expire_after,
Some(comment),
sql,
flow_options,
query_ctx,
)
.await
.map_err(to_meta_err)?;
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql,
flow_options,
query_ctx,
};
let ret = self.create_flow(args).await.map_err(to_meta_err)?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret

View File

@@ -48,7 +48,7 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::FlowWorkerManagerRef;
use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu,
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -355,23 +355,26 @@ impl FlownodeBuilder {
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];
manager
.create_flow(
flow_id as _,
sink_table_name,
info.source_table_ids(),
true,
info.expire_after(),
Some(info.comment().clone()),
info.raw_sql().clone(),
info.options().clone(),
Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
)
.await?;
let args = CreateFlowArgs {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
// but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
// (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().clone())
.build(),
),
};
manager.create_flow(args).await?;
}
Ok(cnt)

View File

@@ -492,6 +492,7 @@ pub fn check_permission(
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::AlterDatabase(_)
| Statement::DropFlow(_)
| Statement::Use(_) => {}
Statement::ShowCreateDatabase(stmt) => {
@@ -516,7 +517,7 @@ pub fn check_permission(
Statement::CreateView(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Statement::Alter(stmt) => {
Statement::AlterTable(stmt) => {
validate_param(stmt.table_name(), query_ctx)?;
}
// set/show variable now only alter/show variable in session

View File

@@ -115,7 +115,14 @@ impl GrpcQueryHandler for Instance {
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::Alter(expr) => {
DdlExpr::AlterDatabase(expr) => {
let _ = self
.statement_executor
.alter_database_inner(expr, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::AlterTable(expr) => {
self.statement_executor
.alter_table_inner(expr, ctx.clone())
.await?
@@ -195,11 +202,11 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
}
match ddl_expr {
Expr::CreateDatabase(_) => { /* do nothing*/ }
Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
Expr::CreateTable(expr) => {
check_and_fill!(expr);
}
Expr::Alter(expr) => {
Expr::AlterTable(expr) => {
check_and_fill!(expr);
}
Expr::DropTable(expr) => {

View File

@@ -44,7 +44,7 @@ impl MetasrvCacheInvalidator {
.clone()
.unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
let msg = &MailboxMessage::json_message(
let mut msg = MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Frontend broadcast",
@@ -54,22 +54,21 @@ impl MetasrvCacheInvalidator {
.with_context(|_| meta_error::SerdeJsonSnafu)?;
self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.broadcast(&BroadcastChannel::Frontend, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
let msg = &MailboxMessage::json_message(
subject,
&format!("Metasrv@{}", self.info.server_addr),
"Flownode broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| meta_error::SerdeJsonSnafu)?;
msg.to = "Datanode broadcast".to_string();
self.mailbox
.broadcast(&BroadcastChannel::Flownode, msg)
.broadcast(&BroadcastChannel::Datanode, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
msg.to = "Flownode broadcast".to_string();
self.mailbox
.broadcast(&BroadcastChannel::Flownode, &msg)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)

View File

@@ -622,6 +622,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Maintenance mode manager error"))]
MaintenanceModeManager {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Keyvalue backend error"))]
KvBackend {
source: common_meta::error::Error,
@@ -815,6 +822,7 @@ impl ErrorExt for Error {
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::MaintenanceModeManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),

View File

@@ -26,6 +26,7 @@ use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
@@ -340,6 +341,7 @@ pub struct Metasrv {
procedure_executor: ProcedureExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
@@ -572,6 +574,10 @@ impl Metasrv {
&self.table_metadata_manager
}
pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
&self.maintenance_mode_manager
}
pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
&self.memory_region_keeper
}

View File

@@ -27,6 +27,7 @@ use common_meta::ddl::{
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::maintenance::MaintenanceModeManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -195,6 +196,7 @@ impl MetasrvBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -304,7 +306,7 @@ impl MetasrvBuilder {
selector_ctx.clone(),
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),
);
@@ -375,6 +377,7 @@ impl MetasrvBuilder {
procedure_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
maintenance_mode_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),
meta_peer_client,

View File

@@ -165,6 +165,7 @@ impl DowngradeLeaderRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Downgrade region reply: {:?}", reply);
let InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id,
exists,

View File

@@ -21,6 +21,7 @@ use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, Simple
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -144,6 +145,7 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),

View File

@@ -19,8 +19,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::{ClusterId, DatanodeId};
@@ -216,8 +215,8 @@ pub struct RegionSupervisor {
selector: SelectorRef,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
// TODO(weny): find a better way
kv_backend: KvBackendRef,
/// The maintenance mode manager.
maintenance_mode_manager: MaintenanceModeManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
}
@@ -288,7 +287,7 @@ impl RegionSupervisor {
selector_context: SelectorContext,
selector: SelectorRef,
region_migration_manager: RegionMigrationManagerRef,
kv_backend: KvBackendRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
peer_lookup: PeerLookupServiceRef,
) -> Self {
Self {
@@ -297,7 +296,7 @@ impl RegionSupervisor {
selector_context,
selector,
region_migration_manager,
kv_backend,
maintenance_mode_manager,
peer_lookup,
}
}
@@ -346,7 +345,7 @@ impl RegionSupervisor {
if regions.is_empty() {
return;
}
match self.is_maintenance_mode().await {
match self.is_maintenance_mode_enabled().await {
Ok(false) => {}
Ok(true) => {
info!("Maintenance mode is enabled, skip failover");
@@ -382,11 +381,11 @@ impl RegionSupervisor {
}
}
pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
self.maintenance_mode_manager
.maintenance_mode()
.await
.context(error::KvBackendSnafu)
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(
@@ -479,6 +478,7 @@ pub(crate) mod tests {
use std::time::Duration;
use common_meta::ddl::RegionFailureDetectorController;
use common_meta::key::maintenance;
use common_meta::peer::Peer;
use common_meta::test_util::NoopPeerLookupService;
use common_time::util::current_time_millis;
@@ -505,7 +505,8 @@ pub(crate) mod tests {
env.procedure_manager().clone(),
context_factory,
));
let kv_backend = env.kv_backend();
let maintenance_mode_manager =
Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
let peer_lookup = Arc::new(NoopPeerLookupService);
let (tx, rx) = RegionSupervisor::channel();
@@ -516,7 +517,7 @@ pub(crate) mod tests {
selector_context,
selector,
region_migration_manager,
kv_backend,
maintenance_mode_manager,
peer_lookup,
),
tx,

View File

@@ -57,7 +57,7 @@ pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = router.route(
"/maintenance",
maintenance::MaintenanceHandler {
kv_backend: metasrv.kv_backend().clone(),
manager: metasrv.maintenance_mode_manager().clone(),
},
);
let router = Router::nest("/admin", router);

View File

@@ -14,31 +14,29 @@
use std::collections::HashMap;
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::PutRequest;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::{
InvalidHttpBodySnafu, KvBackendSnafu, MissingRequiredParameterSnafu, ParseBoolSnafu,
UnsupportedSnafu,
InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu,
ParseBoolSnafu, UnsupportedSnafu,
};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct MaintenanceHandler {
pub kv_backend: KvBackendRef,
pub manager: MaintenanceModeManagerRef,
}
impl MaintenanceHandler {
async fn get_maintenance(&self) -> crate::Result<Response<String>> {
let enabled = self
.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
.manager
.maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
let response = if enabled {
"Maintenance mode is enabled"
} else {
@@ -63,21 +61,16 @@ impl MaintenanceHandler {
})?;
let response = if enable {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend
.put(req.clone())
self.manager
.set_maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
"Maintenance mode enabled"
} else {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
self.manager
.unset_maintenance_mode()
.await
.context(KvBackendSnafu)?;
.context(MaintenanceModeManagerSnafu)?;
"Maintenance mode disabled"
};

View File

@@ -456,11 +456,7 @@ impl MetricEngineInner {
// concat region dir
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
// remove TTL and APPEND_MODE option
let mut options = request.options.clone();
options.remove(TTL_KEY);
options.remove(APPEND_MODE_KEY);
let options = region_options_for_metadata_region(request.options.clone());
RegionCreateRequest {
engine: MITO_ENGINE_NAME.to_string(),
column_metadatas: vec![
@@ -539,6 +535,15 @@ impl MetricEngineInner {
}
}
/// Creates the region options for metadata region in metric engine.
pub(crate) fn region_options_for_metadata_region(
mut original: HashMap<String, String>,
) -> HashMap<String, String> {
original.remove(APPEND_MODE_KEY);
original.insert(TTL_KEY.to_string(), "10000 years".to_string());
original
}
#[cfg(test)]
mod test {
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
@@ -724,6 +729,9 @@ mod test {
metadata_region_request.region_dir,
"/test_dir/metadata/".to_string()
);
assert!(!metadata_region_request.options.contains_key("ttl"));
assert_eq!(
metadata_region_request.options.get("ttl").unwrap(),
"10000 years"
);
}
}

View File

@@ -24,6 +24,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::set_data_region_options;
use crate::error::{OpenMitoRegionSnafu, Result};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
@@ -68,9 +69,10 @@ impl MetricEngineInner {
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
let metadata_region_options = region_options_for_metadata_region(request.options.clone());
let open_metadata_region_request = RegionOpenRequest {
region_dir: metadata_region_dir,
options: request.options.clone(),
options: metadata_region_options,
engine: MITO_ENGINE_NAME.to_string(),
skip_wal_replay: request.skip_wal_replay,
};

View File

@@ -501,7 +501,7 @@ mod tests {
// Read metadata from write cache
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
.cache(Some(cache_manager.clone()));
.cache(cache_manager.clone());
let reader = builder.build().await.unwrap();
// Check parquet metadata

View File

@@ -562,7 +562,7 @@ pub struct SerializedCompactionOutput {
struct CompactionSstReaderBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: Option<CacheManagerRef>,
cache: CacheManagerRef,
inputs: &'a [FileHandle],
append_mode: bool,
filter_deleted: bool,

View File

@@ -295,7 +295,7 @@ impl Compactor for DefaultCompactor {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: Some(cache_manager.clone()),
cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,

View File

@@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use common_telemetry::{debug, info};
use common_telemetry::{info, trace};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
@@ -114,7 +114,7 @@ impl TwcsPicker {
// Files in window exceeds file num limit
vec![enforce_file_num(&files.files, max_files)]
} else {
debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
continue;
};
@@ -353,10 +353,10 @@ fn find_latest_window_in_seconds<'a>(
let mut latest_timestamp = None;
for f in files {
let (_, end) = f.time_range();
if let Some(latest) = latest_timestamp
&& end > latest
{
latest_timestamp = Some(end);
if let Some(latest) = latest_timestamp {
if end > latest {
latest_timestamp = Some(end);
}
} else {
latest_timestamp = Some(end);
}
@@ -406,6 +406,18 @@ mod tests {
)
.unwrap()
);
assert_eq!(
Some((i64::MAX / 3600000 + 1) * 3600),
find_latest_window_in_seconds(
[
new_file_handle(FileId::random(), i64::MIN, i64::MAX, 0),
new_file_handle(FileId::random(), 0, 1000, 0)
]
.iter(),
3600
)
);
}
#[test]

Some files were not shown because too many files have changed in this diff Show More