Compare commits

..

85 Commits

Author SHA1 Message Date
Yingwen
5d644c0b7f chore: bump version to v0.7.0 (#3433) 2024-03-05 12:07:37 +00:00
Ruihang Xia
020635063c feat: implement multi-dim partition rule (#3409)
* generate expr rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement show create for new partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement row spliter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: fix failed tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix lint issues

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: ignore tests for deprecated partition rule

* chore: remove unused partition rule tests setup

* test(sqlness): add basic partition tests

* test(multi_dim): add basic find region test

* address CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
2024-03-05 11:39:15 +00:00
dependabot[bot]
97cbfcfe23 build(deps): bump mio from 0.8.10 to 0.8.11 (#3434)
Bumps [mio](https://github.com/tokio-rs/mio) from 0.8.10 to 0.8.11.
- [Release notes](https://github.com/tokio-rs/mio/releases)
- [Changelog](https://github.com/tokio-rs/mio/blob/master/CHANGELOG.md)
- [Commits](https://github.com/tokio-rs/mio/compare/v0.8.10...v0.8.11)

---
updated-dependencies:
- dependency-name: mio
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-03-05 11:04:14 +00:00
Lei, HUANG
7183fa198c refactor: make MergeTreeMemtable the default choice (#3430)
* refactor: make MergeTreeMemtable the default choice

* refactor: reformat

* chore: add doc to config
2024-03-05 10:00:08 +00:00
Lei, HUANG
02b18fbca1 feat: decode prom requests to grpc (#3425)
* hack: inline decode

* move to servers

* fix: samples lost

* add bench

* remove useless functions

* wip

* feat: remove object pools

* fix: minor issues

* fix: remove useless dep

* chore: rebase main

* format

* finish

* fix: format

* feat: introduce request pool

* try to fix license issue

* fix: clippy

* resolve comments

* fix:typo

* remove useless comments
2024-03-05 09:47:32 +00:00
shuiyisong
7b1c3503d0 fix: complete interceptors for all frontend entry (#3428) 2024-03-05 09:38:47 +00:00
liyang
6fd2ff49d5 ci: refine windows output env (#3431) 2024-03-05 08:38:28 +00:00
WU Jingdi
53f2a5846c feat: support tracing rule sampler (#3405)
* feat: support tracing rule sampler

* chore: simplify code
2024-03-05 15:40:02 +08:00
Yingwen
49157868f9 feat: Correct server metrics and add more metrics for scan (#3426)
* feat: drop timer on stream terminated

* refactor: combine metrics into a histogram vec

* refactor: frontend grpc metrics

* feat: add metrics middleware layer to grpc server

* refactor: move http metrics layer to metrics mod

* feat: bucket for grpc/http elapsed

* feat: remove duplicate metrics

* style: fix cilppy

* fix: incorrect bucket of promql series

* feat: more metrics for mito

* feat: convert cost

* test: fix metrics test
2024-03-04 10:15:10 +00:00
Ruihang Xia
ae2c18e1cf docs(rfcs): multi-dimension partition rule (#3350)
* docs(rfcs): multi-dimension partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change math block type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update tracking issue

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update discussion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-04 08:10:54 +00:00
dennis zhuang
e6819412c5 refactor: show tables and show databases (#3423)
* refactor: show tables and show databases

* chore: clean code
2024-03-04 06:15:17 +00:00
tison
2a675e0794 docs: update pull_request_template.md (#3421) 2024-03-03 09:51:44 +00:00
JeremyHi
0edf1bbacc feat: reduce a clone of string (#3422) 2024-03-03 08:09:17 +00:00
Eugene Tolbakov
8609977b52 feat: add verbose support for tql explain/analyze (#3390)
* feat: add verbose support for tql explain/analyze

* chore: apply clippy suggestions

* feat: add sqlness tests

* fix: adjust sqlness replace rules

* fix: address CR (move tql explain/analyze inside common folder)

* fix: address CR(improve comments to indicate that verbose is optional)
2024-03-02 11:18:22 +00:00
JeremyHi
2d975e4f22 feat: tableref cache (#3420)
* feat: tableref cache

* chore: minor refactor

* chore: avoid to string

* chore: change log level

* feat: add metrics for prometheus remote write decode
2024-03-02 07:37:31 +00:00
Kould
00cbbc97ae feat: support Create Table ... Like (#3372)
* feat: support `Create Table ... Like`

* fix: `check_permission` for `Create Table ... Like`

* style: renaming `name` -> `table_name` & `target` -> `source_name` and make `Create Table ... Like` testcase more complicated

* rebase

* avoid _ fn

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
2024-03-02 06:34:13 +00:00
niebayes
7d30c2484b fix: mitigate memory spike during startup (#3418)
* fix: fix memory spike during startup

* fix: allocate a region write ctx for each wal entry
2024-03-01 07:46:05 +00:00
Lei, HUANG
376409b857 feat: employ sparse key encoding for shard lookup (#3410)
* feat: employ short key encoding for shard lookup

* fix: license

* chore: simplify code

* refactor: only enable sparse encoding to speed lookup on metric engine

* fix: names
2024-03-01 06:22:15 +00:00
Ning Sun
d4a54a085b feat: add configuration for tls watch option (#3395)
* feat: add configuration for tls watch option

* test: sleep longer to ensure async task run

* test: update config api integration test

* refactor: rename function
2024-03-01 03:49:54 +00:00
dennis zhuang
c1a370649e fix: show table names not complete from information_schema (#3417) 2024-03-01 02:51:46 +00:00
JeremyHi
3cad9d989d fix: partition region id (#3414) 2024-02-29 09:09:59 +00:00
JohnsonLee
a50025269f feat: Support automatic DNS lookup for kafka bootstrap servers (#3379)
* feat: Support automatic DNS lookup for kafka bootstrap servers

* Revert "feat: Support automatic DNS lookup for kafka bootstrap servers"

This reverts commit 5baed7b01d.

* feat: Support automatic DNS lookup for Kafka broker

* fix: resolve broker endpoint in client manager

* fix: apply clippy lints

* refactor: slimplify the code with clippy hint

* refactor: move resolve_broker_endpoint to common/wal/src/lib.rs

* test: add mock test for resolver_broker_endpoint

* refactor: accept niebayes's advice

* refactor: rename EndpointIpNotFound to EndpointIPV4NotFound

* refactor: remove mock test and simplify the implementation

* docs: add comments about test_vallid_host_ipv6

* Apply suggestions from code review

Co-authored-by: niebayes <niebayes@gmail.com>

* move more common code

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
Co-authored-by: niebayes <niebayes@gmail.com>
2024-02-29 07:29:20 +00:00
JeremyHi
a3533c4ea0 feat: zero copy on split rows (#3407) 2024-02-28 13:27:52 +00:00
Lei, HUANG
3413fc0781 refactor: move some costly methods in DataBuffer::read out of read lock (#3406)
* refactor: move some costly methods in DataBuffer::read out of read lock

* refactor: also replace ShardReader with ShardReaderBuilder
2024-02-28 12:22:44 +00:00
tison
dc205a2c5d feat: enable ArrowFlight compression (#3403)
* feat: enable ArrowFlight compression

Signed-off-by: tison <wander4096@gmail.com>

* turn on features

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-28 08:55:44 +00:00
Lei, HUANG
a0a8e8c587 fix: some read metrics (#3404)
* fix: some read metrics

* chore: fix some metrics

* fix
2024-02-28 08:47:49 +00:00
Zhenchi
c3c80b92c8 feat(index): measure memory usage in global instead of single-column and add metrics (#3383)
* feat(index): measure memory usage in global instead of single-column and add metrics

* feat: add leading zeros to streamline memory usage

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove println

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-02-28 06:49:24 +00:00
Weny Xu
a8cbec824c refactor: refactor TableRouteManager (#3392)
* feat: introduce TableRouteStorage

* refactor: remove get & batch_get in TableRouteManager

* refactor: move txn related fn to TableRouteStorage

* chore: apply suggestions from CR

* chore(codecov): ingore tests-integration dir
2024-02-28 06:18:09 +00:00
tison
33d894c1f0 build: do not retry for connrefused (#3402)
* build: do not retry for connrefused

Signed-off-by: tison <wander4096@gmail.com>

* simplify layout

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-28 06:15:23 +00:00
Lei, HUANG
7942b8fae9 chore: add metris for memtable read path (#3397)
* chore: add metris for read path

* chore: add more metrics
2024-02-28 03:37:19 +00:00
Yingwen
b97f957489 feat: Use a partition level map to look up pk index (#3400)
* feat: partition level map

* test: test shard and builder

* fix: do not use pk index from shard builder

* feat: add multi key test

* fix: freeze shard before finding pk in shards
2024-02-28 03:17:09 +00:00
tison
f3d69e9563 chore: retry fetch dashboard assets (#3394)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-27 10:07:21 +00:00
dennis zhuang
4b36c285f1 feat: flush or compact table and region functions (#3363)
* feat: adds Requester to process table flush and compaction request

* feat: admin_fn macros for administration functions

* test: add query result

* feat: impl flush_region, flush_table, compact_region, and flush_region functions

* docs: add Arguments to admin_fn macro

* chore: apply suggestion

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: apply suggestion

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: group_requests_by_peer and adds log

* Update src/common/macro/src/admin_fn.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: adds todo for spawan thread

* feat: rebase with main

---------

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-27 08:57:38 +00:00
discord9
dbb1ce1a9b feat(flow): impl for MapFilterProject (#3359)
* feat: mfp impls

* fix: after rebase

* test: temporal filter mfp

* refactor: more comments&test

* test: permute

* fix: check input len when eval

* refactor: err handle&docs: more explain graph

* docs: better flowchart map,filter,project

* refactor: visit_* falliable

* chore: better temp lint allow

* fix: permute partially

* chore: remove duplicated checks

* docs: more explain&tests for clarity

* refactor: use ensure! instead
2024-02-27 08:13:55 +00:00
Ruihang Xia
3544c9334c feat!: new partition grammar - parser part (#3347)
* parser part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test in sql

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* comment out and ignore some logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update region migration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* temporary disable region migration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* allow dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-27 07:20:16 +00:00
Lei, HUANG
492a00969d feat: enable zstd compression and encodings in merge tree data part (#3380)
* feat: enable zstd compression in merge tree data part to save memory

* feat: also enable customized column encoding in DataPartEncoder
2024-02-27 06:54:56 +00:00
Yingwen
206666bff6 feat: Implement partition eviction and only add value size to write buffer size (#3393)
* feat: track key bytes in dict

* chore: done allocating on finish

* feat: evict keys

* chore: do not add to write buffer

* chore: only count value bytes

* fix: reset key bytes

* feat: remove write buffer manager from shards

* feat: change dict size compute method

* chore: adjust dictionary size by os memory
2024-02-27 06:28:57 +00:00
Weny Xu
7453d9779d fix: throw errors instead of panic (#3391)
* fix: throw errors instead of panic

* chore: apply suggestions from CR
2024-02-27 03:46:12 +00:00
liyang
8e3e0fd528 ci: add builder result outputs in release action (#3381) 2024-02-27 03:43:16 +00:00
dimbtp
b1e290f959 fix: range fix in modulo function tests (#3389)
fix: range fix for modulo tests
2024-02-26 15:50:23 +00:00
Ruihang Xia
d8dc93fccc feat(grafana): enable shared tooltip, add raft engine throughput (#3387)
feat: enable shared tooltip, add raft engine throughput

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-26 11:31:15 +00:00
Ning Sun
3887d207b6 feat: make tls certificates/keys reloadable (part 1) (#3335)
* feat: make tls certificates/keys reloadable (part 1)

* feat: add notify watcher for cert/key files

* test: add unit test for watcher

* fix: correct usage of watcher

* fix: skip watch when tls disabled
2024-02-26 09:37:54 +00:00
Ruihang Xia
e859f0e67d chore: skip reorder workspace tables in taplo (#3388)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-26 08:57:49 +00:00
Ruihang Xia
ce397ebcc6 feat: change how region id maps to region worker (#3384)
* feat: change how region id maps to region worker

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add overflow test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-26 08:42:29 +00:00
Yingwen
26011ed0b6 fix: resets dict builder keys counter and avoid unnecessary pruning (#3386)
* fix: dict builder resets num_keys on finish

* feat: skip empty shard and builder

* feat: avoid pruning if possible

Implementations:
- Apply all filters on the partition column
- If no filter to prune, skip decoding keys
2024-02-26 08:24:46 +00:00
Lei, HUANG
8087822ab2 refactor: change the receivers of merge tree components (#3378)
* refactor: change the receivers of Shard::read/DataBuffer::read/DataParts::read to &self instead of &mut self

* refactor: remove allow(dead_code) in merge tree
2024-02-26 06:50:55 +00:00
Yingwen
e481f073f5 feat: Implement dedup for the new memtable and expose the config (#3377)
* fix: KeyValues num_fields() is incorrect

* chore: fix warnings

* feat: support dedup

* feat: allow using the new memtable

* feat: serde default for config

* fix: resets pk index after finishing a dict
2024-02-25 13:06:01 +00:00
Lei, HUANG
606309f49a fix: remove unused imports in memtable_util.rs (#3376) 2024-02-25 09:23:28 +00:00
Yingwen
8059b95e37 feat: Implement iter for the new memtable (#3373)
* chore: read shard builder

* chore: reuse pk weights

* chore: prune key

* chore: shard reader wip

* refactor: shard builder DataBatch

* feat: merge shard readers

* feat: return shard id in shard readers

* feat: impl partition reader

* chore: impl partition read

* feat: impl iter tree

* chore: save last yield pk id

* style: fix clippy

* refactor: rename ShardReaderImpl to ShardReader

* chore: address CR comment
2024-02-25 07:42:16 +00:00
Lei, HUANG
afe4633320 feat: merge tree dedup reader (#3375)
* feat: add dedup option to merge tree component

* feat: impl dedup reader for shard reader

* refactor: DedupReader::new to DedupReader::try_new

* refactor: remove DedupReader::current_key field

* fix: some cr comments

* fix: fmt

* fix: remove shard_id method from DedupSource
2024-02-24 13:50:49 +00:00
Yingwen
abbfd23d4b feat: Add freeze and fork method to the memtable (#3374)
* feat: add fork method to the memtable

* feat: allow mark immutable returns result

* feat: use fork to create the mutable memtable

* feat: remove memtable builder from freeze

* chore: warninigs

* fix: inspect error

* feat: iter returns result

* chore: maintains memtable id in region

* chore: update comment

* fix: remove region status if failed to freeze a memtable

* chroe: update comment

* chore: iter should not require sync

* chore: implement freeze and fork for the new memtable
2024-02-24 12:11:16 +00:00
Yingwen
1df64f294b refactor: Remove Item from merger's Node trait (#3371)
* refactor: data reader returns reference to data batch

* refactor: use range to create merger

* chore: Reference RecordBatch in DataBatch

* fix: top node not read if no next node

* refactor: move timestamp_array_to_i64_slice to data mod

* style: fix cilppy

* chore: derive copy for DataBatch

* chore: address CR comments
2024-02-24 07:19:48 +00:00
LFC
a6564e72b4 fix: treat "0" and "1" as valid boolean values. (#3370)
* Treat "0" and "1" as valid boolean values.

* Update src/sql/src/statements.rs

Co-authored-by: tison <wander4096@gmail.com>

* Fix tests.

---------

Co-authored-by: tison <wander4096@gmail.com>
2024-02-23 14:34:27 +00:00
Lei, HUANG
1f1d1b4f57 feat: distinguish between different read paths (#3369)
* feat: distinguish between different read paths

* fix: reformat code
2024-02-23 12:40:39 +00:00
Yingwen
b144836935 feat: Implement write and fork for the new memtable (#3357)
* feat: write to a shard or a shard builder

* feat: freeze and fork for partition and shards

* chore: shard builder

* chore: change dict reader to support random access

* test: test write shard

* test: test write

* test: test memtable

* feat: add new and write_row to DataParts

* refactor: partition freeze shards

* refactor: write_with_pk_id

* style: fix clippy

* chore: add methods to get pk weights

* chroe: fix compiler errors
2024-02-23 07:20:55 +00:00
dependabot[bot]
93d9f48dd7 build(deps): bump libgit2-sys from 0.16.1+1.7.1 to 0.16.2+1.7.2 (#3367)
Bumps [libgit2-sys](https://github.com/rust-lang/git2-rs) from 0.16.1+1.7.1 to 0.16.2+1.7.2.
- [Changelog](https://github.com/rust-lang/git2-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/git2-rs/commits)

---
updated-dependencies:
- dependency-name: libgit2-sys
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-02-23 14:30:09 +08:00
Lei, HUANG
90e9b69035 feat: impl merge reader for DataParts (#3361)
* feat: impl merge reader for DataParts

* fix: fmt

* fix: sort rows with pk and ts according to sequnce desc

* fix: remove pk weight as pk index are already replace by weights

* fix: format

* fix: some cr comments

* fix: some cr comments

* refactor: simply trait's associated types

* fix: some cr comments
2024-02-23 06:07:55 +00:00
LFC
2035e7bf4c refactor: set the actual bound port in server handler (#3353)
* refactor: set the actual bound port so we can use port 0 in testing

* Update src/servers/src/server.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fmt

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-02-23 02:49:11 +00:00
Ruihang Xia
7341f23019 feat: skip filling NULL for put and delete requests (#3364)
* feat: optimize for sparse data

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove old structures

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-22 14:30:43 +00:00
tison
41ee0cdd5a build(deps): Upgrade opensrv to 0.7.0 (#3362)
* build(deps): Upgrade opensrv to 0.7.0

Signed-off-by: tison <wander4096@gmail.com>

* workaround X is not X by casting

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-22 13:11:26 +00:00
Kould
578dd8f87a feat: add isnull function (#3360)
* code fmt

* feat: add isnull function

* feat: add isnull function
2024-02-22 12:41:25 +00:00
Weny Xu
1dc4fec662 refactor: allocate table ids in the procedure (#3293)
* refactor: refactor the create logical tables

* test(create_logical_tables): add tests for on_prepare

* test(create_logical_tables): add tests for on_create_metadata

* refactor: rename to create_logical_tables_metadata

* chore: fmt toml

* chore: apply suggestions from CR
2024-02-22 10:53:28 +00:00
Ruihang Xia
f26505b625 fix: typo in lint config (#3358)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-22 08:56:33 +00:00
Ruihang Xia
8289b0dec2 ci: align docs workflow jobs with develop.yml (#3356)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-22 07:01:15 +00:00
Yingwen
53105b99e7 test: fix list_files_and_parse_table_name path issue on windows (#3349)
* fix: always converts path to slash

* chore: print

* chore: normalize dir

* chore: compile

* chore: rm print
2024-02-22 06:16:41 +00:00
dennis zhuang
564fe3beca feat: impl migrate_region and procedure_state SQL function (#3325)
* fix: logical region can't find region routes

* feat: fetch partitions info in batch

* refactor: rename batch functions

* refactor: rename DdlTaskExecutor to ProcedureExecutor

* feat: impl migrate_region and query_procedure_state for ProcedureExecutor

* feat: adds SQL function procedure_state and finish migrate_region impl

* fix: constant vector

* feat: unit tests for migrate_region and procedure_state

* test: test region migration by SQL

* fix: compile error after rebeasing

* fix: clippy warnings

* feat: ensure procedure_state and migrate_region can be only called under greptime catalog

* fix: license header
2024-02-22 02:37:11 +00:00
SteveLauC
e9a2b0a9ee chore: use workspace-wide lints (#3352)
* chore: use workspace-wide lints

* respond to review
2024-02-22 01:01:10 +00:00
discord9
860b1e9d9e feat(flow): impl ScalarExpr&Scalar Function (#3283)
* feat: impl for ScalarExpr

* feat: plain functions

* refactor: simpler trait bound&tests

* chore: remove unused imports

* chore: fmt

* refactor: early ret on first error

* refactor: remove abunant match arm

* chore: per review

* doc: `support` fn

* chore: per review more

* chore: more per review

* fix: extract_bound

* chore: per review

* refactor: reduce nest
2024-02-21 12:53:16 +00:00
Yingwen
7c88d721c2 Merge pull request #3348
* feat: define functions for partitions

* feat: write partitions

* feat: fork and freeze partition

* feat: create iter by partition

* style: fix clippy

* chore: typos

* feat: add scan method to builder

* feat: check whether the builder should freeze first
2024-02-21 20:50:34 +08:00
Lei, HUANG
90169c868d feat: merge tree data parts (#3346)
* feat: add iter method for DataPart

* chore: rename iter to reader

* chore: some doc

* fix: resolve some comments

* fix: remove metadata in DataPart
2024-02-21 11:37:29 +00:00
tison
4c07606da6 refactor: put together HTTP headers (#3337)
* refactor: put together HTTP headers

Signed-off-by: tison <wander4096@gmail.com>

* do refactor

Signed-off-by: tison <wander4096@gmail.com>

* drop dirty commit

Signed-off-by: tison <wander4096@gmail.com>

* reduce changeset

Signed-off-by: tison <wander4096@gmail.com>

* fixup compilations

Signed-off-by: tison <wander4096@gmail.com>

* tidy files

Signed-off-by: tison <wander4096@gmail.com>

* drop common-api

Signed-off-by: tison <wander4096@gmail.com>

* fmt

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 09:51:10 +00:00
tison
a7bf458a37 chore: remove unused deprecated table_dir_with_catalog_and_schema (#3341) 2024-02-21 08:46:36 +00:00
tison
fa08085119 ci: upgrade actions to node20-based version (#3345)
* ci: upgrade actions to node20-based version

Signed-off-by: tison <wander4096@gmail.com>

* distinguish artifact name

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 08:09:09 +00:00
Lei, HUANG
86a98c80f5 feat: replace pk index with pk_weight during freeze (#3343)
* feat: replace pk index with pk_weight during freeze

* chore: add parameter to control pk_index replacement

* fix: dedup pk weights also

* fix: generate pk array before dedup
2024-02-21 08:05:25 +00:00
tison
085a380019 build(deps): axum-tets-helper has included patch-1 (#3333)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 07:49:42 +00:00
tison
d9a96344ee ci: try fix log location (#3342)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 07:01:51 +00:00
Weny Xu
41656c8635 refactor: allocate table id in the procedure (#3271)
* refactor: replace TableMetadataManager with TableNameManager

* refactor: allocate table id in the procedure

* refactor: refactor client logical of handling retries

* feat(test_util): add TestCreateTableExprBuilder

* feat(test_util): add MockDatanodeManager

* feat(test_util): add new_ddl_context

* feat(test_util): add build_raw_table_info_from_expr

* feat(test_util): add MockDatanodeManager::new

* feat(procedure): add downcast_output_ref to Status

* test(create_table): add tests for CreateTableProcedure on_prepare

* refactor(ddl): rename handle_operate_region_error to add_peer_context_if_need

* test(create_table): add tests for CreateTableProcedure on_datanode_create_regions

* test(create_table): add tests for CreateTableProcedure on_create_metadata

* refactor(meta): use CreateTableExprBuilder

* feat(create_table): ensure number of partitions is greater than 0

* refactor: rename to add_peer_context_if_needed

* feat: add context for panic

* refactor: simplify the should_retry

* refactor: use Option<&T> instead of &Option<T>

* refactor: move downcast_output_ref under cfg(test)

* chore: fmt toml
2024-02-21 04:38:46 +00:00
tison
cf08a3de6b chore: support configure GITHUB_PROXY_URL when fetch dashboard assets (#3340)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 02:38:14 +00:00
Yingwen
f087a843bb feat: Implement KeyDictBuilder for the merge tree memtable (#3334)
* feat: dict builder

* feat: write and scan dict builder

* chore: address CR comments
2024-02-20 15:39:17 +00:00
Lei, HUANG
450dfe324d feat: data buffer and related structs (#3329)
* feat: data buffer and related structs

* fix: some cr comments

* chore: remove freeze_threshold in DataBuffer

* fix: use LazyMutableVectorBuilder instead of two vector; add option to control dedup

* fix: dedup rows according to both pk weights and timestamps

* fix: assembly DataBatch on demand
2024-02-20 09:22:45 +00:00
tison
3dfe4a2e5a chore: check dirs before create RaftEngine store (#3327)
* chore: check dirs before create RaftEngine store

Signed-off-by: tison <wander4096@gmail.com>

* fix impl

Signed-off-by: tison <wander4096@gmail.com>

* improve naming

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-20 07:48:15 +00:00
LFC
eded08897d test: add data compatibility test (#3109)
* test: data files compatibility test

* rework compatibility test

* revert unneeded changes

* revert unneeded changes

* debug CI

* Update .github/workflows/develop.yml

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-20 07:44:04 +00:00
Ruihang Xia
b1f54d8a03 fix: disable ansi contorl char when stdout is redirected (#3332)
* fix: disable ansi contorl char when stdout is redirected

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't touch file logging layer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* disable ansi for two file layers

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Co-authored-by: LFC <bayinamine@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: LFC <bayinamine@gmail.com>
2024-02-20 06:42:56 +00:00
shuiyisong
bf5e1905cd refactor: bring metrics to http output (#3247)
* refactor: bring metrics to http output

* chore: remove unwrap

* chore: make walk plan accumulate

* chore: change field name and comment

* chore: add metrics to http resp header

* chore: move PrometheusJsonResponse to a separate file and impl IntoResponse

* chore: put metrics in prometheus resp header too
2024-02-20 03:25:18 +00:00
Zhenchi
6628c41c36 feat(metric-engine): set index options for data region (#3330)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-02-20 02:38:35 +00:00
405 changed files with 17765 additions and 4151 deletions

View File

@@ -3,13 +3,3 @@ linker = "aarch64-linux-gnu-gcc"
[alias]
sqlness = "run --bin sqlness-runner --"
[build]
rustflags = [
# lints
# TODO: use lint configuration in cargo https://github.com/rust-lang/cargo/issues/5034
"-Wclippy::print_stdout",
"-Wclippy::print_stderr",
"-Wclippy::implicit_clone",
]

View File

@@ -34,7 +34,7 @@ runs:
- name: Upload sqlness logs
if: ${{ failure() && inputs.disable-run-tests == 'false' }} # Only upload logs when the integration tests failed.
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: /tmp/greptime-*.log

View File

@@ -67,7 +67,7 @@ runs:
- name: Upload sqlness logs
if: ${{ failure() }} # Only upload logs when the integration tests failed.
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: /tmp/greptime-*.log

View File

@@ -62,10 +62,10 @@ runs:
- name: Upload sqlness logs
if: ${{ failure() }} # Only upload logs when the integration tests failed.
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
path: /tmp/greptime-*.log
retention-days: 3
- name: Build greptime binary

View File

@@ -1,8 +1,10 @@
I hereby agree to the terms of the [GreptimeDB CLA](https://gist.github.com/xtang/6378857777706e568c1949c7578592cc)
I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeTeam/.github/blob/main/CLA.md).
## Refer to a related PR or issue link (optional)
## What's changed and what's your intention?
_PLEASE DO NOT LEAVE THIS EMPTY !!!_
__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
Please explain IN DETAIL what the changes are in this PR and why they are needed:
@@ -16,5 +18,3 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed
- [ ] I have written the necessary rustdoc comments.
- [ ] I have added the necessary unit tests and integration tests.
- [x] This PR does not require documentation updates.
## Refer to a related PR or issue link (optional)

View File

@@ -1,7 +1,7 @@
on:
merge_group:
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
types: [ opened, synchronize, reopened, ready_for_review ]
paths-ignore:
- 'docs/**'
- 'config/**'
@@ -57,7 +57,7 @@ jobs:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
# Shares with `Clippy` job
shared-key: "check-lint"
@@ -75,7 +75,7 @@ jobs:
toolchain: stable
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
@@ -136,13 +136,12 @@ jobs:
run: tar -xvf ./bins.tar.gz
- name: Run sqlness
run: RUST_BACKTRACE=1 ./bins/sqlness-runner -c ./tests/cases --bins-dir ./bins
# FIXME: Logs cannot found be on failure (or even success). Need to figure out the cause.
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
path: /tmp/greptime-*.log
retention-days: 3
sqlness-kafka-wal:
@@ -167,13 +166,12 @@ jobs:
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run sqlness
run: RUST_BACKTRACE=1 ./bins/sqlness-runner -w kafka -k 127.0.0.1:9092 -c ./tests/cases --bins-dir ./bins
# FIXME: Logs cannot be found on failure (or even success). Need to figure out the cause.
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
name: sqlness-logs-with-kafka-wal
path: /tmp/greptime-*.log
retention-days: 3
fmt:
@@ -191,7 +189,7 @@ jobs:
components: rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
shared-key: "check-rust-fmt"
- name: Run cargo fmt
@@ -212,7 +210,7 @@ jobs:
components: clippy
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
# Shares with `Check` job
shared-key: "check-lint"
@@ -271,10 +269,28 @@ jobs:
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Codecov upload
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./lcov.info
flags: rust
fail_ci_if_error: false
verbose: true
compat:
name: Compatibility Test
needs: build
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
name: bins
path: .
- name: Unzip binaries
run: |
mkdir -p ./bins/current
tar -xvf ./bins.tar.gz --strip-components=1 -C ./bins/current
- run: ./tests/compat/test-compat.sh 0.6.0

View File

@@ -61,6 +61,18 @@ jobs:
sqlness:
name: Sqlness Test
runs-on: ubuntu-20.04
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-20.04 ]
steps:
- run: 'echo "No action required"'
sqlness-kafka-wal:
name: Sqlness Test with Kafka Wal
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-20.04 ]
steps:
- run: 'echo "No action required"'

View File

@@ -45,10 +45,10 @@ jobs:
{"text": "Nightly CI failed for sqlness tests"}
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
path: /tmp/greptime-*.log
retention-days: 3
test-on-windows:

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.7.0
NEXT_RELEASE_VERSION: v0.8.0
jobs:
allocate-runners:
@@ -221,6 +221,8 @@ jobs:
arch: x86_64-apple-darwin
artifacts-dir-prefix: greptime-darwin-amd64-pyo3
runs-on: ${{ matrix.os }}
outputs:
build-macos-result: ${{ steps.set-build-macos-result.outputs.build-macos-result }}
needs: [
allocate-runners,
]
@@ -260,6 +262,8 @@ jobs:
features: pyo3_backend,servers/dashboard
artifacts-dir-prefix: greptime-windows-amd64-pyo3
runs-on: ${{ matrix.os }}
outputs:
build-windows-result: ${{ steps.set-build-windows-result.outputs.build-windows-result }}
needs: [
allocate-runners,
]
@@ -284,7 +288,7 @@ jobs:
- name: Set build windows result
id: set-build-windows-result
run: |
echo "build-windows-result=success" >> $GITHUB_OUTPUT
echo "build-windows-result=success" >> $Env:GITHUB_OUTPUT
release-images-to-dockerhub:
name: Build and push images to DockerHub
@@ -295,6 +299,8 @@ jobs:
build-linux-arm64-artifacts,
]
runs-on: ubuntu-2004-16-cores
outputs:
build-image-result: ${{ steps.set-build-image-result.outputs.build-image-result }}
steps:
- uses: actions/checkout@v4
with:
@@ -310,7 +316,7 @@ jobs:
version: ${{ needs.allocate-runners.outputs.version }}
- name: Set build image result
id: set-image-build-result
id: set-build-image-result
run: |
echo "build-image-result=success" >> $GITHUB_OUTPUT

353
Cargo.lock generated
View File

@@ -196,7 +196,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
[[package]]
name = "api"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"common-base",
"common-decimal",
@@ -412,6 +412,7 @@ dependencies = [
"arrow-data",
"arrow-schema",
"flatbuffers",
"lz4",
]
[[package]]
@@ -674,7 +675,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -785,23 +786,6 @@ dependencies = [
"syn 2.0.43",
]
[[package]]
name = "axum-test-helper"
version = "0.1.1"
source = "git+https://github.com/sunng87/axum-test-helper.git?branch=patch-1#5aa7843ce2250144ea1b7f589f274c00cf1af4ab"
dependencies = [
"axum",
"bytes",
"http",
"http-body",
"hyper",
"reqwest",
"serde",
"tokio",
"tower",
"tower-service",
]
[[package]]
name = "axum-test-helper"
version = "0.3.0"
@@ -877,7 +861,7 @@ dependencies = [
[[package]]
name = "benchmarks"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arrow",
"chrono",
@@ -1235,7 +1219,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -1526,7 +1510,7 @@ checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]]
name = "client"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -1557,10 +1541,12 @@ dependencies = [
"prometheus",
"prost 0.12.3",
"rand",
"serde",
"serde_json",
"session",
"snafu",
"substrait 0.17.1",
"substrait 0.6.0",
"substrait 0.7.0",
"tokio",
"tokio-stream",
"tonic 0.10.2",
@@ -1590,7 +1576,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"anymap",
"async-trait",
@@ -1643,7 +1629,7 @@ dependencies = [
"session",
"snafu",
"store-api",
"substrait 0.6.0",
"substrait 0.7.0",
"table",
"temp-env",
"tikv-jemallocator",
@@ -1686,7 +1672,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"anymap",
"bitvec",
@@ -1701,7 +1687,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"chrono",
"common-error",
@@ -1712,7 +1698,7 @@ dependencies = [
[[package]]
name = "common-config"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"common-base",
"humantime-serde",
@@ -1723,7 +1709,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -1755,7 +1741,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arrow",
"bigdecimal",
@@ -1769,7 +1755,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"snafu",
"strum 0.25.0",
@@ -1777,14 +1763,17 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
"async-trait",
"chrono-tz 0.6.3",
"common-base",
"common-catalog",
"common-error",
"common-macro",
"common-meta",
"common-query",
"common-runtime",
"common-telemetry",
@@ -1799,15 +1788,17 @@ dependencies = [
"paste",
"ron",
"serde",
"serde_json",
"session",
"snafu",
"statrs",
"store-api",
"table",
]
[[package]]
name = "common-greptimedb-telemetry"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"common-error",
@@ -1826,7 +1817,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arrow-flight",
@@ -1856,7 +1847,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -1875,7 +1866,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arc-swap",
"common-query",
@@ -1890,7 +1881,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"common-error",
"common-macro",
@@ -1903,7 +1894,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-recursion",
@@ -1912,11 +1903,13 @@ dependencies = [
"base64 0.21.5",
"bytes",
"chrono",
"common-base",
"common-catalog",
"common-error",
"common-grpc-expr",
"common-macro",
"common-procedure",
"common-procedure-test",
"common-recordbatch",
"common-runtime",
"common-telemetry",
@@ -1949,9 +1942,13 @@ dependencies = [
"uuid",
]
[[package]]
name = "common-plugins"
version = "0.7.0"
[[package]]
name = "common-procedure"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-stream",
"async-trait",
@@ -1975,7 +1972,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -1983,7 +1980,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -2006,7 +2003,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arc-swap",
"common-base",
@@ -2026,7 +2023,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"common-error",
@@ -2046,8 +2043,9 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"atty",
"backtrace",
"common-error",
"console-subscriber",
@@ -2073,7 +2071,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"client",
"common-query",
@@ -2085,7 +2083,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arrow",
"chrono",
@@ -2101,16 +2099,18 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"build-data",
]
[[package]]
name = "common-wal"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"common-base",
"common-error",
"common-macro",
"common-telemetry",
"futures-util",
"humantime-serde",
@@ -2118,6 +2118,8 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"snafu",
"tokio",
"toml 0.8.8",
]
@@ -2752,7 +2754,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arrow-flight",
@@ -2810,7 +2812,7 @@ dependencies = [
"snafu",
"sql",
"store-api",
"substrait 0.6.0",
"substrait 0.7.0",
"table",
"tokio",
"tokio-stream",
@@ -2824,7 +2826,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"arrow",
"arrow-array",
@@ -3300,7 +3302,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -3401,7 +3403,7 @@ dependencies = [
[[package]]
name = "flow"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"bimap",
@@ -3415,6 +3417,7 @@ dependencies = [
"datatypes",
"hydroflow",
"itertools 0.10.5",
"num-traits",
"serde",
"servers",
"session",
@@ -3455,7 +3458,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -3519,7 +3522,7 @@ dependencies = [
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
"store-api",
"strfmt",
"substrait 0.6.0",
"substrait 0.7.0",
"table",
"tokio",
"toml 0.8.8",
@@ -3590,6 +3593,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "fst"
version = "0.4.7"
@@ -4279,7 +4291,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -4374,6 +4386,26 @@ dependencies = [
"snafu",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.12"
@@ -4571,6 +4603,26 @@ dependencies = [
"indexmap 2.1.0",
]
[[package]]
name = "kqueue"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "lalrpop"
version = "0.19.12"
@@ -4796,7 +4848,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "log-store"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-stream",
"async-trait",
@@ -5085,7 +5137,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -5115,7 +5167,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"anymap",
"api",
@@ -5178,7 +5230,7 @@ dependencies = [
[[package]]
name = "meter-core"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
dependencies = [
"anymap",
"once_cell",
@@ -5188,14 +5240,14 @@ dependencies = [
[[package]]
name = "meter-macros"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
dependencies = [
"meter-core",
]
[[package]]
name = "metric-engine"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"aquamarine",
@@ -5210,6 +5262,7 @@ dependencies = [
"common-time",
"datafusion",
"datatypes",
"itertools 0.10.5",
"lazy_static",
"mito2",
"mur3",
@@ -5254,9 +5307,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.8.10"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
@@ -5266,7 +5319,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"anymap",
"api",
@@ -5293,6 +5346,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-wal",
"criterion",
"dashmap",
"datafusion",
"datafusion-common",
@@ -5313,6 +5367,7 @@ dependencies = [
"prometheus",
"prost 0.12.3",
"puffin",
"rand",
"regex",
"serde",
"serde_json",
@@ -5325,6 +5380,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"toml 0.8.8",
"uuid",
]
@@ -5418,6 +5474,24 @@ dependencies = [
"thiserror",
]
[[package]]
name = "mysql-common-derive"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c60492b5eb751e55b42d716b6b26dceb66767996cd7a5560a842fbf613ca2e92"
dependencies = [
"darling 0.20.3",
"heck",
"num-bigint",
"proc-macro-crate 3.1.0",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 2.0.43",
"termcolor",
"thiserror",
]
[[package]]
name = "mysql_async"
version = "0.33.0"
@@ -5434,7 +5508,7 @@ dependencies = [
"lazy_static",
"lru",
"mio",
"mysql_common",
"mysql_common 0.31.0",
"once_cell",
"pem",
"percent-encoding",
@@ -5470,13 +5544,12 @@ dependencies = [
"byteorder",
"bytes",
"cc",
"chrono",
"cmake",
"crc32fast",
"flate2",
"frunk",
"lazy_static",
"mysql-common-derive",
"mysql-common-derive 0.30.2",
"num-bigint",
"num-traits",
"rand",
@@ -5495,6 +5568,46 @@ dependencies = [
"zstd 0.12.4",
]
[[package]]
name = "mysql_common"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b73aacd01475af6d2efbdf489efd60fc519515ffe94edfd74236f954d521e31b"
dependencies = [
"base64 0.21.5",
"bigdecimal",
"bindgen",
"bitflags 2.4.1",
"bitvec",
"btoi",
"byteorder",
"bytes",
"cc",
"chrono",
"cmake",
"crc32fast",
"flate2",
"frunk",
"lazy_static",
"mysql-common-derive 0.31.0",
"num-bigint",
"num-traits",
"rand",
"regex",
"rust_decimal",
"saturating",
"serde",
"serde_json",
"sha1",
"sha2",
"smallvec",
"subprocess",
"thiserror",
"time",
"uuid",
"zstd 0.13.0",
]
[[package]]
name = "nalgebra"
version = "0.29.0"
@@ -5601,6 +5714,25 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]]
name = "notify"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.4.1",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio",
"walkdir",
"windows-sys 0.48.0",
]
[[package]]
name = "ntapi"
version = "0.4.1"
@@ -5790,9 +5922,18 @@ dependencies = [
"memchr",
]
[[package]]
name = "object-pool"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee9a3e7196d09ec86002b939f1576e8e446d58def8fd48fe578e2c72d5328d68"
dependencies = [
"parking_lot 0.11.2",
]
[[package]]
name = "object-store"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"anyhow",
"async-trait",
@@ -5890,13 +6031,14 @@ dependencies = [
[[package]]
name = "opensrv-mysql"
version = "0.6.0"
source = "git+https://github.com/MichaelScofield/opensrv.git?rev=1676c1d#1676c1d166cf33e7745e1b5db54b3fe2b7defec9"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4148ab944991b0a33be74d2636a815268974578812a9e4cf7dc785325e858154"
dependencies = [
"async-trait",
"byteorder",
"chrono",
"mysql_common",
"mysql_common 0.32.0",
"nom",
"pin-project-lite",
"tokio",
@@ -6034,7 +6176,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -6069,6 +6211,7 @@ dependencies = [
"meter-macros",
"object-store",
"partition",
"path-slash",
"prometheus",
"query",
"regex",
@@ -6080,7 +6223,7 @@ dependencies = [
"sql",
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
"store-api",
"substrait 0.6.0",
"substrait 0.7.0",
"table",
"tokio",
"tonic 0.10.2",
@@ -6311,7 +6454,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"async-trait",
@@ -6333,6 +6476,8 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"sql",
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
"store-api",
"table",
]
@@ -6343,6 +6488,12 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
[[package]]
name = "path-slash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
[[package]]
name = "pathdiff"
version = "0.2.1"
@@ -6630,7 +6781,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"auth",
"common-base",
@@ -6811,6 +6962,15 @@ dependencies = [
"toml_edit 0.20.7",
]
[[package]]
name = "proc-macro-crate"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284"
dependencies = [
"toml_edit 0.21.0",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@@ -6888,7 +7048,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"ahash 0.8.6",
"async-recursion",
@@ -7099,7 +7259,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"bitflags 2.4.1",
@@ -7220,7 +7380,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"ahash 0.8.6",
"api",
@@ -7241,6 +7401,7 @@ dependencies = [
"common-function",
"common-macro",
"common-meta",
"common-plugins",
"common-query",
"common-recordbatch",
"common-telemetry",
@@ -7280,7 +7441,7 @@ dependencies = [
"stats-cli",
"store-api",
"streaming-stats",
"substrait 0.6.0",
"substrait 0.7.0",
"table",
"tokio",
"tokio-stream",
@@ -8598,7 +8759,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -8871,7 +9032,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"aide",
"api",
@@ -8883,7 +9044,7 @@ dependencies = [
"auth",
"axum",
"axum-macros",
"axum-test-helper 0.3.0",
"axum-test-helper",
"base64 0.21.5",
"bytes",
"catalog",
@@ -8897,6 +9058,7 @@ dependencies = [
"common-macro",
"common-mem-prof",
"common-meta",
"common-plugins",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -8904,6 +9066,7 @@ dependencies = [
"common-test-util",
"common-time",
"common-version",
"criterion",
"datafusion",
"datafusion-common",
"datafusion-expr",
@@ -8922,6 +9085,8 @@ dependencies = [
"lazy_static",
"mime_guess",
"mysql_async",
"notify",
"object-pool",
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
@@ -8954,6 +9119,7 @@ dependencies = [
"sql",
"strum 0.25.0",
"table",
"tempfile",
"tikv-jemalloc-ctl",
"tokio",
"tokio-postgres",
@@ -8970,7 +9136,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arc-swap",
@@ -8979,6 +9145,7 @@ dependencies = [
"common-telemetry",
"common-time",
"derive_builder 0.12.0",
"snafu",
"sql",
]
@@ -9239,7 +9406,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"common-base",
@@ -9291,7 +9458,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"clap 4.4.11",
@@ -9498,7 +9665,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"aquamarine",
@@ -9638,7 +9805,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -9811,7 +9978,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"anymap",
"async-trait",
@@ -9923,7 +10090,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-trait",
"common-error",
@@ -9948,14 +10115,14 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"api",
"arrow-flight",
"async-trait",
"auth",
"axum",
"axum-test-helper 0.1.1",
"axum-test-helper",
"catalog",
"chrono",
"client",
@@ -10005,7 +10172,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.6.0",
"substrait 0.7.0",
"table",
"tempfile",
"time",

View File

@@ -18,6 +18,7 @@ members = [
"src/common/grpc-expr",
"src/common/mem-prof",
"src/common/meta",
"src/common/plugins",
"src/common/procedure",
"src/common/procedure-test",
"src/common/query",
@@ -61,17 +62,23 @@ members = [
resolver = "2"
[workspace.package]
version = "0.6.0"
version = "0.7.0"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
clippy.print_stdout = "warn"
clippy.print_stderr = "warn"
clippy.implicit_clone = "warn"
rust.unknown_lints = "deny"
[workspace.dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "47.0" }
arrow-array = "47.0"
arrow-flight = "47.0"
arrow-ipc = "47.0"
arrow-ipc = { version = "47.0", features = ["lz4"] }
arrow-schema = { version = "47.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
@@ -100,7 +107,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" }
mockall = "0.11.4"
moka = "0.12"
num_cpus = "1.16"
@@ -164,6 +171,7 @@ common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-meta = { path = "src/common/meta" }
common-plugins = { path = "src/common/plugins" }
common-procedure = { path = "src/common/procedure" }
common-procedure-test = { path = "src/common/procedure-test" }
common-query = { path = "src/common/query" }
@@ -201,7 +209,7 @@ table = { path = "src/table" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "abbd357c1e193cd270ea65ee7652334a150b628f"
rev = "80b72716dcde47ec4161478416a5c6c21343364d"
[profile.release]
debug = 1

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
arrow.workspace = true
chrono.workspace = true

View File

@@ -504,7 +504,7 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
let res = db.sql(&query).await.unwrap();
match res {
Output::AffectedRows(_) | Output::RecordBatches(_) => (),
Output::Stream(stream) => {
Output::Stream(stream, _) => {
stream.try_collect::<Vec<_>>().await.unwrap();
}
}

View File

@@ -8,5 +8,6 @@ coverage:
ignore:
- "**/error*.rs" # ignore all error.rs files
- "tests/runner/*.rs" # ignore integration test runner
- "tests-integration/**/*.rs" # ignore integration tests
comment: # this is a top-level key
layout: "diff"

View File

@@ -134,10 +134,22 @@ create_on_compaction = "auto"
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""
[region_engine.mito.memtable]
# Memtable type.
# - "experimental": experimental memtable
# - "time_series": time-series memtable (deprecated)
type = "experimental"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.
data_freeze_threshold = 32768
# Max dictionary bytes.
fork_dictionary_bytes = "1GiB"
# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"

View File

@@ -31,6 +31,7 @@ runtime_size = 2
mode = "disable"
cert_path = ""
key_path = ""
watch = false
# PostgresSQL server options, see `standalone.example.toml`.
[postgres]
@@ -43,6 +44,7 @@ runtime_size = 2
mode = "disable"
cert_path = ""
key_path = ""
watch = false
# OpenTSDB protocol options, see `standalone.example.toml`.
[opentsdb]

View File

@@ -44,6 +44,8 @@ mode = "disable"
cert_path = ""
# Private key file path.
key_path = ""
# Watch for Certificate and key file change and auto reload
watch = false
# PostgresSQL server options.
[postgres]
@@ -62,6 +64,8 @@ mode = "disable"
cert_path = ""
# private key file path.
key_path = ""
# Watch for Certificate and key file change and auto reload
watch = false
# OpenTSDB protocol options.
[opentsdb]
@@ -118,7 +122,7 @@ sync_period = "1000ms"
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# The prefix of topic name.
@@ -240,6 +244,18 @@ mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""
[region_engine.mito.memtable]
# Memtable type.
# - "experimental": experimental memtable
# - "time_series": time-series memtable (deprecated)
type = "experimental"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.
data_freeze_threshold = 32768
# Max dictionary bytes.
fork_dictionary_bytes = "1GiB"
# Log options
# [logging]
# Specify logs directory.
@@ -250,10 +266,11 @@ intermediate_path = ""
# enable_otlp_tracing = false
# tracing exporter endpoint with format `ip:port`, we use grpc oltp as exporter, default endpoint is `localhost:4317`
# otlp_endpoint = "localhost:4317"
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# tracing_sample_ratio = 1.0
# Whether to append logs to stdout. Defaults to true.
# append_stdout = true
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# [logging.tracing_sample_ratio]
# default_ratio = 0.0
# Standalone export the metrics generated by itself
# encoded to Prometheus remote-write format

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

View File

@@ -0,0 +1,101 @@
---
Feature Name: Multi-dimension Partition Rule
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/3351
Date: 2024-02-21
Author: "Ruihang Xia <waynestxia@gmail.com>"
---
# Summary
A new region partition scheme that runs on multiple dimensions of the key space. The partition rule is defined by a set of simple expressions on the partition key columns.
# Motivation
The current partition rule is from MySQL's [`RANGE Partition`](https://dev.mysql.com/doc/refman/8.0/en/partitioning-range.html), which is based on a single dimension. It is sort of a [Hilbert Curve](https://en.wikipedia.org/wiki/Hilbert_curve) and pick several point on the curve to divide the space. It is neither easy to understand how the data get partitioned nor flexible enough to handle complex partitioning requirements.
Considering the future requirements like region repartitioning or autonomous rebalancing, where both workload and partition may change frequently. Here proposes a new region partition scheme that uses a set of simple expressions on the partition key columns to divide the key space.
# Details
## Partition rule
First, we define a simple expression that can be used to define the partition rule. The simple expression is a binary expression expression on the partition key columns that can be evaluated to a boolean value. The binary operator is limited to comparison operators only, like `=`, `!=`, `>`, `>=`, `<`, `<=`. And the operands are limited either literal value or partition column.
Example of valid simple expressions are $`col_A = 10`$, $`col_A \gt 10 \& col_B \gt 20`$ or $`col_A \ne 10`$.
Those expressions can be used as predicates to divide the key space into different regions. The following example have two partition columns `Col A` and `Col B`, and four partitioned regions.
```math
\left\{\begin{aligned}
&col_A \le 10 &Region_1 \\
&10 \lt col_A \& col_A \le 20 &Region_2 \\
&20 \lt col_A \space \& \space col_B \lt 100 &Region_3 \\
&20 \lt col_A \space \& \space col_B \ge 100 &Region_4
\end{aligned}\right\}
```
An advantage of this scheme is that it is easy to understand how the data get partitioned. The above example can be visualized in a 2D space (two partition column is involved in the example).
![example](2d-example.png)
Here each expression draws a line in the 2D space. Managing data partitioning becomes a matter of drawing lines in the key space.
To make it easy to use, there is a "default region" which catches all the data that doesn't match any of previous expressions. The default region exist by default and do not need to specify. It is also possible to remove this default region if the DB finds it is not necessary.
## SQL interface
The SQL interface is in response to two parts: specifying the partition columns and the partition rule. Thouth we are targeting an autonomous system, it's still allowed to give some bootstrap rules or hints on creating table.
Partition column is specified by `PARTITION ON COLUMNS` sub-clause in `CREATE TABLE`:
```sql
CREATE TABLE t (...)
PARTITION ON COLUMNS (...) ();
```
Two following brackets are for partition columns and partition rule respectively.
Columns provided here are only used as an allow-list of how the partition rule can be defined. Which means (a) the sequence between columns doesn't matter, (b) the columns provided here are not necessarily being used in the partition rule.
The partition rule part is a list of comma-separated simple expressions. Expressions here are not corresponding to region, as they might be changed by system to fit various workload.
A full example of `CREATE TABLE` with partition rule is:
```sql
CREATE TABLE IF NOT EXISTS demo (
a STRING,
b STRING,
c STRING,
d STRING,
ts TIMESTAMP,
memory DOUBLE,
TIME INDEX (ts),
PRIMARY KEY (a, b, c, d)
)
PARTITION ON COLUMNS (c, b, a) (
a < 10,
10 >= a AND a < 20,
20 >= a AND b < 100,
20 >= a AND b > 100
)
```
## Combine with storage
Examining columns separately suits our columnar storage very well in two aspects.
1. The simple expression can be pushed down to storage and file format, and is likely to hit existing index. Makes pruning operation very efficient.
2. Columns in columnar storage are not tightly coupled like in the traditional row storages, which means we can easily add or remove columns from partition rule without much impact (like a global reshuffle) on data.
The data file itself can be "projected" to the key space as a polyhedron, it is guaranteed that each plane is in parallel with some coordinate planes (in a 2D scenario, this is saying that all the files can be projected to a rectangle). Thus partition or repartition also only need to consider related columns.
![sst-project](sst-project.png)
An additional limitation is that considering how the index works and how we organize the primary keys at present, the partition columns are limited to be a subset of primary keys for better performance.
# Drawbacks
This is a breaking change.

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

View File

@@ -66,7 +66,7 @@
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"graphTooltip": 1,
"id": null,
"links": [],
"liveNow": false,
@@ -2116,7 +2116,7 @@
}
]
},
"unit": "bytes"
"unit": "none"
},
"overrides": []
},
@@ -2126,7 +2126,7 @@
"x": 0,
"y": 61
},
"id": 12,
"id": 17,
"interval": "1s",
"options": {
"legend": {
@@ -2147,8 +2147,8 @@
"uid": "${DS_PROMETHEUS-1}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "histogram_quantile(0.95, sum by(le) (rate(raft_engine_write_size_bucket[$__rate_interval])))",
"editorMode": "code",
"expr": "rate(raft_engine_sync_log_duration_seconds_count[2s])",
"fullMetaSearch": false,
"includeNullMetadata": false,
"instant": false,
@@ -2158,7 +2158,7 @@
"useBackend": false
}
],
"title": "wal write size",
"title": "raft engine sync count",
"type": "timeseries"
},
{
@@ -2378,6 +2378,120 @@
],
"title": "raft engine write duration seconds",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS-1}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 68
},
"id": 12,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS-1}"
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum by(le) (rate(raft_engine_write_size_bucket[$__rate_interval])))",
"fullMetaSearch": false,
"includeNullMetadata": false,
"instant": false,
"legendFormat": "req-size-p95",
"range": true,
"refId": "A",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS-1}"
},
"editorMode": "code",
"expr": "rate(raft_engine_write_size_sum[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "throughput",
"range": true,
"refId": "B"
}
],
"title": "wal write size",
"type": "timeseries"
}
],
"refresh": "10s",
@@ -2387,13 +2501,13 @@
"list": []
},
"time": {
"from": "now-3h",
"from": "now-30m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "GreptimeDB",
"uid": "e7097237-669b-4f8d-b751-13067afbfb68",
"version": 9,
"version": 12,
"weekStart": ""
}

View File

@@ -1,8 +1,7 @@
#!/usr/bin/env bash
# This script is used to download built dashboard assets from the "GreptimeTeam/dashboard" repository.
set -e -x
set -ex
declare -r SCRIPT_DIR=$(cd $(dirname ${0}) >/dev/null 2>&1 && pwd)
declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})
@@ -13,13 +12,34 @@ RELEASE_VERSION="$(cat $STATIC_DIR/VERSION | tr -d '\t\r\n ')"
echo "Downloading assets to dir: $OUT_DIR"
cd $OUT_DIR
if [[ -z "$GITHUB_PROXY_URL" ]]; then
GITHUB_URL="https://github.com"
else
GITHUB_URL="${GITHUB_PROXY_URL%/}"
fi
function retry_fetch() {
local url=$1
local filename=$2
curl --connect-timeout 10 --retry 3 -fsSL $url --output $filename || {
echo "Failed to download $url"
echo "You may try to set http_proxy and https_proxy environment variables."
if [[ -z "$GITHUB_PROXY_URL" ]]; then
echo "You may try to set GITHUB_PROXY_URL=http://mirror.ghproxy.com/"
fi
exit 1
}
}
# Download the SHA256 checksum attached to the release. To verify the integrity
# of the download, this checksum will be used to check the download tar file
# containing the built dashboard assets.
curl -Ls https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/sha256.txt --output sha256.txt
retry_fetch "${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/${RELEASE_VERSION}/sha256.txt" sha256.txt
# Download the tar file containing the built dashboard assets.
curl -L https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz --output build.tar.gz
retry_fetch "${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz" build.tar.gz
# Verify the checksums match; exit if they don't.
case "$(uname -s)" in

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-base.workspace = true
common-decimal.workspace = true

View File

@@ -8,6 +8,9 @@ license.workspace = true
default = []
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true

View File

@@ -7,6 +7,9 @@ license.workspace = true
[features]
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.0"

View File

@@ -164,11 +164,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to find table partitions: #{table}"))]
FindPartitions {
source: partition::error::Error,
table: String,
},
#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },
#[snafu(display("Failed to find region routes"))]
FindRegionRoutes { source: partition::error::Error },
@@ -254,6 +251,12 @@ pub enum Error {
source: common_meta::error::Error,
location: Location,
},
#[snafu(display("Get null from table cache, key: {}", key))]
TableCacheNotGet { key: String, location: Location },
#[snafu(display("Failed to get table cache, err: {}", err_msg))]
GetTableCache { err_msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -314,6 +317,7 @@ impl ErrorExt for Error {
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
}
}

View File

@@ -19,9 +19,9 @@ mod partitions;
mod predicate;
mod region_peers;
mod runtime_metrics;
mod schemata;
pub mod schemata;
mod table_names;
mod tables;
pub mod tables;
use std::collections::HashMap;
use std::sync::{Arc, Weak};

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::pin::pin;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -31,7 +32,7 @@ use datatypes::vectors::{
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
@@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info))
}
});
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
const BATCH_SIZE: usize = 128;
if table_info.table_type == TableType::Temporary {
continue;
}
// Split table infos into chunks
let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));
let table_id = table_info.ident.table_id;
let partitions = if let Some(partition_manager) = &partition_manager {
while let Some(table_infos) = table_info_chunks.next().await {
let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect();
let mut table_partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_table_partitions(table_id)
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu {
table: &table_info.name,
})?
.context(FindPartitionsSnafu)?
} else {
// Current node must be a standalone instance, contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}]
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
for table_info in table_infos {
let partitions = table_partitions
.remove(&table_info.ident.table_id)
.unwrap_or(vec![]);
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
}
}
}

View File

@@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_region_routes_batch(&table_ids)
.batch_find_region_routes(&table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {

View File

@@ -37,8 +37,8 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const CATALOG_NAME: &str = "catalog_name";
const SCHEMA_NAME: &str = "schema_name";
pub const CATALOG_NAME: &str = "catalog_name";
pub const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
const INIT_CAPACITY: usize = 42;

View File

@@ -39,10 +39,10 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const TABLE_TYPE: &str = "table_type";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;

View File

@@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder {
let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
let cache = Arc::new(
CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build(),
);
let cache = CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
@@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder {
}
}
pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// A wrapper of `MetaKvBackend` with cache support.
///
@@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}
@@ -317,12 +315,10 @@ impl CachedMetaKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
let cache = Arc::new(
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build(),
);
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let name = format!("CachedKvBackend({})", kv_backend.name());
Self {
@@ -333,7 +329,7 @@ impl CachedMetaKvBackend {
}
}
pub fn cache(&self) -> &CacheBackendRef {
pub fn cache(&self) -> &CacheBackend {
&self.cache
}

View File

@@ -15,9 +15,13 @@
use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};
use std::time::Duration;
use async_stream::try_stream;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
@@ -30,6 +34,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::future::{Cache as AsyncCache, CacheBuilder};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
@@ -38,9 +43,10 @@ use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;
use crate::error::Error::{GetTableCache, TableCacheNotGet};
use crate::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
Result as CatalogResult, TableMetadataManagerSnafu,
Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogManager;
@@ -60,6 +66,7 @@ pub struct KvBackendCatalogManager {
table_metadata_manager: TableMetadataManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
table_cache: AsyncCache<String, TableRef>,
}
fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
@@ -79,13 +86,24 @@ impl CacheInvalidator for KvBackendCatalogManager {
}
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
.await?;
self.table_cache.invalidate(&table_cache_key).await;
Ok(())
}
}
const DEFAULT_CACHED_CATALOG: u64 = 128;
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
impl KvBackendCatalogManager {
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
@@ -95,13 +113,16 @@ impl KvBackendCatalogManager {
cache_invalidator,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
// The catalog name is not used in system_catalog, so let it empty
String::default(),
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
},
table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
.time_to_live(TABLE_CACHE_TTL)
.time_to_idle(TABLE_CACHE_TTI)
.build(),
})
}
@@ -216,29 +237,52 @@ impl CatalogManager for KvBackendCatalogManager {
return Ok(Some(table));
}
let key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(TableMetadataManagerSnafu)?
else {
return Ok(None);
};
let table_id = table_name_value.table_id();
let init = async {
let table_name_key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await
.context(TableMetadataManagerSnafu)?
else {
return TableCacheNotGetSnafu {
key: table_name_key.to_string(),
}
.fail();
};
let table_id = table_name_value.table_id();
let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return Ok(None);
let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return TableCacheNotGetSnafu {
key: table_name_key.to_string(),
}
.fail();
};
make_table(table_info_value)
};
make_table(table_info_value).map(Some)
match self
.table_cache
.try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
.await
{
Ok(table) => Ok(Some(table)),
Err(err) => match err.as_ref() {
TableCacheNotGet { .. } => Ok(None),
_ => Err(err),
},
}
.map_err(|err| GetTableCache {
err_msg: err.to_string(),
})
}
async fn tables<'a>(

View File

@@ -7,6 +7,9 @@ license.workspace = true
[features]
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.6"
@@ -34,6 +37,8 @@ parking_lot = "0.12"
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
tokio.workspace = true

View File

@@ -340,7 +340,7 @@ impl Database {
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
Ok(Output::new_stream(Box::pin(record_batch_stream)))
}
}
}

View File

@@ -134,10 +134,17 @@ impl From<Status> for Error {
impl Error {
pub fn should_retry(&self) -> bool {
!matches!(
// TODO(weny): figure out each case of these codes.
matches!(
self,
Self::RegionServer {
code: Code::InvalidArgument,
code: Code::Cancelled,
..
} | Self::RegionServer {
code: Code::DeadlineExceeded,
..
} | Self::RegionServer {
code: Code::Unavailable,
..
}
)

View File

@@ -123,8 +123,8 @@ impl RegionRequester {
.fail();
};
let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();
let metrics = Arc::new(ArcSwapOption::from(None));
let metrics_ref = metrics.clone();
let tracing_context = TracingContext::from_current_span();
@@ -140,7 +140,8 @@ impl RegionRequester {
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
ref_str.swap(Some(Arc::new(s)));
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
break;
}
_ => {
@@ -159,7 +160,7 @@ impl RegionRequester {
schema,
stream,
output_ordering: None,
metrics: metrics_str,
metrics,
};
Ok(Box::pin(record_batch_stream))
}
@@ -196,7 +197,7 @@ impl RegionRequester {
check_response_header(header)?;
Ok(affected_rows)
Ok(affected_rows as _)
}
pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {

View File

@@ -12,6 +12,9 @@ path = "src/bin/greptime.rs"
[features]
tokio-console = ["common-telemetry/tokio-console"]
[lints]
workspace = true
[dependencies]
anymap = "1.0.0-beta.2"
async-trait.workspace = true

View File

@@ -142,7 +142,7 @@ impl Export {
.with_context(|_| RequestDatabaseSnafu {
sql: "show databases".to_string(),
})?;
let Output::Stream(stream) = result else {
let Output::Stream(stream, _) = result else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)
@@ -183,7 +183,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream) = result else {
let Output::Stream(stream, _) = result else {
NotDataFromOutputSnafu.fail()?
};
let Some(record_batch) = collect(stream)
@@ -235,7 +235,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream) = result else {
let Output::Stream(stream, _) = result else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)

View File

@@ -185,7 +185,7 @@ impl Repl {
.context(RequestDatabaseSnafu { sql: &sql })?;
let either = match output {
Output::Stream(s) => {
Output::Stream(s, _) => {
let x = RecordBatches::try_collect(s)
.await
.context(CollectRecordBatchesSnafu)?;
@@ -260,6 +260,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
catalog_list,
None,
None,
None,
false,
plugins.clone(),
));

View File

@@ -43,6 +43,10 @@ impl Instance {
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
}
#[async_trait]
@@ -235,6 +239,7 @@ impl StartCommand {
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

View File

@@ -43,13 +43,17 @@ pub struct Instance {
}
impl Instance {
fn new(frontend: FeInstance) -> Self {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
}
pub fn mut_inner(&mut self) -> &mut FeInstance {
&mut self.frontend
}
pub fn inner(&self) -> &FeInstance {
&self.frontend
}
}
#[async_trait]
@@ -271,6 +275,7 @@ impl StartCommand {
let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(opts, servers)

View File

@@ -32,11 +32,11 @@ lazy_static::lazy_static! {
}
#[async_trait]
pub trait App {
pub trait App: Send {
fn name(&self) -> &str;
/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
fn pre_start(&mut self) -> error::Result<()> {
async fn pre_start(&mut self) -> error::Result<()> {
Ok(())
}
@@ -46,24 +46,21 @@ pub trait App {
}
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
let name = app.name().to_string();
info!("Starting app: {}", app.name());
app.pre_start()?;
app.pre_start().await?;
tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Failed to start app {name}!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Failed to stop app {name}!");
}
info!("Goodbye!");
}
app.start().await?;
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
app.stop().await?;
info!("Goodbye!");
Ok(())
}

View File

@@ -21,8 +21,8 @@ use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -419,11 +419,11 @@ impl StartCommand {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let table_meta_allocator = TableMetadataAllocator::new(
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_metadata_manager.clone(),
);
table_metadata_manager.table_name_manager().clone(),
));
let ddl_task_executor = Self::create_ddl_task_executor(
table_metadata_manager,
@@ -441,6 +441,7 @@ impl StartCommand {
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await
.context(StartFrontendSnafu)?;
frontend
.build_servers(fe_opts, servers)
@@ -458,9 +459,9 @@ impl StartCommand {
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocator,
) -> Result<DdlTaskExecutorRef> {
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
@@ -472,7 +473,7 @@ impl StartCommand {
.context(InitDdlManagerSnafu)?,
);
Ok(ddl_task_executor)
Ok(procedure_executor)
}
pub async fn create_table_metadata_manager(

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
anymap = "1.0.0-beta.2"
bitvec = "1.0"

View File

@@ -21,6 +21,8 @@ pub mod readable_size;
use core::any::Any;
use std::sync::{Arc, Mutex, MutexGuard};
pub type AffectedRows = usize;
pub use bit_vec::BitVec;
/// [`Plugins`] is a wrapper of Arc contents.

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-base.workspace = true
humantime-serde.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
arrow.workspace = true
arrow-schema.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
arrow.workspace = true
bigdecimal.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
snafu.workspace = true
strum.workspace = true

View File

@@ -19,7 +19,9 @@ pub mod format;
pub mod mock;
pub mod status_code;
pub use snafu;
// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
// please define in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
pub use snafu;

View File

@@ -4,13 +4,19 @@ edition.workspace = true
version.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
@@ -23,9 +29,12 @@ num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true
[dev-dependencies]

View File

@@ -30,6 +30,17 @@ pub struct FunctionContext {
pub state: Arc<FunctionState>,
}
impl FunctionContext {
/// Create a mock [`FunctionContext`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
Self {
query_ctx: QueryContextBuilder::default().build(),
state: Arc::new(FunctionState::mock()),
}
}
}
impl Default for FunctionContext {
fn default() -> Self {
Self {

View File

@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
use crate::function::FunctionRef;
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
@@ -80,6 +81,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
NumpyFunction::register(&function_registry);
TimestampFunction::register(&function_registry);
DateFunction::register(&function_registry);
ExpressionFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);

View File

@@ -13,15 +13,14 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::ProcedureStateResponse;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};
pub type AffectedRows = usize;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
@@ -32,23 +31,39 @@ pub trait TableMutationHandler: Send + Sync {
/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(
/// Trigger a flush task for table.
async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
-> Result<AffectedRows>;
/// Trigger a compaction task for table.
async fn compact(
&self,
region_id: u64,
from_peer: u64,
to_peer: u64,
replay_timeout: Duration,
) -> Result<String>;
request: CompactTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;
/// Trigger a compaction task for a table region.
async fn compact_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
}
/// A trait for handling meta service requests in `QueryEngine`.
/// A trait for handling procedure service requests in `QueryEngine`.
#[async_trait]
pub trait MetaServiceHandler: Send + Sync {
pub trait ProcedureServiceHandler: Send + Sync {
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;

View File

@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::ResultExt;
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -27,3 +31,15 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)
Signature::one_of(sigs, Volatility::Immutable)
}
/// Cast a [`ValueRef`] to u64, returns `None` if fails
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
cast((*value).into(), &ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
value.data_type(),
),
})
.map(|v| v.as_u64())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod macros;
pub mod scalars;
mod system;
mod table;

View File

@@ -0,0 +1,27 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Ensure current function is invokded under `greptime` catalog.
#[macro_export]
macro_rules! ensure_greptime {
($func_ctx: expr) => {{
use common_catalog::consts::DEFAULT_CATALOG_NAME;
snafu::ensure!(
$func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
common_query::error::PermissionDeniedSnafu {
err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
}
);
}};
}

View File

@@ -14,8 +14,22 @@
mod binary;
mod ctx;
mod is_null;
mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op;
pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
use crate::function_registry::FunctionRegistry;
use crate::scalars::expression::is_null::IsNullFunction;
pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(IsNullFunction));
}
}

View File

@@ -0,0 +1,109 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error;
use common_query::error::{ArrowComputeSnafu, InvalidFuncArgsSnafu};
use common_query::prelude::{Signature, Volatility};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::is_null;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Helper;
use snafu::{ensure, ResultExt};
use crate::function::{Function, FunctionContext};
const NAME: &str = "isnull";
/// The function to check whether an expression is NULL
#[derive(Clone, Debug, Default)]
pub struct IsNullFunction;
impl Display for IsNullFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
impl Function for IsNullFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _: &[ConcreteDataType]) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> Signature {
Signature::any(1, Volatility::Immutable)
}
fn eval(
&self,
_func_ctx: FunctionContext,
columns: &[VectorRef],
) -> common_query::error::Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
),
}
);
let values = &columns[0];
let arrow_array = &values.to_arrow_array();
let result = is_null(arrow_array).context(ArrowComputeSnafu)?;
Helper::try_into_vector(Arc::new(result) as ArrayRef).context(error::FromArrowArraySnafu)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{BooleanVector, Float32Vector};
use super::*;
#[test]
fn test_is_null_function() {
let is_null = IsNullFunction;
assert_eq!("isnull", is_null.name());
assert_eq!(
ConcreteDataType::boolean_datatype(),
is_null.return_type(&[]).unwrap()
);
assert_eq!(
is_null.signature(),
Signature {
type_signature: TypeSignature::Any(1),
volatility: Volatility::Immutable
}
);
let values = vec![None, Some(3.0), None];
let args: Vec<VectorRef> = vec![Arc::new(Float32Vector::from(values))];
let vector = is_null.eval(FunctionContext::default(), &args).unwrap();
let expect: VectorRef = Arc::new(BooleanVector::from_vec(vec![true, false, true]));
assert_eq!(expect, vector);
}
}

View File

@@ -128,7 +128,7 @@ mod tests {
];
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 4);
for i in 0..3 {
for i in 0..4 {
let p: i64 = (nums[i] % divs[i]) as i64;
assert!(matches!(result.get(i), Value::Int64(v) if v == p));
}
@@ -160,7 +160,7 @@ mod tests {
];
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 4);
for i in 0..3 {
for i in 0..4 {
let p: u64 = (nums[i] % divs[i]) as u64;
assert!(matches!(result.get(i), Value::UInt64(v) if v == p));
}
@@ -192,7 +192,7 @@ mod tests {
];
let result = function.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(result.len(), 4);
for i in 0..3 {
for i in 0..4 {
let p: f64 = nums[i] % divs[i];
assert!(matches!(result.get(i), Value::Float64(v) if v == p));
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
@@ -20,6 +20,104 @@ use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
pub struct FunctionState {
// The table mutation handler
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The meta service handler
pub meta_service_handler: Option<MetaServiceHandlerRef>,
// The procedure service handler
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
}
impl FunctionState {
/// Create a mock [`FunctionState`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
use std::sync::Arc;
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
const ROWS: usize = 42;
#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
async fn migrate_region(
&self,
_request: MigrateRegionRequest,
) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}
async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
Ok(ProcedureStateResponse {
status: ProcedureStatus::Done.into(),
error: "OK".to_string(),
..Default::default()
})
}
}
#[async_trait]
impl TableMutationHandler for MockTableMutationHandler {
async fn insert(
&self,
_request: InsertRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn delete(
&self,
_request: DeleteRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush(
&self,
_request: FlushTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn compact(
&self,
_request: CompactTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn compact_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
}
Self {
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
}
}
}

View File

@@ -14,6 +14,7 @@
mod build;
mod database;
mod procedure_state;
mod timezone;
mod version;
@@ -21,6 +22,7 @@ use std::sync::Arc;
use build::BuildFunction;
use database::DatabaseFunction;
use procedure_state::ProcedureStateFunction;
use timezone::TimezoneFunction;
use version::VersionFunction;
@@ -34,5 +36,6 @@ impl SystemFunction {
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register(Arc::new(ProcedureStateFunction));
}
}

View File

@@ -22,7 +22,7 @@ use datatypes::vectors::{StringVector, VectorRef};
use crate::function::{Function, FunctionContext};
/// Generates build information
/// Generates build information
#[derive(Clone, Debug, Default)]
pub struct BuildFunction;
@@ -42,11 +42,7 @@ impl Function for BuildFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(
0,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
Signature::uniform(0, vec![], Volatility::Immutable)
}
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -75,7 +71,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]
} if valid_types.is_empty()
));
let build_info = common_version::build_info().to_string();
let vector = build.eval(FunctionContext::default(), &[]).unwrap();

View File

@@ -0,0 +1,159 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use api::v1::meta::ProcedureStatus;
use common_macro::admin_fn;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use serde::Serialize;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;
#[derive(Serialize)]
struct ProcedureStateJson {
status: String,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
/// A function to query procedure state by its id.
/// Such as `procedure_state(pid)`.
#[admin_fn(
name = "ProcedureStateFunction",
display_name = "procedure_state",
sig_fn = "signature",
ret = "string"
)]
pub(crate) async fn procedure_state(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let ValueRef::String(pid) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "procedure_state",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let ProcedureStateResponse { status, error, .. } =
procedure_service_handler.query_procedure_state(pid).await?;
let status = ProcedureStatus::try_from(status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown");
let state = ProcedureStateJson {
status: status.to_string(),
error: if error.is_empty() { None } else { Some(error) },
};
let json = serde_json::to_string(&state).unwrap_or_default();
Ok(Value::from(json))
}
fn signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_procedure_state_misc() {
let f = ProcedureStateFunction;
assert_eq!("procedure_state", f.name());
assert_eq!(
ConcreteDataType::string_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]
));
}
#[test]
fn test_missing_procedure_service() {
let f = ProcedureStateFunction;
let args = vec!["pid"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
#[test]
fn test_procedure_state() {
let f = ProcedureStateFunction;
let args = vec!["pid"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec![
"{\"status\":\"Done\",\"error\":\"OK\"}",
]));
assert_eq!(expect, result);
}
}

View File

@@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
use std::sync::Arc;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use crate::function_registry::FunctionRegistry;
@@ -27,5 +31,9 @@ impl TableFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(MigrateRegionFunction));
registry.register(Arc::new(FlushRegionFunction));
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
}
}

View File

@@ -0,0 +1,148 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use store_api::storage::RegionId;
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
use crate::helper::cast_u64;
macro_rules! define_region_function {
($name: expr, $display_name_str: expr, $display_name: ident) => {
/// A function to $display_name
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
pub(crate) async fn $display_name(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: $display_name_str,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let affected_rows = table_mutation_handler
.$display_name(RegionId::from_u64(region_id), query_ctx.clone())
.await?;
Ok(Value::from(affected_rows as u64))
}
};
}
define_region_function!("FlushRegionFunction", "flush_region", flush_region);
define_region_function!("CompactRegionFunction", "compact_region", compact_region);
fn signature() -> Signature {
Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::UInt64Vector;
use super::*;
macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
paste::paste! {
#[test]
fn [<test_ $name _misc>]() {
let f = $func;
assert_eq!(stringify!($name), f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == ConcreteDataType::numerics()));
}
#[test]
fn [<test_ $name _missing_table_mutation>]() {
let f = $func;
let args = vec![99];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}
#[test]
fn [<test_ $name>]() {
let f = $func;
let args = vec![99];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
}
}
};
}
define_region_function_test!(flush_region, FlushRegionFunction);
define_region_function_test!(compact_region, CompactRegionFunction);
}

View File

@@ -0,0 +1,178 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::{ensure, Location, OptionExt, ResultExt};
use table::requests::{CompactTableRequest, FlushTableRequest};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
macro_rules! define_table_function {
($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => {
/// A function to $func table, such as `$display_name(table_name)`.
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
pub(crate) async fn $display_name(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: $display_name_str,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let (catalog_name, schema_name, table_name) =
table_name_to_full_name(table_name, &query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
let affected_rows = table_mutation_handler
.$func(
$request {
catalog_name,
schema_name,
table_name,
},
query_ctx.clone(),
)
.await?;
Ok(Value::from(affected_rows as u64))
}
};
}
define_table_function!(
"FlushTableFunction",
"flush_table",
flush_table,
flush,
FlushTableRequest
);
define_table_function!(
"CompactTableFunction",
"compact_table",
compact_table,
compact,
CompactTableRequest
);
fn signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use super::*;
macro_rules! define_table_function_test {
($name: ident, $func: ident) => {
paste::paste!{
#[test]
fn [<test_ $name _misc>]() {
let f = $func;
assert_eq!(stringify!($name), f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]));
}
#[test]
fn [<test_ $name _missing_table_mutation>]() {
let f = $func;
let args = vec!["test"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}
#[test]
fn [<test_ $name>]() {
let f = $func;
let args = vec!["test"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
}
}
}
}
define_table_function_test!(flush_table, FlushTableFunction);
define_table_function_test!(compact_table, CompactTableFunction);
}

View File

@@ -15,18 +15,25 @@
use std::fmt::{self};
use std::time::Duration;
use common_macro::admin_fn;
use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingTableMutationHandlerSnafu, Result,
};
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::logging::error;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{Location, OptionExt, ResultExt};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{Location, OptionExt};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
/// A function to migrate a region from source peer to target peer.
/// Returns the submitted procedure id if success. Only available in cluster mode.
@@ -38,138 +45,140 @@ use crate::function::{Function, FunctionContext};
/// - `region_id`: the region id
/// - `from_peer`: the source peer id
/// - `to_peer`: the target peer id
#[derive(Clone, Debug, Default)]
pub struct MigrateRegionFunction;
#[admin_fn(
name = "MigrateRegionFunction",
display_name = "migrate_region",
sig_fn = "signature",
ret = "string"
)]
pub(crate) async fn migrate_region(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (region_id, from_peer, to_peer, replay_timeout) = match params.len() {
3 => {
let region_id = cast_u64(&params[0])?;
let from_peer = cast_u64(&params[1])?;
let to_peer = cast_u64(&params[2])?;
const NAME: &str = "migrate_region";
const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
(
region_id,
from_peer,
to_peer,
Some(DEFAULT_REPLAY_TIMEOUT_SECS),
)
}
fn cast_u64_vector(vector: &VectorRef) -> Result<VectorRef> {
vector
.cast(&ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
vector.data_type(),
),
})
}
4 => {
let region_id = cast_u64(&params[0])?;
let from_peer = cast_u64(&params[1])?;
let to_peer = cast_u64(&params[2])?;
let replay_timeout = cast_u64(&params[3])?;
impl Function for MigrateRegionFunction {
fn name(&self) -> &str {
NAME
}
(region_id, from_peer, to_peer, replay_timeout)
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
// migrate_region(region_id, from_peer, to_peer)
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
// migrate_region(region_id, from_peer, to_peer, timeout(secs))
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
3 => {
let region_ids = cast_u64_vector(&columns[0])?;
let from_peers = cast_u64_vector(&columns[1])?;
let to_peers = cast_u64_vector(&columns[2])?;
(region_ids, from_peers, to_peers, None)
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
size
),
}
.fail();
}
};
4 => {
let region_ids = cast_u64_vector(&columns[0])?;
let from_peers = cast_u64_vector(&columns[1])?;
let to_peers = cast_u64_vector(&columns[2])?;
let replay_timeouts = cast_u64_vector(&columns[3])?;
match (region_id, from_peer, to_peer, replay_timeout) {
(Some(region_id), Some(from_peer), Some(to_peer), Some(replay_timeout)) => {
let pid = procedure_service_handler
.migrate_region(MigrateRegionRequest {
region_id,
from_peer,
to_peer,
replay_timeout: Duration::from_secs(replay_timeout),
})
.await?;
(region_ids, from_peers, to_peers, Some(replay_timeouts))
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
size
),
}
.fail();
}
};
std::thread::spawn(move || {
let len = region_ids.len();
let mut results = StringVectorBuilder::with_capacity(len);
for index in 0..len {
let region_id = region_ids.get(index);
let from_peer = from_peers.get(index);
let to_peer = to_peers.get(index);
let replay_timeout = match &replay_timeouts {
Some(replay_timeouts) => replay_timeouts.get(index),
None => Value::UInt64(DEFAULT_REPLAY_TIMEOUT_SECS),
};
match (region_id, from_peer, to_peer, replay_timeout) {
(
Value::UInt64(region_id),
Value::UInt64(from_peer),
Value::UInt64(to_peer),
Value::UInt64(replay_timeout),
) => {
let func_ctx = func_ctx.clone();
let pid = common_runtime::block_on_read(async move {
func_ctx
.state
.table_mutation_handler
.as_ref()
.context(MissingTableMutationHandlerSnafu)?
.migrate_region(
region_id,
from_peer,
to_peer,
Duration::from_secs(replay_timeout),
)
.await
})?;
results.push(Some(&pid));
}
_ => {
results.push(None);
}
}
}
Ok(results.to_vector())
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
}
})?
_ => Ok(Value::Null),
}
}
impl fmt::Display for MigrateRegionFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MIGRATE_REGION")
}
fn signature() -> Signature {
Signature::one_of(
vec![
// migrate_region(region_id, from_peer, to_peer)
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
// migrate_region(region_id, from_peer, to_peer, timeout(secs))
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
// FIXME(dennis): test in the following PR.
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use super::*;
#[test]
fn test_migrate_region_misc() {
let f = MigrateRegionFunction;
assert_eq!("migrate_region", f.name());
assert_eq!(
ConcreteDataType::string_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 2));
}
#[test]
fn test_missing_procedure_service() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
#[test]
fn test_migrate_region() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
}
}

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
common-error.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
api.workspace = true
arrow-flight.workspace = true

View File

@@ -50,8 +50,12 @@ pub struct FlightEncoder {
impl Default for FlightEncoder {
fn default() -> Self {
let write_options = writer::IpcWriteOptions::default()
.try_with_compression(Some(arrow::ipc::CompressionType::LZ4_FRAME))
.unwrap();
Self {
write_options: writer::IpcWriteOptions::default(),
write_options,
data_gen: writer::IpcDataGenerator::default(),
dictionary_tracker: writer::DictionaryTracker::new(false),
}

View File

@@ -7,6 +7,9 @@ license.workspace = true
[lib]
proc-macro = true
[lints]
workspace = true
[dependencies]
proc-macro2 = "1.0.66"
quote = "1.0"

View File

@@ -0,0 +1,236 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use proc_macro::TokenStream;
use quote::quote;
use syn::spanned::Spanned;
use syn::{
parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypePath,
TypeReference, Visibility,
};
use crate::utils::{extract_arg_map, extract_input_types, get_ident};
/// Internal util macro to early return on error.
macro_rules! ok {
($item:expr) => {
match $item {
Ok(item) => item,
Err(e) => return e.into_compile_error().into(),
}
};
}
/// Internal util macro to to create an error.
macro_rules! error {
($span:expr, $msg: expr) => {
Err(syn::Error::new($span, $msg))
};
}
pub(crate) fn process_admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
let mut result = TokenStream::new();
// extract arg map
let arg_pairs = parse_macro_input!(args as AttributeArgs);
let arg_span = arg_pairs[0].span();
let arg_map = ok!(extract_arg_map(arg_pairs));
// decompose the fn block
let compute_fn = parse_macro_input!(input as ItemFn);
let ItemFn {
attrs,
vis,
sig,
block,
} = compute_fn;
// extract fn arg list
let Signature {
inputs,
ident: fn_name,
..
} = &sig;
let arg_types = ok!(extract_input_types(inputs));
if arg_types.len() < 2 {
ok!(error!(
sig.span(),
"Expect at least two argument for admin fn: (handler, query_ctx)"
));
}
let handler_type = ok!(extract_handler_type(&arg_types));
// build the struct and its impl block
// only do this when `display_name` is specified
if let Ok(display_name) = get_ident(&arg_map, "display_name", arg_span) {
let struct_code = build_struct(
attrs,
vis,
fn_name,
ok!(get_ident(&arg_map, "name", arg_span)),
ok!(get_ident(&arg_map, "sig_fn", arg_span)),
ok!(get_ident(&arg_map, "ret", arg_span)),
handler_type,
display_name,
);
result.extend(struct_code);
}
// preserve this fn
let input_fn_code: TokenStream = quote! {
#sig { #block }
}
.into();
result.extend(input_fn_code);
result
}
/// Retrieve the handler type, `ProcedureServiceHandlerRef` or `TableMutationHandlerRef`.
fn extract_handler_type(arg_types: &[Type]) -> Result<&Ident, syn::Error> {
match &arg_types[0] {
Type::Reference(TypeReference { elem, .. }) => match &**elem {
Type::Path(TypePath { path, .. }) => Ok(&path
.segments
.first()
.expect("Expected a reference of handler")
.ident),
other => {
error!(other.span(), "Expected a reference of handler")
}
},
other => {
error!(other.span(), "Expected a reference of handler")
}
}
}
/// Build the function struct
#[allow(clippy::too_many_arguments)]
fn build_struct(
attrs: Vec<Attribute>,
vis: Visibility,
fn_name: &Ident,
name: Ident,
sig_fn: Ident,
ret: Ident,
handler_type: &Ident,
display_name_ident: Ident,
) -> TokenStream {
let display_name = display_name_ident.to_string();
let ret = Ident::new(&format!("{ret}_datatype"), ret.span());
let uppcase_display_name = display_name.to_uppercase();
// Get the handler name in function state by the argument ident
let (handler, snafu_type) = match handler_type.to_string().as_str() {
"ProcedureServiceHandlerRef" => (
Ident::new("procedure_service_handler", handler_type.span()),
Ident::new("MissingProcedureServiceHandlerSnafu", handler_type.span()),
),
"TableMutationHandlerRef" => (
Ident::new("table_mutation_handler", handler_type.span()),
Ident::new("MissingTableMutationHandlerSnafu", handler_type.span()),
),
handler => ok!(error!(
handler_type.span(),
format!("Unknown handler type: {handler}")
)),
};
quote! {
#(#attrs)*
#[derive(Debug)]
#vis struct #name;
impl fmt::Display for #name {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, #uppcase_display_name)
}
}
impl Function for #name {
fn name(&self) -> &'static str {
#display_name
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::#ret())
}
fn signature(&self) -> Signature {
#sig_fn()
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
// Ensure under the `greptime` catalog for security
ensure_greptime!(func_ctx);
let columns_num = columns.len();
let rows_num = if columns.is_empty() {
1
} else {
columns[0].len()
};
let columns = Vec::from(columns);
// TODO(dennis): DataFusion doesn't support async UDF currently
std::thread::spawn(move || {
let query_ctx = &func_ctx.query_ctx;
let handler = func_ctx
.state
.#handler
.as_ref()
.context(#snafu_type)?;
let mut builder = ConcreteDataType::#ret()
.create_mutable_vector(rows_num);
if columns_num == 0 {
let result = common_runtime::block_on_read(async move {
#fn_name(handler, query_ctx, &[]).await
})?;
builder.push_value_ref(result.as_value_ref());
} else {
for i in 0..rows_num {
let args: Vec<_> = columns.iter()
.map(|vector| vector.get_ref(i))
.collect();
let result = common_runtime::block_on_read(async move {
#fn_name(handler, query_ctx, &args).await
})?;
builder.push_value_ref(result.as_value_ref());
}
}
Ok(builder.to_vector())
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
}
})?
}
}
}
.into()
}

View File

@@ -12,17 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod admin_fn;
mod aggr_func;
mod print_caller;
mod range_fn;
mod stack_trace_debug;
mod utils;
use aggr_func::{impl_aggr_func_type_store, impl_as_aggr_func_creator};
use print_caller::process_print_caller;
use proc_macro::TokenStream;
use range_fn::process_range_fn;
use syn::{parse_macro_input, DeriveInput};
use crate::admin_fn::process_admin_fn;
/// Make struct implemented trait [AggrFuncTypeStore], which is necessary when writing UDAF.
/// This derive macro is expect to be used along with attribute macro [macro@as_aggr_func_creator].
#[proc_macro_derive(AggrFuncTypeStore)]
@@ -68,6 +71,25 @@ pub fn range_fn(args: TokenStream, input: TokenStream) -> TokenStream {
process_range_fn(args, input)
}
/// Attribute macro to convert a normal function to SQL administration function. The annotated function
/// should accept:
/// - `&ProcedureServiceHandlerRef` or `&TableMutationHandlerRef` as the first argument,
/// - `&QueryContextRef` as the second argument, and
/// - `&[ValueRef<'_>]` as the third argument which is SQL function input values in each row.
/// Return type must be `common_query::error::Result<Value>`.
///
/// # Example see `common/function/src/system/procedure_state.rs`.
///
/// # Arguments
/// - `name`: The name of the generated `Function` implementation.
/// - `ret`: The return type of the generated SQL function, it will be transformed into `ConcreteDataType::{ret}_datatype()` result.
/// - `display_name`: The display name of the generated SQL function.
/// - `sig_fn`: the function to returns `Signature` of generated `Function`.
#[proc_macro_attribute]
pub fn admin_fn(args: TokenStream, input: TokenStream) -> TokenStream {
process_admin_fn(args, input)
}
/// Attribute macro to print the caller to the annotated function.
/// The caller is printed as its filename and the call site line number.
///

View File

@@ -12,20 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use proc_macro::TokenStream;
use proc_macro2::Span;
use quote::quote;
use syn::punctuated::Punctuated;
use syn::spanned::Spanned;
use syn::token::Comma;
use syn::{
parse_macro_input, Attribute, AttributeArgs, FnArg, Ident, ItemFn, Meta, MetaNameValue,
NestedMeta, Signature, Type, TypeReference, Visibility,
parse_macro_input, Attribute, AttributeArgs, Ident, ItemFn, Signature, Type, TypeReference,
Visibility,
};
/// Internal util macro to early return on error.
use crate::utils::{extract_arg_map, extract_input_types, get_ident};
macro_rules! ok {
($item:expr) => {
match $item {
@@ -89,48 +85,6 @@ pub(crate) fn process_range_fn(args: TokenStream, input: TokenStream) -> TokenSt
result
}
/// Extract a String <-> Ident map from the attribute args.
fn extract_arg_map(args: Vec<NestedMeta>) -> Result<HashMap<String, Ident>, syn::Error> {
args.into_iter()
.map(|meta| {
if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta {
let name = path.get_ident().unwrap().to_string();
let ident = match lit {
syn::Lit::Str(lit_str) => lit_str.parse::<Ident>(),
_ => Err(syn::Error::new(
lit.span(),
"Unexpected attribute format. Expected `name = \"value\"`",
)),
}?;
Ok((name, ident))
} else {
Err(syn::Error::new(
meta.span(),
"Unexpected attribute format. Expected `name = \"value\"`",
))
}
})
.collect::<Result<HashMap<String, Ident>, syn::Error>>()
}
/// Helper function to get an Ident from the previous arg map.
fn get_ident(map: &HashMap<String, Ident>, key: &str, span: Span) -> Result<Ident, syn::Error> {
map.get(key)
.cloned()
.ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found")))
}
/// Extract the argument list from the annotated function.
fn extract_input_types(inputs: &Punctuated<FnArg, Comma>) -> Result<Vec<Type>, syn::Error> {
inputs
.iter()
.map(|arg| match arg {
FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")),
FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()),
})
.collect()
}
fn build_struct(
attrs: Vec<Attribute>,
vis: Visibility,
@@ -214,7 +168,7 @@ fn build_calc_fn(
#( let #range_array_names = RangeArray::try_new(extract_array(&input[#param_numbers])?.to_data().into())?; )*
// TODO(ruihang): add ensure!()
// TODO(ruihang): add ensure!()
let mut result_array = Vec::new();
for index in 0..#first_range_array_name.len(){

View File

@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use proc_macro2::Span;
use syn::punctuated::Punctuated;
use syn::spanned::Spanned;
use syn::token::Comma;
use syn::{FnArg, Ident, Meta, MetaNameValue, NestedMeta, Type};
/// Extract a String <-> Ident map from the attribute args.
pub(crate) fn extract_arg_map(args: Vec<NestedMeta>) -> Result<HashMap<String, Ident>, syn::Error> {
args.into_iter()
.map(|meta| {
if let NestedMeta::Meta(Meta::NameValue(MetaNameValue { path, lit, .. })) = meta {
let name = path.get_ident().unwrap().to_string();
let ident = match lit {
syn::Lit::Str(lit_str) => lit_str.parse::<Ident>(),
_ => Err(syn::Error::new(
lit.span(),
"Unexpected attribute format. Expected `name = \"value\"`",
)),
}?;
Ok((name, ident))
} else {
Err(syn::Error::new(
meta.span(),
"Unexpected attribute format. Expected `name = \"value\"`",
))
}
})
.collect::<Result<HashMap<String, Ident>, syn::Error>>()
}
/// Helper function to get an Ident from the previous arg map.
pub(crate) fn get_ident(
map: &HashMap<String, Ident>,
key: &str,
span: Span,
) -> Result<Ident, syn::Error> {
map.get(key)
.cloned()
.ok_or_else(|| syn::Error::new(span, format!("Expect attribute {key} but not found")))
}
/// Extract the argument list from the annotated function.
pub(crate) fn extract_input_types(
inputs: &Punctuated<FnArg, Comma>,
) -> Result<Vec<Type>, syn::Error> {
inputs
.iter()
.map(|arg| match arg {
FnArg::Receiver(receiver) => Err(syn::Error::new(receiver.span(), "expected bool")),
FnArg::Typed(pat_type) => Ok(*pat_type.ty.clone()),
})
.collect()
}

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true

View File

@@ -7,6 +7,9 @@ license.workspace = true
[features]
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
async-recursion = "1.0"
@@ -15,11 +18,13 @@ async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-grpc-expr.workspace = true
common-macro.workspace = true
common-procedure.workspace = true
common-procedure-test.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true

View File

@@ -15,23 +15,25 @@
use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use crate::error::Result;
use crate::peer::Peer;
pub type AffectedRows = u64;
/// The trait for handling requests to datanode.
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
/// Handles DML, and DDL requests.
async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>;
/// Handles query requests
async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
pub type DatanodeRef = Arc<dyn Datanode>;
/// Datanode manager
#[async_trait::async_trait]
pub trait DatanodeManager: Send + Sync {
/// Retrieves a target `datanode`.

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};
use self::table_meta::TableMetadataAllocatorRef;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
@@ -25,6 +26,7 @@ use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
pub mod alter_table;
pub mod create_logical_tables;
@@ -32,6 +34,10 @@ pub mod create_table;
mod create_table_template;
pub mod drop_table;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
mod tests;
pub mod truncate_table;
pub mod utils;
@@ -41,16 +47,32 @@ pub struct ExecutorContext {
pub tracing_context: Option<W3cTrace>,
}
/// The procedure executor that accepts ddl, region migration task etc.
#[async_trait::async_trait]
pub trait DdlTaskExecutor: Send + Sync {
pub trait ProcedureExecutor: Send + Sync {
/// Submit a ddl task
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Submit a region migration task
async fn migrate_region(
&self,
ctx: &ExecutorContext,
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
/// Query the procedure state by its id
async fn query_procedure_state(
&self,
ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse>;
}
pub type DdlTaskExecutorRef = Arc<dyn DdlTaskExecutor>;
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
@@ -73,4 +95,5 @@ pub struct DdlContext {
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub memory_region_keeper: MemoryRegionKeeperRef,
pub table_metadata_allocator: TableMetadataAllocatorRef,
}

View File

@@ -40,7 +40,7 @@ use table::requests::AlterKind;
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result};
use crate::key::table_info::TableInfoValue;
@@ -226,7 +226,7 @@ impl AlterTableProcedure {
// The engine will throw this code when the schema version not match.
// As this procedure has locked the table, the only reason for this error
// is procedure is succeeded before and is retrying.
return Err(handle_operate_region_error(datanode)(err));
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())

View File

@@ -31,7 +31,7 @@ use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
@@ -66,7 +66,16 @@ impl CreateLogicalTablesProcedure {
Ok(Self { context, creator })
}
async fn on_prepare(&mut self) -> Result<Status> {
/// On the prepares step, it performs:
/// - Checks whether physical table exists.
/// - Checks whether logical tables exist.
/// - Allocates the table ids.
///
/// Abort(non-retry):
/// - The physical table does not exist.
/// - Failed to check whether tables exist.
/// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
// Sets physical region numbers
@@ -80,7 +89,7 @@ impl CreateLogicalTablesProcedure {
.data
.set_physical_region_numbers(physical_region_numbers);
// Checks if the tables exists
// Checks if the tables exist
let table_name_keys = self
.creator
.data
@@ -96,24 +105,9 @@ impl CreateLogicalTablesProcedure {
.map(|x| x.map(|x| x.table_id()))
.collect::<Vec<_>>();
// Sets table ids already exists
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
// If all tables do not exists, we can create them directly.
if self.creator.data.is_all_tables_not_exists() {
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
return Ok(Status::executing(true));
}
// Filter out the tables that already exist.
let tasks = &self.creator.data.tasks;
let mut filtered_tasks = Vec::with_capacity(tasks.len());
for (task, table_id) in tasks
.iter()
.zip(self.creator.data.table_ids_already_exists().iter())
{
// Validates the tasks
let tasks = &mut self.creator.data.tasks;
for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) {
if table_id.is_some() {
// If a table already exists, we just ignore it.
ensure!(
@@ -124,17 +118,34 @@ impl CreateLogicalTablesProcedure {
);
continue;
}
filtered_tasks.push(task.clone());
}
// Resets tasks
self.creator.data.tasks = filtered_tasks;
if self.creator.data.tasks.is_empty() {
// If all tables already exist, we can skip the `DatanodeCreateRegions` stage.
self.creator.data.state = CreateTablesState::CreateMetadata;
return Ok(Status::executing(true));
// If all tables already exist, returns the table_ids.
if already_exists_tables_ids.iter().all(Option::is_some) {
return Ok(Status::done_with_output(
already_exists_tables_ids
.into_iter()
.flatten()
.collect::<Vec<_>>(),
));
}
// Allocates table ids
for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) {
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
self.context
.table_metadata_allocator
.allocate_table_id(task)
.await?
};
task.set_table_id(table_id);
}
self.creator
.data
.set_table_ids_already_exists(already_exists_tables_ids);
self.creator.data.state = CreateTablesState::DatanodeCreateRegions;
Ok(Status::executing(true))
}
@@ -152,17 +163,20 @@ impl CreateLogicalTablesProcedure {
self.create_regions(region_routes).await
}
/// Creates table metadata
///
/// Abort(not-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(&self) -> Result<Status> {
let manager = &self.context.table_metadata_manager;
let physical_table_id = self.creator.data.physical_table_id();
let tables_data = self.creator.data.all_tables_data();
let num_tables = tables_data.len();
let remaining_tasks = self.creator.data.remaining_tasks();
let num_tables = remaining_tasks.len();
if num_tables > 0 {
let chunk_size = manager.max_logical_tables_per_batch();
if num_tables > chunk_size {
let chunks = tables_data
let chunks = remaining_tasks
.into_iter()
.chunks(chunk_size)
.into_iter()
@@ -172,11 +186,21 @@ impl CreateLogicalTablesProcedure {
manager.create_logical_tables_metadata(chunk).await?;
}
} else {
manager.create_logical_tables_metadata(tables_data).await?;
manager
.create_logical_tables_metadata(remaining_tasks)
.await?;
}
}
let table_ids = self.creator.data.real_table_ids();
// The `table_id` MUST be collected after the [Prepare::Prepare],
// ensures the all `table_id`s have been allocated.
let table_ids = self
.creator
.data
.tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}");
@@ -238,10 +262,10 @@ impl CreateLogicalTablesProcedure {
body: Some(PbRegionRequest::Creates(creates)),
};
create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
@@ -310,17 +334,13 @@ impl TablesCreator {
tasks: Vec<CreateTableTask>,
physical_table_id: TableId,
) -> Self {
let table_ids_from_tasks = tasks
.iter()
.map(|task| task.table_info.ident.table_id)
.collect::<Vec<_>>();
let len = table_ids_from_tasks.len();
let len = tasks.len();
Self {
data: CreateTablesData {
cluster_id,
state: CreateTablesState::Prepare,
tasks,
table_ids_from_tasks,
table_ids_already_exists: vec![None; len],
physical_table_id,
physical_region_numbers: vec![],
@@ -334,10 +354,6 @@ pub struct CreateTablesData {
cluster_id: ClusterId,
state: CreateTablesState,
tasks: Vec<CreateTableTask>,
table_ids_from_tasks: Vec<TableId>,
// Because the table_id is allocated before entering the distributed lock,
// it needs to recheck if the table exists when creating a table.
// If it does exist, then the table_id needs to be replaced with the existing one.
table_ids_already_exists: Vec<Option<TableId>>,
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
@@ -360,24 +376,6 @@ impl CreateTablesData {
self.table_ids_already_exists = table_ids_already_exists;
}
fn table_ids_already_exists(&self) -> &[Option<TableId>] {
&self.table_ids_already_exists
}
fn is_all_tables_not_exists(&self) -> bool {
self.table_ids_already_exists.iter().all(Option::is_none)
}
pub fn real_table_ids(&self) -> Vec<TableId> {
self.table_ids_from_tasks
.iter()
.zip(self.table_ids_already_exists.iter())
.map(|(table_id_from_task, table_id_already_exists)| {
table_id_already_exists.unwrap_or(*table_id_from_task)
})
.collect::<Vec<_>>()
}
fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
self.tasks
.iter()
@@ -385,18 +383,27 @@ impl CreateTablesData {
.collect::<Vec<_>>()
}
fn all_tables_data(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
/// Returns the remaining tasks.
/// The length of tasks must be greater than 0.
fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
self.tasks
.iter()
.map(|task| {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| RegionId::new(table_info.ident.table_id, *region_number))
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
(table_info, table_route)
.zip(self.table_ids_already_exists.iter())
.flat_map(|(task, table_id)| {
if table_id.is_none() {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| {
RegionId::new(table_info.ident.table_id, *region_number)
})
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
Some((table_info, table_route))
} else {
None
}
})
.collect::<Vec<_>>()
}

View File

@@ -33,8 +33,8 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
@@ -45,7 +45,6 @@ use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
use crate::{metrics, ClusterId};
pub struct CreateTableProcedure {
pub context: DdlContext,
pub creator: TableCreator,
@@ -54,16 +53,10 @@ pub struct CreateTableProcedure {
impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub fn new(
cluster_id: ClusterId,
task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
pub fn new(cluster_id: ClusterId, task: CreateTableTask, context: DdlContext) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
creator: TableCreator::new(cluster_id, task),
}
}
@@ -75,7 +68,8 @@ impl CreateTableProcedure {
opening_regions: vec![],
};
if let TableRouteValue::Physical(x) = &creator.data.table_route {
// Only registers regions if the table route is allocated.
if let Some(TableRouteValue::Physical(x)) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
@@ -85,20 +79,53 @@ impl CreateTableProcedure {
Ok(CreateTableProcedure { context, creator })
}
pub fn table_info(&self) -> &RawTableInfo {
fn table_info(&self) -> &RawTableInfo {
&self.creator.data.task.table_info
}
fn table_id(&self) -> TableId {
pub(crate) fn table_id(&self) -> TableId {
self.table_info().ident.table_id
}
pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
self.creator
.data
.region_wal_options
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "region_wal_options is not allocated",
})
}
/// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
fn table_route(&self) -> Result<&TableRouteValue> {
self.creator
.data
.table_route
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "table_route is not allocated",
})
}
#[cfg(any(test, feature = "testing"))]
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options)
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Allocates the table id.
///
/// Abort(non-retry):
/// - TableName exists and `create_if_not_exists` is false.
/// - Failed to allocate [TableMetadata].
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
let table_name_value = self
.context
@@ -124,6 +151,22 @@ impl CreateTableProcedure {
}
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
let TableMetadata {
table_id,
table_route,
region_wal_options,
} = self
.context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext {
cluster_id: self.creator.data.cluster_id,
},
&self.creator.data.task,
)
.await?;
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options);
Ok(Status::executing(true))
}
@@ -137,8 +180,20 @@ impl CreateTableProcedure {
Ok(CreateRequestBuilder::new(template, physical_table_id))
}
/// Creates regions on datanodes
///
/// Abort(non-retry):
/// - Failed to create [CreateRequestBuilder].
/// - Failed to get the table route of physical table (for logical table).
///
/// Retry:
/// - If the underlying servers returns one of the following [Code](tonic::status::Code):
/// - [Code::Cancelled](tonic::status::Code::Cancelled)
/// - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
match &self.creator.data.table_route {
// Safety: the table route must be allocated.
match self.table_route()?.clone() {
TableRouteValue::Physical(x) => {
let region_routes = x.region_routes.clone();
let request_builder = self.new_region_request_builder(None)?;
@@ -151,12 +206,12 @@ impl CreateTableProcedure {
.context
.table_metadata_manager
.table_route_manager()
.get(physical_table_id)
.try_get_physical_table_route(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes()?;
let region_routes = &physical_table_route.region_routes;
let request_builder = self.new_region_request_builder(Some(physical_table_id))?;
@@ -170,7 +225,8 @@ impl CreateTableProcedure {
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
if self.creator.data.table_route.is_physical() {
// Safety: the table_route must be allocated.
if self.table_route()?.is_physical() {
// Registers opening regions
let guards = self
.creator
@@ -181,13 +237,12 @@ impl CreateTableProcedure {
}
let create_table_data = &self.creator.data;
let region_wal_options = &create_table_data.region_wal_options;
// Safety: the region_wal_options must be allocated
let region_wal_options = self.region_wal_options()?;
let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
@@ -203,7 +258,6 @@ impl CreateTableProcedure {
storage_path.clone(),
region_wal_options,
)?;
requests.push(PbRegionRequest::Create(create_region_request));
}
@@ -218,12 +272,11 @@ impl CreateTableProcedure {
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
@@ -240,18 +293,21 @@ impl CreateTableProcedure {
Ok(Status::executing(false))
}
/// Creates table metadata
///
/// Abort(not-retry):
/// - Failed to create table metadata.
async fn on_create_metadata(&self) -> Result<Status> {
let table_id = self.table_id();
let manager = &self.context.table_metadata_manager;
let raw_table_info = self.table_info().clone();
let region_wal_options = self.region_wal_options().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let table_route = self.table_route()?.clone();
manager
.create_table_metadata(
raw_table_info,
self.creator.data.table_route.clone(),
region_wal_options,
)
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
info!("Created table metadata for table {table_id}");
@@ -303,19 +359,14 @@ pub struct TableCreator {
}
impl TableCreator {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
pub fn new(cluster_id: u64, task: CreateTableTask) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
table_route,
region_wal_options,
table_route: None,
region_wal_options: None,
},
opening_regions: vec![],
}
@@ -347,6 +398,17 @@ impl TableCreator {
}
Ok(opening_region_guards)
}
fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.data.task.table_info.ident.table_id = table_id;
self.data.table_route = Some(table_route);
self.data.region_wal_options = Some(region_wal_options);
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
@@ -363,8 +425,10 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
table_route: TableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
/// None stands for not allocated yet.
table_route: Option<TableRouteValue>,
/// None stands for not allocated yet.
pub region_wal_options: Option<HashMap<RegionNumber, String>>,
pub cluster_id: ClusterId,
}

View File

@@ -34,7 +34,7 @@ use table::table_reference::TableReference;
use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_info::TableInfoValue;
@@ -223,7 +223,7 @@ impl DropTableProcedure {
drop_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())

View File

@@ -23,21 +23,22 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::key::table_name::TableNameKey;
use crate::error::{self, Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::key::table_name::{TableNameKey, TableNameManager};
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal_options_allocator::{allocate_region_wal_options, WalOptionsAllocatorRef};
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
#[derive(Clone)]
pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_name_manager: TableNameManager,
peer_allocator: PeerAllocatorRef,
}
@@ -45,12 +46,12 @@ impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_name_manager: TableNameManager,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
wal_options_allocator,
table_metadata_manager,
table_name_manager,
Arc::new(NoopPeerAllocator),
)
}
@@ -58,18 +59,18 @@ impl TableMetadataAllocator {
pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_name_manager: TableNameManager,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
table_metadata_manager,
table_name_manager,
peer_allocator,
}
}
async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;
@@ -123,6 +124,12 @@ impl TableMetadataAllocator {
task: &CreateTableTask,
) -> Result<TableRouteValue> {
let regions = task.partitions.len();
ensure!(
regions > 0,
error::UnexpectedSnafu {
err_msg: "The number of partitions must be greater than 0"
}
);
let table_route = if task.create_table.engine == METRIC_ENGINE
&& let Some(physical_table_name) = task
@@ -131,8 +138,7 @@ impl TableMetadataAllocator {
.get(LOGICAL_TABLE_METADATA_KEY)
{
let physical_table_id = self
.table_metadata_manager
.table_name_manager()
.table_name_manager
.get(TableNameKey::new(
&task.create_table.catalog_name,
&task.create_table.schema_name,

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod create_table;
pub use create_table::{
TestColumnDef, TestColumnDefBuilder, TestCreateTableExpr, TestCreateTableExprBuilder,
};

View File

@@ -0,0 +1,165 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::column_def::try_as_column_schema;
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO2_ENGINE};
use datatypes::schema::RawSchema;
use derive_builder::Builder;
use store_api::storage::TableId;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
#[derive(Default, Builder)]
pub struct TestColumnDef {
#[builder(setter(into), default)]
name: String,
data_type: ColumnDataType,
#[builder(default)]
is_nullable: bool,
semantic_type: SemanticType,
#[builder(setter(into), default)]
comment: String,
}
impl From<TestColumnDef> for ColumnDef {
fn from(
TestColumnDef {
name,
data_type,
is_nullable,
semantic_type,
comment,
}: TestColumnDef,
) -> Self {
Self {
name,
data_type: data_type as i32,
is_nullable,
default_constraint: vec![],
semantic_type: semantic_type as i32,
comment,
datatype_extension: None,
}
}
}
#[derive(Default, Builder)]
#[builder(default)]
pub struct TestCreateTableExpr {
#[builder(setter(into), default = "DEFAULT_CATALOG_NAME.to_string()")]
catalog_name: String,
#[builder(setter(into), default = "DEFAULT_SCHEMA_NAME.to_string()")]
schema_name: String,
#[builder(setter(into))]
table_name: String,
#[builder(setter(into))]
desc: String,
#[builder(setter(into))]
column_defs: Vec<ColumnDef>,
#[builder(setter(into))]
time_index: String,
#[builder(setter(into))]
primary_keys: Vec<String>,
create_if_not_exists: bool,
table_options: HashMap<String, String>,
table_id: Option<TableId>,
#[builder(setter(into), default = "MITO2_ENGINE.to_string()")]
engine: String,
}
impl From<TestCreateTableExpr> for CreateTableExpr {
fn from(
TestCreateTableExpr {
catalog_name,
schema_name,
table_name,
desc,
column_defs,
time_index,
primary_keys,
create_if_not_exists,
table_options,
table_id,
engine,
}: TestCreateTableExpr,
) -> Self {
Self {
catalog_name,
schema_name,
table_name,
desc,
column_defs,
time_index,
primary_keys,
create_if_not_exists,
table_options,
table_id: table_id.map(|id| api::v1::TableId { id }),
engine,
}
}
}
/// Builds [RawTableInfo] from [CreateTableExpr].
pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
RawTableInfo {
ident: TableIdent {
table_id: expr
.table_id
.as_ref()
.map(|table_id| table_id.id)
.unwrap_or(0),
version: 1,
},
name: expr.table_name.to_string(),
desc: Some(expr.desc.to_string()),
catalog_name: expr.catalog_name.to_string(),
schema_name: expr.schema_name.to_string(),
meta: RawTableMeta {
schema: RawSchema {
column_schemas: expr
.column_defs
.iter()
.map(|column| try_as_column_schema(column).unwrap())
.collect(),
timestamp_index: expr
.column_defs
.iter()
.position(|column| column.semantic_type() == SemanticType::Timestamp),
version: 0,
},
primary_key_indices: expr
.primary_keys
.iter()
.map(|key| {
expr.column_defs
.iter()
.position(|column| &column.name == key)
.unwrap()
})
.collect(),
value_indices: vec![],
engine: expr.engine.to_string(),
next_column_id: expr.column_defs.len() as u32,
region_numbers: vec![],
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
},
table_type: TableType::Base,
}
}

View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod create_logical_tables;
mod create_table;

View File

@@ -0,0 +1,518 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Partition;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::{ColumnDataType, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
fn test_create_logical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("host")
.data_type(ColumnDataType::String)
.semantic_type(SemanticType::Tag)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}
// Note: this code may be duplicated with others.
// However, it's by design, ensures the tests are easy to be modified or added.
fn test_create_physical_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("value")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["value".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}
#[tokio::test]
async fn test_on_prepare_physical_table_not_found() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let tasks = vec![test_create_logical_table_task("foo")];
let physical_table_id = 1024u32;
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
}
async fn create_physical_table_metadata(
ddl_context: &DdlContext,
table_info: RawTableInfo,
table_route: TableRouteValue,
) {
ddl_context
.table_metadata_manager
.create_table_metadata(table_info, table_route, HashMap::default())
.await
.unwrap();
}
#[tokio::test]
async fn test_on_prepare() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// The create logical table procedure.
let tasks = vec![test_create_logical_table_task("foo")];
let physical_table_id = table_id;
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
}
#[tokio::test]
async fn test_on_prepare_logical_table_exists_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// Creates the logical table metadata.
let mut task = test_create_logical_table_task("foo");
task.set_table_id(1025);
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
)])
.await
.unwrap();
// The create logical table procedure.
let physical_table_id = table_id;
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::TableAlreadyExists { .. });
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
}
#[tokio::test]
async fn test_on_prepare_with_create_if_table_exists() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// Creates the logical table metadata.
let mut task = test_create_logical_table_task("foo");
task.set_table_id(8192);
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
)])
.await
.unwrap();
// The create logical table procedure.
let physical_table_id = table_id;
// Sets `create_if_not_exists`
task.create_table.create_if_not_exists = true;
let mut procedure =
CreateLogicalTablesProcedure::new(cluster_id, vec![task], physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
let output = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*output, vec![8192]);
}
#[tokio::test]
async fn test_on_prepare_part_logical_tables_exist() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// Creates the logical table metadata.
let mut task = test_create_logical_table_task("exists");
task.set_table_id(8192);
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
)])
.await
.unwrap();
// The create logical table procedure.
let physical_table_id = table_id;
// Sets `create_if_not_exists`
task.create_table.create_if_not_exists = true;
let non_exist_task = test_create_logical_table_task("non_exists");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task, non_exist_task],
physical_table_id,
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
}
#[derive(Clone)]
pub struct NaiveDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[tokio::test]
async fn test_on_create_metadata() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// The create logical table procedure.
let physical_table_id = table_id;
// Creates the logical table metadata.
let task = test_create_logical_table_task("foo");
let yet_another_task = test_create_logical_table_task("bar");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task, yet_another_task],
physical_table_id,
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*table_ids, vec![1025, 1026]);
}
#[tokio::test]
async fn test_on_create_metadata_part_logical_tables_exist() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// Creates the logical table metadata.
let mut task = test_create_logical_table_task("exists");
task.set_table_id(8192);
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
)])
.await
.unwrap();
// The create logical table procedure.
let physical_table_id = table_id;
// Sets `create_if_not_exists`
task.create_table.create_if_not_exists = true;
let non_exist_task = test_create_logical_table_task("non_exists");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task, non_exist_task],
physical_table_id,
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_ids = status.downcast_output_ref::<Vec<u32>>().unwrap();
assert_eq!(*table_ids, vec![8192, 1025]);
}
#[tokio::test]
async fn test_on_create_metadata_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
// Prepares physical table metadata.
let mut create_physical_table_task = test_create_physical_table_task("phy_table");
let TableMetadata {
table_id,
table_route,
..
} = ddl_context
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_physical_table_task,
)
.await
.unwrap();
create_physical_table_task.set_table_id(table_id);
create_physical_table_metadata(
&ddl_context,
create_physical_table_task.table_info.clone(),
table_route,
)
.await;
// The create logical table procedure.
let physical_table_id = table_id;
// Creates the logical table metadata.
let task = test_create_logical_table_task("foo");
let yet_another_task = test_create_logical_table_task("bar");
let mut procedure = CreateLogicalTablesProcedure::new(
cluster_id,
vec![task.clone(), yet_another_task],
physical_table_id,
ddl_context.clone(),
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Creates logical table metadata(different with the task)
let mut task = task.clone();
task.table_info.ident.table_id = 1025;
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info,
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
)])
.await
.unwrap();
// Triggers procedure to create table metadata
let error = procedure.execute(&ctx).await.unwrap_err();
assert!(!error.is_retry_later());
}

View File

@@ -0,0 +1,328 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::Partition;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::{ColumnDataType, SemanticType};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
use crate::error;
use crate::error::{Error, Result};
use crate::key::table_route::TableRouteValue;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
#[async_trait::async_trait]
impl MockDatanodeHandler for () {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
unreachable!()
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
fn test_create_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()
.name("ts")
.data_type(ColumnDataType::TimestampMillisecond)
.semantic_type(SemanticType::Timestamp)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("host")
.data_type(ColumnDataType::String)
.semantic_type(SemanticType::Tag)
.build()
.unwrap()
.into(),
TestColumnDefBuilder::default()
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.build()
.unwrap()
.into(),
])
.time_index("ts")
.primary_keys(["host".into()])
.table_name(name)
.build()
.unwrap()
.into();
let table_info = build_raw_table_info_from_expr(&create_table);
CreateTableTask {
create_table,
// Single region
partitions: vec![Partition {
column_list: vec![],
value_list: vec![],
}],
table_info,
}
}
#[tokio::test]
async fn test_on_prepare_table_exists_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::TableAlreadyExists { .. });
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
}
#[tokio::test]
async fn test_on_prepare_with_create_if_table_exists() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let mut task = test_create_table_task("foo");
task.create_table.create_if_not_exists = true;
task.table_info.ident.table_id = 1024;
// Puts a value to table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: Some(..) });
let table_id = *status.downcast_output_ref::<u32>().unwrap();
assert_eq!(table_id, 1024);
}
#[tokio::test]
async fn test_on_prepare_without_create_if_table_exists() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let mut task = test_create_table_task("foo");
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_eq!(procedure.table_id(), 1024);
}
#[tokio::test]
async fn test_on_prepare_with_no_partition_err() {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let mut task = test_create_table_task("foo");
task.partitions = vec![];
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
assert!(err
.to_string()
.contains("The number of partitions must be greater than 0"),);
}
#[derive(Clone)]
pub struct RetryErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for RetryErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[tokio::test]
async fn test_on_datanode_create_regions_should_retry() {
common_telemetry::init_default_ut_logging();
let datanode_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
let error = procedure.execute(&ctx).await.unwrap_err();
assert!(error.is_retry_later());
}
#[derive(Clone)]
pub struct UnexpectedErrorDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
error::UnexpectedSnafu {
err_msg: "mock error",
}
.fail()
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[tokio::test]
async fn test_on_datanode_create_regions_should_not_retry() {
common_telemetry::init_default_ut_logging();
let datanode_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
let error = procedure.execute(&ctx).await.unwrap_err();
assert!(!error.is_retry_later());
}
#[derive(Clone)]
pub struct NaiveDatanodeHandler;
#[async_trait::async_trait]
impl MockDatanodeHandler for NaiveDatanodeHandler {
async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(0)
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[tokio::test]
async fn test_on_create_metadata_error() {
common_telemetry::init_default_ut_logging();
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(cluster_id, task.clone(), ddl_context.clone());
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
let mut task = task;
// Creates table metadata(different with the task)
task.table_info.ident.table_id = procedure.table_id();
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
// Triggers procedure to create table metadata
let error = procedure.execute(&ctx).await.unwrap_err();
assert!(!error.is_retry_later());
}
#[tokio::test]
async fn test_on_create_metadata() {
common_telemetry::init_default_ut_logging();
let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(datanode_manager);
let cluster_id = 1;
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context);
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
};
procedure.execute(&ctx).await.unwrap();
// Triggers procedure to create table metadata
let status = procedure.execute(&ctx).await.unwrap();
let table_id = status.downcast_output_ref::<u32>().unwrap();
assert_eq!(*table_id, 1024);
}

View File

@@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use super::utils::handle_retry_error;
use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
@@ -169,10 +169,10 @@ impl TruncateTableProcedure {
let requester = requester.clone();
truncate_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
return Err(handle_operate_region_error(datanode)(err));
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}

View File

@@ -27,19 +27,17 @@ use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(Error) -> Error {
/// Adds [Peer] context if the error is unretryable.
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
move |err| {
if matches!(err, Error::RetryLater { .. }) {
Error::RetryLater {
source: BoxedError::new(err),
}
} else {
Error::OperateDatanode {
if !err.is_retry_later() {
return Error::OperateDatanode {
location: location!(),
peer: datanode,
source: BoxedError::new(err),
}
};
}
err
}
}

View File

@@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, tracing};
use common_telemetry::{debug, info, tracing};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionNumber, TableId};
use store_api::storage::TableId;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
@@ -27,15 +26,12 @@ use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
utils, DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata,
TableMetadataAllocatorContext,
};
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::error::{
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -50,6 +46,8 @@ use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;
use crate::ClusterId;
@@ -62,7 +60,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocator,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
}
@@ -73,7 +71,7 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocator,
table_metadata_allocator: TableMetadataAllocatorRef,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Result<Self> {
let manager = Self {
@@ -100,6 +98,7 @@ impl DdlManager {
cache_invalidator: self.cache_invalidator.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
memory_region_keeper: self.memory_region_keeper.clone(),
table_metadata_allocator: self.table_metadata_allocator.clone(),
}
}
@@ -205,18 +204,10 @@ impl DdlManager {
&self,
cluster_id: ClusterId,
create_table_task: CreateTableTask,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateTableProcedure::new(
cluster_id,
create_table_task,
table_route,
region_wal_options,
context,
);
let procedure = CreateTableProcedure::new(cluster_id, create_table_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -470,31 +461,10 @@ async fn handle_drop_table_task(
async fn handle_create_table_task(
ddl_manager: &DdlManager,
cluster_id: ClusterId,
mut create_table_task: CreateTableTask,
create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_meta = ddl_manager
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&create_table_task,
)
.await?;
let TableMetadata {
table_id,
table_route,
region_wal_options,
} = table_meta;
create_table_task.table_info.ident.table_id = table_id;
let (id, output) = ddl_manager
.submit_create_table_task(
cluster_id,
create_table_task,
table_route,
region_wal_options,
)
.submit_create_table_task(cluster_id, create_table_task)
.await?;
let procedure_id = id.to_string();
@@ -559,8 +529,9 @@ async fn handle_create_logical_table_tasks(
})
}
/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
#[async_trait::async_trait]
impl DdlTaskExecutor for DdlManager {
impl ProcedureExecutor for DdlManager {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
@@ -574,7 +545,7 @@ impl DdlTaskExecutor for DdlManager {
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
async move {
let cluster_id = ctx.cluster_id.unwrap_or_default();
info!("Submitting Ddl task: {:?}", request.task);
debug!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, cluster_id, create_table_task).await
@@ -598,6 +569,37 @@ impl DdlTaskExecutor for DdlManager {
.trace(span)
.await
}
async fn migrate_region(
&self,
_ctx: &ExecutorContext,
_request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse> {
UnsupportedSnafu {
operation: "migrate_region",
}
.fail()
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse> {
let pid = ProcedureId::parse_str(pid)
.with_context(|_| error::ParseProcedureIdSnafu { key: pid })?;
let state = self
.procedure_manager
.procedure_state(pid)
.await
.context(error::QueryProcedureSnafu)?
.context(error::ProcedureNotFoundSnafu {
pid: pid.to_string(),
})?;
Ok(procedure::procedure_state_to_pb_response(&state))
}
}
#[cfg(test)]
@@ -644,12 +646,12 @@ mod tests {
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
TableMetadataAllocator::new(
table_metadata_manager.clone(),
Arc::new(TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
Arc::new(TableMetadataManager::new(kv_backend)),
),
table_metadata_manager.table_name_manager().clone(),
)),
Arc::new(MemoryRegionKeeper::default()),
);

View File

@@ -100,6 +100,15 @@ pub enum Error {
source: common_procedure::Error,
},
#[snafu(display("Failed to query procedure"))]
QueryProcedure {
location: Location,
source: common_procedure::Error,
},
#[snafu(display("Procedure not found: {pid}"))]
ProcedureNotFound { location: Location, pid: String },
#[snafu(display("Failed to parse procedure id: {key}"))]
ParseProcedureId {
location: Location,
@@ -331,6 +340,9 @@ pub enum Error {
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
@@ -416,6 +428,7 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| BuildKafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
@@ -431,14 +444,17 @@ impl ErrorExt for Error {
| RenameTable { .. }
| Unsupported { .. } => StatusCode::Internal,
PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
StatusCode::InvalidArguments
}
ProcedureNotFound { .. }
| PrimaryKeyNotFound { .. }
| EmptyKey { .. }
| InvalidEngineType { .. } => StatusCode::InvalidArguments,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
SubmitProcedure { source, .. }
| QueryProcedure { source, .. }
| WaitProcedure { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),

View File

@@ -363,8 +363,10 @@ impl TableMetadataManager {
Option<DeserializedValueWithBytes<TableInfoValue>>,
Option<DeserializedValueWithBytes<TableRouteValue>>,
)> {
let (get_table_route_txn, table_route_decoder) =
self.table_route_manager.build_get_txn(table_id);
let (get_table_route_txn, table_route_decoder) = self
.table_route_manager
.table_route_storage()
.build_get_txn(table_id);
let (get_table_info_txn, table_info_decoder) =
self.table_info_manager.build_get_txn(table_id);
@@ -414,6 +416,7 @@ impl TableMetadataManager {
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.table_route_storage()
.build_create_txn(table_id, &table_route_value)?;
let mut txn = Txn::merge_all(vec![
@@ -506,6 +509,7 @@ impl TableMetadataManager {
let (create_table_route_txn, on_create_table_route_failure) = self
.table_route_manager()
.table_route_storage()
.build_create_txn(table_id, &table_route_value)?;
txns.push(create_table_route_txn);
@@ -579,6 +583,7 @@ impl TableMetadataManager {
// Deletes table route.
let delete_table_route_txn = self
.table_route_manager()
.table_route_storage()
.build_delete_txn(table_id, table_route_value)?;
let txn = Txn::merge_all(vec![
@@ -713,6 +718,7 @@ impl TableMetadataManager {
let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
.table_route_storage()
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
@@ -765,6 +771,7 @@ impl TableMetadataManager {
let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
.table_route_storage()
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
let r = self.kv_backend.txn(update_table_route_txn).await?;
@@ -1096,6 +1103,7 @@ mod tests {
assert!(table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()
@@ -1120,7 +1128,8 @@ mod tests {
let removed_table_route = table_metadata_manager
.table_route_manager()
.get_removed(table_id)
.table_route_storage()
.get_raw_removed(table_id)
.await
.unwrap()
.unwrap()
@@ -1316,6 +1325,7 @@ mod tests {
let updated_route_value = table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.unwrap()

View File

@@ -144,6 +144,7 @@ impl TableNameValue {
}
}
#[derive(Clone)]
pub struct TableNameManager {
kv_backend: KvBackendRef,
}

View File

@@ -22,7 +22,7 @@ use table::metadata::TableId;
use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue};
use crate::error::{
MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
self, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
@@ -77,7 +77,7 @@ impl TableRouteValue {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
let version = self.physical_table_route().version;
let version = self.as_physical_table_route_ref().version;
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
version: version + 1,
@@ -95,7 +95,7 @@ impl TableRouteValue {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
Ok(self.physical_table_route().version)
Ok(self.as_physical_table_route_ref().version)
}
/// Returns the corresponding [RegionRoute], returns `None` if it's the specific region is not found.
@@ -109,7 +109,7 @@ impl TableRouteValue {
}
);
Ok(self
.physical_table_route()
.as_physical_table_route_ref()
.region_routes
.iter()
.find(|route| route.region.id == region_id)
@@ -129,10 +129,25 @@ impl TableRouteValue {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
Ok(&self.physical_table_route().region_routes)
Ok(&self.as_physical_table_route_ref().region_routes)
}
fn physical_table_route(&self) -> &PhysicalTableRouteValue {
/// Returns the reference of [`PhysicalTableRouteValue`].
///
/// # Panic
/// If it is not the [`PhysicalTableRouteValue`].
fn as_physical_table_route_ref(&self) -> &PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
}
}
/// Converts to [`PhysicalTableRouteValue`].
///
/// # Panic
/// If it is not the [`PhysicalTableRouteValue`].
fn into_physical_table_route(self) -> PhysicalTableRouteValue {
match self {
TableRouteValue::Physical(x) => x,
_ => unreachable!("Mistakenly been treated as a Physical TableRoute: {self:?}"),
@@ -213,111 +228,53 @@ impl Display for TableRouteKey {
}
pub struct TableRouteManager {
kv_backend: KvBackendRef,
storage: TableRouteStorage,
}
impl TableRouteManager {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
Self {
storage: TableRouteStorage::new(kv_backend),
}
}
pub(crate) fn build_get_txn(
/// Returns the [`PhysicalTableRouteValue`] in the first level,
/// It won't follow the [`LogicalTableRouteValue`] to find the next level [`PhysicalTableRouteValue`].
///
/// Returns an error if the first level value is not a [`PhysicalTableRouteValue`].
pub async fn try_get_physical_table_route(
&self,
table_id: TableId,
) -> (
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
) {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
}
/// Builds a create table route transaction. it expected the `__table_route/{table_id}` wasn't occupied.
pub fn build_create_txn(
&self,
table_id: TableId,
table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
table_route_value.try_as_raw_value()?,
);
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a update table route transaction, it expected the remote value equals the `current_table_route_value`.
/// It retrieves the latest value if the comparing failed.
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = current_table_route_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a delete table route transaction, it expected the remote value equals the `table_route_value`.
pub(crate) fn build_delete_txn(
&self,
table_id: TableId,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
) -> Result<Txn> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = table_route_value.get_raw_bytes();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
TxnOp::Delete(raw_key),
TxnOp::Put(removed_key.into_bytes(), raw_value),
]);
Ok(txn)
}
pub async fn get(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
let key = TableRouteKey::new(table_id);
self.kv_backend
.get(&key.as_raw_key())
.await?
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
) -> Result<Option<PhysicalTableRouteValue>> {
match self.storage.get(table_id).await? {
Some(route) => {
ensure!(
route.is_physical(),
error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{route:?} is a non-physical TableRouteValue.")
}
);
Ok(Some(route.into_physical_table_route()))
}
None => Ok(None),
}
}
/// Returns the [TableId] recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - the table(`logical_or_physical_table_id`) does not exist.
pub async fn get_physical_table_id(
&self,
logical_or_physical_table_id: TableId,
) -> Result<TableId> {
let table_route = self
.storage
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();
})?;
match table_route {
TableRouteValue::Physical(_) => Ok(logical_or_physical_table_id),
@@ -325,41 +282,58 @@ impl TableRouteManager {
}
}
/// Returns the [TableRouteValue::Physical] recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - the physical table(`logical_or_physical_table_id`) does not exist
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
pub async fn get_physical_table_route(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, PhysicalTableRouteValue)> {
let table_route = self
.storage
.get(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?
.into_inner();
})?;
match table_route {
TableRouteValue::Physical(x) => Ok((logical_or_physical_table_id, x)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route =
self.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((
physical_table_id,
physical_table_route.physical_table_route().clone(),
))
let physical_table_route = self.storage.get(physical_table_id).await?.context(
TableRouteNotFoundSnafu {
table_id: physical_table_id,
},
)?;
let physical_table_route = physical_table_route.into_physical_table_route();
Ok((physical_table_id, physical_table_route))
}
}
}
/// Returns the [TableRouteValue::Physical] recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - one of the logical tables corresponding to the physical table does not exist.
///
/// **Notes**: it may return a subset of `logical_or_physical_table_ids`.
pub async fn batch_get_physical_table_routes(
&self,
logical_or_physical_table_ids: &[TableId],
) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
let table_routes = self.batch_get(logical_or_physical_table_ids).await?;
let table_routes = self
.storage
.batch_get(logical_or_physical_table_ids)
.await?;
// Returns a subset of `logical_or_physical_table_ids`.
let table_routes = table_routes
.into_iter()
.zip(logical_or_physical_table_ids)
.filter_map(|(route, id)| route.map(|route| (*id, route)))
.collect::<HashMap<_, _>>();
let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
let mut logical_table_ids = HashMap::with_capacity(table_routes.len());
@@ -379,13 +353,22 @@ impl TableRouteManager {
return Ok(physical_table_routes);
}
// Finds the logical tables corresponding to the physical tables.
let physical_table_ids = logical_table_ids
.values()
.cloned()
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let table_routes = self.batch_get(&physical_table_ids).await?;
let table_routes = self
.table_route_storage()
.batch_get(&physical_table_ids)
.await?;
let table_routes = table_routes
.into_iter()
.zip(physical_table_ids)
.filter_map(|(route, id)| route.map(|route| (id, route)))
.collect::<HashMap<_, _>>();
for (logical_table_id, physical_table_id) in logical_table_ids {
let table_route =
@@ -414,40 +397,114 @@ impl TableRouteManager {
Ok(physical_table_routes)
}
/// It may return a subset of the `table_ids`.
pub async fn batch_get(
/// Returns [`RegionDistribution`] of the table(`table_id`).
pub async fn get_region_distribution(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableRouteValue>> {
let lookup_table = table_ids
.iter()
.map(|id| (TableRouteKey::new(*id).as_raw_key(), id))
.collect::<HashMap<_, _>>();
table_id: TableId,
) -> Result<Option<RegionDistribution>> {
self.storage
.get(table_id)
.await?
.map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
.transpose()
}
let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;
/// Returns low-level APIs.
pub fn table_route_storage(&self) -> &TableRouteStorage {
&self.storage
}
}
let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
// Safety: must exist.
**lookup_table.get(kv.key()).unwrap(),
TableRouteValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;
/// Low-level operations of [TableRouteValue].
pub struct TableRouteStorage {
kv_backend: KvBackendRef,
}
Ok(values)
impl TableRouteStorage {
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Builds a get table route transaction(readonly).
pub(crate) fn build_get_txn(
&self,
table_id: TableId,
) -> (
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
) {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]);
(txn, txn_helper::build_txn_response_decoder_fn(raw_key))
}
/// Builds a create table route transaction,
/// it expected the `__table_route/{table_id}` wasn't occupied.
pub fn build_create_txn(
&self,
table_id: TableId,
table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let txn = txn_helper::build_put_if_absent_txn(
raw_key.clone(),
table_route_value.try_as_raw_value()?,
);
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a update table route transaction,
/// it expected the remote value equals the `current_table_route_value`.
/// It retrieves the latest value if the comparing failed.
pub(crate) fn build_update_txn(
&self,
table_id: TableId,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_table_route_value: &TableRouteValue,
) -> Result<(
Txn,
impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>>,
)> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = current_table_route_value.get_raw_bytes();
let new_raw_value: Vec<u8> = new_table_route_value.try_as_raw_value()?;
let txn = txn_helper::build_compare_and_put_txn(raw_key.clone(), raw_value, new_raw_value);
Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key)))
}
/// Builds a delete table route transaction,
/// it expected the remote value equals the `table_route_value`.
pub(crate) fn build_delete_txn(
&self,
table_id: TableId,
table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
) -> Result<Txn> {
let key = TableRouteKey::new(table_id);
let raw_key = key.as_raw_key();
let raw_value = table_route_value.get_raw_bytes();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
let txn = Txn::new().and_then(vec![
TxnOp::Delete(raw_key),
TxnOp::Put(removed_key.into_bytes(), raw_value),
]);
Ok(txn)
}
#[cfg(test)]
pub async fn get_removed(
pub async fn get_raw_removed(
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
@@ -460,20 +517,64 @@ impl TableRouteManager {
.transpose()
}
pub async fn get_region_distribution(
/// Returns the [`TableRouteValue`].
pub async fn get(&self, table_id: TableId) -> Result<Option<TableRouteValue>> {
let key = TableRouteKey::new(table_id);
self.kv_backend
.get(&key.as_raw_key())
.await?
.map(|kv| TableRouteValue::try_from_raw_value(&kv.value))
.transpose()
}
/// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`].
pub async fn get_raw(
&self,
table_id: TableId,
) -> Result<Option<RegionDistribution>> {
self.get(table_id)
) -> Result<Option<DeserializedValueWithBytes<TableRouteValue>>> {
let key = TableRouteKey::new(table_id);
self.kv_backend
.get(&key.as_raw_key())
.await?
.map(|table_route| Ok(region_distribution(table_route.region_routes()?)))
.map(|kv| DeserializedValueWithBytes::from_inner_slice(&kv.value))
.transpose()
}
/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
let keys = table_ids
.iter()
.map(|id| TableRouteKey::new(*id).as_raw_key())
.collect::<Vec<_>>();
let resp = self
.kv_backend
.batch_get(BatchGetRequest { keys: keys.clone() })
.await?;
let kvs = resp
.kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<HashMap<_, _>>();
keys.into_iter()
.map(|key| {
if let Some(value) = kvs.get(&key) {
Ok(Some(TableRouteValue::try_from_raw_value(value)?))
} else {
Ok(None)
}
})
.collect::<Result<Vec<_>>>()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::TxnService;
#[test]
fn test_table_route_compatibility() {
@@ -486,4 +587,81 @@ mod tests {
r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }], version: 0 })"#
);
}
#[tokio::test]
async fn test_table_route_storage_get_raw_empty() {
let kv = Arc::new(MemoryKvBackend::default());
let table_route_storage = TableRouteStorage::new(kv);
let table_route = table_route_storage.get_raw(1024).await.unwrap();
assert!(table_route.is_none());
}
#[tokio::test]
async fn test_table_route_storage_get_raw() {
let kv = Arc::new(MemoryKvBackend::default());
let table_route_storage = TableRouteStorage::new(kv.clone());
let table_route = table_route_storage.get_raw(1024).await.unwrap();
assert!(table_route.is_none());
let table_route_manager = TableRouteManager::new(kv.clone());
let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 1)],
});
let (txn, _) = table_route_manager
.table_route_storage()
.build_create_txn(1024, &table_route_value)
.unwrap();
let r = kv.txn(txn).await.unwrap();
assert!(r.succeeded);
let table_route = table_route_storage.get_raw(1024).await.unwrap();
assert!(table_route.is_some());
let got = table_route.unwrap().inner;
assert_eq!(got, table_route_value);
}
#[tokio::test]
async fn test_table_route_batch_get() {
let kv = Arc::new(MemoryKvBackend::default());
let table_route_storage = TableRouteStorage::new(kv.clone());
let routes = table_route_storage
.batch_get(&[1023, 1024, 1025])
.await
.unwrap();
assert!(routes.iter().all(Option::is_none));
let table_route_manager = TableRouteManager::new(kv.clone());
let routes = [
(
1024,
TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 1)],
}),
),
(
1025,
TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 2)],
}),
),
];
for (table_id, route) in &routes {
let (txn, _) = table_route_manager
.table_route_storage()
.build_create_txn(*table_id, route)
.unwrap();
let r = kv.txn(txn).await.unwrap();
assert!(r.succeeded);
}
let results = table_route_storage
.batch_get(&[9999, 1025, 8888, 1024])
.await
.unwrap();
assert!(results[0].is_none());
assert_eq!(results[1].as_ref().unwrap(), &routes[1].1);
assert!(results[2].is_none());
assert_eq!(results[3].as_ref().unwrap(), &routes[0].1);
}
}

Some files were not shown because too many files have changed in this diff Show More