Compare commits

...

35 Commits

Author SHA1 Message Date
Yingwen
bc42d35c2a chore: bump version to 0.16 (#6417)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-28 01:46:01 +00:00
fys
524bdfff22 fix: add cfg for DecodeSqlValue error (#6420) 2025-06-28 01:39:06 +00:00
fys
6bed0b6ba0 feat: add trigger-related error code (#6419) 2025-06-28 01:25:20 +00:00
shuiyisong
dec8c52b18 feat(pipeline): support Loki API (#6390)
* chore: use schema_info

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: abstract loki item generator

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: introduce middle item

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: introduce pipeline in loki api

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* test: add tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update prefix and test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change recursion to loop

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: cr issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-06-28 01:01:08 +00:00
zyy17
753a7e1a24 refactor: pass pipeline name through http header and get db from query context (#6405)
Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-06-27 10:43:37 +00:00
Lei, HUANG
6684200fce fix: skip failing nodes when gathering porcess info (#6412)
* fix/process-manager-skip-fail-nodes:
 - **Enhance Error Handling in `process_manager.rs`:**
   Improved error handling by adding a warning log for failing nodes in the `list_process` method. This ensures that the process listing continues even if some nodes fail to respond.

 - **Add Error Type Import in `process_manager.rs`:**
   Included the `Error` type from the `error` module to handle errors more effectively within the `ProcessManager` implementation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/process-manager-skip-fail-nodes:
 **Enhancements to Debugging and Trait Implementation**

 - **`process_manager.rs`**: Improved logging by adding more detailed error messages when skipping failing nodes.
 - **`selector.rs`**: Enhanced the `FrontendClient` trait by adding the `Debug` trait bound to improve debugging capabilities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-27 08:20:01 +00:00
Weny Xu
5fcb97724f chore: correct typo in configuration (#6411)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-27 07:55:13 +00:00
Zhenchi
ff559b2688 fix: complete partial index search results in cache (#6403)
* fix: complete partial index search results in cache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add initial tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* cover issue case

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* TestEnv new -> async

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-06-27 07:40:14 +00:00
Ruihang Xia
8473a34fc9 feat: Collider for playing with PartitionRule (#6399)
* skeleton

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* initial impl and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor and reorganize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* error handling

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* explain naming

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-27 07:15:33 +00:00
jeremyhi
df0ebf0378 feat: override logical table's partition key indices (#6385)
* feat: Override logical table's partition key indices with physical table's

* chore: by comment
2025-06-27 02:55:56 +00:00
Weny Xu
4a665fd27b refactor: move #[allow(clippy::print_stdout)] to lib level (#6398)
chore: allow cli to print stdout

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-27 02:40:12 +00:00
Yingwen
b4d6441716 refactor: rename test show_processList to show_process_list (#6408)
refactor: rename show_processList to show_process_list

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-27 01:18:54 +00:00
liyang
bdd50a2263 ci: try to fix the job permissions (#6407)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-06-26 17:23:43 +00:00
codephage
f87b12b2aa feat: remove own pow fn (#6404)
feat remove own pow fn

Signed-off-by: codephage. <381510760@qq.com>
2025-06-26 09:27:30 +00:00
Yiran
07eec083b9 fix: doc issue assignee (#6406)
Signed-off-by: Yiran <cuiyiran3@gmail.com>
2025-06-26 09:18:47 +00:00
Weny Xu
4737285275 feat: implement pause/resume functionality for procedure manager (#6393)
* feat: implement pause/resume functionality for procedure manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-26 01:57:12 +00:00
codephage
55f5e09885 fix: sqlness_test show_processList (#6401)
fix test sqlness_test show_processList

Signed-off-by: codephage. <381510760@qq.com>
2025-06-26 01:53:55 +00:00
liyang
9ab36e9a6f test: add a test load configuration example for flownode (#6397)
* test: add a test load configuration example for flownode

Signed-off-by: liyang <daviderli614@gmail.com>

* format rust

Signed-off-by: liyang <daviderli614@gmail.com>

* fix cargo clippy

Signed-off-by: liyang <daviderli614@gmail.com>

* refine FlownodeOptions visibility

Signed-off-by: liyang <daviderli614@gmail.com>

* format rust

Signed-off-by: liyang <daviderli614@gmail.com>

---------

Signed-off-by: liyang <daviderli614@gmail.com>
2025-06-26 01:48:53 +00:00
Lei, HUANG
4bb5d00a4b fix(http): apply string validation mode to pipeline processor (#6378)
* fix/apply-string-validation-to-pipeline:
 ### Commit Summary

 - **Refactor `decode_string` Functionality**:
   - Moved `decode_string` logic into `PromValidationMode` as a method `decode_string`.
   - Updated all references to use the new method.
   - Files affected: `http.rs`, `prom_row_builder.rs`, `proto.rs`.

 - **Logging Enhancements**:
   - Added `debug` logging for invalid UTF-8 string values.
   - File affected: `http.rs`.

 - **Test Updates**:
   - Modified tests to use the new `decode_string` method in `PromValidationMode`.
   - File affected: `proto.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 18:56:35 +00:00
Lei, HUANG
1d07864b29 refactor(object-store): move backends building functions back to object-store (#6400)
refactor/building-backend-in-object-store:
 ### Refactor Object Store Configuration

 - **Centralize Object Store Configurations**: Moved object store configurations (`FileConfig`, `S3Config`, `OssConfig`, `AzblobConfig`, `GcsConfig`) to `object-store/src/config.rs`.
 - **Error Handling Enhancements**: Introduced `object-store/src/error.rs` for improved error handling related to object store operations.
 - **Factory Pattern for Object Store**: Implemented `object-store/src/factory.rs` to create object store instances, consolidating logic from `datanode/src/store.rs`.
 - **Remove Redundant Store Implementations**: Deleted individual store files (`azblob.rs`, `fs.rs`, `gcs.rs`, `oss.rs`, `s3.rs`) from `datanode/src/store/`.
 - **Update Usage of Object Store Config**: Updated references to `ObjectStoreConfig` in `datanode.rs`, `standalone.rs`, `config.rs`, and `error.rs` to use the new centralized configuration.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 13:49:55 +00:00
ZonaHe
9be75361a4 feat: update dashboard to v0.10.1 (#6396)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-06-25 12:58:04 +00:00
Ning Sun
9c1df68a5f feat: introduce /v1/health for healthcheck from external (#6388)
Signed-off-by: Ning Sun <sunning@greptime.com>
2025-06-25 12:25:36 +00:00
fys
0209461155 chore: add components for standalone instance (#6383) 2025-06-25 12:17:34 +00:00
Ruihang Xia
e728cb33fb feat: implement count_hash aggr function (#6342)
* feat: implement count_hash aggr function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change copyright year

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* review changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-25 12:14:03 +00:00
fys
cde7e11983 refactor: avoid adding feature to parameter (#6391)
* refactor: avoid adding feature to parameter

* avoid `cfg(not(feature = ...))` block
2025-06-25 10:47:20 +00:00
dennis zhuang
944b4b3e49 feat: supports CsvWithNames and CsvWithNamesAndTypes formats (#6384)
* feat: supports CsvWithNames and CsvWithNamesAndTypes formats and object/array types

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: added and fixed tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: fix test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add json type csv tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-06-25 07:28:11 +00:00
fys
7953b090c0 feat: support syntax parsing of drop trigger (#6371)
* feat: trigger drop

* Update Cargo.toml

* Update Cargo.lock

---------

Co-authored-by: Ning Sun <classicning@gmail.com>
2025-06-24 18:48:39 +00:00
liyang
7aa9af5ba6 chore: clarify default OTLP endpoint value (#6381)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-06-24 11:44:45 +00:00
Ruihang Xia
7a9444c85b refactor: remove staled manifest structures (#6382)
* refactor: remove staled manifest structures

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/store-api/src/lib.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-24 09:23:10 +00:00
LFC
bb12be3310 refactor: scan Batches directly (#6369)
* refactor: scan `Batch`es directly

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2025-06-24 07:55:49 +00:00
Weny Xu
24019334ee feat: implement automatic region failure detector registrations (#6370)
* feat: implement automatic region failure detector registrations

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remove unused error

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add more tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add `region_failure_detector_initialization_delay` option

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update config.md

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update config.md

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-24 06:12:12 +00:00
codephage
116d5cf82b feat: support mysql flavor show processlist shortcut (#6328) (#6379)
* feat: support mysql flavor show processlist shortcut (#6328)

Signed-off-by: codephage. <381510760@qq.com>

* Refactor SHOW PROCESSLIST handling and add tests

Signed-off-by: codephage. <381510760@qq.com>

* add sqlness test

Signed-off-by: codephage. <381510760@qq.com>

* add sqlness test result

Signed-off-by: codephage. <381510760@qq.com>

* fix sqlness test show_processList

Signed-off-by: codephage. <381510760@qq.com>

---------

Signed-off-by: codephage. <381510760@qq.com>
2025-06-24 03:50:16 +00:00
zyy17
90a3894564 refactor: always write parent_span_id for otlp traces ingestion (#6356)
* refactor: always write `parent_span_id` for otlp traces ingestion

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: support to write None value in row writer

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-06-23 11:36:57 +00:00
Yingwen
39d3e0651d feat: Support ListMetadataRequest to retrieve regions' metadata (#6348)
* feat: support list metadata in region server

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for list region metadata

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: return null if region not exists

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update greptime-proto

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-23 07:11:20 +00:00
zyy17
a49edc6ca6 refactor: add otlp_export_protocol config to support export trace data through gRPC and HTTP protocol (#6357)
* refactor: support http traces

* refactor: add `otlp_export_protocol` config to support export trace data through gRPC and HTTP protocol

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* Update src/common/telemetry/src/logging.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-23 02:56:12 +00:00
184 changed files with 6882 additions and 2363 deletions

View File

@@ -11,17 +11,17 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
permissions:
issues: write
contents: write
pull-requests: write
jobs:
check:
runs-on: ubuntu-latest
permissions:
pull-requests: write # Add permissions to modify PRs
issues: write
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
- uses: ./.github/actions/setup-cyborg
- name: Check Pull Request
working-directory: cyborg

264
Cargo.lock generated
View File

@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-decimal",
@@ -944,7 +944,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"catalog",
"common-error",
@@ -1610,7 +1610,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1948,7 +1948,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-stream",
"async-trait",
@@ -1986,14 +1986,14 @@ dependencies = [
"operator",
"query",
"rand 0.9.0",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"servers",
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tempfile",
"tokio",
@@ -2002,7 +2002,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arc-swap",
@@ -2032,7 +2032,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.15.0",
"substrait 0.16.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2073,7 +2073,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"auth",
@@ -2118,13 +2118,14 @@ dependencies = [
"mito2",
"moka",
"nu-ansi-term",
"object-store",
"plugins",
"prometheus",
"prost 0.13.5",
"query",
"rand 0.9.0",
"regex",
"reqwest",
"reqwest 0.12.9",
"rexpect",
"serde",
"serde_json",
@@ -2134,7 +2135,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"temp-env",
"tempfile",
@@ -2181,7 +2182,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"anymap2",
"async-trait",
@@ -2203,11 +2204,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.15.0"
version = "0.16.0"
[[package]]
name = "common-config"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-error",
@@ -2220,6 +2221,7 @@ dependencies = [
"humantime-serde",
"meta-client",
"num_cpus",
"object-store",
"serde",
"serde_json",
"serde_with",
@@ -2232,7 +2234,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2269,7 +2271,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2282,7 +2284,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2293,7 +2295,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"common-error",
@@ -2309,7 +2311,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2332,6 +2334,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-functions-aggregate-common",
"datatypes",
"derive_more",
"geo",
@@ -2362,7 +2365,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2370,7 +2373,7 @@ dependencies = [
"common-test-util",
"common-version",
"hyper 0.14.30",
"reqwest",
"reqwest 0.12.9",
"serde",
"tempfile",
"tokio",
@@ -2379,7 +2382,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arrow-flight",
@@ -2411,7 +2414,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"common-base",
@@ -2430,7 +2433,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2444,7 +2447,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"anyhow",
"common-error",
@@ -2460,7 +2463,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"anymap2",
"api",
@@ -2525,7 +2528,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2534,11 +2537,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.15.0"
version = "0.16.0"
[[package]]
name = "common-pprof"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"common-error",
"common-macro",
@@ -2550,7 +2553,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2577,7 +2580,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2586,7 +2589,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -2612,7 +2615,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2632,7 +2635,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2662,14 +2665,14 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"backtrace",
"common-error",
@@ -2696,7 +2699,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"client",
"common-grpc",
@@ -2709,7 +2712,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2727,7 +2730,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"build-data",
"const_format",
@@ -2737,7 +2740,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-error",
@@ -2760,7 +2763,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"common-telemetry",
@@ -3716,7 +3719,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arrow-flight",
@@ -3762,14 +3765,14 @@ dependencies = [
"prometheus",
"prost 0.13.5",
"query",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"servers",
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3778,7 +3781,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4438,7 +4441,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -4575,7 +4578,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4640,7 +4643,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4695,7 +4698,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arc-swap",
@@ -4754,7 +4757,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tokio",
"tokio-util",
@@ -5144,7 +5147,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=82fe5c6282f623c185b86f03e898ee8952e50cf9#82fe5c6282f623c185b86f03e898ee8952e50cf9"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=464226cf8a4a22696503536a123d0b9e318582f4#464226cf8a4a22696503536a123d0b9e318582f4"
dependencies = [
"prost 0.13.5",
"serde",
@@ -5915,7 +5918,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6800,7 +6803,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"chrono",
"common-error",
@@ -6812,7 +6815,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-stream",
"async-trait",
@@ -7110,7 +7113,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -7138,7 +7141,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -7229,7 +7232,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"aquamarine",
@@ -7319,7 +7322,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"bytes",
@@ -7342,7 +7345,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"aquamarine",
@@ -8092,18 +8095,25 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"anyhow",
"bytes",
"common-base",
"common-error",
"common-macro",
"common-telemetry",
"common-test-util",
"futures",
"humantime-serde",
"lazy_static",
"md5",
"moka",
"opendal",
"prometheus",
"reqwest 0.12.9",
"serde",
"snafu 0.8.5",
"tokio",
"uuid",
]
@@ -8238,7 +8248,7 @@ dependencies = [
"prometheus",
"quick-xml 0.36.2",
"reqsign",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"sha2",
@@ -8310,6 +8320,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e"
dependencies = [
"async-trait",
"bytes",
"http 0.2.12",
"opentelemetry 0.21.0",
"reqwest 0.11.27",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.14.0"
@@ -8320,10 +8343,12 @@ dependencies = [
"futures-core",
"http 0.2.12",
"opentelemetry 0.21.0",
"opentelemetry-http",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.2",
"prost 0.11.9",
"reqwest 0.11.27",
"thiserror 1.0.64",
"tokio",
"tonic 0.9.2",
@@ -8406,7 +8431,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8461,7 +8486,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tokio",
"tokio-util",
@@ -8728,7 +8753,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -9016,7 +9041,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9159,7 +9184,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9472,7 +9497,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9754,7 +9779,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9796,7 +9821,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9862,7 +9887,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tokio",
"tokio-stream",
@@ -10310,7 +10335,7 @@ dependencies = [
"percent-encoding",
"quick-xml 0.35.0",
"rand 0.8.5",
"reqwest",
"reqwest 0.12.9",
"rsa",
"rust-ini 0.21.1",
"serde",
@@ -10319,6 +10344,42 @@ dependencies = [
"sha2",
]
[[package]]
name = "reqwest"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.30",
"ipnet",
"js-sys",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 0.1.2",
"system-configuration",
"tokio",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "reqwest"
version = "0.12.9"
@@ -11148,7 +11209,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11234,7 +11295,7 @@ dependencies = [
"quoted-string",
"rand 0.9.0",
"regex",
"reqwest",
"reqwest 0.12.9",
"rust-embed",
"rustls",
"rustls-pemfile",
@@ -11269,7 +11330,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arc-swap",
@@ -11608,7 +11669,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"chrono",
@@ -11663,7 +11724,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11678,7 +11739,7 @@ dependencies = [
"local-ip-address",
"mysql",
"num_cpus",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"sha2",
@@ -11963,7 +12024,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"nix 0.30.1",
]
@@ -11989,7 +12050,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"aquamarine",
@@ -12150,7 +12211,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"async-trait",
"bytes",
@@ -12328,9 +12389,30 @@ dependencies = [
"nom",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "table"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -12591,7 +12673,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -12618,7 +12700,7 @@ dependencies = [
"paste",
"rand 0.9.0",
"rand_chacha 0.9.0",
"reqwest",
"reqwest 0.12.9",
"schemars",
"serde",
"serde_json",
@@ -12635,7 +12717,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.15.0"
version = "0.16.0"
dependencies = [
"api",
"arrow-flight",
@@ -12702,7 +12784,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.15.0",
"substrait 0.16.0",
"table",
"tempfile",
"time",
@@ -14501,6 +14583,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"

View File

@@ -71,7 +71,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.0"
version = "0.16.0"
edition = "2021"
license = "Apache-2.0"
@@ -121,6 +121,7 @@ datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions-aggregate-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
@@ -134,7 +135,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "82fe5c6282f623c185b86f03e898ee8952e50cf9" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "464226cf8a4a22696503536a123d0b9e318582f4" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -185,10 +185,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
@@ -288,10 +289,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
@@ -323,6 +325,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
@@ -370,10 +373,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -432,8 +436,8 @@
| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
| `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.read_batch_size` | Integer | `128` | The read batch size.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_write` | Bool | `false` | Whether to use sync write.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
@@ -534,10 +538,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -584,10 +589,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |

View File

@@ -129,11 +129,11 @@ dir = "./greptimedb_data/wal"
## **It's only used when the provider is `raft_engine`**.
file_size = "128MB"
## The threshold of the WAL size to trigger a flush.
## The threshold of the WAL size to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_threshold = "1GB"
## The interval to trigger a flush.
## The interval to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_interval = "1m"
@@ -629,7 +629,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -640,6 +640,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -83,7 +83,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -94,6 +94,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -218,7 +218,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -229,6 +229,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -43,6 +43,11 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## The delay before starting region failure detection.
## This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
## Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
region_failure_detector_initialization_delay = '10m'
## Whether to allow region failover on local WAL.
## **This option is not recommended to be set to true, because it may lead to data loss during failover.**
allow_region_failover_on_local_wal = false
@@ -220,7 +225,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -231,6 +236,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -720,7 +720,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -731,6 +731,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -55,12 +55,25 @@ async function main() {
await client.rest.issues.addLabels({
owner, repo, issue_number: number, labels: [labelDocsRequired],
})
// Get available assignees for the docs repo
const assigneesResponse = await docsClient.rest.issues.listAssignees({
owner: 'GreptimeTeam',
repo: 'docs',
})
const validAssignees = assigneesResponse.data.map(assignee => assignee.login)
core.info(`Available assignees: ${validAssignees.join(', ')}`)
// Check if the actor is a valid assignee, otherwise fallback to fengjiachun
const assignee = validAssignees.includes(actor) ? actor : 'fengjiachun'
core.info(`Assigning issue to: ${assignee}`)
await docsClient.rest.issues.create({
owner: 'GreptimeTeam',
repo: 'docs',
title: `Update docs for ${title}`,
body: `A document change request is generated from ${html_url}`,
assignee: actor,
assignee: assignee,
}).then((res) => {
core.info(`Created issue ${res.data}`)
})

View File

@@ -31,6 +31,7 @@ excludes = [
"src/operator/src/expr_helper/trigger.rs",
"src/sql/src/statements/create/trigger.rs",
"src/sql/src/statements/show/trigger.rs",
"src/sql/src/statements/drop/trigger.rs",
"src/sql/src/parsers/create_parser/trigger.rs",
"src/sql/src/parsers/show_parser/trigger.rs",
]

View File

@@ -22,6 +22,7 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1;
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extensions: HashMap<String, Vec<u8>>,
pub metadata: Vec<u8>,
}
impl RegionResponse {
@@ -29,6 +30,7 @@ impl RegionResponse {
Self {
affected_rows: region_response.affected_rows as _,
extensions: region_response.extensions,
metadata: region_response.metadata,
}
}
@@ -37,6 +39,16 @@ impl RegionResponse {
Self {
affected_rows,
extensions: Default::default(),
metadata: Vec::new(),
}
}
/// Creates one response with metadata.
pub fn from_metadata(metadata: Vec<u8>) -> Self {
Self {
affected_rows: 0,
extensions: Default::default(),
metadata,
}
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, Weak};
use async_stream::try_stream;
@@ -28,7 +28,7 @@ use common_meta::cache::{
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -39,6 +39,7 @@ use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
@@ -142,6 +143,64 @@ impl KvBackendCatalogManager {
pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
self.procedure_manager.clone()
}
// Override logical table's partition key indices with physical table's.
async fn override_logical_table_partition_key_indices(
table_route_cache: &TableRouteCacheRef,
table_info_manager: &TableInfoManager,
table: TableRef,
) -> Result<TableRef> {
// If the table is not a metric table, return the table directly.
if table.table_info().meta.engine != METRIC_ENGINE_NAME {
return Ok(table);
}
if let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = table_info_manager
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
.map(|physical_column| logical_column_names.contains(&physical_column.name))
.unwrap_or(false)
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(new_table);
}
Ok(table)
}
}
#[async_trait::async_trait]
@@ -268,10 +327,7 @@ impl CatalogManager for KvBackendCatalogManager {
let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_cache",
})?;
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
let table = table_cache
.get_by_ref(&TableName {
catalog_name: catalog_name.to_string(),
@@ -281,55 +337,18 @@ impl CatalogManager for KvBackendCatalogManager {
.await
.context(GetTableCacheSnafu)?;
// Override logical table's partition key indices with physical table's.
if let Some(table) = &table
&& let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: std::collections::HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
if let Some(physical_column) = physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
{
logical_column_names.contains(&physical_column.name)
} else {
false
}
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(Some(new_table));
if let Some(table) = table {
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
return Self::override_logical_table_partition_key_indices(
&table_route_cache,
self.table_metadata_manager.table_info_manager(),
table,
)
.await
.map(Some);
}
if channel == Channel::Postgres {
@@ -342,7 +361,7 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
Ok(table)
Ok(None)
}
async fn tables_by_ids(
@@ -394,8 +413,20 @@ impl CatalogManager for KvBackendCatalogManager {
let catalog = catalog.to_string();
let schema = schema.to_string();
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let table_route_cache: Result<TableRouteCacheRef> =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
});
common_runtime::spawn_global(async move {
let table_route_cache = match table_route_cache {
Ok(table_route_cache) => table_route_cache,
Err(e) => {
let _ = tx.send(Err(e)).await;
return;
}
};
let table_id_stream = metadata_manager
.table_name_manager()
.tables(&catalog, &schema)
@@ -422,6 +453,7 @@ impl CatalogManager for KvBackendCatalogManager {
let metadata_manager = metadata_manager.clone();
let tx = tx.clone();
let semaphore = semaphore.clone();
let table_route_cache = table_route_cache.clone();
common_runtime::spawn_global(async move {
// we don't explicitly close the semaphore so just ignore the potential error.
let _ = semaphore.acquire().await;
@@ -439,6 +471,16 @@ impl CatalogManager for KvBackendCatalogManager {
};
for table in table_info_values.into_values().map(build_table) {
let table = if let Ok(table) = table {
Self::override_logical_table_partition_key_indices(
&table_route_cache,
metadata_manager.table_info_manager(),
table,
)
.await
} else {
table
};
if tx.send(table).await.is_err() {
return;
}

View File

@@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::{ensure, OptionExt, ResultExt};
@@ -141,14 +141,20 @@ impl ProcessManager {
.await
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
let result = f
.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::InvokeFrontendSnafu)?
.processes,
);
.context(error::InvokeFrontendSnafu);
match result {
Ok(resp) => {
processes.extend(resp.processes);
}
Err(e) => {
warn!(e; "Skipping failing node: {:?}", f)
}
}
}
}
processes.extend(self.local_processes(catalog)?);

View File

@@ -19,7 +19,7 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod process_list;
pub mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;

View File

@@ -39,14 +39,14 @@ use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;
/// Column names of `information_schema.process_list`
const ID: &str = "id";
const CATALOG: &str = "catalog";
const SCHEMAS: &str = "schemas";
const QUERY: &str = "query";
const CLIENT: &str = "client";
const FRONTEND: &str = "frontend";
const START_TIMESTAMP: &str = "start_timestamp";
const ELAPSED_TIME: &str = "elapsed_time";
pub const ID: &str = "id";
pub const CATALOG: &str = "catalog";
pub const SCHEMAS: &str = "schemas";
pub const QUERY: &str = "query";
pub const CLIENT: &str = "client";
pub const FRONTEND: &str = "frontend";
pub const START_TIMESTAMP: &str = "start_timestamp";
pub const ELAPSED_TIME: &str = "elapsed_time";
/// `information_schema.process_list` table implementation that tracks running
/// queries in current cluster.

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[allow(clippy::print_stdout)]
mod bench;
mod data;
mod database;

View File

@@ -301,7 +301,6 @@ struct MetaInfoTool {
#[async_trait]
impl Tool for MetaInfoTool {
#[allow(clippy::print_stdout)]
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let result = MetadataSnapshotManager::info(
&self.inner,

View File

@@ -67,6 +67,7 @@ metric-engine.workspace = true
mito2.workspace = true
moka.workspace = true
nu-ansi-term = "0.46"
object-store.workspace = true
plugins.workspace = true
prometheus.workspace = true
prost.workspace = true

View File

@@ -280,7 +280,7 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use super::*;

View File

@@ -30,20 +30,16 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
#[cfg(feature = "enterprise")]
use common_meta::ddl_manager::TriggerDdlManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
@@ -261,15 +257,34 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// The components of standalone, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub plugins: Plugins,
pub kv_backend: KvBackendRef,
pub frontend_client: Arc<FrontendClient>,
pub catalog_manager: catalog::CatalogManagerRef,
}
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait]
@@ -550,13 +565,14 @@ impl StartCommand {
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client.clone()),
frontend_client.clone(),
);
let flownode = flow_builder
.build()
@@ -594,28 +610,36 @@ impl StartCommand {
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
));
let ddl_context = DdlContext {
node_manager: node_manager.clone(),
cache_invalidator: layered_cache_registry.clone(),
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
};
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let trigger_ddl_manager: Option<TriggerDdlManagerRef> = plugins.get();
let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(),
node_manager.clone(),
layered_cache_registry.clone(),
table_metadata_manager,
table_meta_allocator,
flow_metadata_manager,
flow_meta_allocator,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.await?;
let ddl_manager = {
let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
plugins.get();
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
};
let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager);
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
@@ -658,7 +682,7 @@ impl StartCommand {
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, fe_instance.clone(), plugins)
let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
.build()
.context(error::StartFrontendSnafu)?;
@@ -669,51 +693,26 @@ impl StartCommand {
export_metrics_task,
};
#[cfg(feature = "enterprise")]
let components = Components {
plugins,
kv_backend,
frontend_client,
catalog_manager,
};
Ok(Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_options_allocator,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn create_ddl_task_executor(
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager,
true,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.context(error::InitDdlManagerSnafu)?,
);
Ok(procedure_executor)
}
pub async fn create_table_metadata_manager(
kv_backend: KvBackendRef,
) -> Result<TableMetadataManagerRef> {
@@ -849,7 +848,7 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{FileConfig, GcsConfig};
use object_store::config::{FileConfig, GcsConfig};
use super::*;
use crate::options::GlobalOptions;
@@ -968,15 +967,15 @@ mod tests {
assert!(matches!(
&dn_opts.storage.store,
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
object_store::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(dn_opts.storage.providers.len(), 2);
assert!(matches!(
dn_opts.storage.providers[0],
datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
));
match &dn_opts.storage.providers[1] {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
object_store::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"SecretBox<alloc::string::String>([REDACTED])".to_string(),
format!("{:?}", s3_config.access_key_id)

View File

@@ -18,11 +18,12 @@ use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeOptions;
use frontend::frontend::FrontendOptions;
use meta_client::MetaClientOptions;
use meta_srv::metasrv::MetasrvOptions;
@@ -81,7 +82,7 @@ fn test_load_datanode_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -124,7 +125,7 @@ fn test_load_frontend_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -172,7 +173,7 @@ fn test_load_metasrv_example_config() {
logging: LoggingOptions {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -195,6 +196,54 @@ fn test_load_metasrv_example_config() {
similar_asserts::assert_eq!(options, expected);
}
#[test]
fn test_load_flownode_example_config() {
let example_config = common_test_util::find_workspace_path("config/flownode.example.toml");
let options =
GreptimeOptions::<FlownodeOptions>::load_layered_options(example_config.to_str(), "")
.unwrap();
let expected = GreptimeOptions::<FlownodeOptions> {
component: FlownodeOptions {
node_id: Some(14),
flow: Default::default(),
grpc: GrpcOptions {
bind_addr: "127.0.0.1:6800".to_string(),
server_addr: "127.0.0.1:6800".to_string(),
runtime_size: 2,
..Default::default()
},
logging: LoggingOptions {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
tracing: Default::default(),
heartbeat: Default::default(),
query: Default::default(),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
metadata_cache_max_capacity: 100000,
metadata_cache_ttl: Duration::from_secs(600),
metadata_cache_tti: Duration::from_secs(300),
}),
http: HttpOptions {
addr: "127.0.0.1:4000".to_string(),
..Default::default()
},
user_provider: None,
},
..Default::default()
};
similar_asserts::assert_eq!(options, expected);
}
#[test]
fn test_load_standalone_example_config() {
let example_config = common_test_util::find_workspace_path("config/standalone.example.toml");
@@ -229,7 +278,7 @@ fn test_load_standalone_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},

View File

@@ -14,6 +14,7 @@ common-macro.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
object-store.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -106,7 +106,7 @@ mod tests {
use common_telemetry::logging::LoggingOptions;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{ObjectStoreConfig, StorageConfig};
use datanode::config::StorageConfig;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
@@ -212,7 +212,7 @@ mod tests {
// Check the configs from environment variables.
match &opts.storage.store {
ObjectStoreConfig::S3(s3_config) => {
object_store::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
_ => panic!("unexpected store type"),

View File

@@ -119,6 +119,11 @@ pub enum StatusCode {
FlowAlreadyExists = 8000,
FlowNotFound = 8001,
// ====== End of flow related status code =====
// ====== Begin of trigger related status code =====
TriggerAlreadyExists = 9000,
TriggerNotFound = 9001,
// ====== End of trigger related status code =====
}
impl StatusCode {
@@ -155,6 +160,8 @@ impl StatusCode {
| StatusCode::RegionNotFound
| StatusCode::FlowAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::TriggerAlreadyExists
| StatusCode::TriggerNotFound
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
@@ -198,6 +205,8 @@ impl StatusCode {
| StatusCode::PlanQuery
| StatusCode::FlowAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::TriggerAlreadyExists
| StatusCode::TriggerNotFound
| StatusCode::RegionNotReady
| StatusCode::RegionBusy
| StatusCode::RegionReadonly
@@ -281,12 +290,14 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
| StatusCode::TableColumnExists
| StatusCode::RegionAlreadyExists
| StatusCode::DatabaseAlreadyExists
| StatusCode::TriggerAlreadyExists
| StatusCode::FlowAlreadyExists => Code::AlreadyExists,
StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::TableColumnNotFound
| StatusCode::DatabaseNotFound
| StatusCode::UserNotFound
| StatusCode::TriggerNotFound
| StatusCode::FlowNotFound => Code::NotFound,
StatusCode::TableUnavailable
| StatusCode::StorageUnavailable

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -30,7 +31,7 @@ use crate::error::{MetaSnafu, Result};
pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
pub trait FrontendClient: Send + Debug {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;

View File

@@ -33,6 +33,7 @@ common-version.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-functions-aggregate-common.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
geo = { version = "0.29", optional = true }

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod approximate;
pub mod count_hash;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;

View File

@@ -0,0 +1,647 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! `CountHash` / `count_hash` is a hash-based approximate distinct count function.
//!
//! It is a variant of `CountDistinct` that uses a hash function to approximate the
//! distinct count.
//! It is designed to be more efficient than `CountDistinct` for large datasets,
//! but it is not as accurate, as the hash value may be collision.
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use ahash::RandomState;
use datafusion_common::cast::as_list_array;
use datafusion_common::error::Result;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{internal_err, not_impl_err, ScalarValue};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
use datafusion_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF,
SetMonotonicity, Signature, TypeSignature, Volatility,
};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask;
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, UInt64Array,
};
use datatypes::arrow::buffer::{OffsetBuffer, ScalarBuffer};
use datatypes::arrow::datatypes::{DataType, Field};
use crate::function_registry::FunctionRegistry;
type HashValueType = u64;
// read from /dev/urandom 4047821dc6144e4b2abddf23ad4171126a52eeecd26eff2191cf673b965a7875
const RANDOM_SEED_0: u64 = 0x4047821dc6144e4b;
const RANDOM_SEED_1: u64 = 0x2abddf23ad417112;
const RANDOM_SEED_2: u64 = 0x6a52eeecd26eff21;
const RANDOM_SEED_3: u64 = 0x91cf673b965a7875;
impl CountHash {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(CountHash::udf_impl());
}
pub fn udf_impl() -> AggregateUDF {
AggregateUDF::new_from_impl(CountHash {
signature: Signature::one_of(
vec![TypeSignature::VariadicAny, TypeSignature::Nullary],
Volatility::Immutable,
),
})
}
}
#[derive(Debug, Clone)]
pub struct CountHash {
signature: Signature,
}
impl AggregateUDFImpl for CountHash {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn name(&self) -> &str {
"count_hash"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}
fn is_nullable(&self) -> bool {
false
}
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
Ok(vec![Field::new_list(
format_state_name(args.name, "count_hash"),
Field::new_list_field(DataType::UInt64, true),
// For count_hash accumulator, null list item stands for an
// empty value set (i.e., all NULL value so far for that group).
true,
)])
}
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
if acc_args.exprs.len() > 1 {
return not_impl_err!("count_hash with multiple arguments");
}
Ok(Box::new(CountHashAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}))
}
fn aliases(&self) -> &[String] {
&[]
}
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
true
}
fn create_groups_accumulator(
&self,
args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
if args.exprs.len() > 1 {
return not_impl_err!("count_hash with multiple arguments");
}
Ok(Box::new(CountHashGroupAccumulator::new()))
}
fn reverse_expr(&self) -> ReversedUDAF {
ReversedUDAF::Identical
}
fn order_sensitivity(&self) -> AggregateOrderSensitivity {
AggregateOrderSensitivity::Insensitive
}
fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(0)))
}
fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
SetMonotonicity::Increasing
}
}
/// GroupsAccumulator for `count_hash` aggregate function
#[derive(Debug)]
pub struct CountHashGroupAccumulator {
/// One HashSet per group to track distinct values
distinct_sets: Vec<HashSet<HashValueType, RandomState>>,
random_state: RandomState,
batch_hashes: Vec<HashValueType>,
}
impl Default for CountHashGroupAccumulator {
fn default() -> Self {
Self::new()
}
}
impl CountHashGroupAccumulator {
pub fn new() -> Self {
Self {
distinct_sets: vec![],
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}
}
fn ensure_sets(&mut self, total_num_groups: usize) {
if self.distinct_sets.len() < total_num_groups {
self.distinct_sets
.resize_with(total_num_groups, HashSet::default);
}
}
}
impl GroupsAccumulator for CountHashGroupAccumulator {
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "count_hash expects a single argument");
self.ensure_sets(total_num_groups);
let array = &values[0];
self.batch_hashes.clear();
self.batch_hashes.resize(array.len(), 0);
let hashes = create_hashes(
&[ArrayRef::clone(array)],
&self.random_state,
&mut self.batch_hashes,
)?;
// Use a pattern similar to accumulate_indices to process rows
// that are not null and pass the filter
let nulls = array.logical_nulls();
match (nulls.as_ref(), opt_filter) {
(None, None) => {
// No nulls, no filter - process all rows
for (row_idx, &group_idx) in group_indices.iter().enumerate() {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
(Some(nulls), None) => {
// Has nulls, no filter
for (row_idx, (&group_idx, is_valid)) in
group_indices.iter().zip(nulls.iter()).enumerate()
{
if is_valid {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
(None, Some(filter)) => {
// No nulls, has filter
for (row_idx, (&group_idx, filter_value)) in
group_indices.iter().zip(filter.iter()).enumerate()
{
if let Some(true) = filter_value {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
(Some(nulls), Some(filter)) => {
// Has nulls and filter
let iter = filter
.iter()
.zip(group_indices.iter())
.zip(nulls.iter())
.enumerate();
for (row_idx, ((filter_value, &group_idx), is_valid)) in iter {
if is_valid && filter_value == Some(true) {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
}
Ok(())
}
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let distinct_sets: Vec<HashSet<u64, RandomState>> =
emit_to.take_needed(&mut self.distinct_sets);
let counts = distinct_sets
.iter()
.map(|set| set.len() as i64)
.collect::<Vec<_>>();
Ok(Arc::new(Int64Array::from(counts)))
}
fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(
values.len(),
1,
"count_hash merge expects a single state array"
);
self.ensure_sets(total_num_groups);
let list_array = as_list_array(&values[0])?;
// For each group in the incoming batch
for (i, &group_idx) in group_indices.iter().enumerate() {
if i < list_array.len() {
let inner_array = list_array.value(i);
let inner_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
// Add each value to our set for this group
for j in 0..inner_array.len() {
if !inner_array.is_null(j) {
self.distinct_sets[group_idx].insert(inner_array.value(j));
}
}
}
}
Ok(())
}
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let distinct_sets: Vec<HashSet<u64, RandomState>> =
emit_to.take_needed(&mut self.distinct_sets);
let mut offsets = Vec::with_capacity(distinct_sets.len() + 1);
offsets.push(0);
let mut curr_len = 0i32;
let mut value_iter = distinct_sets
.into_iter()
.flat_map(|set| {
// build offset
curr_len += set.len() as i32;
offsets.push(curr_len);
// convert into iter
set.into_iter()
})
.peekable();
let data_array: ArrayRef = if value_iter.peek().is_none() {
arrow::array::new_empty_array(&DataType::UInt64) as _
} else {
Arc::new(UInt64Array::from_iter_values(value_iter))
};
let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
let list_array = ListArray::new(
Arc::new(Field::new_list_field(DataType::UInt64, true)),
offset_buffer,
data_array,
None,
);
Ok(vec![Arc::new(list_array) as _])
}
fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
// For a single hash value per row, create a list array with that value
assert_eq!(values.len(), 1, "count_hash expects a single argument");
let values = ArrayRef::clone(&values[0]);
let offsets = OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1));
let nulls = filtered_null_mask(opt_filter, &values);
let list_array = ListArray::new(
Arc::new(Field::new_list_field(DataType::UInt64, true)),
offsets,
values,
nulls,
);
Ok(vec![Arc::new(list_array)])
}
fn supports_convert_to_state(&self) -> bool {
true
}
fn size(&self) -> usize {
// Base size of the struct
let mut size = size_of::<Self>();
// Size of the vector holding the HashSets
size += size_of::<Vec<HashSet<HashValueType, RandomState>>>()
+ self.distinct_sets.capacity() * size_of::<HashSet<HashValueType, RandomState>>();
// Estimate HashSet contents size more efficiently
// Instead of iterating through all values which is expensive, use an approximation
for set in &self.distinct_sets {
// Base size of the HashSet
size += set.capacity() * size_of::<HashValueType>();
}
size
}
}
#[derive(Debug)]
struct CountHashAccumulator {
values: HashSet<HashValueType, RandomState>,
random_state: RandomState,
batch_hashes: Vec<HashValueType>,
}
impl CountHashAccumulator {
// calculating the size for fixed length values, taking first batch size *
// number of batches.
fn fixed_size(&self) -> usize {
size_of_val(self) + (size_of::<HashValueType>() * self.values.capacity())
}
}
impl Accumulator for CountHashAccumulator {
/// Returns the distinct values seen so far as (one element) ListArray.
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let values = self.values.iter().cloned().collect::<Vec<_>>();
let arr = Arc::new(UInt64Array::from(values)) as _;
let list_scalar = SingleRowListArrayBuilder::new(arr).build_list_scalar();
Ok(vec![list_scalar])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let arr = &values[0];
if arr.data_type() == &DataType::Null {
return Ok(());
}
self.batch_hashes.clear();
self.batch_hashes.resize(arr.len(), 0);
let hashes = create_hashes(
&[ArrayRef::clone(arr)],
&self.random_state,
&mut self.batch_hashes,
)?;
for hash in hashes.as_slice() {
self.values.insert(*hash);
}
Ok(())
}
/// Merges multiple sets of distinct values into the current set.
///
/// The input to this function is a `ListArray` with **multiple** rows,
/// where each row contains the values from a partial aggregate's phase (e.g.
/// the result of calling `Self::state` on multiple accumulators).
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(states.len(), 1, "array_agg states must be singleton!");
let array = &states[0];
let list_array = array.as_list::<i32>();
for inner_array in list_array.iter() {
let Some(inner_array) = inner_array else {
return internal_err!(
"Intermediate results of count_hash should always be non null"
);
};
let hash_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..hash_array.len() {
self.values.insert(hash_array.value(i));
}
}
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
}
fn size(&self) -> usize {
self.fixed_size()
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::{Array, BooleanArray, Int32Array, Int64Array};
use super::*;
fn create_test_accumulator() -> CountHashAccumulator {
CountHashAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}
}
#[test]
fn test_count_hash_accumulator() -> Result<()> {
let mut acc = create_test_accumulator();
// Test with some data
let array = Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
Some(3),
Some(1),
Some(2),
None,
])) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(4)));
// Test with empty data
let mut acc = create_test_accumulator();
let array = Arc::new(Int32Array::from(vec![] as Vec<Option<i32>>)) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(0)));
// Test with only nulls
let mut acc = create_test_accumulator();
let array = Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(1)));
Ok(())
}
#[test]
fn test_count_hash_accumulator_merge() -> Result<()> {
// Accumulator 1
let mut acc1 = create_test_accumulator();
let array1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef;
acc1.update_batch(&[array1])?;
let state1 = acc1.state()?;
// Accumulator 2
let mut acc2 = create_test_accumulator();
let array2 = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)])) as ArrayRef;
acc2.update_batch(&[array2])?;
let state2 = acc2.state()?;
// Merge state1 and state2 into a new accumulator
let mut acc_merged = create_test_accumulator();
let state_array1 = state1[0].to_array()?;
let state_array2 = state2[0].to_array()?;
acc_merged.merge_batch(&[state_array1])?;
acc_merged.merge_batch(&[state_array2])?;
let result = acc_merged.evaluate()?;
// Distinct values are {1, 2, 3, 4, 5}, so count is 5
assert_eq!(result, ScalarValue::Int64(Some(5)));
Ok(())
}
fn create_test_group_accumulator() -> CountHashGroupAccumulator {
CountHashGroupAccumulator::new()
}
#[test]
fn test_count_hash_group_accumulator() -> Result<()> {
let mut acc = create_test_group_accumulator();
let values = Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 4, 5])) as ArrayRef;
let group_indices = vec![0, 1, 0, 0, 1, 2, 0];
let total_num_groups = 3;
acc.update_batch(&[values], &group_indices, None, total_num_groups)?;
let result_array = acc.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Group 0: {1, 3, 5} -> 3
// Group 1: {2} -> 1
// Group 2: {4} -> 1
assert_eq!(result.value(0), 3);
assert_eq!(result.value(1), 1);
assert_eq!(result.value(2), 1);
Ok(())
}
#[test]
fn test_count_hash_group_accumulator_with_filter() -> Result<()> {
let mut acc = create_test_group_accumulator();
let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef;
let group_indices = vec![0, 0, 1, 1, 2, 2];
let filter = BooleanArray::from(vec![true, false, true, true, false, true]);
let total_num_groups = 3;
acc.update_batch(&[values], &group_indices, Some(&filter), total_num_groups)?;
let result_array = acc.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Group 0: {1} (2 is filtered out) -> 1
// Group 1: {3, 4} -> 2
// Group 2: {6} (5 is filtered out) -> 1
assert_eq!(result.value(0), 1);
assert_eq!(result.value(1), 2);
assert_eq!(result.value(2), 1);
Ok(())
}
#[test]
fn test_count_hash_group_accumulator_merge() -> Result<()> {
// Accumulator 1
let mut acc1 = create_test_group_accumulator();
let values1 = Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef;
let group_indices1 = vec![0, 0, 1, 1];
acc1.update_batch(&[values1], &group_indices1, None, 2)?;
// acc1 state: group 0 -> {1, 2}, group 1 -> {3, 4}
let state1 = acc1.state(EmitTo::All)?;
// Accumulator 2
let mut acc2 = create_test_group_accumulator();
let values2 = Arc::new(Int32Array::from(vec![5, 6, 1, 3])) as ArrayRef;
// Merge into different group indices
let group_indices2 = vec![2, 2, 0, 1];
acc2.update_batch(&[values2], &group_indices2, None, 3)?;
// acc2 state: group 0 -> {1}, group 1 -> {3}, group 2 -> {5, 6}
// Merge state from acc1 into acc2
// We will merge acc1's group 0 into acc2's group 0
// and acc1's group 1 into acc2's group 2
let merge_group_indices = vec![0, 2];
acc2.merge_batch(&state1, &merge_group_indices, None, 3)?;
let result_array = acc2.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Final state of acc2:
// Group 0: {1} U {1, 2} -> {1, 2}, count = 2
// Group 1: {3}, count = 1
// Group 2: {5, 6} U {3, 4} -> {3, 4, 5, 6}, count = 4
assert_eq!(result.value(0), 2);
assert_eq!(result.value(1), 1);
assert_eq!(result.value(2), 4);
Ok(())
}
#[test]
fn test_size() {
let acc = create_test_group_accumulator();
// Just test it doesn't crash and returns a value.
assert!(acc.size() > 0);
}
}

View File

@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction;
use crate::aggrs::count_hash::CountHash;
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
@@ -144,6 +145,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Approximate functions
ApproximateFunction::register(&function_registry);
// CountHash function
CountHash::register(&function_registry);
Arc::new(function_registry)
});

View File

@@ -14,7 +14,6 @@
pub mod clamp;
mod modulo;
mod pow;
mod rate;
use std::fmt;
@@ -26,7 +25,6 @@ use datafusion::error::DataFusionError;
use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::VectorRef;
pub use pow::PowFunction;
pub use rate::RateFunction;
use snafu::ResultExt;
@@ -39,7 +37,6 @@ pub(crate) struct MathFunction;
impl MathFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(ModuloFunction);
registry.register_scalar(PowFunction);
registry.register_scalar(RateFunction);
registry.register_scalar(RangeFunction);
registry.register_scalar(ClampFunction);

View File

@@ -1,120 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::LogicalPrimitiveType;
use datatypes::vectors::VectorRef;
use datatypes::with_match_primitive_type_id;
use num::traits::Pow;
use num_traits::AsPrimitive;
use crate::function::{Function, FunctionContext};
use crate::scalars::expression::{scalar_binary_op, EvalContext};
#[derive(Clone, Debug, Default)]
pub struct PowFunction;
impl Function for PowFunction {
fn name(&self) -> &str {
"pow"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::float64_datatype())
}
fn signature(&self) -> Signature {
Signature::uniform(2, ConcreteDataType::numerics(), Volatility::Immutable)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$S| {
with_match_primitive_type_id!(columns[1].data_type().logical_type_id(), |$T| {
let col = scalar_binary_op::<<$S as LogicalPrimitiveType>::Native, <$T as LogicalPrimitiveType>::Native, f64, _>(&columns[0], &columns[1], scalar_pow, &mut EvalContext::default())?;
Ok(Arc::new(col))
},{
unreachable!()
})
},{
unreachable!()
})
}
}
#[inline]
fn scalar_pow<S, T>(value: Option<S>, base: Option<T>, _ctx: &mut EvalContext) -> Option<f64>
where
S: AsPrimitive<f64>,
T: AsPrimitive<f64>,
{
match (value, base) {
(Some(value), Some(base)) => Some(value.as_().pow(base.as_())),
_ => None,
}
}
impl fmt::Display for PowFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "POW")
}
}
#[cfg(test)]
mod tests {
use common_query::prelude::TypeSignature;
use datatypes::value::Value;
use datatypes::vectors::{Float32Vector, Int8Vector};
use super::*;
use crate::function::FunctionContext;
#[test]
fn test_pow_function() {
let pow = PowFunction;
assert_eq!("pow", pow.name());
assert_eq!(
ConcreteDataType::float64_datatype(),
pow.return_type(&[]).unwrap()
);
assert!(matches!(pow.signature(),
Signature {
type_signature: TypeSignature::Uniform(2, valid_types),
volatility: Volatility::Immutable
} if valid_types == ConcreteDataType::numerics()
));
let values = vec![1.0, 2.0, 3.0];
let bases = vec![0i8, -1i8, 3i8];
let args: Vec<VectorRef> = vec![
Arc::new(Float32Vector::from_vec(values.clone())),
Arc::new(Int8Vector::from_vec(bases.clone())),
];
let vector = pow.eval(&FunctionContext::default(), &args).unwrap();
assert_eq!(3, vector.len());
for i in 0..3 {
let p: f64 = (values[i] as f64).pow(bases[i] as f64);
assert!(matches!(vector.get(i), Value::Float64(v) if v == p));
}
}
}

View File

@@ -32,6 +32,7 @@ impl MockDatanodeHandler for () {
Ok(RegionResponse {
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}

View File

@@ -50,7 +50,11 @@ use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::DropTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::DropTrigger;
use crate::rpc::ddl::DdlTask::{
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
@@ -91,6 +95,14 @@ pub trait TriggerDdlManager: Send + Sync {
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
async fn drop_trigger(
&self,
drop_trigger_task: DropTriggerTask,
procedure_manager: ProcedureManagerRef,
ddl_context: DdlContext,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
fn as_any(&self) -> &dyn std::any::Any;
}
@@ -125,13 +137,12 @@ impl DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
register_loaders: bool,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
trigger_ddl_manager: None,
};
if register_loaders {
manager.register_loaders()?;
@@ -139,6 +150,15 @@ impl DdlManager {
Ok(manager)
}
#[cfg(feature = "enterprise")]
pub fn with_trigger_ddl_manager(
mut self,
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Self {
self.trigger_ddl_manager = trigger_ddl_manager;
self
}
/// Returns the [TableMetadataManagerRef].
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.ddl_context.table_metadata_manager
@@ -640,6 +660,28 @@ async fn handle_drop_flow_task(
})
}
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
ddl_manager: &DdlManager,
drop_trigger_task: DropTriggerTask,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
return UnsupportedSnafu {
operation: "drop trigger",
}
.fail();
};
m.drop_trigger(
drop_trigger_task,
ddl_manager.procedure_manager.clone(),
ddl_manager.ddl_context.clone(),
query_context,
)
.await
}
async fn handle_drop_view_task(
ddl_manager: &DdlManager,
drop_view_task: DropViewTask,
@@ -827,6 +869,11 @@ impl ProcedureExecutor for DdlManager {
handle_create_flow_task(self, create_flow_task, request.query_context.into())
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
@@ -836,11 +883,11 @@ impl ProcedureExecutor for DdlManager {
)
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
#[cfg(feature = "enterprise")]
DropTrigger(drop_trigger_task) => {
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
.await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
}
}
.trace(span)
@@ -948,6 +995,7 @@ mod tests {
Default::default(),
state_store,
poison_manager,
None,
));
let _ = DdlManager::try_new(
@@ -964,8 +1012,6 @@ mod tests {
},
procedure_manager.clone(),
true,
#[cfg(feature = "enterprise")]
None,
);
let expected_loaders = vec![

View File

@@ -100,8 +100,8 @@
pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod maintenance;
pub mod node_address;
pub mod runtime_switch;
mod schema_metadata_manager;
pub mod schema_name;
pub mod table_info;
@@ -164,7 +164,9 @@ use crate::state_store::PoisonValue;
use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const MAINTENANCE_KEY: &str = "__maintenance";
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";

View File

@@ -1,86 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use crate::error::Result;
use crate::key::MAINTENANCE_KEY;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
pub type MaintenanceModeManagerRef = Arc<MaintenanceModeManager>;
/// The maintenance mode manager.
///
/// Used to enable or disable maintenance mode.
#[derive(Clone)]
pub struct MaintenanceModeManager {
kv_backend: KvBackendRef,
}
impl MaintenanceModeManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
let req = PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
Ok(())
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
self.kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await?;
Ok(())
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
self.kv_backend.exists(MAINTENANCE_KEY.as_bytes()).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::maintenance::MaintenanceModeManager;
use crate::kv_backend::memory::MemoryKvBackend;
#[tokio::test]
async fn test_maintenance_mode_manager() {
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(Arc::new(
MemoryKvBackend::new(),
)));
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.set_maintenance_mode()
.await
.unwrap();
assert!(maintenance_mode_manager.maintenance_mode().await.unwrap());
maintenance_mode_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!maintenance_mode_manager.maintenance_mode().await.unwrap());
}
}

View File

@@ -0,0 +1,224 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_procedure::local::PauseAware;
use moka::future::Cache;
use snafu::ResultExt;
use crate::error::{GetCacheSnafu, Result};
use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchDeleteRequest, PutRequest};
pub type RuntimeSwitchManagerRef = Arc<RuntimeSwitchManager>;
/// The runtime switch manager.
///
/// Used to enable or disable runtime switches.
#[derive(Clone)]
pub struct RuntimeSwitchManager {
kv_backend: KvBackendRef,
cache: Cache<Vec<u8>, Option<Vec<u8>>>,
}
#[async_trait::async_trait]
impl PauseAware for RuntimeSwitchManager {
async fn is_paused(&self) -> std::result::Result<bool, BoxedError> {
self.is_procedure_paused().await.map_err(BoxedError::new)
}
}
const CACHE_TTL: Duration = Duration::from_secs(10);
const MAX_CAPACITY: u64 = 32;
impl RuntimeSwitchManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
let cache = Cache::builder()
.time_to_live(CACHE_TTL)
.max_capacity(MAX_CAPACITY)
.build();
Self { kv_backend, cache }
}
async fn put_key(&self, key: &str) -> Result<()> {
let req = PutRequest {
key: Vec::from(key),
value: vec![],
prev_kv: false,
};
self.kv_backend.put(req).await?;
self.cache.invalidate(key.as_bytes()).await;
Ok(())
}
async fn delete_keys(&self, keys: &[&str]) -> Result<()> {
let req = BatchDeleteRequest::new()
.with_keys(keys.iter().map(|x| x.as_bytes().to_vec()).collect());
self.kv_backend.batch_delete(req).await?;
for key in keys {
self.cache.invalidate(key.as_bytes()).await;
}
Ok(())
}
/// Returns true if the key exists.
async fn exists(&self, key: &str) -> Result<bool> {
let key = key.as_bytes().to_vec();
let kv_backend = self.kv_backend.clone();
let value = self
.cache
.try_get_with(key.clone(), async move {
kv_backend.get(&key).await.map(|v| v.map(|v| v.value))
})
.await
.context(GetCacheSnafu)?;
Ok(value.is_some())
}
/// Enables maintenance mode.
pub async fn set_maintenance_mode(&self) -> Result<()> {
self.put_key(MAINTENANCE_KEY).await
}
/// Unsets maintenance mode.
pub async fn unset_maintenance_mode(&self) -> Result<()> {
self.delete_keys(&[MAINTENANCE_KEY, LEGACY_MAINTENANCE_KEY])
.await
}
/// Returns true if maintenance mode is enabled.
pub async fn maintenance_mode(&self) -> Result<bool> {
let exists = self.exists(MAINTENANCE_KEY).await?;
if exists {
return Ok(true);
}
let exists = self.exists(LEGACY_MAINTENANCE_KEY).await?;
if exists {
return Ok(true);
}
Ok(false)
}
// Pauses handling of incoming procedure requests.
pub async fn pasue_procedure(&self) -> Result<()> {
self.put_key(PAUSE_PROCEDURE_KEY).await
}
/// Resumes processing of incoming procedure requests.
pub async fn resume_procedure(&self) -> Result<()> {
self.delete_keys(&[PAUSE_PROCEDURE_KEY]).await
}
/// Returns true if the system is currently pausing incoming procedure requests.
pub async fn is_procedure_paused(&self) -> Result<bool> {
self.exists(PAUSE_PROCEDURE_KEY).await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::key::runtime_switch::RuntimeSwitchManager;
use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[tokio::test]
async fn test_runtime_switch_manager_basic() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
runtime_switch_manager
.put_key(MAINTENANCE_KEY)
.await
.unwrap();
let v = runtime_switch_manager
.cache
.get(MAINTENANCE_KEY.as_bytes())
.await;
assert!(v.is_none());
runtime_switch_manager
.exists(MAINTENANCE_KEY)
.await
.unwrap();
let v = runtime_switch_manager
.cache
.get(MAINTENANCE_KEY.as_bytes())
.await;
assert!(v.is_some());
runtime_switch_manager
.delete_keys(&[MAINTENANCE_KEY])
.await
.unwrap();
let v = runtime_switch_manager
.cache
.get(MAINTENANCE_KEY.as_bytes())
.await;
assert!(v.is_none());
}
#[tokio::test]
async fn test_runtime_switch_manager() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager.set_maintenance_mode().await.unwrap();
assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
}
#[tokio::test]
async fn test_runtime_switch_manager_with_legacy_key() {
let kv_backend = Arc::new(MemoryKvBackend::new());
kv_backend
.put(PutRequest {
key: Vec::from(LEGACY_MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
})
.await
.unwrap();
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend));
assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager
.unset_maintenance_mode()
.await
.unwrap();
assert!(!runtime_switch_manager.maintenance_mode().await.unwrap());
runtime_switch_manager.set_maintenance_mode().await.unwrap();
assert!(runtime_switch_manager.maintenance_mode().await.unwrap());
}
#[tokio::test]
async fn test_pasue_procedure() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
runtime_switch_manager.pasue_procedure().await.unwrap();
assert!(runtime_switch_manager.is_procedure_paused().await.unwrap());
runtime_switch_manager.resume_procedure().await.unwrap();
assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap());
}
}

View File

@@ -48,6 +48,11 @@ impl TableRouteKey {
pub fn new(table_id: TableId) -> Self {
Self { table_id }
}
/// Returns the range prefix of the table route key.
pub fn range_prefix() -> Vec<u8> {
format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]

View File

@@ -69,6 +69,8 @@ pub enum DdlTask {
AlterDatabase(AlterDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
#[cfg(feature = "enterprise")]
DropTrigger(trigger::DropTriggerTask),
CreateView(CreateViewTask),
DropView(DropViewTask),
#[cfg(feature = "enterprise")]
@@ -259,6 +261,18 @@ impl TryFrom<Task> for DdlTask {
.fail()
}
}
Task::DropTriggerTask(drop_trigger) => {
#[cfg(feature = "enterprise")]
return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
#[cfg(not(feature = "enterprise"))]
{
let _ = drop_trigger;
crate::error::UnsupportedSnafu {
operation: "drop trigger",
}
.fail()
}
}
}
}
}
@@ -311,6 +325,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::DropView(task) => Task::DropViewTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
};
Ok(Self {

View File

@@ -1,10 +1,13 @@
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::CreateTriggerTask as PbCreateTriggerTask;
use api::v1::meta::{
CreateTriggerTask as PbCreateTriggerTask, DropTriggerTask as PbDropTriggerTask,
};
use api::v1::notify_channel::ChannelType as PbChannelType;
use api::v1::{
CreateTriggerExpr, NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
CreateTriggerExpr as PbCreateTriggerExpr, DropTriggerExpr as PbDropTriggerExpr,
NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
@@ -56,7 +59,7 @@ impl From<CreateTriggerTask> for PbCreateTriggerTask {
.map(PbNotifyChannel::from)
.collect();
let expr = CreateTriggerExpr {
let expr = PbCreateTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
create_if_not_exists: task.if_not_exists,
@@ -139,17 +142,86 @@ impl TryFrom<PbNotifyChannel> for NotifyChannel {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropTriggerTask {
pub catalog_name: String,
pub trigger_name: String,
pub drop_if_exists: bool,
}
impl From<DropTriggerTask> for PbDropTriggerTask {
fn from(task: DropTriggerTask) -> Self {
let expr = PbDropTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
drop_if_exists: task.drop_if_exists,
};
PbDropTriggerTask {
drop_trigger: Some(expr),
}
}
}
impl TryFrom<PbDropTriggerTask> for DropTriggerTask {
type Error = error::Error;
fn try_from(task: PbDropTriggerTask) -> Result<Self> {
let expr = task.drop_trigger.context(error::InvalidProtoMsgSnafu {
err_msg: "expected drop_trigger",
})?;
Ok(DropTriggerTask {
catalog_name: expr.catalog_name,
trigger_name: expr.trigger_name,
drop_if_exists: expr.drop_if_exists,
})
}
}
impl DdlTask {
/// Creates a [`DdlTask`] to create a trigger.
pub fn new_create_trigger(expr: CreateTriggerTask) -> Self {
DdlTask::CreateTrigger(expr)
}
/// Creates a [`DdlTask`] to drop a trigger.
pub fn new_drop_trigger(expr: DropTriggerTask) -> Self {
DdlTask::DropTrigger(expr)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_convert_drop_trigger_task() {
let original = DropTriggerTask {
catalog_name: "test_catalog".to_string(),
trigger_name: "test_trigger".to_string(),
drop_if_exists: true,
};
let pb_task: PbDropTriggerTask = original.clone().into();
let expr = pb_task.drop_trigger.as_ref().unwrap();
assert_eq!(expr.catalog_name, "test_catalog");
assert_eq!(expr.trigger_name, "test_trigger");
assert!(expr.drop_if_exists);
let round_tripped = DropTriggerTask::try_from(pb_task).unwrap();
assert_eq!(original.catalog_name, round_tripped.catalog_name);
assert_eq!(original.trigger_name, round_tripped.trigger_name);
assert_eq!(original.drop_if_exists, round_tripped.drop_if_exists);
// Test invalid case where drop_trigger is None
let invalid_task = PbDropTriggerTask { drop_trigger: None };
let result = DropTriggerTask::try_from(invalid_task);
assert!(result.is_err());
}
#[test]
fn test_convert_create_trigger_task() {
let original = CreateTriggerTask {

View File

@@ -28,6 +28,19 @@ use crate::PoisonKey;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to check procedure manager status"))]
CheckStatus {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Manager is pasued"))]
ManagerPasued {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to execute procedure due to external error, clean poisons: {}",
clean_poisons
@@ -246,7 +259,8 @@ impl ErrorExt for Error {
| Error::ListState { source, .. }
| Error::PutPoison { source, .. }
| Error::DeletePoison { source, .. }
| Error::GetPoison { source, .. } => source.status_code(),
| Error::GetPoison { source, .. }
| Error::CheckStatus { source, .. } => source.status_code(),
Error::ToJson { .. }
| Error::DeleteState { .. }
@@ -259,7 +273,8 @@ impl ErrorExt for Error {
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }
| Error::ManagerNotStart { .. } => StatusCode::IllegalState,
| Error::ManagerNotStart { .. }
| Error::ManagerPasued { .. } => StatusCode::IllegalState,
Error::RollbackNotSupported { .. } => StatusCode::Unsupported,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {

View File

@@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_error::ext::BoxedError;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
@@ -30,9 +31,10 @@ use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu,
self, CheckStatusSnafu, DuplicateProcedureSnafu, Error, LoaderConflictSnafu,
ManagerNotStartSnafu, ManagerPasuedSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu,
Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
@@ -522,6 +524,14 @@ impl Default for ManagerConfig {
}
}
type PauseAwareRef = Arc<dyn PauseAware>;
#[async_trait]
pub trait PauseAware: Send + Sync {
/// Returns true if the procedure manager is paused.
async fn is_paused(&self) -> std::result::Result<bool, BoxedError>;
}
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
@@ -531,6 +541,7 @@ pub struct LocalManager {
/// GC task.
remove_outdated_meta_task: TokioMutex<Option<RepeatedTask<Error>>>,
config: ManagerConfig,
pause_aware: Option<PauseAwareRef>,
}
impl LocalManager {
@@ -539,6 +550,7 @@ impl LocalManager {
config: ManagerConfig,
state_store: StateStoreRef,
poison_store: PoisonStoreRef,
pause_aware: Option<PauseAwareRef>,
) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new(poison_store));
@@ -549,6 +561,7 @@ impl LocalManager {
retry_delay: config.retry_delay,
remove_outdated_meta_task: TokioMutex::new(None),
config,
pause_aware,
}
}
@@ -719,6 +732,17 @@ impl LocalManager {
let loaders = self.manager_ctx.loaders.lock().unwrap();
loaders.contains_key(name)
}
async fn check_status(&self) -> Result<()> {
if let Some(pause_aware) = self.pause_aware.as_ref() {
ensure!(
!pause_aware.is_paused().await.context(CheckStatusSnafu)?,
ManagerPasuedSnafu
);
}
Ok(())
}
}
#[async_trait]
@@ -774,6 +798,7 @@ impl ProcedureManager for LocalManager {
!self.manager_ctx.contains_procedure(procedure_id),
DuplicateProcedureSnafu { procedure_id }
);
self.check_status().await?;
self.submit_root(
procedure.id,
@@ -979,7 +1004,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
manager
@@ -1004,7 +1029,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
manager
@@ -1058,7 +1083,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
let procedure_id = ProcedureId::random();
@@ -1110,7 +1135,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
#[derive(Debug)]
@@ -1191,7 +1216,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
@@ -1219,7 +1244,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.start().await.unwrap();
manager.stop().await.unwrap();
@@ -1256,7 +1281,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.set_running();
let mut procedure = ProcedureToLoad::new("submit");
@@ -1338,7 +1363,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.set_running();
manager
@@ -1463,7 +1488,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.manager_ctx.start();
let notify = Arc::new(Notify::new());

View File

@@ -83,7 +83,7 @@ mod tests {
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager = LocalManager::new(config, state_store, poison_manager);
let manager = LocalManager::new(config, state_store, poison_manager, None);
manager.start().await.unwrap();
#[derive(Debug)]

View File

@@ -14,7 +14,7 @@
pub mod columnar_value;
pub mod error;
mod function;
pub mod function;
pub mod logical_plan;
pub mod prelude;
pub mod request;

View File

@@ -22,7 +22,7 @@ once_cell.workspace = true
opentelemetry = { version = "0.21.0", default-features = false, features = [
"trace",
] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio", "http-proto", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot.workspace = true

View File

@@ -20,7 +20,7 @@ use std::time::Duration;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::{Protocol, SpanExporterBuilder, WithExportConfig};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_semantic_conventions::resource;
@@ -36,7 +36,11 @@ use tracing_subscriber::{filter, EnvFilter, Registry};
use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
/// The default endpoint when use gRPC exporter protocol.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";
/// The default endpoint when use HTTP exporter protocol.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318";
/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";
@@ -67,11 +71,25 @@ pub struct LoggingOptions {
/// Whether to enable tracing with OTLP. Default is false.
pub enable_otlp_tracing: bool,
/// The endpoint of OTLP. Default is "http://localhost:4317".
/// The endpoint of OTLP. Default is "http://localhost:4318".
pub otlp_endpoint: Option<String>,
/// The tracing sample ratio.
pub tracing_sample_ratio: Option<TracingSampleOptions>,
/// The protocol of OTLP export.
pub otlp_export_protocol: Option<OtlpExportProtocol>,
}
/// The protocol of OTLP export.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
/// GRPC protocol.
Grpc,
/// HTTP protocol with binary protobuf.
Http,
}
/// The options of slow query.
@@ -147,6 +165,7 @@ impl Default for LoggingOptions {
append_stdout: true,
// Rotation hourly, 24 files per day, keeps info log files of 30 days
max_log_files: 720,
otlp_export_protocol: None,
}
}
}
@@ -388,22 +407,9 @@ pub fn init_global_logging(
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]));
let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_exporter(build_otlp_exporter(opts))
.with_trace_config(trace_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
@@ -421,6 +427,42 @@ pub fn init_global_logging(
guards
}
fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporterBuilder {
let protocol = opts
.otlp_export_protocol
.clone()
.unwrap_or(OtlpExportProtocol::Http);
let endpoint = opts
.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or_else(|| match protocol {
OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
});
match protocol {
OtlpExportProtocol::Grpc => SpanExporterBuilder::Tonic(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
),
OtlpExportProtocol::Http => SpanExporterBuilder::Http(
opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(endpoint)
.with_protocol(Protocol::HttpBinary),
),
}
}
fn build_slow_query_logger<S>(
opts: &LoggingOptions,
slow_query_opts: Option<&SlowQueryOptions>,

View File

@@ -475,7 +475,7 @@ mod test {
async fn region_alive_keeper() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());

View File

@@ -14,10 +14,7 @@
//! Datanode configurations
use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::{Configurable, DEFAULT_DATA_HOME};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
@@ -27,6 +24,7 @@ use file_engine::config::EngineConfig as FileEngineConfig;
use meta_client::MetaClientOptions;
use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
pub(crate) use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
@@ -36,53 +34,6 @@ use servers::http::HttpOptions;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File(FileConfig),
S3(S3Config),
Oss(OssConfig),
Azblob(AzblobConfig),
Gcs(GcsConfig),
}
impl ObjectStoreConfig {
/// Returns the object storage type name, such as `S3`, `Oss` etc.
pub fn provider_name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Self::Oss(_) => "Oss",
Self::Azblob(_) => "Azblob",
Self::Gcs(_) => "Gcs",
}
}
/// Returns true when it's a remote object storage such as AWS s3 etc.
pub fn is_object_storage(&self) -> bool {
!matches!(self, Self::File(_))
}
/// Returns the object storage configuration name, return the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
// file storage doesn't support name
Self::File(_) => self.provider_name(),
Self::S3(s3) => &s3.name,
Self::Oss(oss) => &oss.name,
Self::Azblob(az) => &az.name,
Self::Gcs(gcs) => &gcs.name,
};
if name.trim().is_empty() {
return self.provider_name();
}
name
}
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
@@ -112,252 +63,6 @@ impl Default for StorageConfig {
}
}
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
/// The local file cache directory
pub cache_path: Option<String>,
/// The cache capacity in bytes
pub cache_capacity: Option<ReadableSize>,
}
/// The http client options to the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum idle connection per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,
/// The timeout for only the connect phase of a http client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,
/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,
/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
/// Skip SSL certificate validation (insecure)
pub skip_ssl_validation: bool,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
skip_ssl_validation: false,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
/// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
/// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
/// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
pub enable_virtual_host_style: bool,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.enable_virtual_host_style == other.enable_virtual_host_style
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub access_key_secret: SecretString,
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub name: String,
pub container: String,
pub root: String,
#[serde(skip_serializing)]
pub account_name: SecretString,
#[serde(skip_serializing)]
pub account_key: SecretString,
pub endpoint: String,
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.container == other.container
&& self.root == other.root
&& self.account_name.expose_secret() == other.account_name.expose_secret()
&& self.account_key.expose_secret() == other.account_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub name: String,
pub root: String,
pub bucket: String,
pub scope: String,
#[serde(skip_serializing)]
pub credential_path: SecretString,
#[serde(skip_serializing)]
pub credential: SecretString,
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.root == other.root
&& self.bucket == other.bucket
&& self.scope == other.scope
&& self.credential_path.expose_secret() == other.credential_path.expose_secret()
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
impl Default for S3Config {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
secret_access_key: SecretString::from(String::default()),
enable_virtual_host_style: false,
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for OssConfig {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for AzblobConfig {
fn default() -> Self {
Self {
name: String::default(),
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
account_key: SecretString::from(String::default()),
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for GcsConfig {
fn default() -> Self {
Self {
name: String::default(),
root: String::default(),
bucket: String::default(),
scope: String::default(),
credential_path: SecretString::from(String::default()),
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {})
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
@@ -467,37 +172,6 @@ mod tests {
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}
#[test]
fn test_config_name() {
let object_store_config = ObjectStoreConfig::default();
assert_eq!("File", object_store_config.config_name());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert_eq!("S3", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
let s3_config = ObjectStoreConfig::S3(S3Config {
name: "test".to_string(),
..Default::default()
});
assert_eq!("test", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
}
#[test]
fn test_is_object_storage() {
let store = ObjectStoreConfig::default();
assert!(!store.is_object_storage());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert!(s3_config.is_object_storage());
let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
assert!(oss_config.is_object_storage());
let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
assert!(gcs_config.is_object_storage());
let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
assert!(azblob_config.is_object_storage());
}
#[test]
fn test_secstr() {
let toml_str = r#"

View File

@@ -142,14 +142,6 @@ pub enum Error {
source: Box<log_store::error::Error>,
},
#[snafu(display("Failed to init backend"))]
InitBackend {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -387,6 +379,29 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize json"))]
SerializeJson {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed object store operation"))]
ObjectStore {
source: object_store::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build cache store"))]
BuildCacheStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -439,8 +454,6 @@ impl ErrorExt for Error {
StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),
InitBackend { .. } => StatusCode::StorageUnavailable,
OpenLogStore { source, .. } => source.status_code(),
MetaClientInit { source, .. } => source.status_code(),
UnsupportedOutput { .. } => StatusCode::Unsupported,
@@ -457,6 +470,10 @@ impl ErrorExt for Error {
StatusCode::RegionBusy
}
MissingCache { .. } => StatusCode::Internal,
SerializeJson { .. } => StatusCode::Internal,
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -278,7 +278,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("close-region");
let mut engine_env = TestEnv::with_prefix("close-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -326,7 +326,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-region");
let mut engine_env = TestEnv::with_prefix("open-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -374,7 +374,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -406,7 +406,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("downgrade-region");
let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

View File

@@ -20,12 +20,14 @@ use std::time::Duration;
use api::region::RegionResponse;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
use api::v1::region::{
region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_query::request::QueryRequest;
use common_query::OutputData;
@@ -47,6 +49,7 @@ pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::QueryEngineRef;
use serde_json;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
@@ -71,10 +74,10 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -138,12 +141,12 @@ impl RegionServer {
/// Finds the region's engine by its id. If the region is not ready, returns `None`.
pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
self.inner
.get_engine(region_id, &RegionChange::None)
.map(|x| match x {
CurrentEngine::Engine(engine) => Some(engine),
CurrentEngine::EarlyReturn(_) => None,
})
match self.inner.get_engine(region_id, &RegionChange::None) {
Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
Err(error::Error::RegionNotFound { .. }) => Ok(None),
Err(err) => Err(err),
}
}
#[tracing::instrument(skip_all)]
@@ -412,6 +415,7 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -441,6 +445,7 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -473,6 +478,48 @@ impl RegionServer {
.map(|_| RegionResponse::new(AffectedRows::default()))
}
/// Handles the ListMetadata request and retrieves metadata for specified regions.
///
/// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
/// non-existing regions as `null`.
#[tracing::instrument(skip_all)]
async fn handle_list_metadata_request(
&self,
request: &ListMetadataRequest,
) -> Result<RegionResponse> {
let mut region_metadatas = Vec::new();
// Collect metadata for each region
for region_id in &request.region_ids {
let region_id = RegionId::from_u64(*region_id);
// Get the engine.
let Some(engine) = self.find_engine(region_id)? else {
region_metadatas.push(None);
continue;
};
match engine.get_metadata(region_id).await {
Ok(metadata) => region_metadatas.push(Some(metadata)),
Err(err) => {
if err.status_code() == StatusCode::RegionNotFound {
region_metadatas.push(None);
} else {
Err(err).with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
}
}
}
}
// Serialize metadata to JSON
let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
let response = RegionResponse::from_metadata(json_result);
Ok(response)
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -504,6 +551,10 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
region_request::Body::ListMetadata(list_metadata_request) => {
self.handle_list_metadata_request(list_metadata_request)
.await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -518,6 +569,7 @@ impl RegionServerHandler for RegionServer {
}),
affected_rows: response.affected_rows as _,
extensions: response.extensions,
metadata: response.metadata,
})
}
}
@@ -897,6 +949,7 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -967,6 +1020,7 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -1242,8 +1296,11 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use mito2::test_util::CreateRequestBuilder;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
use store_api::storage::RegionId;
@@ -1605,4 +1662,175 @@ mod tests {
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
let mut metadata_builder = RegionMetadataBuilder::new(region_id);
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"timestamp",
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"file",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 1,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"message",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
});
metadata_builder.primary_key(vec![1]);
metadata_builder.build().unwrap()
}
#[tokio::test]
async fn test_handle_list_metadata_request() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadata_2 = mock_region_metadata(region_id_2);
let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
let metadata_1 = Arc::new(metadata_1);
let metadata_2 = Arc::new(metadata_2);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else if region_id == region_id_2 {
Ok(metadata_2.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
// All regions exist.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_not_found() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadatas = vec![Some(metadata_1.clone()), None];
let metadata_1 = Arc::new(metadata_1);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Not in region map.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
// Not in region engine.
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_failed() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
error::UnexpectedSnafu {
violated: format!("Failed to get region {region_id}"),
}
.fail()
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Failed to get.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64()],
};
mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap_err();
}
}

View File

@@ -14,45 +14,22 @@
//! object storage utilities
mod azblob;
pub mod fs;
mod gcs;
mod oss;
mod s3;
use std::path;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::factory::new_raw_object_store;
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers};
use object_store::{
Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
};
use snafu::prelude::*;
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
ObjectStoreConfig::Azblob(azblob_config) => {
azblob::new_azblob_object_store(azblob_config).await
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, CreateDirSnafu, Result};
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
@@ -66,7 +43,9 @@ pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
let object_store = new_raw_object_store(store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
// Adds retry layer
@@ -83,7 +62,9 @@ pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
let object_store = new_raw_object_store(&store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
@@ -170,20 +151,20 @@ async fn build_cache_layer(
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir)?;
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir)?;
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;
.context(error::BuildCacheStoreSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.context(error::InitBackendSnafu)?;
.context(error::BuildCacheStoreSnafu)?;
cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
@@ -196,31 +177,6 @@ async fn build_cache_layer(
}
}
pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
if path::Path::new(&dir).exists() {
info!("Begin to clean temp storage directory: {}", dir);
std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
info!("Cleaned temp storage directory: {}", dir);
}
Ok(())
}
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
if config.skip_ssl_validation {
common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted.");
}
let client = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
.connect_timeout(config.connect_timeout)
.pool_idle_timeout(config.pool_idle_timeout)
.timeout(config.timeout)
.danger_accept_invalid_certs(config.skip_ssl_validation)
.build()
.context(BuildHttpClientSnafu)?;
Ok(HttpClient::with(client))
}
struct PrintDetailedError;
// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.

View File

@@ -1,50 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Azblob;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::AzblobConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&azblob_config.root);
info!(
"The azure storage container is: {}, root is: {}",
azblob_config.container, &root
);
let client = build_http_client(&azblob_config.http_client)?;
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);
};
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -1,53 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs, path};
use common_telemetry::info;
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use snafu::prelude::*;
use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {
fs::create_dir_all(path::Path::new(&data_home))
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
store::clean_temp_dir(&atomic_write_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
store::clean_temp_dir(&old_atomic_temp_dir)?;
let builder = Fs::default()
.root(data_home)
.atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
Ok(object_store)
}

View File

@@ -1,46 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Gcs;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::GcsConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&gcs_config.root);
info!(
"The gcs storage bucket is: {}, root is: {}",
gcs_config.bucket, &root
);
let client = build_http_client(&gcs_config.http_client);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(client?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -1,45 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Oss;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::OssConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&oss_config.root);
info!(
"The oss storage bucket is: {}, root is: {}",
oss_config.bucket, &root
);
let client = build_http_client(&oss_config.http_client)?;
let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(client);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -1,55 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::S3;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::S3Config;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
let root = util::normalize_dir(&s3_config.root);
info!(
"The s3 storage bucket is: {}, root is: {}",
s3_config.bucket, &root
);
let client = build_http_client(&s3_config.http_client)?;
let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(client);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
if s3_config.region.is_some() {
builder = builder.region(s3_config.region.as_ref().unwrap());
}
if s3_config.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -108,11 +108,15 @@ pub type MockRequestHandler =
pub type MockSetReadonlyGracefullyHandler =
Box<dyn Fn(RegionId) -> Result<SetRegionRoleStateResponse, Error> + Send + Sync>;
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -127,6 +131,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -146,6 +151,27 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
rx,
)
}
pub fn with_metadata_mock_fn(
engine: &str,
mock_fn: MockGetMetadataHandler,
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
let (tx, rx) = tokio::sync::mpsc::channel(8);
(
Arc::new(Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
mock_role: None,
engine: engine.to_string(),
}),
@@ -166,6 +192,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -208,7 +235,11 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
async fn get_metadata(&self, _region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
if let Some(mock_fn) = &self.handle_get_metadata_mock_fn {
return mock_fn(region_id).map_err(BoxedError::new);
};
unimplemented!()
}

View File

@@ -95,7 +95,7 @@ impl Default for FlowConfig {
}
/// Options for flow node
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
pub node_id: Option<u64>,
@@ -251,6 +251,10 @@ impl DiffRequest {
Self::Delete(v) => v.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
@@ -929,6 +933,12 @@ pub struct FlowTickManager {
start_timestamp: repr::Timestamp,
}
impl Default for FlowTickManager {
fn default() -> Self {
Self::new()
}
}
impl FlowTickManager {
pub fn new() -> Self {
FlowTickManager {

View File

@@ -43,7 +43,7 @@ mod utils;
#[cfg(test)]
mod test_utils;
pub use adapter::{FlowConfig, FlowStreamingEngineRef, FlownodeOptions, StreamingEngine};
pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine};
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub use engine::FlowAuthHeader;
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
@@ -52,3 +52,5 @@ pub use server::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServer,
FlownodeServiceBuilder, FrontendInvoker,
};
pub use crate::adapter::FlownodeOptions;

View File

@@ -37,6 +37,7 @@ use common_base::cancellation::CancellableFuture;
use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
@@ -125,10 +126,12 @@ impl Instance {
max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager),
));
Ok((kv_backend, procedure_manager))
@@ -513,6 +516,8 @@ pub fn check_permission(
| Statement::AlterDatabase(_)
| Statement::DropFlow(_)
| Statement::Use(_) => {}
#[cfg(feature = "enterprise")]
Statement::DropTrigger(_) => {}
Statement::ShowCreateDatabase(stmt) => {
validate_database(&stmt.database_name, query_ctx)?;
}
@@ -616,6 +621,8 @@ pub fn check_permission(
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
// User can only kill process in their own catalog.
Statement::Kill(_) => {}
// SHOW PROCESSLIST
Statement::ShowProcesslist(_) => {}
}
Ok(())
}

View File

@@ -54,14 +54,6 @@ pub enum Error {
peer_id: u64,
},
#[snafu(display("Failed to lookup peer: {}", peer_id))]
LookupPeer {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
peer_id: u64,
},
#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
#[snafu(implicit)]
@@ -382,6 +374,7 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to decode sql value"))]
DecodeSqlValue {
#[snafu(source)]
@@ -695,8 +688,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Maintenance mode manager error"))]
MaintenanceModeManager {
#[snafu(display("Runtime switch manager error"))]
RuntimeSwitchManager {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
@@ -1023,7 +1016,7 @@ impl ErrorExt for Error {
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::MaintenanceModeManager { source, .. }
| Error::RuntimeSwitchManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. } => source.status_code(),
@@ -1033,7 +1026,6 @@ impl ErrorExt for Error {
}
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
#[cfg(feature = "pg_kvbackend")]

View File

@@ -27,7 +27,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::distributed_time_constants;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
@@ -110,6 +110,11 @@ pub struct MetasrvOptions {
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
/// The delay before starting region failure detection.
/// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
/// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
#[serde(with = "humantime_serde")]
pub region_failure_detector_initialization_delay: Duration,
/// Whether to allow region failover on local WAL.
///
/// If it's true, the region failover will be allowed even if the local WAL is used.
@@ -219,6 +224,7 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
@@ -428,7 +434,7 @@ pub struct Metasrv {
procedure_executor: ProcedureExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
@@ -687,8 +693,8 @@ impl Metasrv {
&self.table_metadata_manager
}
pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
&self.maintenance_mode_manager
pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
&self.runtime_switch_manager
}
pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {

View File

@@ -29,7 +29,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::maintenance::MaintenanceModeManager;
use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -64,7 +64,7 @@ use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
@@ -193,7 +193,9 @@ impl MetasrvBuilder {
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
let pushers = Pushers::default();
let mailbox = build_mailbox(&kv_backend, &pushers);
let procedure_manager = build_procedure_manager(&options, &kv_backend);
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
let procedure_manager =
build_procedure_manager(&options, &kv_backend, &runtime_switch_manager);
let table_metadata_manager = Arc::new(TableMetadataManager::new(
leader_cached_kv_backend.clone() as _,
@@ -201,7 +203,7 @@ impl MetasrvBuilder {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
leader_cached_kv_backend.clone() as _,
));
let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
@@ -299,6 +301,8 @@ impl MetasrvBuilder {
Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
Some(Arc::new(RegionSupervisorTicker::new(
DEFAULT_TICK_INTERVAL,
options.region_failure_detector_initialization_delay,
DEFAULT_INITIALIZATION_RETRY_PERIOD,
tx.clone(),
))),
)
@@ -339,8 +343,9 @@ impl MetasrvBuilder {
selector_ctx.clone(),
supervisor_selector,
region_migration_manager.clone(),
maintenance_mode_manager.clone(),
runtime_switch_manager.clone(),
peer_lookup_service.clone(),
leader_cached_kv_backend.clone(),
);
Some(RegionFailureHandler::new(
@@ -353,30 +358,28 @@ impl MetasrvBuilder {
let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
let ddl_context = DdlContext {
node_manager,
cache_invalidator: cache_invalidator.clone(),
memory_region_keeper: memory_region_keeper.clone(),
leader_region_registry: leader_region_registry.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller,
};
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let trigger_ddl_manager = plugins
.as_ref()
.and_then(|plugins| plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>());
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator: cache_invalidator.clone(),
memory_region_keeper: memory_region_keeper.clone(),
leader_region_registry: leader_region_registry.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller,
},
procedure_manager.clone(),
true,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.context(error::InitDdlManagerSnafu)?,
);
let ddl_manager = {
let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
});
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
};
let ddl_manager = Arc::new(ddl_manager);
// remote WAL prune ticker and manager
let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
@@ -463,7 +466,7 @@ impl MetasrvBuilder {
procedure_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
maintenance_mode_manager,
runtime_switch_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(
Some(metasrv_home),
meta_peer_client,
@@ -506,6 +509,7 @@ fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
fn build_procedure_manager(
options: &MetasrvOptions,
kv_backend: &KvBackendRef,
runtime_switch_manager: &RuntimeSwitchManagerRef,
) -> ProcedureManagerRef {
let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
@@ -526,6 +530,7 @@ fn build_procedure_manager(
manager_config,
kv_state_store.clone(),
kv_state_store,
Some(runtime_switch_manager.clone()),
))
}

View File

@@ -23,7 +23,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::table_name::TableName;
@@ -253,10 +253,12 @@ impl RegionMigrationManager {
}
/// Throws an error if `leader_peer` is not the `from_peer`.
///
/// If `from_peer` is unknown, use the leader peer as the `from_peer`.
fn verify_region_leader_peer(
&self,
region_route: &RegionRoute,
task: &RegionMigrationProcedureTask,
task: &mut RegionMigrationProcedureTask,
) -> Result<()> {
let leader_peer = region_route
.leader_peer
@@ -275,6 +277,15 @@ impl RegionMigrationManager {
}
);
if task.from_peer.addr.is_empty() {
warn!(
"The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
leader_peer, task.region_id
);
// The peer id is the same as the leader peer id.
task.from_peer = leader_peer.clone();
}
Ok(())
}
@@ -300,7 +311,7 @@ impl RegionMigrationManager {
/// Submits a new region migration procedure.
pub async fn submit_procedure(
&self,
task: RegionMigrationProcedureTask,
mut task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
let Some(guard) = self.insert_running_procedure(&task) else {
return error::MigrationRunningSnafu {
@@ -333,7 +344,7 @@ impl RegionMigrationManager {
.fail();
}
self.verify_region_leader_peer(&region_route, &task)?;
self.verify_region_leader_peer(&region_route, &mut task)?;
self.verify_region_follower_peers(&region_route, &task)?;
let table_info = self.retrieve_table_info(region_id).await?;
let TableName {
@@ -341,12 +352,6 @@ impl RegionMigrationManager {
schema_name,
..
} = table_info.table_name();
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["src", &task.from_peer.id.to_string()])
.inc();
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["desc", &task.to_peer.id.to_string()])
.inc();
let RegionMigrationProcedureTask {
region_id,
from_peer,
@@ -377,6 +382,12 @@ impl RegionMigrationManager {
return;
}
};
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["src", &task.from_peer.id.to_string()])
.inc();
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["desc", &task.to_peer.id.to_string()])
.inc();
if let Err(e) = watcher::wait(watcher).await {
error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");

View File

@@ -92,6 +92,7 @@ impl TestingEnv {
ManagerConfig::default(),
state_store,
poison_manager,
None,
));
Self {

View File

@@ -103,6 +103,7 @@ pub mod mock {
}),
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}
}

View File

@@ -52,6 +52,7 @@ impl TestEnv {
ManagerConfig::default(),
state_store,
poison_manager,
None,
));
let mailbox_ctx = MailboxContext::new(mailbox_sequence);

View File

@@ -15,23 +15,30 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use snafu::{ensure, OptionExt, ResultExt};
use futures::{StreamExt, TryStreamExt};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
use tokio::sync::oneshot;
use tokio::time::{interval, interval_at, MissedTickBehavior};
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
@@ -70,6 +77,9 @@ impl From<&Stat> for DatanodeHeartbeat {
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event presents the metasrv received [`DatanodeHeartbeat`] from the datanodes.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
/// when a system-wide reset or reinitialization is needed.
@@ -78,6 +88,7 @@ impl From<&Stat> for DatanodeHeartbeat {
/// of the supervisor during tests.
pub(crate) enum Event {
Tick,
InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
RegisterFailureDetectors(Vec<DetectingRegion>),
DeregisterFailureDetectors(Vec<DetectingRegion>),
HeartbeatArrived(DatanodeHeartbeat),
@@ -102,6 +113,7 @@ impl Debug for Event {
Self::Tick => write!(f, "Tick"),
Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
Self::Clear => write!(f, "Clear"),
Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
Self::RegisterFailureDetectors(arg0) => f
.debug_tuple("RegisterFailureDetectors")
.field(arg0)
@@ -127,6 +139,12 @@ pub struct RegionSupervisorTicker {
/// The interval of tick.
tick_interval: Duration,
/// The delay before initializing all region failure detectors.
initialization_delay: Duration,
/// The retry period for initializing all region failure detectors.
initialization_retry_period: Duration,
/// Sends [Event]s.
sender: Sender<Event>,
}
@@ -149,10 +167,21 @@ impl LeadershipChangeListener for RegionSupervisorTicker {
}
impl RegionSupervisorTicker {
pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
pub(crate) fn new(
tick_interval: Duration,
initialization_delay: Duration,
initialization_retry_period: Duration,
sender: Sender<Event>,
) -> Self {
info!(
"RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
tick_interval, initialization_delay, initialization_retry_period
);
Self {
tick_handle: Mutex::new(None),
tick_interval,
initialization_delay,
initialization_retry_period,
sender,
}
}
@@ -163,15 +192,39 @@ impl RegionSupervisorTicker {
if handle.is_none() {
let sender = self.sender.clone();
let tick_interval = self.tick_interval;
let initialization_delay = self.initialization_delay;
let mut initialization_interval = interval_at(
tokio::time::Instant::now() + initialization_delay,
self.initialization_retry_period,
);
initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
common_runtime::spawn_global(async move {
loop {
initialization_interval.tick().await;
let (tx, rx) = oneshot::channel();
if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
info!("EventReceiver is dropped, region failure detectors initialization loop is stopped");
break;
}
if rx.await.is_ok() {
info!("All region failure detectors are initialized.");
break;
}
}
});
let sender = self.sender.clone();
let ticker_loop = tokio::spawn(async move {
let mut interval = interval(tick_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut tick_interval = interval(tick_interval);
tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
if let Err(err) = sender.send(Event::Clear).await {
warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
return;
}
loop {
interval.tick().await;
tick_interval.tick().await;
if sender.send(Event::Tick).await.is_err() {
info!("EventReceiver is dropped, tick loop is stopped");
break;
@@ -202,6 +255,8 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);
/// Selector for region supervisor.
pub enum RegionSupervisorSelector {
@@ -225,9 +280,11 @@ pub struct RegionSupervisor {
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
/// The maintenance mode manager.
maintenance_mode_manager: MaintenanceModeManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
/// The kv backend.
kv_backend: KvBackendRef,
}
/// Controller for managing failure detectors for regions.
@@ -290,14 +347,16 @@ impl RegionSupervisor {
tokio::sync::mpsc::channel(1024)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
event_receiver: Receiver<Event>,
options: PhiAccrualFailureDetectorOptions,
selector_context: SelectorContext,
selector: RegionSupervisorSelector,
region_migration_manager: RegionMigrationManagerRef,
maintenance_mode_manager: MaintenanceModeManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
peer_lookup: PeerLookupServiceRef,
kv_backend: KvBackendRef,
) -> Self {
Self {
failure_detector: RegionFailureDetector::new(options),
@@ -306,8 +365,9 @@ impl RegionSupervisor {
selector_context,
selector,
region_migration_manager,
maintenance_mode_manager,
runtime_switch_manager,
peer_lookup,
kv_backend,
}
}
@@ -315,6 +375,26 @@ impl RegionSupervisor {
pub(crate) async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
match event {
Event::InitializeAllRegions(sender) => {
match self.is_maintenance_mode_enabled().await {
Ok(false) => {}
Ok(true) => {
warn!("Skipping initialize all regions since maintenance mode is enabled.");
continue;
}
Err(err) => {
error!(err; "Failed to check maintenance mode during initialize all regions.");
continue;
}
}
if let Err(err) = self.initialize_all().await {
error!(err; "Failed to initialize all regions.");
} else {
// Ignore the error.
let _ = sender.send(());
}
}
Event::Tick => {
let regions = self.detect_region_failure();
self.handle_region_failures(regions).await;
@@ -336,6 +416,59 @@ impl RegionSupervisor {
info!("RegionSupervisor is stopped!");
}
async fn initialize_all(&self) -> Result<()> {
let now = Instant::now();
let regions = self.regions();
let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
})
.into_stream();
let mut stream = stream
.map_ok(|(_, value)| {
TableRouteValue::try_from_raw_value(&value)
.context(error::TableMetadataManagerSnafu)
})
.boxed();
let mut detecting_regions = Vec::new();
while let Some(route) = stream
.try_next()
.await
.context(error::TableMetadataManagerSnafu)?
{
let route = route?;
if !route.is_physical() {
continue;
}
let physical_table_route = route.into_physical_table_route();
physical_table_route
.region_routes
.iter()
.for_each(|region_route| {
if !regions.contains(&region_route.region.id) {
if let Some(leader_peer) = &region_route.leader_peer {
detecting_regions.push((leader_peer.id, region_route.region.id));
}
}
});
}
let num_detecting_regions = detecting_regions.len();
if !detecting_regions.is_empty() {
self.register_failure_detectors(detecting_regions).await;
}
info!(
"Initialize {} region failure detectors, elapsed: {:?}",
num_detecting_regions,
now.elapsed()
);
Ok(())
}
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
let ts_millis = current_time_millis();
for region in detecting_regions {
@@ -426,10 +559,10 @@ impl RegionSupervisor {
}
pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
self.maintenance_mode_manager
self.runtime_switch_manager
.maintenance_mode()
.await
.context(error::MaintenanceModeManagerSnafu)
.context(error::RuntimeSwitchManagerSnafu)
}
async fn select_peers(
@@ -497,12 +630,10 @@ impl RegionSupervisor {
.peer_lookup
.datanode(from_peer_id)
.await
.context(error::LookupPeerSnafu {
peer_id: from_peer_id,
})?
.context(error::PeerUnavailableSnafu {
peer_id: from_peer_id,
})?;
.ok()
.flatten()
.unwrap_or_else(|| Peer::empty(from_peer_id));
let region_peers = self
.select_peers(from_peer_id, regions, failed_datanodes)
.await?;
@@ -599,6 +730,14 @@ impl RegionSupervisor {
.collect::<Vec<_>>()
}
/// Returns all regions that registered in the failure detector.
fn regions(&self) -> HashSet<RegionId> {
self.failure_detector
.iter()
.map(|e| e.region_ident().1)
.collect::<HashSet<_>>()
}
/// Updates the state of corresponding failure detectors.
fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
for region_id in heartbeat.regions {
@@ -618,13 +757,22 @@ impl RegionSupervisor {
#[cfg(test)]
pub(crate) mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_meta::ddl::test_util::{
test_create_logical_table_task, test_create_physical_table_task,
};
use common_meta::ddl::RegionFailureDetectorController;
use common_meta::key::maintenance;
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::key::{runtime_switch, TableMetadataManager};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::test_util::NoopPeerLookupService;
use common_telemetry::info;
use common_time::util::current_time_millis;
use rand::Rng;
use store_api::storage::RegionId;
@@ -650,10 +798,11 @@ pub(crate) mod tests {
env.procedure_manager().clone(),
context_factory,
));
let maintenance_mode_manager =
Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
let runtime_switch_manager =
Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
let peer_lookup = Arc::new(NoopPeerLookupService);
let (tx, rx) = RegionSupervisor::channel();
let kv_backend = env.kv_backend();
(
RegionSupervisor::new(
@@ -662,8 +811,9 @@ pub(crate) mod tests {
selector_context,
RegionSupervisorSelector::NaiveSelector(selector),
region_migration_manager,
maintenance_mode_manager,
runtime_switch_manager,
peer_lookup,
kv_backend,
),
tx,
)
@@ -748,6 +898,8 @@ pub(crate) mod tests {
let ticker = RegionSupervisorTicker {
tick_handle: Mutex::new(None),
tick_interval: Duration::from_millis(10),
initialization_delay: Duration::from_millis(100),
initialization_retry_period: Duration::from_millis(100),
sender: tx,
};
// It's ok if we start the ticker again.
@@ -757,11 +909,116 @@ pub(crate) mod tests {
ticker.stop();
assert!(!rx.is_empty());
while let Ok(event) = rx.try_recv() {
assert_matches!(event, Event::Tick | Event::Clear);
assert_matches!(
event,
Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
);
}
}
}
#[tokio::test]
async fn test_initialize_all_regions_event_handling() {
common_telemetry::init_default_ut_logging();
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let ticker = RegionSupervisorTicker {
tick_handle: Mutex::new(None),
tick_interval: Duration::from_millis(1000),
initialization_delay: Duration::from_millis(50),
initialization_retry_period: Duration::from_millis(50),
sender: tx,
};
ticker.start();
sleep(Duration::from_millis(60)).await;
let handle = tokio::spawn(async move {
let mut counter = 0;
while let Some(event) = rx.recv().await {
if let Event::InitializeAllRegions(tx) = event {
if counter == 0 {
// Ignore the first event
counter += 1;
continue;
}
tx.send(()).unwrap();
info!("Responded initialize all regions event");
break;
}
}
rx
});
let rx = handle.await.unwrap();
for _ in 0..3 {
sleep(Duration::from_millis(100)).await;
assert!(rx.is_empty());
}
}
#[tokio::test]
async fn test_initialize_all_regions() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, sender) = new_test_supervisor();
let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());
// Create a physical table metadata
let table_id = 1024;
let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
create_physical_table_task.set_table_id(table_id);
let table_info = create_physical_table_task.table_info;
let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 0),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}]);
let table_route_value = TableRouteValue::Physical(table_route);
table_metadata_manager
.create_table_metadata(table_info, table_route_value, HashMap::new())
.await
.unwrap();
// Create a logical table metadata
let logical_table_id = 1025;
let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
test_create_logical_table_task.set_table_id(logical_table_id);
let table_info = test_create_logical_table_task.table_info;
let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
let table_route_value = TableRouteValue::Logical(table_route);
table_metadata_manager
.create_table_metadata(table_info, table_route_value, HashMap::new())
.await
.unwrap();
tokio::spawn(async move { supervisor.run().await });
let (tx, rx) = oneshot::channel();
sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
assert!(rx.await.is_ok());
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
assert_eq!(detector.len(), 1);
assert!(detector.contains(&(1, RegionId::new(1024, 0))));
}
#[tokio::test]
async fn test_initialize_all_regions_with_maintenance_mode() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, sender) = new_test_supervisor();
supervisor
.runtime_switch_manager
.set_maintenance_mode()
.await
.unwrap();
tokio::spawn(async move { supervisor.run().await });
let (tx, rx) = oneshot::channel();
sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
// The sender is dropped, so the receiver will receive an error.
assert!(rx.await.is_err());
}
#[tokio::test]
async fn test_region_failure_detector_controller() {
let (mut supervisor, sender) = new_test_supervisor();

View File

@@ -17,6 +17,7 @@ mod heartbeat;
mod leader;
mod maintenance;
mod node_lease;
mod procedure;
mod util;
use std::collections::HashMap;
@@ -56,10 +57,25 @@ pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
},
);
let router = router.route(
"/maintenance",
let router = router.routes(
&[
"/maintenance",
"/maintenance/status",
"/maintenance/enable",
"/maintenance/disable",
],
maintenance::MaintenanceHandler {
manager: metasrv.maintenance_mode_manager().clone(),
manager: metasrv.runtime_switch_manager().clone(),
},
);
let router = router.routes(
&[
"/procedure-manager/pause",
"/procedure-manager/resume",
"/procedure-manager/status",
],
procedure::ProcedureManagerHandler {
manager: metasrv.runtime_switch_manager().clone(),
},
);
let router = Router::nest("/admin", router);
@@ -97,10 +113,7 @@ impl NamedService for Admin {
const NAME: &'static str = "admin";
}
impl<T> Service<http::Request<T>> for Admin
where
T: Send,
{
impl Service<http::Request<BoxBody>> for Admin {
type Response = http::Response<BoxBody>;
type Error = Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
@@ -109,7 +122,7 @@ where
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<T>) -> Self::Future {
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
let router = self.router.clone();
let query_params = req
.uri()
@@ -128,7 +141,7 @@ where
#[derive(Default)]
pub struct Router {
handlers: HashMap<String, Box<dyn HttpHandler>>,
handlers: HashMap<String, Arc<dyn HttpHandler>>,
}
impl Router {
@@ -153,7 +166,17 @@ impl Router {
pub fn route(mut self, path: &str, handler: impl HttpHandler + 'static) -> Self {
check_path(path);
let _ = self.handlers.insert(path.to_owned(), Box::new(handler));
let _ = self.handlers.insert(path.to_owned(), Arc::new(handler));
self
}
pub fn routes(mut self, paths: &[&str], handler: impl HttpHandler + 'static) -> Self {
let handler = Arc::new(handler);
for path in paths {
check_path(path);
let _ = self.handlers.insert(path.to_string(), handler.clone());
}
self
}
@@ -204,7 +227,7 @@ fn boxed(body: String) -> BoxBody {
mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
use super::*;
use crate::metasrv::builder::MetasrvBuilder;
@@ -325,6 +348,13 @@ mod tests {
metasrv
}
async fn send_request(client: &mut DuplexStream, request: &[u8]) -> String {
client.write_all(request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
String::from_utf8_lossy(&buf[..n]).to_string()
}
#[tokio::test(flavor = "multi_thread")]
async fn test_metasrv_maintenance_mode() {
common_telemetry::init_default_ut_logging();
@@ -343,73 +373,188 @@ mod tests {
});
// Get maintenance mode
let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
assert!(response.contains("200 OK"));
// Set maintenance mode to true
let http_post = b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
client.write_all(http_post).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"POST /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
assert!(response.contains("200 OK"));
let enabled = metasrv
.maintenance_mode_manager()
.runtime_switch_manager()
.maintenance_mode()
.await
.unwrap();
assert!(enabled);
// Get maintenance mode again
let http_request = b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"GET /admin/maintenance HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
assert!(response.contains("200 OK"));
// Set maintenance mode to false
let http_post = b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n";
client.write_all(http_post).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"POST /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
assert!(response.contains("200 OK"));
let enabled = metasrv
.maintenance_mode_manager()
.runtime_switch_manager()
.maintenance_mode()
.await
.unwrap();
assert!(!enabled);
// Set maintenance mode to true via GET request
let http_request =
b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"GET /admin/maintenance?enable=true HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
assert!(response.contains("200 OK"));
// Set maintenance mode to false via GET request
let http_request =
b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n";
client.write_all(http_request).await.unwrap();
let mut buf = vec![0; 1024];
let n = client.read(&mut buf).await.unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
let response = send_request(
&mut client,
b"PUT /admin/maintenance?enable=false HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
assert!(response.contains("200 OK"));
// Get maintenance mode via status path
let response = send_request(
&mut client,
b"GET /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
// Set maintenance mode via enable path
let response = send_request(
&mut client,
b"POST /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":true}"#));
// Unset maintenance mode via disable path
let response = send_request(
&mut client,
b"POST /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains(r#"{"enabled":false}"#));
// send POST request to status path
let response = send_request(
&mut client,
b"POST /admin/maintenance/status HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
// send GET request to enable path
let response = send_request(
&mut client,
b"GET /admin/maintenance/enable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
// send GET request to disable path
let response = send_request(
&mut client,
b"GET /admin/maintenance/disable HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_metasrv_procedure_manager_handler() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::new());
let metasrv = test_metasrv(kv_backend).await;
metasrv.try_start().await.unwrap();
let (mut client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
let router = bootstrap::router(service);
router
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
// send GET request to procedure-manager/status path
let response = send_request(
&mut client,
b"GET /admin/procedure-manager/status HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("200 OK"));
assert!(
response.contains(r#"{"status":"running"}"#),
"response: {}",
response
);
// send POST request to procedure-manager/pause path
let response = send_request(
&mut client,
b"POST /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("200 OK"));
assert!(response.contains(r#"{"status":"paused"}"#));
// send POST request to procedure-manager/resume path
let response = send_request(
&mut client,
b"POST /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("200 OK"));
assert!(
response.contains(r#"{"status":"running"}"#),
"response: {}",
response
);
// send GET request to procedure-manager/resume path
let response = send_request(
&mut client,
b"GET /admin/procedure-manager/resume HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
// send GET request to procedure-manager/pause path
let response = send_request(
&mut client,
b"GET /admin/procedure-manager/pause HTTP/1.1\r\nHost: localhost\r\n\r\n",
)
.await;
assert!(response.contains("404 Not Found"));
}
}

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -22,14 +22,15 @@ use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::{
self, InvalidHttpBodySnafu, MaintenanceModeManagerSnafu, MissingRequiredParameterSnafu,
ParseBoolSnafu, Result, UnsupportedSnafu,
self, MissingRequiredParameterSnafu, ParseBoolSnafu, Result, RuntimeSwitchManagerSnafu,
UnsupportedSnafu,
};
use crate::service::admin::util::{to_json_response, to_not_found_response};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct MaintenanceHandler {
pub manager: MaintenanceModeManagerRef,
pub manager: RuntimeSwitchManagerRef,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -48,79 +49,124 @@ impl TryFrom<MaintenanceResponse> for String {
}
impl MaintenanceHandler {
async fn get_maintenance(&self) -> crate::Result<Response<String>> {
async fn get_maintenance(&self) -> crate::Result<MaintenanceResponse> {
let enabled = self
.manager
.maintenance_mode()
.await
.context(MaintenanceModeManagerSnafu)?;
let response = MaintenanceResponse { enabled }.try_into()?;
http::Response::builder()
.status(http::StatusCode::OK)
.body(response)
.context(InvalidHttpBodySnafu)
.context(RuntimeSwitchManagerSnafu)?;
Ok(MaintenanceResponse { enabled })
}
async fn set_maintenance(
async fn set_maintenance(&self) -> crate::Result<MaintenanceResponse> {
self.manager
.set_maintenance_mode()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Enable the maintenance mode.");
Ok(MaintenanceResponse { enabled: true })
}
async fn unset_maintenance(&self) -> crate::Result<MaintenanceResponse> {
self.manager
.unset_maintenance_mode()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Disable the maintenance mode.");
Ok(MaintenanceResponse { enabled: false })
}
async fn handle_legacy_maintenance(
&self,
params: &HashMap<String, String>,
) -> crate::Result<Response<String>> {
let enable = params
.get("enable")
.map(|v| v.parse::<bool>())
.context(MissingRequiredParameterSnafu { param: "enable" })?
.context(ParseBoolSnafu {
err_msg: "'enable' must be 'true' or 'false'",
})?;
) -> crate::Result<MaintenanceResponse> {
let enable = get_enable_from_params(params)?;
if enable {
self.manager
.set_maintenance_mode()
.await
.context(MaintenanceModeManagerSnafu)?;
info!("Enable the maintenance mode.");
self.set_maintenance().await
} else {
self.manager
.unset_maintenance_mode()
.await
.context(MaintenanceModeManagerSnafu)?;
info!("Disable the maintenance mode.");
};
let response = MaintenanceResponse { enabled: enable }.try_into()?;
http::Response::builder()
.status(http::StatusCode::OK)
.body(response)
.context(InvalidHttpBodySnafu)
self.unset_maintenance().await
}
}
}
fn get_enable_from_params(params: &HashMap<String, String>) -> crate::Result<bool> {
params
.get("enable")
.map(|v| v.parse::<bool>())
.context(MissingRequiredParameterSnafu { param: "enable" })?
.context(ParseBoolSnafu {
err_msg: "'enable' must be 'true' or 'false'",
})
}
const MAINTENANCE_PATH: &str = "maintenance";
const ENABLE_SUFFIX: &str = "enable";
const DISABLE_SUFFIX: &str = "disable";
const STATUS_SUFFIX: &str = "status";
#[async_trait::async_trait]
impl HttpHandler for MaintenanceHandler {
// TODO(weny): Remove the legacy version of the maintenance API.
// However, we need to keep the legacy version for a while to avoid breaking the existing operators.
async fn handle(
&self,
_: &str,
path: &str,
method: http::Method,
params: &HashMap<String, String>,
) -> crate::Result<Response<String>> {
match method {
http::Method::GET => {
if params.is_empty() {
self.get_maintenance().await
} else {
if path.ends_with(STATUS_SUFFIX) {
// Handle GET request to '/admin/maintenance/status'
let response = self.get_maintenance().await?;
to_json_response(response)
} else if path.ends_with(MAINTENANCE_PATH) && params.is_empty() {
// Handle GET request to '/admin/maintenance'. (The legacy version)
let response = self.get_maintenance().await?;
to_json_response(response)
} else if path.ends_with(MAINTENANCE_PATH) {
// Handle GET request to '/admin/maintenance' with URL parameters. (The legacy version)
warn!(
"Found URL parameters in '/admin/maintenance' request, it's deprecated, will be removed in the future"
);
// The old version operator will send GET request with URL parameters,
// so we need to support it.
self.set_maintenance(params).await
let response = self.handle_legacy_maintenance(params).await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::PUT => {
warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future");
self.set_maintenance(params).await
// Handle PUT request to '/admin/maintenance' with URL parameters. (The legacy version)
if path.ends_with(MAINTENANCE_PATH) {
warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future");
let response = self.handle_legacy_maintenance(params).await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::POST => {
// Handle POST request to '/admin/maintenance/enable'
if path.ends_with(ENABLE_SUFFIX) {
let response = self.set_maintenance().await?;
to_json_response(response)
} else if path.ends_with(DISABLE_SUFFIX) {
// Handle POST request to '/admin/maintenance/disable'
let response = self.unset_maintenance().await?;
to_json_response(response)
} else if path.ends_with(MAINTENANCE_PATH) {
// Handle POST request to '/admin/maintenance' with URL parameters. (The legacy version)
warn!("Found PUT request to '/admin/maintenance', it's deprecated, will be removed in the future");
let response = self.handle_legacy_maintenance(params).await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::POST => self.set_maintenance(params).await,
_ => UnsupportedSnafu {
operation: format!("http method {method}"),
}

View File

@@ -0,0 +1,119 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tonic::codegen::http;
use tonic::codegen::http::Response;
use crate::error::RuntimeSwitchManagerSnafu;
use crate::service::admin::util::{to_json_response, to_not_found_response};
use crate::service::admin::HttpHandler;
#[derive(Clone)]
pub struct ProcedureManagerHandler {
pub manager: RuntimeSwitchManagerRef,
}
#[derive(Debug, Serialize, Deserialize)]
struct ProcedureManagerStatusResponse {
status: ProcedureManagerStatus,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ProcedureManagerStatus {
Paused,
Running,
}
impl ProcedureManagerHandler {
async fn pause_procedure_manager(&self) -> crate::Result<ProcedureManagerStatusResponse> {
self.manager
.pasue_procedure()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Pause the procedure manager.");
Ok(ProcedureManagerStatusResponse {
status: ProcedureManagerStatus::Paused,
})
}
async fn resume_procedure_manager(&self) -> crate::Result<ProcedureManagerStatusResponse> {
self.manager
.resume_procedure()
.await
.context(RuntimeSwitchManagerSnafu)?;
// TODO(weny): Add a record to the system events.
info!("Resume the procedure manager.");
Ok(ProcedureManagerStatusResponse {
status: ProcedureManagerStatus::Running,
})
}
async fn get_procedure_manager_status(&self) -> crate::Result<ProcedureManagerStatusResponse> {
let is_paused = self
.manager
.is_procedure_paused()
.await
.context(RuntimeSwitchManagerSnafu)?;
let response = ProcedureManagerStatusResponse {
status: if is_paused {
ProcedureManagerStatus::Paused
} else {
ProcedureManagerStatus::Running
},
};
Ok(response)
}
}
#[async_trait::async_trait]
impl HttpHandler for ProcedureManagerHandler {
async fn handle(
&self,
path: &str,
method: http::Method,
_: &HashMap<String, String>,
) -> crate::Result<Response<String>> {
match method {
http::Method::GET => {
if path.ends_with("status") {
let response = self.get_procedure_manager_status().await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
http::Method::POST => {
if path.ends_with("pause") {
let response = self.pause_procedure_manager().await?;
to_json_response(response)
} else if path.ends_with("resume") {
let response = self.resume_procedure_manager().await?;
to_json_response(response)
} else {
to_not_found_response()
}
}
_ => to_not_found_response(),
}
}
}

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use serde::Serialize;
use snafu::ResultExt;
use tonic::codegen::http;
use crate::error::{self, Result};
/// Returns a 200 response with a text body.
pub fn to_text_response(text: &str) -> Result<http::Response<String>> {
http::Response::builder()
.header("Content-Type", "text/plain")
@@ -24,3 +28,26 @@ pub fn to_text_response(text: &str) -> Result<http::Response<String>> {
.body(text.to_string())
.context(error::InvalidHttpBodySnafu)
}
/// Returns a 200 response with a JSON body.
pub fn to_json_response<T>(response: T) -> Result<http::Response<String>>
where
T: Serialize + Debug,
{
let response = serde_json::to_string(&response).context(error::SerializeToJsonSnafu {
input: format!("{response:?}"),
})?;
http::Response::builder()
.header("Content-Type", "application/json")
.status(http::StatusCode::OK)
.body(response)
.context(error::InvalidHttpBodySnafu)
}
/// Returns a 404 response with an empty body.
pub fn to_not_found_response() -> Result<http::Response<String>> {
http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body("".to_string())
.context(error::InvalidHttpBodySnafu)
}

View File

@@ -158,6 +158,7 @@ impl RegionEngine for MetricEngine {
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
metadata: Vec::new(),
})
}
BatchRegionDdlRequest::Alter(requests) => {
@@ -171,6 +172,7 @@ impl RegionEngine for MetricEngine {
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
metadata: Vec::new(),
})
}
BatchRegionDdlRequest::Drop(requests) => {
@@ -243,6 +245,7 @@ impl RegionEngine for MetricEngine {
result.map_err(BoxedError::new).map(|rows| RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
metadata: Vec::new(),
})
}
@@ -439,6 +442,7 @@ impl MetricEngine {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
}

View File

@@ -59,7 +59,7 @@ impl TestEnv {
/// Returns a new env with specific `prefix` and `config` for test.
pub async fn with_prefix_and_config(prefix: &str, config: EngineConfig) -> Self {
let mut mito_env = MitoTestEnv::with_prefix(prefix);
let mut mito_env = MitoTestEnv::with_prefix(prefix).await;
let mito = mito_env.create_engine(MitoConfig::default()).await;
let metric = MetricEngine::try_new(mito.clone(), config).unwrap();
Self {

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::{ErrorKind, ObjectStore};
use object_store::{ErrorKind, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
@@ -42,10 +42,6 @@ pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
pub const ATOMIC_WRITE_DIR: &str = "tmp/";
/// For compatibility. Remove this after a major version release.
pub const OLD_ATOMIC_WRITE_DIR: &str = ".tmp/";
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
region_dir: String,

View File

@@ -245,7 +245,7 @@ mod test {
let blob = create_inverted_index_blob().await;
// Init a test range reader in local fs.
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let file_size = blob.len() as u64;
let store = env.init_object_store_manager();
let temp_path = "data";

View File

@@ -31,6 +31,11 @@ use crate::sst::parquet::row_selection::RowGroupSelection;
const INDEX_RESULT_TYPE: &str = "index_result";
/// Cache for storing index query results.
///
/// The `RowGroupSelection` is a collection of row groups that match the predicate.
///
/// Row groups can be partially searched. Row groups that not contained in `RowGroupSelection` are not searched.
/// User can retrieve the partial results and handle uncontained row groups required by the predicate subsequently.
pub struct IndexResultCache {
cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
@@ -64,6 +69,8 @@ impl IndexResultCache {
}
/// Puts a query result into the cache.
///
/// Allow user to put a partial result (not containing all row groups) into the cache.
pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
let key = (key, file_id);
let size = Self::index_result_cache_weight(&key, &result);
@@ -74,6 +81,9 @@ impl IndexResultCache {
}
/// Gets a query result from the cache.
///
/// Note: the returned `RowGroupSelection` only contains the row groups that are searched.
/// Caller should handle the uncontained row groups required by the predicate subsequently.
pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
let res = self.cache.get(&(key.clone(), file_id));
if res.is_some() {

View File

@@ -430,9 +430,10 @@ impl UploadTracker {
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::ATOMIC_WRITE_DIR;
use super::*;
use crate::access_layer::{OperationType, ATOMIC_WRITE_DIR};
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::{CacheManager, CacheStrategy};
use crate::error::InvalidBatchSnafu;
@@ -449,7 +450,7 @@ mod tests {
async fn test_write_and_upload_sst() {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let mock_store = env.init_object_store_manager();
let path_provider = RegionFilePathFactory::new("test".to_string());
@@ -537,7 +538,7 @@ mod tests {
#[tokio::test]
async fn test_read_metadata_from_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
@@ -606,7 +607,7 @@ mod tests {
#[tokio::test]
async fn test_write_cache_clean_tmp_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();

View File

@@ -80,7 +80,6 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::{
@@ -89,6 +88,7 @@ use store_api::region_engine::{
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use store_api::ManifestVersion;
use tokio::sync::{oneshot, Semaphore};
use crate::cache::CacheStrategy;
@@ -101,6 +101,7 @@ use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
@@ -183,6 +184,18 @@ impl MitoEngine {
.await
}
/// Scan [`Batch`]es by [`ScanRequest`].
pub async fn scan_batch(
&self,
region_id: RegionId,
request: ScanRequest,
filter_deleted: bool,
) -> Result<ScanBatchStream> {
let mut scan_region = self.scan_region(region_id, request)?;
scan_region.set_filter_deleted(filter_deleted);
scan_region.scanner().await?.scan_batch()
}
/// Returns a scanner to scan for `request`.
async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner().await

View File

@@ -115,7 +115,7 @@ fn check_region_version(
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -211,7 +211,7 @@ fn build_rows_for_tags(
#[tokio::test]
async fn test_put_after_alter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
@@ -316,7 +316,7 @@ async fn test_put_after_alter() {
async fn test_alter_region_retry() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -374,7 +374,7 @@ async fn test_alter_region_retry() {
async fn test_alter_on_flushing() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -478,7 +478,7 @@ async fn test_alter_on_flushing() {
async fn test_alter_column_fulltext_options() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -597,7 +597,7 @@ async fn test_alter_column_fulltext_options() {
async fn test_alter_column_set_inverted_index() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -707,7 +707,7 @@ async fn test_alter_column_set_inverted_index() {
async fn test_alter_region_ttl_options() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -757,7 +757,7 @@ async fn test_alter_region_ttl_options() {
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyRegionChangeResultListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))

View File

@@ -31,7 +31,7 @@ use crate::test_util::{
async fn test_append_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -89,7 +89,7 @@ async fn test_append_mode_write_query() {
#[tokio::test]
async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
..Default::default()

View File

@@ -42,7 +42,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_new_stop() {
let mut env = TestEnv::with_prefix("engine-stop");
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -69,7 +69,7 @@ async fn test_engine_new_stop() {
#[tokio::test]
async fn test_write_to_region() {
let mut env = TestEnv::with_prefix("write-to-region");
let mut env = TestEnv::with_prefix("write-to-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -97,7 +97,9 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("region-replay")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -173,7 +175,7 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
#[tokio::test]
async fn test_write_query_region() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -207,7 +209,7 @@ async fn test_write_query_region() {
#[tokio::test]
async fn test_different_order() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -268,7 +270,7 @@ async fn test_different_order() {
#[tokio::test]
async fn test_different_order_and_type() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -332,7 +334,7 @@ async fn test_different_order_and_type() {
async fn test_put_delete() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -384,7 +386,7 @@ async fn test_put_delete() {
#[tokio::test]
async fn test_delete_not_null_fields() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -433,7 +435,7 @@ async fn test_delete_not_null_fields() {
#[tokio::test]
async fn test_put_overwrite() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -493,7 +495,7 @@ async fn test_put_overwrite() {
#[tokio::test]
async fn test_absent_and_invalid_columns() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -541,7 +543,7 @@ async fn test_absent_and_invalid_columns() {
#[tokio::test]
async fn test_region_usage() {
let mut env = TestEnv::with_prefix("region_usage");
let mut env = TestEnv::with_prefix("region_usage").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -595,7 +597,7 @@ async fn test_region_usage() {
async fn test_engine_with_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
let engine = env.create_engine(mito_config).await;
@@ -635,7 +637,7 @@ async fn test_engine_with_write_cache() {
#[tokio::test]
async fn test_cache_null_primary_key() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
vector_cache_size: ReadableSize::mb(32),

View File

@@ -39,8 +39,9 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env =
TestEnv::with_prefix("open-batch-regions").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("open-batch-regions")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
@@ -160,8 +161,9 @@ async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env =
TestEnv::with_prefix("open-batch-regions-err").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("open-batch-regions-err")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
let mut options = HashMap::new();

View File

@@ -57,7 +57,9 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::with_prefix("last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -175,8 +177,9 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
return;
};
let mut env =
TestEnv::with_prefix("incorrect_last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("incorrect_last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -277,8 +280,9 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
return;
};
let mut env =
TestEnv::with_prefix("without_last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("without_last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -380,8 +384,9 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
return;
};
let mut env =
TestEnv::with_prefix("without_manifest_update").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("without_manifest_update")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -545,7 +550,9 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::with_prefix("local_catchup").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("local_catchup")
.await
.with_log_store_factory(factory.clone());
let leader_engine = env.create_engine(MitoConfig::default()).await;
let Some(LogStoreImpl::RaftEngine(log_store)) = env.get_log_store() else {
unreachable!()
@@ -686,7 +693,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
#[tokio::test]
async fn test_catchup_not_exist() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);

View File

@@ -21,7 +21,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_engine_close_region() {
let mut env = TestEnv::with_prefix("close");
let mut env = TestEnv::with_prefix("close").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -136,7 +136,7 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
#[tokio::test]
async fn test_compaction_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -202,7 +202,7 @@ async fn test_compaction_region() {
#[tokio::test]
async fn test_infer_compaction_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -341,7 +341,7 @@ async fn test_infer_compaction_time_window() {
#[tokio::test]
async fn test_compaction_overlapping_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -402,7 +402,7 @@ async fn test_compaction_overlapping_files() {
#[tokio::test]
async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -450,7 +450,7 @@ async fn test_compaction_region_with_overlapping() {
#[tokio::test]
async fn test_compaction_region_with_overlapping_delete_all() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -506,7 +506,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
#[tokio::test]
async fn test_readonly_during_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
@@ -590,7 +590,7 @@ async fn test_readonly_during_compaction() {
#[tokio::test]
async fn test_compaction_update_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -686,7 +686,7 @@ async fn test_compaction_update_time_window() {
#[tokio::test]
async fn test_change_region_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -811,7 +811,7 @@ async fn test_change_region_compaction_window() {
#[tokio::test]
async fn test_open_overwrite_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -26,7 +26,7 @@ use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder,
#[tokio::test]
async fn test_engine_create_new_region() {
let mut env = TestEnv::with_prefix("new-region");
let mut env = TestEnv::with_prefix("new-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -41,7 +41,7 @@ async fn test_engine_create_new_region() {
#[tokio::test]
async fn test_engine_create_existing_region() {
let mut env = TestEnv::with_prefix("create-existing");
let mut env = TestEnv::with_prefix("create-existing").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -61,7 +61,7 @@ async fn test_engine_create_existing_region() {
#[tokio::test]
async fn test_engine_create_close_create_region() {
// This test will trigger create_or_open function.
let mut env = TestEnv::with_prefix("create-close-create");
let mut env = TestEnv::with_prefix("create-close-create").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -91,7 +91,7 @@ async fn test_engine_create_close_create_region() {
#[tokio::test]
async fn test_engine_create_with_different_id() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -110,7 +110,7 @@ async fn test_engine_create_with_different_id() {
#[tokio::test]
async fn test_engine_create_with_different_schema() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -130,7 +130,7 @@ async fn test_engine_create_with_different_schema() {
#[tokio::test]
async fn test_engine_create_with_different_primary_key() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -150,7 +150,7 @@ async fn test_engine_create_with_different_primary_key() {
#[tokio::test]
async fn test_engine_create_with_options() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -172,7 +172,7 @@ async fn test_engine_create_with_options() {
#[tokio::test]
async fn test_engine_create_with_custom_store() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
@@ -204,7 +204,7 @@ async fn test_engine_create_with_custom_store() {
#[tokio::test]
async fn test_engine_create_with_memtable_opts() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -35,7 +35,7 @@ use crate::worker::DROPPING_MARKER_FILE;
async fn test_engine_drop_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("drop");
let mut env = TestEnv::with_prefix("drop").await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -143,7 +143,7 @@ async fn test_engine_drop_region_for_custom_store() {
put_rows(engine, region_id, rows).await;
flush_region(engine, region_id, None).await;
}
let mut env = TestEnv::with_prefix("drop");
let mut env = TestEnv::with_prefix("drop").await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with_multiple_object_stores(

View File

@@ -33,7 +33,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_edit_region_schedule_compaction() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<RegionId>>>,
@@ -122,7 +122,7 @@ async fn test_edit_region_schedule_compaction() {
#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<FileId>>>,
@@ -241,7 +241,7 @@ async fn test_edit_region_concurrently() {
}
}
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
// Suppress the compaction to not impede the speed of this kinda stress testing.

View File

@@ -28,7 +28,7 @@ use crate::test_util::{
async fn test_scan_without_filtering_deleted() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -41,7 +41,7 @@ use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
#[tokio::test]
async fn test_manual_flush() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -91,7 +91,7 @@ async fn test_manual_flush() {
#[tokio::test]
async fn test_flush_engine() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let engine = env
@@ -161,7 +161,7 @@ async fn test_flush_engine() {
#[tokio::test]
async fn test_write_stall() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(StallListener::default());
let engine = env
@@ -236,7 +236,7 @@ async fn test_write_stall() {
#[tokio::test]
async fn test_flush_empty() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let engine = env
.create_engine_with(
@@ -289,7 +289,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::new().await.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
@@ -396,7 +396,7 @@ impl MockTimeProvider {
#[tokio::test]
async fn test_auto_flush_engine() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let now = current_time_millis();
@@ -467,7 +467,7 @@ async fn test_auto_flush_engine() {
#[tokio::test]
async fn test_flush_workers() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let engine = env
@@ -554,7 +554,7 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::new().await.with_log_store_factory(factory.clone());
let engine = env
.create_engine_with(
MitoConfig::default(),

View File

@@ -31,7 +31,7 @@ use crate::test_util::{
async fn test_merge_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -89,7 +89,7 @@ async fn test_merge_mode_write_query() {
async fn test_merge_mode_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
..Default::default()

View File

@@ -36,7 +36,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_open_empty() {
let mut env = TestEnv::with_prefix("open-empty");
let mut env = TestEnv::with_prefix("open-empty").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -63,7 +63,7 @@ async fn test_engine_open_empty() {
#[tokio::test]
async fn test_engine_open_existing() {
let mut env = TestEnv::with_prefix("open-exiting");
let mut env = TestEnv::with_prefix("open-exiting").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -90,7 +90,7 @@ async fn test_engine_open_existing() {
#[tokio::test]
async fn test_engine_reopen_region() {
let mut env = TestEnv::with_prefix("reopen-region");
let mut env = TestEnv::with_prefix("reopen-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -107,7 +107,7 @@ async fn test_engine_reopen_region() {
#[tokio::test]
async fn test_engine_open_readonly() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -150,7 +150,7 @@ async fn test_engine_open_readonly() {
#[tokio::test]
async fn test_engine_region_open_with_options() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -190,7 +190,7 @@ async fn test_engine_region_open_with_options() {
#[tokio::test]
async fn test_engine_region_open_with_custom_store() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
@@ -244,7 +244,7 @@ async fn test_engine_region_open_with_custom_store() {
#[tokio::test]
async fn test_open_region_skip_wal_replay() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -344,7 +344,7 @@ async fn test_open_region_skip_wal_replay() {
#[tokio::test]
async fn test_open_region_wait_for_opening_region_ok() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok");
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
@@ -383,7 +383,7 @@ async fn test_open_region_wait_for_opening_region_ok() {
#[tokio::test]
async fn test_open_region_wait_for_opening_region_err() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-err");
let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
@@ -428,7 +428,7 @@ async fn test_open_region_wait_for_opening_region_err() {
#[tokio::test]
async fn test_open_compaction_region() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let mut mito_config = MitoConfig::default();
mito_config
.sanitize(&env.data_home().display().to_string())

View File

@@ -73,7 +73,7 @@ async fn scan_in_parallel(
#[tokio::test]
async fn test_parallel_scan() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -53,7 +53,7 @@ fn build_rows_multi_tags_fields(
#[tokio::test]
async fn test_scan_projection() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -27,7 +27,7 @@ use crate::test_util::{
};
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -147,7 +147,7 @@ fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
#[tokio::test]
async fn test_prune_memtable() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -221,7 +221,7 @@ async fn test_prune_memtable() {
#[tokio::test]
async fn test_prune_memtable_complex_expr() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -274,7 +274,7 @@ async fn test_prune_memtable_complex_expr() {
#[tokio::test]
async fn test_mem_range_prune() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -25,7 +25,7 @@ use crate::test_util::{
};
async fn test_last_row(append_mode: bool) {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -27,7 +27,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_scan_with_min_sst_sequence() {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence");
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -153,7 +153,7 @@ async fn test_scan_with_min_sst_sequence() {
#[tokio::test]
async fn test_series_scan() {
let mut env = TestEnv::with_prefix("test_series_scan");
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -32,7 +32,7 @@ async fn test_set_role_state_gracefully() {
SettableRegionRoleState::DowngradingLeader,
];
for settable_role_state in settable_role_states {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -101,7 +101,7 @@ async fn test_set_role_state_gracefully() {
#[tokio::test]
async fn test_set_role_state_gracefully_not_exist() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);
@@ -116,7 +116,7 @@ async fn test_set_role_state_gracefully_not_exist() {
#[tokio::test]
async fn test_write_downgrading_region() {
let mut env = TestEnv::with_prefix("write-to-downgrading-region");
let mut env = TestEnv::with_prefix("write-to-downgrading-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

Some files were not shown because too many files have changed in this diff Show More