Compare commits

..

35 Commits

Author SHA1 Message Date
Weny Xu
4bb9ceb63b chore: cherry pick patches to release/v0.17.0 branch (#7024)
* fix: print the output message of the error in admin fn macro (#6994)

Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: make EXPIRE (keyword) parsing case-insensitive, when creating flow (#6997)

fix: make EXPIRE keyword case-insensitive in CREATE FLOW parser

Signed-off-by: Shyamnatesan <shyamnatesan21@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: promql range function has incorrect timestamps (#7006)

* fix: promql range function has incorrect timestamps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: incorrect timestamp resolution in information_schema.partitions table (#7004)

* fix: incorrect timestamp resolution in information_schema.partitions table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use second for all fields in partitions table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: match promql column reference in case sensitive way (#7013)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: group by expr not as column in step aggr (#7008)

* fix: group by expr not as column

Signed-off-by: discord9 <discord9@163.com>

* test: dist analyzer date_bin

Signed-off-by: discord9 <discord9@163.com>

* ???fix wip

Signed-off-by: discord9 <discord9@163.com>

* fix: deduce using correct input fields

Signed-off-by: discord9 <discord9@163.com>

* refactor: clearer wrapper

Signed-off-by: discord9 <discord9@163.com>

* chore: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: rm todo

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>

* fix: skip placeholder when partition columns (#7020)

Signed-off-by: discord9 <discord9@163.com>

* chore: add function for getting started on metasrv (#7022)

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: not step when aggr have order by/filter (#7015)

* fix: not applied

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* test: confirm order by not push down

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>

* feat: supports expression in TQL params (#7014)

* feat: supports expression in TQL params

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: by cr comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: comment

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: by cr comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* feat: update dashboard to v0.11.6 (#7026)

Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
Signed-off-by: discord9 <discord9@163.com>

* fix: step aggr merge phase not order nor filter (#6998)

* fix: not order

Signed-off-by: discord9 <discord9@163.com>

* test: redacted

Signed-off-by: discord9 <discord9@163.com>

* feat: fix up state wrapper

Signed-off-by: discord9 <discord9@163.com>

* df last_value state not as promised!

Signed-off-by: discord9 <discord9@163.com>

* fix?: could fix better

Signed-off-by: discord9 <discord9@163.com>

* test: unstable result

Signed-off-by: discord9 <discord9@163.com>

* fix: work around by fixing state

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* chore: finish some todo

Signed-off-by: discord9 <discord9@163.com>

* chore: per copilot

Signed-off-by: discord9 <discord9@163.com>

* refactor: not fix but just notify mismatch

Signed-off-by: discord9 <discord9@163.com>

* chore: warn -> debug state mismatch

Signed-off-by: discord9 <discord9@163.com>

* chore: refine error msg

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness add last_value date_bin test

Signed-off-by: discord9 <discord9@163.com>

* ?: substrait order by decode failure

Signed-off-by: discord9 <discord9@163.com>

* unit test reproduce that

Signed-off-by: discord9 <discord9@163.com>

* feat: support state wrapper's order serde in substrait

Signed-off-by: discord9 <discord9@163.com>

* refactor: stuff

Signed-off-by: discord9 <discord9@163.com>

* test: standalone/distributed different exec

Signed-off-by: discord9 <discord9@163.com>

* fmt

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: closure

Signed-off-by: discord9 <discord9@163.com>

* test: first value order by

Signed-off-by: discord9 <discord9@163.com>

* refactor: per cr

Signed-off-by: discord9 <discord9@163.com>

* feat: ScanHint last_value last row selector

Signed-off-by: discord9 <discord9@163.com>

* docs: per cr

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>

* chore: bump version to 0.17.2

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* chore: not warning

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: Shyamnatesan <shyamnatesan21@gmail.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: shyam <43544082+Shyamnatesan@users.noreply.github.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: ZonaHe <zonahe@qq.com>
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
Co-authored-by: discord9 <discord9@163.com>
2025-09-28 16:21:06 +08:00
WenyXu
38456638f8 chore: bump version to 0.17.1
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-17 16:42:28 +08:00
Yingwen
97c0b1f5c1 chore: reduce SeriesScan sender timeout (#6983)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-17 16:42:28 +08:00
shuiyisong
4fc7f12360 fix: OTel metrics naming wiht Prometheus style (#6982)
* fix: otel metrics naming

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: otel metrics naming & add some tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-09-17 16:42:28 +08:00
dennis zhuang
ed17997449 test: migrate join tests from duckdb, part3 (#6881)
* test: migrate join tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: update test results after rebasing main branch

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: unstable query sort results and natural_join test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: count(*) with joining

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: unstable query sort results and style

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-09-17 16:42:28 +08:00
Lei, HUANG
849ae8ebb6 fix: avoid truncating SST statistics during flush (#6977)
fix/disable-parquet-stats-truncate:
 - **Update `memcomparable` Dependency**: Switched from crates.io to a Git repository for `memcomparable` in `Cargo.lock`, `mito-codec/Cargo.toml`, and removed it from `mito2/Cargo.toml`.
 - **Enhance Parquet Writer Properties**: Added `set_statistics_truncate_length` and `set_column_index_truncate_length` to `WriterProperties` in `parquet.rs`, `bulk/part.rs`, `partition_tree/data.rs`, and `writer.rs`.
 - **Add Test for Corrupt Scan**: Introduced a new test module `scan_corrupt.rs` in `mito2/src/engine` to verify handling of corrupt data.
 - **Update Test Data**: Modified test data in `flush.rs` to reflect changes in file sizes and sequences.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-17 16:42:28 +08:00
Zhenchi
a0587e2e87 fix: clean intm ignore notfound (#6971)
* fix: clean intm ignore notfound

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-09-17 16:42:28 +08:00
discord9
1ed71169ac feat: support SubqueryAlias pushdown (#6963)
* wip enforce dist requirement rewriter

Signed-off-by: discord9 <discord9@163.com>

* feat: enforce dist req

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness result

Signed-off-by: discord9 <discord9@163.com>

* fix: double projection

Signed-off-by: discord9 <discord9@163.com>

* test: fix sqlness

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* docs: use btree map

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness explain&comment

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-09-17 16:42:28 +08:00
shuiyisong
e62f0e2b64 fix: deadlock in dashmap (#6978)
* fix: deadlock in dashmap

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* Update src/frontend/src/instance.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: extract fast cache check and add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-09-17 16:42:28 +08:00
Weny Xu
f92e753a34 feat: add postgres tls support for CLI (#6941)
* feat: add postgres tls support for cli

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-17 16:42:28 +08:00
Ruihang Xia
a22b016f90 feat: skip compaction on large file on append only mode (#6838)
* feat: skip compaction on large file on append only mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* log ignored files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* only ignore level 1 files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* early exit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-09-17 16:42:28 +08:00
shuiyisong
7a9fa99069 fix: shorten lock time (#6968) 2025-09-17 16:42:28 +08:00
ZonaHe
d808e7be7e feat: update dashboard to v0.11.4 (#6956)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-09-17 16:42:28 +08:00
fys
8e22fcfd5c fix: correct jemalloc metrics (#6959)
The allocated and resident metrics were swapped in the set calls. This commit
fixes the issue by ensuring each metric receives its corresponding value.
2025-09-17 16:42:28 +08:00
zyy17
26729c31a6 fix: use pull_request_target to fix add labels 403 error (#6958)
Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-09-17 16:42:28 +08:00
discord9
b73617eaba feat(query): better alias tracker (#6909)
* better resolve

Signed-off-by: discord9 <discord9@163.com>

feat: layered alias tracker

Signed-off-by: discord9 <discord9@163.com>

refactor

Signed-off-by: discord9 <discord9@163.com>

docs: expalin for no offset by one

Signed-off-by: discord9 <discord9@163.com>

test: more

Signed-off-by: discord9 <discord9@163.com>

simpify api

Signed-off-by: discord9 <discord9@163.com>

wip

Signed-off-by: discord9 <discord9@163.com>

fix: filter non-exist columns

Signed-off-by: discord9 <discord9@163.com>

feat: stuff

Signed-off-by: discord9 <discord9@163.com>

feat: cache partition columns

Signed-off-by: discord9 <discord9@163.com>

refactor: rm unused fn

Signed-off-by: discord9 <discord9@163.com>

no need res

Signed-off-by: discord9 <discord9@163.com>

chore: rm unwrap&docs update

Signed-off-by: discord9 <discord9@163.com>

* chore: after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: unsupport part

Signed-off-by: discord9 <discord9@163.com>

* err msg

Signed-off-by: discord9 <discord9@163.com>

* fix: pass correct partition cols

Signed-off-by: discord9 <discord9@163.com>

* fix? use column name only

Signed-off-by: discord9 <discord9@163.com>

* fix: merge scan has partition columns no alias/no partition diff

Signed-off-by: discord9 <discord9@163.com>

* refactor: loop instead of recursive

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* feat: overlaps

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-09-17 16:42:28 +08:00
discord9
3b909f63e3 fix: count(1) instead of count(ts) when >1 inputs (#6952)
Signed-off-by: discord9 <discord9@163.com>
2025-09-17 16:42:28 +08:00
dennis zhuang
0d4e07eddd fix: unstable query sort results (#6944)
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-09-17 16:42:28 +08:00
dennis zhuang
b94ce9019d test: migrate duckdb tests, part 1 (#6870)
* test: migrate duckdb tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: style

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add more duckdb tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: stable order

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: simplfy comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove tests/cases/standalone/common/DUCKDB_MIGRATION_GUIDE.md

* fix: incorrect_sql.sql

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: integer flow test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: integer flow test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* docs: add todo

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-09-17 16:42:28 +08:00
Cason Kervis
3dcd40c4ba fix(path): fix program lookup failure on Windows CI (#6946)
* fix(path): fix program lookup failure on Windows CI

Signed-off-by: Cason Kervis <cscnk52@outlook.com>

* fix(path): fix program exec name

Signed-off-by: Cason Kervis <cscnk52@outlook.com>

* fix(path): using absolute path

Signed-off-by: Cason Kervis <cscnk52@outlook.com>

* style: using fmt

Signed-off-by: Cason Kervis <cscnk52@outlook.com>

---------

Signed-off-by: Cason Kervis <cscnk52@outlook.com>
2025-09-17 16:42:28 +08:00
Ruihang Xia
a67803d0e9 fix: handle hash distribution properly (#6943)
* fix: handle hash distribution properly

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/optimizer/pass_distribution.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
2025-09-17 16:42:28 +08:00
Ruihang Xia
aa7e7942f8 fix: wrap tql cte in a subquery alias (#6910)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-09-17 16:42:28 +08:00
dennis zhuang
f1b7581dc3 test: adds approx_percentile_cont to range query test (#6903)
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-09-17 16:42:28 +08:00
dennis zhuang
cd761df369 test: migrate duckdb tests part2, window functions (#6875)
* test: migrate window tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: blank line at the end

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-09-17 16:42:28 +08:00
liyang
0cea6ae64d fix: fix deploy greptimedb in sqlness-test (#6894)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-09-17 16:42:28 +08:00
Weny Xu
8bf772fb50 chore: disable stats persistence by default (#6900)
* chore: disable stats persistence by default

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 21:25:54 +08:00
discord9
9c1240921d feat: flow full aggr only trigger on new data (#6880)
* fix: flow full aggr only trigger on new data

Signed-off-by: discord9 <discord9@163.com>

* chore: better debug msg

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 18:15:17 +08:00
Zhenchi
eb52129a91 fix: move prune_region_dir to region drop (#6891)
* fix: move prune_region_dir to region drop

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
a0a2b40cbe fix: initialize remote WAL regions with correct flushed entry IDs (#6856)
* fix: initialize remote WAL regions with correct flushed entry IDs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: correct latest offset

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: update sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add replay checkpoint to catchup request

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
067c4458d6 chore: fix typo (#6887)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Ruihang Xia
4e9c31bf5c chore: fix typo (#6885)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
9320a6ddaa chore: update dashboard (#6883)
* chore: update dashboard

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update json

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update json

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add desc

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Zhenchi
4c9fcb7dee fix: prune intermediate dirs on index finish and region pruge (#6878)
* fix: prune intermediate dirs on index finish and region pruge

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
Weny Xu
9dc16772fe fix: ignore reserved column IDs and prevent panic on chunk_size is zero (#6882)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
discord9
6ee91f6af4 fix(flow): promql auto create table (#6867)
* fix: non aggr prom ql auto create table

Signed-off-by: discord9 <discord9@163.com>

* feat: val column use any name

Signed-off-by: discord9 <discord9@163.com>

* feat: check if it's tql src table

Signed-off-by: discord9 <discord9@163.com>

* test: check sink table is tql-able

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness redacted

Signed-off-by: discord9 <discord9@163.com>

* fix: sql also handle no aggr case

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-09-03 15:50:43 +08:00
1230 changed files with 22566 additions and 16174 deletions

View File

@@ -2,7 +2,7 @@
linker = "aarch64-linux-gnu-gcc"
[alias]
sqlness = "run --bin sqlness-runner --target-dir target/sqlness --"
sqlness = "run --bin sqlness-runner --"
[unstable.git]
shallow_index = true

View File

@@ -1,7 +1,7 @@
name: "Semantic Pull Request"
on:
pull_request:
pull_request_target:
types:
- opened
- reopened
@@ -12,9 +12,9 @@ concurrency:
cancel-in-progress: true
permissions:
issues: write
contents: write
contents: read
pull-requests: write
issues: write
jobs:
check:

171
Cargo.lock generated
View File

@@ -218,7 +218,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-base",
"common-decimal",
@@ -737,7 +737,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -1387,7 +1387,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"catalog",
"common-error",
@@ -1422,7 +1422,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arrow",
@@ -1447,7 +1447,6 @@ dependencies = [
"common-telemetry",
"common-time",
"common-version",
"common-workload",
"dashmap",
"datafusion",
"datatypes",
@@ -1465,7 +1464,6 @@ dependencies = [
"promql-parser",
"rand 0.9.1",
"rustc-hash 2.1.1",
"serde",
"serde_json",
"session",
"snafu 0.8.6",
@@ -1765,7 +1763,7 @@ checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675"
[[package]]
name = "cli"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-stream",
"async-trait",
@@ -1809,7 +1807,7 @@ dependencies = [
"session",
"snafu 0.8.6",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tempfile",
"tokio",
@@ -1818,7 +1816,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arc-swap",
@@ -1850,7 +1848,7 @@ dependencies = [
"serde_json",
"snafu 0.8.6",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1891,7 +1889,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"auth",
@@ -1953,7 +1951,7 @@ dependencies = [
"snafu 0.8.6",
"stat",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"temp-env",
"tempfile",
@@ -1999,7 +1997,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"anymap2",
"async-trait",
@@ -2021,11 +2019,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.18.0"
version = "0.17.2"
[[package]]
name = "common-config"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-base",
"common-error",
@@ -2051,7 +2049,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -2086,7 +2084,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2099,7 +2097,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-macro",
"http 1.3.1",
@@ -2110,7 +2108,7 @@ dependencies = [
[[package]]
name = "common-event-recorder"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -2132,7 +2130,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -2154,7 +2152,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -2194,6 +2192,7 @@ dependencies = [
"nalgebra",
"num",
"num-traits",
"once_cell",
"paste",
"pretty_assertions",
"s2",
@@ -2211,7 +2210,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2228,7 +2227,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arrow-flight",
@@ -2261,7 +2260,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"common-base",
@@ -2281,7 +2280,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"greptime-proto",
"once_cell",
@@ -2292,7 +2291,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"anyhow",
"common-error",
@@ -2308,7 +2307,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"anymap2",
"api",
@@ -2380,7 +2379,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2389,11 +2388,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.18.0"
version = "0.17.2"
[[package]]
name = "common-pprof"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-error",
"common-macro",
@@ -2405,7 +2404,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-stream",
@@ -2434,7 +2433,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2444,7 +2443,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -2469,14 +2468,12 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"arc-swap",
"common-base",
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"datafusion",
"datafusion-common",
"datatypes",
@@ -2492,7 +2489,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -2521,7 +2518,7 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"serde",
"strum 0.27.1",
@@ -2529,7 +2526,7 @@ dependencies = [
[[package]]
name = "common-sql"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-base",
"common-decimal",
@@ -2547,7 +2544,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"backtrace",
"common-base",
@@ -2576,7 +2573,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"client",
"common-grpc",
@@ -2589,7 +2586,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"arrow",
"chrono",
@@ -2607,7 +2604,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2618,7 +2615,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-base",
"common-error",
@@ -2641,7 +2638,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"common-telemetry",
"serde",
@@ -3868,7 +3865,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arrow-flight",
@@ -3921,7 +3918,7 @@ dependencies = [
"session",
"snafu 0.8.6",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tokio",
"toml 0.8.23",
@@ -3931,7 +3928,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"arrow",
"arrow-array",
@@ -4605,7 +4602,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-engine"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -4737,7 +4734,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arrow",
@@ -4804,7 +4801,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tokio",
"tonic 0.13.1",
@@ -4859,7 +4856,7 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "frontend"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arc-swap",
@@ -4922,7 +4919,7 @@ dependencies = [
"sqlparser 0.55.0-greptime",
"store-api",
"strfmt",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tokio",
"tokio-util",
@@ -5302,7 +5299,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f9836cf8aab30e672f640c6ef4c1cfd2cf9fbc36#f9836cf8aab30e672f640c6ef4c1cfd2cf9fbc36"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=66eb089afa6baaa3ddfafabd0a4abbe317d012c3#66eb089afa6baaa3ddfafabd0a4abbe317d012c3"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -6064,7 +6061,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -7004,7 +7001,7 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"chrono",
"common-error",
@@ -7016,7 +7013,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-stream",
"async-trait",
@@ -7287,8 +7284,7 @@ checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "memcomparable"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "376101dbd964fc502d5902216e180f92b3d003b5cc3d2e40e044eb5470fca677"
source = "git+https://github.com/v0y4g3r/memcomparable.git?rev=a07122dc03556bbd88ad66234cbea7efd3b23efb#a07122dc03556bbd88ad66234cbea7efd3b23efb"
dependencies = [
"bytes",
"serde",
@@ -7324,7 +7320,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -7352,7 +7348,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -7448,7 +7444,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"aquamarine",
@@ -7541,7 +7537,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"bytes",
@@ -7565,7 +7561,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"aquamarine",
@@ -7594,7 +7590,6 @@ dependencies = [
"crc32fast",
"criterion 0.4.0",
"crossbeam-utils",
"dashmap",
"datafusion",
"datafusion-common",
"datafusion-expr",
@@ -7607,12 +7602,10 @@ dependencies = [
"itertools 0.14.0",
"lazy_static",
"log-store",
"memcomparable",
"mito-codec",
"moka",
"object-store",
"parquet",
"partition",
"paste",
"pin-project",
"prometheus",
@@ -8302,7 +8295,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"anyhow",
"bytes",
@@ -8587,7 +8580,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -8645,7 +8638,7 @@ dependencies = [
"sql",
"sqlparser 0.55.0-greptime",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tokio",
"tokio-util",
@@ -8957,7 +8950,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -9296,7 +9289,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -9452,7 +9445,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"auth",
"clap 4.5.40",
@@ -9750,7 +9743,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"async-trait",
@@ -10033,7 +10026,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-compression 0.4.19",
"async-trait",
@@ -10075,7 +10068,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -10140,7 +10133,7 @@ dependencies = [
"sql",
"sqlparser 0.55.0-greptime",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tokio",
"tokio-stream",
@@ -11504,7 +11497,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -11575,7 +11568,6 @@ dependencies = [
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.4",
"permutation",
@@ -11628,7 +11620,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"ahash 0.8.12",
"api",
@@ -11956,7 +11948,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arrow-buffer",
@@ -12014,7 +12006,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -12314,7 +12306,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"nix 0.30.1",
]
@@ -12327,7 +12319,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "store-api"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"aquamarine",
@@ -12474,11 +12466,12 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"async-trait",
"bytes",
"common-error",
"common-function",
"common-macro",
"common-telemetry",
"datafusion",
@@ -12642,7 +12635,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"async-trait",
@@ -12911,7 +12904,7 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "tests-fuzz"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -12955,7 +12948,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.18.0"
version = "0.17.2"
dependencies = [
"api",
"arrow-flight",
@@ -13027,7 +13020,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.18.0",
"substrait 0.17.2",
"table",
"tempfile",
"time",

View File

@@ -73,8 +73,8 @@ members = [
resolver = "2"
[workspace.package]
version = "0.18.0"
edition = "2024"
version = "0.17.2"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
@@ -145,7 +145,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f9836cf8aab30e672f640c6ef4c1cfd2cf9fbc36" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "66eb089afa6baaa3ddfafabd0a4abbe317d012c3" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -103,7 +103,6 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
@@ -495,7 +494,6 @@
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |

View File

@@ -274,9 +274,6 @@ type = "File"
## @toml2docs:none-default
#+ cache_path = ""
## Whether to enable read cache. If not set, the read cache will be enabled by default.
enable_read_cache = true
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "5GiB"

View File

@@ -361,9 +361,6 @@ data_home = "./greptimedb_data"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Whether to enable read cache. If not set, the read cache will be enabled by default.
enable_read_cache = true
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default

View File

@@ -19,8 +19,8 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use snafu::Location;
use snafu::prelude::*;
use snafu::Location;
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -16,15 +16,15 @@ use std::collections::HashSet;
use std::sync::Arc;
use common_base::BitVec;
use common_decimal::Decimal128;
use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::scalars::ScalarVector;
use datatypes::types::{
Int8Type, Int16Type, IntervalType, TimeType, TimestampType, UInt8Type, UInt16Type,
Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type,
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use datatypes::vectors::{
@@ -295,7 +295,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail();
return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
}
};
let datatype_extension = match column_datatype {

View File

@@ -15,9 +15,9 @@
use std::collections::HashMap;
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions,
SkippingIndexOptions, SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,

View File

@@ -17,13 +17,13 @@ use std::sync::Arc;
use common_base::secrets::SecretString;
use digest::Digest;
use sha1::Sha1;
use snafu::{OptionExt, ensure};
use snafu::{ensure, OptionExt};
use crate::error::{IllegalParamSnafu, InvalidConfigSnafu, Result, UserPasswordMismatchSnafu};
use crate::user_info::DefaultUserInfo;
use crate::user_provider::static_user_provider::{STATIC_USER_PROVIDER, StaticUserProvider};
use crate::user_provider::static_user_provider::{StaticUserProvider, STATIC_USER_PROVIDER};
use crate::user_provider::watch_file_user_provider::{
WATCH_FILE_USER_PROVIDER, WatchFileUserProvider,
WatchFileUserProvider, WATCH_FILE_USER_PROVIDER,
};
use crate::{UserInfoRef, UserProviderRef};

View File

@@ -22,13 +22,13 @@ mod user_provider;
pub mod tests;
pub use common::{
HashedPassword, Identity, Password, auth_mysql, static_user_provider_from_option,
user_provider_from_option, userinfo_by_name,
auth_mysql, static_user_provider_from_option, user_provider_from_option, userinfo_by_name,
HashedPassword, Identity, Password,
};
pub use permission::{PermissionChecker, PermissionReq, PermissionResp};
pub use user_info::UserInfo;
pub use user_provider::UserProvider;
pub use user_provider::static_user_provider::StaticUserProvider;
pub use user_provider::UserProvider;
/// pub type alias
pub type UserInfoRef = std::sync::Arc<dyn UserInfo>;

View File

@@ -21,7 +21,7 @@ use crate::error::{
UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
use crate::{Identity, Password, UserInfoRef, UserProvider, auth_mysql};
use crate::{auth_mysql, Identity, Password, UserInfoRef, UserProvider};
pub struct DatabaseAuthInfo<'a> {
pub catalog: &'a str,

View File

@@ -22,7 +22,7 @@ use std::io::BufRead;
use std::path::Path;
use common_base::secrets::ExposeSecret;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use crate::common::{Identity, Password};
use crate::error::{
@@ -30,7 +30,7 @@ use crate::error::{
UserNotFoundSnafu, UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
use crate::{UserInfoRef, auth_mysql};
use crate::{auth_mysql, UserInfoRef};
#[async_trait::async_trait]
pub trait UserProvider: Send + Sync {

View File

@@ -102,10 +102,10 @@ pub mod test {
use common_test_util::temp_dir::create_temp_dir;
use crate::UserProvider;
use crate::user_info::DefaultUserInfo;
use crate::user_provider::static_user_provider::StaticUserProvider;
use crate::user_provider::{Identity, Password};
use crate::UserProvider;
async fn test_authenticate(provider: &dyn UserProvider, username: &str, password: &str) {
let re = provider
@@ -143,13 +143,12 @@ pub mod test {
let file = File::create(&file_path);
let file = file.unwrap();
let mut lw = LineWriter::new(file);
assert!(
lw.write_all(
assert!(lw
.write_all(
b"root=123456
admin=654321",
)
.is_ok()
);
.is_ok());
lw.flush().unwrap();
}

View File

@@ -20,7 +20,7 @@ use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common_telemetry::{info, warn};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::{ResultExt, ensure};
use snafu::{ensure, ResultExt};
use crate::error::{FileWatchSnafu, InvalidConfigSnafu, Result};
use crate::user_info::DefaultUserInfo;
@@ -133,9 +133,9 @@ pub mod test {
use common_test_util::temp_dir::create_temp_dir;
use tokio::time::sleep;
use crate::UserProvider;
use crate::user_provider::watch_file_user_provider::WatchFileUserProvider;
use crate::user_provider::{Identity, Password};
use crate::UserProvider;
async fn test_authenticate(
provider: &dyn UserProvider,

View File

@@ -19,9 +19,9 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
CacheRegistry, CacheRegistryBuilder, LayeredCacheRegistryBuilder, new_schema_cache,
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache,
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;

View File

@@ -32,7 +32,6 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-workload.workspace = true
dashmap.workspace = true
datafusion.workspace = true
datatypes.workspace = true
@@ -49,7 +48,6 @@ prometheus.workspace = true
promql-parser.workspace = true
rand.workspace = true
rustc-hash.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true

View File

@@ -297,20 +297,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to handle query"))]
HandleQuery {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to project schema"))]
ProjectSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -383,8 +369,6 @@ impl ErrorExt for Error {
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
StatusCode::Unexpected
}
Error::HandleQuery { source, .. } => source.status_code(),
Error::ProjectSchema { source, .. } => source.status_code(),
}
}

View File

@@ -14,34 +14,25 @@
use api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::node_manager::DatanodeManagerRef;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::util::ChainedRecordBatchStream;
use meta_client::MetaClientRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error;
use crate::information_schema::{DatanodeInspectRequest, InformationExtension};
use crate::information_schema::InformationExtension;
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
datanode_manager: DatanodeManagerRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef, datanode_manager: DatanodeManagerRef) -> Self {
Self {
meta_client,
datanode_manager,
}
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}
@@ -107,39 +98,4 @@ impl InformationExtension for DistributedInformationExtension {
.map_err(BoxedError::new)
.context(crate::error::ListFlowStatsSnafu)
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
// Aggregate results from all datanodes
let nodes = self
.meta_client
.list_nodes(Some(Role::Datanode))
.await
.map_err(BoxedError::new)
.context(crate::error::ListNodesSnafu)?;
let plan = request
.build_plan()
.context(crate::error::DatafusionSnafu)?;
let mut streams = Vec::with_capacity(nodes.len());
for node in nodes {
let client = self.datanode_manager.datanode(&node.peer).await;
let stream = client
.handle_query(QueryRequest {
plan: plan.clone(),
region_id: RegionId::default(),
header: None,
})
.await
.context(crate::error::HandleQuerySnafu)?;
streams.push(stream);
}
let chained =
ChainedRecordBatchStream::new(streams).context(crate::error::CreateRecordBatchSnafu)?;
Ok(Box::pin(chained))
}
}

View File

@@ -21,4 +21,4 @@ mod table_cache;
pub use builder::KvBackendCatalogManagerBuilder;
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};
pub use table_cache::{new_table_cache, TableCache, TableCacheRef};

View File

@@ -16,8 +16,8 @@ use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_meta::cache::LayeredCacheRegistryRef;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use moka::sync::Cache;
@@ -26,8 +26,8 @@ use partition::manager::PartitionRuleManager;
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;

View File

@@ -24,12 +24,12 @@ use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_telemetry::debug;
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
@@ -461,17 +461,17 @@ impl KvBackend for MetaKvBackend {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
use super::CachedKvBackend;

View File

@@ -26,12 +26,12 @@ use common_meta::cache::{
LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
ViewInfoCacheRef,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
@@ -41,16 +41,15 @@ use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table::PartitionRules;
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReceiverStream;
use crate::CatalogManager;
use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
@@ -60,8 +59,9 @@ use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
/// Access all existing catalog, schema and tables.
///

View File

@@ -20,9 +20,9 @@ use common_meta::instruction::CacheIdent;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use table::TableRef;
use table::dist_table::DistTable;
use table::table_name::TableName;
use table::TableRef;
pub type TableCacheRef = Arc<TableCache>;

View File

@@ -25,8 +25,8 @@ use common_catalog::consts::{INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME};
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use session::context::QueryContext;
use table::TableRef;
use table::metadata::{TableId, TableInfoRef};
use table::TableRef;
use crate::error::Result;

View File

@@ -14,4 +14,4 @@
pub mod manager;
pub use manager::{MemoryCatalogManager, new_memory_catalog_manager};
pub use manager::{new_memory_catalog_manager, MemoryCatalogManager};

View File

@@ -28,8 +28,8 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use futures_util::stream::BoxStream;
use session::context::QueryContext;
use snafu::OptionExt;
use table::TableRef;
use table::metadata::{TableId, TableInfoRef};
use table::TableRef;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
@@ -419,7 +419,7 @@ pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
mod tests {
use common_catalog::consts::*;
use futures_util::TryStreamExt;
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
@@ -454,18 +454,16 @@ mod tests {
tables[0].table_info().table_id()
);
assert!(
catalog_list
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"not_exists",
None
)
.await
.unwrap()
.is_none()
);
assert!(catalog_list
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"not_exists",
None
)
.await
.unwrap()
.is_none());
}
#[test]
@@ -488,13 +486,11 @@ mod tests {
table: NumbersTable::table(2333),
};
catalog.register_table_sync(register_table_req).unwrap();
assert!(
catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_some()
);
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_some());
let deregister_table_req = DeregisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -502,12 +498,10 @@ mod tests {
table_name: table_name.to_string(),
};
catalog.deregister_table_sync(deregister_table_req).unwrap();
assert!(
catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_none()
);
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name, None)
.await
.unwrap()
.is_none());
}
}

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
@@ -30,7 +30,7 @@ use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use promql_parser::parser::EvalStmt;
use rand::random;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::statement::Statement;
use crate::error;

View File

@@ -36,26 +36,22 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};
use table::TableRef;
use table::metadata::TableType;
use table::TableRef;
pub use table_names::*;
use views::InformationSchemaViews;
use self::columns::InformationSchemaColumns;
use crate::CatalogManager;
use crate::error::{Error, Result};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
@@ -73,6 +69,7 @@ pub(crate) use crate::system_schema::predicate::Predicates;
use crate::system_schema::{
SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
};
use crate::CatalogManager;
lazy_static! {
// Memory tables in `information_schema`.
@@ -412,43 +409,8 @@ pub trait InformationExtension {
/// Get the flow statistics. If no flownode is available, return `None`.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
/// Inspects the datanode.
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}
/// The request to inspect the datanode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
/// Kind to fetch from datanode.
pub kind: DatanodeInspectKind,
/// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
/// This allows server-side filtering to reduce I/O and network costs.
pub scan: ScanRequest,
}
/// The kind of the datanode inspect request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
/// List SST entries recorded in manifest
SstManifest,
/// List SST entries discovered in storage layer
SstStorage,
}
impl DatanodeInspectRequest {
/// Builds a logical plan for the datanode inspect request.
pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
match self.kind {
DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan),
DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan),
}
}
}
pub struct NoopInformationExtension;
#[async_trait::async_trait]
@@ -470,11 +432,4 @@ impl InformationExtension for NoopInformationExtension {
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
async fn inspect_datanode(
&self,
_request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
Ok(common_recordbatch::RecordBatches::empty().as_stream())
}
}

View File

@@ -18,46 +18,37 @@ use std::time::Duration;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
use common_meta::cluster::NodeInfo;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use common_workload::DatanodeWorkloadType;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
UInt32VectorBuilder, UInt64VectorBuilder,
};
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, CLUSTER_INFO};
use crate::system_schema::utils;
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
const PEER_TYPE_METASRV: &str = "METASRV";
use crate::CatalogManager;
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const CPUS: &str = "cpus";
const MEMORY_BYTES: &str = "memory_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";
const ACTIVE_TIME: &str = "active_time";
const NODE_STATUS: &str = "node_status";
const INIT_CAPACITY: usize = 42;
@@ -66,14 +57,11 @@ const INIT_CAPACITY: usize = 42;
/// - `peer_id`: the peer server id.
/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
/// - `peer_addr`: the peer gRPC address.
/// - `cpus`: the number of CPUs of the peer.
/// - `memory_bytes`: the memory bytes of the peer.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer.
/// - `uptime`: the uptime of the peer.
/// - `active_time`: the time since the last activity of the peer.
/// - `node_status`: the status info of the peer.
///
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
@@ -94,8 +82,6 @@ impl InformationSchemaClusterInfo {
ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
@@ -105,7 +91,6 @@ impl InformationSchemaClusterInfo {
),
ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
]))
}
@@ -155,14 +140,11 @@ struct InformationSchemaClusterInfoBuilder {
peer_ids: Int64VectorBuilder,
peer_types: StringVectorBuilder,
peer_addrs: StringVectorBuilder,
cpus: UInt32VectorBuilder,
memory_bytes: UInt64VectorBuilder,
versions: StringVectorBuilder,
git_commits: StringVectorBuilder,
start_times: TimestampMillisecondVectorBuilder,
uptimes: StringVectorBuilder,
active_times: StringVectorBuilder,
node_status: StringVectorBuilder,
}
impl InformationSchemaClusterInfoBuilder {
@@ -173,14 +155,11 @@ impl InformationSchemaClusterInfoBuilder {
peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -197,10 +176,9 @@ impl InformationSchemaClusterInfoBuilder {
fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
let peer_type = node_info.status.role_name();
let peer_id = peer_id(peer_type, node_info.peer.id);
let row = [
(PEER_ID, &Value::from(peer_id)),
(PEER_ID, &Value::from(node_info.peer.id)),
(PEER_TYPE, &Value::from(peer_type)),
(PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
(VERSION, &Value::from(node_info.version.as_str())),
@@ -211,7 +189,13 @@ impl InformationSchemaClusterInfoBuilder {
return;
}
self.peer_ids.push(Some(peer_id));
if peer_type == "FRONTEND" || peer_type == "METASRV" {
// Always set peer_id to be -1 for frontends and metasrvs
self.peer_ids.push(Some(-1));
} else {
self.peer_ids.push(Some(node_info.peer.id as i64));
}
self.peer_types.push(Some(peer_type));
self.peer_addrs.push(Some(&node_info.peer.addr));
self.versions.push(Some(&node_info.version));
@@ -228,8 +212,6 @@ impl InformationSchemaClusterInfoBuilder {
self.start_times.push(None);
self.uptimes.push(None);
}
self.cpus.push(Some(node_info.cpus));
self.memory_bytes.push(Some(node_info.memory_bytes));
if node_info.last_activity_ts > 0 {
self.active_times.push(Some(
@@ -238,8 +220,6 @@ impl InformationSchemaClusterInfoBuilder {
} else {
self.active_times.push(None);
}
self.node_status
.push(format_node_status(&node_info).as_deref());
}
fn format_duration_since(ts: u64) -> String {
@@ -253,14 +233,11 @@ impl InformationSchemaClusterInfoBuilder {
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_types.finish()),
Arc::new(self.peer_addrs.finish()),
Arc::new(self.cpus.finish()),
Arc::new(self.memory_bytes.finish()),
Arc::new(self.versions.finish()),
Arc::new(self.git_commits.finish()),
Arc::new(self.start_times.finish()),
Arc::new(self.uptimes.finish()),
Arc::new(self.active_times.finish()),
Arc::new(self.node_status.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
@@ -286,56 +263,3 @@ impl DfPartitionStream for InformationSchemaClusterInfo {
))
}
}
fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
-1
} else {
peer_id as i64
}
}
#[derive(Serialize)]
struct DisplayMetasrvStatus {
is_leader: bool,
}
#[derive(Serialize)]
struct DisplayDatanodeStatus {
workloads: Vec<DatanodeWorkloadType>,
leader_regions: usize,
follower_regions: usize,
}
impl From<&DatanodeStatus> for DisplayDatanodeStatus {
fn from(status: &DatanodeStatus) -> Self {
Self {
workloads: status
.workloads
.types
.iter()
.flat_map(|w| DatanodeWorkloadType::from_i32(*w))
.collect(),
leader_regions: status.leader_regions,
follower_regions: status.follower_regions,
}
}
}
fn format_node_status(node_info: &NodeInfo) -> Option<String> {
match &node_info.status {
NodeStatus::Datanode(datanode_status) => {
serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
}
NodeStatus::Frontend(_) => None,
NodeStatus::Flownode(_) => None,
NodeStatus::Metasrv(metasrv_status) => {
if metasrv_status.is_leader {
serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
} else {
None
}
}
NodeStatus::Standalone => None,
}
}

View File

@@ -23,9 +23,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, DataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -38,12 +38,12 @@ use snafu::{OptionExt, ResultExt};
use sql::statements;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{COLUMNS, InformationTable};
use crate::system_schema::information_schema::{InformationTable, COLUMNS};
use crate::CatalogManager;
#[derive(Debug)]
pub(super) struct InformationSchemaColumns {

View File

@@ -16,10 +16,10 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::FlowId;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
@@ -38,14 +38,14 @@ use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::{FLOWS, Predicates};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;

View File

@@ -89,9 +89,9 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
vec![
Arc::new(StringVector::from(vec![build_info.branch.to_string()])),
Arc::new(StringVector::from(vec![build_info.commit.to_string()])),
Arc::new(StringVector::from(vec![
build_info.commit_short.to_string(),
])),
Arc::new(StringVector::from(vec![build_info
.commit_short
.to_string()])),
Arc::new(StringVector::from(vec![build_info.clean.to_string()])),
Arc::new(StringVector::from(vec![build_info.version.to_string()])),
],
@@ -369,9 +369,17 @@ pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>
TRIGGERS => (
vec![
string_column("TRIGGER_NAME"),
ColumnSchema::new("trigger_id", ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(
"trigger_id",
ConcreteDataType::uint64_datatype(),
false,
),
string_column("TRIGGER_DEFINITION"),
ColumnSchema::new("flownode_id", ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(
"flownode_id",
ConcreteDataType::uint64_datatype(),
true,
),
],
vec![],
),

View File

@@ -20,9 +20,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, FulltextBackend, Schema, SchemaRef};
use datatypes::value::Value;
@@ -31,11 +31,11 @@ use futures_util::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, KEY_COLUMN_USAGE, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, KEY_COLUMN_USAGE};
use crate::CatalogManager;
pub const CONSTRAINT_SCHEMA: &str = "constraint_schema";
pub const CONSTRAINT_NAME: &str = "constraint_name";
@@ -277,15 +277,15 @@ impl InformationSchemaKeyColumnUsageBuilder {
constraints.push(CONSTRAINT_NAME_INVERTED_INDEX);
greptime_index_type.push(INDEX_TYPE_INVERTED_INDEX);
}
if let Ok(Some(options)) = column.fulltext_options()
&& options.enable
{
constraints.push(CONSTRAINT_NAME_FULLTEXT_INDEX);
let index_type = match options.backend {
FulltextBackend::Bloom => INDEX_TYPE_FULLTEXT_BLOOM,
FulltextBackend::Tantivy => INDEX_TYPE_FULLTEXT_TANTIVY,
};
greptime_index_type.push(index_type);
if let Ok(Some(options)) = column.fulltext_options() {
if options.enable {
constraints.push(CONSTRAINT_NAME_FULLTEXT_INDEX);
let index_type = match options.backend {
FulltextBackend::Bloom => INDEX_TYPE_FULLTEXT_BLOOM,
FulltextBackend::Tantivy => INDEX_TYPE_FULLTEXT_TANTIVY,
};
greptime_index_type.push(index_type);
}
}
if column.is_skipping_indexed() {
constraints.push(CONSTRAINT_NAME_SKIPPING_INDEX);

View File

@@ -21,17 +21,16 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMicrosecond;
use datatypes::timestamp::TimestampSecond;
use datatypes::value::Value;
use datatypes::vectors::{
ConstantVector, Int64Vector, Int64VectorBuilder, MutableVector, StringVector,
StringVectorBuilder, TimestampMicrosecondVector, TimestampMicrosecondVectorBuilder,
UInt64VectorBuilder,
StringVectorBuilder, TimestampSecondVector, TimestampSecondVectorBuilder, UInt64VectorBuilder,
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
@@ -39,13 +38,13 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, PARTITIONS, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, PARTITIONS};
use crate::CatalogManager;
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
@@ -129,17 +128,17 @@ impl InformationSchemaPartitions {
ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(
"create_time",
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
true,
),
ColumnSchema::new(
"update_time",
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
true,
),
ColumnSchema::new(
"check_time",
ConcreteDataType::timestamp_microsecond_datatype(),
ConcreteDataType::timestamp_second_datatype(),
true,
),
ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true),
@@ -212,7 +211,7 @@ struct InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder,
partition_ordinal_positions: Int64VectorBuilder,
partition_expressions: StringVectorBuilder,
create_times: TimestampMicrosecondVectorBuilder,
create_times: TimestampSecondVectorBuilder,
partition_ids: UInt64VectorBuilder,
}
@@ -232,7 +231,7 @@ impl InformationSchemaPartitionsBuilder {
partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: TimestampMicrosecondVectorBuilder::with_capacity(INIT_CAPACITY),
create_times: TimestampSecondVectorBuilder::with_capacity(INIT_CAPACITY),
partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -331,8 +330,8 @@ impl InformationSchemaPartitionsBuilder {
.push(Some((index + 1) as i64));
let expression = partition.partition_expr.as_ref().map(|e| e.to_string());
self.partition_expressions.push(expression.as_deref());
self.create_times.push(Some(TimestampMicrosecond::from(
table_info.meta.created_on.timestamp_millis(),
self.create_times.push(Some(TimestampSecond::from(
table_info.meta.created_on.timestamp(),
)));
self.partition_ids.push(Some(partition.id.as_u64()));
}
@@ -349,8 +348,8 @@ impl InformationSchemaPartitionsBuilder {
Arc::new(Int64Vector::from(vec![None])),
rows_num,
));
let null_timestampmicrosecond_vector = Arc::new(ConstantVector::new(
Arc::new(TimestampMicrosecondVector::from(vec![None])),
let null_timestamp_second_vector = Arc::new(ConstantVector::new(
Arc::new(TimestampSecondVector::from(vec![None])),
rows_num,
));
let partition_methods = Arc::new(ConstantVector::new(
@@ -380,8 +379,8 @@ impl InformationSchemaPartitionsBuilder {
null_i64_vector.clone(),
Arc::new(self.create_times.finish()),
// TODO(dennis): supports update_time
null_timestampmicrosecond_vector.clone(),
null_timestampmicrosecond_vector,
null_timestamp_second_vector.clone(),
null_timestamp_second_vector,
null_i64_vector,
null_string_vector.clone(),
null_string_vector.clone(),

View File

@@ -22,9 +22,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
@@ -33,10 +33,10 @@ use datatypes::vectors::{StringVectorBuilder, TimestampMillisecondVectorBuilder}
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{InformationTable, PROCEDURE_INFO, Predicates};
use crate::system_schema::information_schema::{InformationTable, Predicates, PROCEDURE_INFO};
use crate::system_schema::utils;
use crate::CatalogManager;
const PROCEDURE_ID: &str = "procedure_id";
const PROCEDURE_TYPE: &str = "procedure_type";

View File

@@ -23,9 +23,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -35,13 +35,13 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, REGION_PEERS};
use crate::CatalogManager;
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";

View File

@@ -30,17 +30,16 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorB
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, REGION_STATISTICS};
use crate::system_schema::utils;
use crate::CatalogManager;
const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const REGION_ROWS: &str = "region_rows";
const WRITTEN_BYTES: &str = "written_bytes_since_open";
const DISK_SIZE: &str = "disk_size";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
@@ -58,7 +57,6 @@ const INIT_CAPACITY: usize = 42;
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `region_rows`: The number of rows in region.
/// - `written_bytes_since_open`: The total bytes written of the region since region opened.
/// - `memtable_size`: The memtable size in bytes.
/// - `disk_size`: The approximate disk size in bytes.
/// - `manifest_size`: The manifest size in bytes.
@@ -86,7 +84,6 @@ impl InformationSchemaRegionStatistics {
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_ROWS, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(WRITTEN_BYTES, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(DISK_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
@@ -150,7 +147,6 @@ struct InformationSchemaRegionStatisticsBuilder {
table_ids: UInt32VectorBuilder,
region_numbers: UInt32VectorBuilder,
region_rows: UInt64VectorBuilder,
written_bytes: UInt64VectorBuilder,
disk_sizes: UInt64VectorBuilder,
memtable_sizes: UInt64VectorBuilder,
manifest_sizes: UInt64VectorBuilder,
@@ -170,7 +166,6 @@ impl InformationSchemaRegionStatisticsBuilder {
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
written_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
disk_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
@@ -202,7 +197,6 @@ impl InformationSchemaRegionStatisticsBuilder {
(TABLE_ID, &Value::from(region_stat.id.table_id())),
(REGION_NUMBER, &Value::from(region_stat.id.region_number())),
(REGION_ROWS, &Value::from(region_stat.num_rows)),
(WRITTEN_BYTES, &Value::from(region_stat.written_bytes)),
(DISK_SIZE, &Value::from(region_stat.approximate_bytes)),
(MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
(MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
@@ -222,7 +216,6 @@ impl InformationSchemaRegionStatisticsBuilder {
self.region_numbers
.push(Some(region_stat.id.region_number()));
self.region_rows.push(Some(region_stat.num_rows));
self.written_bytes.push(Some(region_stat.written_bytes));
self.disk_sizes.push(Some(region_stat.approximate_bytes));
self.memtable_sizes.push(Some(region_stat.memtable_size));
self.manifest_sizes.push(Some(region_stat.manifest_size));
@@ -239,7 +232,6 @@ impl InformationSchemaRegionStatisticsBuilder {
Arc::new(self.table_ids.finish()),
Arc::new(self.region_numbers.finish()),
Arc::new(self.region_rows.finish()),
Arc::new(self.written_bytes.finish()),
Arc::new(self.disk_sizes.finish()),
Arc::new(self.memtable_sizes.finish()),
Arc::new(self.manifest_sizes.finish()),

View File

@@ -21,9 +21,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};

View File

@@ -21,9 +21,9 @@ use common_meta::key::schema_name::SchemaNameKey;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -31,13 +31,13 @@ use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, SCHEMATA};
use crate::system_schema::utils;
use crate::CatalogManager;
pub const CATALOG_NAME: &str = "catalog_name";
pub const SCHEMA_NAME: &str = "schema_name";

View File

@@ -20,9 +20,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -32,15 +32,15 @@ use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::information_schema::key_column_usage::{
CONSTRAINT_NAME_PRI, CONSTRAINT_NAME_TIME_INDEX,
};
use crate::information_schema::Predicates;
use crate::system_schema::information_schema::{InformationTable, TABLE_CONSTRAINTS};
use crate::CatalogManager;
/// The `TABLE_CONSTRAINTS` table describes which tables have constraints.
#[derive(Debug)]

View File

@@ -23,9 +23,9 @@ use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -37,12 +37,12 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;
use crate::CatalogManager;
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";

View File

@@ -20,9 +20,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
@@ -32,13 +32,13 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CastManagerSnafu, CreateRecordBatchSnafu, GetViewCacheSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu, ViewInfoNotFoundSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, VIEWS};
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;
pub const TABLE_CATALOG: &str = "table_catalog";

View File

@@ -21,9 +21,9 @@ use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;

View File

@@ -34,10 +34,10 @@ use table::TableRef;
pub use table_names::*;
use self::pg_namespace::oid_map::{PGNamespaceOidMap, PGNamespaceOidMapRef};
use crate::CatalogManager;
use crate::system_schema::memory_table::MemoryTable;
use crate::system_schema::utils::tables::u32_column;
use crate::system_schema::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef};
use crate::CatalogManager;
lazy_static! {
static ref MEMORY_TABLES: &'static [&'static str] = &[table_names::PG_TYPE];

View File

@@ -32,15 +32,15 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{OID_COLUMN_NAME, PG_CLASS, query_ctx};
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_CLASS};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
// === column name ===
pub const RELNAME: &str = "relname";

View File

@@ -29,15 +29,15 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{OID_COLUMN_NAME, PG_DATABASE, query_ctx};
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
// === column name ===
pub const DATNAME: &str = "datname";

View File

@@ -35,16 +35,16 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::{
OID_COLUMN_NAME, PG_NAMESPACE, PGNamespaceOidMapRef, query_ctx,
query_ctx, PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE,
};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
const NSPNAME: &str = "nspname";
const INIT_CAPACITY: usize = 42;

View File

@@ -339,22 +339,18 @@ mod tests {
assert!(!p.eval(&wrong_row).unwrap());
assert!(p.eval(&[]).is_none());
assert!(p.eval(&[("c", &a_value)]).is_none());
assert!(
!p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap()
);
assert!(
!p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.is_none()
);
assert!(
!p.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.unwrap()
);
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap());
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.is_none());
assert!(!p
.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.unwrap());
//Predicate::Or
let p = Predicate::Or(Box::new(p1), Box::new(p2));
@@ -362,22 +358,18 @@ mod tests {
assert!(p.eval(&wrong_row).unwrap());
assert!(p.eval(&[]).is_none());
assert!(p.eval(&[("c", &a_value)]).is_none());
assert!(
!p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.unwrap()
);
assert!(
p.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.is_none()
);
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.is_none());
}
#[test]

View File

@@ -17,10 +17,10 @@ use std::sync::Weak;
use common_meta::key::TableMetadataManagerRef;
use snafu::OptionExt;
use crate::CatalogManager;
use crate::error::{GetInformationExtensionSnafu, Result, UpgradeWeakCatalogManagerRefSnafu};
use crate::information_schema::InformationExtensionRef;
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManager;
pub mod tables;

View File

@@ -17,27 +17,27 @@ use std::sync::Arc;
use bytes::Bytes;
use common_catalog::format_full_table_name;
use common_query::logical_plan::{SubstraitPlanDecoderRef, rename_logical_plan_columns};
use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
use datafusion::common::{ResolvedTableReference, TableReference};
use datafusion::datasource::view::ViewTable;
use datafusion::datasource::{TableProvider, provider_as_source};
use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::logical_expr::TableSource;
use itertools::Itertools;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
pub mod dummy_catalog;
use dummy_catalog::DummyCatalogList;
use table::TableRef;
use crate::CatalogManagerRef;
use crate::error::{
CastManagerSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
ViewPlanColumnsChangedSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::CatalogManagerRef;
pub struct DfTableSourceProvider {
catalog_manager: CatalogManagerRef,
@@ -272,7 +272,7 @@ mod tests {
use common_query::logical_plan::SubstraitPlanDecoder;
use datafusion::catalog::CatalogProviderList;
use datafusion::logical_expr::builder::LogicalTableSource;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, col, lit};
use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
use crate::information_schema::NoopInformationExtension;

View File

@@ -25,8 +25,8 @@ use datafusion::datasource::TableProvider;
use snafu::OptionExt;
use table::table::adapter::DfTableProviderAdapter;
use crate::CatalogManagerRef;
use crate::error::TableNotExistSnafu;
use crate::CatalogManagerRef;
/// Delegate the resolving requests to the `[CatalogManager]` unconditionally.
#[derive(Clone)]

View File

@@ -14,8 +14,8 @@
use std::time::Instant;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use table::table_name::TableName;
use crate::bench::{

View File

@@ -18,9 +18,9 @@ mod import;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::Tool;
use crate::data::export::ExportCommand;
use crate::data::import::ImportCommand;
use crate::Tool;
/// Command for data operations including exporting data from and importing data into GreptimeDB.
#[derive(Subcommand)]

View File

@@ -24,18 +24,18 @@ use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use object_store::layers::LoggingLayer;
use object_store::services::Oss;
use object_store::{ObjectStore, services};
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{
EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
SchemaNotFoundSnafu,
};
use crate::{Tool, database};
use crate::{database, Tool};
type TableReference = (String, String, String);

View File

@@ -25,9 +25,9 @@ use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use crate::database::{DatabaseClient, parse_proxy_opts};
use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{Tool, database};
use crate::{database, Tool};
#[derive(Debug, Default, Clone, ValueEnum)]
enum ImportTarget {

View File

@@ -14,15 +14,15 @@
use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use humantime::format_duration;
use serde_json::Value;
use servers::http::GreptimeQueryOutput;
use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use crate::error::{

View File

@@ -21,10 +21,10 @@ mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::Tool;
use crate::metadata::control::{DelCommand, GetCommand};
use crate::metadata::repair::RepairLogicalTablesCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;
/// Command for managing metadata operations,
/// including saving and restoring metadata snapshots,

View File

@@ -16,9 +16,9 @@ use std::sync::Arc;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client_with_tls;
use meta_srv::metasrv::BackendImpl;
use servers::tls::{TlsMode, TlsOption};
@@ -83,6 +83,20 @@ pub(crate) struct StoreConfig {
}
impl StoreConfig {
pub fn tls_config(&self) -> Option<TlsOption> {
if self.backend_tls_mode != TlsMode::Disable {
Some(TlsOption {
mode: self.backend_tls_mode.clone(),
cert_path: self.backend_tls_cert_path.clone(),
key_path: self.backend_tls_key_path.clone(),
ca_cert_path: self.backend_tls_ca_cert_path.clone(),
watch: self.backend_tls_watch,
})
} else {
None
}
}
/// Builds a [`KvBackendRef`] from the store configuration.
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
@@ -92,17 +106,7 @@ impl StoreConfig {
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let tls_config = if self.backend_tls_mode != TlsMode::Disable {
Some(TlsOption {
mode: self.backend_tls_mode.clone(),
cert_path: self.backend_tls_cert_path.clone(),
key_path: self.backend_tls_key_path.clone(),
ca_cert_path: self.backend_tls_ca_cert_path.clone(),
watch: self.backend_tls_watch,
})
} else {
None
};
let tls_config = self.tls_config();
let etcd_client = create_etcd_client_with_tls(store_addrs, tls_config.as_ref())
.await
.map_err(BoxedError::new)?;
@@ -111,7 +115,8 @@ impl StoreConfig {
#[cfg(feature = "pg_kvbackend")]
BackendImpl::PostgresStore => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, None)
let tls_config = self.tls_config();
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, tls_config)
.await
.map_err(BoxedError::new)?;
let schema_name = self.meta_schema_name.as_deref();

View File

@@ -18,9 +18,9 @@ mod table;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::Tool;
use crate::metadata::control::del::key::DelKeyCommand;
use crate::metadata::control::del::table::DelTableCommand;
use crate::Tool;
/// The prefix of the tombstone keys.
pub(crate) const CLI_TOMBSTONE_PREFIX: &str = "__cli_tombstone/";

View File

@@ -19,9 +19,9 @@ use common_meta::key::tombstone::TombstoneManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::RangeRequest;
use crate::Tool;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::Tool;
/// Delete key-value pairs logically from the metadata store.
#[derive(Debug, Default, Parser)]
@@ -102,8 +102,8 @@ mod tests {
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::del::key::KeyDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::put_key;
#[tokio::test]

View File

@@ -18,16 +18,16 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::utils::get_region_wal_options;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_name::TableNameManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use store_api::storage::TableId;
use crate::Tool;
use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::utils::get_table_id_by_name;
use crate::Tool;
/// Delete table metadata logically from the metadata store.
#[derive(Debug, Default, Parser)]
@@ -183,15 +183,15 @@ mod tests {
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::del::table::TableMetadataDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::prepare_physical_table_metadata;
#[tokio::test]

View File

@@ -19,18 +19,18 @@ use clap::{Parser, Subcommand};
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use futures::TryStreamExt;
use crate::Tool;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::Tool;
/// Getting metadata from metadata store.
#[derive(Subcommand)]

View File

@@ -31,18 +31,18 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{RegionRoute, find_leaders};
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use snafu::{ResultExt, ensure};
use snafu::{ensure, ResultExt};
use store_api::storage::TableId;
use crate::Tool;
use crate::error::{
InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
};
use crate::metadata::common::StoreConfig;
use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
use crate::Tool;
/// Repair metadata of logical tables.
#[derive(Debug, Default, Parser)]
@@ -301,10 +301,7 @@ impl RepairTool {
warn!(
"Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
full_table_name,
failed_peers
.iter()
.map(|(peer, _)| peer.id)
.collect::<Vec<_>>()
failed_peers.iter().map(|(peer, _)| peer.id).collect::<Vec<_>>()
);
let create_table_expr =
@@ -323,7 +320,8 @@ impl RepairTool {
}
info!(
"Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
full_table_name, peer.id
full_table_name,
peer.id
);
// If the alter table request fails for any datanode, we attempt to create the table on that datanode

View File

@@ -13,11 +13,11 @@
// limitations under the License.
use client::api::v1::alter_table_expr::Kind;
use client::api::v1::region::{AlterRequests, RegionRequest, RegionRequestHeader, region_request};
use client::api::v1::region::{region_request, AlterRequests, RegionRequest, RegionRequestHeader};
use client::api::v1::{AddColumn, AddColumns, AlterTableExpr};
use common_meta::ddl::alter_logical_tables::make_alter_region_request;
use common_meta::peer::Peer;
use common_meta::rpc::router::{RegionRoute, find_leader_regions};
use common_meta::rpc::router::{find_leader_regions, RegionRoute};
use operator::expr_helper::column_schemas_to_defs;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

View File

@@ -14,12 +14,12 @@
use std::collections::HashMap;
use client::api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use client::api::v1::CreateTableExpr;
use client::api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader, region_request};
use common_meta::ddl::create_logical_tables::create_region_request_builder;
use common_meta::ddl::utils::region_storage_path;
use common_meta::peer::Peer;
use common_meta::rpc::router::{RegionRoute, find_leader_regions};
use common_meta::rpc::router::{find_leader_regions, RegionRoute};
use operator::expr_helper::column_schemas_to_defs;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

View File

@@ -19,13 +19,13 @@ use clap::{Parser, Subcommand};
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::snapshot::MetadataSnapshotManager;
use object_store::ObjectStore;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use crate::Tool;
use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::metadata::common::StoreConfig;
use crate::Tool;
/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
#[derive(Subcommand)]
@@ -258,13 +258,11 @@ impl Tool for MetaRestoreTool {
Ok(())
} else if !self.force {
common_telemetry::warn!(
"The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
);
"The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
);
Ok(())
} else {
common_telemetry::info!(
"The target source is not clean, We will restore the metadata snapshot with --force."
);
common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
self.inner
.restore(&self.source_file)
.await

View File

@@ -17,9 +17,9 @@ use std::collections::VecDeque;
use async_stream::try_stream;
use common_catalog::consts::METRIC_ENGINE;
use common_catalog::format_full_table_name;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use futures::Stream;
use snafu::{OptionExt, ResultExt};

View File

@@ -14,11 +14,11 @@
use std::sync::Arc;
use api::v1::HealthCheckRequest;
use api::v1::flow::flow_client::FlowClient as PbFlowClient;
use api::v1::health_check_client::HealthCheckClient;
use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
use api::v1::region::region_client::RegionClient as PbRegionClient;
use api::v1::HealthCheckRequest;
use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
use parking_lot::RwLock;
@@ -27,7 +27,7 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use crate::load_balance::{LoadBalance, Loadbalancer};
use crate::{Result, error};
use crate::{error, Result};
pub struct FlightClient {
addr: String,

View File

@@ -21,9 +21,9 @@ use common_meta::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, F
use common_meta::peer::Peer;
use moka::future::{Cache, CacheBuilder};
use crate::Client;
use crate::flow::FlowRequester;
use crate::region::RegionRequester;
use crate::Client;
pub struct NodeClients {
channel_manager: ChannelManager,

View File

@@ -27,8 +27,8 @@ use api::v1::{
};
use arrow_flight::{FlightData, Ticket};
use async_stream::stream;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use common_catalog::build_db_string;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
@@ -42,7 +42,7 @@ use common_telemetry::{error, warn};
use futures::future;
use futures_util::{Stream, StreamExt, TryStreamExt};
use prost::Message;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue};
use tonic::transport::Channel;
@@ -50,7 +50,7 @@ use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
InvalidTonicMetadataValueSnafu,
};
use crate::{Client, Result, error, from_grpc_response};
use crate::{error, from_grpc_response, Client, Result};
type FlightDataStream = Pin<Box<dyn Stream<Item = FlightData> + Send>>;
@@ -379,10 +379,11 @@ impl Database {
tonic_code,
e
);
Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
let error = Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
addr: client.addr().to_string(),
tonic_code,
})
});
error
})?;
let flight_data_stream = response.into_inner();

View File

@@ -18,9 +18,9 @@ use common_error::define_from_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu, location};
use tonic::Code;
use snafu::{location, Location, Snafu};
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::Code;
#[derive(Snafu)]
#[snafu(visibility(pub))]

View File

@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode;
use snafu::ResultExt;
use crate::Client;
use crate::error::{FlowServerSnafu, Result};
use crate::Client;
#[derive(Debug)]
pub struct FlowRequester {
@@ -47,7 +47,7 @@ impl Flownode for FlowRequester {
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequests,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
@@ -102,10 +102,12 @@ impl FlowRequester {
Ok(response)
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> Result<FlowResponse> {
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(req)
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();

View File

@@ -15,8 +15,8 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::ResponseHeader;
use api::v1::region::RegionRequest;
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
@@ -33,7 +33,7 @@ use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
use snafu::{OptionExt, ResultExt, location};
use snafu::{location, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio_stream::StreamExt;
@@ -41,7 +41,7 @@ use crate::error::{
self, ConvertFlightDataSnafu, FlightGetSnafu, IllegalDatabaseResponseSnafu,
IllegalFlightMessagesSnafu, MissingFieldSnafu, Result, ServerSnafu,
};
use crate::{Client, Error, metrics};
use crate::{metrics, Client, Error};
#[derive(Debug)]
pub struct RegionRequester {
@@ -115,10 +115,11 @@ impl RegionRequester {
flight_client.addr(),
tonic_code
);
Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
let error = Err(BoxedError::new(e)).with_context(|_| FlightGetSnafu {
addr: flight_client.addr().to_string(),
tonic_code,
})
});
error
})?;
let flight_data_stream = response.into_inner();

View File

@@ -18,7 +18,7 @@ use clap::{Parser, Subcommand};
use cmd::datanode::builder::InstanceBuilder;
use cmd::error::{InitTlsProviderSnafu, Result};
use cmd::options::GlobalOptions;
use cmd::{App, cli, datanode, flownode, frontend, metasrv, standalone};
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_base::Plugins;
use common_version::{verbose_version, version};
use servers::install_ring_crypto_provider;
@@ -143,10 +143,8 @@ async fn start(cli: Command) -> Result<()> {
}
fn setup_human_panic() {
human_panic::setup_panic!(
human_panic::Metadata::new("GreptimeDB", version())
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions")
);
human_panic::setup_panic!(human_panic::Metadata::new("GreptimeDB", version())
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions"));
common_telemetry::set_panic_hook();
}

View File

@@ -20,7 +20,7 @@ use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use crate::options::GlobalOptions;
use crate::{App, Result, error};
use crate::{error, App, Result};
pub const APP_NAME: &str = "greptime-cli";
use async_trait::async_trait;
@@ -109,7 +109,7 @@ mod tests {
use crate::error::Result as CmdResult;
use crate::options::GlobalOptions;
use crate::{App, cli, standalone};
use crate::{cli, standalone, App};
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {

View File

@@ -20,20 +20,20 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_config::Configurable;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_telemetry::{info, warn};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::Datanode;
use meta_client::MetaClientOptions;
use snafu::{ResultExt, ensure};
use snafu::{ensure, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::App;
use crate::datanode::builder::InstanceBuilder;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::App;
pub const APP_NAME: &str = "greptime-datanode";
@@ -187,39 +187,29 @@ impl StartCommand {
if let Some(addr) = &self.rpc_bind_addr {
opts.grpc.bind_addr.clone_from(addr);
} else if let Some(addr) = &opts.rpc_addr {
warn!(
"Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
);
warn!("Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead.");
opts.grpc.bind_addr.clone_from(addr);
}
if let Some(server_addr) = &self.rpc_server_addr {
opts.grpc.server_addr.clone_from(server_addr);
} else if let Some(server_addr) = &opts.rpc_hostname {
warn!(
"Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
);
warn!("Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead.");
opts.grpc.server_addr.clone_from(server_addr);
}
if let Some(runtime_size) = opts.rpc_runtime_size {
warn!(
"Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
);
warn!("Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead.");
opts.grpc.runtime_size = runtime_size;
}
if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
warn!(
"Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
);
warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead.");
opts.grpc.max_recv_message_size = max_recv_message_size;
}
if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
warn!(
"Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
);
warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead.");
opts.grpc.max_send_message_size = max_send_message_size;
}
@@ -440,24 +430,20 @@ mod tests {
#[test]
fn test_try_from_cmd() {
assert!(
(StartCommand {
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(&GlobalOptions::default())
.is_err()
);
assert!((StartCommand {
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(&GlobalOptions::default())
.is_err());
// Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
assert!(
(StartCommand {
node_id: Some(42),
..Default::default()
})
.load_options(&GlobalOptions::default())
.is_ok()
);
assert!((StartCommand {
node_id: Some(42),
..Default::default()
})
.load_options(&GlobalOptions::default())
.is_ok());
}
#[test]

View File

@@ -26,7 +26,7 @@ use meta_client::MetaClientType;
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::datanode::{APP_NAME, DatanodeOptions, Instance};
use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};

View File

@@ -25,20 +25,20 @@ use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::key::TableMetadataManager;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
use flow::{
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
get_flow_auth_options,
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
FrontendClient, FrontendInvoker,
};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ensure, OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -46,7 +46,7 @@ use crate::error::{
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile, App};
pub const APP_NAME: &str = "greptime-flownode";
@@ -341,18 +341,8 @@ impl StartCommand {
.build(),
);
// flownode's frontend to datanode need not timeout.
// Some queries are expected to take long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = Arc::new(NodeClients::new(channel_config));
let information_extension = Arc::new(DistributedInformationExtension::new(
meta_client.clone(),
client.clone(),
));
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let catalog_manager = KvBackendCatalogManagerBuilder::new(
information_extension,
cached_meta_backend.clone(),
@@ -408,6 +398,14 @@ impl StartCommand {
flownode.setup_services(services);
let flownode = flownode;
// flownode's frontend to datanode need not timeout.
// Some queries are expected to take long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = Arc::new(NodeClients::new(channel_config));
let invoker = FrontendInvoker::build_from(
flownode.flow_engine().streaming_engine(),
catalog_manager.clone(),

View File

@@ -27,11 +27,11 @@ use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use frontend::frontend::Frontend;
@@ -48,7 +48,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, Result};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
@@ -378,24 +378,8 @@ impl StartCommand {
.build(),
);
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
let mut channel_config = ChannelConfig {
timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default()
};
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = Arc::new(NodeClients::new(channel_config));
let information_extension = Arc::new(DistributedInformationExtension::new(
meta_client.clone(),
client.clone(),
));
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let process_manager = Arc::new(ProcessManager::new(
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
@@ -429,12 +413,26 @@ impl StartCommand {
);
let heartbeat_task = Some(heartbeat_task);
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
let mut channel_config = ChannelConfig {
timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default()
};
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = NodeClients::new(channel_config);
let instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
client,
Arc::new(client),
meta_client,
process_manager,
)

View File

@@ -46,7 +46,7 @@ lazy_static::lazy_static! {
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
#[cfg(unix)]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
use tokio::signal::unix::{SignalKind, signal};
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
@@ -93,13 +93,13 @@ pub trait App: Send {
self.start().await?;
if self.wait_signal()
&& let Err(e) = start_wait_for_close_signal().await
{
error!(e; "Failed to listen for close signal");
// It's unusual to fail to listen for close signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
if self.wait_signal() {
if let Err(e) = start_wait_for_close_signal().await {
error!(e; "Failed to listen for close signal");
// It's unusual to fail to listen for close signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
}
self.stop().await?;

View File

@@ -21,7 +21,7 @@ use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
@@ -30,7 +30,7 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile, App};
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;

View File

@@ -19,16 +19,15 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::SendableRecordBatchStream;
use client::api::v1::meta::RegionRole;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{Configurable, KvBackendConfig, metadata_store_dir};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cluster::{NodeInfo, NodeStatus};
@@ -37,8 +36,8 @@ use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
@@ -46,13 +45,12 @@ use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_options::memory::MemoryOptions;
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_query::request::QueryRequest;
use common_telemetry::info;
use common_telemetry::logging::{
DEFAULT_LOGGING_DIR, LoggingOptions, SlowQueryOptions, TracingOptions,
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
@@ -82,13 +80,12 @@ use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
use crate::{create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile, App};
pub const APP_NAME: &str = "greptime-standalone";
@@ -218,7 +215,6 @@ impl Into<FrontendOptions> for StandaloneOptions {
}
impl StandaloneOptions {
/// Returns the `FrontendOptions` for the standalone instance.
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
FrontendOptions {
@@ -242,7 +238,6 @@ impl StandaloneOptions {
}
}
/// Returns the `DatanodeOptions` for the standalone instance.
pub fn datanode_options(&self) -> DatanodeOptions {
let cloned_opts = self.clone();
DatanodeOptions {
@@ -258,17 +253,6 @@ impl StandaloneOptions {
..Default::default()
}
}
/// Sanitize the `StandaloneOptions` to ensure the config is valid.
pub fn sanitize(&mut self) {
if self.storage.is_object_storage() {
self.storage
.store
.cache_config_mut()
.unwrap()
.sanitize(&self.storage.data_home);
}
}
}
pub struct Instance {
@@ -409,7 +393,6 @@ impl StartCommand {
.context(error::LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts.component)?;
opts.component.sanitize();
Ok(opts)
}
@@ -804,10 +787,6 @@ impl InformationExtension for StandaloneInformationExtension {
// Use `self.start_time_ms` instead.
// It's not precise but enough.
start_time_ms: self.start_time_ms,
cpus: common_config::utils::get_cpus() as u32,
memory_bytes: common_config::utils::get_sys_total_memory()
.unwrap_or_default()
.as_bytes(),
};
Ok(vec![node_info])
}
@@ -855,7 +834,7 @@ impl InformationExtension for StandaloneInformationExtension {
region_manifest: region_stat.manifest.into(),
data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
written_bytes: region_stat.written_bytes,
write_bytes: 0,
}
})
.collect::<Vec<_>>();
@@ -873,25 +852,6 @@ impl InformationExtension for StandaloneInformationExtension {
.await,
))
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
let req = QueryRequest {
plan: request
.build_plan()
.context(catalog::error::DatafusionSnafu)?,
region_id: RegionId::default(),
header: None,
};
self.region_server
.handle_read(req)
.await
.map_err(BoxedError::new)
.context(catalog::error::InternalSnafu)
}
}
#[cfg(test)]
@@ -1161,22 +1121,4 @@ mod tests {
assert_eq!(options.logging, default_options.logging);
assert_eq!(options.region_engine, default_options.region_engine);
}
#[test]
fn test_cache_config() {
let toml_str = r#"
[storage]
data_home = "test_data_home"
type = "S3"
[storage.cache_config]
enable_read_cache = true
"#;
let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
opts.sanitize();
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
assert_eq!(
opts.storage.store.cache_config().unwrap().cache_path,
"test_data_home"
);
}
}

View File

@@ -18,9 +18,9 @@ use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
use common_wal::config::DatanodeWalConfig;
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeOptions;

View File

@@ -19,8 +19,8 @@
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::task::AtomicWaker;

View File

@@ -17,8 +17,8 @@ use std::io;
use std::ops::Range;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;

View File

@@ -7,7 +7,7 @@ use std::ops::{Div, Mul};
use std::str::FromStr;
use serde::de::{Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
const UNIT: u64 = 1;

View File

@@ -34,7 +34,7 @@
use std::fmt::Debug;
use std::{any, fmt};
use serde::{Deserialize, Serialize, de, ser};
use serde::{de, ser, Deserialize, Serialize};
use zeroize::{Zeroize, ZeroizeOnDrop};
/// Wrapper type for strings that contains secrets. See also [SecretBox].

View File

@@ -13,8 +13,8 @@
// limitations under the License.
use config::{Environment, File, FileFormat};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde::Serialize;
use snafu::ResultExt;
use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu, TomlFormatSnafu};

View File

@@ -39,24 +39,6 @@ pub fn get_sys_total_memory() -> Option<ReadableSize> {
}
}
/// `ResourceSpec` holds the static resource specifications of a node,
/// such as CPU cores and memory capacity. These values are fixed
/// at startup and do not change dynamically during runtime.
#[derive(Debug, Clone, Copy)]
pub struct ResourceSpec {
pub cpus: usize,
pub memory: Option<ReadableSize>,
}
impl Default for ResourceSpec {
fn default() -> Self {
Self {
cpus: get_cpus(),
memory: get_sys_total_memory(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -45,11 +45,11 @@ pub trait ArrowWriterCloser {
}
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder + ArrowWriterCloser,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder + ArrowWriterCloser,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes `LazyBufferedWriter` and optionally flushes all data to underlying storage
/// if any row's been written.
@@ -67,11 +67,11 @@ impl<
}
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes the writer and flushes the buffer data.
pub async fn close_inner_writer(&mut self) -> Result<()> {

View File

@@ -42,11 +42,11 @@ use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
@@ -158,10 +158,10 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
let stream = futures::stream::poll_fn(move |cx| {
loop {
if buffered.is_empty()
&& let Some(result) = futures::ready!(upstream.poll_next_unpin(cx))
{
buffered = result?;
if buffered.is_empty() {
if let Some(result) = futures::ready!(upstream.poll_next_unpin(cx)) {
buffered = result?;
};
}
let decoded = decoder.decode(buffered.as_ref())?;

View File

@@ -30,7 +30,7 @@ use tokio_util::io::SyncIoBridge;
use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, FileFormat, stream_to_file};
use crate::file_format::{self, stream_to_file, FileFormat};
use crate::share_buffer::SharedBuffer;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -160,8 +160,8 @@ mod tests {
use super::*;
use crate::file_format::{
FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER,
FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat,
FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER,
FORMAT_SCHEMA_INFER_MAX_RECORD,
};
use crate::test_util::{format_schema, test_store};

View File

@@ -17,7 +17,7 @@ use std::io::BufReader;
use std::str::FromStr;
use arrow::json;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
@@ -32,7 +32,7 @@ use tokio_util::io::SyncIoBridge;
use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::file_format::{self, FileFormat, stream_to_file};
use crate::file_format::{self, stream_to_file, FileFormat};
use crate::share_buffer::SharedBuffer;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -133,7 +133,7 @@ mod tests {
use common_test_util::find_workspace_path;
use super::*;
use crate::file_format::{FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat};
use crate::file_format::{FileFormat, FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD};
use crate::test_util::{format_schema, test_store};
fn test_data_root() -> String {

View File

@@ -15,8 +15,8 @@
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;

View File

@@ -21,29 +21,29 @@ use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::arrow::{parquet_to_arrow_schema, ArrowWriter};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder};
use crate::error::{self, Result, WriteObjectSnafu, WriteParquetSnafu};
use crate::file_format::FileFormat;
use crate::share_buffer::SharedBuffer;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParquetFormat {}
@@ -196,7 +196,10 @@ pub async fn stream_to_parquet(
concurrency: usize,
) -> Result<usize> {
let write_props = column_wise_config(
WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_statistics_truncate_length(None)
.set_column_index_truncate_length(None),
schema,
)
.build();

View File

@@ -24,8 +24,8 @@ use datafusion::datasource::physical_plan::{
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_orc::OrcSource;
use futures::StreamExt;
@@ -204,15 +204,15 @@ async fn test_orc_opener() {
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |",
"| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |",
"| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |",
"| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |",
"| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |",
"| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |",
"| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |",
"| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |",
"| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
],
},
Test {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::{TryStreamExt, future};
use futures::{future, TryStreamExt};
use object_store::{Entry, ObjectStore};
use regex::Regex;
use snafu::ResultExt;

View File

@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use object_store::ObjectStore;
use object_store::services::Fs;
use object_store::util::DefaultLoggingInterceptor;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{BuildBackendSnafu, Result};

Some files were not shown because too many files have changed in this diff Show More