Compare commits

..

133 Commits

Author SHA1 Message Date
Ruihang Xia
ea0a347edc fix(log-query): panic on prometheus (#5429)
* fix(log-query): panic on prometheus

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test environment setup

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-23 23:03:20 +08:00
Lei, HUANG
4d70589488 fix: avoid suppress manual compaction (#5399)
* fix/avoid-suppress-manual-compaction:
 **Refactor Compaction Logic**

 - Removed `PendingCompaction` struct and integrated its functionality directly into `CompactionStatus` in `compaction.rs`.
 - Simplified waiter management by consolidating waiter handling logic into `CompactionStatus`.
 - Updated `CompactionRequest` creation to directly handle waiters without intermediate structures.
 - Adjusted test cases in `compaction.rs` to align with the new waiter management approach.

(cherry picked from commit 87e2d1c2cc9bd82c02991d22e429bef25c5ee348)

* fix/avoid-suppress-manual-compaction:
 ### Add Support for Manual Compaction Requests

 - **Compaction Logic Enhancements**:
   - Updated `CompactionScheduler` in `compaction.rs` to handle manual compaction requests using `Options::StrictWindow`.
   - Introduced `PendingCompaction` struct to manage pending manual compaction requests.
   - Added logic to reschedule manual compaction requests once the current compaction task is completed.

 - **Testing**:
   - Added `test_manual_compaction_when_compaction_in_progress` to verify the handling of manual compaction requests during ongoing compaction processes.

 These changes enhance the compaction scheduling mechanism by allowing manual compaction requests to be queued and processed efficiently.

(cherry picked from commit bc38ed0f2f8ba2c4690e0d0e251aeb2acce308ca)

* chore: fix conflicts

* fix/avoid-suppress-manual-compaction:
 ### Add Error Handling for Manual Compaction Override

 - **`compaction.rs`**: Enhanced the `set_pending_request` method to handle manual compaction overrides by sending an error to the waiter if a previous request exists.
 - **`error.rs`**: Introduced a new error variant `ManualCompactionOverride` to represent manual compaction being overridden, and mapped it to the `Cancelled` status code.

* fix: format

* fix/avoid-suppress-manual-compaction:
 **Add Error Handling for Pending Compaction Requests**

 - Enhanced error handling in `compaction.rs` by adding logic to handle errors for pending compaction requests.
 - Introduced a mechanism to send errors using `waiter.send` when a pending compaction request fails, ensuring proper error propagation and context with `CompactRegionSnafu`.

* fix/avoid-suppress-manual-compaction:
 **Fix Typo and Simplify Code Logic in `compaction.rs`**

 - Corrected a typo in the license comment from "langucage" to "language".
 - Simplified the logic for handling `pending_compaction` in `CompactionStatus` by removing unnecessary pattern matching and directly accessing `waiter`.

* fix: typo
2025-01-23 19:23:06 +08:00
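
The description above replaces the old behaviour of silently suppressing a manual compaction while another compaction is running: the manual request is queued on the region's `CompactionStatus`, an older pending request receives an explicit override error, and the queued request is rescheduled when the running task finishes. A minimal sketch of that shape, using simplified stand-in types (the `Waiter`, `PendingCompaction`, and error values here are illustrative, not GreptimeDB's actual definitions):

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the handle used to notify a waiting client.
type Waiter = Sender<Result<(), String>>;

/// A queued manual compaction request (simplified).
struct PendingCompaction {
    waiter: Waiter,
}

/// Simplified view of the per-region compaction status.
struct CompactionStatus {
    compacting: bool,
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Queue a manual compaction while another compaction is in progress.
    /// If an older manual request is still pending, notify its waiter that
    /// it was overridden instead of silently dropping it.
    fn set_pending_request(&mut self, waiter: Waiter) {
        if let Some(prev) = self.pending_request.take() {
            let _ = prev.waiter.send(Err("manual compaction overridden".to_string()));
        }
        self.pending_request = Some(PendingCompaction { waiter });
    }

    /// Called when the current compaction task finishes: hand back the
    /// pending manual request, if any, so it can be rescheduled.
    fn on_compaction_finished(&mut self) -> Option<PendingCompaction> {
        self.compacting = false;
        self.pending_request.take()
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let (tx2, _rx2) = std::sync::mpsc::channel();
    let mut status = CompactionStatus { compacting: true, pending_request: None };
    status.set_pending_request(tx);
    // A second manual request overrides the first; the first waiter is notified.
    status.set_pending_request(tx2);
    assert!(rx.recv().unwrap().is_err());
    assert!(status.on_compaction_finished().is_some());
}
```
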
Yingwen
428f646fa3 feat: overwrites inferred compaction window by region options (#5396)
* feat: use time window in compaction options for compaction window

* test: add tests for overwriting options

* chore: typo

* chore: fix a grammar issue in log
2025-01-23 19:23:06 +08:00
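
The precedence described above is small but worth spelling out: a compaction time window set in the region options overrides whatever window was inferred from the data. A one-function sketch of that rule (names are illustrative):

```rust
use std::time::Duration;

/// Pick the compaction window: a window from the region options wins,
/// otherwise fall back to the window inferred from written data.
fn effective_window(option_window: Option<Duration>, inferred: Option<Duration>) -> Option<Duration> {
    option_window.or(inferred)
}

fn main() {
    let from_options = Some(Duration::from_secs(3600));
    let inferred = Some(Duration::from_secs(7200));
    assert_eq!(effective_window(from_options, inferred), Some(Duration::from_secs(3600)));
    assert_eq!(effective_window(None, inferred), Some(Duration::from_secs(7200)));
}
```
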
Ruihang Xia
1d1bb83a9f feat: set default compaction parallelism (#5371)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-23 19:23:06 +08:00
discord9
27918686d6 fix: handle insert default value (#5307)
* fix: handle flow inserts with default values

* test: sqlness

* chore: typo

* chore: newline

* feat(WIP): impure default filler

* feat: fill impure default values

* test: add test for default fill impure

* feat: check for impure

* fix: also handle stmt to region

* refactor: per review

* refactor: per review

* chore: rebase fix

* chore: clippy

* chore: per review
2025-01-23 19:23:06 +08:00
LFC
0f55afd167 refactor: optimize out partition split insert requests (#5298)
* test: optimize out partition split insert requests if there is only one region

* Now that the single-region insert optimization has been lifted up, the original "fast path" can be removed.

* resolve PR comments
2025-01-23 19:23:06 +08:00
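
The optimization above removes the per-row partition evaluation entirely when a table has a single region. A simplified sketch of that short-circuit, with stand-in row and region types:

```rust
/// Illustrative row/region types; the real code works on insert requests
/// and partition rules.
type Row = Vec<String>;
type RegionId = u64;

/// Route rows to regions, skipping partition-rule evaluation entirely when
/// the table has only one region.
fn split_inserts(rows: Vec<Row>, regions: &[RegionId]) -> Vec<(RegionId, Vec<Row>)> {
    if regions.len() == 1 {
        // Single-region fast path: no per-row partition evaluation at all.
        return vec![(regions[0], rows)];
    }
    // Placeholder multi-region path: a real partition rule decides the target.
    let mut split: Vec<(RegionId, Vec<Row>)> =
        regions.iter().map(|r| (*r, Vec::new())).collect();
    for (i, row) in rows.into_iter().enumerate() {
        let target = i % regions.len();
        split[target].1.push(row);
    }
    split
}

fn main() {
    let out = split_inserts(vec![vec!["a".into()], vec!["b".into()]], &[42]);
    assert_eq!(out.len(), 1);
    assert_eq!(out[0].1.len(), 2);
}
```
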
Yiran
ea02ddcde1 ci: automatically bump doc version when release GreptimeDB (#5343)
* ci: automatically bump doc version when release GreptimeDB

* add license header
2025-01-23 15:15:52 +08:00
evenyag
0404e2a132 ci: disable pg kvbackend sqlness test 2025-01-23 15:15:52 +08:00
Ning Sun
7deb559a81 ci: revert coverage runner (#5403) 2025-01-23 15:15:52 +08:00
Ning Sun
c470c6a172 ci: use arm builders for tests (#5395) 2025-01-23 15:15:52 +08:00
Ning Sun
efee2480d2 ci: do not collect coverage from pull request any more (#5364)
* ci: do not collect coverage from pull request any more

* fix: disable toolchain cache

ci: update develop ci

update ci to the version in 121ec7936f on
main branch
2025-01-23 15:15:52 +08:00
Ning Sun
42aaf86c26 ci: disable cache for some tasks, create cache in nightly build (#5324)
* ci: disable cache for some tasks

* ci: add a nightly test to create rust cache on main
2025-01-23 15:15:52 +08:00
Ning Sun
a952ebb2ff ci: use mold for tests (#5319)
* ci: use mold for tests

* ci: enable rust cache saving for merge group
2025-01-23 15:15:52 +08:00
Ning Sun
9a5b904db3 ci: do not trigger tests when there is a merge conflict (#5318)
* ci: do not trigger tests when there is a merge conflict

* Update .github/workflows/develop.yml

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* ci: disable cache from rust toolchain action

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-01-23 15:15:52 +08:00
Ning Sun
2e1a5d811a ci: disable docker/rust cache temporarily and merge docker compose files (#5293)
* ci: disable docker cache temporarily and merge docker compose files

* ci: fix compose file name and options

* ci: try to disable rust cache
2025-01-23 15:15:52 +08:00
evenyag
2d5824b3a5 chore: bump version to v0.11.3 2025-01-23 15:15:52 +08:00
Zhenchi
5f67f2b58e fix: matches incorrectly uses byte len as char len (#5411)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-23 15:15:52 +08:00
Ruihang Xia
c12fbcda9f fix: panic when received invalid query string (#5366)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-23 15:15:52 +08:00
evenyag
7fe735009c chore: bump version to 0.11.2 2025-01-04 02:12:27 +08:00
chenmortal
f0298afaf0 fix: import tokio-metrics and tokio-metrics-collector (#5264) 2025-01-04 02:12:27 +08:00
Yingwen
5175dea6b3 chore: update greptime-proto to include add_if_not_exists (#5289) 2025-01-04 02:12:27 +08:00
discord9
7caa88abc7 refactor: flow replace check&better error msg (#5277)
* chore: better error msg

* chore eof newline

* refactor: move replace check to flow worker

* chore: add ctx to insert flow failure

* chore: Update src/flow/src/adapter/flownode_impl.rs

* test: add order by for deterministic

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-01-04 02:12:27 +08:00
Lei, HUANG
eafb01dfff chore: suppress list warning (#5280)
chore/suppress-list-warning:
 ### Update logging level in `intermediate.rs`

 - Changed logging level from `warn` to `debug` for unexpected directory entries in index creation.
 - Added `debug` to the `common_telemetry` import to support the logging level change.
2025-01-04 02:12:27 +08:00
yihong
b0de816d3d fix: better fmt check from 40s to 4s (#5279)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-01-04 02:12:27 +08:00
Yingwen
5c6161a95e feat: support add if not exists in the gRPC alter kind (#5273)
* test: test adding existing columns

* chore: add more checks to AlterKind

* chore: update logs

* fix: check and build table info first

* feat: Add add_if_not_exists flag to alter expr

* feat: skip existing columns when building alter kind

* checks in make_region_alter_kind()
* reuse the alter kind

* test: fix tests in common-meta

* chore: fix typos

* chore: update comments
2025-01-04 02:12:27 +08:00
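
With `add_if_not_exists` set, columns that already exist in the table are simply skipped when the alter kind is built, instead of failing the whole request. A compact sketch of that filtering (the column and metadata types are stand-ins):

```rust
use std::collections::HashSet;

/// Keep only the columns that are not already present in the table when
/// `add_if_not_exists` is set; otherwise an existing column is an error.
fn columns_to_add(
    requested: Vec<String>,
    existing: &HashSet<String>,
    add_if_not_exists: bool,
) -> Result<Vec<String>, String> {
    let mut out = Vec::new();
    for col in requested {
        if existing.contains(&col) {
            if add_if_not_exists {
                continue; // silently skip columns that already exist
            }
            return Err(format!("column {col} already exists"));
        }
        out.push(col);
    }
    Ok(out)
}

fn main() {
    let existing: HashSet<String> = ["host".to_string()].into_iter().collect();
    let cols = vec!["host".to_string(), "region".to_string()];
    assert_eq!(columns_to_add(cols, &existing, true).unwrap(), vec!["region".to_string()]);
}
```
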
discord9
5e3c5945c4 fix: flow handle reordered inserts (#5275)
* fix: reorder correct schema

* tests: reorder insert handled correctly

* chore: rm unused

* refactor: per review

* chore: more comment

* chore: per review
2025-01-04 02:12:27 +08:00
Yohan Wal
f6feac26f5 refactor: adjust index cache page size (#5267)
* refactor: adjust index cache page size

* fix: wrong docs

* Update config/datanode.example.toml

* Update config/config.md

* Update config/config.md

* chore: adjust to 64KiB

* Apply suggestions from code review
2025-01-04 02:12:27 +08:00
Ning Sun
4b2c59e626 ci: update nix setup (#5272) 2025-01-04 02:12:27 +08:00
discord9
cf605ecccc fix(flow): flow's table schema cache (#5251)
* fix: flow schema cache

* refactor: location for `to_meta_err`

* chore: endfile emptyline

* chore: review(partially)

* chore: per review

* refactor: per review

* refactor: per review
2025-01-04 02:12:27 +08:00
Ning Sun
ab3f9c42f1 fix: correct invalid testing feature gate usage (#5258)
* fix: correct invalid testing feature gate usage

* test: refactor tests to avoid test code leak

* fix: sync main
2025-01-04 02:12:27 +08:00
discord9
258fc6f31b chore: typo (#5265)
* fix: a typo

* chore: even more typos
2025-01-04 02:12:27 +08:00
jeremyhi
e2dccc1d1a feat: hints all in one (#5194)
* feat: hints all in one

* chore: If hints are provided in the x-greptime-hints header, ignore the rest of the headers
2025-01-04 02:12:27 +08:00
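
As described above, when the combined `x-greptime-hints` header is present it carries every hint and the individual hint headers are ignored. A hedged sketch of that precedence over a plain string map standing in for HTTP headers (the comma-separated `k=v` format and the `x-greptime-hint-` prefix are assumptions for illustration):

```rust
use std::collections::HashMap;

/// Parse hints, preferring the all-in-one `x-greptime-hints` header.
/// Individual `x-greptime-hint-*` headers are only consulted when the
/// combined header is absent.
fn extract_hints(headers: &HashMap<String, String>) -> Vec<(String, String)> {
    if let Some(all) = headers.get("x-greptime-hints") {
        return all
            .split(',')
            .filter_map(|kv| {
                let (k, v) = kv.split_once('=')?;
                Some((k.trim().to_string(), v.trim().to_string()))
            })
            .collect();
    }
    headers
        .iter()
        .filter_map(|(k, v)| {
            let name = k.strip_prefix("x-greptime-hint-")?;
            Some((name.to_string(), v.clone()))
        })
        .collect()
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert("x-greptime-hints".to_string(), "k1=v1, k2=v2".to_string());
    // Individual hint headers are ignored once the combined header is present.
    headers.insert("x-greptime-hint-k3".to_string(), "v3".to_string());
    assert_eq!(extract_hints(&headers).len(), 2);
}
```
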
Ruihang Xia
78c5707642 feat(log-query): implement pagination with limit and offset parameters (#5241)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Ning Sun
204b5e474f ci: disable pyo3 build tasks (#5256)
* ci: disable pyo3 build tasks

* ci: skip installing python for windows

* ci: also removed python dependencies from docker base image
2025-01-04 02:12:27 +08:00
Ruihang Xia
e9f1fa0b7d feat: override __sequence on creating SST to save space and CPU (#5252)
* override memtable sequence

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* override sst sequence

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: changes per CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use correct sequence number

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wrap a method to get max sequence

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Yingwen
a988ff5acf feat: update partition duration of memtable using compaction window (#5197)
* feat: update partition duration of memtable using compaction window

* chore: only use provided duration if it is not None

* test: more tests

* test: test compaction apply window

* style: fix clippy
2025-01-04 02:12:27 +08:00
zyy17
ef0fca9388 refactor: support to convert time string to timestamp in convert_value() (#5242)
refactor: support to convert time string to timestamp in convert_value()
2025-01-04 02:12:27 +08:00
Lin Yihai
b704e7f703 feat: add vec_div function (#5245) 2025-01-04 02:12:27 +08:00
Ning Sun
3a4c636e29 ci: make sure clippy passes before running tests (#5253)
* ci: make sure clippy passes before running tests

* ci: do not run ci on main branch
2025-01-04 02:12:27 +08:00
Zhenchi
a22e8b421c fix(bloom-filter): skip applying for non-indexed columns (#5246)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Yingwen
5b42546204 fix: implement a CacheStrategy to ensure compaction use cache correctly (#5254)
* feat: impl CacheStrategy

* refactor: replace Option<CacheManagerRef> with CacheStrategy

* feat: add disabled strategy

* ci: force update taplo

* refactor: rename CacheStrategy::Normal to CacheStrategy::EnableAll

* ci: force install cargo-gc-bin

* ci: force install

* chore: use CacheStrategy::Disabled as ScanInput default

* chore: fix compiler errors
2025-01-04 02:12:27 +08:00
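
The refactor above replaces `Option<CacheManagerRef>` with an explicit `CacheStrategy`, so background reads such as compaction can state that they bypass the cache rather than passing `None` around. A reduced sketch of that enum's shape (the real mito2 type has more variants and methods; `CacheManager` here is a stub):

```rust
use std::sync::Arc;

/// Stub cache manager for the sketch.
struct CacheManager;
type CacheManagerRef = Arc<CacheManager>;

/// Which caches a read path is allowed to use.
enum CacheStrategy {
    /// Normal queries: use every cache.
    EnableAll(CacheManagerRef),
    /// Compaction / background reads: bypass caches entirely.
    Disabled,
}

impl CacheStrategy {
    fn cache(&self) -> Option<&CacheManagerRef> {
        match self {
            CacheStrategy::EnableAll(c) => Some(c),
            CacheStrategy::Disabled => None,
        }
    }
}

fn main() {
    let enabled = CacheStrategy::EnableAll(Arc::new(CacheManager));
    assert!(enabled.cache().is_some());
    assert!(CacheStrategy::Disabled.cache().is_none());
}
```
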
Ruihang Xia
0678a31ab1 feat: add sqlness test for bloom filter index (#5240)
* feat: add sqlness test for bloom filter index

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* drop table after finished

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* redact more variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
shuiyisong
589cc84048 fix: disable path label in opendal for now (#5247)
* fix: remove path label in opendal for now

* fix: typo

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Kould
ed8c072a5e feat(vector): add vector functions vec_sub & vec_sum & vec_elem_sum (#5230)
* feat(vector): add sub function

* chore: added check for vector length misalignment

* feat(vector): add `vec_sum` & `vec_elem_sum`

* chore: codefmt

* update lock file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
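
The functions above are element-wise vector operations that reject inputs of mismatched length. A small sketch of `vec_sub`/`vec_elem_sum`-style behaviour over plain `f32` slices (the real functions operate on GreptimeDB's vector value type, not slices):

```rust
/// Element-wise subtraction with a length-misalignment check.
fn vec_sub(a: &[f32], b: &[f32]) -> Result<Vec<f32>, String> {
    if a.len() != b.len() {
        return Err(format!("vector length mismatch: {} vs {}", a.len(), b.len()));
    }
    Ok(a.iter().zip(b).map(|(x, y)| x - y).collect())
}

/// Sum of all elements within one vector, as in `vec_elem_sum`.
fn vec_elem_sum(a: &[f32]) -> f32 {
    a.iter().sum()
}

fn main() {
    assert_eq!(vec_sub(&[3.0, 4.0], &[1.0, 2.0]).unwrap(), vec![2.0, 2.0]);
    assert_eq!(vec_elem_sum(&[1.0, 2.0, 3.0]), 6.0);
    assert!(vec_sub(&[1.0], &[1.0, 2.0]).is_err());
}
```
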
Yohan Wal
9d172f1cae feat: init PgElection with candidate registration (#5209)
* feat: init PgElection

fix: release advisory lock

fix: handle duplicate keys

chore: update comments

fix: unlock if acquired the lock

chore: add TODO and avoid unwrap

refactor: check both lock and expire time, add more comments

chore: fmt

fix: deal with multiple edge cases

feat: init PgElection with candidate registration

chore: fmt

chore: remove

* test: add unit test for pg candidate registration

* test: add unit test for pg candidate registration

* chore: update pg env

* chore: make ci happy

* fix: spawn a background connection thread

* chore: typo

* fix: shadow the election client for now

* fix: fix ci

* chore: readability

* chore: follow review comments

* refactor: use kvbackend for pg election

* chore: rename

* chore: make clippy happy

* refactor: use pg server time instead of local ones

* chore: typo

* chore: rename infancy to leader_infancy for clarification

* chore: clean up

* chore: follow review comments

* chore: follow review comments

* ci: unit test should test all features

* ci: fix

* ci: just test pg
2025-01-04 02:12:27 +08:00
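
The election above is built on PostgreSQL advisory locks and deliberately compares lease times against the PG server's clock rather than each node's local clock. A hedged sketch of those two building blocks with the `tokio-postgres` crate (key selection, lease renewal, and the real `PgElection` structure are omitted):

```rust
use tokio_postgres::Client;

/// Try to become leader by taking a session-level advisory lock.
/// Returns Ok(true) if this node acquired the lock.
async fn try_acquire_leadership(client: &Client, lock_key: i64) -> Result<bool, tokio_postgres::Error> {
    // pg_try_advisory_lock returns immediately instead of blocking.
    let row = client
        .query_one("SELECT pg_try_advisory_lock($1)", &[&lock_key])
        .await?;
    Ok(row.get::<_, bool>(0))
}

/// Read the server-side time so all candidates compare against one clock.
async fn server_now(client: &Client) -> Result<std::time::SystemTime, tokio_postgres::Error> {
    let row = client.query_one("SELECT now()", &[]).await?;
    Ok(row.get::<_, std::time::SystemTime>(0))
}
```
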
Zhenchi
236888313d feat(mito): add bloom filter read metrics (#5239)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Zhenchi
0b97ef0e4f feat(config): add bloom filter config (#5237)
* feat(bloom-filter): integrate indexer with mito2

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(config) add bloom filter config

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix docs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix docs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* merge

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* remove cache config

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
localhost
316e6a83eb chore: add more info for pipeline dryrun API (#5232) 2025-01-04 02:12:27 +08:00
Ruihang Xia
6dc57b7a6c feat(bloom-filter): bloom filter applier (#5220)
* wip

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* draft search logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use defined BloomFilterReader

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* round the range end

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finish index applier

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* integrate applier into mito2 with cache layer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix cache key and add unit test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* provide bloom filter index size hint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert BloomFilterReaderImpl::read_vec

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ignore null on eq

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more tests and fix bloom filter logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
discord9
1f5c2b32e5 fix: flow compare null values (#5234)
* fix: flow compare null values

* fix: check value types again before comparing

* chore: rm comment

* fix: handle null

* chore: typo

* docs: update comment

* refactor: per review

* tests: more sqlness

* tests: sqlness not show create table
2025-01-04 02:12:27 +08:00
Zhenchi
01e907be40 feat(bloom-filter): integrate indexer with mito2 (#5236)
* feat(bloom-filter): integrate indexer with mito2

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* rename skippingindextype

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Lin Yihai
e4dc5ea243 feat: Add vec_mul function. (#5205) 2025-01-04 02:12:27 +08:00
discord9
3ff5754b5a feat(flow): check sink table mismatch on flow creation (#5112)
* tests: more mismatch errors

* feat: check sink table schema if exists&prompt nice err msg

* chore: rm unused variant

* chore: fmt

* chore: cargo clippy

* feat: check schema on create

* feat: better err msg when mismatch

* tests: fix a schema mismatch

* todo: create sink table

* feat: create sink table

* fix: find time index

* tests: auto created sink table

* fix: remove empty keys

* refactor: per review

* chore: fmt

* test: sqlness

* chore: after rebase
2025-01-04 02:12:27 +08:00
Ruihang Xia
c22ca3ebd5 feat: add some critical metrics to flownode (#5235)
* feat: add some critical metrics to flownode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
zyy17
327d165ad9 feat: introduce the Limiter in frontend to limit the requests by in-flight write bytes size. (#5231)
feat: introduce Limiter to limit in-flight write bytes size in frontend
2025-01-04 02:12:27 +08:00
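
The `Limiter` above caps the total size of write requests that are in flight at once, rejecting new writes when the byte budget is exhausted. A minimal sketch of such a byte-budget guard with an atomic counter and an RAII permit (the real frontend type's API and rejection path may differ):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Tracks in-flight write bytes and rejects requests over the budget.
struct Limiter {
    in_flight: AtomicUsize,
    max_bytes: usize,
}

/// Releases the reserved bytes when the request finishes.
struct Permit {
    limiter: Arc<Limiter>,
    bytes: usize,
}

impl Limiter {
    fn try_acquire(this: &Arc<Limiter>, bytes: usize) -> Option<Permit> {
        let mut current = this.in_flight.load(Ordering::Relaxed);
        loop {
            if current + bytes > this.max_bytes {
                return None; // over budget: caller should reject the request
            }
            match this.in_flight.compare_exchange_weak(
                current,
                current + bytes,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Some(Permit { limiter: this.clone(), bytes }),
                Err(actual) => current = actual,
            }
        }
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.limiter.in_flight.fetch_sub(self.bytes, Ordering::AcqRel);
    }
}

fn main() {
    let limiter = Arc::new(Limiter { in_flight: AtomicUsize::new(0), max_bytes: 1024 });
    let permit = Limiter::try_acquire(&limiter, 800).expect("within budget");
    assert!(Limiter::try_acquire(&limiter, 400).is_none()); // would exceed the budget
    drop(permit);
    assert!(Limiter::try_acquire(&limiter, 400).is_some()); // bytes released
}
```
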
discord9
fe63a620ef ci: upload .pdb files too for better windows debug (#5224)
ci: upload .pdb files too
2025-01-04 02:12:27 +08:00
Zhenchi
be81f0db5a feat(bloom-filter): impl batch push to creator (#5225)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Ruihang Xia
6ca7a305ae fix: correct write cache's metric labels (#5227)
* refactor: remove unused field in WriteCache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: unify read and write cache path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update config and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unnecessary methods and adapt test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change the default path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove remote-home

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Weny Xu
1111a8bd57 chore: add log for converting region to follower (#5222)
* chore: add log for converting region to follower

* chore: apply suggestions from CR
2025-01-04 02:12:27 +08:00
zyy17
66b21b29b5 ci: support to pack multiple files in upload-artifacts action (#5228) 2025-01-04 02:12:27 +08:00
Lei, HUANG
31cfab81ad feat(mito): parquet memtable reader (#4967)
* wip: row group reader base

* wip: memtable row group reader

* Refactor MemtableRowGroupReader to streamline data fetching

 - Added early return when fetch_ranges is empty to optimize performance.
 - Replaced inline chunk data assignment with a call to `assign_dense_chunk` for cleaner code.

* wip: row group reader

* wip: reuse RowGroupReader

* wip: bulk part reader

* Enhance BulkPart Iteration with Filtering

 - Introduced `RangeBase` to `BulkIterContext` for improved filter handling.
 - Implemented filter application in `BulkPartIter` to prune batches based on predicates.
 - Updated `SimpleFilterContext::new_opt` to be public for broader access.

* chore: add prune test

* fix: clippy

* fix: introduce prune reader for memtable and add more prune test

* Enhance BulkPart read method to return Option<BoxedBatchIterator>

 - Modified `BulkPart::read` to return `Option<BoxedBatchIterator>` to handle cases where no row groups are selected.
 - Added logic to return `None` when all row groups are filtered out.
 - Updated tests to handle the new return type and added a test case to verify behavior when no row groups match the pr

* refactor/separate-paraquet-reader: Add helper function to parse parquet metadata and integrate it into BulkPartEncoder

* refactor/separate-paraquet-reader:
 Change BulkPartEncoder row_group_size from Option to usize and update tests

* refactor/separate-paraquet-reader: Add context module for bulk memtable iteration and refactor part reading

 • Introduce context module to encapsulate context for bulk memtable iteration.
 • Refactor BulkPart to use BulkIterContextRef for reading operations.
 • Remove redundant code in BulkPart by centralizing context creation and row group pruning logic in the new context module.
 • Create new file context.rs with structures and logic for handling iteration context.
 • Adjust part_reader.rs and row_group_reader.rs to reference the new BulkIterContextRef.

* refactor/separate-paraquet-reader: Refactor RowGroupReader traits and implementations in memtable and parquet reader modules

 • Rename RowGroupReaderVirtual to RowGroupReaderContext for clarity.
 • Replace BulkPartVirt with direct usage of BulkIterContextRef in MemtableRowGroupReader.
 • Simplify MemtableRowGroupReaderBuilder by directly passing context instead of creating a BulkPartVirt instance.
 • Update RowGroupReaderBase to use context field instead of virt, reflecting the trait renaming and usage.
 • Modify FileRangeVirt to FileRangeContextRef and adjust implementations accordingly.

* refactor/separate-paraquet-reader: Refactor column page reader creation and remove unused code

 • Centralize creation of SerializedPageReader in RowGroupBase::column_reader method.
 • Remove unused RowGroupCachedReader and related code from MemtableRowGroupPageFetcher.
 • Eliminate redundant error handling for invalid column index in multiple places.

* chore: rebase main and resolve conflicts

* fix: some comments

* chore: resolve conflicts

* chore: resolve conflicts
2025-01-04 02:12:27 +08:00
Ruihang Xia
dd3a509607 chore: bump opendal to fork version to fix prometheus layer (#5223)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Weny Xu
d4cae6af1e refactor: remove unnecessary wrap (#5221)
* chore: remove unnecessary arc

* chore: remove unnecessary box
2025-01-04 02:12:27 +08:00
Ruihang Xia
3fec71b5c0 feat: logs query endpoint (#5202)
* define endpoint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update lock file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix toml format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert metric change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/log_query/planner.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-01-04 02:12:27 +08:00
Zhenchi
9e31a6478b feat(index-cache): abstract IndexCache to be shared by multi types of indexes (#5219)
* feat(index-cache): abstract `IndexCache` to be shared by multi types of indexes

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typo

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove added label

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: simplify cached reader impl

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* rename func

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
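
The abstraction above lets the inverted-index and bloom-filter readers share one cache implementation instead of each rolling their own. A shrunken sketch of the idea, keyed by an opaque key and loading on miss (no eviction here, unlike the real cache):

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Mutex};

/// A cache shared by multiple index types: the key identifies the index
/// kind and file, the value is the cached page or metadata.
struct IndexCache<K, V> {
    inner: Mutex<HashMap<K, Arc<V>>>,
}

impl<K: Eq + Hash + Clone, V> IndexCache<K, V> {
    fn new() -> Self {
        Self { inner: Mutex::new(HashMap::new()) }
    }

    /// Return a cached value or load it and populate the cache.
    fn get_or_load<E>(&self, key: &K, load: impl FnOnce() -> Result<V, E>) -> Result<Arc<V>, E> {
        if let Some(v) = self.inner.lock().unwrap().get(key) {
            return Ok(v.clone());
        }
        let loaded = Arc::new(load()?);
        self.inner
            .lock()
            .unwrap()
            .insert(key.clone(), loaded.clone());
        Ok(loaded)
    }
}

fn main() {
    let cache: IndexCache<(&str, u64), Vec<u8>> = IndexCache::new();
    let page = cache
        .get_or_load(&("bloom", 0), || Ok::<_, std::io::Error>(vec![1, 2, 3]))
        .unwrap();
    assert_eq!(page.len(), 3);
}
```
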
zyy17
bce291a8e1 docs: add greptimedb-operator project link in 'Tools & Extensions' and other small improvements (#5216) 2025-01-04 02:12:27 +08:00
Ning Sun
c788eb67e2 ci: fix nightly ci task on nix build (#5198) 2025-01-04 02:12:27 +08:00
Yiran
0c32dcf46c fix: dead links (#5212) 2025-01-04 02:12:27 +08:00
Zhenchi
68a05b38bd feat(bloom-filter): add bloom filter reader (#5204)
* feat(bloom-filter): add bloom filter reader

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: remove unused dep

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix conflict

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Zhenchi
ee72ae8bd0 feat(bloom-filter): add memory control for creator (#5185)
* feat(bloom-filter): add memory control for creator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: remove meaningless buf

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add codec for intermediate

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Weny Xu
556bd796d8 chore: adjust fuzz tests cfg (#5207) 2025-01-04 02:12:27 +08:00
Ruihang Xia
1327e8809f feat: bump opendal and switch prometheus layer to the upstream impl (#5179)
* feat: bump opendal and switch prometheus layer to the upstream impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused things

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove root dir on recovering cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out non-files entry in test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Yingwen
17d75c767c chore: update all arm64 builders (#5215) 2024-12-21 18:14:45 +08:00
Yingwen
a1ed450c0c chore: arm64 use 8xlarge runner (#5213) 2024-12-20 22:29:25 +08:00
evenyag
ea4ce9d1e3 chore: set version to 0.11.1 2024-12-20 14:12:19 +08:00
evenyag
1f7d9666b7 chore: Downgrade opendal for releasing 0.11.1
Revert "feat: bump opendal and switch prometheus layer to the upstream impl (#5179)"

This reverts commit 422d18da8b.
2024-12-20 14:12:19 +08:00
LFC
9f1a0d78b2 ci: install latest protobuf in dev-builder image (#5196) 2024-12-20 14:12:19 +08:00
discord9
ed8e418716 fix: auto created table ttl check (#5203)
* fix: auto created table ttl check

* tests: with hint
2024-12-20 14:12:19 +08:00
discord9
9e7121c1bb fix(flow): batch builder with type (#5195)
* fix: typed builder

* chore: clippy

* chore: rename

* fix: unit tests

* refactor: per review
2024-12-20 14:12:19 +08:00
dennis zhuang
94a49ed4f0 chore: update PR template (#5199) 2024-12-20 14:12:19 +08:00
discord9
f5e743379f feat: show flow's mem usage in INFORMATION_SCHEMA.FLOWS (#4890)
* feat: add flow mem size to sys table

* chore: rm dup def

* chore: remove unused variant

* chore: minor refactor

* refactor: per review
2024-12-20 14:12:19 +08:00
Ruihang Xia
6735e5867e feat: bump opendal and switch prometheus layer to the upstream impl (#5179)
* feat: bump opendal and switch prometheus layer to the upstream impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused things

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove root dir on recovering cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out non-files entry in test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Weny Xu
925525726b fix: ensure table route metadata is eventually rolled back on failure (#5174)
* fix: ensure table route metadata is eventually rolled back on procedure failure

* fix(fuzz): enhance procedure condition checking

* chore: add logs

* feat: close downgraded leader region actively

* chore: apply suggestions from CR
2024-12-20 14:12:19 +08:00
Ning Sun
6427682a9a feat: show create postgresql foreign table (#5143)
* feat: add show create table for pg in parser

* feat: implement show create table operation

* fix: adopt upstream changes
2024-12-20 14:12:19 +08:00
Ning Sun
55b0022676 chore: make nix compilation environment config more robust (#5183)
* chore: improve nix-shell support

* fix: add pkg-config

* ci: add a github action to ensure build on clean system

* ci: optimise dependencies of task

* ci: move clean build to nightly
2024-12-20 14:12:19 +08:00
Ruihang Xia
2d84cc8d87 refactor: remove unused symbols (#5193)
chore: remove unused symbols

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Yingwen
c030705b17 docs: fix grafana dashboard row (#5192) 2024-12-20 14:12:19 +08:00
Ruihang Xia
443c600bd0 fix: validate matcher op for __name__ in promql (#5191)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
39cadfe10b fix(sqlness): enforce order in union tests (#5190)
Add ORDER BY clause to subquery union tests

 Updated the SQL and result files for subquery union tests to include an ORDER BY clause, ensuring consistent result ordering. This change aligns with the test case from the DuckDB repository.
2024-12-20 14:12:19 +08:00
jeremyhi
9b5e4e80f7 feat: extract hints from http header (#5128)
* feat: extract hints from http header

* Update src/servers/src/http/hints.rs

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: by comment

* refactor: get instead of loop

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yingwen
041a276b66 feat: do not remove time filters in ScanRegion (#5180)
* feat: do not remove time filters

* chore: remove `time_range` from parquet reader

* chore: print more message in the check script

* chore: fix unused error
2024-12-20 14:12:19 +08:00
Yingwen
614a25ddc5 feat: do not keep MemtableRefs in ScanInput (#5184) 2024-12-20 14:12:19 +08:00
dennis zhuang
4337e20010 feat: impl label_join and label_replace for promql (#5153)
* feat: impl label_join and label_replace for promql

* chore: style

* fix: dst_label is equal to src_label

* fix: forgot to sort the results

* fix: processing empty source label
2024-12-20 14:12:19 +08:00
Lanqing Yang
65c52cc698 fix: display inverted and fulltext index in show index (#5169) 2024-12-20 14:12:19 +08:00
Yohan Wal
50f31fd681 feat: introduce Buffer for non-continuous bytes (#5164)
* feat: introduce Buffer for non-continuous bytes

* Update src/mito2/src/cache/index.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: apply review comments

* refactor: use opendal::Buffer

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-12-20 14:12:19 +08:00
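
The `Buffer` above represents bytes fetched from several non-contiguous ranges of an index file without first copying them into one allocation (the final revision reuses `opendal::Buffer` for this). A minimal sketch of reading across chunk boundaries:

```rust
/// Bytes held as several non-contiguous chunks.
struct Buffer {
    chunks: Vec<Vec<u8>>,
}

impl Buffer {
    fn len(&self) -> usize {
        self.chunks.iter().map(|c| c.len()).sum()
    }

    /// Copy `dst.len()` bytes starting at logical `offset`, walking chunks.
    fn read_at(&self, mut offset: usize, dst: &mut [u8]) {
        let mut written = 0;
        for chunk in &self.chunks {
            if offset >= chunk.len() {
                offset -= chunk.len();
                continue;
            }
            let n = (chunk.len() - offset).min(dst.len() - written);
            dst[written..written + n].copy_from_slice(&chunk[offset..offset + n]);
            written += n;
            offset = 0;
            if written == dst.len() {
                break;
            }
        }
    }
}

fn main() {
    let buf = Buffer { chunks: vec![vec![1, 2, 3], vec![4, 5]] };
    let mut out = [0u8; 3];
    buf.read_at(2, &mut out); // spans the boundary between the two chunks
    assert_eq!(out, [3, 4, 5]);
    assert_eq!(buf.len(), 5);
}
```
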
LFC
b5af5aaf8d refactor: produce BatchBuilder from a Batch to modify it again (#5186)
chore: pub some mods
2024-12-20 14:12:19 +08:00
Lei, HUANG
27693c7f1e perf: avoid holding memtable during compaction (#5157)
* perf/avoid-holding-memtable-during-compaction: Refactor Compaction Version Handling

 • Introduced CompactionVersion struct to encapsulate region version details for compaction, removing dependency on VersionRef.
 • Updated CompactionRequest and CompactionRegion to use CompactionVersion.
 • Modified open_compaction_region to construct CompactionVersion without memtables.
 • Adjusted WindowedCompactionPicker to work with CompactionVersion.
 • Enhanced flush logic in WriteBufferManager to improve memory usage checks and logging.

* reformat code

* chore: change log level

* reformat code

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-12-20 14:12:19 +08:00
discord9
a59fef9ffb test: sqlness upgrade compatibility tests (#5126)
* feat: simple version switch

* chore: remove debug print

* chore: add common folder

* tests: add drop table

* feat: pull versioned binary

* chore: don't use native-tls

* chore: rm outdated docs

* chore: new line

* fix: save old bin dir

* fix: switch version restart all node

* feat: use etcd

* fix: wait for election

* fix: normal sqlness

* refactor: hashmap for bin dir

* test: past 3 major version compat crate table

* refactor: allow using without setup etcd
2024-12-20 14:12:19 +08:00
Zhenchi
bcecd8ce52 feat(bloom-filter): add basic bloom filter creator (Part 1) (#5177)
* feat(bloom-filter): add a simple bloom filter creator (Part 1)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: header

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* docs: add format comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-20 14:12:19 +08:00
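
Part 1 above adds the creator side of the bloom filter index. A generic sketch of the core insert/membership logic using double hashing (the on-disk format, hash seeds, and sizing that GreptimeDB uses are not shown):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal bloom filter: `k` probe positions derived from two hashes.
struct BloomFilter {
    bits: Vec<bool>,
    k: usize,
}

impl BloomFilter {
    fn new(num_bits: usize, k: usize) -> Self {
        Self { bits: vec![false; num_bits], k }
    }

    fn positions<T: Hash>(&self, item: &T) -> impl Iterator<Item = usize> + '_ {
        let mut hasher = DefaultHasher::new();
        item.hash(&mut hasher);
        let h1 = hasher.finish();
        let h2 = h1.rotate_left(32) | 1; // derive a second, odd hash
        let len = self.bits.len() as u64;
        (0..self.k as u64).map(move |i| (h1.wrapping_add(i.wrapping_mul(h2)) % len) as usize)
    }

    fn insert<T: Hash>(&mut self, item: &T) {
        let positions: Vec<usize> = self.positions(item).collect();
        for p in positions {
            self.bits[p] = true;
        }
    }

    fn may_contain<T: Hash>(&self, item: &T) -> bool {
        self.positions(item).all(|p| self.bits[p])
    }
}

fn main() {
    let mut bf = BloomFilter::new(1024, 4);
    bf.insert(&"host-1");
    assert!(bf.may_contain(&"host-1"));
}
```
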
Yingwen
ffdcb8c1ac fix: deletion between two put may not work in last_non_null mode (#5168)
* fix: deletion between rows with the same key may not work

* test: add sqlness test case

* chore: comments
2024-12-20 14:12:19 +08:00
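
The fix above concerns `last_non_null` merge mode: a DELETE sitting between two PUTs of the same key must mask the older PUT instead of letting its non-null fields show through. A simplified sketch of merging versions newest-to-oldest under that rule (types and semantics are condensed for illustration):

```rust
#[derive(Clone, Copy)]
enum Op {
    Put,
    Delete,
}

/// One version of a row: op type plus optional field values (None = null).
struct Version {
    op: Op,
    fields: Vec<Option<i64>>,
}

/// Merge versions (ordered newest first) in last_non_null mode:
/// take the latest non-null value per field, but stop looking further back
/// once a DELETE is reached, so it masks everything older.
fn merge_last_non_null(versions: &[Version], num_fields: usize) -> Option<Vec<Option<i64>>> {
    let mut out = vec![None; num_fields];
    let mut seen_put = false;
    for v in versions {
        match v.op {
            Op::Delete => break, // older versions are masked by the delete
            Op::Put => {
                seen_put = true;
                for (slot, value) in out.iter_mut().zip(&v.fields) {
                    if slot.is_none() {
                        *slot = *value;
                    }
                }
            }
        }
    }
    seen_put.then_some(out)
}

fn main() {
    // Newest first: PUT(b=2), DELETE, PUT(a=1, b=9).
    let versions = [
        Version { op: Op::Put, fields: vec![None, Some(2)] },
        Version { op: Op::Delete, fields: vec![None, None] },
        Version { op: Op::Put, fields: vec![Some(1), Some(9)] },
    ];
    // The delete masks the oldest put: field `a` stays null, `b` is 2.
    assert_eq!(merge_last_non_null(&versions, 2), Some(vec![None, Some(2)]));
}
```
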
Yingwen
554121ad79 chore: add aquamarine to dep lists (#5181) 2024-12-20 14:12:19 +08:00
Weny Xu
43c12b4f2c fix: correct set_region_role_state_gracefully behaviors (#5171)
* fix: reduce default max rows for fuzz testing

* chore: remove Postgres setup from fuzz test workflow

* chore(fuzz): increase resource limits for GreptimeDB cluster

* chore(fuzz): increase resource limits for kafka

* fix: correct `set_region_role_state_gracefully` behaviors

* chore: remove Postgres setup from fuzz test workflow

* chore(fuzz): redue resource limits for GreptimeDB & kafka
2024-12-20 14:12:19 +08:00
discord9
7aa8c28fe4 test: flow rebuild (#5162)
* tests: rebuild flow

* tests: more rebuild

* tests: restart

* chore: drop clean
2024-12-20 14:12:19 +08:00
Ning Sun
34fbe7739e chore: add nix-shell configure for a minimal environment for development (#5175)
* chore: add nix-shell development environment

* chore: add rust-analyzer

* chore: use .envrc as a private file
2024-12-20 14:12:19 +08:00
ZonaHe
06d7bd99dd feat: update dashboard to v0.7.3 (#5172)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Ruihang Xia
b71d842615 feat: introduce SKIPPING index (part 1) (#5155)
* skip index parser

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl show create part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add empty line

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change keyword to SKIPPING INDEX

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename local variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
7f71693b8e chore: gauge for flush compaction (#5156)
* add metrics

* chore/bench-metrics: Add INFLIGHT_FLUSH_COUNT Metric to Flush Process

 • Introduced INFLIGHT_FLUSH_COUNT metric to track the number of ongoing flush operations.
 • Incremented INFLIGHT_FLUSH_COUNT in FlushScheduler to monitor active flushes.
 • Removed redundant increment of INFLIGHT_FLUSH_COUNT in RegionWorkerLoop to prevent double counting.

* chore/bench-metrics: Add Metrics for Compaction and Flush Operations

 • Introduced INFLIGHT_COMPACTION_COUNT and INFLIGHT_FLUSH_COUNT metrics to track the number of ongoing compaction and flush operations.
 • Incremented INFLIGHT_COMPACTION_COUNT when scheduling remote and local compaction jobs, and decremented it upon completion.
 • Added INFLIGHT_FLUSH_COUNT increment and decrement logic around flush tasks to monitor active flush operations.
 • Removed redundant metric updates in worker.rs and handle_compaction.rs to streamline metric handling.

* chore: add metrics for remote compaction jobs

* chore: format

* chore: also add dashboard
2024-12-20 14:12:19 +08:00
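
The gauges above are plain in-flight counters: incremented when a flush or compaction is scheduled and decremented exactly once when the task completes or fails. A short sketch with the `prometheus` crate (the metric name and registration here are placeholders, not the ones GreptimeDB registers):

```rust
use prometheus::IntGauge;

fn main() {
    // Placeholder name/help; the real metric is registered once globally.
    let inflight_flush = IntGauge::new("inflight_flush_count", "ongoing flush tasks").unwrap();

    // On scheduling a flush:
    inflight_flush.inc();

    // ... run the flush task ...

    // On completion (or failure), always decrement exactly once:
    inflight_flush.dec();

    assert_eq!(inflight_flush.get(), 0);
}
```
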
Lin Yihai
615ea1a171 feat: Add vector_scalar_mul function. (#5166) 2024-12-20 14:12:19 +08:00
shuiyisong
4e725d259d chore: remove unused dep (#5163)
* chore: remove unused dep

* chore: remove more unused dep
2024-12-20 14:12:19 +08:00
Niwaka
dc2252eb6d fix: support alter table ~ add ~ custom_type (#5165) 2024-12-20 14:12:19 +08:00
Yingwen
6d4cc2e070 ci: use 4xlarge for nightly build (#5158) 2024-12-20 14:12:19 +08:00
localhost
6066ce2c4a fix: loki write row len error (#5161) 2024-12-20 14:12:19 +08:00
Yingwen
b90d8f7dbd docs: Add index panels to standalone grafana dashboard (#5140)
* docs: Add index panels to standalone grafana dashboard

* docs: fix flush/compaction op
2024-12-20 14:12:19 +08:00
Yohan Wal
fdccf4ff84 refactor: cache inverted index with fixed-size page (#5114)
* feat: cache inverted index by page instead of file

* fix: add unit test and fix bugs

* chore: typo

* chore: ci

* fix: math

* chore: apply review comments

* chore: renames

* test: add unit test for index key calculation

* refactor: use ReadableSize

* feat: add config for inverted index page size

* chore: update config file

* refactor: handle multiple range read and fix some related bugs

* fix: add config

* test: turn to a fs reader to match behaviors of object store
2024-12-20 14:12:19 +08:00
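
Caching the inverted index in fixed-size pages means a requested byte range is served from one or more page-aligned cache entries rather than from whole-file entries. A small sketch of the page-key arithmetic behind that (the page size and key shape are illustrative):

```rust
/// Which fixed-size cache pages the byte range [offset, offset + size) touches.
fn page_range(offset: u64, size: u64, page_size: u64) -> std::ops::Range<u64> {
    let first = offset / page_size;
    // Round the end of the range up to the next page boundary.
    let last = (offset + size + page_size - 1) / page_size;
    first..last
}

fn main() {
    // A 100-byte read at offset 4000 with 4 KiB pages spans pages 0 and 1,
    // so it is served from two page-sized cache entries.
    assert_eq!(page_range(4000, 100, 4096), 0..2);
}
```
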
localhost
8b1484c064 chore: pipeline dryrun api can currently receives pipeline raw content (#5142)
* chore: pipeline dryrun api can currently receives pipeline raw content

* chore: remove dryrun v1 and add test

* chore: change dryrun pipeline api body schema

* chore: remove useless struct PipelineInfo

* chore: update PipelineDryrunParams doc

* chore: increase code readability

* chore: add some comment for pipeline dryrun test

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: format code

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yingwen
576e20ac78 feat: collect reader metrics from prune reader (#5152) 2024-12-20 14:12:19 +08:00
localhost
10b3e3da0f chore: decide tag column in log api follow table schema if table exists (#5138)
* chore: decide tag column in log api follow table schema if table exists

* chore: add more test for greptime_identity pipeline

* chore: change pipeline get_table function signature

* chore: change identity_pipeline_inner tag_column_names type
2024-12-20 14:12:19 +08:00
Weny Xu
4a3ef2d718 feat(index): add file_size_hint for remote blob reader (#5147)
feat(index): add file_size_hint for remote blob reader
2024-12-20 14:12:19 +08:00
Yohan Wal
65eabb2a05 feat(fuzz): add alter table options for alter fuzzer (#5074)
* feat(fuzz): add set table options to alter fuzzer

* chore: clippy is happy, I'm sad

* chore: happy ci happy

* fix: unit test

* feat(fuzz): add unset table options to alter fuzzer

* fix: unit test

* feat(fuzz): add table option validator

* fix: make clippy happy

* chore: add comments

* chore: apply review comments

* fix: unit test

* feat(fuzz): add more ttl options

* fix: #5108

* chore: add comments

* chore: add comments
2024-12-20 14:12:19 +08:00
Weny Xu
bc5a57f51f feat: introduce PuffinMetadataCache (#5148)
* feat: introduce `PuffinMetadataCache`

* refactor: remove too_many_arguments

* chore: fmt toml
2024-12-20 14:12:19 +08:00
Weny Xu
f24b9d8814 feat: add prefetch support to InvertedIndexFooterReader for reduced I/O time (#5146)
* feat: add prefetch support to `InvertedIndeFooterReader`

* chore: correct struct name

* chore: apply suggestions from CR
2024-12-20 14:12:19 +08:00
Weny Xu
dd4d0a88ce feat: add prefetch support to PuffinFileFooterReader for reduced I/O time (#5145)
* feat: introduce `PuffinFileFooterReader`

* refactor: remove `SyncReader` trait and impl

* refactor: replace `FooterParser` with `PuffinFileFooterReader`

* chore: remove unused errors
2024-12-20 14:12:19 +08:00
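
Both footer readers in the two entries above cut I/O the same way: prefetch a fixed-size chunk from the end of the file, and issue a second read only if the decoded footer turns out to be larger than the guess. A sketch of that pattern over stand-in range-read and decode functions (the real readers parse Puffin / inverted-index footers):

```rust
/// Read a file footer with at most two range reads: prefetch a fixed-size
/// tail first, then issue a second read only if the footer is larger.
fn read_footer<R, L>(file_size: u64, prefetch: u64, read_range: R, footer_len_of: L) -> Vec<u8>
where
    R: Fn(u64, u64) -> Vec<u8>, // (offset, length) -> bytes
    L: Fn(&[u8]) -> u64,        // decode the footer length from the tail
{
    let tail_len = prefetch.min(file_size);
    let tail = read_range(file_size - tail_len, tail_len);
    let footer_len = footer_len_of(&tail);
    if footer_len <= tail_len {
        // Fast path: the whole footer is already in the prefetched tail.
        tail[(tail_len - footer_len) as usize..].to_vec()
    } else {
        // Slow path: one extra read for the exact footer range.
        read_range(file_size - footer_len, footer_len)
    }
}

fn main() {
    let file: Vec<u8> = (0u8..100).collect();
    let read = |offset: u64, len: u64| file[offset as usize..(offset + len) as usize].to_vec();
    // Pretend the footer occupies the last 8 bytes.
    let footer = read_footer(100, 32, &read, |_tail: &[u8]| 8);
    assert_eq!(footer, file[92..].to_vec());
}
```
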
Niwaka
3d2096fe9d feat: support push down IN filter (#5129)
* feat: support push down IN filter

* chore: move tests to prune.sql
2024-12-20 14:12:19 +08:00
Ruihang Xia
35715bb710 feat: implement v1/sql/parse endpoint to parse GreptimeDB's SQL dialect (#5144)
* derive ser/de

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deserialize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
ZonaHe
08a3befa67 feat: update dashboard to v0.7.2 (#5141)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yohan Wal
ca1758d4e7 test: part of parser test migrated from duckdb (#5125)
* test: update test

* fix: fix test
2024-12-20 14:12:19 +08:00
Zhenchi
42bf818167 feat(vector): add scalar add function (#5119)
* refactor: extract implicit conversion helper functions of vector

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(vector): add scalar add function

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
2c9b117224 perf: avoid cache during compaction (#5135)
* Revert "refactor: Avoid wrapping Option for CacheManagerRef (#4996)"

This reverts commit 42bf7e9965.

* fix: memory usage during log ingestion

* fix: fmt
2024-12-20 14:12:19 +08:00
dennis zhuang
3edf2317e1 feat: adjust WAL purge default configurations (#5107)
* feat: adjust WAL purge default configurations

* fix: config

* feat: change raft engine file_size default to 128Mib
2024-12-20 14:12:19 +08:00
jeremyhi
85d72a3cd0 chore: set store_key_prefix for all kvbackend (#5132) 2024-12-20 14:12:19 +08:00
discord9
928172bd82 chore: fix aws_lc not in depend tree check in CI (#5121)
* chore: fix aws_lc check in CI

* chore: update lock file
2024-12-20 14:12:19 +08:00
shuiyisong
e9f5bddeff chore: add /ready api for health checking (#5124)
* chore: add ready endpoint for health checking

* chore: add test
2024-12-20 14:12:19 +08:00
Yingwen
486755d795 chore: bump main branch version to 0.12 (#5133)
chore: bump version to v0.12.0
2024-12-20 14:12:19 +08:00
103 changed files with 3629 additions and 1586 deletions

@@ -1,9 +1,6 @@
name: Check Dependencies
on:
push:
branches:
- main
pull_request:
branches:
- main

@@ -29,7 +29,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

@@ -43,7 +43,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ windows-2022, ubuntu-20.04 ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -57,6 +57,8 @@ jobs:
# Shares across multiple jobs
# Shares with `Clippy` job
shared-key: "check-lint"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Run cargo check
run: cargo check --locked --workspace --all-targets
@@ -67,11 +69,6 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
- name: Run taplo
@@ -94,13 +91,15 @@ jobs:
with:
# Shares across multiple jobs
shared-key: "build-binaries"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc -- --bin greptime --bin sqlness-runner
run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend
- name: Pack greptime binaries
shell: bash
run: |
@@ -142,11 +141,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -200,11 +194,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -255,13 +244,15 @@ jobs:
with:
# Shares across multiple jobs
shared-key: "build-greptime-ci"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime
run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend
- name: Pack greptime binary
shell: bash
run: |
@@ -317,11 +308,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -466,11 +452,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -578,8 +559,8 @@ jobs:
- uses: actions/checkout@v4
- if: matrix.mode.kafka
name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
working-directory: tests-integration/fixtures
run: docker compose up -d --wait kafka
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
@@ -609,11 +590,6 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "check-rust-fmt"
- name: Check format
run: make fmt-check
@@ -635,55 +611,99 @@ jobs:
# Shares across multiple jobs
# Shares with `Check` job
shared-key: "check-lint"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Run cargo clippy
run: make clippy
coverage:
if: github.event.pull_request.draft == false
runs-on: ubuntu-20.04-8-cores
conflict-check:
name: Check for conflict
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Merge Conflict Finder
uses: olivernybroe/action-conflict-finder@v4.0
test:
if: github.event_name != 'merge_group'
runs-on: ubuntu-24.04-arm
timeout-minutes: 60
needs: [clippy, fmt]
needs: [conflict-check, clippy, fmt]
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: KyleMayes/install-llvm-action@v1
with:
version: "14.0"
- uses: rui314/setup-mold@v1
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: llvm-tools-preview
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
- name: Docker Cache
uses: ScribeMD/docker-cache@0.3.7
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
GT_MINIO_BUCKET: greptime
GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
coverage:
if: github.event_name == 'merge_group'
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
key: docker-${{ runner.os }}-coverage
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: rui314/setup-mold@v1
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: llvm-tools
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Setup etcd server
working-directory: tests-integration/fixtures/etcd
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup minio
working-directory: tests-integration/fixtures/minio
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup postgres server
working-directory: tests-integration/fixtures/postgres
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}

@@ -27,7 +27,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

@@ -108,7 +108,53 @@ jobs:
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
UNITTEST_LOG_DIR: "__unittest_logs"
## this is designed for generating cache that usable for pull requests
test-on-linux:
name: Run tests on Linux
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: rui314/setup-mold@v1
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
- name: Install Cargo Nextest
uses: taiki-e/install-action@nextest
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Running tests
run: cargo nextest run -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
GT_MINIO_BUCKET: greptime
GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
cleanbuild-linux-nix:
name: Run clean build on Linux
runs-on: ubuntu-latest-8-cores
timeout-minutes: 60
steps:

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.12.0
NEXT_RELEASE_VERSION: v0.11.0
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
@@ -436,6 +436,22 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-doc-version:
name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners]
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup-cyborg
- name: Bump doc version
working-directory: cyborg
run: pnpm tsx bin/bump-doc-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
notification:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
name: Send notification to Greptime team

Cargo.lock (generated)

@@ -188,7 +188,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"common-base",
"common-decimal",
@@ -773,7 +773,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -1314,7 +1314,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arrow",
@@ -1684,7 +1684,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"auth",
@@ -1727,7 +1727,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"tempfile",
"tokio",
@@ -1736,7 +1736,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -1763,7 +1763,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.0",
"substrait 0.11.3",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1804,7 +1804,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"auth",
@@ -1864,7 +1864,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"temp-env",
"tempfile",
@@ -1916,7 +1916,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"anymap2",
"async-trait",
@@ -1938,11 +1938,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.0"
version = "0.11.3"
[[package]]
name = "common-config"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"common-base",
"common-error",
@@ -1965,7 +1965,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"arrow",
"arrow-schema",
@@ -2001,7 +2001,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2014,7 +2014,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"http 0.2.12",
"snafu 0.8.5",
@@ -2024,7 +2024,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"common-error",
@@ -2034,7 +2034,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"approx 0.5.1",
@@ -2078,7 +2078,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"common-runtime",
@@ -2095,7 +2095,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arrow-flight",
@@ -2121,7 +2121,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"common-base",
@@ -2140,7 +2140,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"arc-swap",
"common-query",
@@ -2154,7 +2154,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"common-error",
"common-macro",
@@ -2167,7 +2167,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"anymap2",
"api",
@@ -2224,7 +2224,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2233,11 +2233,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.0"
version = "0.11.3"
[[package]]
name = "common-pprof"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"common-error",
"common-macro",
@@ -2249,7 +2249,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-stream",
"async-trait",
@@ -2276,7 +2276,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"common-procedure",
@@ -2284,7 +2284,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -2310,7 +2310,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"arc-swap",
"common-error",
@@ -2329,7 +2329,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2359,7 +2359,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"atty",
"backtrace",
@@ -2387,7 +2387,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"client",
"common-query",
@@ -2399,7 +2399,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"arrow",
"chrono",
@@ -2417,7 +2417,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"build-data",
"const_format",
@@ -2427,7 +2427,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"common-base",
"common-error",
@@ -3226,7 +3226,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arrow-flight",
@@ -3277,7 +3277,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"tokio",
"toml 0.8.19",
@@ -3286,7 +3286,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"arrow",
"arrow-array",
@@ -3910,7 +3910,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -4026,7 +4026,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arrow",
@@ -4085,7 +4085,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"tokio",
"tonic 0.11.0",
@@ -4123,7 +4123,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -4558,7 +4558,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a875e976441188028353f7274a46a7e6e065c5d4#a875e976441188028353f7274a46a7e6e065c5d4"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908#43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908"
dependencies = [
"prost 0.12.6",
"serde",
@@ -5273,7 +5273,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6123,7 +6123,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"chrono",
"common-error",
@@ -6135,7 +6135,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-stream",
"async-trait",
@@ -6479,7 +6479,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -6506,7 +6506,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -6585,7 +6585,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"aquamarine",
@@ -6679,7 +6679,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"aquamarine",
@@ -7416,7 +7416,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"anyhow",
"bytes",
@@ -7669,7 +7669,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7717,7 +7717,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"tokio",
"tokio-util",
@@ -7967,7 +7967,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -8253,7 +8253,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8415,7 +8415,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8703,7 +8703,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8938,7 +8938,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9063,7 +9063,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9128,7 +9128,7 @@ dependencies = [
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"statrs",
"store-api",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"tokio",
"tokio-stream",
@@ -10612,7 +10612,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -10904,7 +10904,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11016,7 +11016,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -11370,7 +11370,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"chrono",
@@ -11434,7 +11434,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11652,7 +11652,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"aquamarine",
@@ -11814,7 +11814,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"async-trait",
"bytes",
@@ -12013,7 +12013,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -12290,7 +12290,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"arbitrary",
"async-trait",
@@ -12333,7 +12333,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.0"
version = "0.11.3"
dependencies = [
"api",
"arrow-flight",
@@ -12373,6 +12373,7 @@ dependencies = [
"futures-util",
"hex",
"itertools 0.10.5",
"log-query",
"loki-api",
"meta-client",
"meta-srv",
@@ -12397,7 +12398,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.0",
"substrait 0.11.3",
"table",
"tempfile",
"time",


@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.0"
version = "0.11.3"
edition = "2021"
license = "Apache-2.0"
@@ -124,7 +124,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908" }
hex = "0.4"
http = "0.2"
humantime = "2.1"


@@ -151,7 +151,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
@@ -485,7 +485,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |


@@ -550,7 +550,7 @@ metadata_cache_size = "64MiB"
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "8MiB"
content_cache_page_size = "64KiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]


@@ -593,7 +593,7 @@ metadata_cache_size = "64MiB"
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "8MiB"
content_cache_page_size = "64KiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]


@@ -0,0 +1,75 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const docsClient = obtainClient("DOCS_REPO_TOKEN")
try {
await docsClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "docs",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
function determineWorkflow(version: string): [string, string] {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
try {
const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
triggerWorkflow(workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}
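
For reference, the version-to-workflow mapping implemented by the script above can be summarized in a small standalone sketch (Rust purely for illustration; the function name mirrors the script and nothing below is repository code):

```rust
// Sketch of the mapping used by the doc-version bump script above.
fn determine_workflow(version: &str) -> Result<(&'static str, String), String> {
    // Nightly builds go to the nightly bump workflow with the version as-is.
    if version.contains("nightly") {
        return Ok(("bump-nightly-version.yml", version.to_string()));
    }
    let parts: Vec<&str> = version.split('.').collect();
    if parts.len() != 3 {
        return Err(format!("invalid version format: {version}"));
    }
    // An x.y.0 release bumps the docs to the new major.minor only.
    if parts[2] == "0" {
        return Ok(("bump-version.yml", format!("{}.{}", parts[0], parts[1])));
    }
    // Anything else is a patch release and keeps the full version.
    Ok(("bump-patch-version.yml", version.to_string()))
}

fn main() {
    assert_eq!(
        determine_workflow("0.12.0").unwrap(),
        ("bump-version.yml", "0.12".to_string())
    );
    assert_eq!(
        determine_workflow("0.11.3").unwrap(),
        ("bump-patch-version.yml", "0.11.3".to_string())
    );
}
```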


@@ -20,31 +20,3 @@ Sample at 49 Hertz, for 10 seconds, output report in text format.
```bash
curl -X POST -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
```
## Using `perf`
First find the pid of GreptimeDB:
Using `perf record` to profile GreptimeDB, at the sampling frequency of 99 hertz, and a duration of 60 seconds:
```bash
perf record -p <pid> --call-graph dwarf -F 99 -- sleep 60
```
The result will be saved to file `perf.data`.
Then
```bash
perf script --no-inline > perf.out
```
Produce a flame graph out of it:
```bash
git clone https://github.com/brendangregg/FlameGraph
FlameGraph/stackcollapse-perf.pl perf.out > perf.folded
FlameGraph/flamegraph.pl perf.folded > perf.svg
```


@@ -1,3 +1,3 @@
[toolchain]
channel = "nightly-2024-10-19"
components = ["rust-analyzer"]
components = ["rust-analyzer", "llvm-tools"]


@@ -14,6 +14,7 @@
import os
import re
from multiprocessing import Pool
def find_rust_files(directory):
@@ -33,13 +34,11 @@ def extract_branch_names(file_content):
return pattern.findall(file_content)
def check_snafu_in_files(branch_name, rust_files):
def check_snafu_in_files(branch_name, rust_files_content):
branch_name_snafu = f"{branch_name}Snafu"
for rust_file in rust_files:
with open(rust_file, "r") as file:
content = file.read()
if branch_name_snafu in content:
return True
for content in rust_files_content.values():
if branch_name_snafu in content:
return True
return False
@@ -49,21 +48,24 @@ def main():
for error_file in error_files:
with open(error_file, "r") as file:
content = file.read()
branch_names.extend(extract_branch_names(content))
branch_names.extend(extract_branch_names(file.read()))
unused_snafu = [
branch_name
for branch_name in branch_names
if not check_snafu_in_files(branch_name, other_rust_files)
]
# Read all rust files into memory once
rust_files_content = {}
for rust_file in other_rust_files:
with open(rust_file, "r") as file:
rust_files_content[rust_file] = file.read()
with Pool() as pool:
results = pool.starmap(
check_snafu_in_files, [(bn, rust_files_content) for bn in branch_names]
)
unused_snafu = [bn for bn, found in zip(branch_names, results) if not found]
if unused_snafu:
print("Unused error variants:")
for name in unused_snafu:
print(name)
if unused_snafu:
raise SystemExit(1)


@@ -1,5 +1,5 @@
let
nixpkgs = fetchTarball "https://github.com/NixOS/nixpkgs/tarball/nixos-unstable";
nixpkgs = fetchTarball "https://github.com/NixOS/nixpkgs/tarball/nixos-24.11";
fenix = import (fetchTarball "https://github.com/nix-community/fenix/archive/main.tar.gz") {};
pkgs = import nixpkgs { config = {}; overlays = []; };
in
@@ -11,16 +11,20 @@ pkgs.mkShell rec {
clang
gcc
protobuf
gnumake
mold
(fenix.fromToolchainFile {
dir = ./.;
})
cargo-nextest
cargo-llvm-cov
taplo
curl
];
buildInputs = with pkgs; [
libgit2
libz
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;


@@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
@@ -37,7 +37,6 @@ use snafu::{OptionExt, ResultExt};
use crate::metrics::{
METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
METRIC_META_CLIENT_GET,
};
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
@@ -293,7 +292,7 @@ impl KvBackend for CachedKvBackend {
}
.map_err(|e| {
GetKvCacheSnafu {
err_msg: e.output_msg(),
err_msg: e.to_string(),
}
.build()
});
@@ -446,8 +445,6 @@ impl KvBackend for MetaKvBackend {
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = METRIC_META_CLIENT_GET.start_timer();
let mut response = self
.client
.range(RangeRequest::new().with_key(key))


@@ -34,6 +34,4 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
pub static ref METRIC_META_CLIENT_GET: Histogram =
register_histogram!("greptime_meta_client_get", "meta client get").unwrap();
}


@@ -62,13 +62,6 @@ impl Instance {
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
/// Get mutable Datanode instance for changing some internal state, before starting it.
// Useful for wrapping Datanode instance. Please do not remove this method even if you find
// nowhere it is called.
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
}
#[async_trait]


@@ -35,10 +35,23 @@ data = {
"bigint_other": [5, -5, 1, 5, 5],
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
"timestamp_simple": [
datetime.datetime(2023, 4, 1, 20, 15, 30, 2000),
datetime.datetime.fromtimestamp(int("1629617204525777000") / 1000000000),
datetime.datetime(2023, 1, 1),
datetime.datetime(2023, 2, 1),
datetime.datetime(2023, 3, 1),
],
"date_simple": [
datetime.date(2023, 4, 1),
datetime.date(2023, 3, 1),
datetime.date(2023, 1, 1),
datetime.date(2023, 2, 1),
datetime.date(2023, 3, 1),
],
}
def infer_schema(data):
schema = "struct<"
for key, value in data.items():
@@ -56,7 +69,7 @@ def infer_schema(data):
elif key.startswith("date"):
dt = "date"
else:
print(key,value,dt)
print(key, value, dt)
raise NotImplementedError
if key.startswith("double"):
dt = "double"
@@ -68,7 +81,6 @@ def infer_schema(data):
return schema
def _write(
schema: str,
data,


@@ -725,7 +725,8 @@ struct Tokenizer {
impl Tokenizer {
pub fn tokenize(mut self, pattern: &str) -> Result<Vec<Token>> {
let mut tokens = vec![];
while self.cursor < pattern.len() {
let char_len = pattern.chars().count();
while self.cursor < char_len {
// TODO: collect pattern into Vec<char> if this tokenizer is bottleneck in the future
let c = pattern.chars().nth(self.cursor).unwrap();
match c {
@@ -794,7 +795,8 @@ impl Tokenizer {
let mut phase = String::new();
let mut is_quote_present = false;
while self.cursor < pattern.len() {
let char_len = pattern.chars().count();
while self.cursor < char_len {
let mut c = pattern.chars().nth(self.cursor).unwrap();
match c {
@@ -899,6 +901,26 @@ mod test {
Phase("c".to_string()),
],
),
(
r#"中文 测试"#,
vec![Phase("中文".to_string()), Phase("测试".to_string())],
),
(
r#"中文 AND 测试"#,
vec![Phase("中文".to_string()), And, Phase("测试".to_string())],
),
(
r#"中文 +测试"#,
vec![Phase("中文".to_string()), Must, Phase("测试".to_string())],
),
(
r#"中文 -测试"#,
vec![
Phase("中文".to_string()),
Negative,
Phase("测试".to_string()),
],
),
];
for (query, expected) in cases {
@@ -1030,6 +1052,61 @@ mod test {
],
},
),
(
r#"中文 测试"#,
PatternAst::Binary {
op: BinaryOp::Or,
children: vec![
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "中文".to_string(),
},
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "测试".to_string(),
},
],
},
),
(
r#"中文 AND 测试"#,
PatternAst::Binary {
op: BinaryOp::And,
children: vec![
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "中文".to_string(),
},
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "测试".to_string(),
},
],
},
),
(
r#"中文 +测试"#,
PatternAst::Literal {
op: UnaryOp::Must,
pattern: "测试".to_string(),
},
),
(
r#"中文 -测试"#,
PatternAst::Binary {
op: BinaryOp::And,
children: vec![
PatternAst::Literal {
op: UnaryOp::Negative,
pattern: "测试".to_string(),
},
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "中文".to_string(),
},
],
},
),
];
for (query, expected) in cases {
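
The tokenizer change above swaps a byte-length bound (`pattern.len()`) for a character count, which is what the new non-ASCII test cases exercise. A minimal, self-contained illustration of the difference (not repository code):

```rust
fn main() {
    let pattern = "中文 测试";
    // `len()` is the UTF-8 byte length: four CJK characters at 3 bytes each plus one space.
    assert_eq!(pattern.len(), 13);
    // The tokenizer cursor advances per character, so the loop bound must be the char count.
    assert_eq!(pattern.chars().count(), 5);
    // With the byte length as the bound, `chars().nth(i)` is eventually called with i >= 5,
    // returns `None`, and the following `unwrap()` panics; that is what the fix avoids.
    assert!(pattern.chars().nth(5).is_none());
}
```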


@@ -60,6 +60,7 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
column_schema: schema,
is_key: column_def.semantic_type == SemanticType::Tag as i32,
location: parse_location(ac.location)?,
add_if_not_exists: ac.add_if_not_exists,
})
})
.collect::<Result<Vec<_>>>()?;
@@ -220,6 +221,7 @@ mod tests {
..Default::default()
}),
location: None,
add_if_not_exists: true,
}],
})),
};
@@ -240,6 +242,7 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(None, add_column.location);
assert!(add_column.add_if_not_exists);
}
#[test]
@@ -265,6 +268,7 @@ mod tests {
location_type: LocationType::First.into(),
after_column_name: String::default(),
}),
add_if_not_exists: false,
},
AddColumn {
column_def: Some(ColumnDef {
@@ -280,6 +284,7 @@ mod tests {
location_type: LocationType::After.into(),
after_column_name: "ts".to_string(),
}),
add_if_not_exists: true,
},
],
})),
@@ -308,6 +313,7 @@ mod tests {
}),
add_column.location
);
assert!(add_column.add_if_not_exists);
let add_column = add_columns.pop().unwrap();
assert!(!add_column.is_key);
@@ -317,6 +323,7 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(Some(AddColumnLocation::First), add_column.location);
assert!(!add_column.add_if_not_exists);
}
#[test]


@@ -299,6 +299,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let memory_column = &add_columns.add_columns[1];
assert_eq!(
@@ -311,6 +312,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let time_column = &add_columns.add_columns[2];
assert_eq!(
@@ -323,6 +325,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let interval_column = &add_columns.add_columns[3];
assert_eq!(
@@ -335,6 +338,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let decimal_column = &add_columns.add_columns[4];
assert_eq!(
@@ -352,6 +356,7 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
}
#[test]


@@ -192,6 +192,9 @@ pub fn build_create_table_expr(
Ok(expr)
}
/// Find columns that are not present in the schema and return them as `AddColumns`
/// for adding columns automatically.
/// It always sets `add_if_not_exists` to `true` for now.
pub fn extract_new_columns(
schema: &Schema,
column_exprs: Vec<ColumnExpr>,
@@ -213,6 +216,7 @@ pub fn extract_new_columns(
AddColumn {
column_def,
location: None,
add_if_not_exists: true,
}
})
.collect::<Vec<_>>();


@@ -105,7 +105,7 @@ impl AlterLogicalTablesProcedure {
.context(ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind, true)
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {


@@ -28,13 +28,13 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::{debug, info};
use common_telemetry::{debug, error, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
@@ -51,10 +51,14 @@ use crate::{metrics, ClusterId};
/// The alter table procedure
pub struct AlterTableProcedure {
// The runtime context.
/// The runtime context.
context: DdlContext,
// The serialized data.
/// The serialized data.
data: AlterTableData,
/// Cached new table metadata in the prepare step.
/// If we recover the procedure from json, then the table info value is not cached.
/// But we already validated it in the prepare step.
new_table_info: Option<TableInfo>,
}
impl AlterTableProcedure {
@@ -70,18 +74,31 @@ impl AlterTableProcedure {
Ok(Self {
context,
data: AlterTableData::new(task, table_id, cluster_id),
new_table_info: None,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(AlterTableProcedure { context, data })
Ok(AlterTableProcedure {
context,
data,
new_table_info: None,
})
}
// Checks whether the table exists.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
self.check_alter().await?;
self.fill_table_info().await?;
// Validates the request and builds the new table info.
// We need to build the new table info here because we should ensure the alteration
// is valid in `UpdateMeta` state as we already altered the region.
// Safety: `fill_table_info()` already set it.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if matches!(alter_kind, Kind::RenameTable { .. }) {
@@ -106,6 +123,14 @@ impl AlterTableProcedure {
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
let alter_kind = self.make_region_alter_kind()?;
info!(
"Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
self.data.table_ref(),
table_id,
alter_kind,
);
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
@@ -113,7 +138,7 @@ impl AlterTableProcedure {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.make_alter_region_request(region_id)?;
let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
debug!("Submitting {request:?} to {datanode}");
let datanode = datanode.clone();
@@ -150,7 +175,15 @@ impl AlterTableProcedure {
let table_ref = self.data.table_ref();
// Safety: checked before.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
let new_info = self.build_new_table_info(&table_info_value.table_info)?;
// Gets the table info from the cache or builds it.
let new_info = match &self.new_table_info {
Some(cached) => cached.clone(),
None => self.build_new_table_info(&table_info_value.table_info)
.inspect_err(|e| {
// We already check the table info in the prepare step so this should not happen.
error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
})?,
};
debug!(
"Starting update table: {} metadata, new table info {:?}",
@@ -174,7 +207,7 @@ impl AlterTableProcedure {
.await?;
}
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
self.data.state = AlterTableState::InvalidateTableCache;
Ok(Status::executing(true))
}
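
The `new_table_info` field added above follows a cache-or-rebuild pattern: the value validated in the prepare step is reused in the update-metadata step, and a procedure recovered from JSON rebuilds it from the persisted data. A reduced sketch of that shape (types and names here are illustrative only, not the procedure crate's API):

```rust
// Illustrative stand-in for the cached new_table_info on AlterTableProcedure.
struct Procedure {
    cached: Option<String>,
    persisted: String,
}

impl Procedure {
    fn new_table_info(&self) -> String {
        match &self.cached {
            // Normal path: computed and validated during the prepare step.
            Some(info) => info.clone(),
            // Recovery path: the cache is empty, so rebuild from persisted state.
            None => format!("rebuilt from {}", self.persisted),
        }
    }
}

fn main() {
    let fresh = Procedure { cached: Some("validated in prepare".into()), persisted: "json".into() };
    let recovered = Procedure { cached: None, persisted: "json".into() };
    assert_eq!(fresh.new_table_info(), "validated in prepare");
    assert_eq!(recovered.new_table_info(), "rebuilt from json");
}
```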


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
@@ -27,13 +29,15 @@ use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
/// Makes alter region request.
pub(crate) fn make_alter_region_request(&self, region_id: RegionId) -> Result<RegionRequest> {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
/// Makes an alter region request from an existing alter kind.
/// Region alter requests always add columns only if they do not already exist.
pub(crate) fn make_alter_region_request(
&self,
region_id: RegionId,
kind: Option<alter_request::Kind>,
) -> Result<RegionRequest> {
// Safety: checked
let table_info = self.data.table_info().unwrap();
let kind = create_proto_alter_kind(table_info, alter_kind)?;
Ok(RegionRequest {
header: Some(RegionRequestHeader {
@@ -47,45 +51,66 @@ impl AlterTableProcedure {
})),
})
}
/// Makes alter kind proto that all regions can reuse.
/// Region alter requests always add columns only if they do not already exist.
pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
// Safety: checked
let table_info = self.data.table_info().unwrap();
let kind = create_proto_alter_kind(table_info, alter_kind)?;
Ok(kind)
}
}
/// Creates region proto alter kind from `table_info` and `alter_kind`.
///
/// Returns the kind and next column id if it adds new columns.
/// It always adds a column if it does not exist and drops a column if it exists.
/// It skips the column if it already exists in the table.
fn create_proto_alter_kind(
table_info: &RawTableInfo,
alter_kind: &Kind,
) -> Result<Option<alter_request::Kind>> {
match alter_kind {
Kind::AddColumns(x) => {
// Construct a set of existing columns in the table.
let existing_columns: HashSet<_> = table_info
.meta
.schema
.column_schemas
.iter()
.map(|col| &col.name)
.collect();
let mut next_column_id = table_info.meta.next_column_id;
let add_columns = x
.add_columns
.iter()
.map(|add_column| {
let column_def =
add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
let mut add_columns = Vec::with_capacity(x.add_columns.len());
for add_column in &x.add_columns {
let column_def = add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
let column_id = next_column_id;
next_column_id += 1;
// Skips existing columns.
if existing_columns.contains(&column_def.name) {
continue;
}
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
let column_id = next_column_id;
next_column_id += 1;
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
Ok(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
add_columns.push(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
});
}
Ok(Some(alter_request::Kind::AddColumns(AddColumns {
add_columns,
@@ -143,6 +168,7 @@ mod tests {
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
/// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
async fn prepare_ddl_context() -> (DdlContext, u64, TableId, RegionId, String) {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
@@ -171,6 +197,7 @@ mod tests {
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.is_nullable(true)
.build()
.unwrap()
.into(),
@@ -225,15 +252,16 @@ mod tests {
name: "my_tag3".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: b"hello".to_vec(),
default_constraint: Vec::new(),
semantic_type: SemanticType::Tag as i32,
comment: String::new(),
..Default::default()
}),
location: Some(AddColumnLocation {
location_type: LocationType::After as i32,
after_column_name: "my_tag2".to_string(),
after_column_name: "host".to_string(),
}),
add_if_not_exists: false,
}],
})),
},
@@ -242,8 +270,11 @@ mod tests {
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
else {
unreachable!()
};
@@ -259,7 +290,7 @@ mod tests {
name: "my_tag3".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: b"hello".to_vec(),
default_constraint: Vec::new(),
semantic_type: SemanticType::Tag as i32,
comment: String::new(),
..Default::default()
@@ -268,7 +299,7 @@ mod tests {
}),
location: Some(AddColumnLocation {
location_type: LocationType::After as i32,
after_column_name: "my_tag2".to_string(),
after_column_name: "host".to_string(),
}),
}]
}
@@ -299,8 +330,11 @@ mod tests {
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
else {
unreachable!()
};


@@ -23,7 +23,9 @@ use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
impl AlterTableProcedure {
/// Builds new_meta
/// Builds new table info after alteration.
/// It bumps the column id of the table by the number of the add column requests.
/// So there may be holes in the column id sequence.
pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result<TableInfo> {
let table_info =
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
@@ -34,7 +36,7 @@ impl AlterTableProcedure {
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
@@ -46,6 +48,9 @@ impl AlterTableProcedure {
new_info.ident.version = table_info.ident.version + 1;
match request.alter_kind {
AlterKind::AddColumns { columns } => {
// Bumps the column id for the new columns.
// It may bump more than the actual number of columns added if there are
// existing columns, but it's fine.
new_info.meta.next_column_id += columns.len() as u32;
}
AlterKind::RenameTable { new_table_name } => {
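
The comments above explain why the column id sequence may end up with holes: the region side skips columns that already exist and only assigns ids to genuinely new ones, while the table metadata bumps `next_column_id` by the number of requested columns. A small standalone sketch of that interaction, simplifying the two code paths (illustrative values, not repository code):

```rust
fn main() {
    let existing = ["ts", "host"];
    // One requested column already exists, one is genuinely new.
    let requested = ["host", "cpu"];
    let mut next_column_id = 2u32;

    // Region side: only columns that are not present yet receive an id.
    let mut assigned = Vec::new();
    for col in requested {
        if existing.contains(&col) {
            continue;
        }
        assigned.push((col, next_column_id + assigned.len() as u32));
    }
    assert_eq!(assigned, vec![("cpu", 2)]);

    // Table metadata side: next_column_id is bumped by the number of
    // *requested* columns, so id 3 is never handed out and becomes a hole.
    next_column_id += requested.len() as u32;
    assert_eq!(next_column_id, 4);
}
```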


@@ -30,6 +30,8 @@ pub struct TestAlterTableExpr {
add_columns: Vec<ColumnDef>,
#[builder(setter(into, strip_option))]
new_table_name: Option<String>,
#[builder(setter)]
add_if_not_exists: bool,
}
impl From<TestAlterTableExpr> for AlterTableExpr {
@@ -53,6 +55,7 @@ impl From<TestAlterTableExpr> for AlterTableExpr {
.map(|col| AddColumn {
column_def: Some(col),
location: None,
add_if_not_exists: value.add_if_not_exists,
})
.collect(),
})),


@@ -56,6 +56,7 @@ fn make_alter_logical_table_add_column_task(
let alter_table = alter_table
.table_name(table.to_string())
.add_columns(add_columns)
.add_if_not_exists(true)
.build()
.unwrap();


@@ -139,7 +139,7 @@ async fn test_on_submit_alter_request() {
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "my_field_column".to_string(),
name: "cpu".to_string(),
}],
})),
},
@@ -225,7 +225,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "my_field_column".to_string(),
name: "cpu".to_string(),
}],
})),
},
@@ -330,6 +330,7 @@ async fn test_on_update_metadata_add_columns() {
..Default::default()
}),
location: None,
add_if_not_exists: false,
}],
})),
},


@@ -29,7 +29,6 @@ use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
use crate::key::{MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::metrics::METRIC_META_SCHEMA_INFO_GET;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
@@ -210,8 +209,6 @@ impl SchemaManager {
&self,
schema: SchemaNameKey<'_>,
) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
let _timer = METRIC_META_SCHEMA_INFO_GET.start_timer();
let raw_key = schema.to_bytes();
self.kv_backend
.get(&raw_key)


@@ -29,7 +29,6 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_INFO_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::metrics::METRIC_META_TABLE_INFO_GET;
use crate::rpc::store::BatchGetRequest;
/// The key stores the metadata of the table.
@@ -195,8 +194,6 @@ impl TableInfoManager {
&self,
table_id: TableId,
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
let _timer = METRIC_META_TABLE_INFO_GET.start_timer();
let key = TableInfoKey::new(table_id);
let raw_key = key.to_bytes();
self.kv_backend


@@ -108,9 +108,4 @@ lazy_static! {
&["name"]
)
.unwrap();
pub static ref METRIC_META_TABLE_INFO_GET: Histogram =
register_histogram!("greptime_meta_table_info_get", "get table info from kvbackend").unwrap();
pub static ref METRIC_META_SCHEMA_INFO_GET: Histogram =
register_histogram!("greptime_meta_schema_info_get", "get schema info from kvbackend").unwrap();
}


@@ -123,6 +123,14 @@ impl ColumnSchema {
self.default_constraint.as_ref()
}
/// Check if the default constraint is an impure function.
pub fn is_default_impure(&self) -> bool {
self.default_constraint
.as_ref()
.map(|c| c.is_function())
.unwrap_or(false)
}
#[inline]
pub fn metadata(&self) -> &Metadata {
&self.metadata
@@ -283,6 +291,15 @@ impl ColumnSchema {
}
}
/// Creates an impure default value for this column, only if it has an impure default constraint.
/// Otherwise, returns `Ok(None)`.
pub fn create_impure_default(&self) -> Result<Option<Value>> {
match &self.default_constraint {
Some(c) => c.create_impure_default(&self.data_type),
None => Ok(None),
}
}
/// Retrieves the fulltext options for the column.
pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
match self.metadata.get(FULLTEXT_KEY) {


@@ -178,12 +178,63 @@ impl ColumnDefaultConstraint {
}
}
/// Only create default vector if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to the mito engine while keeping impure default values consistent.
pub fn create_impure_default_vector(
&self,
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<Option<VectorRef>> {
assert!(num_rows > 0);
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure its return value is not null when
// is_nullable is true.
match &expr[..] {
// TODO(dennis): we only supports current_timestamp right now,
// it's better to use a expression framework in future.
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp_vector(data_type, num_rows).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}
/// Only create default value if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to the mito engine while keeping impure default values consistent.
pub fn create_impure_default(&self, data_type: &ConcreteDataType) -> Result<Option<Value>> {
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure its return value is not null when
// is_nullable is true.
match &expr[..] {
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp(data_type).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}
/// Returns true if this constraint might creates NULL.
fn maybe_null(&self) -> bool {
// Once we support more functions, we may return true if given function
// could return null.
matches!(self, ColumnDefaultConstraint::Value(Value::Null))
}
/// Returns true if this constraint is a function.
pub fn is_function(&self) -> bool {
matches!(self, ColumnDefaultConstraint::Function(_))
}
}
fn create_current_timestamp(data_type: &ConcreteDataType) -> Result<Value> {
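
A self-contained sketch of the impure/constant split introduced above (the enum and the millisecond timestamp are stand-ins, not the datatypes crate's API): constant defaults are left for the mito engine to fill in later, while function defaults such as `now()` are evaluated eagerly so that one write sees a single consistent value.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for ColumnDefaultConstraint; values are simplified to i64.
#[derive(Debug, Clone)]
enum DefaultConstraint {
    Value(i64),
    Function(String),
}

impl DefaultConstraint {
    fn is_function(&self) -> bool {
        matches!(self, DefaultConstraint::Function(_))
    }

    // Mirrors the create_impure_default idea: only function defaults produce
    // a value here; constant defaults return None and are filled in later.
    fn create_impure_default(&self) -> Option<i64> {
        match self {
            DefaultConstraint::Function(f) if f == "now()" => Some(
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_millis() as i64,
            ),
            _ => None,
        }
    }
}

fn main() {
    let constant = DefaultConstraint::Value(0);
    let impure = DefaultConstraint::Function("now()".to_string());
    assert!(!constant.is_function() && impure.is_function());
    assert!(constant.create_impure_default().is_none());
    assert!(impure.create_impure_default().is_some());
}
```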


@@ -45,17 +45,12 @@ use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::{
relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
};
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
UnexpectedSnafu,
};
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
use crate::expr::Batch;
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
@@ -69,7 +64,7 @@ mod util;
mod worker;
pub(crate) mod node_context;
mod table_source;
pub(crate) mod table_source;
use crate::error::Error;
use crate::utils::StateReportHandler;
@@ -129,7 +124,7 @@ pub struct FlowWorkerManager {
/// The query engine that will be used to parse the query and convert it to a dataflow plan
pub query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: TableSource,
table_info_source: ManagedTableSource,
frontend_invoker: RwLock<Option<FrontendInvoker>>,
/// contains mapping from table name to global id, and table schema
node_context: RwLock<FlownodeContext>,
@@ -158,11 +153,11 @@ impl FlowWorkerManager {
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) -> Self {
let srv_map = TableSource::new(
let srv_map = ManagedTableSource::new(
table_meta.table_info_manager().clone(),
table_meta.table_name_manager().clone(),
);
let node_context = FlownodeContext::default();
let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _);
let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new();
FlowWorkerManager {
@@ -409,7 +404,7 @@ impl FlowWorkerManager {
) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
if let Some(table_id) = self
.table_info_source
.get_table_id_from_name(table_name)
.get_opt_table_id_from_name(table_name)
.await?
{
let table_info = self
@@ -729,43 +724,6 @@ impl FlowWorkerManager {
query_ctx,
} = args;
let already_exist = {
let mut flag = false;
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
flag = true;
break;
}
}
flag
};
match (create_if_not_exists, or_replace, already_exist) {
// do replace
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
self.remove_flow(flow_id).await?;
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// do nothing if exists
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// create if not exists
(_, _, false) => (),
}
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
return Ok(None);
}
}
}
let mut node_ctx = self.node_context.write().await;
// assign global id to source and sink table
for source in &source_table_ids {
@@ -828,27 +786,9 @@ impl FlowWorkerManager {
.fail()?,
}
}
let table_id = self
.table_info_source
.get_table_id_from_name(&sink_table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table id for table name {:?}", sink_table_name),
})?;
let table_info_value = self
.table_info_source
.get_table_info_value(&table_id)
.await?
.context(UnexpectedSnafu {
reason: format!("Can't get table info value for table id {:?}", table_id),
})?;
let real_schema = table_info_value_to_relation_desc(table_info_value)?;
node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?;
} else {
// assign inferred schema to sink table
// create sink table
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
let did_create = self
.create_table_from_relation(
&format!("flow-id={flow_id}"),
@@ -897,9 +837,11 @@ impl FlowWorkerManager {
source_ids,
src_recvs: source_receivers,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
};
handle.create_flow(create_request).await?;
info!("Successfully create flow with id={}", flow_id);
Ok(Some(flow_id))


@@ -24,21 +24,26 @@ use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use datatypes::value::Value;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::RegionId;
use super::util::from_proto_to_data_type;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error {
// TODO(discord9): refactor this
Err::<(), _>(BoxedError::new(err))
.with_context(|_| ExternalSnafu)
.unwrap_err()
/// Returns a function that converts `crate::error::Error` into `common_meta::error::Error`.
fn to_meta_err(
location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
move |err: crate::error::Error| -> common_meta::error::Error {
common_meta::error::Error::External {
location,
source: BoxedError::new(err),
}
}
}
#[async_trait::async_trait]
@@ -75,11 +80,16 @@ impl Flownode for FlowWorkerManager {
or_replace,
expire_after,
comment: Some(comment),
sql,
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self.create_flow(args).await.map_err(to_meta_err)?;
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
@@ -94,7 +104,7 @@ impl Flownode for FlowWorkerManager {
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err)?;
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
@@ -112,9 +122,15 @@ impl Flownode for FlowWorkerManager {
.await
.flush_all_sender()
.await
.map_err(to_meta_err)?;
let rows_send = self.run_available(true).await.map_err(to_meta_err)?;
let row = self.send_writeback_requests().await.map_err(to_meta_err)?;
.map_err(to_meta_err(snafu::location!()))?;
let rows_send = self
.run_available(true)
.await
.map_err(to_meta_err(snafu::location!()))?;
let row = self
.send_writeback_requests()
.await
.map_err(to_meta_err(snafu::location!()))?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
@@ -154,17 +170,41 @@ impl Flownode for FlowWorkerManager {
// TODO(discord9): reconsider time assignment mechanism
let now = self.tick_manager.tick();
let fetch_order = {
let (table_types, fetch_order) = {
let ctx = self.node_context.read().await;
let table_col_names = ctx
.table_repr
.get_by_table_id(&table_id)
.map(|r| r.1)
.and_then(|id| ctx.schema.get(&id))
.map(|desc| &desc.names)
.context(UnexpectedSnafu {
err_msg: format!("Table not found: {}", table_id),
})?;
// TODO(discord9): also check schema version so that altered table can be reported
let table_schema = ctx
.table_source
.table_from_id(&table_id)
.await
.map_err(to_meta_err(snafu::location!()))?;
let default_vals = table_schema
.default_values
.iter()
.zip(table_schema.relation_desc.typ().column_types.iter())
.map(|(v, ty)| {
v.as_ref().and_then(|v| {
match v.create_default(ty.scalar_type(), ty.nullable()) {
Ok(v) => Some(v),
Err(err) => {
common_telemetry::error!(err; "Failed to create default value");
None
}
}
})
})
.collect_vec();
let table_types = table_schema
.relation_desc
.typ()
.column_types
.clone()
.into_iter()
.map(|t| t.scalar_type)
.collect_vec();
let table_col_names = table_schema.relation_desc.names;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
@@ -181,44 +221,80 @@ impl Flownode for FlowWorkerManager {
.enumerate()
.map(|(i, name)| (&name.column_name, i)),
);
let fetch_order: Vec<usize> = table_col_names
let fetch_order: Vec<FetchFromRow> = table_col_names
.iter()
.map(|names| {
name_to_col.get(names).copied().context(UnexpectedSnafu {
err_msg: format!("Column not found: {}", names),
})
.zip(default_vals.into_iter())
.map(|(col_name, col_default_val)| {
name_to_col
.get(col_name)
.copied()
.map(FetchFromRow::Idx)
.or_else(|| col_default_val.clone().map(FetchFromRow::Default))
.with_context(|| UnexpectedSnafu {
err_msg: format!(
"Column not found: {}, default_value: {:?}",
col_name, col_default_val
),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
trace!("Reordering columns: {:?}", fetch_order)
}
fetch_order
trace!("Reordering columns: {:?}", fetch_order);
(table_types, fetch_order)
};
// TODO(discord9): use column instead of row
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
.collect_vec();
let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
repr::Row::new(reordered)
})
.map(|r| (r, now, 1))
.collect_vec();
let batch_datatypes = insert_schema
.iter()
.map(from_proto_to_data_type)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(to_meta_err)?;
self.handle_write_request(region_id.into(), rows, &batch_datatypes)
if let Err(err) = self
.handle_write_request(region_id.into(), rows, &table_types)
.await
.map_err(|err| {
common_telemetry::error!(err;"Failed to handle write request");
to_meta_err(err)
})?;
{
let err = BoxedError::new(err);
let flow_ids = self
.node_context
.read()
.await
.get_flow_ids(table_id)
.into_iter()
.flatten()
.cloned()
.collect_vec();
let err = InsertIntoFlowSnafu {
region_id,
flow_ids,
}
.into_error(err);
common_telemetry::error!(err; "Failed to handle write request");
let err = to_meta_err(snafu::location!())(err);
return Err(err);
}
}
Ok(Default::default())
}
}
/// Simple helper enum for fetching value from row with default value
#[derive(Debug, Clone)]
enum FetchFromRow {
Idx(usize),
Default(Value),
}
impl FetchFromRow {
/// Panic if idx is out of bound
fn fetch(&self, row: &repr::Row) -> Value {
match self {
FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
FetchFromRow::Default(v) => v.clone(),
}
}
}
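
The `FetchFromRow` helper added above is the core of the default-value fix: when an insert arriving at a flow is missing a column, that column is filled from its precomputed default instead of being looked up by index. A stripped-down, self-contained version of the same idea (plain `i64` values stand in for `datatypes::value::Value`):

```rust
// Illustrative stand-in for FetchFromRow: a column either comes from a
// position in the incoming row or falls back to a precomputed default.
#[derive(Debug, Clone)]
enum Fetch {
    Idx(usize),
    Default(i64),
}

fn reorder(row: &[i64], order: &[Fetch]) -> Vec<i64> {
    order
        .iter()
        .map(|f| match f {
            Fetch::Idx(i) => row[*i],
            Fetch::Default(v) => *v,
        })
        .collect()
}

fn main() {
    // The insert carries two columns while the table has three;
    // the missing column is filled from its default (42 here).
    let row = [10, 20];
    let order = [Fetch::Idx(1), Fetch::Default(42), Fetch::Idx(0)];
    assert_eq!(reorder(&row, &order), vec![20, 42, 10]);
}
```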


@@ -25,7 +25,8 @@ use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, RwLock};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::adapter::table_source::FlowTableSource;
use crate::adapter::{FlowId, ManagedTableSource, TableName};
use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{Batch, GlobalId};
@@ -33,7 +34,7 @@ use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};
/// A context that holds the information of the dataflow
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct FlownodeContext {
/// mapping from source table to tasks, useful for schedule which task to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
@@ -50,13 +51,32 @@ pub struct FlownodeContext {
/// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
pub sink_receiver:
BTreeMap<TableName, (mpsc::UnboundedSender<Batch>, mpsc::UnboundedReceiver<Batch>)>,
/// the schema of the table, query from metasrv or inferred from TypedPlan
pub schema: HashMap<GlobalId, RelationDesc>,
/// can query the schema of the table source, from metasrv with local cache
pub table_source: Box<dyn FlowTableSource>,
/// All the tables that have been registered in the worker
pub table_repr: IdToNameMap,
pub query_context: Option<Arc<QueryContext>>,
}
impl FlownodeContext {
pub fn new(table_source: Box<dyn FlowTableSource>) -> Self {
Self {
source_to_tasks: Default::default(),
flow_to_sink: Default::default(),
sink_to_flow: Default::default(),
source_sender: Default::default(),
sink_receiver: Default::default(),
table_source,
table_repr: Default::default(),
query_context: Default::default(),
}
}
pub fn get_flow_ids(&self, table_id: TableId) -> Option<&BTreeSet<FlowId>> {
self.source_to_tasks.get(&table_id)
}
}
/// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full
/// note that it wouldn't evict old data, so it's possible to block forever if the receiver is slow
///
@@ -284,7 +304,7 @@ impl FlownodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
pub async fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> {
let id = self
.table_repr
.get_by_name(name)
@@ -292,14 +312,8 @@ impl FlownodeContext {
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
let schema = self.table_source.table(name).await?;
Ok((id, schema.relation_desc))
}
/// Assign a global id to a table, if already assigned, return the existing global id
@@ -312,7 +326,7 @@ impl FlownodeContext {
/// merely creating a mapping from table id to global id
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableSource,
srv_map: &ManagedTableSource,
mut table_name: Option<TableName>,
table_id: Option<TableId>,
) -> Result<GlobalId, Error> {
@@ -333,9 +347,8 @@ impl FlownodeContext {
// table id is Some meaning db must have created the table
if let Some(table_id) = table_id {
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
let known_table_name = srv_map.get_table_name(&table_id).await?;
table_name = table_name.or(Some(known_table_name));
self.schema.insert(global_id, schema);
} // if we don't have a table id, it means the database hasn't assigned one yet or we don't need it
// still update the mapping with new global id
@@ -344,26 +357,6 @@ impl FlownodeContext {
}
}
/// Assign a schema to a table
///
pub fn assign_table_schema(
&mut self,
table_name: &TableName,
schema: RelationDesc,
) -> Result<(), Error> {
let gid = self
.table_repr
.get_by_name(table_name)
.map(|(_, gid)| gid)
.context(TableNotFoundSnafu {
name: format!("Table not found: {:?} in flownode cache", table_name),
})?;
self.schema.insert(gid, schema);
Ok(())
}
/// Get a new global id
pub fn new_global_id(&self) -> GlobalId {
GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)

View File

@@ -17,6 +17,8 @@
use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use datatypes::schema::ColumnDefaultConstraint;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
@@ -27,16 +29,82 @@ use crate::error::{
};
use crate::repr::RelationDesc;
/// mapping of table name <-> table id should be query from tableinfo manager
pub struct TableSource {
/// Table description, including the relation desc and default values; the minimal information a flow needs for a table
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableDesc {
pub relation_desc: RelationDesc,
pub default_values: Vec<Option<ColumnDefaultConstraint>>,
}
impl TableDesc {
pub fn new(
relation_desc: RelationDesc,
default_values: Vec<Option<ColumnDefaultConstraint>>,
) -> Self {
Self {
relation_desc,
default_values,
}
}
pub fn new_no_default(relation_desc: RelationDesc) -> Self {
Self {
relation_desc,
default_values: vec![],
}
}
}
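// Illustrative sketch, not part of the diff above: how a `TableDesc` with a per-column default
// could be built by hand, mirroring what `table_info_value_to_relation_desc` produces from a
// `TableInfoValue`. It assumes `ColumnDefaultConstraint::Value` and `datatypes::value::Value`
// from the datatypes crate; treat this as a crate-local sketch, not a standalone program.
fn example_table_desc() -> TableDesc {
    use datatypes::data_type::ConcreteDataType as CDT;
    use datatypes::schema::ColumnDefaultConstraint;
    use datatypes::value::Value;

    use crate::repr::{ColumnType, RelationType};

    // Two columns: `number` (no default) and `flag` (defaults to 0).
    let relation_desc = RelationType::new(vec![
        ColumnType::new(CDT::uint32_datatype(), false),
        ColumnType::new(CDT::uint32_datatype(), true),
    ])
    .into_named(vec![Some("number".to_string()), Some("flag".to_string())]);

    let default_values = vec![
        None,
        Some(ColumnDefaultConstraint::Value(Value::UInt32(0))),
    ];

    TableDesc::new(relation_desc, default_values)
}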
/// Table source for flow; provides the table schema by table name/id
#[async_trait::async_trait]
pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error>;
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
/// Get the table schema by table name
async fn table(&self, name: &TableName) -> Result<TableDesc, Error> {
let id = self.table_id_from_name(name).await?;
self.table_from_id(&id).await
}
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error>;
}
/// Managed table source information, queried from the table info manager and table name manager
#[derive(Clone)]
pub struct ManagedTableSource {
/// for querying the `TableId -> TableName` mapping
table_info_manager: TableInfoManager,
table_name_manager: TableNameManager,
}
impl TableSource {
#[async_trait::async_trait]
impl FlowTableSource for ManagedTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
name: format!("TableId = {:?}, Can't found table info", table_id),
})?;
let desc = table_info_value_to_relation_desc(table_info_value)?;
Ok(desc)
}
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
self.get_table_name(table_id).await
}
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
self.get_opt_table_id_from_name(name)
.await?
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})
}
}
impl ManagedTableSource {
pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
TableSource {
ManagedTableSource {
table_info_manager,
table_name_manager,
}
@@ -63,7 +131,10 @@ impl TableSource {
}
/// If the table hasn't been created in the database, the returned table id would be `None`
pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
pub async fn get_opt_table_id_from_name(
&self,
name: &TableName,
) -> Result<Option<TableId>, Error> {
let ret = self
.table_name_manager
.get(TableNameKey::new(&name[0], &name[1], &name[2]))
@@ -107,7 +178,7 @@ impl TableSource {
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationDesc), Error> {
) -> Result<(TableName, TableDesc), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
@@ -126,3 +197,121 @@ impl TableSource {
Ok((table_name, desc))
}
}
impl std::fmt::Debug for ManagedTableSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KvBackendTableSource").finish()
}
}
#[cfg(test)]
pub(crate) mod test {
use std::collections::HashMap;
use datatypes::data_type::ConcreteDataType as CDT;
use super::*;
use crate::repr::{ColumnType, RelationType};
pub struct FlowDummyTableSource {
pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>,
id_to_idx: HashMap<TableId, usize>,
name_to_idx: HashMap<TableName, usize>,
}
impl Default for FlowDummyTableSource {
fn default() -> Self {
let id_names_to_desc = vec![
(
1024,
[
"greptime".to_string(),
"public".to_string(),
"numbers".to_string(),
],
TableDesc::new_no_default(
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
),
),
(
1025,
[
"greptime".to_string(),
"public".to_string(),
"numbers_with_ts".to_string(),
],
TableDesc::new_no_default(
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
),
),
];
let id_to_idx = id_names_to_desc
.iter()
.enumerate()
.map(|(idx, (id, _name, _desc))| (*id, idx))
.collect();
let name_to_idx = id_names_to_desc
.iter()
.enumerate()
.map(|(idx, (_id, name, _desc))| (name.clone(), idx))
.collect();
Self {
id_names_to_desc,
id_to_idx,
name_to_idx,
}
}
}
#[async_trait::async_trait]
impl FlowTableSource for FlowDummyTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
let desc = self
.id_names_to_desc
.get(*idx)
.map(|x| x.2.clone())
.context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
Ok(desc)
}
async fn table_name_from_id(&self, table_id: &TableId) -> Result<TableName, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;
self.id_names_to_desc
.get(*idx)
.map(|x| x.1.clone())
.context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})
}
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error> {
for (id, table_name, _desc) in &self.id_names_to_desc {
if name == table_name {
return Ok(*id);
}
}
TableNotFoundSnafu {
name: format!("Table name = {:?}, couldn't found table id", name),
}
.fail()?
}
}
impl std::fmt::Debug for FlowDummyTableSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DummyTableSource").finish()
}
}
}

View File

@@ -27,6 +27,7 @@ use session::context::QueryContextBuilder;
use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;
use crate::adapter::table_source::TableDesc;
use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
@@ -126,7 +127,7 @@ impl FlowWorkerManager {
pub fn table_info_value_to_relation_desc(
table_info_value: TableInfoValue,
) -> Result<RelationDesc, Error> {
) -> Result<TableDesc, Error> {
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
@@ -147,8 +148,7 @@ pub fn table_info_value_to_relation_desc(
let keys = vec![crate::repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok(RelationDesc {
let relation_desc = RelationDesc {
typ: RelationType {
column_types,
keys,
@@ -157,7 +157,14 @@ pub fn table_info_value_to_relation_desc(
auto_columns: vec![],
},
names: col_names,
})
};
let default_values = raw_schema
.column_schemas
.iter()
.map(|c| c.default_constraint().cloned())
.collect_vec();
Ok(TableDesc::new(relation_desc, default_values))
}
pub fn from_proto_to_data_type(

View File

@@ -247,15 +247,25 @@ impl<'s> Worker<'s> {
src_recvs: Vec<broadcast::Receiver<Batch>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_after: Option<repr::Duration>,
or_replace: bool,
create_if_not_exists: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let already_exists = self.task_states.contains_key(&flow_id);
match (already_exists, create_if_not_exists) {
(true, true) => return Ok(None),
(true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
(false, _) => (),
};
let already_exist = self.task_states.contains_key(&flow_id);
match (create_if_not_exists, or_replace, already_exist) {
// if or_replace, ignore the existing flow
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists, and not replace, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
let mut cur_task_state = ActiveDataflowState::<'s> {
err_collector,
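// Illustrative sketch, not part of the diff above: the decision table that the
// `(create_if_not_exists, or_replace, already_exist)` match implements, written as a small
// self-contained function. The `CreateOutcome` enum is a stand-in introduced here only to make
// the semantics easy to read; the real worker logs and returns as shown in the hunk above.
#[derive(Debug, PartialEq)]
enum CreateOutcome {
    Replace,       // flow exists and `or_replace` is set: rebuild it
    AlreadyExists, // flow exists and neither flag allows reuse: error out
    Noop,          // flow exists, `create_if_not_exists` without `or_replace`: return None
    Create,        // flow does not exist yet: create as normal
}

fn decide(create_if_not_exists: bool, or_replace: bool, already_exist: bool) -> CreateOutcome {
    match (create_if_not_exists, or_replace, already_exist) {
        (_, true, true) => CreateOutcome::Replace,
        (false, false, true) => CreateOutcome::AlreadyExists,
        (true, false, true) => CreateOutcome::Noop,
        (_, _, false) => CreateOutcome::Create,
    }
}

fn main() {
    assert_eq!(decide(true, true, true), CreateOutcome::Replace);
    assert_eq!(decide(false, false, true), CreateOutcome::AlreadyExists);
    assert_eq!(decide(true, false, true), CreateOutcome::Noop);
    assert_eq!(decide(false, true, false), CreateOutcome::Create);
}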
@@ -341,6 +351,7 @@ impl<'s> Worker<'s> {
source_ids,
src_recvs,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
} => {
@@ -352,6 +363,7 @@ impl<'s> Worker<'s> {
&source_ids,
src_recvs,
expire_after,
or_replace,
create_if_not_exists,
err_collector,
);
@@ -398,6 +410,7 @@ pub enum Request {
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<Batch>>,
expire_after: Option<repr::Duration>,
or_replace: bool,
create_if_not_exists: bool,
err_collector: ErrCollector,
},
@@ -547,6 +560,7 @@ mod test {
source_ids: src_ids,
src_recvs: vec![rx],
expire_after: None,
or_replace: false,
create_if_not_exists: true,
err_collector: ErrCollector::default(),
};

View File

@@ -32,6 +32,27 @@ use crate::expr::EvalError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Failed to insert into flow: region_id={}, flow_ids={:?}",
region_id,
flow_ids
))]
InsertIntoFlow {
region_id: u64,
flow_ids: Vec<u64>,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error encountered while creating flow: {sql}"))]
CreateFlow {
sql: String,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("External error"))]
External {
source: BoxedError,
@@ -207,16 +228,17 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } => {
StatusCode::Internal
}
Self::Eval { .. }
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery,
Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
Self::Unexpected { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
StatusCode::Unsupported

View File

@@ -50,8 +50,8 @@ use tonic::{Request, Response, Status};
use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu,
ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
@@ -392,7 +392,13 @@ impl FlownodeBuilder {
.build(),
),
};
manager.create_flow(args).await?;
manager
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
sql: info.raw_sql().clone(),
})?;
}
Ok(cnt)

View File

@@ -173,12 +173,11 @@ mod test {
use super::*;
use crate::adapter::node_context::IdToNameMap;
use crate::adapter::table_source::test::FlowDummyTableSource;
use crate::df_optimizer::apply_df_optimizer;
use crate::expr::GlobalId;
use crate::repr::{ColumnType, RelationType};
pub fn create_test_ctx() -> FlownodeContext {
let mut schemas = HashMap::new();
let mut tri_map = IdToNameMap::new();
{
let gid = GlobalId::User(0);
@@ -187,10 +186,7 @@ mod test {
"public".to_string(),
"numbers".to_string(),
];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
tri_map.insert(Some(name.clone()), Some(1024), gid);
schemas.insert(gid, schema.into_named(vec![Some("number".to_string())]));
}
{
@@ -200,23 +196,16 @@ mod test {
"public".to_string(),
"numbers_with_ts".to_string(),
];
let schema = RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
]);
schemas.insert(
gid,
schema.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
);
tri_map.insert(Some(name.clone()), Some(1025), gid);
}
FlownodeContext {
schema: schemas,
table_repr: tri_map,
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default()
}
let dummy_source = FlowDummyTableSource::default();
let mut ctx = FlownodeContext::new(Box::new(dummy_source));
ctx.table_repr = tri_map;
ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
ctx
}
pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {

View File

@@ -128,7 +128,11 @@ impl AggregateExpr {
}
if args.len() != 1 {
return not_impl_err!("Aggregated function with multiple arguments is not supported");
let fn_name = extensions.get(&f.function_reference).cloned();
return not_impl_err!(
"Aggregated function (name={:?}) with multiple arguments is not supported",
fn_name
);
}
let arg = if let Some(first) = args.first() {

View File

@@ -176,7 +176,7 @@ impl TypedPlan {
}
.fail()?,
};
let table = ctx.table(&table_reference)?;
let table = ctx.table(&table_reference).await?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),
};

View File

@@ -48,10 +48,6 @@ use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::CANDIDATE_LEASE_SECS;
#[cfg(feature = "pg_kvbackend")]
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
@@ -233,15 +229,7 @@ pub async fn metasrv_builder(
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
)
.await?;
(kv_backend, Some(election))
(kv_backend, None)
}
};

View File

@@ -19,9 +19,7 @@ pub mod postgres;
use std::fmt::{self, Debug};
use std::sync::Arc;
use common_telemetry::{info, warn};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::broadcast::Receiver;
use crate::error::Result;
use crate::metasrv::MetasrvNodeInfo;
@@ -77,37 +75,6 @@ impl fmt::Display for LeaderChangeMessage {
}
}
fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_value}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_value}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
tx
}
#[async_trait::async_trait]
pub trait Election: Send + Sync {
type Leader;

View File

@@ -23,12 +23,13 @@ use etcd_client::{
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
@@ -87,7 +88,36 @@ impl EtcdElection {
E: AsRef<str>,
{
let leader_value: String = leader_value.as_ref().into();
let tx = listen_leader_change(leader_value.clone());
let leader_ident = leader_value.clone();
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
Ok(Arc::new(Self {
leader_value,
client,

View File

@@ -16,32 +16,18 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::Client;
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error::{
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// TODO(CookiePie): The lock id should be configurable.
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)";
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1";
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
const IDLE_SESSION_TIMEOUT: &str = "10s";
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
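// Illustrative sketch, not part of the diff above: the advisory-lock round trip that the
// `CAMPAIGN` / `STEP_DOWN` statements rely on, using tokio_postgres directly. The lock id 28319
// comes from the constants above; the connection string is a placeholder assumption and error
// handling is simplified, so this is only a sketch of how `pg_try_advisory_lock` reports whether
// the lock was taken.
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    // The connection task must be polled for the client to make progress.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // Try to acquire the session-level advisory lock used for leader election.
    let rows = client
        .query("SELECT pg_try_advisory_lock(28319)", &[])
        .await?;
    let acquired: bool = rows[0].get(0);
    println!("campaign result: acquired = {acquired}");

    if acquired {
        // ... act as leader, then release the lock when stepping down.
        client.query("SELECT pg_advisory_unlock(28319)", &[]).await?;
    }
    Ok(())
}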
@@ -95,33 +81,8 @@ fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
Ok((value.to_string(), expire_time))
}
#[derive(Debug, Clone, Default)]
struct PgLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for PgLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
/// PostgreSql implementation of Election.
/// TODO(CookiePie): Currently only support candidate registration. Add election logic.
pub struct PgElection {
leader_value: String,
client: Client,
@@ -139,13 +100,7 @@ impl PgElection {
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
) -> Result<ElectionRef> {
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone());
let (tx, _) = broadcast::channel(100);
Ok(Arc::new(Self {
leader_value,
client,
@@ -157,7 +112,7 @@ impl PgElection {
}))
}
fn election_key(&self) -> String {
fn _election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
@@ -191,14 +146,11 @@ impl Election for PgElection {
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
let res = self.put_value_with_lease(&key, &node_info).await?;
// May registered before, just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
self.put_value_with_lease(&key, &node_info).await?;
}
// Check if the current lease has expired and renew the lease.
@@ -245,65 +197,12 @@ impl Election for PgElection {
Ok(valid_candidates)
}
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
///
/// The function operates in a loop, where it:
///
/// 1. Waits for a predefined interval before attempting to acquire the lock again.
/// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
/// 3. Checks the result of the query:
/// - If the lock is successfully acquired (result is true), it calls the `leader_action` method
/// to perform actions as the leader.
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let res = self
.client
.query(CAMPAIGN, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
match row.try_get(0) {
Ok(true) => self.leader_action().await?,
Ok(false) => self.follower_action().await?,
Err(_) => {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock"
.to_string(),
}
.fail();
}
}
} else {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock".to_string(),
}
.fail();
}
let _ = keep_alive_interval.tick().await;
}
todo!()
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let key = self.election_key();
if let Some((leader, expire_time, current, _)) =
self.get_value_with_lease(&key, false).await?
{
ensure!(expire_time > current, NoLeaderSnafu);
Ok(leader.as_bytes().into())
} else {
NoLeaderSnafu.fail()
}
}
todo!()
}
async fn resign(&self) -> Result<()> {
@@ -416,17 +315,17 @@ impl PgElection {
}
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
) -> Result<bool> {
async fn put_value_with_lease(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.client
.query(
PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
&[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)],
&[
&key,
&value,
&LEASE_SEP,
&(self.candidate_lease_ttl_secs as f64),
],
)
.await
.context(PostgresExecutionSnafu)?;
@@ -444,177 +343,6 @@ impl PgElection {
Ok(res.len() == 1)
}
/// Handles the actions of a leader in the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
/// it attempts to renew the lease. It checks if the lease is still valid and either renews it
/// or steps down if it has expired.
///
/// - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
/// by updating the value associated with the election key.
/// - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
/// and steps down, initiating a new campaign for leadership.
/// - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
/// indicating that it still holds the lock and steps down to re-initiate the campaign. This may
/// happen if the leader has failed to renew the lease and the session has expired, and recovery
/// after a period of time during which other leaders have been elected and stepped down.
/// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
///
/// - **Case 2**: If the current instance is not leader previously, it calls the
/// `elected` method as a newly elected leader.
async fn leader_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
if self.is_leader() {
match self.get_value_with_lease(&key, true).await? {
Some((prev_leader, expire_time, current, prev)) => {
match (prev_leader == self.leader_value, expire_time > current) {
// Case 1.1
(true, true) => {
// Safety: prev is Some since we are using `get_value_with_lease` with `true`.
let prev = prev.unwrap();
self.update_value_with_lease(&key, &prev, &self.leader_value)
.await?;
}
// Case 1.2
(true, false) => {
warn!("Leader lease expired, now stepping down.");
self.step_down().await?;
}
// Case 1.3
(false, _) => {
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
self.step_down().await?;
}
}
}
// Case 1.4
None => {
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
self.step_down().await?;
}
}
// Case 2
} else {
self.elected().await?;
}
Ok(())
}
/// Handles the actions of a follower in the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
/// it steps down without deleting the key.
/// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
/// will be released.
/// - **Case 3**: If all checks pass, the function returns without performing any actions.
async fn follower_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
if self.is_leader() {
self.step_down_without_lock().await?;
}
let (_, expire_time, current, _) = self
.get_value_with_lease(&key, false)
.await?
.context(NoLeaderSnafu)?;
// Case 2
ensure!(expire_time > current, NoLeaderSnafu);
// Case 3
Ok(())
}
/// Step down the leader. The leader should delete the key and notify the leader watcher.
///
/// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
///
/// ## Caution:
/// Should only step down while holding the advisory lock.
async fn step_down(&self) -> Result<()> {
let key = self.election_key();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.delete_value(&key).await?;
self.client
.query(STEP_DOWN, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
async fn step_down_without_lock(&self) -> Result<()> {
let key = self.election_key().into_bytes();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Elected as leader. The leader should put the key and notify the leader watcher.
/// Caution: Should only elected while holding the advisory lock.
async fn elected(&self) -> Result<()> {
let key = self.election_key();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS)
.await?;
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.leader_infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
}
#[cfg(test)]
@@ -662,7 +390,7 @@ mod tests {
};
let res = pg_election
.put_value_with_lease(&key, &value, 10)
.put_value_with_lease(&key, &value)
.await
.unwrap();
assert!(res);
@@ -690,7 +418,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value, 10)
.put_value_with_lease(&key, &value)
.await
.unwrap();
}
@@ -750,7 +478,7 @@ mod tests {
handles.push(handle);
}
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(3)).await;
tokio::time::sleep(Duration::from_secs(6)).await;
let client = create_postgres_client().await.unwrap();
@@ -788,402 +516,4 @@ mod tests {
assert!(res);
}
}
#[tokio::test]
async fn test_elected_and_step_down() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
leader_pg_election.elected().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_pg_election.step_down_without_lock().await.unwrap();
let (leader, _, _, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
leader_pg_election.elected().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_pg_election.step_down().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
#[tokio::test]
async fn test_leader_action() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, new_expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(new_expire_time > current && new_expire_time > expire_time);
assert!(leader_pg_election.is_leader());
// Step 3: Something wrong, the leader lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 5: Something wrong, the leader key is deleted by other followers.
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
leader_pg_election
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
#[tokio::test]
async fn test_follower_action() {
let candidate_lease_ttl_secs = 5;
let follower_client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let follower_pg_election = PgElection {
leader_value: "test_follower".to_string(),
client: follower_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
let leader_client = create_postgres_client().await.unwrap();
let (tx, _) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: "test_leader".to_string(),
client: leader_client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
leader_pg_election.elected().await.unwrap();
// Step 1: As a follower, the leader exists and the lease is not expired.
follower_pg_election.follower_action().await.unwrap();
// Step 2: As a follower, the leader exists but the lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
assert!(follower_pg_election.follower_action().await.is_err());
// Step 3: As a follower, the leader does not exist.
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
assert!(follower_pg_election.follower_action().await.is_err());
// Step 4: Follower thinks it's the leader but failed to acquire the lock.
follower_pg_election
.is_leader
.store(true, Ordering::Relaxed);
assert!(follower_pg_election.follower_action().await.is_err());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
assert_eq!(
String::from_utf8_lossy(key.key()),
follower_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
}

View File

@@ -27,6 +27,7 @@ use std::sync::Arc;
use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
@@ -40,6 +41,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use table::predicate::Predicate;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
@@ -49,9 +51,9 @@ use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, GetSchemaMetadataSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
TimeoutSnafu,
CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
@@ -85,19 +87,13 @@ pub struct CompactionRequest {
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) listener: WorkerListener,
pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
pub(crate) max_parallelism: usize,
}
impl CompactionRequest {
pub(crate) fn region_id(&self) -> RegionId {
self.current_version.metadata.region_id
}
/// Push waiter to the request.
pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
}
/// Compaction scheduler tracks and manages compaction tasks.
@@ -145,10 +141,27 @@ impl CompactionScheduler {
waiter: OptionOutputTx,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
match compact_options {
Options::Regular(_) => {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
}
options @ Options::StrictWindow(_) => {
// Incoming compaction request is manually triggered.
status.set_pending_request(PendingCompaction {
options,
waiter,
max_parallelism,
});
info!(
"Region {} is compacting, manual compaction will be re-scheduled.",
region_id
);
}
}
return Ok(());
}
@@ -163,6 +176,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
self.region_status.insert(region_id, status);
let result = self
@@ -184,6 +198,35 @@ impl CompactionScheduler {
return;
};
if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
let PendingCompaction {
options,
waiter,
max_parallelism,
} = pending_request;
let request = status.new_compaction_request(
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
if let Err(e) = self.schedule_compaction_request(request, options).await {
error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
} else {
debug!(
"Successfully scheduled manual compaction for region id: {}",
region_id
);
}
return;
}
// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
self.request_sender.clone(),
@@ -193,6 +236,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
MAX_PARALLEL_COMPACTION,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
@@ -264,6 +308,7 @@ impl CompactionScheduler {
manifest_ctx,
listener,
schema_metadata_manager,
max_parallelism,
} = request;
let ttl = find_ttl(
@@ -294,6 +339,7 @@ impl CompactionScheduler {
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl: Some(ttl),
max_parallelism,
};
let picker_output = {
@@ -417,27 +463,6 @@ impl Drop for CompactionScheduler {
}
}
/// Pending compaction tasks.
struct PendingCompaction {
waiters: Vec<OutputTx>,
}
impl PendingCompaction {
/// Push waiter to the request.
fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
/// Send compaction error to waiter.
fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
}
}
}
/// Finds TTL of table by first examine table options then database options.
async fn find_ttl(
table_id: TableId,
@@ -471,10 +496,10 @@ struct CompactionStatus {
version_control: VersionControlRef,
/// Access layer of the region.
access_layer: AccessLayerRef,
/// Compaction pending to schedule.
///
/// For simplicity, we merge all pending compaction requests into one.
pending_compaction: Option<PendingCompaction>,
/// Pending waiters for compaction.
waiters: Vec<OutputTx>,
/// Pending compaction that is supposed to run as soon as the current compaction task finishes.
pending_request: Option<PendingCompaction>,
}
impl CompactionStatus {
@@ -488,23 +513,44 @@ impl CompactionStatus {
region_id,
version_control,
access_layer,
pending_compaction: None,
waiters: Vec::new(),
pending_request: None,
}
}
/// Merge the watier to the pending compaction.
fn merge_waiter(&mut self, waiter: OptionOutputTx) {
let pending = self
.pending_compaction
.get_or_insert_with(|| PendingCompaction {
waiters: Vec::new(),
});
pending.push_waiter(waiter);
/// Merge the waiter to the pending compaction.
fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
fn on_failure(self, err: Arc<Error>) {
if let Some(mut pending) = self.pending_compaction {
pending.on_failure(self.region_id, err.clone());
/// Set the pending compaction request, replacing the current value if one already exists.
fn set_pending_request(&mut self, pending: PendingCompaction) {
if let Some(mut prev) = self.pending_request.replace(pending) {
debug!(
"Replace pending compaction options with new request {:?} for region: {}",
prev.options, self.region_id
);
if let Some(waiter) = prev.waiter.take_inner() {
waiter.send(ManualCompactionOverrideSnafu.fail());
}
}
}
fn on_failure(mut self, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
if let Some(pending_compaction) = self.pending_request {
pending_compaction
.waiter
.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
@@ -515,34 +561,36 @@ impl CompactionStatus {
fn new_compaction_request(
&mut self,
request_sender: Sender<WorkerRequest>,
waiter: OptionOutputTx,
mut waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
cache_manager: CacheManagerRef,
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> CompactionRequest {
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
let mut req = CompactionRequest {
let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
waiters.extend(std::mem::take(&mut self.waiters));
if let Some(waiter) = waiter.take_inner() {
waiters.push(waiter);
}
CompactionRequest {
engine_config,
current_version,
access_layer: self.access_layer.clone(),
request_sender: request_sender.clone(),
waiters: Vec::new(),
waiters,
start_time,
cache_manager,
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
};
if let Some(pending) = self.pending_compaction.take() {
req.waiters = pending.waiters;
max_parallelism,
}
req.push_waiter(waiter);
req
}
}
@@ -680,8 +728,20 @@ fn get_expired_ssts(
.collect()
}
/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
/// Compaction options. Currently, it can only be [StrictWindow].
pub(crate) options: compact_request::Options,
/// Waiter of the pending request.
pub(crate) waiter: OptionOutputTx,
/// Max parallelism for pending compaction.
pub(crate) max_parallelism: usize,
}
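// Illustrative sketch, not part of the diff above: the replace-and-notify pattern that
// `set_pending_request` implements. A newly queued manual request displaces the previous one,
// and the displaced request's waiter is told it was overridden. Simplified stand-in types and a
// plain String error are used instead of `OptionOutputTx` and the mito error type.
use tokio::sync::oneshot;

struct Pending {
    window_seconds: i64,
    waiter: oneshot::Sender<Result<(), String>>,
}

struct Status {
    pending_request: Option<Pending>,
}

impl Status {
    fn set_pending_request(&mut self, pending: Pending) {
        if let Some(prev) = self.pending_request.replace(pending) {
            // The earlier manual request will never run; fail its waiter instead of dropping it silently.
            let _ = prev.waiter.send(Err("manual compaction overridden".to_string()));
        }
    }
}

#[tokio::main]
async fn main() {
    let mut status = Status { pending_request: None };

    let (tx1, rx1) = oneshot::channel();
    status.set_pending_request(Pending { window_seconds: 3600, waiter: tx1 });

    // A second manual request arrives while compaction is still running.
    let (tx2, _rx2) = oneshot::channel();
    status.set_pending_request(Pending { window_seconds: 60, waiter: tx2 });

    // The first waiter observes that its request was overridden.
    assert!(matches!(rx1.await, Ok(Err(_))));
    assert_eq!(status.pending_request.as_ref().unwrap().window_seconds, 60);
}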
#[cfg(test)]
mod tests {
use api::v1::region::StrictWindow;
use tokio::sync::oneshot;
use super::*;
@@ -722,6 +782,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -742,6 +803,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
@@ -752,6 +814,7 @@ mod tests {
#[tokio::test]
async fn test_schedule_on_finished() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
@@ -795,6 +858,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -816,6 +880,119 @@ mod tests {
purger.clone(),
);
// The task is pending.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(!scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.waiters
.is_empty());
// On compaction finished, schedule the next compaction.
scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&[],
purger.clone(),
);
let (tx, _rx) = oneshot::channel();
// The task is pending.
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
assert!(!scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.waiters
.is_empty());
}
#[tokio::test]
async fn test_manual_compaction_when_compaction_in_progress() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
// 5 files to compact.
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta_ref().clone())
.collect();
// 5 files for the next compaction; also removes the old files.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&file_metas,
purger.clone(),
);
scheduler
.schedule_compaction(
region_id,
@@ -825,17 +1002,40 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
// Should schedule 1 compaction.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(scheduler
.region_status
.get(&builder.region_id())
.get(&region_id)
.unwrap()
.pending_compaction
.is_some());
.pending_request
.is_none());
// Schedule another manual compaction.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
// Current job num should be 1 since compaction is in progress.
assert_eq!(1, job_scheduler.num_jobs());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_some());
// On compaction finished, schedule the next compaction.
scheduler
@@ -843,32 +1043,8 @@ mod tests {
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&[],
purger.clone(),
);
// The task is pending.
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
)
.await
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
assert!(scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.pending_compaction
.is_some());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_none());
}
}

View File

@@ -91,6 +91,12 @@ pub struct CompactionRegion {
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<TimeToLive>,
/// Controls the parallelism of this compaction task. Default is 1.
///
/// The parallelism is within this compaction task, not across different compaction tasks.
/// For example, different windows of the same compaction task may be compacted in parallel.
pub max_parallelism: usize,
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: RegionOptions,
pub max_parallelism: usize,
}
/// Open a compaction region from a compaction request.
@@ -205,6 +212,7 @@ pub async fn open_compaction_region(
current_version,
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
})
}
@@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor {
let mut futs = Vec::with_capacity(picker_output.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
@@ -358,9 +367,8 @@ impl Compactor for DefaultCompactor {
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk =
Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
let mut task_chunk = Vec::with_capacity(internal_parallelism);
for _ in 0..internal_parallelism {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_compact(task));
}
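As a standalone illustration of the chunked execution pattern the hunk above switches to (a sketch only; `run_chunked`, the task type, and the `futures` dependency are assumptions, not GreptimeDB code), tasks are drained in groups bounded by the per-task parallelism instead of the old global `MAX_PARALLEL_COMPACTION` constant:

```rust
use futures::future::join_all;

/// Runs `tasks` in chunks of at most `parallelism` futures at a time,
/// awaiting each chunk before starting the next one.
async fn run_chunked<F, T>(mut tasks: Vec<F>, parallelism: usize) -> Vec<T>
where
    F: std::future::Future<Output = T>,
{
    // Guard against a zero value, mirroring `max_parallelism.max(1)` above.
    let parallelism = parallelism.max(1);
    let mut results = Vec::with_capacity(tasks.len());
    while !tasks.is_empty() {
        let take = parallelism.min(tasks.len());
        let chunk: Vec<_> = tasks.drain(..take).collect();
        results.extend(join_all(chunk).await);
    }
    results
}
```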

View File

@@ -32,7 +32,7 @@ use crate::request::{
use crate::worker::WorkerListener;
/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 8;
pub const MAX_PARALLEL_COMPACTION: usize = 1;
pub(crate) struct CompactionTaskImpl {
pub compaction_region: CompactionRegion,

View File

@@ -443,7 +443,7 @@ impl Default for InvertedIndexConfig {
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
content_cache_page_size: ReadableSize::mb(8),
content_cache_page_size: ReadableSize::kb(64),
};
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {

View File

@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::AlterKind::SetRegionOptions;
use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;
@@ -466,3 +470,219 @@ async fn test_compaction_update_time_window() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_change_region_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Put window 7200
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 7200
// Check compaction window.
let region = engine.get_region(region_id).unwrap();
{
let version = region.version();
assert_eq!(
Some(Duration::from_secs(3600)),
version.compaction_time_window,
);
assert!(version.options.compaction.time_window().is_none());
}
// Change compaction window.
let request = RegionRequest::Alter(RegionAlterRequest {
schema_version: region.metadata().schema_version,
kind: SetRegionOptions {
options: vec![SetRegionOption::Twsc(
"compaction.twcs.time_window".to_string(),
"2h".to_string(),
)],
},
});
engine.handle_request(region_id, request).await.unwrap();
// Compact again. It should compact windows 3600 and 7200
// into 7200.
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
assert_eq!(
Some(Duration::from_secs(7200)),
version.options.compaction.time_window()
);
}
// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
}
}
#[tokio::test]
async fn test_open_overwrite_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(3600)),
version.compaction_time_window,
);
assert!(version.options.compaction.time_window().is_none());
}
// Reopen region.
let options = HashMap::from([
("compaction.type".to_string(), "twcs".to_string()),
("compaction.twcs.time_window".to_string(), "2h".to_string()),
]);
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
assert_eq!(
Some(Duration::from_secs(7200)),
version.options.compaction.time_window()
);
}
}

View File

@@ -464,6 +464,7 @@ async fn test_open_compaction_region() {
region_id,
region_dir: region_dir.clone(),
region_options: RegionOptions::default(),
max_parallelism: 1,
};
let compaction_region = open_compaction_region(

View File

@@ -925,6 +925,23 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Unexpected impure default value with region_id: {}, column: {}, default_value: {}",
region_id,
column,
default_value
))]
UnexpectedImpureDefault {
#[snafu(implicit)]
location: Location,
region_id: RegionId,
column: String,
default_value: String,
},
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -964,7 +981,8 @@ impl ErrorExt for Error {
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. } => StatusCode::Unexpected,
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }
@@ -1067,6 +1085,8 @@ impl ErrorExt for Error {
PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => {
source.status_code()
}
ManualCompactionOverride {} => StatusCode::Cancelled,
}
}

View File

@@ -26,6 +26,7 @@
use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_telemetry::info;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
@@ -253,7 +254,10 @@ pub(crate) struct Version {
///
/// Used to check if it is a flush task issued during table truncation.
pub(crate) truncated_entry_id: Option<EntryId>,
/// Inferred compaction time window.
/// Inferred compaction time window from flush.
///
/// If compaction options contain a time window, it will overwrite this value
/// when creating a new version from the [VersionBuilder].
pub(crate) compaction_time_window: Option<Duration>,
/// Options of the region.
pub(crate) options: RegionOptions,
@@ -389,7 +393,24 @@ impl VersionBuilder {
}
/// Builds a new [Version] from the builder.
/// It overwrites the window size by compaction option.
pub(crate) fn build(self) -> Version {
let compaction_time_window = self
.options
.compaction
.time_window()
.or(self.compaction_time_window);
if self.compaction_time_window.is_some()
&& compaction_time_window != self.compaction_time_window
{
info!(
"VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
self.compaction_time_window,
compaction_time_window,
self.metadata.region_id
);
}
Version {
metadata: self.metadata,
memtables: self.memtables,
@@ -397,7 +418,7 @@ impl VersionBuilder {
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncated_entry_id: self.truncated_entry_id,
compaction_time_window: self.compaction_time_window,
compaction_time_window,
options: self.options,
}
}
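A tiny sketch of the precedence rule implemented above (standalone code, not the actual `VersionBuilder`): an explicit window from the region's compaction options wins over the window inferred from flush.

```rust
use std::time::Duration;

/// Returns the effective compaction window: the option wins when set,
/// otherwise fall back to the inferred window.
fn effective_window(
    option_window: Option<Duration>,
    inferred_window: Option<Duration>,
) -> Option<Duration> {
    option_window.or(inferred_window)
}

fn main() {
    // A "2h" option overrides an inferred 1h window.
    assert_eq!(
        effective_window(Some(Duration::from_secs(7200)), Some(Duration::from_secs(3600))),
        Some(Duration::from_secs(7200))
    );
    // Without an option, the inferred window is kept.
    assert_eq!(
        effective_window(None, Some(Duration::from_secs(3600))),
        Some(Duration::from_secs(3600))
    );
}
```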

View File

@@ -42,7 +42,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableId;
@@ -333,6 +333,14 @@ impl WriteRequest {
}
OpType::Put => {
// For put requests, we use the default value from column schema.
if column.column_schema.is_default_impure() {
UnexpectedImpureDefaultSnafu {
region_id: self.region_id,
column: &column.column_schema.name,
default_value: format!("{:?}", column.column_schema.default_constraint()),
}
.fail()?
}
column
.column_schema
.create_default()
@@ -1039,6 +1047,57 @@ mod tests {
check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
#[test]
fn test_fill_impure_columns_err() {
let rows = Rows {
schema: vec![new_column_schema(
"k0",
ColumnDataType::Int64,
SemanticType::Tag,
)],
rows: vec![Row {
values: vec![i64_value(1)],
}],
};
let metadata = {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"k0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.primary_key(vec![2]);
builder.build().unwrap()
};
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
assert!(request
.fill_missing_columns(&metadata)
.unwrap_err()
.to_string()
.contains("Unexpected impure default value with region_id"));
}
#[test]
fn test_fill_missing_columns() {
let rows = Rows {

View File

@@ -16,7 +16,7 @@ use std::path::PathBuf;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use futures::{AsyncRead, AsyncWrite};
use index::error as index_error;
use index::error::Result as IndexResult;
@@ -189,7 +189,8 @@ impl ExternalTempFileProvider for TempFileProvider {
for entry in entries {
if entry.metadata().is_dir() {
warn!("Unexpected entry in index creation dir: {:?}", entry.path());
// TODO(hl): we can keep this warning once we find a way to filter out self in the list result.
debug!("Unexpected entry in index creation dir: {:?}", entry.path());
continue;
}

View File

@@ -145,10 +145,8 @@ impl<S> RegionWorkerLoop<S> {
}
info!(
"Try to alter region {} from version {} to {}",
region_id,
version.metadata.schema_version,
region.metadata().schema_version
"Try to alter region {}, version.metadata: {:?}, request: {:?}",
region_id, version.metadata, request,
);
self.handle_alter_region_metadata(region, version, request, sender);
}

View File

@@ -45,6 +45,8 @@ impl<S> RegionWorkerLoop<S> {
sender,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
// TODO(yingwen): expose this to frontend
1,
)
.await
{
@@ -113,6 +115,7 @@ impl<S> RegionWorkerLoop<S> {
OptionOutputTx::none(),
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
1,
)
.await
{

View File

@@ -101,10 +101,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.version_control
.alter_schema(change_result.new_meta, &region.memtable_builder);
let version = region.version();
info!(
"Region {} is altered, schema version is {}",
region.region_id,
region.metadata().schema_version
"Region {} is altered, metadata is {:?}, options: {:?}",
region.region_id, version.metadata, version.options,
);
}

View File

@@ -477,6 +477,7 @@ pub fn column_schemas_to_defs(
.collect()
}
/// Converts a SQL alter table statement into a gRPC alter table expression.
pub(crate) fn to_alter_table_expr(
alter_table: AlterTable,
query_ctx: &QueryContextRef,
@@ -504,6 +505,8 @@ pub(crate) fn to_alter_table_expr(
.context(ExternalSnafu)?,
),
location: location.as_ref().map(From::from),
// TODO(yingwen): We don't support `IF NOT EXISTS` for `ADD COLUMN` yet.
add_if_not_exists: false,
}],
}),
AlterTableOperation::ModifyColumnType {

View File

@@ -47,6 +47,7 @@ use store_api::metric_engine_consts::{
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::table_reference::TableReference;
use table::TableRef;
@@ -58,7 +59,9 @@ use crate::error::{
use crate::expr_factory::CreateExprFactory;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion};
use crate::req_convert::insert::{
fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;
pub struct Inserter {
@@ -200,18 +203,26 @@ impl Inserter {
});
validate_column_count_match(&requests)?;
let (table_name_to_ids, instant_table_ids) = self
let CreateAlterTableResult {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let name_to_info = table_infos
.values()
.map(|info| (info.name.clone(), info.clone()))
.collect::<HashMap<_, _>>();
let inserts = RowToRegion::new(
table_name_to_ids,
name_to_info,
instant_table_ids,
self.partition_manager.as_ref(),
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}
/// Handles row inserts request with metric engine.
@@ -236,7 +247,10 @@ impl Inserter {
.await?;
// check and create logical tables
let (table_name_to_ids, instant_table_ids) = self
let CreateAlterTableResult {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
@@ -244,15 +258,15 @@ impl Inserter {
statement_executor,
)
.await?;
let inserts = RowToRegion::new(
table_name_to_ids,
instant_table_ids,
&self.partition_manager,
)
.convert(requests)
.await?;
let name_to_info = table_infos
.values()
.map(|info| (info.name.clone(), info.clone()))
.collect::<HashMap<_, _>>();
let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}
pub async fn handle_table_insert(
@@ -273,7 +287,10 @@ impl Inserter {
.convert(request)
.await?;
self.do_request(inserts, &ctx).await
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
self.do_request(inserts, &table_infos, &ctx).await
}
pub async fn handle_statement_insert(
@@ -281,12 +298,15 @@ impl Inserter {
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let inserts =
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.await?;
self.do_request(inserts, ctx).await
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
self.do_request(inserts, &table_infos, ctx).await
}
}
@@ -294,8 +314,12 @@ impl Inserter {
async fn do_request(
&self,
requests: InstantAndNormalInsertRequests,
table_infos: &HashMap<TableId, Arc<TableInfo>>,
ctx: &QueryContextRef,
) -> Result<Output> {
// Fill impure default values in the request
let requests = fill_reqs_with_impure_default(table_infos, requests)?;
let write_cost = write_meter!(
ctx.current_catalog(),
ctx.current_schema(),
@@ -499,14 +523,15 @@ impl Inserter {
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
) -> Result<(HashMap<String, TableId>, HashSet<TableId>)> {
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
.start_timer();
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut table_infos = HashMap::new();
// If `auto_create_table` hint is disabled, skip creating/altering tables.
let auto_create_table_hint = ctx
.extension(AUTO_CREATE_TABLE_KEY)
@@ -535,9 +560,13 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
}
return Ok((table_name_to_ids, instant_table_ids));
let ret = CreateAlterTableResult {
instant_table_ids,
table_infos,
};
return Ok(ret);
}
let mut create_tables = vec![];
@@ -551,7 +580,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
@@ -579,7 +608,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
}
}
if !alter_tables.is_empty() {
@@ -602,7 +631,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
@@ -612,7 +641,10 @@ impl Inserter {
}
}
Ok((table_name_to_ids, instant_table_ids))
Ok(CreateAlterTableResult {
instant_table_ids,
table_infos,
})
}
async fn create_physical_table_on_demand(
@@ -741,6 +773,8 @@ impl Inserter {
Ok(create_table_expr)
}
/// Returns an alter table expression if it finds new columns in the request.
/// It always adds columns if they do not exist.
fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,
@@ -872,3 +906,11 @@ fn build_create_table_expr(
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
/// table ids of ttl=instant tables.
instant_table_ids: HashSet<TableId>,
/// Table Info of the created tables.
table_infos: HashMap<TableId, Arc<TableInfo>>,
}

View File

@@ -13,12 +13,14 @@
// limitations under the License.
mod column_to_row;
mod fill_impure_default;
mod row_to_region;
mod stmt_to_region;
mod table_to_region;
use api::v1::SemanticType;
pub use column_to_row::ColumnToRow;
pub use fill_impure_default::fill_reqs_with_impure_default;
pub use row_to_region::RowToRegion;
use snafu::{OptionExt, ResultExt};
pub use stmt_to_region::StatementToRegion;

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Util functions to help fill columns that have impure default values in requests
use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet};
use datatypes::schema::ColumnSchema;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use table::metadata::{TableInfo, TableInfoRef};
use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
use crate::expr_factory::column_schemas_to_defs;
use crate::insert::InstantAndNormalInsertRequests;
/// Find all columns that have impure default values
pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
let columns = table_info.meta.schema.column_schemas();
columns
.iter()
.filter(|column| column.is_default_impure())
.cloned()
.collect()
}
/// Fill impure default values in the request
pub struct ImpureDefaultFiller {
impure_columns: HashMap<String, (api::v1::ColumnSchema, Option<api::v1::Value>)>,
}
impl ImpureDefaultFiller {
pub fn new(table_info: TableInfoRef) -> Result<Self> {
let impure_column_list = find_all_impure_columns(&table_info);
let pks = &table_info.meta.primary_key_indices;
let pk_names = pks
.iter()
.map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
.collect::<Vec<_>>();
let mut impure_columns = HashMap::new();
for column in impure_column_list {
let default_value = column
.create_impure_default()
.with_context(|_| ConvertColumnDefaultConstraintSnafu {
column_name: column.name.clone(),
})?
.with_context(|| UnexpectedSnafu {
violated: format!(
"Expect default value to be impure, found {:?}",
column.default_constraint()
),
})?;
let grpc_default_value = api::helper::to_proto_value(default_value);
let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
let grpc_column_schema = api::v1::ColumnSchema {
column_name: def.name,
datatype: def.data_type,
semantic_type: def.semantic_type,
datatype_extension: def.datatype_extension,
options: def.options,
};
impure_columns.insert(
grpc_column_schema.column_name.clone(),
(grpc_column_schema, grpc_default_value),
);
}
Ok(Self { impure_columns })
}
/// Fill impure default values in the request
pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
let impure_columns_in_reqs: HashSet<_> = rows
.schema
.iter()
.filter_map(|schema| {
if self.impure_columns.contains_key(&schema.column_name) {
Some(&schema.column_name)
} else {
None
}
})
.collect();
if self.impure_columns.len() == impure_columns_in_reqs.len() {
return;
}
let (schema_append, row_append): (Vec<_>, Vec<_>) = self
.impure_columns
.iter()
.filter_map(|(name, (schema, val))| {
if !impure_columns_in_reqs.contains(name) {
Some((schema.clone(), val.clone().unwrap_or_default()))
} else {
None
}
})
.unzip();
rows.schema.extend(schema_append);
for row in rows.rows.iter_mut() {
row.values.extend_from_slice(row_append.as_slice());
}
}
}
/// Fill impure default values in the request (only for normal insert requests, since instant inserts can be filled directly in the flownode as the single source of truth)
pub fn fill_reqs_with_impure_default(
table_infos: &HashMap<TableId, Arc<TableInfo>>,
mut inserts: InstantAndNormalInsertRequests,
) -> Result<InstantAndNormalInsertRequests> {
let fillers = table_infos
.iter()
.map(|(table_id, table_info)| {
let table_id = *table_id;
ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
})
.collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
let normal_inserts = &mut inserts.normal_requests;
for request in normal_inserts.requests.iter_mut() {
let region_id = RegionId::from(request.region_id);
let table_id = region_id.table_id();
let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
violated: format!("impure default filler for table_id: {} not found", table_id),
})?;
if let Some(rows) = &mut request.rows {
filler.fill_rows(rows);
}
}
Ok(inserts)
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use datatypes::value::Value;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use super::*;
/// Create a test schema with 3 columns: `[col1 int32, ts timestampmills DEFAULT now(), col2 int32]`.
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
Value::from(1i32),
)))
.unwrap(),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap()
}
pub fn new_table_info() -> TableInfo {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
fn column_schema_to_proto(
column_schema: &[ColumnSchema],
pk_names: &[String],
) -> Vec<api::v1::ColumnSchema> {
column_schemas_to_defs(column_schema.to_vec(), pk_names)
.unwrap()
.into_iter()
.map(|def| api::v1::ColumnSchema {
column_name: def.name,
datatype: def.data_type,
semantic_type: def.semantic_type,
datatype_extension: def.datatype_extension,
options: def.options,
})
.collect()
}
#[test]
fn test_impure_append() {
let row = api::v1::Row {
values: vec![api::v1::Value {
value_data: Some(ValueData::I32Value(42)),
}],
};
let schema = new_test_schema().column_schemas()[0].clone();
let col_schemas = column_schema_to_proto(&[schema], &["col1".to_string()]);
let mut rows = api::v1::Rows {
schema: col_schemas,
rows: vec![row],
};
let info = new_table_info();
let filler = ImpureDefaultFiller::new(Arc::new(info)).unwrap();
filler.fill_rows(&mut rows);
assert_eq!(rows.schema[1].column_name, "ts");
assert!(rows.schema.len() == 2 && rows.rows[0].values.len() == 2);
}
}
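As a rough illustration of the fill semantics exercised above (toy types only, not the `api::v1` protos): columns with impure defaults that are absent from the request get their precomputed value appended to the schema and to every row, while columns already present are left untouched.

```rust
use std::collections::HashMap;

/// Appends precomputed impure defaults for columns missing from `schema`,
/// extending every row with the same value.
fn fill_missing_impure(
    schema: &mut Vec<String>,
    rows: &mut [Vec<String>],
    impure_defaults: &HashMap<String, String>,
) {
    let missing: Vec<(String, String)> = impure_defaults
        .iter()
        .filter(|(name, _)| !schema.contains(*name))
        .map(|(name, value)| (name.clone(), value.clone()))
        .collect();
    for (name, value) in missing {
        schema.push(name);
        for row in rows.iter_mut() {
            row.push(value.clone());
        }
    }
}

fn main() {
    let mut schema = vec!["col1".to_string()];
    let mut rows = vec![vec!["42".to_string()]];
    let defaults = HashMap::from([("ts".to_string(), "1700000000000".to_string())]);
    fill_missing_impure(&mut schema, &mut rows, &defaults);
    assert_eq!(schema, vec!["col1".to_string(), "ts".to_string()]);
    assert_eq!(rows[0], vec!["42".to_string(), "1700000000000".to_string()]);
}
```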

View File

@@ -13,30 +13,31 @@
// limitations under the License.
use ahash::{HashMap, HashSet};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
use api::v1::RowInsertRequests;
use partition::manager::PartitionRuleManager;
use snafu::OptionExt;
use table::metadata::TableId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{TableId, TableInfoRef};
use crate::error::{Result, TableNotFoundSnafu};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
pub struct RowToRegion<'a> {
table_name_to_ids: HashMap<String, TableId>,
tables_info: HashMap<String, TableInfoRef>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
}
impl<'a> RowToRegion<'a> {
pub fn new(
table_name_to_ids: HashMap<String, TableId>,
tables_info: HashMap<String, TableInfoRef>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
) -> Self {
Self {
table_name_to_ids,
tables_info,
instant_table_ids,
partition_manager,
}
@@ -49,10 +50,24 @@ impl<'a> RowToRegion<'a> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
let mut instant_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let Some(rows) = request.rows else { continue };
let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
let region_numbers = self.region_numbers(&request.table_name)?;
let requests = if let Some(region_id) = match region_numbers[..] {
[singular] => Some(RegionId::new(table_id, singular)),
_ => None,
} {
vec![InsertRequest {
region_id: region_id.as_u64(),
rows: Some(rows),
}]
} else {
Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, rows)
.await?
};
if self.instant_table_ids.contains(&table_id) {
instant_request.extend(requests);
} else {
@@ -71,9 +86,16 @@ impl<'a> RowToRegion<'a> {
}
fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.table_name_to_ids
self.tables_info
.get(table_name)
.cloned()
.map(|x| x.table_id())
.context(TableNotFoundSnafu { table_name })
}
fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
self.tables_info
.get(table_name)
.map(|x| &x.meta.region_numbers)
.context(TableNotFoundSnafu { table_name })
}
}
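The single-region fast path above skips the partitioner entirely when a table has exactly one region. A hedged sketch of the underlying id arithmetic (assuming `RegionId` packs the table id into the high 32 bits and the region number into the low 32 bits, as `RegionId::new`/`as_u64` suggest):

```rust
/// Packs a table id and a region number into a single 64-bit region id.
fn region_id_as_u64(table_id: u32, region_number: u32) -> u64 {
    ((table_id as u64) << 32) | region_number as u64
}

fn main() {
    // Table 10, sole region 0 -> region id 10 << 32.
    assert_eq!(region_id_as_u64(10, 0), 10u64 << 32);
    assert_eq!(region_id_as_u64(10, 1), (10u64 << 32) | 1);
}
```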

View File

@@ -25,6 +25,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::metadata::TableInfoRef;
use table::TableRef;
use crate::error::{
@@ -61,7 +62,7 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<InstantAndNormalInsertRequests> {
) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
@@ -137,15 +138,21 @@ impl<'a> StatementToRegion<'a> {
.await?;
let requests = RegionInsertRequests { requests };
if table_info.is_ttl_instant_table() {
Ok(InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
})
Ok((
InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
},
table_info,
))
} else {
Ok(InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
})
Ok((
InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
},
table_info,
))
}
}

View File

@@ -911,7 +911,7 @@ impl StatementExecutor {
let _ = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind, false)
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(error::TableSnafu)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;

View File

@@ -80,35 +80,20 @@ impl<'a> SplitReadRowHelper<'a> {
fn split_rows(mut self) -> Result<HashMap<RegionNumber, Rows>> {
let regions = self.split_to_regions()?;
let request_splits = if regions.len() == 1 {
// fast path, zero copy
regions
.into_keys()
.map(|region_number| {
let rows = std::mem::take(&mut self.rows);
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>()
} else {
regions
.into_iter()
.map(|(region_number, row_indexes)| {
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>()
};
let request_splits = regions
.into_iter()
.map(|(region_number, row_indexes)| {
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>();
Ok(request_splits)
}
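A toy restatement (plain `String` rows and an arbitrary helper name, not the actual `SplitReadRowHelper`) of what the remaining general path above does: each row is moved into its region's batch with `std::mem::take`, so dropping the dedicated single-region "fast path" should not introduce per-row copies; only the schema is cloned per region.

```rust
use std::collections::HashMap;

/// Splits rows into per-region batches given each region's row indexes;
/// `std::mem::take` moves each row out of the source vector without cloning.
fn split_rows(
    mut rows: Vec<String>,
    regions: HashMap<u32, Vec<usize>>,
) -> HashMap<u32, Vec<String>> {
    regions
        .into_iter()
        .map(|(region_number, row_indexes)| {
            let batch: Vec<String> = row_indexes
                .into_iter()
                .map(|idx| std::mem::take(&mut rows[idx]))
                .collect();
            (region_number, batch)
        })
        .collect()
}
```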

View File

@@ -38,7 +38,7 @@ pub async fn logs(
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_LOGS_INGESTION_ELAPSED
let _timer = crate::metrics::METRIC_HTTP_LOGS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

View File

@@ -273,8 +273,11 @@ pub(crate) fn check(
) -> Option<Output> {
// INSERT doesn't need the MySQL federated check. We assume the query doesn't contain
// federated or driver setup commands if it starts with an 'INSERT' statement.
if query.len() > 6 && query[..6].eq_ignore_ascii_case("INSERT") {
return None;
let the_6th_index = query.char_indices().nth(6).map(|(i, _)| i);
if let Some(index) = the_6th_index {
if query[..index].eq_ignore_ascii_case("INSERT") {
return None;
}
}
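A standalone sketch of the boundary-safe prefix check introduced above (hypothetical helper name, not the actual function): byte-slicing `query[..6]` can panic when byte 6 is not a UTF-8 character boundary, so the byte index of the seventh character is located first.

```rust
/// Returns true when the first six characters case-insensitively spell "INSERT".
/// Slicing by character boundaries avoids panicking on non-ASCII queries.
fn starts_with_insert(query: &str) -> bool {
    match query.char_indices().nth(6).map(|(i, _)| i) {
        Some(end) => query[..end].eq_ignore_ascii_case("INSERT"),
        None => false,
    }
}

fn main() {
    assert!(starts_with_insert("INSERT INTO t VALUES (1)"));
    assert!(!starts_with_insert("🫣一点不正常的东西🫣"));
    assert!(!starts_with_insert("SELECT 1"));
}
```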
// First to check the query is like "select @@variables".
@@ -295,6 +298,15 @@ mod test {
use super::*;
#[test]
fn test_check_abnormal() {
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
let query = "🫣一点不正常的东西🫣";
let output = check(query, QueryContext::arc(), session.clone());
assert!(output.is_none());
}
#[test]
fn test_check() {
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));

View File

@@ -597,7 +597,8 @@ pub struct AddColumn {
impl AddColumn {
/// Returns an error if the column to add is invalid.
///
/// It allows adding existing columns.
/// It allows adding existing columns. However, the existing column must have the same metadata
/// and the location must be None.
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
ensure!(
self.column_metadata.column_schema.is_nullable()
@@ -615,6 +616,46 @@ impl AddColumn {
}
);
if let Some(existing_column) =
metadata.column_by_name(&self.column_metadata.column_schema.name)
{
// If the column already exists.
ensure!(
*existing_column == self.column_metadata,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column {} already exists with different metadata, existing: {:?}, got: {:?}",
self.column_metadata.column_schema.name, existing_column, self.column_metadata,
),
}
);
ensure!(
self.location.is_none(),
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column {} already exists, but location is specified",
self.column_metadata.column_schema.name
),
}
);
}
if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
// Ensures the existing column has the same name.
ensure!(
existing_column.column_schema.name == self.column_metadata.column_schema.name,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column id {} already exists with different name {}",
self.column_metadata.column_id, existing_column.column_schema.name
),
}
);
}
Ok(())
}
@@ -1008,6 +1049,8 @@ mod tests {
);
}
/// Returns a new region metadata for testing. Metadata:
/// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
fn new_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
@@ -1062,7 +1105,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 4,
column_id: 5,
},
location: None,
};
@@ -1078,7 +1121,7 @@ mod tests {
false,
),
semantic_type: SemanticType::Tag,
column_id: 4,
column_id: 5,
},
location: None,
}
@@ -1094,7 +1137,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 4,
column_id: 2,
},
location: None,
};
@@ -1114,7 +1157,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 4,
column_id: 5,
},
location: None,
},
@@ -1126,7 +1169,7 @@ mod tests {
true,
),
semantic_type: SemanticType::Field,
column_id: 5,
column_id: 6,
},
location: None,
},
@@ -1137,6 +1180,82 @@ mod tests {
assert!(kind.need_alter(&metadata));
}
#[test]
fn test_add_existing_column_different_metadata() {
let metadata = new_metadata();
// Add existing column with different id.
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 4,
},
location: None,
}],
};
kind.validate(&metadata).unwrap_err();
// Add existing column with different type.
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
},
location: None,
}],
};
kind.validate(&metadata).unwrap_err();
// Add existing column with different name.
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
},
location: None,
}],
};
kind.validate(&metadata).unwrap_err();
}
#[test]
fn test_add_existing_column_with_location() {
let metadata = new_metadata();
let kind = AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
},
location: Some(AddColumnLocation::First),
}],
};
kind.validate(&metadata).unwrap_err();
}
#[test]
fn test_validate_drop_column() {
let metadata = new_metadata();
@@ -1235,19 +1354,19 @@ mod tests {
true,
),
semantic_type: SemanticType::Tag,
column_id: 4,
column_id: 5,
},
location: None,
},
AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"field_1",
"field_2",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 5,
column_id: 6,
},
location: None,
},

View File

@@ -194,12 +194,9 @@ impl TableMeta {
&self,
table_name: &str,
alter_kind: &AlterKind,
add_if_not_exists: bool,
) -> Result<TableMetaBuilder> {
match alter_kind {
AlterKind::AddColumns { columns } => {
self.add_columns(table_name, columns, add_if_not_exists)
}
AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
AlterKind::ModifyColumnTypes { columns } => {
self.modify_column_types(table_name, columns)
@@ -340,6 +337,7 @@ impl TableMeta {
Ok(meta_builder)
}
// TODO(yingwen): Remove this.
/// Allocate a new column for the table.
///
/// This method would bump the `next_column_id` of the meta.
@@ -384,11 +382,11 @@ impl TableMeta {
builder
}
// TODO(yingwen): Test add if not exists.
fn add_columns(
&self,
table_name: &str,
requests: &[AddColumnRequest],
add_if_not_exists: bool,
) -> Result<TableMetaBuilder> {
let table_schema = &self.schema;
let mut meta_builder = self.new_meta_builder();
@@ -396,63 +394,61 @@ impl TableMeta {
self.primary_key_indices.iter().collect();
let mut names = HashSet::with_capacity(requests.len());
let mut new_requests = Vec::with_capacity(requests.len());
let requests = if add_if_not_exists {
for col_to_add in requests {
if let Some(column_schema) =
table_schema.column_schema_by_name(&col_to_add.column_schema.name)
{
// If the column already exists, we should check if the type is the same.
ensure!(
column_schema.data_type == col_to_add.column_schema.data_type,
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column {} already exists with different type",
col_to_add.column_schema.name
),
}
);
} else {
new_requests.push(col_to_add.clone());
}
}
&new_requests[..]
} else {
requests
};
let mut new_columns = Vec::with_capacity(requests.len());
for col_to_add in requests {
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);
if let Some(column_schema) =
table_schema.column_schema_by_name(&col_to_add.column_schema.name)
{
// If the column already exists.
ensure!(
col_to_add.add_if_not_exists,
error::ColumnExistsSnafu {
table_name,
column_name: &col_to_add.column_schema.name
},
);
ensure!(
!table_schema.contains_column(&col_to_add.column_schema.name),
error::ColumnExistsSnafu {
table_name,
column_name: col_to_add.column_schema.name.to_string()
},
);
// Checks if the type is the same
ensure!(
column_schema.data_type == col_to_add.column_schema.data_type,
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"column {} already exists with different type {:?}",
col_to_add.column_schema.name, column_schema.data_type,
),
}
);
} else {
// A new column.
// Ensures we only add a column once.
ensure!(
names.insert(&col_to_add.column_schema.name),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"add column {} more than once",
col_to_add.column_schema.name
),
}
);
ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
ensure!(
col_to_add.column_schema.is_nullable()
|| col_to_add.column_schema.default_constraint().is_some(),
error::InvalidAlterRequestSnafu {
table: table_name,
err: format!(
"no default value for column {}",
col_to_add.column_schema.name
),
},
);
new_columns.push(col_to_add.clone());
}
}
let requests = &new_columns[..];
let SplitResult {
columns_at_first,
@@ -881,6 +877,7 @@ pub struct RawTableMeta {
pub value_indices: Vec<usize>,
/// Engine type of this table. Usually in small case.
pub engine: String,
/// Next column id of a new column.
/// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982
pub next_column_id: ColumnId,
pub region_numbers: Vec<u32>,
@@ -1078,6 +1075,7 @@ mod tests {
use super::*;
/// Create a test schema with 3 columns: `[col1 int32, ts timestampmills, col2 int32]`.
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
@@ -1129,17 +1127,19 @@ mod tests {
column_schema: new_tag,
is_key: true,
location: None,
add_if_not_exists: false,
},
AddColumnRequest {
column_schema: new_field,
is_key: false,
location: None,
add_if_not_exists: false,
},
],
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap();
builder.build().unwrap()
}
@@ -1157,6 +1157,7 @@ mod tests {
column_schema: new_tag,
is_key: true,
location: Some(AddColumnLocation::First),
add_if_not_exists: false,
},
AddColumnRequest {
column_schema: new_field,
@@ -1164,12 +1165,13 @@ mod tests {
location: Some(AddColumnLocation::After {
column_name: "ts".to_string(),
}),
add_if_not_exists: false,
},
],
};
let builder = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap();
builder.build().unwrap()
}
@@ -1199,6 +1201,48 @@ mod tests {
assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
}
#[test]
fn test_add_columns_multiple_times() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
let alter_kind = AlterKind::AddColumns {
columns: vec![
AddColumnRequest {
column_schema: ColumnSchema::new(
"col3",
ConcreteDataType::int32_datatype(),
true,
),
is_key: true,
location: None,
add_if_not_exists: true,
},
AddColumnRequest {
column_schema: ColumnSchema::new(
"col3",
ConcreteDataType::int32_datatype(),
true,
),
is_key: true,
location: None,
add_if_not_exists: true,
},
],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_remove_columns() {
let schema = Arc::new(new_test_schema());
@@ -1216,7 +1260,7 @@ mod tests {
names: vec![String::from("col2"), String::from("my_field")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
@@ -1271,7 +1315,7 @@ mod tests {
names: vec![String::from("col3"), String::from("col1")],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
@@ -1307,14 +1351,62 @@ mod tests {
column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
is_key: false,
location: None,
add_if_not_exists: false,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnExists, err.status_code());
// Add if not exists
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
is_key: true,
location: None,
add_if_not_exists: true,
}],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
assert_eq!(
meta.schema.column_schemas(),
new_meta.schema.column_schemas()
);
assert_eq!(meta.schema.version() + 1, new_meta.schema.version());
}
#[test]
fn test_add_different_type_column() {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
// Add if not exists, but different type.
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new("col1", ConcreteDataType::string_datatype(), true),
is_key: false,
location: None,
add_if_not_exists: true,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
@@ -1328,6 +1420,7 @@ mod tests {
.build()
.unwrap();
// Not nullable and no default value.
let alter_kind = AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: ColumnSchema::new(
@@ -1337,11 +1430,12 @@ mod tests {
),
is_key: false,
location: None,
add_if_not_exists: false,
}],
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1363,7 +1457,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
@@ -1388,7 +1482,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
@@ -1411,7 +1505,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1422,7 +1516,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1448,7 +1542,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1462,7 +1556,7 @@ mod tests {
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
@@ -1531,7 +1625,7 @@ mod tests {
options: FulltextOptions::default(),
};
let err = meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.err()
.unwrap();
assert_eq!(
@@ -1552,7 +1646,7 @@ mod tests {
},
};
let new_meta = new_meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
@@ -1572,7 +1666,7 @@ mod tests {
column_name: "my_tag_first".to_string(),
};
let new_meta = new_meta
.builder_with_alter_kind("my_table", &alter_kind, false)
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();

View File

@@ -185,6 +185,8 @@ pub struct AddColumnRequest {
pub column_schema: ColumnSchema,
pub is_key: bool,
pub location: Option<AddColumnLocation>,
/// Add column if not exists.
pub add_if_not_exists: bool,
}
/// Change column datatype request

View File

@@ -45,6 +45,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
log-query = { workspace = true }
loki-api = "0.1"
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }

View File

@@ -47,13 +47,10 @@ To run the integration test, please copy `.env.example` to `.env` in the project
GT_KAFKA_ENDPOINTS = localhost:9092
```
### Setup kafka standalone
### Setup kafka standalone
```
cd tests-integration/fixtures/kafka
cd tests-integration/fixtures
docker compose -f docker-compose-standalone.yml up
docker compose -f docker-compose-standalone.yml up kafka
```

View File

@@ -0,0 +1,72 @@
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.6.0
container_name: kafka
ports:
- 9092:9092
- 9093:9093
environment:
# KRaft settings
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
# Listeners
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,SECURE://localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SECURE:SASL_PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181,SECURE://:9093
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_BROKER_ID: "1"
KAFKA_CLIENT_USERS: "user_kafka"
KAFKA_CLIENT_PASSWORDS: "secret"
depends_on:
zookeeper:
condition: service_started
etcd:
image: docker.io/bitnami/etcd:3.5
ports:
- "2379:2379"
- "2380:2380"
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_NAME: etcd
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ETCD_ADVERTISE_CLIENT_URLS: http://etcd:2379
ETCD_MAX_REQUEST_BYTES: 10485760
minio:
image: docker.io/bitnami/minio:2024
ports:
- '9000:9000'
- '9001:9001'
environment:
- MINIO_ROOT_USER=superpower_ci_user
- MINIO_ROOT_PASSWORD=superpower_password
- MINIO_DEFAULT_BUCKETS=greptime
- BITNAMI_DEBUG=true
volumes:
- 'minio_data:/bitnami/minio/data'
postgres:
image: docker.io/postgres:14-alpine
ports:
- 5432:5432
volumes:
- ~/apps/postgres:/var/lib/postgresql/data
environment:
- POSTGRES_USER=greptimedb
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin
volumes:
minio_data:
driver: local

View File

@@ -1,13 +0,0 @@
version: '3.8'
services:
etcd:
image: ghcr.io/zcube/bitnami-compat/etcd:3.5
ports:
- "2379:2379"
- "2380:2380"
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_NAME: etcd
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ETCD_ADVERTISE_CLIENT_URLS: http://etcd:2379
ETCD_MAX_REQUEST_BYTES: 10485760

View File

@@ -1,19 +0,0 @@
## Starts a standalone kafka
```bash
docker compose -f docker-compose-standalone.yml up kafka -d
```
## Lists running services
```bash
docker compose -f docker-compose-standalone.yml ps
```
## Stops the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml stop kafka
```
## Stops and removes the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml down kafka
```

View File

@@ -1,28 +0,0 @@
version: '3.8'
services:
zookeeper:
image: bitnami/zookeeper:3.7
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:3.6.0
container_name: kafka
ports:
- 9092:9092
- 9093:9093
environment:
# KRaft settings
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
# Listeners
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,SECURE://localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SECURE:SASL_PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181,SECURE://:9093
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_BROKER_ID: "1"
KAFKA_CLIENT_USERS: "user_kafka"
KAFKA_CLIENT_PASSWORDS: "secret"

View File

@@ -1,18 +0,0 @@
version: '3.8'
services:
minio:
image: bitnami/minio:2024
ports:
- '9000:9000'
- '9001:9001'
environment:
- MINIO_ROOT_USER=superpower_ci_user
- MINIO_ROOT_PASSWORD=superpower_password
- MINIO_DEFAULT_BUCKETS=greptime
- BITNAMI_DEBUG=true
volumes:
- 'minio_data:/bitnami/minio/data'
volumes:
minio_data:
driver: local

View File

@@ -1,12 +0,0 @@
version: '3.9'
services:
postgres:
image: postgres:14-alpine
ports:
- 5432:5432
volumes:
- ~/apps/postgres:/var/lib/postgresql/data
environment:
- POSTGRES_USER=greptimedb
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin

View File

@@ -66,12 +66,190 @@ mod test {
test_handle_ddl_request(instance.as_ref()).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_handle_multi_ddl_request() {
common_telemetry::init_default_ut_logging();
let instance =
tests::create_distributed_instance("test_distributed_handle_multi_ddl_request").await;
test_handle_multi_ddl_request(instance.frontend().as_ref()).await;
verify_table_is_dropped(&instance).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_handle_multi_ddl_request() {
let standalone =
GreptimeDbStandaloneBuilder::new("test_standalone_handle_multi_ddl_request")
.build()
.await;
let instance = &standalone.instance;
test_handle_multi_ddl_request(instance.as_ref()).await;
}
async fn query(instance: &Instance, request: Request) -> Output {
GrpcQueryHandler::do_query(instance, request, QueryContext::arc())
.await
.unwrap()
}
async fn test_handle_multi_ddl_request(instance: &Instance) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(1)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
data_type: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
},
ColumnDef {
name: "ts".to_string(),
data_type: ColumnDataType::TimestampMillisecond as _,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
],
time_index: "ts".to_string(),
engine: MITO_ENGINE.to_string(),
..Default::default()
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(AlterTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_table_expr::Kind::AddColumns(AddColumns {
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
AddColumn {
column_def: Some(ColumnDef {
name: "a".to_string(),
data_type: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
],
})),
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::AlterTable(AlterTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_table_expr::Kind::AddColumns(AddColumns {
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "c".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
AddColumn {
column_def: Some(ColumnDef {
name: "d".to_string(),
data_type: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
..Default::default()
}),
location: None,
add_if_not_exists: true,
},
],
})),
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, c, d, ts) VALUES ('s', 1, 1, 1, 1672816466000)".to_string()))
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(1)));
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc"
.to_string(),
)),
});
let output = query(instance, request).await;
let OutputData::Stream(stream) = output.data else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2023-01-04T07:14:26 | s | 1 |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(DropTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
..Default::default()
})),
});
let output = query(instance, request).await;
assert!(matches!(output.data, OutputData::AffectedRows(0)));
}
async fn test_handle_ddl_request(instance: &Instance) {
let request = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
@@ -131,6 +309,7 @@ mod test {
..Default::default()
}),
location: None,
add_if_not_exists: false,
}],
})),
})),


@@ -394,6 +394,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
None,
)
.with_logs_handler(instance.instance.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
@@ -429,6 +430,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
Some(instance.instance.clone()),
)
.with_log_ingest_handler(instance.instance.clone(), None, None)
.with_logs_handler(instance.instance.clone())
.with_otlp_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());
@@ -467,6 +469,7 @@ pub async fn setup_test_prom_app_with_frontend(
ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()),
Some(frontend_ref.clone()),
)
.with_logs_handler(instance.instance.clone())
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())


@@ -372,6 +372,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
add_columns: vec![AddColumn {
column_def: Some(add_column),
location: None,
add_if_not_exists: false,
}],
});
let expr = AlterTableExpr {


@@ -22,6 +22,7 @@ use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use log_query::{ColumnFilters, Context, Limit, LogQuery, TimeFilter};
use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_api::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
@@ -40,6 +41,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use table::table_name::TableName;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -97,6 +99,7 @@ macro_rules! http_tests {
test_otlp_traces,
test_otlp_logs,
test_loki_logs,
test_log_query,
);
)*
};
@@ -946,7 +949,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
content_cache_page_size = "8MiB"
content_cache_page_size = "64KiB"
[region_engine.mito.fulltext_index]
create_on_flush = "auto"
@@ -1882,6 +1885,68 @@ pub async fn test_loki_logs(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_log_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_log_query").await;
let client = TestClient::new(app);
// prepare data with SQL API
let res = client
.get("/v1/sql?sql=create table logs (`ts` timestamp time index, message string);")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let res = client
.post("/v1/sql?sql=insert into logs values ('2024-11-07 10:53:50', 'hello');")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
// test log query
let log_query = LogQuery {
table: TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "logs".to_string(),
},
time_filter: TimeFilter {
start: Some("2024-11-07".to_string()),
end: None,
span: None,
},
limit: Limit {
skip: None,
fetch: Some(1),
},
columns: vec![
ColumnFilters {
column_name: "ts".to_string(),
filters: vec![],
},
ColumnFilters {
column_name: "message".to_string(),
filters: vec![],
},
],
context: Context::None,
};
let res = client
.post("/v1/logs")
.header("Content-Type", "application/json")
.body(serde_json::to_string(&log_query).unwrap())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let resp = res.text().await;
let v = get_rows_from_output(&resp);
assert_eq!(v, "[[1730976830000,\"hello\"]]");
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())


@@ -32,7 +32,7 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
Affected Rows: 4
SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
+-------------+---------------------+
| message | time |
@@ -46,7 +46,7 @@ ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', c
Affected Rows: 0
SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
+-------------+---------------------+
| message | time |
@@ -63,15 +63,15 @@ INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
Affected Rows: 4
SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
+-------------+---------------------+
| message | time |
+-------------+---------------------+
| hello NiKo | 2020-01-03T00:00:00 |
| NiKo hello | 2020-01-03T00:00:01 |
| hello hello | 2020-01-04T00:00:00 |
| hello | 2020-01-01T00:00:00 |
| hello NiKo | 2020-01-03T00:00:00 |
| hello hello | 2020-01-04T00:00:00 |
| hello world | 2020-01-02T00:00:00 |
| world hello | 2020-01-02T00:00:01 |
+-------------+---------------------+


@@ -13,18 +13,18 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
('hello world', '2020-01-02 00:00:00'),
('world hello', '2020-01-02 00:00:01');
SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', case_sensitive = 'true');
SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
('NiKo hello', '2020-01-03 00:00:01'),
('hello hello', '2020-01-04 00:00:00'),
('NiKo, NiKo', '2020-01-04 00:00:01');
SELECT * FROM test WHERE MATCHES(message, 'hello');
SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
-- SQLNESS ARG restart=true
SHOW CREATE TABLE test;
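The change above appends ORDER BY message to every MATCHES query so that the sqlness snapshot stays deterministic across runs. The same pattern would apply to any additional full-text predicate in this test; a minimal sketch using another term that already appears in the inserted rows:

```sql
-- ORDER BY pins the row order; MATCHES alone gives no ordering guarantee,
-- so snapshot-style tests should always sort the result.
SELECT * FROM test WHERE MATCHES(message, 'NiKo') ORDER BY message;
```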


@@ -0,0 +1,107 @@
-- test if flow can get table schema correctly after the table has been altered
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
-- make sure both src & sink tables are cached in the flownode by using them
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE FLOW find_approx_rate;
+------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
| find_approx_rate | CREATE FLOW IF NOT EXISTS find_approx_rate |
| | SINK TO approx_rate |
| | AS SELECT (max(byte) - min(byte)) / 30.0 AS rate, date_bin(INTERVAL '30 second', ts) AS time_window FROM bytes_log GROUP BY time_window |
+------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
DROP FLOW find_approx_rate;
Affected Rows: 0
ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte;
Affected Rows: 0
ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate;
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
count(byte) as sample_cnt,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log
VALUES
(0, 200, '2023-01-01 00:00:01'),
(300,200, '2023-01-01 00:00:29');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
sample_cnt,
time_window
FROM
approx_rate;
+------+------------+---------------------+
| rate | sample_cnt | time_window |
+------+------------+---------------------+
| 10.0 | 2 | 2023-01-01T00:00:00 |
+------+------------+---------------------+
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0
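The expected row above follows directly from the flow's defining query: both inserted rows fall into the 30-second window starting at 2023-01-01T00:00:00, so rate = (300 - 0) / 30.0 = 10.0 and sample_cnt = 2. A minimal sketch that runs the same aggregation directly against bytes_log, assuming it is executed before the DROP statements:

```sql
-- The flow's defining query, run as a plain SELECT against the source table;
-- it should reproduce the single expected row: rate = 10.0, sample_cnt = 2.
SELECT
    (max(byte) - min(byte)) / 30.0 AS rate,
    count(byte) AS sample_cnt,
    date_bin(INTERVAL '30 second', ts) AS time_window
FROM bytes_log
GROUP BY time_window;
```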


@@ -0,0 +1,64 @@
-- test if flow can get table schema correctly after the table has been altered
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
-- make sure both src & sink tables are cached in the flownode by using them
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
SHOW CREATE FLOW find_approx_rate;
DROP FLOW find_approx_rate;
ALTER TABLE bytes_log ADD COLUMN stat INT DEFAULT 200 AFTER byte;
ALTER TABLE approx_rate ADD COLUMN sample_cnt INT64 DEFAULT 0 AFTER rate;
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
count(byte) as sample_cnt,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log
VALUES
(0, 200, '2023-01-01 00:00:01'),
(300,200, '2023-01-01 00:00:29');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
sample_cnt,
time_window
FROM
approx_rate;
DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;


@@ -0,0 +1,70 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log (byte)
VALUES
(NULL),
(300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
-- since ts defaults to now(), omit it when querying
SELECT
rate
FROM
approx_rate;
+------+
| rate |
+------+
| 0.0 |
+------+
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE bytes_log;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0
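The 0.0 above is a consequence of NULL handling in the aggregates: max and min skip NULL, so with byte values (NULL, 300) both evaluate to 300 and the rate collapses to (300 - 300) / 30.0 = 0.0. A minimal sketch of the same computation, assuming it is run before the tables are dropped:

```sql
-- max/min ignore the NULL byte, so both return 300 and the rate is 0.0,
-- matching the flow's output shown above.
SELECT (max(byte) - min(byte)) / 30.0 AS rate
FROM bytes_log;
```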


@@ -0,0 +1,41 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log (byte)
VALUES
(NULL),
(300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
-- since ts defaults to now(), omit it when querying
SELECT
rate
FROM
approx_rate;
DROP FLOW find_approx_rate;
DROP TABLE bytes_log;
DROP TABLE approx_rate;


@@ -0,0 +1,185 @@
-- test if reordered insert is correctly handled
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
-- TODO(discord9): remove this once auto-inferring the table's time index is implemented
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE approx_rate;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( |
| | "rate" DOUBLE NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2023-01-01 00:00:01', NULL),
('2023-01-01 00:00:29', 300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+------+---------------------+
| rate | time_window |
+------+---------------------+
| 0.0 | 2023-01-01T00:00:00 |
+------+---------------------+
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2022-01-01 00:00:01', NULL),
('2022-01-01 00:00:29', NULL);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+------+---------------------+
| rate | time_window |
+------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
+------+---------------------+
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:01', 101),
('2025-01-01 00:00:29', 300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+-------------------+---------------------+
| rate | time_window |
+-------------------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
| 6.633333333333334 | 2025-01-01T00:00:00 |
+-------------------+---------------------+
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:32', 450),
('2025-01-01 00:00:37', 500);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+--------------------+---------------------+
| rate | time_window |
+--------------------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
| 6.633333333333334 | 2025-01-01T00:00:00 |
| 1.6666666666666667 | 2025-01-01T00:00:30 |
+--------------------+---------------------+
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0
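As a sanity check on the two 2025 windows above: bytes 101 and 300 fall into the window starting at 00:00:00 and bytes 450 and 500 into the window starting at 00:00:30, so the flow's formula yields 199 / 30.0 (about 6.633) and 50 / 30.0 (about 1.667). The arithmetic can be reproduced as a standalone query:

```sql
-- The flow's rate formula applied by hand to the two 2025-01-01 windows.
SELECT (300 - 101) / 30.0 AS rate_window_0000, -- about 6.633
       (500 - 450) / 30.0 AS rate_window_0030; -- about 1.667
```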


@@ -0,0 +1,96 @@
-- test if reordered insert is correctly handled
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
-- TODO(discord9): remove this once auto-inferring the table's time index is implemented
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
SHOW CREATE TABLE approx_rate;
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2023-01-01 00:00:01', NULL),
('2023-01-01 00:00:29', 300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
-- reordered insert, also test if null is handled correctly
INSERT INTO
bytes_log (ts, byte)
VALUES
('2022-01-01 00:00:01', NULL),
('2022-01-01 00:00:29', NULL);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:01', 101),
('2025-01-01 00:00:29', 300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
-- reordered insert
INSERT INTO
bytes_log (ts, byte)
VALUES
('2025-01-01 00:00:32', 450),
('2025-01-01 00:00:37', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
time_window
FROM
approx_rate;
DROP TABLE bytes_log;
DROP FLOW find_approx_rate;
DROP TABLE approx_rate;


@@ -0,0 +1,56 @@
CREATE TABLE `api_requests` (
`timestamp` TIMESTAMP NOT NULL,
`request_id` STRING NOT NULL,
`upstream_id` STRING NOT NULL,
`application_id` STRING NULL,
`url` STRING NOT NULL,
`method` STRING NOT NULL,
`status_code` INTEGER NOT NULL,
`request_headers` JSON NULL,
`request_body` STRING NULL,
`response_headers` JSON NULL,
`response_body` STRING NULL,
`latency_ms` INTEGER NOT NULL,
`client_ip` STRING NULL,
`user_agent` STRING NULL,
TIME INDEX (`timestamp`)
)
WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE api_request_volume_upstream_stats (
`upstream_id` STRING NOT NULL,
`time_window` TIMESTAMP NOT NULL,
`request_count` BIGINT NOT NULL,
TIME INDEX (`time_window`)
);
Affected Rows: 0
CREATE FLOW api_request_volume_by_upstream
SINK TO api_request_volume_upstream_stats
AS
SELECT
upstream_id,
date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window,
COUNT(*) AS request_count
FROM api_requests
GROUP BY upstream_id, time_window;
Affected Rows: 0
DROP FLOW api_request_volume_by_upstream;
Affected Rows: 0
DROP TABLE api_request_volume_upstream_stats;
Affected Rows: 0
DROP TABLE api_requests;
Affected Rows: 0
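The flow buckets requests into 1-hour windows anchored at 2024-01-01 via date_bin. A minimal sketch of the same bucketing as a standalone query; the sample timestamp 2024-05-06 07:08:09 is purely illustrative:

```sql
-- Bin a sample timestamp into the flow's hourly windows anchored at 2024-01-01;
-- the result should be the window start 2024-05-06T07:00:00.
SELECT date_bin(
    INTERVAL '1 hour',
    '2024-05-06 07:08:09'::TimestampNanosecond,
    '2024-01-01 00:00:00'::TimestampNanosecond
) AS time_window;
```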


@@ -0,0 +1,41 @@
CREATE TABLE `api_requests` (
`timestamp` TIMESTAMP NOT NULL,
`request_id` STRING NOT NULL,
`upstream_id` STRING NOT NULL,
`application_id` STRING NULL,
`url` STRING NOT NULL,
`method` STRING NOT NULL,
`status_code` INTEGER NOT NULL,
`request_headers` JSON NULL,
`request_body` STRING NULL,
`response_headers` JSON NULL,
`response_body` STRING NULL,
`latency_ms` INTEGER NOT NULL,
`client_ip` STRING NULL,
`user_agent` STRING NULL,
TIME INDEX (`timestamp`)
)
WITH(
append_mode = 'true'
);
CREATE TABLE api_request_volume_upstream_stats (
`upstream_id` STRING NOT NULL,
`time_window` TIMESTAMP NOT NULL,
`request_count` BIGINT NOT NULL,
TIME INDEX (`time_window`)
);
CREATE FLOW api_request_volume_by_upstream
SINK TO api_request_volume_upstream_stats
AS
SELECT
upstream_id,
date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window,
COUNT(*) AS request_count
FROM api_requests
GROUP BY upstream_id, time_window;
DROP FLOW api_request_volume_by_upstream;
DROP TABLE api_request_volume_upstream_stats;
DROP TABLE api_requests;


@@ -365,33 +365,48 @@ SELECT number FROM out_num_cnt_show;
| 16 |
+--------+
-- should mismatch
-- should mismatch, hence the old flow remains
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
-- should mismatch
-- should mismatch, hence the old flow remains
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
Affected Rows: 3
+---------------------------------------------------------------+
| flow_definition |
+---------------------------------------------------------------+
| SELECT number AS n1 FROM numbers_input_show WHERE number > 10 |
+---------------------------------------------------------------+
INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);
Affected Rows: 4
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');
Error: 1003(Internal), Internal error: 1003
+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- sink table stays the same since the flow error out due to column mismatch
-- sink table shows new 11 since old flow remains
SELECT number FROM out_num_cnt_show;
+--------+
| number |
+--------+
| 11 |
| 15 |
| 15 |
| 16 |
| 18 |
+--------+
DROP FLOW filter_numbers_show;
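Both CREATE OR REPLACE attempts above fail because the inferred output schema carries an extra Int32 column where the sink table expects its timestamp column ts, so the previously created definition stays in effect. A replacement that keeps the single-column shape of the surviving definition should presumably be accepted; a hedged sketch in which only the filter threshold differs:

```sql
-- Same output shape as the existing flow definition, so the inferred schema
-- still matches the sink table out_num_cnt_show; only the predicate changes.
CREATE OR REPLACE FLOW filter_numbers_show
SINK TO out_num_cnt_show
AS SELECT number AS n1 FROM numbers_input_show WHERE number > 15;
```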

Some files were not shown because too many files have changed in this diff.