Compare commits

...

68 Commits

Author SHA1 Message Date
Weny Xu
eab702cc02 feat: implement sync_region for metric engine (#5826)
* feat: implement `sync_region` for metric engine

* chore: apply suggestions from CR

* chore: upgrade proto
2025-04-03 12:46:20 +00:00
Zhenchi
dd63068df6 feat: add matches_term function (#5817)
* feat: add `matches_term` function

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* merge & fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix & skip char after boundary mismatch

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-04-03 09:09:41 +00:00
Yuhan Wang
f73b61e767 feat(remote-wal): add remote wal prune procedure (#5714)
* feat: add remote wal prune procedure

* feat: add retry logic and remove rollback

* chore: simplify the logic

* fix: remove REMOTE_WAL_LOCK

* fix: use in-memory kv

* perf: O(n) judgement

* chore: add single write lock

* test: add unit test

* chore: remove unused function

* chore: update comments

* chore: apply comments

* chore: apply comments
2025-04-03 08:11:51 +00:00
Yingwen
2acecd3620 feat: support REPLACE INTO statement (#5820)
* feat: support replace into

* feat: support replace into
2025-04-03 03:22:43 +00:00
Zhenchi
f797de3497 feat: add backend field to fulltext options (#5806)
* feat: add backend field to fulltext options

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* update proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix option conv

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix display

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-04-02 09:15:54 +00:00
dennis zhuang
d53afa849d fix: interval cast expression can't work in range query, #5805 (#5813)
* fix: interval cast expression can't work in range query, #5805

* fix: nested cast

* test: make vector test stable
2025-04-02 08:46:17 +00:00
discord9
3aebfc1716 test: looser condition (#5816) 2025-04-02 07:38:05 +00:00
Weny Xu
dbb79c9671 feat: introduce CollectLeaderRegionHandler (#5811)
* feat: introduce `CollectLeaderRegionHandler`

* feat: add to default handler group

* fix: correct unit test

* chore: rename
2025-04-02 04:47:00 +00:00
shuiyisong
054056fcbb refactor: remove prom store write dispatch (#5812)
* refactor: remove prom store remote write dispatch pattern

* chore: ref XIX-22
2025-04-02 04:35:28 +00:00
Zhenchi
aa486db8b7 refactor: allow bloom filter search to apply and conjunction (#5770)
* refactor: change bloom filter search from any to all match

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* place back in list

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* nit

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-04-01 12:50:34 +00:00
Weny Xu
4ef9afd8d8 feat: introduce read preference (#5783)
* feat: introduce read preference

* feat: introduce `RegionQueryHandlerFactory`

* feat: extract ReadPreference from http header

* test: add more tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-04-01 09:17:01 +00:00
shuiyisong
f9221e9e66 perf: introduce simd_json for parsing ndjson (#5794)
* perf: introduce simd_json for parsing ndjson

* fix: some tests

* fix: some tests

* fix: es test case

* chore: use `as_bytes_mut()`

* chore: remove unnecessary `to_string`

* chore: add safety comment
2025-04-01 08:17:26 +00:00
Weny Xu
6c26fe9c80 fix: correct error status code (#5802) 2025-04-01 07:34:16 +00:00
fys
33c9fb737c refactor: remove mode option in configuration files (#5809)
* refactor: remove mode option in configuration files

* chore: remove mode in configuration file

* remvoe mode field in FlownodeOptions

* add comment for test

* update config.md

* remove mode field in standalone options

* fix: ci
2025-04-01 07:14:10 +00:00
Weny Xu
68ce796771 chore: expose modules (#5810) 2025-04-01 05:33:20 +00:00
Weny Xu
d701c18150 feat: introduce CustomizedRegionLeaseRenewer (#5762)
* feat: add manifest_version to `GrantedRegion`

* chore: upgrade proto

* chore: apply review suggestions

* chore: apply suggestions from CR

* feat: introduce `CustomizedRegionLeaseRenewerRef`

* chore: upgrade to `103948`
2025-03-31 13:25:05 +00:00
Weny Xu
d3a60d8821 feat: add limit for the number of running procedures (#5793)
* refactor: remove unused `messages`

* feat: introduce running procedure num limit

* feat: update config

* chore: apply suggestions from CR

* feat: impl `status_code` for `log-store` crate
2025-03-31 06:14:21 +00:00
discord9
5d688c6565 feat(flow): time window expr (#5785)
* feat: time window expr

* chore: comments

* refactor: per review

* chore: partially per review

* chore: per review

* chore: per review use query engine's session
2025-03-31 04:46:37 +00:00
Weny Xu
41aee1f1b7 feat: implement sync_region for mito engine (#5765)
* chore: upgrade proto to `2d52b`

* feat: add `SyncRegion` to `WorkerRequest`

* feat: impl `sync_region` for `Engine` trait

* test: add tests

* chore: fmt code

* chore: upgrade proto

* chore: unify `RegionLeaderState` and `RegionFollowerState`

* chore: check immutable memtable

* chore: fix clippy

* chore: apply suggestions from CR
2025-03-31 03:53:47 +00:00
yihong
c5b55fd8cf fix: close issue #3902 since upstream fixed (#5801)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-03-30 12:34:52 +00:00
Ruihang Xia
8051dbbc31 fix: typo variadic (#5800)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-29 07:09:36 +00:00
Ruihang Xia
2d3192984d refactor: remove deprecated find_unique method (#5790)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-28 19:32:11 +00:00
shuiyisong
bef45ed0e8 feat(pipeline): support table name suffix templating in pipeline (#5775)
* chore: add table name template in pipeline yaml

* chore: implement apply function and add simple test

* chore: add comment and integration test

* chore: minor update

* fix: typos

* chore: change to table suffix

* chore: update comment and test

* chore: change name to table_suffix
2025-03-28 18:12:46 +00:00
LFC
a9e990768d refactor: skip re-taking arrays in memtable if possible (#5779)
experiment: skip sorting and re-taking arrays if possible when scanning memtable
2025-03-28 09:58:55 +00:00
Weny Xu
7e1ba49d3d refactor: remove useless region follower legacy code (#5795) 2025-03-28 08:10:30 +00:00
Yingwen
737558ef53 fix: support __name__ matcher in label values (#5773) 2025-03-28 02:18:59 +00:00
Yingwen
dbc25dd8da feat: expose scanner metrics to df execution metrics (#5699)
* feat: add metrics list to scanner

* chore: add report metrics method

* feat: use df metrics in PartitionMetrics

* feat: pass execution metrics to scan partition

* refactor: remove PartitionMetricsList

* feat: better debug format for ScanMetricsSet

* feat: do not expose all metrics to execution metrics by default

* refactor: use struct destruction

* feat: add metrics list to scanner

* chore: Add custom Debug for ScanMetricsSet and partition metrics display

* test: update sqlness result
2025-03-27 23:40:39 +00:00
Ruihang Xia
76a58a07e1 feat: simple implementation of DictionaryVector (#5758)
* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl vector op

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unit tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unwraps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: enhance DictionaryVector operations and deprecate find_unique method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: remove find_unique test

* chore: remove unused import

* fix test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-27 23:19:10 +00:00
Weny Xu
c2ba7fb16c refactor: remove useless region follower legacy code (#5787)
chore: remove region follower procedure
2025-03-27 11:50:29 +00:00
Lei, HUANG
09ef24fd75 refactor: remove useless partition legacy code (#5786)
* refactor: remove useless partition legacy code

* also remove error variants

* fix imports
2025-03-27 11:08:25 +00:00
Weny Xu
9b7b012620 feat: impl show region (#5782)
* fix: fix region follower procedure

* feat: add table related info to region peers table and follower regions

* feat: impl show region

* chore: apply suggestions from CR
2025-03-27 10:41:44 +00:00
fys
898e0bd828 chore: expose some methods (#5784) 2025-03-27 09:00:51 +00:00
shuiyisong
2b4ed43692 chore: accept table options in auto create table from hints (#5776)
chore: accept table options in auto create table from hint
2025-03-27 08:17:27 +00:00
Weny Xu
8f2ae4e136 feat: add AddRegionFollower and RemoveRegionFollower admin fn (#5780) 2025-03-27 06:30:50 +00:00
Weny Xu
0cd219a5d2 refactor: move list_flow_stats to ClusterInfo trait. (#5774)
refactor: minor refactor
2025-03-27 04:20:12 +00:00
fys
2b2ea5bf72 chore: upgrade some dependencies (#5777)
* chore: upgrade some dependencies

* chore: upgrade some dependencies

* fix: cr

* fix: ci

* fix: test

* fix: cargo fmt
2025-03-27 02:48:44 +00:00
discord9
e107bd5529 feat(flow): utils function for recording rule (#5768)
* chore: utils for rr

* chore: one more test

* chore: more test case

* test: even more tests

* chore: per review

* tests: add more&update testcase

* chore: update comment
2025-03-26 08:55:35 +00:00
Weny Xu
a31f0e255b feat: introduce RegionFollowerClient trait (#5771)
* chore: expose AskLeader

* feat: introduce `RegionFollowerClient` trait

* feat: build meta client with region follower client
2025-03-26 08:05:15 +00:00
Lei, HUANG
40b52f3b13 feat(mito): allow skipping wal while creating tables (#5740)
* chore: add Noop Wal option

* remove: WalOptionsAllocator::alloc method

* feat/no-op-wal:
 ### Add Noop WAL Option

 - **`engine.rs`, `opener.rs`, `wal.rs`, `entry_reader.rs`, `handle_write.rs`, `provider.rs`**:
   - Introduced a new `WalOptions::Noop` variant to handle scenarios where no write-ahead logging is required.
   - Implemented `NoopEntryReader` to provide a no-operation entry reader.
   - Updated logic to skip WAL operations for regions with `Noop` option.
   - Added `Provider::Noop` to handle `Noop` operations in the provider logic.

* feat/no-op-wal:
 ### Add `skip_wal` Option to Table Metadata

 - **Enhancements in `table_meta.rs`**:
   - Added a `skip_wal` parameter to the `create_wal_options` function to allow skipping WAL writes.
   - Updated the `create_table_route` function to utilize the `skip_wal` option from `table_info.meta.options`.

 - **Updates in `wal_options_allocator.rs`**:
   - Modified `alloc_batch` to handle the `skip_wal` flag, setting WAL options to `Noop` when true.
   - Added a test case `test_allocator_with_skip_wal` to verify the `skip_wal` functionality.

 - **Changes in `requests.rs`**:
   - Introduced `skip_wal` in `TableOptions` and added parsing logic.
   - Updated `TableOptions` display to include `skip_wal`.

 These changes introduce the ability to skip WAL writes for tables, enhancing flexibility in table metadata management.

* feat/no-op-wal:
 **Add WAL Option Handling and Table Option Validation**

 - **`handle_write.rs`**: Introduced a check for `WalOptions::Noop` in the `RegionWorkerLoop` to skip WAL writing for regions with this option.
 - **`requests.rs`**: Added `SKIP_WAL_KEY` to the list of valid table options for enhanced table configuration validation.

* feat/no-op-wal:
 ### Update WAL Options Allocation

 - **`key.rs`**: Modified the `allocate_region_wal_options` function to include an additional boolean parameter, enhancing the allocation logic.
 - **`wal_options_allocator.rs`**: Simplified the `test_allocator_with_skip_wal` test by removing unnecessary variable declarations and directly using `WalOptionsAllocator::RaftEngine`.

 These changes improve the flexibility and efficiency of WAL options allocation in the system.

* chore: reformat code

* feat/no-op-wal:
 **Enhancement:** Conditional Addition of `SKIP_WAL_KEY` in `requests.rs`

 - Updated `TableOptions` implementation in `requests.rs` to conditionally add `SKIP_WAL_KEY` to `key_vals` only when `self.skip_wal` is true, optimizing the key-value pair generation.

* feat/no-op-wal:
 Update `requests.rs` tests to reflect changes in `skip_wal` option

 - Modified test assertions in `requests.rs` to remove `skip_wal=false` from expected strings.
 - Added a new test case to verify `skip_wal=true` is correctly represented in `TableOptions`.

* feat/no-op-wal: Add Debug Logging and Improve Error Handling for WAL and Table Options

 • Introduced debug logging in wal.rs to skip obsolete regions, enhancing traceability.
 • Improved error handling in requests.rs by replacing warn with error propagation for invalid skip_wal values.
 • Added new test cases for skip_wal functionality, including SQL scripts and expected results, to ensure correct behavior and validation of the changes.
2025-03-26 07:53:52 +00:00
shuiyisong
f13a43647a chore: remove Transformer trait (#5772)
* chore: remove transformer trait

* chore: remove unnecessory generic
2025-03-26 02:53:30 +00:00
Zhenchi
7bcb01d269 feat: utilize blob metadata properties (#5767)
* feat: utilize blob metadata properties

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-03-26 02:47:20 +00:00
Ruihang Xia
e81213728b feat: add/correct some kafka-related metrics (#5757)
* feat: add/correct some kafka-related metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix dumb issues

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* per-partition produce latency

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-03-25 19:16:39 +00:00
Yingwen
d88482b996 feat: support explain analyze verbose (#5763)
* Add explain_verbose to QueryContext

* feat: fmt plan by display type

* feat: update proto to use ExplainOptions

* feat: display more info in verbose mode

* chore: fix clippy

* test: add sqlness test

* test: update sqlness result

* chore: update proto version

* chore: Simplify QueryContextBuilder::explain_options using get_or_insert_default
2025-03-25 03:48:36 +00:00
discord9
3b547d9d13 feat(flow): frontend client for handle sql (#5761)
* feat: frontend client for handle sql

* refactor: per review

* chore: revert unnecessary change
2025-03-25 02:26:04 +00:00
Yuhan Wang
278553fc3f docs: rfc for wal purge (#5475)
* docs: add rfc for wal purge

* docs: fix typo

* docs: follow name format

* chore: all in heartbeat

* fix: unneeded sentence in rfc

* chore: apply comments
2025-03-24 12:07:50 +00:00
Yuhan Wang
a36901a653 chore: ut and some fix (#5752)
* chore: ut and some fix

* fix: remove NOWAIT

* refactor: use param for meta lease ttl

* chore: feature gate

* chore: add comments

* chore: apply comments

* fix: advice by claude 3.7 sonnet

* chore: apply comments
2025-03-24 09:05:06 +00:00
discord9
c4ac242c69 fix: properly give placeholder types (#5760)
* fix: properly give placeholder types

* chore: update sqlness
2025-03-24 08:41:32 +00:00
fys
9f9307de73 refactor: make frontend instance clear (#5754)
* refactor: the startup of frontend

* remove unnecessary error type

* fix: cr

* remove unnecessary trait FrontendInstance

* fix: cr

* fix: cr

* adjust the startup order of services
2025-03-24 06:08:02 +00:00
shuiyisong
c77ce958a3 chore: support custom time index selector for identity pipeline (#5750)
* chore: minor refactor

* chore: minor refactor

* chore: support custom ts for identity pipeline

* chore: fix clippy

* chore: minor refactor & update tests

* chore: use ref on identity pipeline param
2025-03-24 04:27:22 +00:00
discord9
5ad2d8b3b8 fix: handle nullable default value (#5747)
* fix: handle nullable default value

* chore: update sqlness
2025-03-24 02:38:26 +00:00
Ruihang Xia
2724c3c142 feat: support regex in simple filter (#5753)
* feat: support regex in simple filter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/common/recordbatch/src/filter.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-03-24 02:10:42 +00:00
Weny Xu
4eb0771afe feat: introduce install_manifest_to for RegionManifestManager (#5742)
* feat: introduce `install_manifest_changes` for `RegionManifestManager`

* chore: rename function to `install_manifest_to`

* Apply suggestions from code review

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* chore: add comments

* chore: add comments

* chore: update logic and add comments

* chore: add more check

* Update src/mito2/src/manifest/manager.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-03-21 05:19:23 +00:00
Yohan Wal
a0739a96e4 fix: wrap table name with `` (#5748)
* fix: wrap table name with quotes

* fix: minor fix
2025-03-20 09:38:54 +00:00
Ning Sun
77ccf1eac8 chore: add datanode write rows to grafana dashboard (#5745) 2025-03-20 03:39:40 +00:00
Yohan Wal
1dc4a196bf feat: add mysql election logic (#5694)
* feat: add mysql election

* feat: add mysql election

* chore: fix deps

* chore: fix deps

* fix: duplicate container

* fix: duplicate setup for sqlness

* fix: call once

* fix: do not use NOWAIT for mysql 5.7

* chore: apply comments

* fix: no parallel sqlness for mysql

* chore: comments and minor revert

* chore: apply comments

* chore: apply comments

* chore: add  to table name

* ci: use 2 metasrv to detect election bugs

* refactor: better election logic

* chore: apply comments

* chore: apply comments

* feat: version check before startup
2025-03-19 11:31:18 +00:00
shuiyisong
2431cd3bdf chore: merge error files under pipeline crate (#5738) 2025-03-19 09:55:51 +00:00
discord9
cd730e0486 fix: mysql prepare limit&offset param (#5734)
* fix: prepare limit&offset param

* test: sqlness

* chore: per review

* chore: per review
2025-03-19 07:49:26 +00:00
zyy17
a19441bed8 refactor: remove trace id from primary key in opentelemetry_traces table (#5733)
* refactor: remove trace id in primary key

* refactor: remove trace id in primary key in v0 model

* refactor: add span id in v1

* fix: integration test
2025-03-19 06:17:58 +00:00
dennis zhuang
162e3b8620 docs: adds news to readme (#5735) 2025-03-19 01:33:46 +00:00
Wenbin
83642dab87 feat: remove duplicated peer definition (#5728)
* remove duplicate peer

* fix
2025-03-18 11:30:25 +00:00
discord9
46070958c9 fix: mysql prepare bool value (#5732) 2025-03-18 10:50:45 +00:00
pikady
eea8b1c730 feat: add vec_kth_elem function (#5674)
* feat: add vec_kth_elem function

Signed-off-by: pikady <2652917633@qq.com>

* code format

Signed-off-by: pikady <2652917633@qq.com>

* add test sql

Signed-off-by: pikady <2652917633@qq.com>

* change indexing from 1-based to 0-based

Signed-off-by: pikady <2652917633@qq.com>

* improve code formatting and correct spelling errors

Signed-off-by: pikady <2652917633@qq.com>

* Update tests/cases/standalone/common/function/vector/vector.sql

I noticed the two lines are identical. Could you clarify the reason for the change? Thanks!

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: pikady <2652917633@qq.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
2025-03-18 07:25:53 +00:00
Ning Sun
1ab4ddab8d feat: update pipeline header name to x-greptime-pipeline-name (#5710)
* feat: update pipeline header name to x-greptime-pipeline-name

* refactor: update string_value_from_header
2025-03-18 02:39:54 +00:00
Ning Sun
9e63018198 feat: disable http timeout (#5721)
* feat: update to disable http timeout by default

* feat: make http timeout default to 0

* test: correct test case

* chore: generate new config doc

* test: correct tests
2025-03-18 01:18:56 +00:00
discord9
594bec8c36 feat: load manifest manually in mito engine (#5725)
* feat: load manifest and some

* chore: per review
2025-03-18 01:18:08 +00:00
localhost
1586732d20 chore: add some method for log query handler (#5685)
* chore: add some method for log query handler

* chore: make clippy happy

* chore: add some method for log query handler

* Update src/frontend/src/instance/logs.rs

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-03-17 18:36:43 +00:00
yihong
16fddd97a7 chore: revert commit update flate2 version (#5706)" (#5715)
Revert "chore: update flate2 version (#5706)"

This reverts commit a5df3954f3.
2025-03-17 12:16:26 +00:00
Ning Sun
2260782c12 refactor: update jaeger api implementation for new trace modeling (#5655)
* refactor: update jaeger api implementation

* test: add tests for v1 data model

* feat: customize trace table name

* fix: update column requirements to use Column type instead of String

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: lint fix

* refactor: accumulate resource attributes for v1

* fix: add empty check for additional string

* feat: add table option to mark data model version

* fix: do not overwrite all tags

* feat: use table option to mark table data model version and process accordingly

* chore: update comments to reflect query changes

* feat: use header for jaeger table name

* feat: update index for service_name, drop index for span_name

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2025-03-17 07:31:32 +00:00
385 changed files with 15128 additions and 6244 deletions

View File

@@ -47,7 +47,6 @@ runs:
shell: pwsh
run: make test sqlness-test
env:
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
RUST_BACKTRACE: 1
SQLNESS_OPTS: "--preserve-state"

View File

@@ -8,7 +8,7 @@ inputs:
default: 2
description: "Number of Datanode replicas"
meta-replicas:
default: 1
default: 2
description: "Number of Metasrv replicas"
image-registry:
default: "docker.io"

View File

@@ -576,9 +576,12 @@ jobs:
- name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092"
kafka: true
- name: "Pg Kvbackend"
- name: "PostgreSQL KvBackend"
opts: "--setup-pg"
kafka: false
- name: "MySQL Kvbackend"
opts: "--setup-mysql"
kafka: false
timeout-minutes: 60
steps:
- uses: actions/checkout@v4

View File

@@ -107,7 +107,6 @@ jobs:
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Workaround for https://github.com/nextest-rs/nextest/issues/1493
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}

823
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -29,6 +29,7 @@ members = [
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",
"src/common/session",
"src/common/substrait",
"src/common/telemetry",
"src/common/test-util",
@@ -88,7 +89,7 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
#
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
aquamarine = "0.6"
arrow = { version = "53.0.0", features = ["prettyprint"] }
arrow-array = { version = "53.0.0", default-features = false, features = ["chrono-tz"] }
arrow-flight = "53.0"
@@ -99,9 +100,9 @@ async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
axum = "0.8"
axum-extra = "0.10"
axum-macros = "0.4"
axum-macros = "0.5"
backon = "1"
base64 = "0.21"
base64 = "0.22"
bigdecimal = "0.4.2"
bitflags = "2.4.1"
bytemuck = "1.12"
@@ -111,7 +112,7 @@ chrono-tz = "0.10.1"
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
dashmap = "6.1"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
@@ -121,32 +122,31 @@ datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", r
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
deadpool = "0.10"
deadpool-postgres = "0.12"
derive_builder = "0.12"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
dotenv = "0.15"
etcd-client = "0.14"
flate2 = { version = "1.1.0", default-features = false, features = ["zlib-rs"] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c5419bbd20cb42e568ec325a4d71a3c94cc327e1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" }
hex = "0.4"
http = "1"
humantime = "2.1"
humantime-serde = "1.1"
hyper = "1.1"
hyper-util = "0.1"
itertools = "0.10"
itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "1434ecf23a2654025d86188fb5205e7a74b225d3" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.11.4"
mockall = "0.13"
moka = "0.12"
nalgebra = "0.33"
notify = "6.1"
notify = "8.0"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { version = "0.27", features = [
@@ -164,8 +164,8 @@ prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.5", features = ["ser"] }
prost = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
rand = "0.9"
ratelimit = "0.10"
regex = "1.8"
regex-automata = "0.4"
reqwest = { version = "0.12", default-features = false, features = [
@@ -177,7 +177,7 @@ reqwest = { version = "0.12", default-features = false, features = [
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [
"transport-tls",
] }
rstest = "0.21"
rstest = "0.25"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
@@ -185,21 +185,24 @@ rustls = { version = "0.23.20", default-features = false } # override by patch,
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
shadow-rs = "0.38"
shadow-rs = "1.1"
simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
sysinfo = "0.30"
sysinfo = "0.33"
# on branch v0.52.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [
"visitor",
"serde",
] } # on branch v0.44.x
strum = { version = "0.25", features = ["derive"] }
strum = { version = "0.27", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.40", features = ["full"] }
tokio-postgres = "0.7"
@@ -246,6 +249,7 @@ common-procedure-test = { path = "src/common/procedure-test" }
common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
common-session = { path = "src/common/session" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }

View File

@@ -6,7 +6,7 @@
</picture>
</p>
<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
<h2 align="center">Unified & Cost-Effective Observerability Database for Metrics, Logs, and Events</h2>
<div align="center">
<h3 align="center">
@@ -62,15 +62,19 @@
## Introduction
**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
**GreptimeDB** is an open-source unified & cost-effective observerability database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
## News
**[GreptimeDB archives 1 billion cold run #1 in JSONBench!](https://greptime.com/blogs/2025-03-18-jsonbench-greptimedb-performance)**
## Why GreptimeDB
Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
Our core developers have been building observerability data platforms for years. Based on our best practices, GreptimeDB was born to give you:
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
GreptimeDB unifies observerability data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-native Distributed Database**

View File

@@ -12,7 +12,6 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
@@ -24,7 +23,7 @@
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -98,6 +97,7 @@
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `storage` | -- | -- | The data storage options. |
@@ -222,7 +222,7 @@
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
@@ -328,6 +328,7 @@
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
| `procedure.max_metadata_value_size` | String | `1500KiB` | Auto split large value<br/>GreptimeDB procedure uses etcd as the default metadata storage backend.<br/>The etcd the maximum size of any request is 1.5 MiB<br/>1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)<br/>Comments out the `max_metadata_value_size`, for don't split large value (no limit). |
| `procedure.max_running_procedures` | Integer | `128` | Max running procedures.<br/>The maximum number of procedures that can be running at the same time.<br/>If the number of running procedures exceeds this limit, the procedure will be rejected. |
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | The threshold value used by the failure detector to determine failure conditions. |
| `failure_detector.min_std_deviation` | String | `100ms` | The minimum standard deviation of the heartbeat intervals, used to calculate acceptable variations. |
@@ -381,7 +382,6 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
@@ -390,7 +390,7 @@
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
@@ -551,7 +551,6 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
@@ -563,7 +562,7 @@
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |

View File

@@ -1,6 +1,3 @@
## The running mode of the datanode. It can be `standalone` or `distributed`.
mode = "standalone"
## The datanode identifier and should be unique in the cluster.
## @toml2docs:none-default
node_id = 42
@@ -27,7 +24,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -1,6 +1,3 @@
## The running mode of the flownode. It can be `standalone` or `distributed`.
mode = "distributed"
## The flownode identifier and should be unique in the cluster.
## @toml2docs:none-default
node_id = 14
@@ -30,7 +27,7 @@ max_send_message_size = "512MB"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -26,7 +26,7 @@ retry_interval = "3s"
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.

View File

@@ -79,6 +79,11 @@ retry_delay = "500ms"
## Comments out the `max_metadata_value_size`, for don't split large value (no limit).
max_metadata_value_size = "1500KiB"
## Max running procedures.
## The maximum number of procedures that can be running at the same time.
## If the number of running procedures exceeds this limit, the procedure will be rejected.
max_running_procedures = 128
# Failure detectors options.
[failure_detector]

View File

@@ -1,6 +1,3 @@
## The running mode of the datanode. It can be `standalone` or `distributed`.
mode = "standalone"
## The default timezone of the server.
## @toml2docs:none-default
default_timezone = "UTC"
@@ -34,7 +31,7 @@ max_concurrent_queries = 0
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
@@ -302,6 +299,10 @@ purge_interval = "1m"
max_retry_times = 3
## Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
## Max running procedures.
## The maximum number of procedures that can be running at the same time.
## If the number of running procedures exceeds this limit, the procedure will be rejected.
max_running_procedures = 128
## flow engine options.
[flow]

View File

@@ -3,7 +3,7 @@
This document introduces how to write fuzz tests in GreptimeDB.
## What is a fuzz test
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
Fuzz test is tool that leverage deterministic random generation to assist in finding bugs. The goal of fuzz tests is to identify inputs generated by the fuzzer that cause system panics, crashes, or unexpected behaviors to occur. And we are using the [cargo-fuzz](https://github.com/rust-fuzz/cargo-fuzz) to run our fuzz test targets.
## Why we need them
- Find bugs by leveraging random generation
@@ -13,7 +13,7 @@ Fuzz test is tool that leverage deterministic random generation to assist in fin
All fuzz test-related resources are located in the `/tests-fuzz` directory.
There are two types of resources: (1) fundamental components and (2) test targets.
### Fundamental components
### Fundamental components
They are located in the `/tests-fuzz/src` directory. The fundamental components define how to generate SQLs (including dialects for different protocols) and validate execution results (e.g., column attribute validation), etc.
### Test targets
@@ -21,25 +21,25 @@ They are located in the `/tests-fuzz/targets` directory, with each file represen
Figure 1 illustrates the fundamental components of the fuzz test provide the ability to generate random SQLs. It utilizes a Random Number Generator (Rng) to generate the Intermediate Representation (IR), then employs a DialectTranslator to produce specified dialects for different protocols. Finally, the fuzz tests send the generated SQL via the specified protocol and verify that the execution results meet expectations.
```
Rng
|
|
v
ExprGenerator
|
|
v
Intermediate representation (IR)
|
|
+----------------------+----------------------+
| | |
v v v
Rng
|
|
v
ExprGenerator
|
|
v
Intermediate representation (IR)
|
|
+----------------------+----------------------+
| | |
v v v
MySQLTranslator PostgreSQLTranslator OtherDialectTranslator
| | |
| | |
v v v
SQL(MySQL Dialect) ..... .....
| | |
| | |
v v v
SQL(MySQL Dialect) ..... .....
|
|
v
@@ -133,4 +133,4 @@ fuzz_target!(|input: FuzzInput| {
cargo fuzz run <fuzz-target> --fuzz-dir tests-fuzz
```
For more details, please refer to this [document](/tests-fuzz/README.md).
For more details, please refer to this [document](/tests-fuzz/README.md).

View File

@@ -0,0 +1,77 @@
---
Feature Name: Remote WAL Purge
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/5474
Date: 2025-02-06
Author: "Yuhan Wang <profsyb@gmail.com>"
---
# Summary
This RFC proposes a method for purging remote WAL in the database.
# Motivation
Currently only local wal entries are purged when flushing, while remote wal does nothing.
# Details
```mermaid
sequenceDiagram
Region0->>Kafka: Last entry id of the topic in use
Region0->>WALPruner: Heartbeat with last entry id
WALPruner->>+WALPruner: Time Loop
WALPruner->>+ProcedureManager: Submit purge procedure
ProcedureManager->>Region0: Flush request
ProcedureManager->>Kafka: Prune WAL entries
Region0->>Region0: Flush
```
## Steps
### Before purge
Before purging remote WAL, metasrv needs to know:
1. `last_entry_id` of each region.
2. `kafka_topic_last_entry_id` which is the last entry id of the topic in use. Can be lazily updated and needed when region has empty memtable.
3. Kafka topics that each region uses.
The states are maintained through:
1. Heartbeat: Datanode sends `last_entry_id` to metasrv in heartbeat. As for regions with empty memtable, `last_entry_id` should equals to `kafka_topic_last_entry_id`.
2. Metasrv maintains a topic-region map to know which region uses which topic.
`kafka_topic_last_entry_id` will be maintained by the region itself. Region will update the value after `k` heartbeats if the memtable is empty.
### Purge procedure
We can better handle locks utilizing current procedure. It's quite similar to the region migration procedure.
After a period of time, metasrv will submit a purge procedure to ProcedureManager. The purge will apply to all topics.
The procedure is divided into following stages:
1. Preparation:
- Retrieve `last_entry_id` of each region kvbackend.
- Choose regions that have a relatively small `last_entry_id` as candidate regions, which means we need to send a flush request to these regions.
2. Communication:
- Send flush requests to candidate regions.
3. Purge:
- Choose proper entry id to delete for each topic. The entry should be the smallest `last_entry_id - 1` among all regions.
- Delete legacy entries in Kafka.
- Store the `last_purged_entry_id` in kvbackend. It should be locked to prevent other regions from replaying the purged entries.
### After purge
After purge, there may be some regions that have `last_entry_id` smaller than the entry we just deleted. It's legal since we only delete the entries that are not needed anymore.
When restarting a region, it should query the `last_purged_entry_id` from metasrv and replay from `min(last_entry_id, last_purged_entry_id)`.
### Error handling
No persisted states are needed since all states are maintained in kvbackend.
Retry when failed to retrieving metadata from kvbackend.
# Alternatives
Purge time can depend on the size of the WAL entries instead of a fixed period of time, which may be more efficient.

View File

@@ -4782,7 +4782,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"description": "Ingestion size by row counts.",
"fieldConfig": {
"defaults": {
"color": {
@@ -4844,7 +4844,7 @@
"x": 12,
"y": 138
},
"id": 221,
"id": 277,
"options": {
"legend": {
"calcs": [],
@@ -4864,14 +4864,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"expr": "rate(greptime_mito_write_rows_total{pod=~\"$datanode\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Write Stall per Instance",
"title": "Write Rows per Instance",
"type": "timeseries"
},
{
@@ -4976,7 +4976,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Cache size by instance.\n",
"description": "Current counts for stalled write requests by instance\n\nWrite stalls when memtable is full and pending for flush\n\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5028,7 +5028,7 @@
}
]
},
"unit": "decbytes"
"unit": "none"
},
"overrides": []
},
@@ -5038,7 +5038,7 @@
"x": 12,
"y": 146
},
"id": 229,
"id": 221,
"options": {
"legend": {
"calcs": [],
@@ -5058,14 +5058,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"expr": "sum by(pod) (greptime_mito_write_stall_total{pod=~\"$datanode\"})",
"instant": false,
"legendFormat": "{{pod}}-{{type}}",
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Cached Bytes per Instance",
"title": "Write Stall per Instance",
"type": "timeseries"
},
{
@@ -5172,7 +5172,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "P99 latency of each type of reads by instance",
"description": "Cache size by instance.\n",
"fieldConfig": {
"defaults": {
"color": {
@@ -5224,7 +5224,7 @@
}
]
},
"unit": "s"
"unit": "decbytes"
},
"overrides": []
},
@@ -5234,17 +5234,13 @@
"x": 12,
"y": 154
},
"id": 228,
"id": 229,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"calcs": [],
"displayMode": "table",
"placement": "bottom",
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
"showLegend": true
},
"tooltip": {
"mode": "single",
@@ -5258,14 +5254,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "greptime_mito_cache_bytes{pod=~\"$datanode\"}",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "{{pod}}-{{type}}",
"range": true,
"refId": "A"
}
],
"title": "Read Stage P99 per Instance",
"title": "Cached Bytes per Instance",
"type": "timeseries"
},
{
@@ -5317,7 +5313,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@@ -5370,7 +5367,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Latency of compaction task, at p99",
"description": "P99 latency of each type of reads by instance",
"fieldConfig": {
"defaults": {
"color": {
@@ -5414,7 +5411,8 @@
"mode": "absolute",
"steps": [
{
"color": "green"
"color": "green",
"value": null
},
{
"color": "red",
@@ -5432,7 +5430,7 @@
"x": 12,
"y": 162
},
"id": 230,
"id": 228,
"options": {
"legend": {
"calcs": [
@@ -5440,7 +5438,9 @@
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
"showLegend": true,
"sortBy": "Last *",
"sortDesc": true
},
"tooltip": {
"mode": "single",
@@ -5454,14 +5454,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "[{{pod}}]-compaction-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Compaction P99 per Instance",
"title": "Read Stage P99 per Instance",
"type": "timeseries"
},
{
@@ -5570,7 +5570,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Compaction latency by stage",
"description": "Latency of compaction task, at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -5632,7 +5632,7 @@
"x": 12,
"y": 170
},
"id": 232,
"id": 230,
"options": {
"legend": {
"calcs": [
@@ -5654,9 +5654,9 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le) (rate(greptime_mito_compaction_total_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{stage}}-p99",
"legendFormat": "[{{pod}}]-compaction-p99",
"range": true,
"refId": "A"
}
@@ -5794,7 +5794,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Write-ahead log operations latency at p99",
"description": "Compaction latency by stage",
"fieldConfig": {
"defaults": {
"color": {
@@ -5856,13 +5856,13 @@
"x": 12,
"y": 178
},
"id": 269,
"id": 232,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "list",
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
@@ -5878,14 +5878,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"expr": "histogram_quantile(0.99, sum by(pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{pod=~\"$datanode\"}[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"legendFormat": "{{pod}}-{{stage}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Log Store op duration seconds",
"title": "Compaction P99 per Instance",
"type": "timeseries"
},
{
@@ -5993,7 +5993,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"description": "Write-ahead log operations latency at p99",
"fieldConfig": {
"defaults": {
"color": {
@@ -6045,7 +6045,7 @@
}
]
},
"unit": "none"
"unit": "s"
},
"overrides": []
},
@@ -6055,13 +6055,13 @@
"x": 12,
"y": 186
},
"id": 271,
"id": 269,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
@@ -6078,14 +6078,14 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"expr": "histogram_quantile(0.99, sum by(le,logstore,optype,pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))",
"instant": false,
"legendFormat": "{{pod}}",
"legendFormat": "{{pod}}-{{logstore}}-{{optype}}-p99",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"title": "Log Store op duration seconds",
"type": "timeseries"
},
{
@@ -6188,6 +6188,105 @@
"title": "Inflight Flush",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "Ongoing compaction task count",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "points",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 194
},
"id": 271,
"options": {
"legend": {
"calcs": [
"lastNotNull"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.1.3",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "greptime_mito_inflight_compaction_count",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Inflight Compaction",
"type": "timeseries"
},
{
"collapsed": false,
"gridPos": {

View File

@@ -15,10 +15,13 @@
use std::collections::HashMap;
use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexOptions,
SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions,
SkippingIndexOptions, SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
};
use greptime_proto::v1::{Analyzer, SkippingIndexType as PbSkippingIndexType};
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -142,13 +145,21 @@ pub fn options_from_inverted() -> ColumnOptions {
}
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
pub fn as_fulltext_option_analyzer(analyzer: Analyzer) -> FulltextAnalyzer {
match analyzer {
Analyzer::English => FulltextAnalyzer::English,
Analyzer::Chinese => FulltextAnalyzer::Chinese,
}
}
/// Tries to construct a `FulltextBackend` from the given backend.
pub fn as_fulltext_option_backend(backend: PbFulltextBackend) -> FulltextBackend {
match backend {
PbFulltextBackend::Bloom => FulltextBackend::Bloom,
PbFulltextBackend::Tantivy => FulltextBackend::Tantivy,
}
}
/// Tries to construct a `SkippingIndexType` from the given skipping index type.
pub fn as_skipping_index_type(skipping_index_type: PbSkippingIndexType) -> SkippingIndexType {
match skipping_index_type {
@@ -160,7 +171,7 @@ pub fn as_skipping_index_type(skipping_index_type: PbSkippingIndexType) -> Skipp
mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::FulltextAnalyzer;
use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
use super::*;
use crate::v1::ColumnDataType;
@@ -219,13 +230,14 @@ mod tests {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
})
.unwrap();
schema.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
);
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
@@ -239,11 +251,12 @@ mod tests {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
};
let options = options_from_fulltext(&fulltext).unwrap().unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
);
}

View File

@@ -19,7 +19,7 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod region_peers;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
pub mod schemata;

View File

@@ -56,6 +56,8 @@ pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const ORDINAL_POSITION: &str = "ordinal_position";
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";

View File

@@ -21,6 +21,7 @@ use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::common::HashMap;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
@@ -43,16 +44,22 @@ use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const REGION_ID: &str = "region_id";
const PEER_ID: &str = "peer_id";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const REGION_ID: &str = "region_id";
pub const PEER_ID: &str = "peer_id";
const PEER_ADDR: &str = "peer_addr";
const IS_LEADER: &str = "is_leader";
pub const IS_LEADER: &str = "is_leader";
const STATUS: &str = "status";
const DOWN_SECONDS: &str = "down_seconds";
const INIT_CAPACITY: usize = 42;
/// The `REGION_PEERS` table provides information about the region distribution and routes. Including fields:
///
/// - `table_catalog`: the table catalog name
/// - `table_schema`: the table schema name
/// - `table_name`: the table name
/// - `region_id`: the region id
/// - `peer_id`: the region storage datanode peer id
/// - `peer_addr`: the region storage datanode gRPC peer address
@@ -77,6 +84,9 @@ impl InformationSchemaRegionPeers {
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
@@ -134,6 +144,9 @@ struct InformationSchemaRegionPeersBuilder {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
table_catalogs: StringVectorBuilder,
table_schemas: StringVectorBuilder,
table_names: StringVectorBuilder,
region_ids: UInt64VectorBuilder,
peer_ids: UInt64VectorBuilder,
peer_addrs: StringVectorBuilder,
@@ -152,6 +165,9 @@ impl InformationSchemaRegionPeersBuilder {
schema,
catalog_name,
catalog_manager,
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -177,24 +193,28 @@ impl InformationSchemaRegionPeersBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let table_id_stream = catalog_manager
let table_stream = catalog_manager
.tables(&catalog_name, &schema_name, None)
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info.ident.table_id))
Ok(Some((
table_info.ident.table_id,
table_info.name.to_string(),
)))
}
});
const BATCH_SIZE: usize = 128;
// Split table ids into chunks
let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE));
// Split tables into chunks
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids.into_iter().collect::<Result<Vec<_>>>()?;
while let Some(tables) = table_chunks.next().await {
let tables = tables.into_iter().collect::<Result<HashMap<_, _>>>()?;
let table_ids = tables.keys().cloned().collect::<Vec<_>>();
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
@@ -206,7 +226,16 @@ impl InformationSchemaRegionPeersBuilder {
};
for (table_id, routes) in table_routes {
self.add_region_peers(&predicates, table_id, &routes);
// Safety: table_id is guaranteed to be in the map
let table_name = tables.get(&table_id).unwrap();
self.add_region_peers(
&catalog_name,
&schema_name,
table_name,
&predicates,
table_id,
&routes,
);
}
}
}
@@ -216,6 +245,9 @@ impl InformationSchemaRegionPeersBuilder {
fn add_region_peers(
&mut self,
table_catalog: &str,
table_schema: &str,
table_name: &str,
predicates: &Predicates,
table_id: TableId,
routes: &[RegionRoute],
@@ -231,13 +263,20 @@ impl InformationSchemaRegionPeersBuilder {
Some("ALIVE".to_string())
};
let row = [(REGION_ID, &Value::from(region_id))];
let row = [
(TABLE_CATALOG, &Value::from(table_catalog)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(REGION_ID, &Value::from(region_id)),
];
if !predicates.eval(&row) {
return;
}
// TODO(dennis): adds followers.
self.table_catalogs.push(Some(table_catalog));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
self.region_ids.push(Some(region_id));
self.peer_ids.push(peer_id);
self.peer_addrs.push(peer_addr.as_deref());
@@ -245,11 +284,26 @@ impl InformationSchemaRegionPeersBuilder {
self.statuses.push(state.as_deref());
self.down_seconds
.push(route.leader_down_millis().map(|m| m / 1000));
for follower in &route.follower_peers {
self.table_catalogs.push(Some(table_catalog));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
self.region_ids.push(Some(region_id));
self.peer_ids.push(Some(follower.id));
self.peer_addrs.push(Some(follower.addr.as_str()));
self.is_leaders.push(Some("No"));
self.statuses.push(None);
self.down_seconds.push(None);
}
}
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.table_catalogs.finish()),
Arc::new(self.table_schemas.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.region_ids.finish()),
Arc::new(self.peer_ids.finish()),
Arc::new(self.peer_addrs.finish()),

View File

@@ -177,7 +177,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
let mut region_routes = Vec::with_capacity(100);
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
for region_id in regions.into_iter().map(u64::from) {
region_routes.push(RegionRoute {
@@ -188,7 +188,7 @@ fn create_region_routes(regions: Vec<RegionNumber>) -> Vec<RegionRoute> {
attrs: BTreeMap::new(),
},
leader_peer: Some(Peer {
id: rng.gen_range(0..10),
id: rng.random_range(0..10),
addr: String::new(),
}),
follower_peers: vec![],

View File

@@ -16,7 +16,6 @@
mod client;
pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod flow;
@@ -34,7 +33,6 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;
pub use self::client::Client;
#[cfg(feature = "testing")]
pub use self::database::Database;
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use enum_dispatch::enum_dispatch;
use rand::seq::SliceRandom;
use rand::seq::IndexedRandom;
#[enum_dispatch]
pub trait LoadBalance {
@@ -37,7 +37,7 @@ pub struct Random;
impl LoadBalance for Random {
fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String> {
peers.choose(&mut rand::thread_rng())
peers.choose(&mut rand::rng())
}
}

View File

@@ -30,7 +30,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -223,15 +223,14 @@ impl StartCommand {
.get_or_insert_with(MetaClientOptions::default)
.metasrv_addrs
.clone_from(metasrv_addrs);
opts.mode = Mode::Distributed;
}
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
msg: "Missing node id option"
}
.fail();
}
);
if let Some(data_home) = &self.data_home {
opts.storage.data_home.clone_from(data_home);
@@ -295,10 +294,13 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
@@ -311,7 +313,7 @@ impl StartCommand {
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins, Mode::Distributed)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
@@ -333,6 +335,7 @@ impl StartCommand {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::io::Write;
use std::time::Duration;
@@ -340,7 +343,6 @@ mod tests {
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use super::*;
use crate::options::GlobalOptions;
@@ -491,22 +493,6 @@ mod tests {
#[test]
fn test_try_from_cmd() {
let opt = StartCommand::default()
.load_options(&GlobalOptions::default())
.unwrap()
.component;
assert_eq!(Mode::Standalone, opt.mode);
let opt = (StartCommand {
node_id: Some(42),
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
})
.load_options(&GlobalOptions::default())
.unwrap()
.component;
assert_eq!(Mode::Distributed, opt.mode);
assert!((StartCommand {
metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
..Default::default()
@@ -525,7 +511,19 @@ mod tests {
#[test]
fn test_load_log_options_from_cli() {
let cmd = StartCommand::default();
let mut cmd = StartCommand::default();
let result = cmd.load_options(&GlobalOptions {
log_dir: Some("./greptimedb_data/test/logs".to_string()),
log_level: Some("debug".to_string()),
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
});
// Missing node_id.
assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));
cmd.node_id = Some(42);
let options = cmd
.load_options(&GlobalOptions {

View File

@@ -100,6 +100,13 @@ pub enum Error {
source: flow::Error,
},
#[snafu(display("Servers error"))]
Servers {
#[snafu(implicit)]
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to start frontend"))]
StartFrontend {
#[snafu(implicit)]
@@ -365,6 +372,7 @@ impl ErrorExt for Error {
Error::ShutdownFrontend { source, .. } => source.status_code(),
Error::StartMetaServer { source, .. } => source.status_code(),
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::Servers { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::BuildCli { source, .. } => source.status_code(),

View File

@@ -34,8 +34,7 @@ use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -203,7 +202,6 @@ impl StartCommand {
.get_or_insert_with(MetaClientOptions::default)
.metasrv_addrs
.clone_from(metasrv_addrs);
opts.mode = Mode::Distributed;
}
if let Some(http_addr) = &self.http_addr {
@@ -214,12 +212,12 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout);
}
if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) {
return MissingConfigSnafu {
msg: "Missing node id option",
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
msg: "Missing node id option"
}
.fail();
}
);
Ok(())
}
@@ -249,10 +247,13 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Flownode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;

View File

@@ -32,28 +32,25 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use query::stats::StatementStatistics;
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, MissingConfigSnafu,
Result, StartFrontendSnafu,
};
use crate::error::{self, Result};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
pub struct Instance {
frontend: FeInstance,
frontend: Frontend,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -61,20 +58,17 @@ pub struct Instance {
pub const APP_NAME: &str = "greptime-frontend";
impl Instance {
pub fn new(frontend: FeInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
frontend,
_guard: guard,
}
pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
Self { frontend, _guard }
}
pub fn mut_inner(&mut self) -> &mut FeInstance {
&mut self.frontend
}
pub fn inner(&self) -> &FeInstance {
pub fn inner(&self) -> &Frontend {
&self.frontend
}
pub fn mut_inner(&mut self) -> &mut Frontend {
&mut self.frontend
}
}
#[async_trait]
@@ -84,11 +78,15 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
plugins::start_frontend_plugins(self.frontend.plugins().clone())
let plugins = self.frontend.instance.plugins().clone();
plugins::start_frontend_plugins(plugins)
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
self.frontend.start().await.context(StartFrontendSnafu)
self.frontend
.start()
.await
.context(error::StartFrontendSnafu)
}
async fn stop(&self) -> Result<()> {
@@ -178,7 +176,7 @@ impl StartCommand {
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
.context(error::LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
@@ -283,22 +281,28 @@ impl StartCommand {
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client'",
})?;
let meta_client_options = opts
.meta_client
.as_ref()
.context(error::MissingConfigSnafu {
msg: "'meta_client'",
})?;
let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;
let meta_client =
meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options)
.await
.context(MetaClientInitSnafu)?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Frontend,
meta_client_options,
Some(&plugins),
)
.await
.context(error::MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =
@@ -345,6 +349,7 @@ impl StartCommand {
opts.heartbeat.clone(),
Arc::new(executor),
);
let heartbeat_task = Some(heartbeat_task);
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
@@ -356,7 +361,7 @@ impl StartCommand {
};
let client = NodeClients::new(channel_config);
let mut instance = FrontendBuilder::new(
let instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
@@ -367,20 +372,27 @@ impl StartCommand {
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
let instance = Arc::new(instance);
let servers = Services::new(opts, Arc::new(instance.clone()), plugins)
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, instance.clone(), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(servers)
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
Ok(Instance::new(instance, guard))
let frontend = Frontend {
instance,
servers,
heartbeat_task,
export_metrics_task,
};
Ok(Instance::new(frontend, guard))
}
}
@@ -440,7 +452,7 @@ mod tests {
[http]
addr = "127.0.0.1:4000"
timeout = "30s"
timeout = "0s"
body_limit = "2GB"
[opentsdb]
@@ -461,7 +473,7 @@ mod tests {
let fe_opts = command.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);

View File

@@ -42,6 +42,7 @@ use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
@@ -55,9 +56,9 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use frontend::frontend::FrontendOptions;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
@@ -67,7 +68,7 @@ use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use query::stats::StatementStatistics;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -76,15 +77,9 @@ use snafu::ResultExt;
use tokio::sync::{broadcast, RwLock};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, BuildWalOptionsAllocatorSnafu, CreateDirSnafu, IllegalConfigSnafu,
InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu,
Result, ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu,
StartDatanodeSnafu, StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::error::Result;
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
use crate::{error, log_versions, App};
pub const APP_NAME: &str = "greptime-standalone";
@@ -132,7 +127,6 @@ impl SubCommand {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
pub mode: Mode,
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub http: HttpOptions,
@@ -162,7 +156,6 @@ pub struct StandaloneOptions {
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
enable_telemetry: true,
default_timezone: None,
http: HttpOptions::default(),
@@ -243,7 +236,6 @@ impl StandaloneOptions {
grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
mode: Mode::Standalone,
..Default::default()
}
}
@@ -251,13 +243,12 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
frontend: Frontend,
// TODO(discord9): wrapped it in flownode instance instead
flow_worker_manager: Arc<FlowWorkerManager>,
flow_shutdown: broadcast::Sender<()>,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -281,21 +272,26 @@ impl App for Instance {
self.procedure_manager
.start()
.await
.context(StartProcedureManagerSnafu)?;
.context(error::StartProcedureManagerSnafu)?;
self.wal_options_allocator
.start()
.await
.context(StartWalOptionsAllocatorSnafu)?;
.context(error::StartWalOptionsAllocatorSnafu)?;
plugins::start_frontend_plugins(self.frontend.plugins().clone())
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
self.frontend
.start()
.await
.context(error::StartFrontendSnafu)?;
self.frontend.start().await.context(StartFrontendSnafu)?;
self.flow_worker_manager
.clone()
.run_background(Some(self.flow_shutdown.subscribe()));
Ok(())
}
@@ -303,17 +299,18 @@ impl App for Instance {
self.frontend
.shutdown()
.await
.context(ShutdownFrontendSnafu)?;
.context(error::ShutdownFrontendSnafu)?;
self.procedure_manager
.stop()
.await
.context(StopProcedureManagerSnafu)?;
.context(error::StopProcedureManagerSnafu)?;
self.datanode
.shutdown()
.await
.context(ShutdownDatanodeSnafu)?;
.context(error::ShutdownDatanodeSnafu)?;
self.flow_shutdown
.send(())
.map_err(|_e| {
@@ -322,7 +319,8 @@ impl App for Instance {
}
.build()
})
.context(ShutdownFlownodeSnafu)?;
.context(error::ShutdownFlownodeSnafu)?;
info!("Datanode instance stopped.");
Ok(())
@@ -368,7 +366,7 @@ impl StartCommand {
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
.context(error::LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts.component)?;
@@ -381,9 +379,6 @@ impl StartCommand {
global_options: &GlobalOptions,
opts: &mut StandaloneOptions,
) -> Result<()> {
// Should always be standalone mode.
opts.mode = Mode::Standalone;
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -415,7 +410,7 @@ impl StartCommand {
// frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
if addr.eq(&datanode_grpc_addr) {
return IllegalConfigSnafu {
return error::IllegalConfigSnafu {
msg: format!(
"gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
),
@@ -474,18 +469,19 @@ impl StartCommand {
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
.await
.context(StartDatanodeSnafu)?;
.context(error::StartDatanodeSnafu)?;
set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
set_default_timezone(fe_opts.default_timezone.as_deref())
.context(error::InitTimezoneSnafu)?;
let data_home = &dn_opts.storage.data_home;
// Ensure the data_home directory exists.
fs::create_dir_all(path::Path::new(data_home))
.context(CreateDirSnafu { dir: data_home })?;
.context(error::CreateDirSnafu { dir: data_home })?;
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
@@ -494,7 +490,7 @@ impl StartCommand {
opts.procedure,
)
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default();
@@ -503,16 +499,16 @@ impl StartCommand {
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(BuildCacheRegistrySnafu)?
.context(error::BuildCacheRegistrySnafu)?
.build(),
);
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone(), Mode::Standalone)
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
.build()
.await
.context(StartDatanodeSnafu)?;
.context(error::StartDatanodeSnafu)?;
let information_extension = Arc::new(StandaloneInformationExtension::new(
datanode.region_server(),
@@ -545,7 +541,7 @@ impl StartCommand {
.build()
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
.context(error::OtherSnafu)?,
);
// set the ref to query for the local flow state
@@ -576,7 +572,7 @@ impl StartCommand {
let kafka_options = opts.wal.clone().into();
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
.await
.context(BuildWalOptionsAllocatorSnafu)?;
.context(error::BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
@@ -597,8 +593,8 @@ impl StartCommand {
)
.await?;
let mut frontend = FrontendBuilder::new(
fe_opts,
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
catalog_manager.clone(),
@@ -609,7 +605,8 @@ impl StartCommand {
.with_plugin(plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
let fe_instance = Arc::new(fe_instance);
let flow_worker_manager = flownode.flow_worker_manager();
// flow server need to be able to use frontend to write insert requests back
@@ -622,18 +619,25 @@ impl StartCommand {
node_manager,
)
.await
.context(StartFlownodeSnafu)?;
.context(error::StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;
let (tx, _rx) = broadcast::channel(1);
let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, fe_instance.clone(), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
frontend
.build_servers(servers)
.context(StartFrontendSnafu)?;
.context(error::StartFrontendSnafu)?;
let frontend = Frontend {
instance: fe_instance,
servers,
heartbeat_task: None,
export_metrics_task,
};
Ok(Instance {
datanode,
@@ -661,6 +665,7 @@ impl StartCommand {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,
@@ -670,7 +675,7 @@ impl StartCommand {
procedure_manager,
true,
)
.context(InitDdlManagerSnafu)?,
.context(error::InitDdlManagerSnafu)?,
);
Ok(procedure_executor)
@@ -684,7 +689,7 @@ impl StartCommand {
table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)?;
.context(error::InitMetadataSnafu)?;
Ok(table_metadata_manager)
}
@@ -778,6 +783,7 @@ impl InformationExtension for StandaloneInformationExtension {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
}
})
.collect::<Vec<_>>();
@@ -1054,7 +1060,6 @@ mod tests {
let options =
StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
let default_options = StandaloneOptions::default();
assert_eq!(options.mode, default_options.mode);
assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
assert_eq!(options.http, default_options.http);
assert_eq!(options.grpc, default_options.grpc);

View File

@@ -135,5 +135,6 @@ pub fn is_readonly_schema(schema: &str) -> bool {
pub const TRACE_ID_COLUMN: &str = "trace_id";
pub const SPAN_ID_COLUMN: &str = "span_id";
pub const SPAN_NAME_COLUMN: &str = "span_name";
pub const SERVICE_NAME_COLUMN: &str = "service_name";
pub const PARENT_SPAN_ID_COLUMN: &str = "parent_span_id";
// ---- End of special table and fields ----

View File

@@ -39,6 +39,7 @@ geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
hyperloglogplus = "0.4"
jsonb.workspace = true
memchr = "2.7"
nalgebra.workspace = true
num = "0.4"
num-traits = "0.2"

View File

@@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
mod remove_region_follower;
use std::sync::Arc;
use add_region_follower::AddRegionFollowerFunction;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -32,6 +36,8 @@ impl AdminFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register_async(Arc::new(MigrateRegionFunction));
registry.register_async(Arc::new(AddRegionFollowerFunction));
registry.register_async(Arc::new(RemoveRegionFollowerFunction));
registry.register_async(Arc::new(FlushRegionFunction));
registry.register_async(Arc::new(CompactRegionFunction));
registry.register_async(Arc::new(FlushTableFunction));

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::AddRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to add a follower to a region.
/// Only available in cluster mode.
///
/// - `add_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = AddRegionFollowerFunction,
display_name = add_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn add_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.add_region_follower(AddRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// add_region_follower(region_id, peer)
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_add_region_follower_misc() {
let f = AddRegionFollowerFunction;
assert_eq!("add_region_follower", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_add_region_follower() {
let f = AddRegionFollowerFunction;
let args = vec![1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
assert_eq!(result, expect);
}
}

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::RemoveRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to remove a follower from a region.
//// Only available in cluster mode.
///
/// - `remove_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = RemoveRegionFollowerFunction,
display_name = remove_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn remove_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// remove_region_follower(region_id, peer_id)
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_remove_region_follower_misc() {
let f = RemoveRegionFollowerFunction;
assert_eq!("remove_region_follower", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_remove_region_follower() {
let f = RemoveRegionFollowerFunction;
let args = vec![1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
assert_eq!(result, expect);
}
}

View File

@@ -27,6 +27,7 @@ use crate::scalars::hll_count::HllCalcFunction;
use crate::scalars::ip::IpFunctions;
use crate::scalars::json::JsonFunction;
use crate::scalars::matches::MatchesFunction;
use crate::scalars::matches_term::MatchesTermFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::timestamp::TimestampFunction;
use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
@@ -116,6 +117,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Full text search function
MatchesFunction::register(&function_registry);
MatchesTermFunction::register(&function_registry);
// System and administration functions
SystemFunction::register(&function_registry);

View File

@@ -16,7 +16,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
@@ -63,6 +66,12 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
/// Add a region follower to a region.
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
/// Remove a region follower from a region.
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
}
/// This flow service handler is only use for flush flow for now.

View File

@@ -19,6 +19,7 @@ pub mod expression;
pub mod geo;
pub mod json;
pub mod matches;
pub mod matches_term;
pub mod math;
pub mod vector;

View File

@@ -0,0 +1,375 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::{fmt, iter};
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BooleanVector, BooleanVectorBuilder, MutableVector, VectorRef};
use memchr::memmem;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
/// Exact term/phrase matching function for text columns.
///
/// This function checks if a text column contains exact term/phrase matches
/// with non-alphanumeric boundaries. Designed for:
/// - Whole-word matching (e.g. "cat" in "cat!" but not in "category")
/// - Phrase matching (e.g. "hello world" in "note:hello world!")
///
/// # Signature
/// `matches_term(text: String, term: String) -> Boolean`
///
/// # Arguments
/// * `text` - String column to search
/// * `term` - Search term/phrase
///
/// # Returns
/// BooleanVector where each element indicates if the corresponding text
/// contains an exact match of the term, following these rules:
/// 1. Exact substring match found (case-sensitive)
/// 2. Match boundaries are either:
/// - Start/end of text
/// - Any non-alphanumeric character (including spaces, hyphens, punctuation, etc.)
///
/// # Examples
/// ```
/// -- SQL examples --
/// -- Match phrase with space --
/// SELECT matches_term(column, 'hello world') FROM table;
/// -- Text: "warning:hello world!" => true
/// -- Text: "hello-world" => false (hyphen instead of space)
/// -- Text: "hello world2023" => false (ending with numbers)
///
/// -- Match multiple words with boundaries --
/// SELECT matches_term(column, 'critical error') FROM logs;
/// -- Match in: "ERROR:critical error!"
/// -- No match: "critical_errors"
///
/// -- Empty string handling --
/// SELECT matches_term(column, '') FROM table;
/// -- Text: "" => true
/// -- Text: "any" => false
///
/// -- Case sensitivity --
/// SELECT matches_term(column, 'Cat') FROM table;
/// -- Text: "Cat" => true
/// -- Text: "cat" => false
/// ```
pub struct MatchesTermFunction;
impl MatchesTermFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(MatchesTermFunction));
}
}
impl fmt::Display for MatchesTermFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MATCHES_TERM")
}
}
impl Function for MatchesTermFunction {
fn name(&self) -> &str {
"matches_term"
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> common_query::prelude::Signature {
common_query::prelude::Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
columns.len()
),
}
);
let text_column = &columns[0];
if text_column.is_empty() {
return Ok(Arc::new(BooleanVector::from(Vec::<bool>::with_capacity(0))));
}
let term_column = &columns[1];
let compiled_finder = if term_column.is_const() {
let term = term_column.get_ref(0).as_string().unwrap();
match term {
None => {
return Ok(Arc::new(BooleanVector::from_iter(
iter::repeat(None).take(text_column.len()),
)));
}
Some(term) => Some(MatchesTermFinder::new(term)),
}
} else {
None
};
let len = text_column.len();
let mut result = BooleanVectorBuilder::with_capacity(len);
for i in 0..len {
let text = text_column.get_ref(i).as_string().unwrap();
let Some(text) = text else {
result.push_null();
continue;
};
let contains = match &compiled_finder {
Some(finder) => finder.find(text),
None => {
let term = match term_column.get_ref(i).as_string().unwrap() {
None => {
result.push_null();
continue;
}
Some(term) => term,
};
MatchesTermFinder::new(term).find(text)
}
};
result.push(Some(contains));
}
Ok(result.to_vector())
}
}
/// A compiled finder for `matches_term` function that holds the compiled term
/// and its metadata for efficient matching.
///
/// A term is considered matched when:
/// 1. The exact sequence appears in the text
/// 2. It is either:
/// - At the start/end of text with adjacent non-alphanumeric character
/// - Surrounded by non-alphanumeric characters
///
/// # Examples
/// ```
/// let finder = MatchesTermFinder::new("cat");
/// assert!(finder.find("cat!")); // Term at end with punctuation
/// assert!(finder.find("dog,cat")); // Term preceded by comma
/// assert!(!finder.find("category")); // Partial match rejected
///
/// let finder = MatchesTermFinder::new("world");
/// assert!(finder.find("hello-world")); // Hyphen boundary
/// ```
#[derive(Clone, Debug)]
pub struct MatchesTermFinder {
finder: memmem::Finder<'static>,
term: String,
starts_with_non_alnum: bool,
ends_with_non_alnum: bool,
}
impl MatchesTermFinder {
/// Create a new `MatchesTermFinder` for the given term.
pub fn new(term: &str) -> Self {
let starts_with_non_alnum = term.chars().next().is_some_and(|c| !c.is_alphanumeric());
let ends_with_non_alnum = term.chars().last().is_some_and(|c| !c.is_alphanumeric());
Self {
finder: memmem::Finder::new(term).into_owned(),
term: term.to_string(),
starts_with_non_alnum,
ends_with_non_alnum,
}
}
/// Find the term in the text.
pub fn find(&self, text: &str) -> bool {
if self.term.is_empty() {
return text.is_empty();
}
if text.len() < self.term.len() {
return false;
}
let mut pos = 0;
while let Some(found_pos) = self.finder.find(text[pos..].as_bytes()) {
let actual_pos = pos + found_pos;
let prev_ok = self.starts_with_non_alnum
|| text[..actual_pos]
.chars()
.last()
.map(|c| !c.is_alphanumeric())
.unwrap_or(true);
if prev_ok {
let next_pos = actual_pos + self.finder.needle().len();
let next_ok = self.ends_with_non_alnum
|| text[next_pos..]
.chars()
.next()
.map(|c| !c.is_alphanumeric())
.unwrap_or(true);
if next_ok {
return true;
}
}
if let Some(next_char) = text[actual_pos..].chars().next() {
pos = actual_pos + next_char.len_utf8();
} else {
break;
}
}
false
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn matches_term_example() {
let finder = MatchesTermFinder::new("hello world");
assert!(finder.find("warning:hello world!"));
assert!(!finder.find("hello-world"));
assert!(!finder.find("hello world2023"));
let finder = MatchesTermFinder::new("critical error");
assert!(finder.find("ERROR:critical error!"));
assert!(!finder.find("critical_errors"));
let finder = MatchesTermFinder::new("");
assert!(finder.find(""));
assert!(!finder.find("any"));
let finder = MatchesTermFinder::new("Cat");
assert!(finder.find("Cat"));
assert!(!finder.find("cat"));
}
#[test]
fn matches_term_with_punctuation() {
assert!(MatchesTermFinder::new("cat").find("cat!"));
assert!(MatchesTermFinder::new("dog").find("!dog"));
}
#[test]
fn matches_phrase_with_boundaries() {
assert!(MatchesTermFinder::new("hello-world").find("hello-world"));
assert!(MatchesTermFinder::new("'foo bar'").find("test: 'foo bar'"));
}
#[test]
fn matches_at_text_boundaries() {
assert!(MatchesTermFinder::new("start").find("start..."));
assert!(MatchesTermFinder::new("end").find("...end"));
}
// Negative cases
#[test]
fn rejects_partial_matches() {
assert!(!MatchesTermFinder::new("cat").find("category"));
assert!(!MatchesTermFinder::new("boot").find("rebooted"));
}
#[test]
fn rejects_missing_term() {
assert!(!MatchesTermFinder::new("foo").find("hello world"));
}
// Edge cases
#[test]
fn handles_empty_inputs() {
assert!(!MatchesTermFinder::new("test").find(""));
assert!(!MatchesTermFinder::new("").find("text"));
}
#[test]
fn different_unicode_boundaries() {
assert!(MatchesTermFinder::new("café").find("café>"));
assert!(!MatchesTermFinder::new("café").find("口café>"));
assert!(!MatchesTermFinder::new("café").find("café口"));
assert!(!MatchesTermFinder::new("café").find("cafémore"));
assert!(MatchesTermFinder::new("русский").find("русский!"));
assert!(MatchesTermFinder::new("русский").find("русский!"));
}
#[test]
fn case_sensitive_matching() {
assert!(!MatchesTermFinder::new("cat").find("Cat"));
assert!(MatchesTermFinder::new("CaT").find("CaT"));
}
#[test]
fn numbers_in_term() {
assert!(MatchesTermFinder::new("v1.0").find("v1.0!"));
assert!(!MatchesTermFinder::new("v1.0").find("v1.0a"));
}
#[test]
fn adjacent_alphanumeric_fails() {
assert!(!MatchesTermFinder::new("cat").find("cat5"));
assert!(!MatchesTermFinder::new("dog").find("dogcat"));
}
#[test]
fn empty_term_text() {
assert!(!MatchesTermFinder::new("").find("text"));
assert!(MatchesTermFinder::new("").find(""));
assert!(!MatchesTermFinder::new("text").find(""));
}
#[test]
fn leading_non_alphanumeric() {
assert!(MatchesTermFinder::new("/cat").find("dog/cat"));
assert!(MatchesTermFinder::new("dog/").find("dog/cat"));
assert!(MatchesTermFinder::new("dog/cat").find("dog/cat"));
}
#[test]
fn continues_searching_after_boundary_mismatch() {
assert!(!MatchesTermFinder::new("log").find("bloglog!"));
assert!(MatchesTermFinder::new("log").find("bloglog log"));
assert!(MatchesTermFinder::new("log").find("alogblog_log!"));
assert!(MatchesTermFinder::new("error").find("errorlog_error_case"));
assert!(MatchesTermFinder::new("test").find("atestbtestc_test_end"));
assert!(MatchesTermFinder::new("data").find("database_data_store"));
assert!(!MatchesTermFinder::new("data").find("database_datastore"));
assert!(MatchesTermFinder::new("log.txt").find("catalog.txt_log.txt!"));
assert!(!MatchesTermFinder::new("log.txt").find("catalog.txtlog.txt!"));
assert!(MatchesTermFinder::new("data-set").find("bigdata-set_data-set!"));
assert!(MatchesTermFinder::new("中文").find("这是中文测试,中文!"));
assert!(MatchesTermFinder::new("error").find("错误errorerror日志_error!"));
}
}

View File

@@ -24,6 +24,7 @@ pub(crate) mod sum;
mod vector_add;
mod vector_dim;
mod vector_div;
mod vector_kth_elem;
mod vector_mul;
mod vector_norm;
mod vector_sub;
@@ -57,6 +58,7 @@ impl VectorFunction {
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(vector_norm::VectorNormFunction));
registry.register(Arc::new(vector_dim::VectorDimFunction));
registry.register(Arc::new(vector_kth_elem::VectorKthElemFunction));
registry.register(Arc::new(vector_subvector::VectorSubvectorFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
registry.register(Arc::new(elem_product::ElemProductFunction));

View File

@@ -0,0 +1,211 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
const NAME: &str = "vec_kth_elem";
/// Returns the k-th(0-based index) element of the vector.
///
/// # Example
///
/// ```sql
/// SELECT vec_kth_elem("[2, 4, 6]",1) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | 4 |
/// +---------+
///
/// ```
///
#[derive(Debug, Clone, Default)]
pub struct VectorKthElemFunction;
impl Function for VectorKthElemFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![ConcreteDataType::int64_datatype()],
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
};
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
let arg1 = arg1.get(i).as_f64_lossy();
let Some(arg1) = arg1 else {
result.push_null();
continue;
};
ensure!(
arg1 >= 0.0 && arg1.fract() == 0.0,
InvalidFuncArgsSnafu {
err_msg: format!(
"Invalid argument: k must be a non-negative integer, but got k = {}.",
arg1
),
}
);
let k = arg1 as usize;
ensure!(
k < arg0.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"Out of range: k must be in the range [0, {}], but got k = {}.",
arg0.len() - 1,
k
),
}
);
let value = arg0[k];
result.push(Some(value));
}
Ok(result.to_vector())
}
}
impl Display for VectorKthElemFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error;
use datatypes::vectors::{Int64Vector, StringVector};
use super::*;
#[test]
fn test_vec_kth_elem() {
let func = VectorKthElemFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let input1 = Arc::new(Int64Vector::from(vec![Some(0), Some(2), None, Some(1)]));
let result = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(1.0));
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(6.0));
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(3)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Out of range: k must be in the range [0, 2], but got k = 3.")
)
}
_ => unreachable!(),
}
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,2.0,3.0]".to_string())]));
let input1 = Arc::new(Int64Vector::from(vec![Some(-1)]));
let err = func
.eval(&FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!("Invalid argument: k must be a non-negative integer, but got k = -1.")
)
}
_ => unreachable!(),
}
}
}

View File

@@ -35,7 +35,10 @@ impl FunctionState {
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
@@ -66,6 +69,17 @@ impl FunctionState {
..Default::default()
})
}
async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
Ok(())
}
async fn remove_region_follower(
&self,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
Ok(())
}
}
#[async_trait]

View File

@@ -22,7 +22,9 @@ mod version;
use std::sync::Arc;
use build::BuildFunction;
use database::{CurrentSchemaFunction, DatabaseFunction, SessionUserFunction};
use database::{
CurrentSchemaFunction, DatabaseFunction, ReadPreferenceFunction, SessionUserFunction,
};
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
use timezone::TimezoneFunction;
@@ -39,6 +41,7 @@ impl SystemFunction {
registry.register(Arc::new(CurrentSchemaFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(SessionUserFunction));
registry.register(Arc::new(ReadPreferenceFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register_async(Arc::new(ProcedureStateFunction));
PGCatalogFunction::register(registry);

View File

@@ -30,9 +30,12 @@ pub struct DatabaseFunction;
pub struct CurrentSchemaFunction;
pub struct SessionUserFunction;
pub struct ReadPreferenceFunction;
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
impl Function for DatabaseFunction {
fn name(&self) -> &str {
@@ -94,6 +97,26 @@ impl Function for SessionUserFunction {
}
}
impl Function for ReadPreferenceFunction {
fn name(&self) -> &str {
READ_PREFERENCE_FUNCTION_NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let read_preference = func_ctx.query_ctx.read_preference();
Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
}
}
impl fmt::Display for DatabaseFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DATABASE")
@@ -112,6 +135,12 @@ impl fmt::Display for SessionUserFunction {
}
}
impl fmt::Display for ReadPreferenceFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "READ_PREFERENCE")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -15,11 +15,13 @@
use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::{as_fulltext_option, as_skipping_index_type};
use api::v1::column_def::{
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
};
use api::v1::{
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
SkippingIndexType as PbSkippingIndexType,
DropColumns, FulltextBackend as PbFulltextBackend, ModifyColumnTypes, RenameTable,
SemanticType, SkippingIndexType as PbSkippingIndexType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema, SkippingIndexOptions};
@@ -126,11 +128,15 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
column_name: f.column_name.clone(),
options: FulltextOptions {
enable: f.enable,
analyzer: as_fulltext_option(
analyzer: as_fulltext_option_analyzer(
Analyzer::try_from(f.analyzer)
.context(InvalidSetFulltextOptionRequestSnafu)?,
),
case_sensitive: f.case_sensitive,
backend: as_fulltext_option_backend(
PbFulltextBackend::try_from(f.backend)
.context(InvalidSetFulltextOptionRequestSnafu)?,
),
},
},
},

View File

@@ -25,7 +25,7 @@ async fn do_bench_channel_manager() {
let m_clone = m.clone();
let join = tokio::spawn(async move {
for _ in 0..10000 {
let idx = rand::random::<usize>() % 100;
let idx = rand::random::<u32>() % 100;
let ret = m_clone.get(format!("{idx}"));
let _ = ret.unwrap();
}

View File

@@ -27,6 +27,7 @@ use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
InvalidRoleSnafu, ParseNumSnafu, Result,
};
use crate::key::flow::flow_state::FlowStat;
use crate::peer::Peer;
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
@@ -52,6 +53,9 @@ pub trait ClusterInfo {
/// List all region stats in the cluster.
async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
/// List all flow stats in the cluster.
async fn list_flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
// TODO(jeremy): Other info, like region status, etc.
}

View File

@@ -92,6 +92,22 @@ pub struct RegionStat {
pub sst_size: u64,
/// The size of the SST index files in bytes.
pub index_size: u64,
/// The manifest infoof the region.
pub region_manifest: RegionManifestInfo,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
},
}
impl Stat {
@@ -165,6 +181,31 @@ impl TryFrom<&HeartbeatRequest> for Stat {
}
}
impl From<store_api::region_engine::RegionManifestInfo> for RegionManifestInfo {
fn from(value: store_api::region_engine::RegionManifestInfo) -> Self {
match value {
store_api::region_engine::RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
store_api::region_engine::RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
} => RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
},
}
}
}
impl From<&api::v1::meta::RegionStat> for RegionStat {
fn from(value: &api::v1::meta::RegionStat) -> Self {
let region_stat = value
@@ -185,6 +226,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
index_size: region_stat.index_size,
region_manifest: region_stat.manifest.into(),
}
}
}

View File

@@ -22,14 +22,18 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::error::Result;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use crate::DatanodeId;
pub mod alter_database;
@@ -70,6 +74,30 @@ pub trait ProcedureExecutor: Send + Sync {
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Add a region follower
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
_request: AddRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
/// Remove a region follower
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "remove_region_follower",
}
.fail()
}
/// Submit a region migration task
async fn migrate_region(
&self,
@@ -137,6 +165,8 @@ pub struct DdlContext {
pub cache_invalidator: CacheInvalidatorRef,
/// Keep tracking operating regions.
pub memory_region_keeper: MemoryRegionKeeperRef,
/// The leader region registry.
pub leader_region_registry: LeaderRegionRegistryRef,
/// Table metadata manager.
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.

View File

@@ -35,7 +35,9 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::rpc::router::{
find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
};
/// [Control] indicated to the caller whether to go to the next step.
#[derive(Debug)]
@@ -250,6 +252,11 @@ impl DropTableExecutor {
.into_iter()
.collect::<Result<Vec<_>>>()?;
// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
Ok(())
}
}

View File

@@ -98,13 +98,14 @@ impl TableMetadataAllocator {
fn create_wal_options(
&self,
table_route: &PhysicalTableRouteValue,
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
}
async fn create_table_route(
@@ -158,7 +159,9 @@ impl TableMetadataAllocator {
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let table_route = self.create_table_route(table_id, task).await?;
let region_wal_options = self.create_wal_options(&table_route)?;
let region_wal_options =
self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
debug!(
"Allocated region wal options {:?} for table {}",

View File

@@ -850,6 +850,7 @@ mod tests {
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
@@ -893,6 +894,7 @@ mod tests {
flow_metadata_manager,
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),

View File

@@ -512,6 +512,10 @@ impl TableMetadataManager {
&self.table_route_manager
}
pub fn topic_region_manager(&self) -> &TopicRegionManager {
&self.topic_region_manager
}
#[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
@@ -1471,7 +1475,8 @@ mod tests {
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let wal_allocator = WalOptionsAllocator::RaftEngine;
let regions = (0..16).collect();
let region_wal_options = allocate_region_wal_options(regions, &wal_allocator).unwrap();
let region_wal_options =
allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),

View File

@@ -224,6 +224,7 @@ impl TopicRegionManager {
Some((region_id, kafka.topic.as_str()))
}
Some(WalOptions::RaftEngine) => None,
Some(WalOptions::Noop) => None,
None => None,
},
)

View File

@@ -155,21 +155,21 @@ impl<'a> MySqlTemplateFactory<'a> {
table_name: table_name.to_string(),
create_table_statement: format!(
// Cannot be more than 3072 bytes in PRIMARY KEY
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
"CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"),
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM `{table_name}` ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = ?;"),
range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM {table_name}"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"),
point: format!("DELETE FROM `{table_name}` WHERE k = ?;"),
range: format!("DELETE FROM `{table_name}` WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM `{table_name}`"),
left_bounded: format!("DELETE FROM `{table_name}` WHERE k >= ?;"),
prefix: format!("DELETE FROM `{table_name}` WHERE k LIKE ?;"),
},
}
}
@@ -189,14 +189,17 @@ impl MySqlTemplateSet {
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
format!(
"SELECT k, v FROM `{table_name}` WHERE k in ({});",
in_clause
)
}
/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("DELETE FROM {table_name} WHERE k in ({});", in_clause)
format!("DELETE FROM `{table_name}` WHERE k in ({});", in_clause)
}
/// Generates the sql for batch upsert.
@@ -212,9 +215,9 @@ impl MySqlTemplateSet {
let values_clause = values_placeholders.join(", ");
(
format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,),
format!(r#"SELECT k, v FROM `{table_name}` WHERE k IN ({in_clause})"#,),
format!(
r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
r#"INSERT INTO `{table_name}` (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
),
)
}

View File

@@ -157,21 +157,25 @@ impl<'a> PgSqlTemplateFactory<'a> {
PgSqlTemplateSet {
table_name: table_name.to_string(),
create_table_statement: format!(
"CREATE TABLE IF NOT EXISTS {table_name}(k bytea PRIMARY KEY, v bytea)",
"CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = $1"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= $1 AND k < $2 ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} $1 ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE $1 ORDER BY k"),
point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
range: format!(
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
),
full: format!("SELECT k, v FROM \"{table_name}\" $1 ORDER BY k"),
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = $1 RETURNING k,v;"),
range: format!("DELETE FROM {table_name} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
full: format!("DELETE FROM {table_name} RETURNING k,v"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE $1 RETURNING k,v;"),
point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
range: format!(
"DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
),
full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
},
}
}
@@ -191,7 +195,10 @@ impl PgSqlTemplateSet {
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
format!(
"SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
in_clause
)
}
/// Generates the sql for batch delete.
@@ -199,7 +206,7 @@ impl PgSqlTemplateSet {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
"DELETE FROM {table_name} WHERE k in ({}) RETURNING k,v;",
"DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
in_clause
)
}
@@ -220,9 +227,9 @@ impl PgSqlTemplateSet {
format!(
r#"
WITH prev AS (
SELECT k,v FROM {table_name} WHERE k IN ({in_clause})
SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
), update AS (
INSERT INTO {table_name} (k, v) VALUES
INSERT INTO "{table_name}" (k, v) VALUES
{values_clause}
ON CONFLICT (
k

View File

@@ -39,6 +39,7 @@ pub mod node_manager;
pub mod peer;
pub mod range_stream;
pub mod region_keeper;
pub mod region_registry;
pub mod rpc;
pub mod sequence;
pub mod state_store;

View File

@@ -27,6 +27,7 @@ const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
const REGION_LOCK_PREFIX: &str = "__region_lock";
const FLOW_LOCK_PREFIX: &str = "__flow_lock";
const REMOTE_WAL_LOCK_PREFIX: &str = "__remote_wal_lock";
/// [CatalogLock] acquires the lock on the tenant level.
pub enum CatalogLock<'a> {
@@ -231,6 +232,31 @@ impl From<FlowLock> for StringKey {
}
}
/// [RemoteWalLock] acquires the lock on the remote wal topic level.
pub enum RemoteWalLock {
Read(String),
Write(String),
}
impl Display for RemoteWalLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
RemoteWalLock::Read(s) => s,
RemoteWalLock::Write(s) => s,
};
write!(f, "{}/{}", REMOTE_WAL_LOCK_PREFIX, key)
}
}
impl From<RemoteWalLock> for StringKey {
fn from(value: RemoteWalLock) -> Self {
match value {
RemoteWalLock::Write(_) => StringKey::Exclusive(value.to_string()),
RemoteWalLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use common_procedure::StringKey;
@@ -308,5 +334,16 @@ mod tests {
string_key,
StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
);
// The remote wal lock
let string_key: StringKey = RemoteWalLock::Read("foo".to_string()).into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
);
let string_key: StringKey = RemoteWalLock::Write("foo".to_string()).into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", REMOTE_WAL_LOCK_PREFIX, "foo"))
);
}
}

View File

@@ -12,63 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use api::v1::meta::Peer as PbPeer;
use serde::{Deserialize, Serialize};
pub use api::v1::meta::Peer;
use crate::error::Error;
use crate::{DatanodeId, FlownodeId};
#[derive(Debug, Default, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct Peer {
/// Node identifier. Unique in a cluster.
pub id: u64,
pub addr: String,
}
impl From<PbPeer> for Peer {
fn from(p: PbPeer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl From<Peer> for PbPeer {
fn from(p: Peer) -> Self {
Self {
id: p.id,
addr: p.addr,
}
}
}
impl Peer {
pub fn new(id: u64, addr: impl Into<String>) -> Self {
Self {
id,
addr: addr.into(),
}
}
#[cfg(any(test, feature = "testing"))]
pub fn empty(id: u64) -> Self {
Self {
id,
addr: String::new(),
}
}
}
impl Display for Peer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "peer-{}({})", self.id, self.addr)
}
}
/// can query peer given a node id
#[async_trait::async_trait]
pub trait PeerLookupService {

View File

@@ -0,0 +1,186 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use common_telemetry::warn;
use store_api::storage::RegionId;
use crate::datanode::RegionManifestInfo;
/// Represents information about a leader region in the cluster.
/// Contains the datanode id where the leader is located,
/// and the current manifest version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaderRegion {
pub datanode_id: u64,
pub manifest: LeaderRegionManifestInfo,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRegionManifestInfo {
Mito {
manifest_version: u64,
flushed_entry_id: u64,
},
Metric {
data_manifest_version: u64,
data_flushed_entry_id: u64,
metadata_manifest_version: u64,
metadata_flushed_entry_id: u64,
},
}
impl From<RegionManifestInfo> for LeaderRegionManifestInfo {
fn from(value: RegionManifestInfo) -> Self {
match value {
RegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
} => LeaderRegionManifestInfo::Mito {
manifest_version,
flushed_entry_id,
},
RegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
} => LeaderRegionManifestInfo::Metric {
data_manifest_version,
data_flushed_entry_id,
metadata_manifest_version,
metadata_flushed_entry_id,
},
}
}
}
impl LeaderRegionManifestInfo {
/// Returns the manifest version of the leader region.
pub fn manifest_version(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
manifest_version, ..
} => *manifest_version,
LeaderRegionManifestInfo::Metric {
data_manifest_version,
..
} => *data_manifest_version,
}
}
/// Returns the flushed entry id of the leader region.
pub fn flushed_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
..
} => *data_flushed_entry_id,
}
}
/// Returns the minimum flushed entry id of the leader region.
/// It is used to determine the minimum flushed entry id that can be pruned in remote wal.
pub fn min_flushed_entry_id(&self) -> u64 {
match self {
LeaderRegionManifestInfo::Mito {
flushed_entry_id, ..
} => *flushed_entry_id,
LeaderRegionManifestInfo::Metric {
data_flushed_entry_id,
metadata_flushed_entry_id,
..
} => (*data_flushed_entry_id).min(*metadata_flushed_entry_id),
}
}
}
pub type LeaderRegionRegistryRef = Arc<LeaderRegionRegistry>;
/// Registry that maintains a mapping of all leader regions in the cluster.
/// Tracks which datanode is hosting the leader for each region and the corresponding
/// manifest version.
#[derive(Default)]
pub struct LeaderRegionRegistry {
inner: RwLock<HashMap<RegionId, LeaderRegion>>,
}
impl LeaderRegionRegistry {
/// Creates a new empty leader region registry.
pub fn new() -> Self {
Self {
inner: RwLock::new(HashMap::new()),
}
}
/// Gets the leader region for the given region ids.
pub fn batch_get<I: Iterator<Item = RegionId>>(
&self,
region_ids: I,
) -> HashMap<RegionId, LeaderRegion> {
let inner = self.inner.read().unwrap();
region_ids
.into_iter()
.flat_map(|region_id| {
inner
.get(&region_id)
.map(|leader_region| (region_id, *leader_region))
})
.collect::<HashMap<_, _>>()
}
/// Puts the leader regions into the registry.
pub fn batch_put(&self, key_values: Vec<(RegionId, LeaderRegion)>) {
let mut inner = self.inner.write().unwrap();
for (region_id, leader_region) in key_values {
match inner.entry(region_id) {
Entry::Vacant(entry) => {
entry.insert(leader_region);
}
Entry::Occupied(mut entry) => {
let manifest_version = entry.get().manifest.manifest_version();
if manifest_version > leader_region.manifest.manifest_version() {
warn!(
"Received a leader region with a smaller manifest version than the existing one, ignore it. region: {}, existing_manifest_version: {}, new_manifest_version: {}",
region_id,
manifest_version,
leader_region.manifest.manifest_version()
);
} else {
entry.insert(leader_region);
}
}
}
}
}
pub fn batch_delete<I: Iterator<Item = RegionId>>(&self, region_ids: I) {
let mut inner = self.inner.write().unwrap();
for region_id in region_ids {
inner.remove(&region_id);
}
}
/// Resets the registry to an empty state.
pub fn reset(&self) {
let mut inner = self.inner.write().unwrap();
inner.clear();
}
}

View File

@@ -1240,6 +1240,7 @@ impl From<QueryContext> for PbQueryContext {
extensions,
channel: channel as u32,
snapshot_seqs: None,
explain: None,
}
}
}

View File

@@ -290,13 +290,13 @@ mod tests {
num_per_range: u32,
max_bytes: u32,
) {
let num_cases = rand::thread_rng().gen_range(1..=8);
let num_cases = rand::rng().random_range(1..=8);
common_telemetry::info!("num_cases: {}", num_cases);
let mut cases = Vec::with_capacity(num_cases);
for i in 0..num_cases {
let size = rand::thread_rng().gen_range(size_limit..=max_bytes);
let size = rand::rng().random_range(size_limit..=max_bytes);
let mut large_value = vec![0u8; size as usize];
rand::thread_rng().fill_bytes(&mut large_value);
rand::rng().fill_bytes(&mut large_value);
// Starts from `a`.
let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
@@ -354,8 +354,8 @@ mod tests {
#[tokio::test]
async fn test_meta_state_store_split_value() {
let size_limit = rand::thread_rng().gen_range(128..=512);
let page_size = rand::thread_rng().gen_range(1..10);
let size_limit = rand::rng().random_range(128..=512);
let page_size = rand::rng().random_range(1..10);
let kv_backend = Arc::new(MemoryKvBackend::new());
test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
.await;
@@ -388,7 +388,7 @@ mod tests {
// However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`;
// we don't know the exact size of the key.
let size_limit = 1536 * 1024 - key_size;
let page_size = rand::thread_rng().gen_range(1..10);
let page_size = rand::rng().random_range(1..10);
test_meta_state_store_split_value_with_size_limit(
kv_backend,
size_limit,

View File

@@ -35,6 +35,7 @@ use crate::node_manager::{
};
use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{DatanodeId, FlownodeId};
@@ -177,6 +178,7 @@ pub fn new_ddl_context_with_kv_backend(
node_manager,
cache_invalidator: Arc::new(DummyCacheInvalidator),
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_allocator,
table_metadata_manager,
flow_metadata_allocator,

View File

@@ -30,7 +30,7 @@ use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
pub use crate::wal_options_allocator::topic_creator::build_kafka_topic_creator;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
@@ -53,21 +53,12 @@ impl WalOptionsAllocator {
}
}
/// Allocates a wal options for a region.
pub fn alloc(&self) -> Result<WalOptions> {
match self {
Self::RaftEngine => Ok(WalOptions::RaftEngine),
Self::Kafka(topic_manager) => {
let topic = topic_manager.select()?;
Ok(WalOptions::Kafka(KafkaWalOptions {
topic: topic.clone(),
}))
}
}
}
/// Allocates a batch of wal options where each wal options goes to a region.
pub fn alloc_batch(&self, num_regions: usize) -> Result<Vec<WalOptions>> {
/// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
if skip_wal {
return Ok(vec![WalOptions::Noop; num_regions]);
}
match self {
WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalOptionsAllocator::Kafka(topic_manager) => {
@@ -130,9 +121,10 @@ pub async fn build_wal_options_allocator(
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
wal_options_allocator: &WalOptionsAllocator,
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = wal_options_allocator
.alloc_batch(regions.len())?
.alloc_batch(regions.len(), skip_wal)?
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
@@ -177,7 +169,7 @@ mod tests {
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
let expected = regions
@@ -237,7 +229,7 @@ mod tests {
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
@@ -253,4 +245,18 @@ mod tests {
})
.await;
}
#[tokio::test]
async fn test_allocator_with_skip_wal() {
let allocator = WalOptionsAllocator::RaftEngine;
allocator.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
assert_eq!(got.len(), num_regions as usize);
for wal_options in got.values() {
assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");
}
}
}

View File

@@ -39,7 +39,7 @@ impl RoundRobinTopicSelector {
// The cursor in the round-robin selector is not persisted which may break the round-robin strategy cross crashes.
// Introducing a shuffling strategy may help mitigate this issue.
pub fn with_shuffle() -> Self {
let offset = rand::thread_rng().gen_range(0..64);
let offset = rand::rng().random_range(0..64);
Self {
cursor: AtomicUsize::new(offset),
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{error, info};
use common_wal::config::kafka::MetasrvKafkaConfig;
use rskafka::client::error::Error as RsKafkaError;
@@ -32,9 +34,11 @@ use crate::error::{
// The `DEFAULT_PARTITION` refers to the index of the partition.
const DEFAULT_PARTITION: i32 = 0;
type KafkaClientRef = Arc<Client>;
/// Creates topics in kafka.
pub struct KafkaTopicCreator {
client: Client,
client: KafkaClientRef,
/// The number of partitions per topic.
num_partitions: i32,
/// The replication factor of each topic.
@@ -44,6 +48,10 @@ pub struct KafkaTopicCreator {
}
impl KafkaTopicCreator {
pub fn client(&self) -> &KafkaClientRef {
&self.client
}
async fn create_topic(&self, topic: &String, client: &Client) -> Result<()> {
let controller = client
.controller_client()
@@ -151,7 +159,7 @@ pub async fn build_kafka_topic_creator(config: &MetasrvKafkaConfig) -> Result<Ka
})?;
Ok(KafkaTopicCreator {
client,
client: Arc::new(client),
num_partitions: config.kafka_topic.num_partitions,
replication_factor: config.kafka_topic.replication_factor,
create_topic_timeout: config.kafka_topic.create_topic_timeout.as_millis() as i32,

View File

@@ -58,6 +58,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Too many running procedures, max: {}", max_running_procedures))]
TooManyRunningProcedures {
max_running_procedures: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to put state, key: '{key}'"))]
PutState {
key: String,
@@ -192,7 +199,8 @@ impl ErrorExt for Error {
| Error::FromJson { .. }
| Error::WaitWatcher { .. }
| Error::RetryLater { .. }
| Error::RollbackProcedureRecovered { .. } => StatusCode::Internal,
| Error::RollbackProcedureRecovered { .. }
| Error::TooManyRunningProcedures { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }

View File

@@ -15,7 +15,8 @@
mod runner;
mod rwlock;
use std::collections::{HashMap, VecDeque};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
@@ -33,6 +34,7 @@ use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
@@ -147,7 +149,6 @@ type ProcedureMetaRef = Arc<ProcedureMeta>;
/// Procedure loaded from store.
struct LoadedProcedure {
procedure: BoxedProcedure,
parent_id: Option<ProcedureId>,
step: u32,
}
@@ -157,8 +158,7 @@ pub(crate) struct ManagerContext {
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
key_lock: KeyRwLock<String>,
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Messages loaded from the procedure store.
messages: Mutex<HashMap<ProcedureId, ProcedureMessage>>,
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Running flag.
@@ -179,7 +179,7 @@ impl ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),
finished_procedures: Mutex::new(VecDeque::new()),
running: Arc::new(AtomicBool::new(false)),
}
@@ -210,18 +210,27 @@ impl ManagerContext {
procedures.contains_key(&procedure_id)
}
/// Returns the number of running procedures.
fn num_running_procedures(&self) -> usize {
self.running_procedures.lock().unwrap().len()
}
/// Try to insert the `procedure` to the context if there is no procedure
/// with same [ProcedureId].
///
/// Returns `false` if there is already a procedure using the same [ProcedureId].
fn try_insert_procedure(&self, meta: ProcedureMetaRef) -> bool {
let procedure_id = meta.id;
let mut procedures = self.procedures.write().unwrap();
if procedures.contains_key(&meta.id) {
return false;
match procedures.entry(procedure_id) {
Entry::Occupied(_) => return false,
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(meta);
}
}
let old = procedures.insert(meta.id, meta);
debug_assert!(old.is_none());
let mut running_procedures = self.running_procedures.lock().unwrap();
running_procedures.insert(procedure_id);
true
}
@@ -264,16 +273,6 @@ impl ManagerContext {
}
}
/// Load procedure with specific `procedure_id` from cached [ProcedureMessage]s.
fn load_one_procedure(&self, procedure_id: ProcedureId) -> Option<LoadedProcedure> {
let message = {
let messages = self.messages.lock().unwrap();
messages.get(&procedure_id).cloned()?
};
self.load_one_procedure_from_message(procedure_id, &message)
}
/// Load procedure from specific [ProcedureMessage].
fn load_one_procedure_from_message(
&self,
@@ -301,7 +300,6 @@ impl ManagerContext {
Some(LoadedProcedure {
procedure,
parent_id: message.parent_id,
step: message.step,
})
}
@@ -350,23 +348,19 @@ impl ManagerContext {
}
}
/// Remove cached [ProcedureMessage] by ids.
fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
let mut messages = self.messages.lock().unwrap();
for procedure_id in procedure_ids {
let _ = messages.remove(procedure_id);
}
}
/// Clean resources of finished procedures.
fn on_procedures_finish(&self, procedure_ids: &[ProcedureId]) {
self.remove_messages(procedure_ids);
// Since users need to query the procedure state, so we can't remove the
// meta of the procedure directly.
let now = Instant::now();
let mut finished_procedures = self.finished_procedures.lock().unwrap();
finished_procedures.extend(procedure_ids.iter().map(|id| (*id, now)));
// Remove the procedures from the running set.
let mut running_procedures = self.running_procedures.lock().unwrap();
for procedure_id in procedure_ids {
running_procedures.remove(procedure_id);
}
}
/// Remove metadata of outdated procedures.
@@ -410,6 +404,7 @@ pub struct ManagerConfig {
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
pub remove_outdated_meta_ttl: Duration,
pub max_running_procedures: usize,
}
impl Default for ManagerConfig {
@@ -420,6 +415,7 @@ impl Default for ManagerConfig {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
remove_outdated_meta_ttl: META_TTL,
max_running_procedures: 128,
}
}
}
@@ -492,6 +488,13 @@ impl LocalManager {
let watcher = meta.state_receiver.clone();
ensure!(
self.manager_ctx.num_running_procedures() < self.config.max_running_procedures,
TooManyRunningProceduresSnafu {
max_running_procedures: self.config.max_running_procedures,
}
);
// Inserts meta into the manager before actually spawnd the runner.
ensure!(
self.manager_ctx.try_insert_procedure(meta),
@@ -1119,6 +1122,7 @@ mod tests {
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
remove_outdated_meta_ttl: Duration::from_millis(1),
max_running_procedures: 128,
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
@@ -1191,6 +1195,69 @@ mod tests {
.is_none());
}
#[tokio::test]
async fn test_too_many_running_procedures() {
let dir = create_temp_dir("too_many_running_procedures");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_running_procedures: 1,
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
manager.manager_ctx.set_running();
manager
.manager_ctx
.running_procedures
.lock()
.unwrap()
.insert(ProcedureId::random());
manager.start().await.unwrap();
// Submit a new procedure should fail.
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
let procedure_id = ProcedureId::random();
let err = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap_err();
assert!(matches!(err, Error::TooManyRunningProcedures { .. }));
manager
.manager_ctx
.running_procedures
.lock()
.unwrap()
.clear();
// Submit a new procedure should succeed.
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
assert!(manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.is_ok());
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_some());
// Wait for the procedure done.
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_done());
}
#[derive(Debug)]
struct ProcedureToRecover {
content: String,

View File

@@ -207,7 +207,7 @@ impl Runner {
if let Some(d) = retry.next() {
let millis = d.as_millis() as u64;
// Add random noise to the retry delay to avoid retry storms.
let noise = rand::thread_rng().gen_range(0..(millis / 4) + 1);
let noise = rand::rng().random_range(0..(millis / 4) + 1);
let d = d.add(Duration::from_millis(noise));
self.wait_on_err(d, retry_times).await;
@@ -348,24 +348,14 @@ impl Runner {
&self,
procedure_id: ProcedureId,
procedure_state: ProcedureState,
mut procedure: BoxedProcedure,
procedure: BoxedProcedure,
) {
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
}
let mut step = 0;
if let Some(loaded_procedure) = self.manager_ctx.load_one_procedure(procedure_id) {
// Try to load procedure state from the message to avoid re-run the subprocedure
// from initial state.
assert_eq!(self.meta.id, loaded_procedure.parent_id.unwrap());
// Use the dumped procedure from the procedure store.
procedure = loaded_procedure.procedure;
// Update step number.
step = loaded_procedure.step;
}
let step = 0;
let meta = Arc::new(ProcedureMeta::new(
procedure_id,

View File

@@ -29,6 +29,8 @@ pub struct ProcedureConfig {
pub retry_delay: Duration,
/// `None` stands for no limit.
pub max_metadata_value_size: Option<ReadableSize>,
/// Max running procedures.
pub max_running_procedures: usize,
}
impl Default for ProcedureConfig {
@@ -37,6 +39,7 @@ impl Default for ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
max_metadata_value_size: None,
max_running_procedures: 128,
}
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::fmt::{self, Display};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
@@ -28,7 +28,7 @@ use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{
accept, displayable, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
@@ -206,13 +206,16 @@ impl Stream for DfRecordBatchStreamAdapter {
}
/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream].
/// The reverse one is [DfRecordBatchStreamAdapter]
/// The reverse one is [DfRecordBatchStreamAdapter].
/// It can collect metrics from DataFusion execution plan.
pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
/// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
metrics_2: Metrics,
/// Display plan and metrics in verbose mode.
explain_verbose: bool,
}
/// Json encoded metrics. Contains metric from a whole plan tree.
@@ -231,6 +234,7 @@ impl RecordBatchStreamAdapter {
stream,
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
})
}
@@ -246,12 +250,18 @@ impl RecordBatchStreamAdapter {
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
explain_verbose: false,
})
}
pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
self.metrics_2 = Metrics::Unresolved(plan)
}
/// Set the verbose mode for displaying plan and metrics.
pub fn set_explain_verbose(&mut self, verbose: bool) {
self.explain_verbose = verbose;
}
}
impl RecordBatchStream for RecordBatchStreamAdapter {
@@ -296,7 +306,7 @@ impl Stream for RecordBatchStreamAdapter {
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metric_collector = MetricCollector::default();
let mut metric_collector = MetricCollector::new(self.explain_verbose);
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
}
@@ -312,10 +322,20 @@ impl Stream for RecordBatchStreamAdapter {
}
/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan].
#[derive(Default)]
pub struct MetricCollector {
current_level: usize,
pub record_batch_metrics: RecordBatchMetrics,
verbose: bool,
}
impl MetricCollector {
pub fn new(verbose: bool) -> Self {
Self {
current_level: 0,
record_batch_metrics: RecordBatchMetrics::default(),
verbose,
}
}
}
impl ExecutionPlanVisitor for MetricCollector {
@@ -339,7 +359,7 @@ impl ExecutionPlanVisitor for MetricCollector {
.sorted_for_display()
.timestamps_removed();
let mut plan_metric = PlanMetrics {
plan: displayable(plan).one_line().to_string(),
plan: one_line(plan, self.verbose).to_string(),
level: self.current_level,
metrics: Vec::with_capacity(metric.iter().size_hint().0),
};
@@ -371,6 +391,29 @@ impl ExecutionPlanVisitor for MetricCollector {
}
}
/// Returns a single-line summary of the root of the plan.
/// If the `verbose` flag is set, it will display detailed information about the plan.
fn one_line(plan: &dyn ExecutionPlan, verbose: bool) -> impl fmt::Display + '_ {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
format_type: DisplayFormatType,
}
impl fmt::Display for Wrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.plan.fmt_as(self.format_type, f)?;
writeln!(f)
}
}
let format_type = if verbose {
DisplayFormatType::Verbose
} else {
DisplayFormatType::Default
};
Wrapper { plan, format_type }
}
/// [`RecordBatchMetrics`] carrys metrics value
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]

View File

@@ -22,10 +22,12 @@ use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::arrow::compute::kernels::cmp;
use datafusion_common::cast::{as_boolean_array, as_null_array};
use datafusion_common::cast::{as_boolean_array, as_null_array, as_string_array};
use datafusion_common::{internal_err, DataFusionError, ScalarValue};
use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
use datatypes::arrow::compute::filter_record_batch;
use datatypes::arrow::error::ArrowError;
use datatypes::compute::kernels::regexp;
use datatypes::compute::or_kleene;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
@@ -36,7 +38,8 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe
/// - `col` `op` `literal`
/// - `literal` `op` `col`
///
/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`.
/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`,
/// or regex operators: `~`, `~*`, `!~`, `!~*`.
///
/// This struct contains normalized predicate expr. In the form of
/// `col` `op` `literal` where the `col` is provided from input.
@@ -86,7 +89,11 @@ impl SimpleFilterEvaluator {
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq => {}
| Operator::GtEq
| Operator::RegexMatch
| Operator::RegexIMatch
| Operator::RegexNotMatch
| Operator::RegexNotIMatch => {}
Operator::Or => {
let lhs = Self::try_new(&binary.left)?;
let rhs = Self::try_new(&binary.right)?;
@@ -172,6 +179,10 @@ impl SimpleFilterEvaluator {
Operator::LtEq => cmp::lt_eq(input, &self.literal),
Operator::Gt => cmp::gt(input, &self.literal),
Operator::GtEq => cmp::gt_eq(input, &self.literal),
Operator::RegexMatch => self.regex_match(input, false, false),
Operator::RegexIMatch => self.regex_match(input, true, false),
Operator::RegexNotMatch => self.regex_match(input, false, true),
Operator::RegexNotIMatch => self.regex_match(input, true, true),
Operator::Or => {
// OR operator stands for OR-chained EQs (or INLIST in other words)
let mut result: BooleanArray = vec![false; input_len].into();
@@ -192,6 +203,28 @@ impl SimpleFilterEvaluator {
.context(ArrowComputeSnafu)
.map(|array| array.values().clone())
}
fn regex_match(
&self,
input: &impl Datum,
ignore_case: bool,
negative: bool,
) -> std::result::Result<BooleanArray, ArrowError> {
let flag = if ignore_case { Some("i") } else { None };
let array = input.get().0;
let string_array = as_string_array(array).map_err(|_| {
ArrowError::CastError(format!("Cannot cast {:?} to StringArray", array))
})?;
let literal_array = self.literal.clone().into_inner();
let regex_array = as_string_array(&literal_array).map_err(|_| {
ArrowError::CastError(format!("Cannot cast {:?} to StringArray", literal_array))
})?;
let mut result = regexp::regexp_is_match_scalar(string_array, regex_array.value(0), flag)?;
if negative {
result = datatypes::compute::not(&result)?;
}
Ok(result)
}
}
/// Evaluate the predicate on the input [RecordBatch], and return a new [RecordBatch].

View File

@@ -0,0 +1,11 @@
[package]
name = "common-session"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
strum.workspace = true

View File

@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use strum::{AsRefStr, Display, EnumString};
/// Defines the read preference for frontend route operations,
/// determining whether to read from the region leader or follower.
#[derive(Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Eq)]
pub enum ReadPreference {
#[default]
// Reads all operations from the region leader. This is the default mode.
#[strum(serialize = "leader", to_string = "LEADER")]
Leader,
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use crate::ReadPreference;
#[test]
fn test_read_preference() {
assert_eq!(ReadPreference::Leader.to_string(), "LEADER");
let read_preference = ReadPreference::from_str("LEADER").unwrap();
assert_eq!(read_preference, ReadPreference::Leader);
let read_preference = ReadPreference::from_str("leader").unwrap();
assert_eq!(read_preference, ReadPreference::Leader);
ReadPreference::from_str("follower").unwrap_err();
}
}

View File

@@ -22,6 +22,6 @@ static PORTS: OnceCell<AtomicUsize> = OnceCell::new();
/// Return a unique port(in runtime) for test
pub fn get_port() -> usize {
PORTS
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(13000..13800)))
.get_or_init(|| AtomicUsize::new(rand::rng().random_range(13000..13800)))
.fetch_add(1, Ordering::Relaxed)
}

View File

@@ -715,10 +715,10 @@ mod tests {
TimeUnit::Microsecond,
TimeUnit::Nanosecond,
];
let mut rng = rand::thread_rng();
let unit_idx: usize = rng.gen_range(0..4);
let mut rng = rand::rng();
let unit_idx: usize = rng.random_range(0..4);
let unit = units[unit_idx];
let value: i64 = rng.gen();
let value: i64 = rng.random();
Timestamp::new(value, unit)
}
@@ -745,8 +745,8 @@ mod tests {
/// Generate timestamp less than or equal to `threshold`
fn gen_ts_le(threshold: &Timestamp) -> Timestamp {
let mut rng = rand::thread_rng();
let timestamp = rng.gen_range(i64::MIN..=threshold.value);
let mut rng = rand::rng();
let timestamp = rng.random_range(i64::MIN..=threshold.value);
Timestamp::new(timestamp, threshold.unit)
}

View File

@@ -33,6 +33,7 @@ pub enum WalOptions {
RaftEngine,
#[serde(with = "kafka_prefix")]
Kafka(KafkaWalOptions),
Noop,
}
with_prefix!(kafka_prefix "wal.kafka.");
@@ -62,5 +63,14 @@ mod tests {
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
// Test serde noop wal options.
let wal_options = WalOptions::Noop;
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"noop"}"#;
assert_eq!(&encoded, expected);
let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
}
}

View File

@@ -58,17 +58,24 @@ pub struct RegionAliveKeeper {
/// non-decreasing). The heartbeat requests will carry the duration since this epoch, and the
/// duration acts like an "invariant point" for region's keep alive lease.
epoch: Instant,
countdown_task_ext_handler: Option<CountdownTaskHandlerExtRef>,
}
impl RegionAliveKeeper {
/// Returns an empty [RegionAliveKeeper].
pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
pub fn new(
region_server: RegionServer,
countdown_task_ext_handler: Option<CountdownTaskHandlerExtRef>,
heartbeat_interval_millis: u64,
) -> Self {
Self {
region_server,
tasks: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: Arc::new(AtomicBool::new(false)),
epoch: Instant::now(),
countdown_task_ext_handler,
}
}
@@ -85,6 +92,7 @@ impl RegionAliveKeeper {
let handle = Arc::new(CountdownTaskHandle::new(
self.region_server.clone(),
self.countdown_task_ext_handler.clone(),
region_id,
));
@@ -114,7 +122,9 @@ impl RegionAliveKeeper {
for region in regions {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
handle.reset_deadline(role, deadline).await;
handle
.reset_deadline(role, deadline, region.extensions.clone())
.await;
} else {
warn!(
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
@@ -265,13 +275,27 @@ enum CountdownCommand {
/// 4 * `heartbeat_interval_millis`
Start(u64),
/// Reset countdown deadline to the given instance.
/// (NextRole, Deadline)
Reset((RegionRole, Instant)),
/// (NextRole, Deadline, ExtensionInfo)
Reset((RegionRole, Instant, HashMap<String, Vec<u8>>)),
/// Returns the current deadline of the countdown task.
#[cfg(test)]
Deadline(oneshot::Sender<Instant>),
}
pub type CountdownTaskHandlerExtRef = Arc<dyn CountdownTaskExtHandler>;
/// Extension trait for [CountdownTaskHandle] to reset deadline method.
#[async_trait]
pub trait CountdownTaskExtHandler: Send + Sync {
async fn reset_deadline(
&self,
region_server: &RegionServer,
role: RegionRole,
deadline: Instant,
extension_info: HashMap<String, Vec<u8>>,
);
}
struct CountdownTaskHandle {
tx: mpsc::Sender<CountdownCommand>,
handler: JoinHandle<()>,
@@ -280,11 +304,16 @@ struct CountdownTaskHandle {
impl CountdownTaskHandle {
/// Creates a new [CountdownTaskHandle] and starts the countdown task.
fn new(region_server: RegionServer, region_id: RegionId) -> Self {
fn new(
region_server: RegionServer,
handler_ext: Option<CountdownTaskHandlerExtRef>,
region_id: RegionId,
) -> Self {
let (tx, rx) = mpsc::channel(1024);
let mut countdown_task = CountdownTask {
region_server,
handler_ext,
region_id,
rx,
};
@@ -323,10 +352,15 @@ impl CountdownTaskHandle {
None
}
async fn reset_deadline(&self, role: RegionRole, deadline: Instant) {
async fn reset_deadline(
&self,
role: RegionRole,
deadline: Instant,
extension_info: HashMap<String, Vec<u8>>,
) {
if let Err(e) = self
.tx
.send(CountdownCommand::Reset((role, deadline)))
.send(CountdownCommand::Reset((role, deadline, extension_info)))
.await
{
warn!(
@@ -350,6 +384,7 @@ impl Drop for CountdownTaskHandle {
struct CountdownTask {
region_server: RegionServer,
region_id: RegionId,
handler_ext: Option<CountdownTaskHandlerExtRef>,
rx: mpsc::Receiver<CountdownCommand>,
}
@@ -379,8 +414,18 @@ impl CountdownTask {
started = true;
}
},
Some(CountdownCommand::Reset((role, deadline))) => {
let _ = self.region_server.set_region_role(self.region_id, role);
Some(CountdownCommand::Reset((role, deadline, extension_info))) => {
if let Err(err) = self.region_server.set_region_role(self.region_id, role) {
error!(err; "Failed to set region role to {role} for region {region_id}");
}
if let Some(ext_handler) = self.handler_ext.as_ref() {
ext_handler.reset_deadline(
&self.region_server,
role,
deadline,
extension_info,
).await;
}
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
@@ -402,7 +447,9 @@ impl CountdownTask {
}
() = &mut countdown => {
warn!("The region {region_id} lease is expired, convert region to follower.");
let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower);
if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) {
error!(err; "Failed to set region role to follower for region {region_id}");
}
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
@@ -431,7 +478,7 @@ mod test {
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));
let region_id = RegionId::new(1024, 1);
let builder = CreateRequestBuilder::new();
@@ -468,6 +515,7 @@ mod test {
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
extensions: HashMap::new(),
}],
Instant::now() + Duration::from_millis(200),
)
@@ -492,7 +540,8 @@ mod test {
async fn countdown_task() {
let region_server = mock_region_server();
let countdown_handle = CountdownTaskHandle::new(region_server, RegionId::new(9999, 2));
let countdown_handle =
CountdownTaskHandle::new(region_server, None, RegionId::new(9999, 2));
// If countdown task is not started, its deadline is set to far future.
assert!(
@@ -522,6 +571,7 @@ mod test {
.reset_deadline(
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
HashMap::new(),
)
.await;
assert!(

View File

@@ -31,7 +31,6 @@ use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
@@ -359,7 +358,6 @@ impl Default for ObjectStoreConfig {
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
@@ -395,7 +393,6 @@ impl Default for DatanodeOptions {
#[allow(deprecated)]
fn default() -> Self {
Self {
mode: Mode::Standalone,
node_id: None,
require_lease_before_startup: false,
init_regions_in_background: false,

View File

@@ -157,6 +157,7 @@ impl Datanode {
pub struct DatanodeBuilder {
opts: DatanodeOptions,
mode: Mode,
plugins: Plugins,
meta_client: Option<MetaClientRef>,
kv_backend: Option<KvBackendRef>,
@@ -166,9 +167,10 @@ pub struct DatanodeBuilder {
impl DatanodeBuilder {
/// `kv_backend` is optional. If absent, the builder will try to build one
/// by using the given `opts`
pub fn new(opts: DatanodeOptions, plugins: Plugins) -> Self {
pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
Self {
opts,
mode,
plugins,
meta_client: None,
kv_backend: None,
@@ -198,7 +200,7 @@ impl DatanodeBuilder {
}
pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
let mode = &self.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = self.meta_client.take();
@@ -263,6 +265,7 @@ impl DatanodeBuilder {
region_server.clone(),
meta_client,
cache_registry,
self.plugins.clone(),
)
.await?,
)
@@ -629,6 +632,7 @@ mod tests {
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use mito2::engine::MITO_ENGINE_NAME;
use servers::Mode;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
@@ -674,6 +678,7 @@ mod tests {
..Default::default()
},
Plugins::default(),
Mode::Standalone,
)
.with_cache_registry(layered_cache_registry);

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
@@ -37,7 +38,7 @@ use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;
use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
@@ -73,9 +74,12 @@ impl HeartbeatTask {
region_server: RegionServer,
meta_client: MetaClientRef,
cache_invalidator: CacheInvalidatorRef,
plugins: Plugins,
) -> Result<Self> {
let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
countdown_task_handler_ext,
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![

View File

@@ -25,6 +25,6 @@ pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
mod store;
pub mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;

View File

@@ -55,7 +55,7 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngineRef, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
};
use store_api::region_request::{
@@ -308,6 +308,22 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
pub async fn sync_region_manifest(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let engine = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine
.sync_region(region_id, manifest_info)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
/// Set region role state gracefully.
///
/// For [SettableRegionRoleState::Follower]:

View File

@@ -15,7 +15,7 @@
//! object storage utilities
mod azblob;
mod fs;
pub mod fs;
mod gcs;
mod oss;
mod s3;

View File

@@ -24,7 +24,8 @@ use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
pub(crate) async fn new_fs_object_store(
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {

View File

@@ -32,8 +32,8 @@ use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -246,6 +246,14 @@ impl RegionEngine for MockRegionEngine {
Some(RegionRole::Leader)
}
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -28,8 +28,9 @@ use snafu::{ensure, ResultExt};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
use crate::prelude::ConcreteDataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions,
SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY, TIME_INDEX_KEY,

View File

@@ -46,6 +46,7 @@ pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
/// Keys used in SKIPPING index options
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
@@ -514,6 +515,9 @@ pub struct FulltextOptions {
/// Whether the fulltext index is case-sensitive.
#[serde(default)]
pub case_sensitive: bool,
/// The fulltext backend to use.
#[serde(default)]
pub backend: FulltextBackend,
}
impl fmt::Display for FulltextOptions {
@@ -522,11 +526,30 @@ impl fmt::Display for FulltextOptions {
if self.enable {
write!(f, ", analyzer={}", self.analyzer)?;
write!(f, ", case_sensitive={}", self.case_sensitive)?;
write!(f, ", backend={}", self.backend)?;
}
Ok(())
}
}
/// The backend of the fulltext index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub enum FulltextBackend {
#[default]
Tantivy,
Bloom, // TODO(zhongzc): when bloom is ready, use it as default
}
impl fmt::Display for FulltextBackend {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FulltextBackend::Tantivy => write!(f, "tantivy"),
FulltextBackend::Bloom => write!(f, "bloom"),
}
}
}
impl TryFrom<HashMap<String, String>> for FulltextOptions {
type Error = Error;
@@ -575,6 +598,19 @@ impl TryFrom<HashMap<String, String>> for FulltextOptions {
}
}
if let Some(backend) = options.get(COLUMN_FULLTEXT_OPT_KEY_BACKEND) {
match backend.to_ascii_lowercase().as_str() {
"bloom" => fulltext_options.backend = FulltextBackend::Bloom,
"tantivy" => fulltext_options.backend = FulltextBackend::Tantivy,
_ => {
return InvalidFulltextOptionSnafu {
msg: format!("{backend}, expected: 'bloom' | 'tantivy'"),
}
.fail();
}
}
}
Ok(fulltext_options)
}
}

View File

@@ -30,6 +30,7 @@ mod boolean;
mod constant;
mod date;
mod decimal;
mod dictionary;
mod duration;
mod eq;
mod helper;
@@ -48,6 +49,7 @@ pub use boolean::{BooleanVector, BooleanVectorBuilder};
pub use constant::ConstantVector;
pub use date::{DateVector, DateVectorBuilder};
pub use decimal::{Decimal128Vector, Decimal128VectorBuilder};
pub use dictionary::{DictionaryIter, DictionaryVector};
pub use duration::{
DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector,
DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder,

View File

@@ -0,0 +1,438 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use arrow::array::Array;
use arrow::datatypes::Int32Type;
use arrow_array::{ArrayRef, DictionaryArray, Int32Array};
use serde_json::Value as JsonValue;
use snafu::ResultExt;
use super::operations::VectorOp;
use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
use crate::serialize::Serializable;
use crate::types::DictionaryType;
use crate::value::{Value, ValueRef};
use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
/// Vector of dictionaries, basically backed by Arrow's `DictionaryArray`.
#[derive(Debug, PartialEq)]
pub struct DictionaryVector {
array: DictionaryArray<Int32Type>,
/// The datatype of the items in the dictionary.
item_type: ConcreteDataType,
/// The vector of items in the dictionary.
item_vector: VectorRef,
}
impl DictionaryVector {
/// Create a new instance of `DictionaryVector` from a dictionary array and item type
pub fn new(array: DictionaryArray<Int32Type>, item_type: ConcreteDataType) -> Result<Self> {
let item_vector = Helper::try_into_vector(array.values())?;
Ok(Self {
array,
item_type,
item_vector,
})
}
/// Returns the underlying Arrow dictionary array
pub fn array(&self) -> &DictionaryArray<Int32Type> {
&self.array
}
/// Returns the keys array of this dictionary
pub fn keys(&self) -> &arrow_array::PrimitiveArray<Int32Type> {
self.array.keys()
}
/// Returns the values array of this dictionary
pub fn values(&self) -> &ArrayRef {
self.array.values()
}
pub fn as_arrow(&self) -> &dyn Array {
&self.array
}
}
impl Vector for DictionaryVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::Dictionary(DictionaryType::new(
ConcreteDataType::int32_datatype(),
self.item_type.clone(),
))
}
fn vector_type_name(&self) -> String {
"DictionaryVector".to_string()
}
fn as_any(&self) -> &dyn Any {
self
}
fn len(&self) -> usize {
self.array.len()
}
fn to_arrow_array(&self) -> ArrayRef {
Arc::new(self.array.clone())
}
fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
Box::new(self.array.clone())
}
fn validity(&self) -> Validity {
vectors::impl_validity_for_vector!(self.array)
}
fn memory_size(&self) -> usize {
self.array.get_buffer_memory_size()
}
fn null_count(&self) -> usize {
self.array.null_count()
}
fn is_null(&self, row: usize) -> bool {
self.array.is_null(row)
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
Arc::new(Self {
array: self.array.slice(offset, length),
item_type: self.item_type.clone(),
item_vector: self.item_vector.clone(),
})
}
fn get(&self, index: usize) -> Value {
if !self.array.is_valid(index) {
return Value::Null;
}
let key = self.array.keys().value(index);
self.item_vector.get(key as usize)
}
fn get_ref(&self, index: usize) -> ValueRef {
if !self.array.is_valid(index) {
return ValueRef::Null;
}
let key = self.array.keys().value(index);
self.item_vector.get_ref(key as usize)
}
}
impl Serializable for DictionaryVector {
fn serialize_to_json(&self) -> Result<Vec<JsonValue>> {
// Convert the dictionary array to JSON, where each element is either null or
// the value it refers to in the dictionary
let mut result = Vec::with_capacity(self.len());
for i in 0..self.len() {
if self.is_null(i) {
result.push(JsonValue::Null);
} else {
let key = self.array.keys().value(i);
let value = self.item_vector.get(key as usize);
let json_value = serde_json::to_value(value).context(error::SerializeSnafu)?;
result.push(json_value);
}
}
Ok(result)
}
}
impl TryFrom<DictionaryArray<Int32Type>> for DictionaryVector {
type Error = crate::error::Error;
fn try_from(array: DictionaryArray<Int32Type>) -> Result<Self> {
let item_type = ConcreteDataType::from_arrow_type(array.values().data_type());
let item_vector = Helper::try_into_vector(array.values())?;
Ok(Self {
array,
item_type,
item_vector,
})
}
}
pub struct DictionaryIter<'a> {
vector: &'a DictionaryVector,
idx: usize,
}
impl<'a> DictionaryIter<'a> {
pub fn new(vector: &'a DictionaryVector) -> DictionaryIter<'a> {
DictionaryIter { vector, idx: 0 }
}
}
impl<'a> Iterator for DictionaryIter<'a> {
type Item = Option<ValueRef<'a>>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.idx >= self.vector.len() {
return None;
}
let idx = self.idx;
self.idx += 1;
if self.vector.is_null(idx) {
return Some(None);
}
Some(Some(self.vector.item_vector.get_ref(self.idx)))
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(
self.vector.len() - self.idx,
Some(self.vector.len() - self.idx),
)
}
}
impl VectorOp for DictionaryVector {
fn replicate(&self, offsets: &[usize]) -> VectorRef {
let keys = self.array.keys();
let mut replicated_keys = Vec::with_capacity(offsets.len());
let mut previous_offset = 0;
for (i, &offset) in offsets.iter().enumerate() {
let key = if i < self.len() {
if keys.is_valid(i) {
Some(keys.value(i))
} else {
None
}
} else {
None
};
// repeat this key (offset - previous_offset) times
let repeat_count = offset - previous_offset;
if repeat_count > 0 {
replicated_keys.resize(replicated_keys.len() + repeat_count, key);
}
previous_offset = offset;
}
let new_keys = Int32Array::from(replicated_keys);
let new_array = DictionaryArray::try_new(new_keys, self.values().clone())
.expect("Failed to create replicated dictionary array");
Arc::new(Self {
array: new_array,
item_type: self.item_type.clone(),
item_vector: self.item_vector.clone(),
})
}
fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
let key_array: ArrayRef = Arc::new(self.array.keys().clone());
let key_vector = Helper::try_into_vector(&key_array)?;
let filtered_key_vector = key_vector.filter(filter)?;
let filtered_key_array = filtered_key_vector.to_arrow_array();
let filtered_key_array = filtered_key_array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let new_array = DictionaryArray::try_new(filtered_key_array.clone(), self.values().clone())
.expect("Failed to create filtered dictionary array");
Ok(Arc::new(Self {
array: new_array,
item_type: self.item_type.clone(),
item_vector: self.item_vector.clone(),
}))
}
fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
let new_items = self.item_vector.cast(to_type)?;
let new_array =
DictionaryArray::try_new(self.array.keys().clone(), new_items.to_arrow_array())
.expect("Failed to create casted dictionary array");
Ok(Arc::new(Self {
array: new_array,
item_type: to_type.clone(),
item_vector: self.item_vector.clone(),
}))
}
fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
let key_array: ArrayRef = Arc::new(self.array.keys().clone());
let key_vector = Helper::try_into_vector(&key_array)?;
let new_key_vector = key_vector.take(indices)?;
let new_key_array = new_key_vector.to_arrow_array();
let new_key_array = new_key_array.as_any().downcast_ref::<Int32Array>().unwrap();
let new_array = DictionaryArray::try_new(new_key_array.clone(), self.values().clone())
.expect("Failed to create filtered dictionary array");
Ok(Arc::new(Self {
array: new_array,
item_type: self.item_type.clone(),
item_vector: self.item_vector.clone(),
}))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_array::StringArray;
use super::*;
// Helper function to create a test dictionary vector with string values
fn create_test_dictionary() -> DictionaryVector {
// Dictionary values: ["a", "b", "c", "d"]
// Keys: [0, 1, 2, null, 1, 3]
// Resulting in: ["a", "b", "c", null, "b", "d"]
let values = StringArray::from(vec!["a", "b", "c", "d"]);
let keys = Int32Array::from(vec![Some(0), Some(1), Some(2), None, Some(1), Some(3)]);
let dict_array = DictionaryArray::new(keys, Arc::new(values));
DictionaryVector::try_from(dict_array).unwrap()
}
#[test]
fn test_dictionary_vector_basics() {
let dict_vec = create_test_dictionary();
// Test length and null count
assert_eq!(dict_vec.len(), 6);
assert_eq!(dict_vec.null_count(), 1);
// Test data type
let data_type = dict_vec.data_type();
if let ConcreteDataType::Dictionary(dict_type) = data_type {
assert_eq!(*dict_type.value_type(), ConcreteDataType::string_datatype());
} else {
panic!("Expected Dictionary data type");
}
// Test is_null
assert!(!dict_vec.is_null(0));
assert!(dict_vec.is_null(3));
// Test get values
assert_eq!(dict_vec.get(0), Value::String("a".to_string().into()));
assert_eq!(dict_vec.get(1), Value::String("b".to_string().into()));
assert_eq!(dict_vec.get(3), Value::Null);
assert_eq!(dict_vec.get(4), Value::String("b".to_string().into()));
}
#[test]
fn test_slice() {
let dict_vec = create_test_dictionary();
let sliced = dict_vec.slice(1, 3);
assert_eq!(sliced.len(), 3);
assert_eq!(sliced.get(0), Value::String("b".to_string().into()));
assert_eq!(sliced.get(1), Value::String("c".to_string().into()));
assert_eq!(sliced.get(2), Value::Null);
}
#[test]
fn test_replicate() {
let dict_vec = create_test_dictionary();
// Replicate with offsets [0, 2, 5] - should get values at these indices
let offsets = vec![0, 2, 5];
let replicated = dict_vec.replicate(&offsets);
assert_eq!(replicated.len(), 5);
assert_eq!(replicated.get(0), Value::String("b".to_string().into()));
assert_eq!(replicated.get(1), Value::String("b".to_string().into()));
assert_eq!(replicated.get(2), Value::String("c".to_string().into()));
assert_eq!(replicated.get(3), Value::String("c".to_string().into()));
assert_eq!(replicated.get(4), Value::String("c".to_string().into()));
}
#[test]
fn test_filter() {
let dict_vec = create_test_dictionary();
// Keep only indices 0, 2, 4
let filter_values = vec![true, false, true, false, true, false];
let filter = vectors::BooleanVector::from(filter_values);
let filtered = dict_vec.filter(&filter).unwrap();
assert_eq!(filtered.len(), 3);
// Check the values
assert_eq!(filtered.get(0), Value::String("a".to_string().into()));
assert_eq!(filtered.get(1), Value::String("c".to_string().into()));
assert_eq!(filtered.get(2), Value::String("b".to_string().into()));
}
#[test]
fn test_cast() {
let dict_vec = create_test_dictionary();
// Cast to the same type should return an equivalent vector
let casted = dict_vec.cast(&ConcreteDataType::string_datatype()).unwrap();
// The returned vector should have string values
assert_eq!(
casted.data_type(),
ConcreteDataType::Dictionary(DictionaryType::new(
ConcreteDataType::int32_datatype(),
ConcreteDataType::string_datatype(),
))
);
assert_eq!(casted.len(), dict_vec.len());
// Values should match the original dictionary lookups
assert_eq!(casted.get(0), Value::String("a".to_string().into()));
assert_eq!(casted.get(1), Value::String("b".to_string().into()));
assert_eq!(casted.get(2), Value::String("c".to_string().into()));
assert_eq!(casted.get(3), Value::Null);
assert_eq!(casted.get(4), Value::String("b".to_string().into()));
assert_eq!(casted.get(5), Value::String("d".to_string().into()));
}
#[test]
fn test_take() {
let dict_vec = create_test_dictionary();
// Take indices 2, 0, 4
let indices_vec = vec![Some(2u32), Some(0), Some(4)];
let indices = vectors::UInt32Vector::from(indices_vec);
let taken = dict_vec.take(&indices).unwrap();
assert_eq!(taken.len(), 3);
// Check the values
assert_eq!(taken.get(0), Value::String("c".to_string().into()));
assert_eq!(taken.get(1), Value::String("a".to_string().into()));
assert_eq!(taken.get(2), Value::String("b".to_string().into()));
}
}

View File

@@ -20,7 +20,8 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute;
use arrow::compute::kernels::comparison;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use arrow::datatypes::{DataType as ArrowDataType, Int32Type, TimeUnit};
use arrow_array::DictionaryArray;
use arrow_schema::IntervalUnit;
use datafusion_common::ScalarValue;
use snafu::{OptionExt, ResultExt};
@@ -31,7 +32,7 @@ use crate::prelude::DataType;
use crate::scalars::{Scalar, ScalarVectorBuilder};
use crate::value::{ListValue, ListValueRef, Value};
use crate::vectors::{
BinaryVector, BooleanVector, ConstantVector, DateVector, Decimal128Vector,
BinaryVector, BooleanVector, ConstantVector, DateVector, Decimal128Vector, DictionaryVector,
DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector,
DurationSecondVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector,
Int8Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
@@ -347,6 +348,17 @@ impl Helper {
ArrowDataType::Decimal128(_, _) => {
Arc::new(Decimal128Vector::try_from_arrow_array(array)?)
}
ArrowDataType::Dictionary(key, value) if matches!(&**key, ArrowDataType::Int32) => {
let array = array
.as_ref()
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.unwrap(); // Safety: the type is guarded by match arm condition
Arc::new(DictionaryVector::new(
array.clone(),
ConcreteDataType::try_from(value.as_ref())?,
)?)
}
ArrowDataType::Float16
| ArrowDataType::LargeList(_)
| ArrowDataType::FixedSizeList(_, _)

View File

@@ -14,14 +14,11 @@
mod cast;
mod filter;
mod find_unique;
mod replicate;
mod take;
use std::sync::Arc;
use common_base::BitVec;
use crate::error::{self, Result};
use crate::types::LogicalPrimitiveType;
use crate::vectors::constant::ConstantVector;
@@ -40,23 +37,6 @@ pub trait VectorOp {
/// Panics if `offsets.len() != self.len()`.
fn replicate(&self, offsets: &[usize]) -> VectorRef;
/// Mark `i-th` bit of `selected` to `true` if the `i-th` element of `self` is unique, which
/// means there is no elements behind it have same value as it.
///
/// The caller should ensure
/// 1. the length of `selected` bitmap is equal to `vector.len()`.
/// 2. `vector` and `prev_vector` are sorted.
///
/// If there are multiple duplicate elements, this function retains the **first** element.
/// The first element is considered as unique if the first element of `self` is different
/// from its previous element, that is the last element of `prev_vector`.
///
/// # Panics
/// Panics if
/// - `selected.len() < self.len()`.
/// - `prev_vector` and `self` have different data types.
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>);
/// Filters the vector, returns elements matching the `filter` (i.e. where the values are true).
///
/// Note that the nulls of `filter` are interpreted as `false` will lead to these elements being masked out.
@@ -81,11 +61,6 @@ macro_rules! impl_scalar_vector_op {
replicate::replicate_scalar(self, offsets)
}
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
let prev_vector = prev_vector.map(|pv| pv.as_any().downcast_ref::<$VectorType>().unwrap());
find_unique::find_unique_scalar(self, selected, prev_vector);
}
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
filter::filter_non_constant!(self, $VectorType, filter)
}
@@ -121,11 +96,6 @@ impl VectorOp for Decimal128Vector {
std::sync::Arc::new(replicate::replicate_decimal128(self, offsets))
}
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<Decimal128Vector>());
find_unique::find_unique_scalar(self, selected, prev_vector);
}
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
filter::filter_non_constant!(self, Decimal128Vector, filter)
}
@@ -144,12 +114,6 @@ impl<T: LogicalPrimitiveType> VectorOp for PrimitiveVector<T> {
std::sync::Arc::new(replicate::replicate_primitive(self, offsets))
}
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
let prev_vector =
prev_vector.and_then(|pv| pv.as_any().downcast_ref::<PrimitiveVector<T>>());
find_unique::find_unique_scalar(self, selected, prev_vector);
}
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
filter::filter_non_constant!(self, PrimitiveVector<T>, filter)
}
@@ -168,11 +132,6 @@ impl VectorOp for NullVector {
replicate::replicate_null(self, offsets)
}
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<NullVector>());
find_unique::find_unique_null(self, selected, prev_vector);
}
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
filter::filter_non_constant!(self, NullVector, filter)
}
@@ -195,11 +154,6 @@ impl VectorOp for ConstantVector {
self.replicate_vector(offsets)
}
fn find_unique(&self, selected: &mut BitVec, prev_vector: Option<&dyn Vector>) {
let prev_vector = prev_vector.and_then(|pv| pv.as_any().downcast_ref::<ConstantVector>());
find_unique::find_unique_constant(self, selected, prev_vector);
}
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
self.filter_vector(filter)
}

View File

@@ -1,366 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::BitVec;
use crate::scalars::ScalarVector;
use crate::vectors::constant::ConstantVector;
use crate::vectors::{NullVector, Vector};
// To implement `find_unique()` correctly, we need to keep in mind that always marks an element as
// selected when it is different from the previous one, and leaves the `selected` unchanged
// in any other case.
pub(crate) fn find_unique_scalar<'a, T: ScalarVector>(
vector: &'a T,
selected: &'a mut BitVec,
prev_vector: Option<&'a T>,
) where
T::RefItem<'a>: PartialEq,
{
assert!(selected.len() >= vector.len());
if vector.is_empty() {
return;
}
for ((i, current), next) in vector
.iter_data()
.enumerate()
.zip(vector.iter_data().skip(1))
{
if current != next {
// If next element is a different element, we mark it as selected.
selected.set(i + 1, true);
}
}
// Marks first element as selected if it is different from previous element, otherwise
// keep selected bitmap unchanged.
let is_first_not_duplicate = prev_vector
.map(|pv| {
if pv.is_empty() {
true
} else {
let last = pv.get_data(pv.len() - 1);
last != vector.get_data(0)
}
})
.unwrap_or(true);
if is_first_not_duplicate {
selected.set(0, true);
}
}
pub(crate) fn find_unique_null(
vector: &NullVector,
selected: &mut BitVec,
prev_vector: Option<&NullVector>,
) {
if vector.is_empty() {
return;
}
let is_first_not_duplicate = prev_vector.map(NullVector::is_empty).unwrap_or(true);
if is_first_not_duplicate {
selected.set(0, true);
}
}
pub(crate) fn find_unique_constant(
vector: &ConstantVector,
selected: &mut BitVec,
prev_vector: Option<&ConstantVector>,
) {
if vector.is_empty() {
return;
}
let is_first_not_duplicate = prev_vector
.map(|pv| {
if pv.is_empty() {
true
} else {
vector.get_constant_ref() != pv.get_constant_ref()
}
})
.unwrap_or(true);
if is_first_not_duplicate {
selected.set(0, true);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::Date;
use super::*;
use crate::timestamp::*;
use crate::vectors::{Int32Vector, StringVector, Vector, VectorOp};
fn check_bitmap(expect: &[bool], selected: &BitVec) {
let actual = selected.iter().collect::<Vec<_>>();
assert_eq!(expect, actual);
}
fn check_find_unique_scalar(expect: &[bool], input: &[i32], prev: Option<&[i32]>) {
check_find_unique_scalar_opt(expect, input.iter().map(|v| Some(*v)), prev);
}
fn check_find_unique_scalar_opt(
expect: &[bool],
input: impl Iterator<Item = Option<i32>>,
prev: Option<&[i32]>,
) {
let input = Int32Vector::from(input.collect::<Vec<_>>());
let prev = prev.map(Int32Vector::from_slice);
let mut selected = BitVec::repeat(false, input.len());
input.find_unique(&mut selected, prev.as_ref().map(|v| v as _));
check_bitmap(expect, &selected);
}
#[test]
fn test_find_unique_scalar() {
check_find_unique_scalar(&[], &[], None);
check_find_unique_scalar(&[true], &[1], None);
check_find_unique_scalar(&[true, false], &[1, 1], None);
check_find_unique_scalar(&[true, true], &[1, 2], None);
check_find_unique_scalar(&[true, true, true, true], &[1, 2, 3, 4], None);
check_find_unique_scalar(&[true, false, true, false], &[1, 1, 3, 3], None);
check_find_unique_scalar(&[true, false, false, false, true], &[2, 2, 2, 2, 3], None);
check_find_unique_scalar(&[true], &[5], Some(&[]));
check_find_unique_scalar(&[true], &[5], Some(&[3]));
check_find_unique_scalar(&[false], &[5], Some(&[5]));
check_find_unique_scalar(&[false], &[5], Some(&[4, 5]));
check_find_unique_scalar(&[false, true], &[5, 6], Some(&[4, 5]));
check_find_unique_scalar(&[false, true, false], &[5, 6, 6], Some(&[4, 5]));
check_find_unique_scalar(
&[false, true, false, true, true],
&[5, 6, 6, 7, 8],
Some(&[4, 5]),
);
check_find_unique_scalar_opt(
&[true, true, false, true, false],
[Some(1), Some(2), Some(2), None, None].into_iter(),
None,
);
}
#[test]
fn test_find_unique_scalar_multi_times_with_prev() {
let prev = Int32Vector::from_slice([1]);
let v1 = Int32Vector::from_slice([2, 3, 4]);
let mut selected = BitVec::repeat(false, v1.len());
v1.find_unique(&mut selected, Some(&prev));
// Though element in v2 are the same as prev, but we should still keep them.
let v2 = Int32Vector::from_slice([1, 1, 1]);
v2.find_unique(&mut selected, Some(&prev));
check_bitmap(&[true, true, true], &selected);
}
fn new_bitmap(bits: &[bool]) -> BitVec {
BitVec::from_iter(bits)
}
#[test]
fn test_find_unique_scalar_with_prev() {
let prev = Int32Vector::from_slice([1]);
let mut selected = new_bitmap(&[true, false, true, false]);
let v = Int32Vector::from_slice([2, 3, 4, 5]);
v.find_unique(&mut selected, Some(&prev));
// All elements are different.
check_bitmap(&[true, true, true, true], &selected);
let mut selected = new_bitmap(&[true, false, true, false]);
let v = Int32Vector::from_slice([1, 2, 3, 4]);
v.find_unique(&mut selected, Some(&prev));
// Though first element is duplicate, but we keep the flag unchanged.
check_bitmap(&[true, true, true, true], &selected);
// Same case as above, but now `prev` is None.
let mut selected = new_bitmap(&[true, false, true, false]);
let v = Int32Vector::from_slice([1, 2, 3, 4]);
v.find_unique(&mut selected, None);
check_bitmap(&[true, true, true, true], &selected);
// Same case as above, but now `prev` is empty.
let mut selected = new_bitmap(&[true, false, true, false]);
let v = Int32Vector::from_slice([1, 2, 3, 4]);
v.find_unique(&mut selected, Some(&Int32Vector::from_slice([])));
check_bitmap(&[true, true, true, true], &selected);
let mut selected = new_bitmap(&[false, false, false, false]);
let v = Int32Vector::from_slice([2, 2, 4, 5]);
v.find_unique(&mut selected, Some(&prev));
// only v[1] is duplicate.
check_bitmap(&[true, false, true, true], &selected);
}
fn check_find_unique_null(len: usize) {
let input = NullVector::new(len);
let mut selected = BitVec::repeat(false, input.len());
input.find_unique(&mut selected, None);
let mut expect = vec![false; len];
if !expect.is_empty() {
expect[0] = true;
}
check_bitmap(&expect, &selected);
let mut selected = BitVec::repeat(false, input.len());
let prev = Some(NullVector::new(1));
input.find_unique(&mut selected, prev.as_ref().map(|v| v as _));
let expect = vec![false; len];
check_bitmap(&expect, &selected);
}
#[test]
fn test_find_unique_null() {
for len in 0..5 {
check_find_unique_null(len);
}
}
#[test]
fn test_find_unique_null_with_prev() {
let prev = NullVector::new(1);
// Keep flags unchanged.
let mut selected = new_bitmap(&[true, false, true, false]);
let v = NullVector::new(4);
v.find_unique(&mut selected, Some(&prev));
check_bitmap(&[true, false, true, false], &selected);
// Keep flags unchanged.
let mut selected = new_bitmap(&[false, false, true, false]);
v.find_unique(&mut selected, Some(&prev));
check_bitmap(&[false, false, true, false], &selected);
// Prev is None, select first element.
let mut selected = new_bitmap(&[false, false, true, false]);
v.find_unique(&mut selected, None);
check_bitmap(&[true, false, true, false], &selected);
// Prev is empty, select first element.
let mut selected = new_bitmap(&[false, false, true, false]);
v.find_unique(&mut selected, Some(&NullVector::new(0)));
check_bitmap(&[true, false, true, false], &selected);
}
fn check_find_unique_constant(len: usize) {
let input = ConstantVector::new(Arc::new(Int32Vector::from_slice([8])), len);
let mut selected = BitVec::repeat(false, len);
input.find_unique(&mut selected, None);
let mut expect = vec![false; len];
if !expect.is_empty() {
expect[0] = true;
}
check_bitmap(&expect, &selected);
let mut selected = BitVec::repeat(false, len);
let prev = Some(ConstantVector::new(
Arc::new(Int32Vector::from_slice([8])),
1,
));
input.find_unique(&mut selected, prev.as_ref().map(|v| v as _));
let expect = vec![false; len];
check_bitmap(&expect, &selected);
}
#[test]
fn test_find_unique_constant() {
for len in 0..5 {
check_find_unique_constant(len);
}
}
#[test]
fn test_find_unique_constant_with_prev() {
let prev = ConstantVector::new(Arc::new(Int32Vector::from_slice([1])), 1);
// Keep flags unchanged.
let mut selected = new_bitmap(&[true, false, true, false]);
let v = ConstantVector::new(Arc::new(Int32Vector::from_slice([1])), 4);
v.find_unique(&mut selected, Some(&prev));
check_bitmap(&[true, false, true, false], &selected);
// Keep flags unchanged.
let mut selected = new_bitmap(&[false, false, true, false]);
v.find_unique(&mut selected, Some(&prev));
check_bitmap(&[false, false, true, false], &selected);
// Prev is None, select first element.
let mut selected = new_bitmap(&[false, false, true, false]);
v.find_unique(&mut selected, None);
check_bitmap(&[true, false, true, false], &selected);
// Prev is empty, select first element.
let mut selected = new_bitmap(&[false, false, true, false]);
v.find_unique(
&mut selected,
Some(&ConstantVector::new(
Arc::new(Int32Vector::from_slice([1])),
0,
)),
);
check_bitmap(&[true, false, true, false], &selected);
// Different constant vector.
let mut selected = new_bitmap(&[false, false, true, false]);
let v = ConstantVector::new(Arc::new(Int32Vector::from_slice([2])), 4);
v.find_unique(&mut selected, Some(&prev));
check_bitmap(&[true, false, true, false], &selected);
}
#[test]
fn test_find_unique_string() {
let input = StringVector::from_slice(&["a", "a", "b", "c"]);
let mut selected = BitVec::repeat(false, 4);
input.find_unique(&mut selected, None);
let expect = vec![true, false, true, true];
check_bitmap(&expect, &selected);
}
macro_rules! impl_find_unique_date_like_test {
($VectorType: ident, $ValueType: ident, $method: ident) => {{
use $crate::vectors::$VectorType;
let v = $VectorType::from_iterator([8, 8, 9, 10].into_iter().map($ValueType::$method));
let mut selected = BitVec::repeat(false, 4);
v.find_unique(&mut selected, None);
let expect = vec![true, false, true, true];
check_bitmap(&expect, &selected);
}};
}
#[test]
fn test_find_unique_date_like() {
impl_find_unique_date_like_test!(DateVector, Date, new);
impl_find_unique_date_like_test!(TimestampSecondVector, TimestampSecond, from);
impl_find_unique_date_like_test!(TimestampMillisecondVector, TimestampMillisecond, from);
impl_find_unique_date_like_test!(TimestampMicrosecondVector, TimestampMicrosecond, from);
impl_find_unique_date_like_test!(TimestampNanosecondVector, TimestampNanosecond, from);
}
}

View File

@@ -26,8 +26,8 @@ use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState, SinglePartitionScanner,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState, SinglePartitionScanner,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -138,6 +138,15 @@ impl RegionEngine for FileRegionEngine {
}
}
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
// File engine doesn't need to sync region manifest.
Ok(())
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}

View File

@@ -37,7 +37,6 @@ use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ConcreteDataType, RegionId};
@@ -63,7 +62,7 @@ pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
mod util;
pub(crate) mod util;
mod worker;
pub(crate) mod node_context;
@@ -102,7 +101,6 @@ impl Default for FlowConfig {
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
pub mode: Mode,
pub node_id: Option<u64>,
pub flow: FlowConfig,
pub grpc: GrpcOptions,
@@ -116,7 +114,6 @@ pub struct FlownodeOptions {
impl Default for FlownodeOptions {
fn default() -> Self {
Self {
mode: servers::Mode::Standalone,
node_id: None,
flow: FlowConfig::default(),
grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3004"),

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Util functions for adapter
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
@@ -20,7 +22,7 @@ use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, Semantic
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use itertools::Itertools;
use operator::expr_helper;
use session::context::QueryContextBuilder;
@@ -174,7 +176,15 @@ pub fn table_info_value_to_relation_desc(
let default_values = raw_schema
.column_schemas
.iter()
.map(|c| c.default_constraint().cloned())
.map(|c| {
c.default_constraint().cloned().or_else(|| {
if c.is_nullable() {
Some(ColumnDefaultConstraint::null_value())
} else {
None
}
})
})
.collect_vec();
Ok(TableDesc::new(relation_desc, default_values))

View File

@@ -179,7 +179,7 @@ impl Context<'_, '_> {
) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
for (key, group) in &rows.into_iter().chunk_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}
@@ -233,7 +233,7 @@ impl Context<'_, '_> {
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
for (key, group) in &rows.into_iter().chunk_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}

View File

@@ -16,6 +16,7 @@
use std::any::Any;
use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_macro::stack_trace_debug;
@@ -156,6 +157,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Arrow error: {raw:?} in context: {context}"))]
Arrow {
#[snafu(source)]
raw: ArrowError,
context: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datafusion error: {raw:?} in context: {context}"))]
Datafusion {
#[snafu(source)]
@@ -238,7 +248,9 @@ impl ErrorExt for Error {
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } | Self::Arrow { .. } => {
StatusCode::EngineExecuteQuery
}
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported

View File

@@ -151,12 +151,12 @@ impl ScalarExpr {
/// apply optimization to the expression, like flatten variadic function
pub fn optimize(&mut self) {
self.flatten_varidic_fn();
self.flatten_variadic_fn();
}
/// Because Substrait's `And`/`Or` function is binary, but FlowPlan's
/// `And`/`Or` function is variadic, we need to flatten the `And` function if multiple `And`/`Or` functions are nested.
fn flatten_varidic_fn(&mut self) {
fn flatten_variadic_fn(&mut self) {
if let ScalarExpr::CallVariadic { func, exprs } = self {
let mut new_exprs = vec![];
for expr in std::mem::take(exprs) {
@@ -167,7 +167,7 @@ impl ScalarExpr {
{
if *func == inner_func {
for inner_expr in inner_exprs.iter_mut() {
inner_expr.flatten_varidic_fn();
inner_expr.flatten_variadic_fn();
}
new_exprs.extend(inner_exprs);
}

View File

@@ -33,6 +33,7 @@ mod expr;
pub mod heartbeat;
mod metrics;
mod plan;
mod recording_rules;
mod repr;
mod server;
mod transform;

View File

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Run flow as recording rule which is time-window-aware normal query triggered when new data arrives
use std::time::Duration;
mod frontend_client;
mod time_window;
mod utils;
/// TODO(discord9): make those constants configurable
/// The default rule engine query timeout is 10 minutes
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);

View File

@@ -0,0 +1,148 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user
use std::sync::Arc;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::recording_rules::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
use crate::Error;
fn default_channel_mgr() -> ChannelManager {
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
}
fn client_from_urls(addrs: Vec<String>) -> Client {
Client::with_manager_and_urls(default_channel_mgr(), addrs)
}
/// A simple frontend client able to execute sql using grpc protocol
#[derive(Debug)]
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
/// TODO(discord9): not use grpc under standalone mode
database_client: DatabaseWithPeer,
},
}
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
pub database: Database,
pub peer: Peer,
}
impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
}
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed { meta_client }
}
pub fn from_static_grpc_addr(addr: String) -> Self {
let peer = Peer {
id: 0,
addr: addr.clone(),
};
let client = client_from_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
}
}
}
impl FrontendClient {
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
let cluster_client = meta_client
.cluster_client()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
let req = RangeRequest::new().with_prefix(prefix);
let resp = cluster_client
.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut res = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = NodeInfoKey::try_from(kv.key)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let val = NodeInfo::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
res.push((key, val));
}
Ok(res)
}
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
let frontends = self.scan_for_frontend().await?;
let mut peer = None;
if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) {
peer = Some(val.peer.clone());
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
}
}
}

Some files were not shown because too many files have changed in this diff Show More