Compare commits

..

115 Commits

Author SHA1 Message Date
evenyag
7fe735009c chore: bump version to 0.11.2 2025-01-04 02:12:27 +08:00
chenmortal
f0298afaf0 fix: import tokio-metrics and tokio-metrics-collector (#5264) 2025-01-04 02:12:27 +08:00
Yingwen
5175dea6b3 chore: update greptime-proto to include add_if_not_exists (#5289) 2025-01-04 02:12:27 +08:00
discord9
7caa88abc7 refactor: flow replace check&better error msg (#5277)
* chore: better error msg

* chore eof newline

* refactor: move replace check to flow worker

* chore: add ctx to insert flow failure

* chore: Update src/flow/src/adapter/flownode_impl.rs

* test: add order by for deterministic

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-01-04 02:12:27 +08:00
Lei, HUANG
eafb01dfff chore: suppress list warning (#5280)
chore/suppress-list-warning:
 ### Update logging level in `intermediate.rs`

 - Changed logging level from `warn` to `debug` for unexpected directory entries in index creation.
 - Added `debug` to the `common_telemetry` import to support the logging level change.
2025-01-04 02:12:27 +08:00
yihong
b0de816d3d fix: better fmt check from 40s to 4s (#5279)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-01-04 02:12:27 +08:00
Yingwen
5c6161a95e feat: support add if not exists in the gRPC alter kind (#5273)
* test: test adding existing columns

* chore: add more checks to AlterKind

* chore: update logs

* fix: check and build table info first

* feat: Add add_if_not_exists flag to alter expr

* feat: skip existing columns when building alter kind

* checks in make_region_alter_kind()
* reuse the alter kind

* test: fix tests in common-meta

* chore: fix typos

* chore: update comments
2025-01-04 02:12:27 +08:00
discord9
5e3c5945c4 fix: flow handle reordered inserts (#5275)
* fix: reorder correct schema

* tests: reorder insert handled correctly

* chore: rm unused

* refactor: per review

* chore: more comment

* chore: per review
2025-01-04 02:12:27 +08:00
Yohan Wal
f6feac26f5 refactor: adjust index cache page size (#5267)
* refactor: adjust index cache page size

* fix: wrong docs

* Update config/datanode.example.toml

* Update config/config.md

* Update config/config.md

* chore: adjust to 64KiB

* Apply suggestions from code review
2025-01-04 02:12:27 +08:00
Ning Sun
4b2c59e626 ci: update nix setup (#5272) 2025-01-04 02:12:27 +08:00
discord9
cf605ecccc fix(flow): flow's table schema cache (#5251)
* fix: flow schema cache

* refactor: location for `to_meta_err`

* chore: endfile emptyline

* chore: review(partially)

* chore: per review

* refactor: per review

* refactor: per review
2025-01-04 02:12:27 +08:00
Ning Sun
ab3f9c42f1 fix: correct invalid testing feature gate usage (#5258)
* fix: correct invalid testing feature gate usage

* test: refactor tests to avoid test code leak

* fix: sync main
2025-01-04 02:12:27 +08:00
discord9
258fc6f31b chore: typo (#5265)
* fix: a typo

* chore: even more typos
2025-01-04 02:12:27 +08:00
jeremyhi
e2dccc1d1a feat: hints all in one (#5194)
* feat: hints all in one

* chore: If hints are provided in the x-greptime-hints header, ignore the rest of the headers
2025-01-04 02:12:27 +08:00
Ruihang Xia
78c5707642 feat(log-query): implement pagination with limit and offset parameters (#5241)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Ning Sun
204b5e474f ci: disable pyo3 build tasks (#5256)
* ci: disable pyo3 build tasks

* ci: skip installing python for windows

* ci: also removed python dependencies from docker base image
2025-01-04 02:12:27 +08:00
Ruihang Xia
e9f1fa0b7d feat: override __sequence on creating SST to save space and CPU (#5252)
* override memtable sequence

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* override sst sequence

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore changes per to CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use correct sequence number

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wrap a method to get max sequence

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Yingwen
a988ff5acf feat: update partition duration of memtable using compaction window (#5197)
* feat: update partition duration of memtable using compaction window

* chore: only use provided duration if it is not None

* test: more tests

* test: test compaction apply window

* style: fix clippy
2025-01-04 02:12:27 +08:00
zyy17
ef0fca9388 refactor: support to convert time string to timestamp in convert_value() (#5242)
refactor: support to convert time string to timestamp in convert_value()
2025-01-04 02:12:27 +08:00
Lin Yihai
b704e7f703 feat: add vec_div function (#5245) 2025-01-04 02:12:27 +08:00
Ning Sun
3a4c636e29 ci: make sure clippy passes before running tests (#5253)
* ci: make sure clippy passes before running tests

* ci: do not run ci on main branch
2025-01-04 02:12:27 +08:00
Zhenchi
a22e8b421c fix(bloom-filter): skip applying for non-indexed columns (#5246)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Yingwen
5b42546204 fix: implement a CacheStrategy to ensure compaction use cache correctly (#5254)
* feat: impl CacheStrategy

* refactor: replace Option<CacheManagerRef> with CacheStrategy

* feat: add disabled strategy

* ci: force update taplo

* refactor: rename CacheStrategy::Normal to CacheStrategy::EnableAll

* ci: force install cargo-gc-bin

* ci: force install

* chore: use CacheStrategy::Disabled as ScanInput default

* chore: fix compiler errors
2025-01-04 02:12:27 +08:00
Ruihang Xia
0678a31ab1 feat: add sqlness test for bloom filter index (#5240)
* feat: add sqlness test for bloom filter index

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* drop table after finished

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* redact more variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
shuiyisong
589cc84048 fix: disable path label in opendal for now (#5247)
* fix: remove path label in opendal for now

* fix: typo

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Kould
ed8c072a5e feat(vector): add vector functions vec_sub & vec_sum & vec_elem_sum (#5230)
* feat(vector): add sub function

* chore: added check for vector length misalignment

* feat(vector): add `vec_sum` & `vec_elem_sum`

* chore: codefmt

* update lock file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Yohan Wal
9d172f1cae feat: init PgElection with candidate registration (#5209)
* feat: init PgElection

fix: release advisory lock

fix: handle duplicate keys

chore: update comments

fix: unlock if acquired the lock

chore: add TODO and avoid unwrap

refactor: check both lock and expire time, add more comments

chore: fmt

fix: deal with multiple edge cases

feat: init PgElection with candidate registration

chore: fmt

chore: remove

* test: add unit test for pg candidate registration

* test: add unit test for pg candidate registration

* chore: update pg env

* chore: make ci happy

* fix: spawn a background connection thread

* chore: typo

* fix: shadow the election client for now

* fix: fix ci

* chore: readability

* chore: follow review comments

* refactor: use kvbackend for pg election

* chore: rename

* chore: make clippy happy

* refactor: use pg server time instead of local ones

* chore: typo

* chore: rename infancy to leader_infancy for clarification

* chore: clean up

* chore: follow review comments

* chore: follow review comments

* ci: unit test should test all features

* ci: fix

* ci: just test pg
2025-01-04 02:12:27 +08:00
Zhenchi
236888313d feat(mito): add bloom filter read metrics (#5239)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Zhenchi
0b97ef0e4f feat(config): add bloom filter config (#5237)
* feat(bloom-filter): integrate indexer with mito2

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(config) add bloom filter config

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix docs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix docs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* merge

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* remove cache config

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
localhost
316e6a83eb chore: add more info for pipeline dryrun API (#5232) 2025-01-04 02:12:27 +08:00
Ruihang Xia
6dc57b7a6c feat(bloom-filter): bloom filter applier (#5220)
* wip

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* draft search logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use defined BloomFilterReader

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* round the range end

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finish index applier

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* integrate applier into mito2 with cache layer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix cache key and add unit test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* provide bloom filter index size hint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert BloomFilterReaderImpl::read_vec

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ignore null on eq

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more tests and fix bloom filter logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
discord9
1f5c2b32e5 fix: flow compare null values (#5234)
* fix: flow compare null values

* fix: fix again ck ty before cmp

* chore: rm comment

* fix: handle null

* chore: typo

* docs: update comment

* refactor: per review

* tests: more sqlness

* tests: sqlness not show create table
2025-01-04 02:12:27 +08:00
Zhenchi
01e907be40 feat(bloom-filter): integrate indexer with mito2 (#5236)
* feat(bloom-filter): integrate indexer with mito2

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* rename skippingindextype

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Lin Yihai
e4dc5ea243 feat: Add vec_mul function. (#5205) 2025-01-04 02:12:27 +08:00
discord9
3ff5754b5a feat(flow): check sink table mismatch on flow creation (#5112)
* tests: more mismatch errors

* feat: check sink table schema if exists&prompt nice err msg

* chore: rm unused variant

* chore: fmt

* chore: cargo clippy

* feat: check schema on create

* feat: better err msg when mismatch

* tests: fix a schema mismatch

* todo: create sink table

* feat: create sink table

* fix: find time index

* tests: auto created sink table

* fix: remove empty keys

* refactor: per review

* chore: fmt

* test: sqlness

* chore: after rebase
2025-01-04 02:12:27 +08:00
Ruihang Xia
c22ca3ebd5 feat: add some critical metrics to flownode (#5235)
* feat: add some critical metrics to flownode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
zyy17
327d165ad9 feat: introduce the Limiter in frontend to limit the requests by in-flight write bytes size. (#5231)
feat: introduce Limiter to limit in-flight write bytes size in frontend
2025-01-04 02:12:27 +08:00
discord9
fe63a620ef ci: upload .pdb files too for better windows debug (#5224)
ci: upload .pdb files too
2025-01-04 02:12:27 +08:00
Zhenchi
be81f0db5a feat(bloom-filter): impl batch push to creator (#5225)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Ruihang Xia
6ca7a305ae fix: correct write cache's metric labels (#5227)
* refactor: remove unused field in WriteCache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: unify read and write cache path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update config and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unnecessary methods and adapt test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change the default path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove remote-home

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Weny Xu
1111a8bd57 chore: add log for converting region to follower (#5222)
* chore: add log for converting region to follower

* chore: apply suggestions from CR
2025-01-04 02:12:27 +08:00
zyy17
66b21b29b5 ci: support to pack multiple files in upload-artifacts action (#5228) 2025-01-04 02:12:27 +08:00
Lei, HUANG
31cfab81ad feat(mito): parquet memtable reader (#4967)
* wip: row group reader base

* wip: memtable row group reader

* Refactor MemtableRowGroupReader to streamline data fetching

 - Added early return when fetch_ranges is empty to optimize performance.
 - Replaced inline chunk data assignment with a call to `assign_dense_chunk` for cleaner code.

* wip: row group reader

* wip: reuse RowGroupReader

* wip: bulk part reader

* Enhance BulkPart Iteration with Filtering

 - Introduced `RangeBase` to `BulkIterContext` for improved filter handling.
 - Implemented filter application in `BulkPartIter` to prune batches based on predicates.
 - Updated `SimpleFilterContext::new_opt` to be public for broader access.

* chore: add prune test

* fix: clippy

* fix: introduce prune reader for memtable and add more prune test

* Enhance BulkPart read method to return Option<BoxedBatchIterator>

 - Modified `BulkPart::read` to return `Option<BoxedBatchIterator>` to handle cases where no row groups are selected.
 - Added logic to return `None` when all row groups are filtered out.
 - Updated tests to handle the new return type and added a test case to verify behavior when no row groups match the predicate

* refactor/separate-paraquet-reader: Add helper function to parse parquet metadata and integrate it into BulkPartEncoder

* refactor/separate-paraquet-reader:
 Change BulkPartEncoder row_group_size from Option to usize and update tests

* refactor/separate-paraquet-reader: Add context module for bulk memtable iteration and refactor part reading

 • Introduce context module to encapsulate context for bulk memtable iteration.
 • Refactor BulkPart to use BulkIterContextRef for reading operations.
 • Remove redundant code in BulkPart by centralizing context creation and row group pruning logic in the new context module.
 • Create new file context.rs with structures and logic for handling iteration context.
 • Adjust part_reader.rs and row_group_reader.rs to reference the new BulkIterContextRef.

* refactor/separate-paraquet-reader: Refactor RowGroupReader traits and implementations in memtable and parquet reader modules

 • Rename RowGroupReaderVirtual to RowGroupReaderContext for clarity.
 • Replace BulkPartVirt with direct usage of BulkIterContextRef in MemtableRowGroupReader.
 • Simplify MemtableRowGroupReaderBuilder by directly passing context instead of creating a BulkPartVirt instance.
 • Update RowGroupReaderBase to use context field instead of virt, reflecting the trait renaming and usage.
 • Modify FileRangeVirt to FileRangeContextRef and adjust implementations accordingly.

* refactor/separate-paraquet-reader: Refactor column page reader creation and remove unused code

 • Centralize creation of SerializedPageReader in RowGroupBase::column_reader method.
 • Remove unused RowGroupCachedReader and related code from MemtableRowGroupPageFetcher.
 • Eliminate redundant error handling for invalid column index in multiple places.

* chore: rebase main and resolve conflicts

* fix: some comments

* chore: resolve conflicts

* chore: resolve conflicts
2025-01-04 02:12:27 +08:00
Ruihang Xia
dd3a509607 chore: bump opendal to fork version to fix prometheus layer (#5223)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Weny Xu
d4cae6af1e refactor: remove unnecessary wrap (#5221)
* chore: remove unnecessary arc

* chore: remove unnecessary box
2025-01-04 02:12:27 +08:00
Ruihang Xia
3fec71b5c0 feat: logs query endpoint (#5202)
* define endpoint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update lock file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix toml format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert metric change

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/log_query/planner.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-01-04 02:12:27 +08:00
Zhenchi
9e31a6478b feat(index-cache): abstract IndexCache to be shared by multi types of indexes (#5219)
* feat(index-cache): abstract `IndexCache` to be shared by multi types of indexes

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typo

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove added label

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: simplify cached reader impl

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* rename func

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
zyy17
bce291a8e1 docs: add greptimedb-operator project link in 'Tools & Extensions' and other small improvements (#5216) 2025-01-04 02:12:27 +08:00
Ning Sun
c788eb67e2 ci: fix nightly ci task on nix build (#5198) 2025-01-04 02:12:27 +08:00
Yiran
0c32dcf46c fix: dead links (#5212) 2025-01-04 02:12:27 +08:00
Zhenchi
68a05b38bd feat(bloom-filter): add bloom filter reader (#5204)
* feat(bloom-filter): add bloom filter reader

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: remove unused dep

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix conflict

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Zhenchi
ee72ae8bd0 feat(bloom-filter): add memory control for creator (#5185)
* feat(bloom-filter): add memory control for creator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: remove meaningless buf

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: add codec for intermediate

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-04 02:12:27 +08:00
Weny Xu
556bd796d8 chore: adjust fuzz tests cfg (#5207) 2025-01-04 02:12:27 +08:00
Ruihang Xia
1327e8809f feat: bump opendal and switch prometheus layer to the upstream impl (#5179)
* feat: bump opendal and switch prometheus layer to the upstream impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused things

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove root dir on recovering cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out non-files entry in test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-04 02:12:27 +08:00
Yingwen
17d75c767c chore: update all arm64 builders (#5215) 2024-12-21 18:14:45 +08:00
Yingwen
a1ed450c0c chore: arm64 use 8xlarge runner (#5213) 2024-12-20 22:29:25 +08:00
evenyag
ea4ce9d1e3 chore: set version to 0.11.1 2024-12-20 14:12:19 +08:00
evenyag
1f7d9666b7 chore: Downgrade opendal for releasing 0.11.1
Revert "feat: bump opendal and switch prometheus layer to the upstream impl (#5179)"

This reverts commit 422d18da8b.
2024-12-20 14:12:19 +08:00
LFC
9f1a0d78b2 ci: install latest protobuf in dev-builder image (#5196) 2024-12-20 14:12:19 +08:00
discord9
ed8e418716 fix: auto created table ttl check (#5203)
* fix: auto created table ttl check

* tests: with hint
2024-12-20 14:12:19 +08:00
discord9
9e7121c1bb fix(flow): batch builder with type (#5195)
* fix: typed builder

* chore: clippy

* chore: rename

* fix: unit tests

* refactor: per review
2024-12-20 14:12:19 +08:00
dennis zhuang
94a49ed4f0 chore: update PR template (#5199) 2024-12-20 14:12:19 +08:00
discord9
f5e743379f feat: show flow's mem usage in INFORMATION_SCHEMA.FLOWS (#4890)
* feat: add flow mem size to sys table

* chore: rm dup def

* chore: remove unused variant

* chore: minor refactor

* refactor: per review
2024-12-20 14:12:19 +08:00
Ruihang Xia
6735e5867e feat: bump opendal and switch prometheus layer to the upstream impl (#5179)
* feat: bump opendal and switch prometheus layer to the upstream impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused files

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused things

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove root dir on recovering cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* filter out non-files entry in test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Weny Xu
925525726b fix: ensure table route metadata is eventually rolled back on failure (#5174)
* fix: ensure table route metadata is eventually rolled back on procedure failure

* fix(fuzz): enhance procedure condition checking

* chore: add logs

* feat: close downgraded leader region actively

* chore: apply suggestions from CR
2024-12-20 14:12:19 +08:00
Ning Sun
6427682a9a feat: show create postgresql foreign table (#5143)
* feat: add show create table for pg in parser

* feat: implement show create table operation

* fix: adopt upstream changes
2024-12-20 14:12:19 +08:00
Ning Sun
55b0022676 chore: make nix compilation environment config more robust (#5183)
* chore: improve nix-shell support

* fix: add pkg-config

* ci: add a github action to ensure build on clean system

* ci: optimise dependencies of task

* ci: move clean build to nightly
2024-12-20 14:12:19 +08:00
Ruihang Xia
2d84cc8d87 refactor: remove unused symbols (#5193)
chore: remove unused symbols

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Yingwen
c030705b17 docs: fix grafana dashboard row (#5192) 2024-12-20 14:12:19 +08:00
Ruihang Xia
443c600bd0 fix: validate matcher op for __name__ in promql (#5191)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
39cadfe10b fix(sqlness): enforce order in union tests (#5190)
Add ORDER BY clause to subquery union tests

 Updated the SQL and result files for subquery union tests to include an ORDER BY clause, ensuring consistent result ordering. This change aligns with the test case from the DuckDB repository.
2024-12-20 14:12:19 +08:00
jeremyhi
9b5e4e80f7 feat: extract hints from http header (#5128)
* feat: extract hints from http header

* Update src/servers/src/http/hints.rs

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: by comment

* refactor: get instead of loop

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yingwen
041a276b66 feat: do not remove time filters in ScanRegion (#5180)
* feat: do not remove time filters

* chore: remove `time_range` from parquet reader

* chore: print more message in the check script

* chore: fix unused error
2024-12-20 14:12:19 +08:00
Yingwen
614a25ddc5 feat: do not keep MemtableRefs in ScanInput (#5184) 2024-12-20 14:12:19 +08:00
dennis zhuang
4337e20010 feat: impl label_join and label_replace for promql (#5153)
* feat: impl label_join and label_replace for promql

* chore: style

* fix: dst_label is equal to src_label

* fix: forgot to sort the results

* fix: processing empty source label
2024-12-20 14:12:19 +08:00
Lanqing Yang
65c52cc698 fix: display inverted and fulltext index in show index (#5169) 2024-12-20 14:12:19 +08:00
Yohan Wal
50f31fd681 feat: introduce Buffer for non-continuous bytes (#5164)
* feat: introduce Buffer for non-continuous bytes

* Update src/mito2/src/cache/index.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: apply review comments

* refactor: use opendal::Buffer

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-12-20 14:12:19 +08:00
LFC
b5af5aaf8d refactor: produce BatchBuilder from a Batch to modify it again (#5186)
chore: pub some mods
2024-12-20 14:12:19 +08:00
Lei, HUANG
27693c7f1e perf: avoid holding memtable during compaction (#5157)
* perf/avoid-holding-memtable-during-compaction: Refactor Compaction Version Handling

 • Introduced CompactionVersion struct to encapsulate region version details for compaction, removing dependency on VersionRef.
 • Updated CompactionRequest and CompactionRegion to use CompactionVersion.
 • Modified open_compaction_region to construct CompactionVersion without memtables.
 • Adjusted WindowedCompactionPicker to work with CompactionVersion.
 • Enhanced flush logic in WriteBufferManager to improve memory usage checks and logging.

* reformat code

* chore: change log level

* reformat code

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-12-20 14:12:19 +08:00
discord9
a59fef9ffb test: sqlness upgrade compatibility tests (#5126)
* feat: simple version switch

* chore: remove debug print

* chore: add common folder

* tests: add drop table

* feat: pull versioned binary

* chore: don't use native-tls

* chore: rm outdated docs

* chore: new line

* fix: save old bin dir

* fix: switch version restart all node

* feat: use etcd

* fix: wait for election

* fix: normal sqlness

* refactor: hashmap for bin dir

* test: past 3 major version compat crate table

* refactor: allow using without setup etcd
2024-12-20 14:12:19 +08:00
Zhenchi
bcecd8ce52 feat(bloom-filter): add basic bloom filter creator (Part 1) (#5177)
* feat(bloom-filter): add a simple bloom filter creator (Part 1)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: header

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* docs: add format comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-20 14:12:19 +08:00
Yingwen
ffdcb8c1ac fix: deletion between two put may not work in last_non_null mode (#5168)
* fix: deletion between rows with the same key may not work

* test: add sqlness test case

* chore: comments
2024-12-20 14:12:19 +08:00
Yingwen
554121ad79 chore: add aquamarine to dep lists (#5181) 2024-12-20 14:12:19 +08:00
Weny Xu
43c12b4f2c fix: correct set_region_role_state_gracefully behaviors (#5171)
* fix: reduce default max rows for fuzz testing

* chore: remove Postgres setup from fuzz test workflow

* chore(fuzz): increase resource limits for GreptimeDB cluster

* chore(fuzz): increase resource limits for kafka

* fix: correct `set_region_role_state_gracefully` behaviors

* chore: remove Postgres setup from fuzz test workflow

* chore(fuzz): reduce resource limits for GreptimeDB & kafka
2024-12-20 14:12:19 +08:00
discord9
7aa8c28fe4 test: flow rebuild (#5162)
* tests: rebuild flow

* tests: more rebuild

* tests: restart

* chore: drop clean
2024-12-20 14:12:19 +08:00
Ning Sun
34fbe7739e chore: add nix-shell configure for a minimal environment for development (#5175)
* chore: add nix-shell development environment

* chore: add rust-analyzer

* chore: use .envrc as a private file
2024-12-20 14:12:19 +08:00
ZonaHe
06d7bd99dd feat: update dashboard to v0.7.3 (#5172)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Ruihang Xia
b71d842615 feat: introduce SKIPPING index (part 1) (#5155)
* skip index parser

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl show create part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add empty line

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change keyword to SKIPPING INDEX

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename local variables

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
7f71693b8e chore: gauge for flush compaction (#5156)
* add metrics

* chore/bench-metrics: Add INFLIGHT_FLUSH_COUNT Metric to Flush Process

 • Introduced INFLIGHT_FLUSH_COUNT metric to track the number of ongoing flush operations.
 • Incremented INFLIGHT_FLUSH_COUNT in FlushScheduler to monitor active flushes.
 • Removed redundant increment of INFLIGHT_FLUSH_COUNT in RegionWorkerLoop to prevent double counting.

* chore/bench-metrics: Add Metrics for Compaction and Flush Operations

 • Introduced INFLIGHT_COMPACTION_COUNT and INFLIGHT_FLUSH_COUNT metrics to track the number of ongoing compaction and flush operations.
 • Incremented INFLIGHT_COMPACTION_COUNT when scheduling remote and local compaction jobs, and decremented it upon completion.
 • Added INFLIGHT_FLUSH_COUNT increment and decrement logic around flush tasks to monitor active flush operations.
 • Removed redundant metric updates in worker.rs and handle_compaction.rs to streamline metric handling.

* chore: add metrics for remote compaction jobs

* chore: format

* chore: also add dashboard
2024-12-20 14:12:19 +08:00
Lin Yihai
615ea1a171 feat: Add vector_scalar_mul function. (#5166) 2024-12-20 14:12:19 +08:00
shuiyisong
4e725d259d chore: remove unused dep (#5163)
* chore: remove unused dep

* chore: remove more unused dep
2024-12-20 14:12:19 +08:00
Niwaka
dc2252eb6d fix: support alter table ~ add ~ custom_type (#5165) 2024-12-20 14:12:19 +08:00
Yingwen
6d4cc2e070 ci: use 4xlarge for nightly build (#5158) 2024-12-20 14:12:19 +08:00
localhost
6066ce2c4a fix: loki write row len error (#5161) 2024-12-20 14:12:19 +08:00
Yingwen
b90d8f7dbd docs: Add index panels to standalone grafana dashboard (#5140)
* docs: Add index panels to standalone grafana dashboard

* docs: fix flush/compaction op
2024-12-20 14:12:19 +08:00
Yohan Wal
fdccf4ff84 refactor: cache inverted index with fixed-size page (#5114)
* feat: cache inverted index by page instead of file

* fix: add unit test and fix bugs

* chore: typo

* chore: ci

* fix: math

* chore: apply review comments

* chore: renames

* test: add unit test for index key calculation

* refactor: use ReadableSize

* feat: add config for inverted index page size

* chore: update config file

* refactor: handle multiple range read and fix some related bugs

* fix: add config

* test: turn to a fs reader to match behaviors of object store
2024-12-20 14:12:19 +08:00
localhost
8b1484c064 chore: pipeline dryrun api can currently receive pipeline raw content (#5142)
* chore: pipeline dryrun api can currently receive pipeline raw content

* chore: remove dryrun v1 and add test

* chore: change dryrun pipeline api body schema

* chore: remove useless struct PipelineInfo

* chore: update PipelineDryrunParams doc

* chore: increase code readability

* chore: add some comment for pipeline dryrun test

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: format code

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yingwen
576e20ac78 feat: collect reader metrics from prune reader (#5152) 2024-12-20 14:12:19 +08:00
localhost
10b3e3da0f chore: decide tag column in log api follow table schema if table exists (#5138)
* chore: decide tag column in log api follow table schema if table exists

* chore: add more test for greptime_identity pipeline

* chore: change pipeline get_table function signature

* chore: change identity_pipeline_inner tag_column_names type
2024-12-20 14:12:19 +08:00
Weny Xu
4a3ef2d718 feat(index): add file_size_hint for remote blob reader (#5147)
feat(index): add file_size_hint for remote blob reader
2024-12-20 14:12:19 +08:00
Yohan Wal
65eabb2a05 feat(fuzz): add alter table options for alter fuzzer (#5074)
* feat(fuzz): add set table options to alter fuzzer

* chore: clippy is happy, I'm sad

* chore: happy ci happy

* fix: unit test

* feat(fuzz): add unset table options to alter fuzzer

* fix: unit test

* feat(fuzz): add table option validator

* fix: make clippy happy

* chore: add comments

* chore: apply review comments

* fix: unit test

* feat(fuzz): add more ttl options

* fix: #5108

* chore: add comments

* chore: add comments
2024-12-20 14:12:19 +08:00
Weny Xu
bc5a57f51f feat: introduce PuffinMetadataCache (#5148)
* feat: introduce `PuffinMetadataCache`

* refactor: remove too_many_arguments

* chore: fmt toml
2024-12-20 14:12:19 +08:00
Weny Xu
f24b9d8814 feat: add prefetch support to InvertedIndexFooterReader for reduced I/O time (#5146)
* feat: add prefetch support to `InvertedIndeFooterReader`

* chore: correct struct name

* chore: apply suggestions from CR
2024-12-20 14:12:19 +08:00
Weny Xu
dd4d0a88ce feat: add prefetch support to PuffinFileFooterReader for reduced I/O time (#5145)
* feat: introduce `PuffinFileFooterReader`

* refactor: remove `SyncReader` trait and impl

* refactor: replace `FooterParser` with `PuffinFileFooterReader`

* chore: remove unused errors
2024-12-20 14:12:19 +08:00
Niwaka
3d2096fe9d feat: support push down IN filter (#5129)
* feat: support push down IN filter

* chore: move tests to prune.sql
2024-12-20 14:12:19 +08:00
Ruihang Xia
35715bb710 feat: implement v1/sql/parse endpoint to parse GreptimeDB's SQL dialect (#5144)
* derive ser/de

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deserialize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-20 14:12:19 +08:00
ZonaHe
08a3befa67 feat: update dashboard to v0.7.2 (#5141)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-20 14:12:19 +08:00
Yohan Wal
ca1758d4e7 test: part of parser test migrated from duckdb (#5125)
* test: update test

* fix: fix test
2024-12-20 14:12:19 +08:00
Zhenchi
42bf818167 feat(vector): add scalar add function (#5119)
* refactor: extract implicit conversion helper functions of vector

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(vector): add scalar add function

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-20 14:12:19 +08:00
Lei, HUANG
2c9b117224 perf: avoid cache during compaction (#5135)
* Revert "refactor: Avoid wrapping Option for CacheManagerRef (#4996)"

This reverts commit 42bf7e9965.

* fix: memory usage during log ingestion

* fix: fmt
2024-12-20 14:12:19 +08:00
dennis zhuang
3edf2317e1 feat: adjust WAL purge default configurations (#5107)
* feat: adjust WAL purge default configurations

* fix: config

* feat: change raft engine file_size default to 128Mib
2024-12-20 14:12:19 +08:00
jeremyhi
85d72a3cd0 chore: set store_key_prefix for all kvbackend (#5132) 2024-12-20 14:12:19 +08:00
discord9
928172bd82 chore: fix aws_lc not in depend tree check in CI (#5121)
* chore: fix aws_lc check in CI

* chore: update lock file
2024-12-20 14:12:19 +08:00
shuiyisong
e9f5bddeff chore: add /ready api for health checking (#5124)
* chore: add ready endpoint for health checking

* chore: add test
2024-12-20 14:12:19 +08:00
Yingwen
486755d795 chore: bump main branch version to 0.12 (#5133)
chore: bump version to v0.12.0
2024-12-20 14:12:19 +08:00
25 changed files with 234 additions and 1049 deletions

View File

@@ -29,7 +29,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -27,7 +27,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.12.0
NEXT_RELEASE_VERSION: v0.11.0
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:

181
Cargo.lock generated
View File

@@ -188,7 +188,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"common-base",
"common-decimal",
@@ -773,7 +773,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -1314,7 +1314,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arrow",
@@ -1684,7 +1684,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"auth",
@@ -1727,7 +1727,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"tempfile",
"tokio",
@@ -1736,7 +1736,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arc-swap",
@@ -1763,7 +1763,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.0",
"substrait 0.11.2",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1804,7 +1804,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"auth",
@@ -1864,7 +1864,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"temp-env",
"tempfile",
@@ -1916,7 +1916,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"anymap2",
"async-trait",
@@ -1938,11 +1938,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.0"
version = "0.11.2"
[[package]]
name = "common-config"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"common-base",
"common-error",
@@ -1965,7 +1965,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -2001,7 +2001,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2014,7 +2014,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"http 0.2.12",
"snafu 0.8.5",
@@ -2024,7 +2024,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"common-error",
@@ -2034,7 +2034,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"approx 0.5.1",
@@ -2078,7 +2078,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2095,7 +2095,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arrow-flight",
@@ -2121,7 +2121,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"common-base",
@@ -2140,7 +2140,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"arc-swap",
"common-query",
@@ -2154,7 +2154,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"common-error",
"common-macro",
@@ -2167,7 +2167,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"anymap2",
"api",
@@ -2192,8 +2192,6 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datatypes",
"deadpool",
"deadpool-postgres",
"derive_builder 0.12.0",
"etcd-client",
"futures",
@@ -2226,7 +2224,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2235,11 +2233,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.0"
version = "0.11.2"
[[package]]
name = "common-pprof"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"common-error",
"common-macro",
@@ -2251,7 +2249,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-stream",
"async-trait",
@@ -2278,7 +2276,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2286,7 +2284,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -2312,7 +2310,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"arc-swap",
"common-error",
@@ -2331,7 +2329,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2361,7 +2359,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"atty",
"backtrace",
@@ -2389,7 +2387,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"client",
"common-query",
@@ -2401,7 +2399,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"arrow",
"chrono",
@@ -2419,7 +2417,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"build-data",
"const_format",
@@ -2429,7 +2427,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"common-base",
"common-error",
@@ -3228,7 +3226,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arrow-flight",
@@ -3279,7 +3277,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"tokio",
"toml 0.8.19",
@@ -3288,7 +3286,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"arrow",
"arrow-array",
@@ -3315,39 +3313,6 @@ dependencies = [
"sqlparser_derive 0.1.1",
]
[[package]]
name = "deadpool"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa"
dependencies = [
"deadpool",
"tokio",
"tokio-postgres",
"tracing",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
dependencies = [
"tokio",
]
[[package]]
name = "debugid"
version = "0.8.0"
@@ -3945,7 +3910,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -4061,7 +4026,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arrow",
@@ -4120,7 +4085,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"tokio",
"tonic 0.11.0",
@@ -4158,7 +4123,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arc-swap",
@@ -5308,7 +5273,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6158,7 +6123,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"chrono",
"common-error",
@@ -6170,7 +6135,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-stream",
"async-trait",
@@ -6514,7 +6479,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -6541,7 +6506,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -6566,8 +6531,6 @@ dependencies = [
"common-wal",
"dashmap",
"datatypes",
"deadpool",
"deadpool-postgres",
"derive_builder 0.12.0",
"etcd-client",
"futures",
@@ -6622,7 +6585,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"aquamarine",
@@ -6716,7 +6679,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"aquamarine",
@@ -7453,7 +7416,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"anyhow",
"bytes",
@@ -7706,7 +7669,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7754,7 +7717,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"tokio",
"tokio-util",
@@ -8004,7 +7967,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -8290,7 +8253,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8452,7 +8415,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8740,7 +8703,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8975,7 +8938,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9100,7 +9063,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9165,7 +9128,7 @@ dependencies = [
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"statrs",
"store-api",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"tokio",
"tokio-stream",
@@ -10649,7 +10612,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arc-swap",
@@ -10941,7 +10904,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11053,7 +11016,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arc-swap",
@@ -11407,7 +11370,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"chrono",
@@ -11471,7 +11434,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11689,7 +11652,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"aquamarine",
@@ -11851,7 +11814,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"async-trait",
"bytes",
@@ -12050,7 +12013,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"async-trait",
@@ -12327,7 +12290,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -12370,7 +12333,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.0"
version = "0.11.2"
dependencies = [
"api",
"arrow-flight",
@@ -12434,7 +12397,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.0",
"substrait 0.11.2",
"table",
"tempfile",
"time",

View File

@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.0"
version = "0.11.2"
edition = "2021"
license = "Apache-2.0"
@@ -118,8 +118,6 @@ datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
deadpool = "0.10"
deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.13"

View File

@@ -94,7 +94,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -132,10 +132,10 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
@@ -466,10 +466,10 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_write_cache` | Bool | `false` | Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |

View File

@@ -475,18 +475,18 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}`.
write_cache_path = ""
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
write_cache_size = "5GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
write_cache_ttl = "8h"
experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -337,7 +337,7 @@ data_home = "/tmp/greptimedb/"
type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
@@ -518,18 +518,18 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB"
## Whether to enable the write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_write_cache = false
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}`.
write_cache_path = ""
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
write_cache_size = "5GiB"
experimental_write_cache_size = "5GiB"
## TTL for write cache.
## @toml2docs:none-default
write_cache_ttl = "8h"
experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -62,11 +62,6 @@ impl Instance {
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
/// allow customizing datanode for downstream projects
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
}
#[async_trait]

View File

@@ -66,11 +66,6 @@ impl Instance {
pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}
/// allow customizing flownode for downstream projects
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
}
#[async_trait::async_trait]

View File

@@ -69,7 +69,7 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),
@@ -203,7 +203,7 @@ fn test_load_standalone_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),

View File

@@ -35,8 +35,6 @@ common-wal.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true

View File

@@ -667,18 +667,10 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to create connection pool for Postgres"))]
CreatePostgresPool {
#[snafu(display("Failed to connect to Postgres"))]
ConnectPostgres {
#[snafu(source)]
error: deadpool_postgres::CreatePoolError,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
GetPostgresConnection {
reason: String,
error: tokio_postgres::Error,
#[snafu(implicit)]
location: Location,
},
@@ -794,9 +786,9 @@ impl ErrorExt for Error {
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "pg_kvbackend")]
PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } => {
StatusCode::Internal
}
PostgresExecution { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
ConnectPostgres { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}

View File

@@ -16,17 +16,15 @@ use std::any::Any;
use std::borrow::Cow;
use std::sync::Arc;
use deadpool_postgres::{Config, Pool, Runtime};
use common_telemetry::error;
use snafu::ResultExt;
use tokio_postgres::types::ToSql;
use tokio_postgres::NoTls;
use tokio_postgres::{Client, NoTls};
use crate::error::{
CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, Result,
StrFromUtf8Snafu,
};
use super::{KvBackend, TxnService};
use crate::error::{ConnectPostgresSnafu, Error, PostgresExecutionSnafu, Result, StrFromUtf8Snafu};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -36,7 +34,8 @@ use crate::rpc::KeyValue;
/// Posgres backend store for metasrv
pub struct PgStore {
pool: Pool,
// TODO: Consider using sqlx crate.
client: Client,
}
const EMPTY: &[u8] = &[0];
@@ -95,49 +94,33 @@ SELECT k, v FROM prev;"#;
impl PgStore {
/// Create pgstore impl of KvBackendRef from url.
pub async fn with_url(url: &str) -> Result<KvBackendRef> {
let mut cfg = Config::new();
cfg.url = Some(url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)?;
Self::with_pg_pool(pool).await
// TODO: support tls.
let (client, conn) = tokio_postgres::connect(url, NoTls)
.await
.context(ConnectPostgresSnafu)?;
tokio::spawn(async move {
if let Err(e) = conn.await {
error!(e; "connection error");
}
});
Self::with_pg_client(client).await
}
/// Create pgstore impl of KvBackendRef from tokio-postgres client.
pub async fn with_pg_pool(pool: Pool) -> Result<KvBackendRef> {
pub async fn with_pg_client(client: Client) -> Result<KvBackendRef> {
// This step ensures the postgres metadata backend is ready to use.
// We check if greptime_metakv table exists, and we will create a new table
// if it does not exist.
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
return GetPostgresConnectionSnafu {
reason: e.to_string(),
}
.fail();
}
};
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)?;
Ok(Arc::new(Self { pool }))
}
async fn get_client(&self) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>> {
match self.pool.get().await {
Ok(client) => Ok(client),
Err(e) => GetPostgresConnectionSnafu {
reason: e.to_string(),
}
.fail(),
}
Ok(Arc::new(Self { client }))
}
async fn put_if_not_exists(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.get_client()
.await?
.client
.query(PUT_IF_NOT_EXISTS, &[&key, &value])
.await
.context(PostgresExecutionSnafu)?;
@@ -276,8 +259,7 @@ impl KvBackend for PgStore {
})
.collect();
let res = self
.get_client()
.await?
.client
.query(&template, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -345,10 +327,8 @@ impl KvBackend for PgStore {
in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
let query = generate_batch_upsert_query(req.kvs.len());
let res = self
.get_client()
.await?
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -385,10 +365,8 @@ impl KvBackend for PgStore {
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.get_client()
.await?
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -431,8 +409,7 @@ impl KvBackend for PgStore {
.collect();
let res = self
.get_client()
.await?
.client
.query(template, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -476,10 +453,8 @@ impl KvBackend for PgStore {
.iter()
.map(|x| x as &(dyn ToSql + Sync))
.collect();
let res = self
.get_client()
.await?
.client
.query(&query, &params)
.await
.context(PostgresExecutionSnafu)?;
@@ -513,8 +488,7 @@ impl KvBackend for PgStore {
let expect = process_bytes(&req.expect, "CASExpect")?;
let res = self
.get_client()
.await?
.client
.query(CAS, &[&key, &value, &expect])
.await
.context(PostgresExecutionSnafu)?;
@@ -586,19 +560,10 @@ mod tests {
return None;
}
let mut cfg = Config::new();
cfg.url = Some(endpoints);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.unwrap();
let client = pool.get().await.unwrap();
client
.execute(METADKV_CREATION, &[])
.await
.context(PostgresExecutionSnafu)
.unwrap();
Some(PgStore { pool })
let (client, connection) = tokio_postgres::connect(&endpoints, NoTls).await.unwrap();
tokio::spawn(connection);
let _ = client.execute(METADKV_CREATION, &[]).await;
Some(PgStore { client })
}
#[tokio::test]

View File

@@ -433,8 +433,8 @@ impl DatanodeBuilder {
) -> Result<MitoEngine> {
if opts.storage.is_object_storage() {
// Enable the write cache when setting object storage
config.enable_write_cache = true;
info!("Configured 'enable_write_cache=true' for mito engine.");
config.enable_experimental_write_cache = true;
info!("Configured 'enable_experimental_write_cache=true' for mito engine.");
}
let mito_engine = match &opts.wal {

View File

@@ -34,8 +34,6 @@ common-version.workspace = true
common-wal.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool.workspace = true
deadpool-postgres.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true

View File

@@ -29,8 +29,6 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
#[cfg(feature = "pg_kvbackend")]
use common_telemetry::error;
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use etcd_client::Client;
use futures::future;
use servers::configurator::ConfiguratorRef;
@@ -50,9 +48,8 @@ use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::error::InvalidArgumentsSnafu;
use crate::error::{InitExportMetricsTaskSnafu, TomlFormatSnafu};
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
@@ -85,14 +82,14 @@ impl MetasrvInstance {
let httpsrv = Arc::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
.build(),
);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::InitExportMetricsTaskSnafu)?;
.context(InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
metasrv,
httpsrv,
@@ -107,7 +104,7 @@ impl MetasrvInstance {
self.metasrv.try_start().await?;
if let Some(t) = self.export_metrics_task.as_ref() {
t.start(None).context(error::InitExportMetricsTaskSnafu)?
t.start(None).context(InitExportMetricsTaskSnafu)?
}
let (tx, rx) = mpsc::channel::<()>(1);
@@ -228,20 +225,11 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?;
let kv_backend = PgStore::with_pg_pool(pool)
let pg_client = create_postgres_client(opts).await?;
let kv_backend = PgStore::with_pg_client(pg_client)
.await
.context(error::KvBackendSnafu)?;
// Client for election should be created separately since we need a different session keep-alive idle time.
let election_client = create_postgres_client(opts).await?;
let election = PgElection::with_pg_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
)
.await?;
(kv_backend, Some(election))
(kv_backend, None)
}
};
@@ -287,12 +275,9 @@ async fn create_etcd_client(opts: &MetasrvOptions) -> Result<Client> {
#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres::Client> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let postgres_url = opts.store_addrs.first().context(InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let (client, connection) = tokio_postgres::connect(postgres_url, NoTls)
.await
.context(error::ConnectPostgresSnafu)?;
@@ -304,19 +289,3 @@ async fn create_postgres_client(opts: &MetasrvOptions) -> Result<tokio_postgres:
});
Ok(client)
}
#[cfg(feature = "pg_kvbackend")]
async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres::Pool> {
let postgres_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
let mut cfg = Config::new();
cfg.url = Some(postgres_url.to_string());
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(error::CreatePostgresPoolSnafu)?;
Ok(pool)
}

View File

@@ -19,9 +19,7 @@ pub mod postgres;
use std::fmt::{self, Debug};
use std::sync::Arc;
use common_telemetry::{info, warn};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::{self, Receiver, Sender};
use tokio::sync::broadcast::Receiver;
use crate::error::Result;
use crate::metasrv::MetasrvNodeInfo;
@@ -77,37 +75,6 @@ impl fmt::Display for LeaderChangeMessage {
}
}
fn listen_leader_change(leader_value: String) -> Sender<LeaderChangeMessage> {
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_value}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_value}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
tx
}
#[async_trait::async_trait]
pub trait Election: Send + Sync {
type Leader;

View File

@@ -23,12 +23,13 @@ use etcd_client::{
};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT,
CANDIDATE_LEASE_SECS, ELECTION_KEY, KEEP_ALIVE_INTERVAL_SECS,
Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, CANDIDATE_LEASE_SECS, ELECTION_KEY,
KEEP_ALIVE_INTERVAL_SECS,
};
use crate::error;
use crate::error::Result;
@@ -87,7 +88,36 @@ impl EtcdElection {
E: AsRef<str>,
{
let leader_value: String = leader_value.as_ref().into();
let tx = listen_leader_change(leader_value.clone());
let leader_ident = leader_value.clone();
let (tx, mut rx) = broadcast::channel(100);
let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
String::from_utf8_lossy(key.name()),
key.lease_id()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
Ok(Arc::new(Self {
leader_value,
client,

View File

@@ -16,32 +16,18 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_postgres::Client;
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error::{
DeserializeFromJsonSnafu, NoLeaderSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
DeserializeFromJsonSnafu, PostgresExecutionSnafu, Result, SerializeToJsonSnafu, UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// TODO(CookiePie): The lock id should be configurable.
const CAMPAIGN: &str = "SELECT pg_try_advisory_lock(28319)";
const STEP_DOWN: &str = "SELECT pg_advisory_unlock(28319)";
const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = $1";
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
// Either the leader reconnects and step down or the session expires and the lock is released.
const IDLE_SESSION_TIMEOUT: &str = "10s";
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
@@ -95,33 +81,8 @@ fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
Ok((value.to_string(), expire_time))
}
#[derive(Debug, Clone, Default)]
struct PgLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for PgLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
/// PostgreSql implementation of Election.
/// TODO(CookiePie): Currently only support candidate registration. Add election logic.
pub struct PgElection {
leader_value: String,
client: Client,
@@ -139,13 +100,7 @@ impl PgElection {
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
) -> Result<ElectionRef> {
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client
.execute(SET_IDLE_SESSION_TIMEOUT, &[&IDLE_SESSION_TIMEOUT])
.await
.context(PostgresExecutionSnafu)?;
let tx = listen_leader_change(leader_value.clone());
let (tx, _) = broadcast::channel(100);
Ok(Arc::new(Self {
leader_value,
client,
@@ -157,7 +112,7 @@ impl PgElection {
}))
}
fn election_key(&self) -> String {
fn _election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
@@ -191,14 +146,11 @@ impl Election for PgElection {
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
let res = self
.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
let res = self.put_value_with_lease(&key, &node_info).await?;
// May registered before, just update the lease.
if !res {
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl_secs)
.await?;
self.put_value_with_lease(&key, &node_info).await?;
}
// Check if the current lease has expired and renew the lease.
@@ -245,65 +197,12 @@ impl Election for PgElection {
Ok(valid_candidates)
}
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
///
/// The function operates in a loop, where it:
///
/// 1. Waits for a predefined interval before attempting to acquire the lock again.
/// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
/// 3. Checks the result of the query:
/// - If the lock is successfully acquired (result is true), it calls the `leader_action` method
/// to perform actions as the leader.
/// - If the lock is not acquired (result is false), it calls the `follower_action` method
/// to perform actions as a follower.
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let res = self
.client
.query(CAMPAIGN, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() {
match row.try_get(0) {
Ok(true) => self.leader_action().await?,
Ok(false) => self.follower_action().await?,
Err(_) => {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock"
.to_string(),
}
.fail();
}
}
} else {
return UnexpectedSnafu {
violated: "Failed to get the result of acquiring advisory lock".to_string(),
}
.fail();
}
let _ = keep_alive_interval.tick().await;
}
todo!()
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let key = self.election_key();
if let Some((leader, expire_time, current, _)) =
self.get_value_with_lease(&key, false).await?
{
ensure!(expire_time > current, NoLeaderSnafu);
Ok(leader.as_bytes().into())
} else {
NoLeaderSnafu.fail()
}
}
todo!()
}
async fn resign(&self) -> Result<()> {
@@ -416,17 +315,17 @@ impl PgElection {
}
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
) -> Result<bool> {
async fn put_value_with_lease(&self, key: &str, value: &str) -> Result<bool> {
let res = self
.client
.query(
PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
&[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)],
&[
&key,
&value,
&LEASE_SEP,
&(self.candidate_lease_ttl_secs as f64),
],
)
.await
.context(PostgresExecutionSnafu)?;
@@ -444,177 +343,6 @@ impl PgElection {
Ok(res.len() == 1)
}
/// Handles the actions of a leader in the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
/// it attempts to renew the lease. It checks if the lease is still valid and either renews it
/// or steps down if it has expired.
///
/// - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
/// by updating the value associated with the election key.
/// - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
/// and steps down, initiating a new campaign for leadership.
/// - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
/// indicating that it still holds the lock and steps down to re-initiate the campaign. This may
/// happen if the leader has failed to renew the lease and the session has expired, and recovery
/// after a period of time during which other leaders have been elected and stepped down.
/// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
///
/// - **Case 2**: If the current instance is not leader previously, it calls the
/// `elected` method as a newly elected leader.
async fn leader_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
if self.is_leader() {
match self.get_value_with_lease(&key, true).await? {
Some((prev_leader, expire_time, current, prev)) => {
match (prev_leader == self.leader_value, expire_time > current) {
// Case 1.1
(true, true) => {
// Safety: prev is Some since we are using `get_value_with_lease` with `true`.
let prev = prev.unwrap();
self.update_value_with_lease(&key, &prev, &self.leader_value)
.await?;
}
// Case 1.2
(true, false) => {
warn!("Leader lease expired, now stepping down.");
self.step_down().await?;
}
// Case 1.3
(false, _) => {
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
self.step_down().await?;
}
}
}
// Case 1.4
None => {
warn!("Leader lease not found, but still hold the lock. Now stepping down.");
self.step_down().await?;
}
}
// Case 2
} else {
self.elected().await?;
}
Ok(())
}
/// Handles the actions of a follower in the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance believes it is the leader from the previous term,
/// it steps down without deleting the key.
/// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
/// will be released.
/// - **Case 3**: If all checks pass, the function returns without performing any actions.
async fn follower_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
if self.is_leader() {
self.step_down_without_lock().await?;
}
let (_, expire_time, current, _) = self
.get_value_with_lease(&key, false)
.await?
.context(NoLeaderSnafu)?;
// Case 2
ensure!(expire_time > current, NoLeaderSnafu);
// Case 3
Ok(())
}
/// Step down the leader. The leader should delete the key and notify the leader watcher.
///
/// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
///
/// ## Caution:
/// Should only step down while holding the advisory lock.
async fn step_down(&self) -> Result<()> {
let key = self.election_key();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.delete_value(&key).await?;
self.client
.query(STEP_DOWN, &[])
.await
.context(PostgresExecutionSnafu)?;
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
async fn step_down_without_lock(&self) -> Result<()> {
let key = self.election_key().into_bytes();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Elected as leader. The leader should put the key and notify the leader watcher.
/// Caution: Should only elected while holding the advisory lock.
async fn elected(&self) -> Result<()> {
let key = self.election_key();
let leader_key = PgLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
self.delete_value(&key).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS)
.await?;
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.leader_infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
}
#[cfg(test)]
@@ -662,7 +390,7 @@ mod tests {
};
let res = pg_election
.put_value_with_lease(&key, &value, 10)
.put_value_with_lease(&key, &value)
.await
.unwrap();
assert!(res);
@@ -690,7 +418,7 @@ mod tests {
let key = format!("test_key_{}", i);
let value = format!("test_value_{}", i);
pg_election
.put_value_with_lease(&key, &value, 10)
.put_value_with_lease(&key, &value)
.await
.unwrap();
}
@@ -750,7 +478,7 @@ mod tests {
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(3)).await;
tokio::time::sleep(Duration::from_secs(6)).await;
let client = create_postgres_client().await.unwrap();
@@ -788,402 +516,4 @@ mod tests {
assert!(res);
}
}
#[tokio::test]
async fn test_elected_and_step_down() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
leader_pg_election.elected().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_pg_election.step_down_without_lock().await.unwrap();
let (leader, _, _, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
leader_pg_election.elected().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
leader_pg_election.step_down().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
#[tokio::test]
async fn test_leader_action() {
let leader_value = "test_leader".to_string();
let candidate_lease_ttl_secs = 5;
let client = create_postgres_client().await.unwrap();
let (tx, mut rx) = broadcast::channel(100);
let leader_pg_election = PgElection {
leader_value: leader_value.clone(),
client,
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(true),
leader_watcher: tx,
store_key_prefix: "test_prefix".to_string(),
candidate_lease_ttl_secs,
};
// Step 1: No leader exists, campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 2: As a leader, renew the lease.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, new_expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(new_expire_time > current && new_expire_time > expire_time);
assert!(leader_pg_election.is_leader());
// Step 3: Something wrong, the leader lease expired.
tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 4: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 5: Something wrong, the leader key is deleted by other followers.
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
// Step 6: Re-campaign and elected.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election.leader_action().await.unwrap();
let (leader, expire_time, current, _) = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap()
.unwrap();
assert!(leader == leader_value);
assert!(expire_time > current);
assert!(leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::Elected(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::Elected"),
}
// Step 7: Something wrong, the leader key changed by others.
let res = leader_pg_election
.client
.query(CAMPAIGN, &[])
.await
.unwrap();
let res: bool = res[0].get(0);
assert!(res);
leader_pg_election
.delete_value(&leader_pg_election.election_key())
.await
.unwrap();
leader_pg_election
.put_value_with_lease(&leader_pg_election.election_key(), "test", 10)
.await
.unwrap();
leader_pg_election.leader_action().await.unwrap();
let res = leader_pg_election
.get_value_with_lease(&leader_pg_election.election_key(), false)
.await
.unwrap();
assert!(res.is_none());
assert!(!leader_pg_election.is_leader());
match rx.recv().await {
Ok(LeaderChangeMessage::StepDown(key)) => {
assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
assert_eq!(
String::from_utf8_lossy(key.key()),
leader_pg_election.election_key()
);
assert_eq!(key.lease_id(), i64::default());
assert_eq!(key.revision(), i64::default());
}
_ => panic!("Expected LeaderChangeMessage::StepDown"),
}
}
#[tokio::test]
async fn test_follower_action() {
    let candidate_lease_ttl_secs = 5;

    // A follower instance whose leader-change broadcasts we will observe.
    let follower_conn = create_postgres_client().await.unwrap();
    let (follower_tx, mut follower_rx) = broadcast::channel(100);
    let follower_pg_election = PgElection {
        leader_value: "test_follower".to_string(),
        client: follower_conn,
        is_leader: AtomicBool::new(false),
        leader_infancy: AtomicBool::new(true),
        leader_watcher: follower_tx,
        store_key_prefix: "test_prefix".to_string(),
        candidate_lease_ttl_secs,
    };

    // A second instance that actually acquires the advisory lock and leads.
    let leader_conn = create_postgres_client().await.unwrap();
    let (leader_tx, _) = broadcast::channel(100);
    let leader_pg_election = PgElection {
        leader_value: "test_leader".to_string(),
        client: leader_conn,
        is_leader: AtomicBool::new(false),
        leader_infancy: AtomicBool::new(true),
        leader_watcher: leader_tx,
        store_key_prefix: "test_prefix".to_string(),
        candidate_lease_ttl_secs,
    };

    leader_pg_election
        .client
        .query(CAMPAIGN, &[])
        .await
        .unwrap();
    leader_pg_election.elected().await.unwrap();

    // Step 1: a leader exists and its lease is still valid — follower action succeeds.
    follower_pg_election.follower_action().await.unwrap();

    // Step 2: the leader key is still present but its lease has expired.
    tokio::time::sleep(Duration::from_secs(META_LEASE_SECS)).await;
    assert!(follower_pg_election.follower_action().await.is_err());

    // Step 3: the leader key has been deleted — no leader at all.
    leader_pg_election
        .delete_value(&leader_pg_election.election_key())
        .await
        .unwrap();
    assert!(follower_pg_election.follower_action().await.is_err());

    // Step 4: the follower wrongly believes it is the leader; it must fail,
    // step down, and broadcast a StepDown message to its watchers.
    follower_pg_election
        .is_leader
        .store(true, Ordering::Relaxed);
    assert!(follower_pg_election.follower_action().await.is_err());
    match follower_rx.recv().await {
        Ok(LeaderChangeMessage::StepDown(key)) => {
            assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
            assert_eq!(
                String::from_utf8_lossy(key.key()),
                follower_pg_election.election_key()
            );
            assert_eq!(key.lease_id(), i64::default());
            assert_eq!(key.revision(), i64::default());
        }
        _ => panic!("Expected LeaderChangeMessage::StepDown"),
    }
}
}

View File

@@ -704,7 +704,7 @@ pub enum Error {
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to connect to Postgres"))]
#[snafu(display("Failed to connect to PostgreSQL"))]
ConnectPostgres {
#[snafu(source)]
error: tokio_postgres::Error,
@@ -712,23 +712,6 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to create connection pool for Postgres"))]
CreatePostgresPool {
#[snafu(source)]
error: deadpool_postgres::CreatePoolError,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "pg_kvbackend")]
#[snafu(display("Failed to get connection from Postgres pool: {}", reason))]
GetPostgresConnection {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Handler not found: {}", name))]
HandlerNotFound {
name: String,
@@ -860,10 +843,9 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::ConnectPostgres { .. } => StatusCode::Internal,
Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "pg_kvbackend")]
Error::PostgresExecution { .. } => StatusCode::Internal,
}
}

View File

@@ -93,15 +93,15 @@ pub struct MitoConfig {
pub page_cache_size: ReadableSize,
/// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
pub selector_result_cache_size: ReadableSize,
/// Whether to enable the write cache.
pub enable_write_cache: bool,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// File system path for write cache dir's root, defaults to `{data_home}`.
pub write_cache_path: String,
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub write_cache_size: ReadableSize,
pub experimental_write_cache_size: ReadableSize,
/// TTL for write cache.
#[serde(with = "humantime_serde")]
pub write_cache_ttl: Option<Duration>,
pub experimental_write_cache_ttl: Option<Duration>,
// Other configs:
/// Buffer size for SST writing.
@@ -147,10 +147,10 @@ impl Default for MitoConfig {
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
enable_write_cache: false,
write_cache_path: String::new(),
write_cache_size: ReadableSize::gb(5),
write_cache_ttl: None,
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::gb(5),
experimental_write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
@@ -234,8 +234,8 @@ impl MitoConfig {
}
// Sets write cache path if it is empty.
if self.write_cache_path.trim().is_empty() {
self.write_cache_path = data_home.to_string();
if self.experimental_write_cache_path.trim().is_empty() {
self.experimental_write_cache_path = data_home.to_string();
}
self.index.sanitize(data_home, &self.inverted_index)?;
@@ -268,7 +268,7 @@ impl MitoConfig {
self.selector_result_cache_size = mem_cache_size;
}
/// Enable write cache.
/// Enable experimental write cache.
#[cfg(test)]
pub fn enable_write_cache(
mut self,
@@ -276,10 +276,10 @@ impl MitoConfig {
size: ReadableSize,
ttl: Option<Duration>,
) -> Self {
self.enable_write_cache = true;
self.write_cache_path = path;
self.write_cache_size = size;
self.write_cache_ttl = ttl;
self.enable_experimental_write_cache = true;
self.experimental_write_cache_path = path;
self.experimental_write_cache_size = size;
self.experimental_write_cache_ttl = ttl;
self
}
}

View File

@@ -140,7 +140,7 @@ async fn test_edit_region_fill_cache() {
.create_engine_with(
MitoConfig {
// Write cache must be enabled to download the ingested SST file.
enable_write_cache: true,
enable_experimental_write_cache: true,
..Default::default()
},
None,

View File

@@ -365,20 +365,23 @@ async fn write_cache_from_config(
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
if !config.enable_write_cache {
if !config.enable_experimental_write_cache {
return Ok(None);
}
tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
// TODO(yingwen): Remove this and document the config once the write cache is ready.
warn!("Write cache is an experimental feature");
tokio::fs::create_dir_all(Path::new(&config.experimental_write_cache_path))
.await
.context(CreateDirSnafu {
dir: &config.write_cache_path,
dir: &config.experimental_write_cache_path,
})?;
let cache = WriteCache::new_fs(
&config.write_cache_path,
config.write_cache_size,
config.write_cache_ttl,
&config.experimental_write_cache_path,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
puffin_manager_factory,
intermediate_manager,
)

View File

@@ -928,9 +928,9 @@ worker_request_batch_size = 64
manifest_checkpoint_distance = 10
compress_manifest = false
auto_flush_interval = "30m"
enable_write_cache = false
write_cache_path = ""
write_cache_size = "5GiB"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "5GiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false