Compare commits

..

52 Commits

Author SHA1 Message Date
Weny Xu
8f9674440f chore: pick #7148 into release/v0.16 (#7455)
refactor: add test feature gate to numbers table (#7148)

* refactor: add test feature gate to numbers table



* chore: add debug_assertions



* refactor: extract numbers table provider



* chore: address CR issues



---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-12-22 19:34:19 +08:00
Weny Xu
a29681eb12 chore: pick #7101 into release/v0.16 (#7453) 2025-12-22 07:14:48 +00:00
Weny Xu
c112cdf241 feat: make distributed time constants and client timeouts configurable (#7429)
* chore: update rskafka

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: make distributed time constants and client timeouts configurable

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: change etcd endpoints to array in the test scripts (#7419)

chore: change etcd endpoint

Signed-off-by: liyang <daviderli614@gmail.com>

* fix: fix tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: liyang <daviderli614@gmail.com>
Co-authored-by: liyang <daviderli614@gmail.com>
2025-12-17 11:40:16 +00:00
Weny Xu
4d33b9687a fix: improve network failure detection (#7367) 2025-12-09 11:47:45 +00:00
Weny Xu
b0d2d26ad8 chore: pick #6845 #6821 into release/v0.16 (#7368) 2025-12-09 12:45:47 +08:00
Weny Xu
4ed048c735 fix(meta): add default etcd client options with keep-alive settings (#7363) 2025-12-08 12:39:18 +00:00
Weny Xu
af4465e543 chore: pick #7344 into release v0.16 (#7345) 2025-12-04 08:34:23 +00:00
liyang
7c2cdccb22 chore: use greptime dockerhub image (#6865)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-12-03 18:33:56 +08:00
liyang
3655bbe032 chore: update bitnami config (#6847)
* chore: update bitnami config

Signed-off-by: liyang <daviderli614@gmail.com>

* update postgresql chart version

Signed-off-by: liyang <daviderli614@gmail.com>

* fix ci

Signed-off-by: liyang <daviderli614@gmail.com>

* refactor: add pull-test-deps-images.sh to pull images one by one to avoid rate limit

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: liyang <daviderli614@gmail.com>
Signed-off-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2025-12-03 18:33:56 +08:00
jeremyhi
5501f63a71 fix: reset cached channel on errors with VIP (#7335)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2025-12-03 18:33:56 +08:00
Ruihang Xia
c5c9b263e1 chore: fix typo (#6885)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-03 18:33:56 +08:00
Ning Sun
9e6b19d301 chore: fix typo (#7169)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-03 18:33:56 +08:00
Ning Sun
f7f52592b4 fix: various typos reported by CI (#7047)
* fix: various typos reported by CI

* fix: additional typo

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-03 18:33:56 +08:00
LFC
4a936d7320 chore: pub access layer (#6670) (#6842)
(cherry picked from commit 7bb765af1d)

Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-08-28 09:42:16 +00:00
Yingwen
83f566ad20 perf: Reduce fulltext bloom load time (#6651)
* perf: cached reader do not get page concurrently

Otherwise they will all fetch the same pages in parallel

Signed-off-by: evenyag <realevenyag@gmail.com>

* perf: always disable zstd for bloom

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 18:53:42 +08:00
Ruihang Xia
ed2dff6d27 feat: count underscore in English tokenizer and improve performance (#6660)
* feat: count underscore in English tokenizer and improve performance

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update lock file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* assert lookup table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle utf8 alphanumeric

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finalize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 18:53:42 +08:00
Yingwen
af483335b2 feat: EncodedBulkPartIter iters flat format and returns RecordBatch (#6655)
* feat: implements iter to read bulk part

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: BulkPartEncoder encodes BulkPart instead of mutation

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
Lei, HUANG
70852a01a3 chore: add methods to catalog manager (#6656)
* chore/optimize-catalog:
 ### Add `table_id` Method to `CatalogManager`

 - **Files Modified**:
   - `src/catalog/src/kvbackend/manager.rs`
   - `src/catalog/src/lib.rs`

 - **Key Changes**:
   - Introduced a new asynchronous method `table_id` in the `CatalogManager` trait to retrieve the table ID based on catalog, schema, and table name.
   - Implemented the `table_id` method in `KvBackendCatalogManager` to fetch the table ID from the system catalog or cache, with a fallback to `pg_catalog` for Postgres channels.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/optimize-catalog:
 ### Add `table_info_by_id` Method to Catalog Managers

 - **`manager.rs`**: Introduced the `table_info_by_id` method in `KvBackendCatalogManager` to retrieve table information by table ID using the `TableInfoCacheRef`.
 - **`lib.rs`**: Updated the `CatalogManager` trait to include the new `table_info_by_id` method.
 - **`memory/manager.rs`**: Implemented the `table_info_by_id` method in `MemoryCatalogManager` to fetch table information by table ID from in-memory catalogs.
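
For illustration only, a minimal Rust sketch of what trait methods like the `table_id` and `table_info_by_id` described above could look like; the type aliases, placeholder error type, and exact signatures here are assumptions for readability, not code taken from the PR.

```rust
// Hypothetical sketch only; the real GreptimeDB types and signatures may differ.
use std::sync::Arc;

type TableId = u32;                 // assumed alias
type TableInfoRef = Arc<TableInfo>; // assumed alias

pub struct TableInfo {
    pub id: TableId,
    pub name: String,
}

pub struct Error; // placeholder error type

#[async_trait::async_trait] // requires the `async-trait` crate
pub trait CatalogManager: Send + Sync {
    /// Resolve a table ID from its catalog, schema, and table name,
    /// consulting the system catalog or a cache.
    async fn table_id(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableId>, Error>;

    /// Look up table metadata by table ID.
    async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>, Error>;
}
```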

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
8f3c6f72f5 feat(log-query): support binary op, scalar fn & is_true/is_false (#6659)
* rename symbol

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle binary op

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/log_query/planner.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reduce duplication

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
d3f15e72bf feat: support TQL CTE in planner (#6645)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
zyy17
b25a6527ed fix: unable to record slow query (#6590)
* refactor: add process manager for prometheus query

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: modify `register_query()` API to accept parsed statement(`catalog::process_manager::QueryStatement`)

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add the slow query timer in the `Ticket` of ProcessManager

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: add integration tests

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add process manager in `do_exec_plan()`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* tests: add `test_postgres_slow_query` integration test

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: polish the code

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: create a query ticket and slow query timer if the statement is a query in `query_statement()`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: sqlness errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-08-06 15:43:55 +08:00
zyy17
7b48e53261 feat: record the migration events in metasrv (#6579)
* feat: collect procedure event

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* feat: collect region migration events

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: add integration test

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix docs error

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: fix integration test error

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: change status code for errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `event()` in Procedure

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: improve trait design

1. Add `user_metadata()` in `Procedure` trait;

2. Add `Eventable` trait;

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* chore: polish the code

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
ebe78f668e fix: box Explain node in Statement to reduce stack size (#6661)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
8456949749 feat: use column expr with filters in LogQuery (#6646)
* feat: use column expr with filters in LogQuery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove some clone

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
discord9
f21bedd141 fix: not mark all deleted when partial trunc (#6654)
* fix: not mark all deleted when partial trunc & not update manifest when partial file range is empty

Signed-off-by: discord9 <discord9@163.com>

* docs: note

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-08-06 15:43:55 +08:00
discord9
469c3140fe feat: register all aggregate function to auto step aggr fn (#6596)
* feat: support generic aggr push down

Signed-off-by: discord9 <discord9@163.com>

* typo

Signed-off-by: discord9 <discord9@163.com>

* fix: type ck in merge wrapper

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* feat: support all registered aggr func

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-08-06 15:43:55 +08:00
jeremyhi
569d93c599 fix: sequence peek with remote value (#6648)
* fix: sequence peek with remote value

* chore: more ut

* chore: add more ut
2025-08-06 15:43:55 +08:00
Yingwen
b91b520f54 feat: Implements an iterator to read the RecordBatch in BulkPart (#6647)
* feat: impl RecordBatchIter for BulkPart

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename BulkPartIter to EncodedBulkPartIter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add iter benchmark

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: filter by primary key columns

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: move struct definitions

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: bulk iter for flat schema

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: iter filter benchmark

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: use correct sequence array to compare

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove RecordBatchIter

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: apply projection first

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comment

No need to check number of rows after filter

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
7a12585af9 feat: use real data to truncate manipulate range (#6649)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
Yingwen
89b661c98a feat: implements FlatReadFormat to project parquets with flat schema (#6638)
* feat: add plain read format

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: reduce unused code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: reuse code

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: allow dead code

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: change ReadFormat to enum

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: as_primary_key() returns option

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove some allow dead_code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename WriteFormat to PrimaryKeyWriteFormat

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add tests for read/write format

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: format code

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: dedup column ids in format

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename plain to flat

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: implements FlatReadFormat based on the new format

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support override sequence

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: new_override_sequence_array for ReadFormat

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comment

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
Ning Sun
e0b1ebdfb6 feat: schema/database support for label_values (#6631)
* feat: initial support for __schema__ in label values

* feat: filter database with matches

* refactor: skip unnecessary check

* fix: resolve schema matcher in label values

* test: add a test case for table not exists

* refactor: add matchop check on db label

* chore: merge main
2025-08-06 15:43:55 +08:00
Weny Xu
eaaf9448c7 fix: fix sequence peek method to return correct values when sequence is not initialized (#6643)
fix: improve sequence peek method to handle uninitialized sequences

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 15:43:55 +08:00
discord9
dc37382946 feat: add partial truncate (#6602)
* feat: add partial truncate

Signed-off-by: discord9 <discord9@163.com>

* fix: per review

Signed-off-by: discord9 <discord9@163.com>

* feat: add proto partial truncate kind

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* chore: update branched proto

Signed-off-by: discord9 <discord9@163.com>

* feat: grpc support truncate WIP sql support

Signed-off-by: discord9 <discord9@163.com>

* wip: parse truncate range

Signed-off-by: discord9 <discord9@163.com>

* feat: truncate by range

Signed-off-by: discord9 <discord9@163.com>

* fix: truncate range display

Signed-off-by: discord9 <discord9@163.com>

* chore: resolve todo

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: more invalid parse

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: unused

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: update branch

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-08-06 15:43:55 +08:00
Weny Xu
b31d307eb6 feat: introduce reconciliation interface (#6614)
* feat: introduce reconcile interface

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: upgrade proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 15:43:55 +08:00
Yingwen
d9f177ba9f feat: Add option to limit the files reading simultaneously (#6635)
* feat: limits the max number of files to scan at the same time

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: make max_concurrent_scan_files configurable

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: reduce concurrent scan files to 128

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config example

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for max_concurrent_scan_files

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update config test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
b940906d86 feat: absent function in PromQL (#6618)
* feat: absent function in PromQL

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl serde

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ai suggests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve PR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* comment out some tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
discord9
9ccc8da231 fix: show create flow's expire after (#6641)
* fix: show create flow's expire after

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-08-06 15:43:55 +08:00
Keming
0d603bfc96 fix: bump greptime-sqlparser to avoid convert statement overflow (#6634)
bump the greptime-sqlparser

Co-authored-by: Yihong <zouzou0208@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
62eedbb6cd feat: support tls for pg backend (#6611)
* load tls

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl tls

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* pass options

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement require mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* default to prefer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update example config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust example config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle client cert and key properly

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement verify_ca and verify_full

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test for config api

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change config name and default mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
zyy17
dec6da0e8a chore: add limit in resources panel and Cache Miss panel (#6636)
chore: add `limit` in resources panel and 'Cache Miss' panel

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-08-06 15:43:55 +08:00
yihong
a84cf5ec67 chore: update jieba tantivy-jieba and tantivy version (#6637)
* chore: update jieba tantivy-jieba and tantivy version

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: address comments

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
3b7652039f feat: support __schema__ and __database__ in Prom Remote Read (#6610)
* feat: support __schema__ and __database__ in Prom remote R/W

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert remote write changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* check matcher type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
Lei, HUANG
9164e8f50d chore: refine metrics tracking the flush/compaction cost time (#6630)
chore: refine metrics tracking the per-stage cost time during flush and compaction

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
3854e2edb4 fix: only return the __name__ label when there is one (#6629)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
ZonaHe
f3dd7cccd3 feat: update dashboard to v0.10.6 (#6632)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-08-06 15:43:55 +08:00
discord9
b2112c3f5c feat: panic logger (#6633)
Signed-off-by: discord9 <discord9@163.com>
2025-08-06 15:43:55 +08:00
Weny Xu
e98f2facd4 feat: introduce reconcile catalog procedure (#6613)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 15:43:55 +08:00
Weny Xu
98ef92bb0a refactor: remove procedure executor from DDL manager (#6625)
* refactor: remove procedure executor from DDL manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from  CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 15:43:55 +08:00
shuiyisong
1352a5b637 chore(otlp_metric): update metric and label naming (#6624)
* chore: update otlp metrics & labels naming

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typo and test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* Update src/session/src/protocol_ctx.rs

* chore: add test cases for normalizing functions

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Ning Sun <classicning@gmail.com>
2025-08-06 15:43:55 +08:00
Yingwen
42ed5042b4 feat: Implement a converter to converts KeyValues into BulkPart (#6620)
* chore: add api to memtable to check bulk capability

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: Add a converter to convert KeyValues into BulkPart

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: move supports_bulk_insert to MemtableBuilder

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: benchmark

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use write_bulk if the memtable benefits from it

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: test BulkPartConverter

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add a flag to store unencoded primary keys

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: cache schema for converter

Implements to_flat_sst_arrow_schema

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: simplify tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: don't use bulk convert branch now

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comments

* simplify primary_key_column_builders check
* return error if value is not string

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add FlatSchemaOptions::from_encoding and test sparse encoding

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
Ruihang Xia
20af73ec44 docs(rfc): compatibility test framework (#6460)
* docs(rfc): compatibility test framework

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-08-06 15:43:55 +08:00
Yingwen
f3221a3b18 feat: HTTP API to activate/deactivate heap prof (activate by default) (#6593)
* feat: add HTTP API to activate/deactivate heap profiling

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add HTTP API to get profiling status

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: enable heap prof by default

Signed-off-by: evenyag <realevenyag@gmail.com>

* build: add "prof:true,prof_active:false" as default env to dockerfiles

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: activate heap profiling after log initialization

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add memory options to control whether to activate profiling

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update docs

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt toml

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix config test

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: usage of new api

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: log profile after version

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update how to docs

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: fix how to docs

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 15:43:55 +08:00
109 changed files with 1016 additions and 1902 deletions

View File

@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Etcd cluster
shell: bash
run: |
helm upgrade \
--install etcd oci://registry-1.docker.io/bitnamicharts/etcd \
--set replicaCount=${{ inputs.etcd-replicas }} \
@@ -24,4 +24,9 @@ runs:
--set auth.rbac.token.enabled=false \
--set persistence.size=2Gi \
--create-namespace \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/etcd \
--set image.tag=3.6.1-debian-12-r3 \
--version 12.0.8 \
-n ${{ inputs.namespace }}

View File

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

View File

@@ -12,7 +12,7 @@ runs:
steps:
- name: Install Kafka cluster
shell: bash
run: |
helm upgrade \
--install kafka oci://registry-1.docker.io/bitnamicharts/kafka \
--set controller.replicaCount=${{ inputs.controller-replicas }} \
@@ -23,4 +23,8 @@ runs:
--set listeners.controller.protocol=PLAINTEXT \
--set listeners.client.protocol=PLAINTEXT \
--create-namespace \
--set image.registry=docker.io \
--set image.repository=greptime/kafka \
--set image.tag=3.9.0-debian-12-r1 \
--version 31.0.0 \
-n ${{ inputs.namespace }}

View File

@@ -6,9 +6,7 @@ inputs:
description: "Number of PostgreSQL replicas"
namespace:
default: "postgres-namespace"
postgres-version:
default: "14.2"
description: "PostgreSQL version"
description: "The PostgreSQL namespace"
storage-size:
default: "1Gi"
description: "Storage size for PostgreSQL"
@@ -22,7 +20,11 @@ runs:
helm upgrade \
--install postgresql oci://registry-1.docker.io/bitnamicharts/postgresql \
--set replicaCount=${{ inputs.postgres-replicas }} \
--set image.tag=${{ inputs.postgres-version }} \
--set global.security.allowInsecureImages=true \
--set image.registry=docker.io \
--set image.repository=greptime/postgresql \
--set image.tag=17.5.0-debian-12-r3 \
--version 16.7.4 \
--set persistence.size=${{ inputs.storage-size }} \
--set postgresql.username=greptimedb \
--set postgresql.password=admin \

View File

@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {
helm install "$cluster_name" greptime/greptimedb-cluster \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
-n "$install_namespace"
# Wait for greptimedb cluster to be ready.
@@ -103,14 +103,13 @@ function deploy_greptimedb_cluster_with_s3_storage() {
helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \
--set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set storage.s3.region="$AWS_REGION" \
--set storage.s3.root="$DATA_ROOT" \
--set storage.credentials.secretName=s3-credentials \
--set storage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \
--set storage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY"
# Wait for greptimedb cluster to be ready.
while true; do
PHASE=$(kubectl -n "$install_namespace" get gtc "$cluster_name" -o jsonpath='{.status.clusterPhase}')

.github/scripts/pull-test-deps-images.sh (vendored executable file, 34 changed lines)
View File

@@ -0,0 +1,34 @@
#!/bin/bash
# This script is used to pull the test dependency images that are stored in public ECR one by one to avoid rate limiting.
set -e
MAX_RETRIES=3
IMAGES=(
"greptime/zookeeper:3.7"
"greptime/kafka:3.9.0-debian-12-r1"
"greptime/etcd:3.6.1-debian-12-r3"
"greptime/minio:2024"
"greptime/mysql:5.7"
)
for image in "${IMAGES[@]}"; do
for ((attempt=1; attempt<=MAX_RETRIES; attempt++)); do
if docker pull "$image"; then
# Successfully pulled the image.
break
else
# Back off with increasing delays to avoid rate limiting.
if [ $attempt -lt $MAX_RETRIES ]; then
sleep_seconds=$((attempt * 5))
echo "Attempt $attempt failed for $image, waiting $sleep_seconds seconds"
sleep $sleep_seconds # 5s, 10s delays
else
echo "Failed to pull $image after $MAX_RETRIES attempts"
exit 1
fi
fi
done
done

View File

@@ -719,6 +719,10 @@ jobs:
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Pull test dependencies images
run: ./.github/scripts/pull-test-deps-images.sh
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait

Cargo.lock (generated, 190 changed lines)
View File

@@ -217,7 +217,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-decimal",
@@ -950,7 +950,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -1617,7 +1617,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"catalog",
"common-error",
@@ -1652,7 +1652,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1992,7 +1992,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-stream",
"async-trait",
@@ -2036,7 +2036,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tempfile",
"tokio",
@@ -2045,7 +2045,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arc-swap",
@@ -2075,7 +2075,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.17.0",
"substrait 0.16.0",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2116,7 +2116,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"auth",
@@ -2178,7 +2178,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"temp-env",
"tempfile",
@@ -2225,7 +2225,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"anymap2",
"async-trait",
@@ -2247,11 +2247,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.17.0"
version = "0.16.0"
[[package]]
name = "common-config"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-error",
@@ -2277,7 +2277,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2314,7 +2314,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2327,7 +2327,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2338,7 +2338,7 @@ dependencies = [
[[package]]
name = "common-event-recorder"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -2357,7 +2357,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"common-error",
@@ -2374,7 +2374,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2433,7 +2433,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"common-runtime",
@@ -2450,7 +2450,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arrow-flight",
@@ -2483,7 +2483,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"common-base",
@@ -2503,7 +2503,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"arc-swap",
"common-query",
@@ -2517,7 +2517,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"anyhow",
"common-error",
@@ -2533,7 +2533,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"anymap2",
"api",
@@ -2604,7 +2604,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2613,11 +2613,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.17.0"
version = "0.16.0"
[[package]]
name = "common-pprof"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-error",
"common-macro",
@@ -2629,7 +2629,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-stream",
@@ -2658,7 +2658,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"common-procedure",
@@ -2668,7 +2668,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -2694,7 +2694,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"arc-swap",
"common-error",
@@ -2715,7 +2715,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2745,14 +2745,14 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-sql"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-datasource",
@@ -2771,7 +2771,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"backtrace",
"common-error",
@@ -2799,7 +2799,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"client",
"common-grpc",
@@ -2812,7 +2812,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2830,7 +2830,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2841,7 +2841,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"common-base",
"common-error",
@@ -2864,7 +2864,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"common-telemetry",
@@ -3863,7 +3863,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arrow-flight",
@@ -3917,7 +3917,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tokio",
"toml 0.8.19",
@@ -3927,7 +3927,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4602,7 +4602,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -4739,7 +4739,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4806,7 +4806,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tokio",
"tonic 0.12.3",
@@ -4861,7 +4861,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arc-swap",
@@ -4922,7 +4922,7 @@ dependencies = [
"sqlparser 0.54.0-greptime",
"store-api",
"strfmt",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tokio",
"tokio-util",
@@ -6107,7 +6107,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6870,9 +6870,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]]
name = "libflate"
@@ -6928,7 +6928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -7005,13 +7005,13 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
[[package]]
name = "local-ip-address"
version = "0.6.3"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782"
checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
dependencies = [
"libc",
"neli",
"thiserror 1.0.64",
"thiserror 2.0.12",
"windows-sys 0.59.0",
]
@@ -7033,7 +7033,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"chrono",
"common-error",
@@ -7045,7 +7045,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-stream",
"async-trait",
@@ -7287,12 +7287,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "md5"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0"
[[package]]
name = "measure_time"
version = "0.9.0"
@@ -7348,7 +7342,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -7376,7 +7370,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -7474,7 +7468,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"aquamarine",
@@ -7499,7 +7493,6 @@ dependencies = [
"lazy_static",
"mito-codec",
"mito2",
"moka",
"mur3",
"object-store",
"prometheus",
@@ -7567,7 +7560,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"bytes",
@@ -7590,7 +7583,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"aquamarine",
@@ -8343,7 +8336,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"anyhow",
"bytes",
@@ -8355,7 +8348,7 @@ dependencies = [
"futures",
"humantime-serde",
"lazy_static",
"md5 0.7.0",
"md5",
"moka",
"opendal",
"prometheus",
@@ -8679,7 +8672,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8735,7 +8728,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0-greptime",
"store-api",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tokio",
"tokio-util",
@@ -8994,7 +8987,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -9188,9 +9181,8 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "017b8b74f9e8c7aff0087d4ef2b91676a5509e8928100d6a3510fd472210feb5"
version = "0.30.2"
source = "git+https://github.com/sunng87/pgwire?rev=127573d997228cfb70c7699881c568eae8131270#127573d997228cfb70c7699881c568eae8131270"
dependencies = [
"async-trait",
"bytes",
@@ -9199,7 +9191,7 @@ dependencies = [
"futures",
"hex",
"lazy-regex",
"md5 0.8.0",
"md5",
"postgres-types",
"rand 0.9.0",
"ring",
@@ -9322,7 +9314,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9466,7 +9458,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9779,7 +9771,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -10062,7 +10054,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -10104,7 +10096,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10170,7 +10162,7 @@ dependencies = [
"sqlparser 0.54.0-greptime",
"statrs",
"store-api",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tokio",
"tokio-stream",
@@ -10857,7 +10849,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
source = "git+https://github.com/WenyXu/rskafka.git?rev=9494304ae3947b07e660b5d08549ad4a39c84a26#9494304ae3947b07e660b5d08549ad4a39c84a26"
dependencies = [
"bytes",
"chrono",
@@ -11523,7 +11515,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11647,7 +11639,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11987,7 +11979,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"chrono",
@@ -12044,7 +12036,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -12339,12 +12331,12 @@ dependencies = [
"cfg-if",
"libc",
"psm",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]
name = "stat"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"nix 0.30.1",
]
@@ -12370,7 +12362,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"aquamarine",
@@ -12533,7 +12525,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"async-trait",
"bytes",
@@ -12734,7 +12726,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"async-trait",
@@ -13003,7 +12995,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"arbitrary",
"async-trait",
@@ -13047,7 +13039,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.17.0"
version = "0.16.0"
dependencies = [
"api",
"arrow-flight",
@@ -13117,7 +13109,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.17.0",
"substrait 0.16.0",
"table",
"tempfile",
"time",
@@ -14606,7 +14598,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]

View File

@@ -73,7 +73,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.17.0"
version = "0.16.0"
edition = "2021"
license = "Apache-2.0"
@@ -188,7 +188,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
# Branch: feat/request-timeout-port
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "9494304ae3947b07e660b5d08549ad4a39c84a26", features = [
"transport-tls",
] }
rstest = "0.25"

View File

@@ -78,6 +78,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
@@ -275,7 +277,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
@@ -334,6 +335,7 @@
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is the same as the base heartbeat interval.<br/>e.g., if the base heartbeat interval is 3s, the frontend heartbeat interval is 18s and the flownode/datanode heartbeat interval is 3s.<br/>If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -344,12 +346,18 @@
| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" |
| `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)<br/>Required when using custom CAs or self-signed certificates<br/>Leave empty to use system root certificates only<br/>Like "/path/to/ca.crt" |
| `backend_tls.watch` | Bool | `false` | Watch for certificate file changes and auto reload |
| `backend_client` | -- | -- | The backend client options.<br/>Currently, only applicable when using etcd as the metadata store. |
| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. |
| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. |
| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval |
| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -441,7 +449,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |
@@ -461,6 +468,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |
@@ -604,7 +613,6 @@
| `meta_client` | -- | -- | The metasrv client options. |
| `meta_client.metasrv_addrs` | Array | -- | The addresses of the metasrv. |
| `meta_client.timeout` | String | `3s` | Operation timeout. |
| `meta_client.heartbeat_timeout` | String | `500ms` | Heartbeat timeout. |
| `meta_client.ddl_timeout` | String | `10s` | DDL timeout. |
| `meta_client.connect_timeout` | String | `1s` | Connect server timeout. |
| `meta_client.tcp_nodelay` | Bool | `true` | `TCP_NODELAY` option for accepted connections. |

View File

@@ -92,9 +92,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"
@@ -164,6 +161,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"
## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"
## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.

View File

@@ -64,9 +64,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"

View File

@@ -171,9 +171,6 @@ metasrv_addrs = ["127.0.0.1:3002"]
## Operation timeout.
timeout = "3s"
## Heartbeat timeout.
heartbeat_timeout = "500ms"
## DDL timeout.
ddl_timeout = "10s"

View File

@@ -55,6 +55,13 @@ allow_region_failover_on_local_wal = false
## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours"
## Base heartbeat interval for calculating distributed time constants.
## The frontend heartbeat interval is 6 times the base heartbeat interval.
## The flownode/datanode heartbeat interval is the same as the base heartbeat interval.
## e.g., if the base heartbeat interval is 3s, the frontend heartbeat interval is 18s and the flownode/datanode heartbeat interval is 3s.
## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly.
#+ heartbeat_interval = "3s"
## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true
@@ -93,6 +100,16 @@ ca_cert_path = ""
## Watch for certificate file changes and auto reload
watch = false
## The backend client options.
## Currently, only applicable when using etcd as the metadata store.
#+ [backend_client]
## The keep alive timeout for backend client.
#+ keep_alive_timeout = "3s"
## The keep alive interval for backend client.
#+ keep_alive_interval = "10s"
## The connect timeout for backend client.
#+ connect_timeout = "3s"
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
@@ -107,6 +124,10 @@ runtime_size = 8
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## The server side HTTP/2 keep-alive interval
#+ http2_keep_alive_interval = "10s"
## The server side HTTP/2 keep-alive timeout.
#+ http2_keep_alive_timeout = "3s"
## The HTTP server options.
[http]

View File

@@ -209,6 +209,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"
## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`

View File

@@ -1,157 +0,0 @@
---
Feature Name: "global-gc-worker"
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/6571
Date: 2025-07-23
Author: "discord9 <discord9@163.com>"
---
# Global GC Worker
## Summary
This RFC proposes the integration of a garbage collection (GC) mechanism within the Compaction process. This mechanism aims to manage and remove stale files that are no longer actively used by any system component, thereby reclaiming storage space.
## Motivation
With the introduction of features such as table repartitioning, a substantial number of Parquet files can become obsolete. Furthermore, failures during manifest updates may result in orphaned files that are never referenced by the system. Therefore, a periodic garbage collection mechanism is essential to reclaim storage space by systematically removing these unused files.
## Details
### Overview
The garbage collection process will be integrated directly into the Compaction process. Upon the completion of a Compaction for a given region, the GC worker will be automatically triggered. Its primary function will be to identify and subsequently delete obsolete files that have persisted beyond their designated retention period. This integration ensures that garbage collection is performed in close conjunction with data lifecycle management, effectively leveraging the compaction process's inherent knowledge of file states.
This design prioritizes correctness and safety by explicitly linking GC execution to a well-defined operational boundary: the successful completion of a compaction cycle.
### Terminology
- **Unused File**: Refers to a file present in the storage directory that has never been formally recorded in any manifest. A common scenario for this includes cases where a new SST file is successfully written to storage, but the subsequent update to the manifest fails, leaving the file unreferenced.
- **Obsolete File**: Denotes a file that was previously recorded in a manifest but has since been explicitly marked for removal. This typically occurs following operations such as data repartitioning or compaction.
### GC Worker Process
The GC worker operates as an integral part of the Compaction process. Once a Compaction for a specific region is completed, the GC worker is automatically triggered. Executing this process on a `datanode` is preferred to eliminate the overhead associated with having to set object storage configurations in the `metasrv`.
The detailed process is as follows:
1. **Invocation**: Upon the successful completion of a Compaction for a region, the GC worker is invoked.
2. **Manifest Reading**: The worker reads the region's primary manifest to obtain a comprehensive list of all files marked as obsolete. Concurrently, it reads any temporary manifests generated by long-running queries to identify files that are currently in active use, thereby preventing their premature deletion.
3. **Lingering Time Check (Obsolete Files)**: For each identified obsolete file, the GC worker evaluates its "lingering time", i.e., the time elapsed since the file was removed from the manifest.
4. **Deletion Marking (Obsolete Files)**: Files that have exceeded their maximum configurable lingering time and are not referenced by any active temporary manifests are marked for deletion.
5. **Lingering Time (Unused Files)**: Unused files (those never recorded in any manifest) are also subject to a configurable maximum lingering time before they are eligible for deletion.
The following flowchart illustrates the GC worker's process:
```mermaid
flowchart TD
A[Compaction Completed] --> B[Trigger GC Worker]
B --> C[Scan Region Manifest]
C --> D[Identify File Types]
D --> E[Unused Files<br/>Never recorded in manifest]
D --> F[Obsolete Files<br/>Previously in manifest<br/>but marked for removal]
E --> G[Check Lingering Time]
F --> G
G --> H{File exceeds<br/>configured lingering time?}
H -->|No| I[Skip deletion]
H -->|Yes| J[Check Temporary Manifest]
J --> K{File in use by<br/>active queries?}
K -->|Yes| L[Retain file<br/>Wait for next GC cycle]
K -->|No| M[Safely delete file]
I --> N[End GC cycle]
L --> N
M --> O[Update Manifest]
O --> N
N --> P[Wait for next Compaction]
P --> A
style A fill:#e1f5fe
style B fill:#f3e5f5
style M fill:#e8f5e8
style L fill:#fff3e0
```
#### Handling Obsolete Files
An obsolete file is permanently deleted only if two conditions are met:
1. The time elapsed since its removal from the manifest (its obsolescence timestamp) exceeds a configurable threshold.
2. It is not currently referenced by any active temporary manifests.
#### Handling Unused Files
With the integration of the GC worker into the Compaction process, the risk of accidentally deleting newly created SST files that have not yet been recorded in the manifest is significantly mitigated. Consequently, the concept of "Unused Files" as a distinct category primarily susceptible to accidental deletion is largely resolved. Any files that are genuinely "unused" (i.e., never referenced by any manifest, including temporary ones) can be safely deleted after a configurable maximum lingering time.
For debugging and auditing purposes, a comprehensive list of recently deleted files can be maintained.
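To make the deletion rule above concrete, here is a minimal sketch in Rust of the lingering-time check. The types and names (`FileCandidate`, `can_delete`, the two lingering thresholds) are hypothetical, chosen for illustration only, and are not part of the actual codebase:
```rust
use std::time::{Duration, SystemTime};

/// Hypothetical metadata the GC worker has for a candidate file.
struct FileCandidate {
    /// When the file was removed from the manifest; `None` means the file
    /// was never recorded in any manifest (an "unused" file).
    removed_from_manifest_at: Option<SystemTime>,
    /// When the file was created in object storage.
    created_at: SystemTime,
}

/// Decide whether a candidate can be deleted, following the rules above:
/// the lingering time must be exceeded and no active temporary manifest
/// may still reference the file.
fn can_delete(
    candidate: &FileCandidate,
    referenced_by_temp_manifest: bool,
    obsolete_lingering: Duration,
    unused_lingering: Duration,
    now: SystemTime,
) -> bool {
    if referenced_by_temp_manifest {
        return false;
    }
    match candidate.removed_from_manifest_at {
        // Obsolete file: measure the time since it was dropped from the manifest.
        Some(removed_at) => now
            .duration_since(removed_at)
            .map(|elapsed| elapsed > obsolete_lingering)
            .unwrap_or(false),
        // Unused file: measure the time since creation, as it never entered a manifest.
        None => now
            .duration_since(candidate.created_at)
            .map(|elapsed| elapsed > unused_lingering)
            .unwrap_or(false),
    }
}
```
Files that fail this check are simply skipped and reconsidered in the next compaction-triggered GC cycle.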
### Ensuring Read Consistency
To prevent the GC worker from inadvertently deleting files that are actively being utilized by long-running analytical queries, a robust protection mechanism is introduced. This mechanism relies on temporary manifests that are actively kept "alive" by the queries using them.
When a long-running query is detected (e.g., by a slow query recorder), it will write a temporary manifest to the region's manifest directory. This manifest lists all files required for the query. However, simply creating this file is not enough, as a query runner might crash, leaving the temporary manifest orphaned and preventing garbage collection indefinitely.
To address this, the following "heartbeat" mechanism is implemented:
1. **Periodic Updates**: The process executing the long-running query is responsible for periodically updating the modification timestamp of its temporary manifest file (i.e., "touching" the file). This serves as a heartbeat, signaling that the query is still active.
2. **GC Worker Verification**: When the GC worker runs, it scans for temporary manifests. For each one it finds, it checks the file's last modification time.
3. **Stale File Handling**: If a temporary manifest's last modification time is older than a configurable threshold, the GC worker considers it stale (left over from a crashed or terminated query). The GC worker will then delete this stale temporary manifest. Files that were protected only by this stale manifest are no longer shielded from garbage collection.
This approach ensures that only files for genuinely active queries are protected. The lifecycle of the temporary manifest is managed dynamically: it is created when a long query starts, kept alive through periodic updates, and is either deleted by the query upon normal completion or automatically cleaned up by the GC worker if the query terminates unexpectedly.
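As a rough illustration of the staleness check, the following sketch filters temporary manifests by their last heartbeat; the `TempManifest` struct and `live_protected_files` helper are hypothetical names chosen for this example, not existing APIs:
```rust
use std::time::{Duration, SystemTime};

/// A temporary manifest as seen by the GC worker; only the fields needed
/// for the staleness check are sketched here.
struct TempManifest {
    /// Files the long-running query still needs.
    protected_files: Vec<String>,
    /// Last time the query "touched" (heartbeated) this manifest.
    last_modified: SystemTime,
}

/// Returns the files that are still protected by live temporary manifests.
/// Manifests whose heartbeat is older than `stale_after` are treated as
/// leftovers from crashed queries and no longer protect anything; the
/// caller is expected to delete those stale manifests as well.
fn live_protected_files(
    manifests: &[TempManifest],
    stale_after: Duration,
    now: SystemTime,
) -> Vec<String> {
    manifests
        .iter()
        .filter(|m| {
            now.duration_since(m.last_modified)
                .map(|age| age <= stale_after)
                // On clock skew, err on the side of protecting files.
                .unwrap_or(true)
        })
        .flat_map(|m| m.protected_files.iter().cloned())
        .collect()
}
```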
This mechanism may be too complex to implement at once. We can consider a two-phased approach:
1. **Phase 1 (Simple Time-Based Deletion)**: Initially, implement a simpler GC strategy that deletes obsolete files based solely on a configurable lingering time. This provides a baseline for space reclamation without the complexity of temporary manifests.
2. **Phase 2 (Consistency-Aware GC)**: Based on the practical effectiveness and observed issues from Phase 1, we can then decide whether to implement the full temporary manifest and heartbeat mechanism to handle long-running queries. This iterative approach allows for a quicker initial implementation while gathering real-world data to justify the need for a more complex solution.
## Drawbacks
- **Dependency on Compaction Frequency**: The integration of the GC worker with Compaction means that GC cycles are directly tied to the frequency of compactions. In environments with infrequent compaction operations, obsolete files may accumulate for extended periods before being reclaimed, potentially leading to increased storage consumption.
- **Race Condition with Long-Running Queries**: A potential race condition exists if a long-running query starts but has not yet written its temporary manifest, while a compaction process simultaneously begins and marks files used by that query as obsolete. This scenario could lead to the premature deletion of files still required by the active query. To mitigate this, the threshold time for writing a temporary manifest should be significantly shorter than the lingering time configured for obsolete files, so that subsequent GC runs do not delete files that are now referenced by a temporary manifest while the query is still running.
A read replica should also not lag behind the primary's manifest version by more than the lingering time of obsolete files; otherwise it might reference files that have already been deleted by the GC worker.
- **Temporary Manifest Upload Overhead**: Temporary manifests must be uploaded to object storage, which introduces additional complexity and potential performance overhead. However, since long-running queries are typically infrequent, the performance impact is expected to be minimal.
## Conclusion and Rationale
This section summarizes the key aspects and trade-offs of the proposed integrated GC worker, highlighting its advantages and potential challenges.
| Aspect | Current Proposal (Integrated GC) |
| :--- | :--- |
| **Implementation Complexity** | **Medium**. Requires careful integration with the compaction process and the slow query recorder for temporary manifest management. |
| **Reliability** | **High**. Integration with compaction and leveraging temporary manifests from long-running queries significantly mitigates the risk of incorrect deletion. Accurate management of lingering times for obsolete files and prevention of accidental deletion of newly created SSTs enhance data safety. |
| **Performance Overhead** | **Low to Medium**. The GC worker runs post-compaction, minimizing direct impact on write paths. Overhead from temporary manifest management by the slow query recorder is expected to be acceptable for long-running queries. |
| **Impact on Other Components** | **Moderate**. Requires modifications to the compaction process to trigger GC and the slow query recorder to manage temporary manifests. This introduces some coupling but enhances overall data safety. |
| **Deletion Strategy** | **State- and Time-Based**. Obsolete files are deleted based on a configurable lingering time, which is paused if the file is referenced by a temporary manifest. Unused files (never in a manifest) are also subject to a lingering time. |
## Unresolved Questions and Future Work
This section outlines key areas requiring further discussion and defines potential avenues for future development.
* **Slow Query Recorder Implementation**: Detailed specifications for modifying the slow query recorder and its precise interaction with temporary manifests are needed.
* **Configurable Lingering Times**: Establish and make configurable the specific lingering times for both obsolete and unused files to optimize storage reclamation and data availability.
## Alternatives
### 1. Standalone GC Service
Instead of integrating the GC worker directly into the Compaction process, a standalone GC service could be implemented. This service would operate independently, periodically scanning the storage for obsolete and unused files based on manifest information and predefined retention policies.
**Pros:**
* **Decoupling**: Separates GC logic from compaction, allowing independent scaling and deployment.
* **Flexibility**: Can be configured to run at different frequencies and with different strategies than compaction.
**Cons:**
* **Increased Complexity**: Requires a separate service to manage, monitor, and coordinate with other components.
* **Potential for Redundancy**: May duplicate some file scanning logic already present in compaction.
* **Consistency Challenges**: Ensuring read consistency would require more complex coordination mechanisms between the standalone GC service and active queries, potentially involving a distributed lock manager or a more sophisticated temporary manifest system.
This alternative could be implemented in the future if the integrated GC worker proves insufficient or if there is a need for more advanced GC strategies.
### 2. Manifest-Driven Deletion (No Lingering Time)
This alternative would involve immediate deletion of files once they are removed from the manifest, without a lingering time.
**Pros:**
* **Simplicity**: Simplifies the GC logic by removing the need for lingering time management.
* **Immediate Space Reclamation**: Storage space is reclaimed as soon as files are marked for deletion.
**Cons:**
* **Increased Risk of Data Loss**: Higher risk of deleting files still in use by long-running queries or other processes if not perfectly synchronized.
* **Complex Read Consistency**: Requires extremely robust and immediate mechanisms to ensure that no active queries are referencing files marked for deletion, potentially leading to performance bottlenecks or complex error handling.
* **Debugging Challenges**: Difficult to debug issues related to premature file deletion due to the immediate nature of the operation.

View File

@@ -29,6 +29,7 @@ use crate::information_schema::{InformationExtensionRef, InformationSchemaProvid
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
pub struct KvBackendCatalogManagerBuilder {
@@ -119,6 +120,7 @@ impl KvBackendCatalogManagerBuilder {
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
numbers_table_provider: NumbersTableProvider,
backend,
process_manager,
#[cfg(feature = "enterprise")]

View File

@@ -18,8 +18,7 @@ use std::sync::{Arc, Weak};
use async_stream::try_stream;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
PG_CATALOG_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{
@@ -43,7 +42,6 @@ use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::{TableId, TableInfoRef};
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;
use tokio::sync::Semaphore;
@@ -58,6 +56,7 @@ use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;
@@ -537,6 +536,7 @@ pub(super) struct SystemCatalog {
// system_schema_provider for default catalog
pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
pub(super) numbers_table_provider: NumbersTableProvider,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
@@ -566,9 +566,7 @@ impl SystemCatalog {
PG_CATALOG_NAME if channel == Channel::Postgres => {
self.pg_catalog_provider.table_names()
}
DEFAULT_SCHEMA_NAME => {
vec![NUMBERS_TABLE_NAME.to_string()]
}
DEFAULT_SCHEMA_NAME => self.numbers_table_provider.table_names(),
_ => vec![],
}
}
@@ -586,7 +584,7 @@ impl SystemCatalog {
if schema == INFORMATION_SCHEMA_NAME {
self.information_schema_provider.table(table).is_some()
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
self.numbers_table_provider.table_exists(table)
} else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {
self.pg_catalog_provider.table(table).is_some()
} else {
@@ -631,8 +629,8 @@ impl SystemCatalog {
});
pg_catalog_provider.table(table_name)
}
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else if schema == DEFAULT_SCHEMA_NAME {
self.numbers_table_provider.table(table_name)
} else {
None
}

View File

@@ -14,6 +14,7 @@
pub mod information_schema;
mod memory_table;
pub mod numbers_table_provider;
pub mod pg_catalog;
pub mod predicate;
mod utils;

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(any(test, feature = "testing", debug_assertions))]
use common_catalog::consts::NUMBERS_TABLE_ID;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NumbersTable;
#[cfg(any(test, feature = "testing", debug_assertions))]
use table::table::numbers::NUMBERS_TABLE_NAME;
use table::TableRef;
// NumbersTableProvider is a dedicated provider for feature-gating the numbers table.
#[derive(Clone)]
pub struct NumbersTableProvider;
#[cfg(any(test, feature = "testing", debug_assertions))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, name: &str) -> bool {
name == NUMBERS_TABLE_NAME
}
pub(crate) fn table_names(&self) -> Vec<String> {
vec![NUMBERS_TABLE_NAME.to_string()]
}
pub(crate) fn table(&self, name: &str) -> Option<TableRef> {
if name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else {
None
}
}
}
#[cfg(not(any(test, feature = "testing", debug_assertions)))]
impl NumbersTableProvider {
pub(crate) fn table_exists(&self, _name: &str) -> bool {
false
}
pub(crate) fn table_names(&self) -> Vec<String> {
vec![]
}
pub(crate) fn table(&self, _name: &str) -> Option<TableRef> {
None
}
}

View File

@@ -20,7 +20,7 @@ use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use meta_srv::metasrv::{BackendClientOptions, BackendImpl};
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
@@ -67,9 +67,10 @@ impl StoreConfig {
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
let etcd_client =
create_etcd_client(store_addrs, &BackendClientOptions::default())
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]

View File

@@ -29,7 +29,7 @@ use futures::TryStreamExt;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
use crate::Tool;
/// Getting metadata from metadata store.
@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
println!(
"{}\n{}",
TableInfoKey::new(table_id),
json_fromatter(self.pretty, &*table_info)
json_formatter(self.pretty, &*table_info)
);
} else {
println!("Table info not found");
@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
println!(
"{}\n{}",
TableRouteKey::new(table_id),
json_fromatter(self.pretty, &table_route)
json_formatter(self.pretty, &table_route)
);
} else {
println!("Table route not found");

View File

@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
}
/// Formats a value as a JSON string.
pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
pub fn json_formatter<T>(pretty: bool, value: &T) -> String
where
T: Serialize,
{

View File

@@ -20,6 +20,7 @@ use async_trait::async_trait;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
@@ -327,6 +328,7 @@ impl StartCommand {
log_versions(verbose_version(), short_version(), APP_NAME);
maybe_activate_heap_profile(&opts.component.memory);
create_resource_limit_metrics(APP_NAME);
init_distributed_time_constants(opts.component.heartbeat_interval);
info!("Metasrv start command: {:#?}", self);

View File

@@ -51,7 +51,6 @@ fn test_load_datanode_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -116,7 +115,6 @@ fn test_load_frontend_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
@@ -240,7 +238,6 @@ fn test_load_flownode_example_config() {
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,

View File

@@ -332,7 +332,7 @@ impl AggregateUDFImpl for StateWrapper {
self.inner.signature()
}
/// Coerce types also do nothing, as optimzer should be able to already make struct types
/// Coerce types also do nothing, as optimizer should be able to already make struct types
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
self.inner.coerce_types(arg_types)
}
@@ -486,7 +486,7 @@ impl AggregateUDFImpl for MergeWrapper {
&self.merge_signature
}
/// Coerce types also do nothing, as optimzer should be able to already make struct types
/// Coerce types also do nothing, as optimizer should be able to already make struct types
fn coerce_types(&self, arg_types: &[DataType]) -> datafusion_common::Result<Vec<DataType>> {
// just check if the arg_types are only one and is struct array
if arg_types.len() != 1 || !matches!(arg_types.first(), Some(DataType::Struct(_))) {

View File

@@ -12,25 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::OnceLock;
use std::time::Duration;
/// Heartbeat interval time (is the basic unit of various time).
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
/// The frontend will also send heartbeats to Metasrv, sending an empty
/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;
/// The lease seconds of a region. It's set by 3 heartbeat intervals
/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
pub const REGION_LEASE_SECS: u64 =
Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;
/// When creating table or region failover, a target node needs to be selected.
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
@@ -41,5 +26,73 @@ pub const POSTGRES_KEEP_ALIVE_SECS: u64 = 30;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
/// The timeout of the heartbeat request.
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The keep-alive interval of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The keep-alive timeout of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The distributed time constants.
pub struct DistributedTimeConstants {
pub heartbeat_interval: Duration,
pub frontend_heartbeat_interval: Duration,
pub region_lease: Duration,
pub datanode_lease: Duration,
pub flownode_lease: Duration,
}
/// The frontend heartbeat interval is 6 times the base heartbeat interval.
pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration {
base_heartbeat_interval * 6
}
impl DistributedTimeConstants {
/// Create a new DistributedTimeConstants from the heartbeat interval.
pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self {
let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
let datanode_lease = region_lease;
let flownode_lease = datanode_lease;
Self {
heartbeat_interval,
frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval),
region_lease,
datanode_lease,
flownode_lease,
}
}
}
impl Default for DistributedTimeConstants {
fn default() -> Self {
Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL)
}
}
static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock<DistributedTimeConstants> = OnceLock::new();
/// Get the default distributed time constants.
pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants {
DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default)
}
/// Initialize the default distributed time constants.
pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) {
let distributed_time_constants =
DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval);
DEFAULT_DISTRIBUTED_TIME_CONSTANTS
.set(distributed_time_constants)
.expect("Failed to set default distributed time constants");
common_telemetry::info!(
"Initialized default distributed time constants: {:#?}",
distributed_time_constants
);
}

View File

@@ -528,9 +528,6 @@ pub enum Error {
source: common_wal::error::Error,
},
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
#[snafu(display("Failed to build a Kafka controller client"))]
BuildKafkaCtrlClient {
#[snafu(implicit)]
@@ -987,39 +984,6 @@ pub enum Error {
table_name: String,
table_id: TableId,
},
#[snafu(display(
"Column not found in column metadata, column_name: {}, column_id: {}",
column_name,
column_id
))]
ColumnNotFound { column_name: String, column_id: u32 },
#[snafu(display(
"Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
column_name,
expected_column_id,
actual_column_id
))]
ColumnIdMismatch {
column_name: String,
expected_column_id: u32,
actual_column_id: u32,
},
#[snafu(display(
"Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
expected_column_name,
expected_column_id,
actual_column_name,
actual_column_id,
))]
TimestampMismatch {
expected_column_name: String,
expected_column_id: u32,
actual_column_name: String,
actual_column_id: u32,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -1045,10 +1009,7 @@ impl ErrorExt for Error {
| MissingColumnIds { .. }
| MissingColumnInColumnMetadata { .. }
| MismatchColumnId { .. }
| ColumnMetadataConflicts { .. }
| ColumnNotFound { .. }
| ColumnIdMismatch { .. }
| TimestampMismatch { .. } => StatusCode::Unexpected,
| ColumnMetadataConflicts { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
@@ -1076,7 +1037,6 @@ impl ErrorExt for Error {
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| KafkaPartitionClient { .. }
| ResolveKafkaEndpoint { .. }
| ProduceRecord { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }

View File

@@ -15,13 +15,6 @@
use lazy_static::lazy_static;
use prometheus::*;
pub const TABLE_TYPE_PHYSICAL: &str = "physical";
pub const TABLE_TYPE_LOGICAL: &str = "logical";
pub const ERROR_TYPE_RETRYABLE: &str = "retryable";
pub const ERROR_TYPE_EXTERNAL: &str = "external";
pub const STATS_TYPE_NO_REGION_METADATA: &str = "no_region_metadata";
pub const STATS_TYPE_REGION_NOT_OPEN: &str = "region_not_open";
lazy_static! {
pub static ref METRIC_META_TXN_REQUEST: HistogramVec = register_histogram_vec!(
"greptime_meta_txn_request",
@@ -121,39 +114,4 @@ lazy_static! {
&["backend", "result", "op", "type"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION: HistogramVec =
register_histogram_vec!(
"greptime_meta_reconciliation_list_region_metadata_duration",
"reconciliation list region metadata duration",
&["table_type"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA: IntCounterVec =
register_int_counter_vec!(
"greptime_meta_reconciliation_resolved_column_metadata",
"reconciliation resolved column metadata",
&["strategy"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_STATS: IntCounterVec =
register_int_counter_vec!(
"greptime_meta_reconciliation_stats",
"reconciliation stats",
&["procedure_name", "table_type", "type"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_PROCEDURE: HistogramVec =
register_histogram_vec!(
"greptime_meta_reconciliation_procedure",
"reconcile table procedure",
&["procedure_name", "step"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_PROCEDURE_ERROR: IntCounterVec =
register_int_counter_vec!(
"greptime_meta_reconciliation_procedure_error",
"reconciliation procedure error",
&["procedure_name", "step", "error_type"]
)
.unwrap();
}

View File

@@ -14,11 +14,10 @@
use std::any::Any;
use std::fmt::Debug;
use std::time::Instant;
use common_procedure::error::FromJsonSnafu;
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId,
Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
@@ -29,13 +28,11 @@ use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::CatalogLock;
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart;
use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::{
wait_for_inflight_subprocedures, Context, ReconcileCatalogMetrics, SubprocedureMeta,
};
use crate::reconciliation::utils::Context;
pub(crate) mod end;
pub(crate) mod reconcile_databases;
@@ -64,15 +61,13 @@ impl ReconcileCatalogContext {
&mut self,
procedure_ctx: &ProcedureContext,
) -> Result<()> {
if let Some(subprocedure) = self.volatile_ctx.inflight_subprocedure.take() {
let subprocedures = [subprocedure];
let result = wait_for_inflight_subprocedures(
if let Some(procedure_id) = self.volatile_ctx.inflight_subprocedure {
wait_for_inflight_subprocedures(
procedure_ctx,
&subprocedures,
&[procedure_id],
self.persistent_ctx.fast_fail,
)
.await?;
self.volatile_ctx.metrics += result.into();
}
Ok(())
}
@@ -102,26 +97,12 @@ impl PersistentContext {
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
/// Stores the stream of catalogs.
schemas: Option<BoxStream<'static, Result<String>>>,
/// Stores the inflight subprocedure.
inflight_subprocedure: Option<SubprocedureMeta>,
/// Stores the metrics of reconciling catalog.
metrics: ReconcileCatalogMetrics,
/// The start time of the reconciliation.
start_time: Instant,
}
impl Default for VolatileContext {
fn default() -> Self {
Self {
schemas: None,
inflight_subprocedure: None,
metrics: Default::default(),
start_time: Instant::now(),
}
}
inflight_subprocedure: Option<ProcedureId>,
}
pub struct ReconcileCatalogProcedure {
@@ -177,11 +158,6 @@ impl Procedure for ReconcileCatalogProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -189,14 +165,8 @@ impl Procedure for ReconcileCatalogProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,7 +15,6 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -29,16 +28,9 @@ pub(crate) struct ReconcileCatalogEnd;
impl State for ReconcileCatalogEnd {
async fn next(
&mut self,
ctx: &mut ReconcileCatalogContext,
procedure_ctx: &ProcedureContext,
_ctx: &mut ReconcileCatalogContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
info!(
"Catalog reconciliation completed. catalog: {}, procedure_id: {}, metrics: {}, elapsed: {:?}",
ctx.persistent_ctx.catalog,
procedure_ctx.procedure_id,
ctx.volatile_ctx.metrics,
ctx.volatile_ctx.start_time.elapsed()
);
Ok((Box::new(ReconcileCatalogEnd), Status::done()))
}

View File

@@ -23,7 +23,7 @@ use crate::error::Result;
use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd;
use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure;
use crate::reconciliation::utils::{Context, SubprocedureMeta};
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileDatabases;
@@ -83,18 +83,13 @@ impl ReconcileDatabases {
let procedure = ReconcileDatabaseProcedure::new(
context,
ctx.persistent_ctx.catalog.clone(),
schema.clone(),
schema,
ctx.persistent_ctx.fast_fail,
ctx.persistent_ctx.parallelism,
ctx.persistent_ctx.resolve_strategy,
true,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
ctx.volatile_ctx.inflight_subprocedure = Some(SubprocedureMeta::new_reconcile_database(
procedure_with_id.id,
ctx.persistent_ctx.catalog.clone(),
schema,
));
Ok((
Box::new(ReconcileDatabases),

View File

@@ -16,16 +16,16 @@ pub(crate) mod end;
pub(crate) mod reconcile_logical_tables;
pub(crate) mod reconcile_tables;
pub(crate) mod start;
pub(crate) mod utils;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::time::Instant;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId,
Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
@@ -39,13 +39,12 @@ use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart;
use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::{
wait_for_inflight_subprocedures, Context, ReconcileDatabaseMetrics, SubprocedureMeta,
};
use crate::reconciliation::utils::Context;
pub(crate) const DEFAULT_PARALLELISM: usize = 64;
pub(crate) struct ReconcileDatabaseContext {
@@ -67,32 +66,22 @@ impl ReconcileDatabaseContext {
}
}
/// Waits for inflight subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures(
&mut self,
procedure_ctx: &ProcedureContext,
) -> Result<()> {
if !self.volatile_ctx.inflight_subprocedures.is_empty() {
let result = wait_for_inflight_subprocedures(
wait_for_inflight_subprocedures(
procedure_ctx,
&self.volatile_ctx.inflight_subprocedures,
self.persistent_ctx.fail_fast,
)
.await?;
// Collects result into metrics
let metrics = result.into();
self.volatile_ctx.inflight_subprocedures.clear();
self.volatile_ctx.metrics += metrics;
}
Ok(())
}
/// Returns the immutable metrics.
pub(crate) fn metrics(&self) -> &ReconcileDatabaseMetrics {
&self.volatile_ctx.metrics
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -125,6 +114,7 @@ impl PersistentContext {
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
/// Stores pending physical tables.
pending_tables: Vec<(TableId, TableName)>,
@@ -134,26 +124,9 @@ pub(crate) struct VolatileContext {
/// - Value: Vector of (TableId, TableName) tuples representing logical tables belonging to the physical table.
pending_logical_tables: HashMap<TableId, Vec<(TableId, TableName)>>,
/// Stores inflight subprocedures.
inflight_subprocedures: Vec<SubprocedureMeta>,
inflight_subprocedures: Vec<ProcedureId>,
/// Stores the stream of tables.
tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
/// The metrics of reconciling database.
metrics: ReconcileDatabaseMetrics,
/// The start time of the reconciliation.
start_time: Instant,
}
impl Default for VolatileContext {
fn default() -> Self {
Self {
pending_tables: vec![],
pending_logical_tables: HashMap::new(),
inflight_subprocedures: vec![],
tables: None,
metrics: ReconcileDatabaseMetrics::default(),
start_time: Instant::now(),
}
}
}
pub struct ReconcileDatabaseProcedure {
@@ -217,11 +190,6 @@ impl Procedure for ReconcileDatabaseProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -229,14 +197,8 @@ impl Procedure for ReconcileDatabaseProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,7 +15,6 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -29,17 +28,9 @@ pub(crate) struct ReconcileDatabaseEnd;
impl State for ReconcileDatabaseEnd {
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
procedure_ctx: &ProcedureContext,
_ctx: &mut ReconcileDatabaseContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
info!(
"Database reconciliation completed. schema: {}, catalog: {}, procedure_id: {}, metrics: {}, elapsed: {:?}",
ctx.persistent_ctx.schema,
ctx.persistent_ctx.catalog,
procedure_ctx.procedure_id,
ctx.metrics(),
ctx.volatile_ctx.start_time.elapsed(),
);
Ok((Box::new(ReconcileDatabaseEnd), Status::done()))
}

View File

@@ -29,7 +29,7 @@ use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
use crate::reconciliation::utils::{Context, SubprocedureMeta};
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileLogicalTables;
@@ -128,12 +128,13 @@ impl State for ReconcileLogicalTables {
impl ReconcileLogicalTables {
fn schedule_reconcile_logical_tables(
ctx: &mut ReconcileDatabaseContext,
buffer: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
buffer: &mut Vec<ProcedureWithId>,
) -> Result<(Box<dyn State>, Status)> {
let buffer = std::mem::take(buffer);
let (procedures, meta): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
let procedures = std::mem::take(buffer);
ctx.volatile_ctx
.inflight_subprocedures
.extend(procedures.iter().map(|p| p.id));
ctx.volatile_ctx.inflight_subprocedures.extend(meta);
Ok((
Box::new(ReconcileLogicalTables),
Status::suspended(procedures, false),
@@ -141,7 +142,7 @@ impl ReconcileLogicalTables {
}
fn should_schedule_reconcile_logical_tables(
buffer: &[(ProcedureWithId, SubprocedureMeta)],
buffer: &[ProcedureWithId],
parallelism: usize,
) -> bool {
buffer.len() >= parallelism
@@ -151,7 +152,7 @@ impl ReconcileLogicalTables {
ctx: &Context,
pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
parallelism: usize,
) -> Result<Option<(ProcedureWithId, SubprocedureMeta)>> {
) -> Result<Option<ProcedureWithId>> {
let mut physical_table_id = None;
for (table_id, tables) in pending_logical_tables.iter() {
if tables.len() >= parallelism {
@@ -175,7 +176,7 @@ impl ReconcileLogicalTables {
async fn build_remaining_procedures(
ctx: &Context,
pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
pending_procedures: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
pending_procedures: &mut Vec<ProcedureWithId>,
parallelism: usize,
) -> Result<()> {
if pending_logical_tables.is_empty() {
@@ -202,7 +203,7 @@ impl ReconcileLogicalTables {
ctx: &Context,
physical_table_id: TableId,
logical_tables: Vec<(TableId, TableName)>,
) -> Result<(ProcedureWithId, SubprocedureMeta)> {
) -> Result<ProcedureWithId> {
let table_info = ctx
.table_metadata_manager
.table_info_manager()
@@ -216,18 +217,12 @@ impl ReconcileLogicalTables {
let procedure = ReconcileLogicalTablesProcedure::new(
ctx.clone(),
physical_table_id,
physical_table_name.clone(),
logical_tables.clone(),
true,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let subprocedure_meta = SubprocedureMeta::new_logical_table(
procedure_with_id.id,
physical_table_id,
physical_table_name,
logical_tables,
true,
);
Ok((procedure_with_id, subprocedure_meta))
Ok(ProcedureWithId::with_random_id(Box::new(procedure)))
}
fn enqueue_logical_table(

View File

@@ -27,7 +27,7 @@ use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
use crate::reconciliation::utils::{Context, SubprocedureMeta};
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileTables;
@@ -104,14 +104,14 @@ impl ReconcileTables {
ctx: &mut ReconcileDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables);
let (procedures, meta): (Vec<_>, Vec<_>) =
Self::build_reconcile_table_procedures(ctx, tables)
.into_iter()
.unzip();
ctx.volatile_ctx.inflight_subprocedures.extend(meta);
let subprocedures = Self::build_reconcile_table_procedures(ctx, tables);
ctx.volatile_ctx
.inflight_subprocedures
.extend(subprocedures.iter().map(|p| p.id));
Ok((
Box::new(ReconcileTables),
Status::suspended(procedures, false),
Status::suspended(subprocedures, false),
))
}
@@ -125,7 +125,7 @@ impl ReconcileTables {
fn build_reconcile_table_procedures(
ctx: &ReconcileDatabaseContext,
tables: Vec<(TableId, TableName)>,
) -> Vec<(ProcedureWithId, SubprocedureMeta)> {
) -> Vec<ProcedureWithId> {
let mut procedures = Vec::with_capacity(tables.len());
for (table_id, table_name) in tables {
let context = Context {
@@ -141,13 +141,11 @@ impl ReconcileTables {
true,
);
let procedure = ProcedureWithId::with_random_id(Box::new(procedure));
let meta =
SubprocedureMeta::new_physical_table(procedure.id, table_id, table_name.clone());
info!(
"Reconcile table: {}, table_id: {}, procedure_id: {}",
table_name, table_id, procedure.id
);
procedures.push((procedure, meta));
procedures.push(procedure)
}
procedures

View File

@@ -33,7 +33,7 @@ impl State for ReconcileDatabaseStart {
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
procedure_ctx: &ProcedureContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let exists = ctx
.table_metadata_manager
@@ -51,8 +51,8 @@ impl State for ReconcileDatabaseStart {
},
);
info!(
"Reconcile database: {}, catalog: {}, procedure_id: {}",
ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog, procedure_ctx.procedure_id,
"Reconcile database: {}, catalog: {}",
ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog
);
Ok((Box::new(ReconcileTables), Status::executing(true)))
}

View File

@@ -0,0 +1,79 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
use common_telemetry::{error, info, warn};
use futures::future::{join_all, try_join_all};
use snafu::{OptionExt, ResultExt};
use crate::error::{
ProcedureStateReceiverNotFoundSnafu, ProcedureStateReceiverSnafu, Result, WaitProcedureSnafu,
};
/// Wait for inflight subprocedures.
///
/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
/// Otherwise, the function will continue waiting for all subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures(
procedure_ctx: &ProcedureContext,
subprocedures: &[ProcedureId],
fail_fast: bool,
) -> Result<()> {
let mut receivers = Vec::with_capacity(subprocedures.len());
for procedure_id in subprocedures {
let receiver = procedure_ctx
.provider
.procedure_state_receiver(*procedure_id)
.await
.context(ProcedureStateReceiverSnafu {
procedure_id: *procedure_id,
})?
.context(ProcedureStateReceiverNotFoundSnafu {
procedure_id: *procedure_id,
})?;
receivers.push(receiver);
}
let mut tasks = Vec::with_capacity(receivers.len());
for receiver in receivers.iter_mut() {
let fut = watcher::wait(receiver);
tasks.push(fut);
}
if fail_fast {
try_join_all(tasks).await.context(WaitProcedureSnafu)?;
} else {
let mut failed = 0;
let total = tasks.len();
for result in join_all(tasks).await {
if let Err(e) = result {
error!(e; "inflight subprocedure, procedure_id: {}", procedure_ctx.procedure_id);
failed += 1;
}
}
if failed > 0 {
warn!(
"{} inflight subprocedures failed, total: {}, procedure_id: {}",
failed, total, procedure_ctx.procedure_id
);
} else {
info!(
"{} inflight subprocedures completed, procedure_id: {}",
total, procedure_ctx.procedure_id
);
}
}
Ok(())
}

View File

@@ -40,17 +40,15 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart;
use crate::reconciliation::utils::{Context, ReconcileLogicalTableMetrics};
use crate::reconciliation::utils::Context;
pub struct ReconcileLogicalTablesContext {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub persistent_ctx: PersistentContext,
pub volatile_ctx: VolatileContext,
}
impl ReconcileLogicalTablesContext {
@@ -61,29 +59,16 @@ impl ReconcileLogicalTablesContext {
table_metadata_manager: ctx.table_metadata_manager,
cache_invalidator: ctx.cache_invalidator,
persistent_ctx,
volatile_ctx: VolatileContext::default(),
}
}
/// Returns the physical table name.
pub(crate) fn table_name(&self) -> &TableName {
&self.persistent_ctx.table_name
}
/// Returns the physical table id.
pub(crate) fn table_id(&self) -> TableId {
self.persistent_ctx.table_id
}
/// Returns a mutable reference to the metrics.
pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileLogicalTableMetrics {
&mut self.volatile_ctx.metrics
}
/// Returns a reference to the metrics.
pub(crate) fn metrics(&self) -> &ReconcileLogicalTableMetrics {
&self.volatile_ctx.metrics
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -135,11 +120,6 @@ impl PersistentContext {
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
pub(crate) metrics: ReconcileLogicalTableMetrics,
}
pub struct ReconcileLogicalTablesProcedure {
pub context: ReconcileLogicalTablesContext,
state: Box<dyn State>,
@@ -193,11 +173,6 @@ impl Procedure for ReconcileLogicalTablesProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -205,14 +180,8 @@ impl Procedure for ReconcileLogicalTablesProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,7 +15,6 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -29,21 +28,9 @@ pub struct ReconciliationEnd;
impl State for ReconciliationEnd {
async fn next(
&mut self,
ctx: &mut ReconcileLogicalTablesContext,
procedure_ctx: &ProcedureContext,
_ctx: &mut ReconcileLogicalTablesContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let metrics = ctx.metrics();
info!(
"Logical tables reconciliation completed. logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}, metrics: {}",
ctx.persistent_ctx.logical_table_ids,
table_id,
table_name,
procedure_ctx.procedure_id,
metrics
);
Ok((Box::new(ReconciliationEnd), Status::done()))
}

View File

@@ -25,11 +25,8 @@ use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id;
use crate::error::{self, Result};
use crate::metrics;
use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas;
use crate::reconciliation::reconcile_logical_tables::{
ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
};
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::utils::check_column_metadatas_consistent;
/// The start state of the reconciliation procedure.
@@ -42,7 +39,7 @@ impl State for ReconciliationStart {
async fn next(
&mut self,
ctx: &mut ReconcileLogicalTablesContext,
procedure_ctx: &ProcedureContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
@@ -61,48 +58,35 @@ impl State for ReconciliationStart {
}
);
info!(
"Starting reconciliation for logical table: table_id: {}, table_name: {}",
table_id, table_name
);
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
let region_metadatas = {
let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
.with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
.start_timer();
region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?
};
ensure!(!region_metadatas.is_empty(), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileLogicalTablesProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_NO_REGION_METADATA,
])
.inc();
let region_metadatas = region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?;
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for physical table: {}, table_id: {}",
"No region metadata found for table: {}, table_id: {}",
table_name, table_id
),
}
});
);
ensure!(region_metadatas.iter().all(|r| r.is_some()), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileLogicalTablesProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_REGION_NOT_OPEN,
])
.inc();
error::UnexpectedSnafu {
if region_metadatas.iter().any(|r| r.is_none()) {
return error::UnexpectedSnafu {
err_msg: format!(
"Some regions of the physical table are not open. physical table: {}, table_id: {}",
"Some regions of the physical table are not open. Table: {}, table_id: {}",
table_name, table_id
),
}
});
.fail();
}
// Safety: checked above
let region_metadatas = region_metadatas
@@ -112,13 +96,14 @@ impl State for ReconciliationStart {
let _region_metadata = check_column_metadatas_consistent(&region_metadatas).context(
error::UnexpectedSnafu {
err_msg: format!(
"Column metadatas are not consistent for physical table: {}, table_id: {}",
"Column metadatas are not consistent for table: {}, table_id: {}",
table_name, table_id
),
},
)?;
// TODO(weny): ensure all columns in region metadata can be found in table info.
// Validates the logical tables.
Self::validate_schema(&ctx.persistent_ctx.logical_tables)?;
let table_refs = ctx
@@ -134,12 +119,6 @@ impl State for ReconciliationStart {
.await?;
Self::validate_logical_table_routes(ctx, &table_ids).await?;
let table_name = ctx.table_name();
info!(
"Starting reconciliation for logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}",
table_ids, table_id, table_name, procedure_ctx.procedure_id
);
ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
ctx.persistent_ctx.logical_table_ids = table_ids;
Ok((Box::new(ResolveTableMetadatas), Status::executing(true)))

View File

@@ -22,11 +22,8 @@ use snafu::ensure;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
use crate::error::{self, Result};
use crate::metrics;
use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions;
use crate::reconciliation::reconcile_logical_tables::{
ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
};
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::utils::{
check_column_metadatas_consistent, need_update_logical_table_info,
};
@@ -68,38 +65,22 @@ impl State for ResolveTableMetadatas {
.unwrap()
.region_routes;
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
let mut metadata_consistent_count = 0;
let mut metadata_inconsistent_count = 0;
let mut create_tables_count = 0;
for (table_id, table_info_value) in table_ids.iter().zip(table_info_values.iter()) {
let region_metadatas = {
let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
.with_label_values(&[metrics::TABLE_TYPE_LOGICAL])
.start_timer();
region_metadata_lister
.list(*table_id, region_routes)
.await?
};
ensure!(!region_metadatas.is_empty(), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileLogicalTablesProcedure::TYPE_NAME,
metrics::TABLE_TYPE_LOGICAL,
metrics::STATS_TYPE_NO_REGION_METADATA,
])
.inc();
let region_metadatas = region_metadata_lister
.list(*table_id, region_routes)
.await?;
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
table_info_value.table_info.name, table_id
),
}
});
);
if region_metadatas.iter().any(|r| r.is_none()) {
create_tables_count += 1;
create_tables.push((*table_id, table_info_value.table_info.clone()));
continue;
}
@@ -110,12 +91,10 @@ impl State for ResolveTableMetadatas {
.map(|r| r.unwrap())
.collect::<Vec<_>>();
if let Some(column_metadatas) = check_column_metadatas_consistent(&region_metadatas) {
metadata_consistent_count += 1;
if need_update_logical_table_info(&table_info_value.table_info, &column_metadatas) {
update_table_infos.push((*table_id, column_metadatas));
}
} else {
metadata_inconsistent_count += 1;
// If the logical regions have inconsistent column metadatas, it won't affect read and write.
// It's safe to continue if the column metadatas of the logical table are inconsistent.
warn!(
@@ -142,11 +121,6 @@ impl State for ResolveTableMetadatas {
);
ctx.persistent_ctx.update_table_infos = update_table_infos;
ctx.persistent_ctx.create_tables = create_tables;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.column_metadata_consistent_count = metadata_consistent_count;
metrics.column_metadata_inconsistent_count = metadata_inconsistent_count;
metrics.create_tables_count = create_tables_count;
Ok((Box::new(ReconcileRegions), Status::executing(true)))
}

View File

@@ -96,7 +96,6 @@ impl State for UpdateTableInfos {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let updated_table_info_num = table_info_values_to_update.len();
batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update)
.await?;
@@ -123,9 +122,6 @@ impl State for UpdateTableInfos {
.await?;
ctx.persistent_ctx.update_table_infos.clear();
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.update_table_info_count = updated_table_info_num;
Ok((Box::new(ReconciliationEnd), Status::executing(false)))
}

View File

@@ -40,13 +40,10 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::{
build_table_meta_from_column_metadatas, Context, ReconcileTableMetrics,
};
use crate::reconciliation::utils::{build_table_meta_from_column_metadatas, Context};
pub struct ReconcileTableContext {
pub node_manager: NodeManagerRef,
@@ -68,46 +65,13 @@ impl ReconcileTableContext {
}
}
/// Returns the physical table name.
pub(crate) fn table_name(&self) -> &TableName {
&self.persistent_ctx.table_name
}
/// Returns the physical table id.
pub(crate) fn table_id(&self) -> TableId {
self.persistent_ctx.table_id
}
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
pub(crate) fn build_table_meta(
&self,
column_metadatas: &[ColumnMetadata],
) -> Result<RawTableMeta> {
// Safety: The table info value is set in `ReconciliationStart` state.
let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
let table_id = self.table_id();
let table_ref = self.table_name().table_ref();
let name_to_ids = table_info_value.table_info.name_to_ids();
let table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_info_value.table_info.meta,
name_to_ids,
column_metadatas,
)?;
Ok(table_meta)
}
/// Returns a mutable reference to the metrics.
pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileTableMetrics {
&mut self.volatile_ctx.metrics
}
/// Returns a reference to the metrics.
pub(crate) fn metrics(&self) -> &ReconcileTableMetrics {
&self.volatile_ctx.metrics
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -146,7 +110,29 @@ impl PersistentContext {
#[derive(Default)]
pub(crate) struct VolatileContext {
pub(crate) table_meta: Option<RawTableMeta>,
pub(crate) metrics: ReconcileTableMetrics,
}
impl ReconcileTableContext {
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
pub(crate) fn build_table_meta(
&self,
column_metadatas: &[ColumnMetadata],
) -> Result<RawTableMeta> {
// Safety: The table info value is set in `ReconciliationStart` state.
let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
let table_id = self.table_id();
let table_ref = self.table_name().table_ref();
let name_to_ids = table_info_value.table_info.name_to_ids();
let table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_info_value.table_info.meta,
name_to_ids,
column_metadatas,
)?;
Ok(table_meta)
}
}
pub struct ReconcileTableProcedure {
@@ -205,11 +191,6 @@ impl Procedure for ReconcileTableProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -217,14 +198,8 @@ impl Procedure for ReconcileTableProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,7 +15,6 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use tonic::async_trait;
@@ -32,18 +31,9 @@ pub struct ReconciliationEnd;
impl State for ReconciliationEnd {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
procedure_ctx: &ProcedureContext,
_ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let metrics = ctx.metrics();
info!(
"Physical table reconciliation completed. table_name: {}, table_id: {}, procedure_id: {}, metrics: {}",
table_name, table_id, procedure_ctx.procedure_id, metrics
);
Ok((Box::new(ReconciliationEnd), Status::done()))
}

View File

@@ -20,12 +20,9 @@ use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::error::{self, Result};
use crate::metrics::{self};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata;
use crate::reconciliation::reconcile_table::{
ReconcileTableContext, ReconcileTableProcedure, State,
};
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
/// The start state of the reconciliation procedure.
///
@@ -43,7 +40,7 @@ impl State for ReconciliationStart {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
procedure_ctx: &ProcedureContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
@@ -63,56 +60,33 @@ impl State for ReconciliationStart {
}
);
info!(
"Reconciling table: {}, table_id: {}, procedure_id: {}",
table_name, table_id, procedure_ctx.procedure_id
);
info!("Reconciling table: {}, table_id: {}", table_name, table_id);
// TODO(weny): Repairs the table route if needed.
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
// Always list region metadatas for the physical table.
let region_metadatas = region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?;
let region_metadatas = {
let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
.with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
.start_timer();
// Always list region metadatas for the physical table.
region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?
};
ensure!(!region_metadatas.is_empty(), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_NO_REGION_METADATA,
])
.inc();
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
table_name, table_id
),
}
});
);
ensure!(region_metadatas.iter().all(|r| r.is_some()), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_REGION_NOT_OPEN,
])
.inc();
error::UnexpectedSnafu {
if region_metadatas.iter().any(|r| r.is_none()) {
return UnexpectedSnafu {
err_msg: format!(
"Some regions are not opened, table: {}, table_id: {}",
table_name, table_id
),
}
});
.fail();
}
// Persist the physical table route.
// TODO(weny): refetch the physical table route if repair is needed.

View File

@@ -20,7 +20,6 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::metadata::RegionMetadata;
use strum::AsRefStr;
use crate::error::{self, MissingColumnIdsSnafu, Result};
use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions;
@@ -29,11 +28,10 @@ use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
use crate::reconciliation::utils::{
build_column_metadata_from_table_info, check_column_metadatas_consistent,
resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv,
ResolveColumnMetadataResult,
};
/// Strategy for resolving column metadata inconsistencies.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, AsRefStr)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
pub enum ResolveStrategy {
#[default]
/// Trusts the latest column metadata from datanode.
@@ -100,10 +98,6 @@ impl State for ResolveColumnMetadata {
"Column metadatas are consistent for table: {}, table_id: {}.",
table_name, table_id
);
// Update metrics.
ctx.mut_metrics().resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Consistent);
return Ok((
Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
Status::executing(false),
@@ -125,11 +119,6 @@ impl State for ResolveColumnMetadata {
let region_ids =
resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
Ok((
Box::new(ReconcileRegions::new(column_metadata, region_ids)),
Status::executing(true),
@@ -138,29 +127,16 @@ impl State for ResolveColumnMetadata {
ResolveStrategy::UseLatest => {
let (column_metadatas, region_ids) =
resolve_column_metadatas_with_latest(&self.region_metadata)?;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
Ok((
Box::new(ReconcileRegions::new(column_metadatas, region_ids)),
Status::executing(true),
))
}
ResolveStrategy::AbortOnConflict => {
let table_name = table_name.to_string();
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
error::ColumnMetadataConflictsSnafu {
table_name,
table_id,
}
.fail()
ResolveStrategy::AbortOnConflict => error::ColumnMetadataConflictsSnafu {
table_name: table_name.to_string(),
table_id,
}
.fail(),
}
}

View File

@@ -116,9 +116,6 @@ impl State for UpdateTableInfo {
],
)
.await?;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.update_table_info = true;
Ok((Box::new(ReconciliationEnd), Status::executing(true)))
}

View File

@@ -13,35 +13,23 @@
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use std::ops::AddAssign;
use std::time::Instant;
use std::fmt;
use api::v1::SemanticType;
use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
use common_telemetry::{error, warn};
use common_telemetry::warn;
use datatypes::schema::ColumnSchema;
use futures::future::{join_all, try_join_all};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, OptionExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, RawTableMeta};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::{
ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
WaitProcedureSnafu,
MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
};
use crate::key::TableMetadataManagerRef;
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct PartialRegionMetadata<'a> {
@@ -60,6 +48,20 @@ impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
}
}
/// A display wrapper for [`ColumnMetadata`] that formats the column metadata in a more readable way.
struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata);
impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let col = self.0;
write!(
f,
"Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}",
col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type,
)
}
}
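The ColumnMetadataDisplay wrapper above customizes Debug output so error messages stay compact and readable. A minimal, self-contained sketch of the same newtype pattern, using a hypothetical Column type rather than the real store_api ColumnMetadata:

    use std::fmt;

    // Hypothetical stand-in for the real ColumnMetadata.
    struct Column {
        name: String,
        id: u32,
    }

    // Newtype that borrows a Column and overrides its Debug output,
    // so `{:?}` in log/error messages stays short.
    struct ColumnDisplay<'a>(&'a Column);

    impl<'a> fmt::Debug for ColumnDisplay<'a> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "Column {{ name: {}, id: {} }}", self.0.name, self.0.id)
        }
    }

    fn main() {
        let columns = vec![
            Column { name: "ts".to_string(), id: 0 },
            Column { name: "host".to_string(), id: 1 },
        ];
        // Mirrors the `iter().map(ColumnMetadataDisplay).collect::<Vec<_>>()` usage above.
        println!("{:?}", columns.iter().map(ColumnDisplay).collect::<Vec<_>>());
    }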
/// Checks if the column metadatas are consistent.
///
/// The column metadatas are consistent if:
@@ -108,7 +110,21 @@ pub(crate) fn resolve_column_metadatas_with_metasrv(
let mut regions_ids = vec![];
for region_metadata in region_metadatas {
if region_metadata.column_metadatas != column_metadatas {
check_column_metadata_invariants(column_metadatas, &region_metadata.column_metadatas)?;
let is_invariant_preserved = check_column_metadata_invariants(
column_metadatas,
&region_metadata.column_metadatas,
);
ensure!(
is_invariant_preserved,
UnexpectedSnafu {
err_msg: format!(
"Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
region_metadata.region_id,
column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
)
}
);
regions_ids.push(region_metadata.region_id);
}
}
@@ -147,10 +163,21 @@ pub(crate) fn resolve_column_metadatas_with_latest(
let mut region_ids = vec![];
for region_metadata in region_metadatas {
if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
check_column_metadata_invariants(
let is_invariant_preserved = check_column_metadata_invariants(
&latest_region_metadata.column_metadatas,
&region_metadata.column_metadatas,
)?;
);
ensure!(
is_invariant_preserved,
UnexpectedSnafu {
err_msg: format!(
"Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
region_metadata.region_id,
latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>()
)
}
);
region_ids.push(region_metadata.region_id);
}
}
@@ -212,7 +239,7 @@ pub(crate) fn build_column_metadata_from_table_info(
pub(crate) fn check_column_metadata_invariants(
new_column_metadatas: &[ColumnMetadata],
column_metadatas: &[ColumnMetadata],
) -> Result<()> {
) -> bool {
let new_primary_keys = new_column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
@@ -225,50 +252,22 @@ pub(crate) fn check_column_metadata_invariants(
.map(|c| (c.column_schema.name.as_str(), c.column_id));
for (name, id) in old_primary_keys {
let column_id = new_primary_keys
.get(name)
.cloned()
.context(ColumnNotFoundSnafu {
column_name: name,
column_id: id,
})?;
ensure!(
column_id == id,
ColumnIdMismatchSnafu {
column_name: name,
expected_column_id: id,
actual_column_id: column_id,
}
);
if new_primary_keys.get(name) != Some(&id) {
return false;
}
}
let new_ts_column = new_column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.context(UnexpectedSnafu {
err_msg: "Timestamp column not found in new column metadata",
})?;
.map(|c| (c.column_schema.name.as_str(), c.column_id));
let old_ts_column = column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.context(UnexpectedSnafu {
err_msg: "Timestamp column not found in column metadata",
})?;
ensure!(
new_ts_column == old_ts_column,
TimestampMismatchSnafu {
expected_column_name: old_ts_column.0,
expected_column_id: old_ts_column.1,
actual_column_name: new_ts_column.0,
actual_column_id: new_ts_column.1,
}
);
.map(|c| (c.column_schema.name.as_str(), c.column_id));
Ok(())
new_ts_column == old_ts_column
}
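For reference, the invariant enforced above is: every existing tag column keeps its (name, column_id) pair in the new metadata, and the timestamp column's (name, column_id) pair is unchanged; the function now simply reports a bool. A minimal sketch of that boolean check with a hypothetical, simplified Col type (not the real ColumnMetadata):

    use std::collections::HashMap;

    // Hypothetical simplified column model: name, id, and its semantic role.
    #[derive(Clone)]
    struct Col {
        name: &'static str,
        id: u32,
        tag: bool,
        timestamp: bool,
    }

    // Every old tag keeps its (name, id) pair, and the timestamp column is unchanged.
    fn invariants_hold(new_cols: &[Col], old_cols: &[Col]) -> bool {
        let new_tags: HashMap<_, _> = new_cols
            .iter()
            .filter(|c| c.tag)
            .map(|c| (c.name, c.id))
            .collect();
        for old in old_cols.iter().filter(|c| c.tag) {
            if new_tags.get(old.name) != Some(&old.id) {
                return false;
            }
        }
        let ts = |cols: &[Col]| cols.iter().find(|c| c.timestamp).map(|c| (c.name, c.id));
        ts(new_cols) == ts(old_cols)
    }

    fn main() {
        let old = vec![
            Col { name: "host", id: 1, tag: true, timestamp: false },
            Col { name: "ts", id: 0, tag: false, timestamp: true },
        ];
        let mut renumbered = old.clone();
        renumbered[0].id = 100; // changing a tag's column id violates the invariant
        assert!(invariants_hold(&old, &old));
        assert!(!invariants_hold(&renumbered, &old));
    }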
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
@@ -407,88 +406,6 @@ pub(crate) fn need_update_logical_table_info(
table_info.meta.schema.column_schemas.len() != column_metadatas.len()
}
/// The result of waiting for inflight subprocedures.
pub struct PartialSuccessResult<'a> {
pub failed_procedures: Vec<&'a SubprocedureMeta>,
pub success_procedures: Vec<&'a SubprocedureMeta>,
}
/// The result of waiting for inflight subprocedures.
pub enum WaitForInflightSubproceduresResult<'a> {
Success(Vec<&'a SubprocedureMeta>),
PartialSuccess(PartialSuccessResult<'a>),
}
/// Wait for inflight subprocedures.
///
/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
/// Otherwise, the function will continue waiting for all subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures<'a>(
procedure_ctx: &ProcedureContext,
subprocedures: &'a [SubprocedureMeta],
fail_fast: bool,
) -> Result<WaitForInflightSubproceduresResult<'a>> {
let mut receivers = Vec::with_capacity(subprocedures.len());
for subprocedure in subprocedures {
let procedure_id = subprocedure.procedure_id();
let receiver = procedure_ctx
.provider
.procedure_state_receiver(procedure_id)
.await
.context(ProcedureStateReceiverSnafu { procedure_id })?
.context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
receivers.push((receiver, subprocedure));
}
let mut tasks = Vec::with_capacity(receivers.len());
for (receiver, subprocedure) in receivers.iter_mut() {
tasks.push(async move {
watcher::wait(receiver).await.inspect_err(|e| {
error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
})
});
}
if fail_fast {
try_join_all(tasks).await.context(WaitProcedureSnafu)?;
return Ok(WaitForInflightSubproceduresResult::Success(
subprocedures.iter().collect(),
));
}
// If fail_fast is false, we need to wait for all subprocedures to complete.
let results = join_all(tasks).await;
let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
if failed_procedures_num == 0 {
return Ok(WaitForInflightSubproceduresResult::Success(
subprocedures.iter().collect(),
));
}
warn!(
"{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
failed_procedures_num,
subprocedures.len(),
procedure_ctx.procedure_id
);
let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
for (result, subprocedure) in results.into_iter().zip(subprocedures) {
if result.is_err() {
failed_procedures.push(subprocedure);
} else {
success_procedures.push(subprocedure);
}
}
Ok(WaitForInflightSubproceduresResult::PartialSuccess(
PartialSuccessResult {
failed_procedures,
success_procedures,
},
))
}
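A minimal sketch of the fail_fast distinction used above, with toy async tasks in place of procedure watchers: try_join_all returns the first error without waiting for the remaining futures, while join_all waits for everything so the caller can split successes from failures afterwards. Task names and counts here are illustrative only.

    use futures::future::{join_all, try_join_all};

    async fn task(id: u32, fail: bool) -> Result<u32, String> {
        if fail {
            Err(format!("task {id} failed"))
        } else {
            Ok(id)
        }
    }

    #[tokio::main]
    async fn main() {
        let tasks = || vec![task(1, false), task(2, true), task(3, false)];

        // fail_fast = true: the first error is surfaced immediately.
        assert!(try_join_all(tasks()).await.is_err());

        // fail_fast = false: wait for all, then partition into failed / succeeded,
        // as the function above does for subprocedures.
        let results = join_all(tasks()).await;
        let failed = results.iter().filter(|r| r.is_err()).count();
        assert_eq!(failed, 1);
    }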
#[derive(Clone)]
pub struct Context {
pub node_manager: NodeManagerRef,
@@ -496,446 +413,6 @@ pub struct Context {
pub cache_invalidator: CacheInvalidatorRef,
}
/// Metadata for an inflight physical table subprocedure.
pub struct PhysicalTableMeta {
pub procedure_id: ProcedureId,
pub table_id: TableId,
pub table_name: TableName,
}
/// Metadata for an inflight logical table subprocedure.
pub struct LogicalTableMeta {
pub procedure_id: ProcedureId,
pub physical_table_id: TableId,
pub physical_table_name: TableName,
pub logical_tables: Vec<(TableId, TableName)>,
}
/// Metadata for an inflight database subprocedure.
pub struct ReconcileDatabaseMeta {
pub procedure_id: ProcedureId,
pub catalog: String,
pub schema: String,
}
/// The inflight subprocedure metadata.
pub enum SubprocedureMeta {
PhysicalTable(PhysicalTableMeta),
LogicalTable(LogicalTableMeta),
Database(ReconcileDatabaseMeta),
}
impl Display for SubprocedureMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubprocedureMeta::PhysicalTable(meta) => {
write!(
f,
"ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
meta.procedure_id, meta.table_id, meta.table_name
)
}
SubprocedureMeta::LogicalTable(meta) => {
write!(
f,
"ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
meta.procedure_id, meta.physical_table_id, meta.physical_table_name, meta.logical_tables
)
}
SubprocedureMeta::Database(meta) => {
write!(
f,
"ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
meta.procedure_id, meta.catalog, meta.schema
)
}
}
}
}
impl SubprocedureMeta {
/// Creates a new logical table subprocedure metadata.
pub fn new_logical_table(
procedure_id: ProcedureId,
physical_table_id: TableId,
physical_table_name: TableName,
logical_tables: Vec<(TableId, TableName)>,
) -> Self {
Self::LogicalTable(LogicalTableMeta {
procedure_id,
physical_table_id,
physical_table_name,
logical_tables,
})
}
/// Creates a new physical table subprocedure metadata.
pub fn new_physical_table(
procedure_id: ProcedureId,
table_id: TableId,
table_name: TableName,
) -> Self {
Self::PhysicalTable(PhysicalTableMeta {
procedure_id,
table_id,
table_name,
})
}
/// Creates a new reconcile database subprocedure metadata.
pub fn new_reconcile_database(
procedure_id: ProcedureId,
catalog: String,
schema: String,
) -> Self {
Self::Database(ReconcileDatabaseMeta {
procedure_id,
catalog,
schema,
})
}
/// Returns the procedure id of the subprocedure.
pub fn procedure_id(&self) -> ProcedureId {
match self {
SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
SubprocedureMeta::Database(meta) => meta.procedure_id,
}
}
/// Returns the number of tables that will be reconciled.
pub fn table_num(&self) -> usize {
match self {
SubprocedureMeta::PhysicalTable(_) => 1,
SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
SubprocedureMeta::Database(_) => 0,
}
}
/// Returns the number of databases that will be reconciled.
pub fn database_num(&self) -> usize {
match self {
SubprocedureMeta::Database(_) => 1,
_ => 0,
}
}
}
/// The metrics of reconciling catalog.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
pub succeeded_databases: usize,
pub failed_databases: usize,
}
impl AddAssign for ReconcileCatalogMetrics {
fn add_assign(&mut self, other: Self) {
self.succeeded_databases += other.succeeded_databases;
self.failed_databases += other.failed_databases;
}
}
impl Display for ReconcileCatalogMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"succeeded_databases: {}, failed_databases: {}",
self.succeeded_databases, self.failed_databases
)
}
}
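The AddAssign impl above exists so per-subprocedure counts can be folded into one summary with `+=`. A small sketch of that aggregation with a hypothetical stand-in struct:

    use std::ops::AddAssign;

    // Hypothetical aggregate mirroring ReconcileCatalogMetrics.
    #[derive(Default, Debug, Clone, Copy)]
    struct CatalogMetrics {
        succeeded_databases: usize,
        failed_databases: usize,
    }

    impl AddAssign for CatalogMetrics {
        fn add_assign(&mut self, other: Self) {
            self.succeeded_databases += other.succeeded_databases;
            self.failed_databases += other.failed_databases;
        }
    }

    fn main() {
        let per_procedure = [
            CatalogMetrics { succeeded_databases: 2, failed_databases: 0 },
            CatalogMetrics { succeeded_databases: 1, failed_databases: 1 },
        ];
        let mut total = CatalogMetrics::default();
        for m in per_procedure {
            total += m;
        }
        assert_eq!((total.succeeded_databases, total.failed_databases), (3, 1));
    }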
impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
match result {
WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
succeeded_databases: subprocedures.len(),
failed_databases: 0,
},
WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
failed_procedures,
success_procedures,
}) => {
let succeeded_databases = success_procedures
.iter()
.map(|subprocedure| subprocedure.database_num())
.sum();
let failed_databases = failed_procedures
.iter()
.map(|subprocedure| subprocedure.database_num())
.sum();
ReconcileCatalogMetrics {
succeeded_databases,
failed_databases,
}
}
}
}
}
/// The metrics of reconciling database.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
pub succeeded_tables: usize,
pub failed_tables: usize,
pub succeeded_procedures: usize,
pub failed_procedures: usize,
}
impl Display for ReconcileDatabaseMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}", self.succeeded_tables, self.failed_tables, self.succeeded_procedures, self.failed_procedures)
}
}
impl AddAssign for ReconcileDatabaseMetrics {
fn add_assign(&mut self, other: Self) {
self.succeeded_tables += other.succeeded_tables;
self.failed_tables += other.failed_tables;
self.succeeded_procedures += other.succeeded_procedures;
self.failed_procedures += other.failed_procedures;
}
}
impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
match result {
WaitForInflightSubproceduresResult::Success(subprocedures) => {
let table_num = subprocedures
.iter()
.map(|subprocedure| subprocedure.table_num())
.sum();
ReconcileDatabaseMetrics {
succeeded_procedures: subprocedures.len(),
failed_procedures: 0,
succeeded_tables: table_num,
failed_tables: 0,
}
}
WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
failed_procedures,
success_procedures,
}) => {
let succeeded_tables = success_procedures
.iter()
.map(|subprocedure| subprocedure.table_num())
.sum();
let failed_tables = failed_procedures
.iter()
.map(|subprocedure| subprocedure.table_num())
.sum();
ReconcileDatabaseMetrics {
succeeded_procedures: success_procedures.len(),
failed_procedures: failed_procedures.len(),
succeeded_tables,
failed_tables,
}
}
}
}
}
/// The metrics of reconciling logical tables.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
pub start_time: Instant,
pub update_table_info_count: usize,
pub create_tables_count: usize,
pub column_metadata_consistent_count: usize,
pub column_metadata_inconsistent_count: usize,
}
impl Default for ReconcileLogicalTableMetrics {
fn default() -> Self {
Self {
start_time: Instant::now(),
update_table_info_count: 0,
create_tables_count: 0,
column_metadata_consistent_count: 0,
column_metadata_inconsistent_count: 0,
}
}
}
const CREATE_TABLES: &str = "create_tables";
const UPDATE_TABLE_INFO: &str = "update_table_info";
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";
impl ReconcileLogicalTableMetrics {
/// The total number of tables that have been reconciled.
pub fn total_table_count(&self) -> usize {
self.create_tables_count
+ self.column_metadata_consistent_count
+ self.column_metadata_inconsistent_count
}
}
impl Drop for ReconcileLogicalTableMetrics {
fn drop(&mut self) {
let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
.inc_by(self.create_tables_count as u64);
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
procedure_name,
metrics::TABLE_TYPE_LOGICAL,
UPDATE_TABLE_INFO,
])
.inc_by(self.update_table_info_count as u64);
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
procedure_name,
metrics::TABLE_TYPE_LOGICAL,
COLUMN_METADATA_CONSISTENT,
])
.inc_by(self.column_metadata_consistent_count as u64);
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
procedure_name,
metrics::TABLE_TYPE_LOGICAL,
COLUMN_METADATA_INCONSISTENT,
])
.inc_by(self.column_metadata_inconsistent_count as u64);
}
}
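The Drop impl above flushes the per-procedure tallies into the Prometheus counters exactly once, whenever the metrics struct goes out of scope, regardless of how the procedure finishes. A minimal sketch of the same flush-on-drop idea, using a plain atomic as a stand-in for the real counter vectors:

    use std::sync::atomic::{AtomicU64, Ordering};

    // Stand-in for the Prometheus counter used above.
    static CREATE_TABLES_TOTAL: AtomicU64 = AtomicU64::new(0);

    // Volatile, per-procedure tally; flushed exactly once, in Drop.
    #[derive(Default)]
    struct LogicalTableMetrics {
        create_tables_count: u64,
    }

    impl Drop for LogicalTableMetrics {
        fn drop(&mut self) {
            CREATE_TABLES_TOTAL.fetch_add(self.create_tables_count, Ordering::Relaxed);
        }
    }

    fn main() {
        {
            let mut metrics = LogicalTableMetrics::default();
            metrics.create_tables_count += 2;
        } // metrics dropped here, the counter is updated once
        assert_eq!(CREATE_TABLES_TOTAL.load(Ordering::Relaxed), 2);
    }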
impl Display for ReconcileLogicalTableMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let elapsed = self.start_time.elapsed();
if self.create_tables_count > 0 {
write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
}
if self.update_table_info_count > 0 {
write!(
f,
"update_table_info_count: {}, ",
self.update_table_info_count
)?;
}
if self.column_metadata_consistent_count > 0 {
write!(
f,
"column_metadata_consistent_count: {}, ",
self.column_metadata_consistent_count
)?;
}
if self.column_metadata_inconsistent_count > 0 {
write!(
f,
"column_metadata_inconsistent_count: {}, ",
self.column_metadata_inconsistent_count
)?;
}
write!(
f,
"total_table_count: {}, elapsed: {:?}",
self.total_table_count(),
elapsed
)
}
}
/// The result of resolving column metadata.
#[derive(Clone, Copy)]
pub enum ResolveColumnMetadataResult {
Consistent,
Inconsistent(ResolveStrategy),
}
impl Display for ResolveColumnMetadataResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
ResolveColumnMetadataResult::Inconsistent(strategy) => {
let strategy_str = strategy.as_ref();
write!(f, "Inconsistent({})", strategy_str)
}
}
}
}
/// The metrics of reconciling physical tables.
#[derive(Clone)]
pub struct ReconcileTableMetrics {
/// The start time of the reconciliation.
pub start_time: Instant,
/// The result of resolving column metadata.
pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
/// Whether the table info has been updated.
pub update_table_info: bool,
}
impl Drop for ReconcileTableMetrics {
fn drop(&mut self) {
if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
match resolve_column_metadata_result {
ResolveColumnMetadataResult::Consistent => {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
COLUMN_METADATA_CONSISTENT,
])
.inc();
}
ResolveColumnMetadataResult::Inconsistent(strategy) => {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
COLUMN_METADATA_INCONSISTENT,
])
.inc();
metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
.with_label_values(&[strategy.as_ref()])
.inc();
}
}
}
if self.update_table_info {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
UPDATE_TABLE_INFO,
])
.inc();
}
}
}
impl Default for ReconcileTableMetrics {
fn default() -> Self {
Self {
start_time: Instant::now(),
resolve_column_metadata_result: None,
update_table_info: false,
}
}
}
impl Display for ReconcileTableMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let elapsed = self.start_time.elapsed();
if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
write!(
f,
"resolve_column_metadata_result: {}, ",
resolve_column_metadata_result
)?;
}
write!(
f,
"update_table_info: {}, elapsed: {:?}",
self.update_table_info, elapsed
)
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@@ -1188,7 +665,10 @@ mod tests {
semantic_type: SemanticType::Field,
column_id: 3,
});
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
assert!(check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
}
#[test]
@@ -1196,12 +676,18 @@ mod tests {
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
}
#[test]
@@ -1214,7 +700,10 @@ mod tests {
{
col.column_id = 100;
}
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
@@ -1224,7 +713,10 @@ mod tests {
{
col.column_id = 100;
}
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
}
#[test]

View File

@@ -25,8 +25,7 @@ use snafu::ResultExt;
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, ResolveKafkaEndpointSnafu,
Result, TlsConfigSnafu,
KafkaGetOffsetSnafu, KafkaPartitionClientSnafu, ProduceRecordSnafu, Result, TlsConfigSnafu,
};
// Each topic only has one partition for now.
@@ -209,10 +208,10 @@ impl KafkaTopicCreator {
/// Builds a kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds a Kafka controller client for creating topics.
let broker_endpoints = common_wal::resolve_to_ipv4(&connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(connection.connect_timeout))
.timeout(Some(connection.timeout));
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_opentelemetry::OpenTelemetrySpanExt;
// An wapper for `Futures` that provides tracing instrument adapters.
// An wrapper for `Futures` that provides tracing instrument adapters.
pub trait FutureExt: std::future::Future + Sized {
fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
}

View File

@@ -189,6 +189,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
@@ -221,6 +223,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),

View File

@@ -161,6 +161,12 @@ pub struct KafkaConnectionConfig {
pub sasl: Option<KafkaClientSasl>,
/// Client TLS config
pub tls: Option<KafkaClientTls>,
/// The connect timeout for kafka client.
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
/// The timeout for kafka client.
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
impl Default for KafkaConnectionConfig {
@@ -169,6 +175,8 @@ impl Default for KafkaConnectionConfig {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
sasl: None,
tls: None,
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
}
}
}
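The #[serde(with = "humantime_serde")] attribute on the new timeout fields above lets them be written as human-readable strings such as "3s". A minimal sketch of how such fields deserialize, using a hypothetical struct name and the toml crate purely for illustration:

    use std::time::Duration;
    use serde::Deserialize;

    // Hypothetical config fragment; the real fields live on KafkaConnectionConfig.
    #[derive(Debug, Deserialize)]
    struct ConnTimeouts {
        #[serde(with = "humantime_serde")]
        connect_timeout: Duration,
        #[serde(with = "humantime_serde")]
        timeout: Duration,
    }

    fn main() {
        let toml_src = r#"
            connect_timeout = "3s"
            timeout = "500ms"
        "#;
        // humantime_serde parses the duration strings into std Durations.
        let cfg: ConnTimeouts = toml::from_str(toml_src).unwrap();
        assert_eq!(cfg.connect_timeout, Duration::from_secs(3));
        assert_eq!(cfg.timeout, Duration::from_millis(500));
    }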

View File

@@ -669,24 +669,16 @@ async fn open_all_regions(
ignore_nonexistent_region,
)
.await?;
if !ignore_nonexistent_region {
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
} else if open_regions.len() != num_regions {
warn!(
"ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
);
}
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
for region_id in open_regions {
if open_with_writable {
@@ -727,24 +719,16 @@ async fn open_all_regions(
)
.await?;
if !ignore_nonexistent_region {
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
} else if open_regions.len() != num_regions {
warn!(
"ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
);
}
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
}
info!("all regions are opened");

View File

@@ -218,6 +218,7 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receiving None means the Sender was dropped, so we break out of the current loop
break
}
@@ -259,7 +260,11 @@ impl HeartbeatTask {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;

View File

@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
/// The key expressions to use for the lookup relation.
pub lookup_key: Vec<ScalarExpr>,
/// The closure to apply to the concatenation of the key columns,
/// the stream value columns, and the lookup value colunms.
/// the stream value columns, and the lookup value columns.
pub closure: JoinFilter,
}

View File

@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
}
@@ -58,8 +58,8 @@ impl HeartbeatTask {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
}
@@ -103,13 +103,15 @@ impl HeartbeatTask {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
.await;
capture_self.start_with_retry(retry_interval).await;
break;
}
@@ -177,12 +179,13 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receiving None means the Sender was dropped, so we break out of the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
sleep.as_mut().reset(Instant::now() + report_interval);
Self::new_heartbeat_request(&heartbeat_request, None)
}
};
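A minimal sketch of the heartbeat loop pattern shown above: a pinned tokio sleep that is re-armed with reset after each tick, raced against a mailbox receiver whose None result (sender dropped) ends the loop. Names and intervals here are illustrative only.

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::{sleep, Instant};

    #[tokio::main]
    async fn main() {
        let report_interval = Duration::from_millis(50);
        let (tx, mut rx) = mpsc::channel::<&'static str>(8);

        // Simulated mailbox: one outgoing message, then the sender goes away.
        tokio::spawn(async move {
            tx.send("mailbox message").await.unwrap();
            sleep(Duration::from_millis(180)).await;
            drop(tx);
        });

        let timer = sleep(report_interval);
        tokio::pin!(timer);
        let mut periodic_heartbeats = 0u32;

        loop {
            tokio::select! {
                message = rx.recv() => match message {
                    // An incoming mailbox message piggybacks on the next heartbeat.
                    Some(msg) => println!("heartbeat carrying {msg}"),
                    // Receiving None means the sender was dropped: exit the loop.
                    None => break,
                },
                _ = &mut timer => {
                    // Re-arm the timer for the next report_interval tick.
                    timer.as_mut().reset(Instant::now() + report_interval);
                    periodic_heartbeats += 1;
                    println!("periodic heartbeat #{periodic_heartbeats}");
                }
            }
        }
        assert!(periodic_heartbeats >= 1);
    }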

View File

@@ -139,9 +139,6 @@ pub enum Error {
error: rskafka::client::error::Error,
},
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
#[snafu(display(
"Failed to build a Kafka partition client, topic: {}, partition: {}",
topic,
@@ -343,7 +340,6 @@ impl ErrorExt for Error {
StartWalTask { .. }
| StopWalTask { .. }
| IllegalState { .. }
| ResolveKafkaEndpoint { .. }
| NoMaxValue { .. }
| Cast { .. }
| EncodeJson { .. }

View File

@@ -24,9 +24,7 @@ use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::{Mutex, RwLock};
use crate::error::{
BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result, TlsConfigSnafu};
use crate::kafka::index::{GlobalIndexCollector, NoopCollector};
use crate::kafka::producer::{OrderedBatchProducer, OrderedBatchProducerRef};
@@ -78,11 +76,10 @@ impl ClientManager {
high_watermark: Arc<DashMap<Arc<KafkaProvider>, u64>>,
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder =
ClientBuilder::new(broker_endpoints).backoff_config(DEFAULT_BACKOFF_CONFIG);
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(config.connection.connect_timeout))
.timeout(Some(config.connection.timeout));
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -186,6 +186,9 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
if self.heartbeat_channel_manager.is_some() {
info!("Enable heartbeat channel using the heartbeat channel manager.");
}
let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
client.heartbeat = Some(HeartbeatClient::new(
self.id,
@@ -525,7 +528,7 @@ impl MetaClient {
self.heartbeat_client()?.ask_leader().await
}
/// Returns a heartbeat bidirectional streaming: (sender, recever), the
/// Returns a heartbeat bidirectional streaming: (sender, receiver), the
/// other end is the leader of `metasrv`.
///
/// The `datanode` needs to use the sender to continuously send heartbeat

View File

@@ -24,7 +24,7 @@ use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tokio::time::timeout;
use tonic::transport::Channel;
@@ -101,12 +101,14 @@ impl AskLeader {
};
let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
let channel_manager = self.channel_manager.clone();
for addr in &peers {
let mut client = self.create_asker(addr)?;
let tx_clone = tx.clone();
let req = req.clone();
let addr = addr.to_string();
let addr = addr.clone();
let channel_manager = channel_manager.clone();
tokio::spawn(async move {
match client.ask_leader(req).await {
Ok(res) => {
@@ -117,13 +119,19 @@ impl AskLeader {
};
}
Err(status) => {
// Reset cached channel even on generic errors: the VIP may keep us on a dead
// backend, so forcing a reconnect gives us a chance to hit a healthy peer.
Self::reset_channels_with_manager(
&channel_manager,
std::slice::from_ref(&addr),
);
warn!("Failed to ask leader from: {addr}, {status}");
}
}
});
}
let leader = timeout(
let leader = match timeout(
self.channel_manager
.config()
.timeout
@@ -131,8 +139,16 @@ impl AskLeader {
rx.recv(),
)
.await
.context(error::AskLeaderTimeoutSnafu)?
.context(error::NoLeaderSnafu)?;
{
Ok(Some(leader)) => leader,
Ok(None) => return error::NoLeaderSnafu.fail(),
Err(e) => {
// All peers timed out. Reset channels to force reconnection,
// which may help escape dead backends in VIP/LB scenarios.
Self::reset_channels_with_manager(&self.channel_manager, &peers);
return Err(e).context(error::AskLeaderTimeoutSnafu);
}
};
let mut leadership_group = self.leadership_group.write().unwrap();
leadership_group.leader = Some(leader.clone());
@@ -169,6 +185,15 @@ impl AskLeader {
.context(error::CreateChannelSnafu)?,
))
}
/// Drop cached channels for the given peers so a fresh connection is used next time.
fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
if peers.is_empty() {
return;
}
channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
}
}
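The timeout match above distinguishes three outcomes: a peer answered in time, every sender finished without an answer (no leader), or the deadline elapsed (reset cached channels, then report a timeout). A small self-contained sketch of those three cases with a plain mpsc channel, illustrative only:

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    #[tokio::main]
    async fn main() {
        // Case 1: a peer answers in time -> Ok(Some(answer)).
        let (tx, mut rx) = mpsc::channel::<&'static str>(1);
        tx.send("leader-addr").await.unwrap();
        match timeout(Duration::from_millis(100), rx.recv()).await {
            Ok(Some(leader)) => println!("leader found: {leader}"),
            Ok(None) => println!("all senders dropped without an answer"),
            Err(_) => println!("timed out waiting for any peer"),
        }

        // Case 2: every sender is gone -> Ok(None), i.e. "no leader".
        drop(tx);
        assert!(matches!(
            timeout(Duration::from_millis(100), rx.recv()).await,
            Ok(None)
        ));

        // Case 3: nobody answers in time -> Err(Elapsed); the code above resets
        // cached channels at this point before returning an AskLeaderTimeout error.
        let (_tx2, mut rx2) = mpsc::channel::<&'static str>(1);
        assert!(timeout(Duration::from_millis(50), rx2.recv()).await.is_err());
    }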
#[async_trait]

View File

@@ -18,6 +18,10 @@ use std::time::Duration;
use client::RegionFollowerClientRef;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::{
HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS, HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS,
HEARTBEAT_TIMEOUT,
};
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
@@ -34,8 +38,6 @@ pub struct MetaClientOptions {
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub heartbeat_timeout: Duration,
#[serde(with = "humantime_serde")]
pub ddl_timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
@@ -52,7 +54,6 @@ impl Default for MetaClientOptions {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_millis(3_000u64),
heartbeat_timeout: Duration::from_millis(500u64),
ddl_timeout: Duration::from_millis(10_000u64),
connect_timeout: Duration::from_millis(1_000u64),
tcp_nodelay: true,
@@ -97,7 +98,11 @@ pub async fn create_meta_client(
.timeout(meta_client_options.timeout)
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let heartbeat_config = base_config.clone();
let heartbeat_config = base_config
.clone()
.timeout(HEARTBEAT_TIMEOUT)
.http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
.http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);
if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);

View File

@@ -40,7 +40,7 @@ use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use either::Either;
use etcd_client::Client;
use etcd_client::{Client, ConnectOptions};
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -70,7 +70,9 @@ use crate::election::rds::postgres::PgElection;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
use crate::metasrv::{
BackendClientOptions, BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
};
use crate::node_excluder::NodeExcluderRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
@@ -270,7 +272,12 @@ macro_rules! add_compressed_service {
}
pub fn router(metasrv: Arc<Metasrv>) -> Router {
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let mut router = tonic::transport::Server::builder()
// for admin services
.accept_http1(true)
// For quick network failures detection.
.http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
.http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
@@ -287,7 +294,7 @@ pub async fn metasrv_builder(
(Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let etcd_client = create_etcd_client(&opts.store_addrs, &opts.backend_client).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
@@ -435,13 +442,20 @@ pub async fn metasrv_builder(
.plugins(plugins))
}
pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
pub async fn create_etcd_client(
store_addrs: &[String],
options: &BackendClientOptions,
) -> Result<Client> {
let etcd_endpoints = store_addrs
.iter()
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
Client::connect(&etcd_endpoints, None)
let options = ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(options.keep_alive_interval, options.keep_alive_timeout)
.with_connect_timeout(options.connect_timeout);
Client::connect(&etcd_endpoints, Some(options))
.await
.context(error::ConnectEtcdSnafu)
}

View File

@@ -63,22 +63,6 @@ pub struct EtcdElection {
}
impl EtcdElection {
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: String,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(leader_value, client, store_key_prefix).await
}
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,

View File

@@ -1190,7 +1190,7 @@ mod tests {
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
let (tx, _) = broadcast::channel(100);

View File

@@ -1012,7 +1012,7 @@ mod tests {
));
handles.push(handle);
}
// Wait for candidates to registrate themselves and renew their leases at least once.
// Wait for candidates to register themselves and renew their leases at least once.
tokio::time::sleep(Duration::from_secs(3)).await;
let (tx, _) = broadcast::channel(100);

View File

@@ -15,7 +15,6 @@
use std::collections::VecDeque;
use std::time::Duration;
use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize};
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
@@ -83,9 +82,7 @@ impl Default for PhiAccrualFailureDetectorOptions {
Self {
threshold: 8_f32,
min_std_deviation: Duration::from_millis(100),
acceptable_heartbeat_pause: Duration::from_secs(
distributed_time_constants::DATANODE_LEASE_SECS,
),
acceptable_heartbeat_pause: Duration::from_secs(10),
first_heartbeat_estimate: Duration::from_millis(1000),
}
}

View File

@@ -268,6 +268,15 @@ impl Pushers {
async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
self.0.write().await.remove(pusher_id)
}
pub(crate) async fn clear(&self) -> Vec<String> {
let mut pushers = self.0.write().await;
let keys = pushers.keys().cloned().collect::<Vec<_>>();
if !keys.is_empty() {
pushers.clear();
}
keys
}
}
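clear() above drains the pusher map under a single write lock and hands back the removed keys so the caller can adjust the connection gauge. A minimal sketch of the same idea with a hypothetical registry type:

    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Hypothetical pusher registry: take the write lock once,
    // remember the keys, then drop all entries.
    #[derive(Default)]
    struct Registry(Arc<RwLock<HashMap<String, u64>>>);

    impl Registry {
        async fn clear(&self) -> Vec<String> {
            let mut inner = self.0.write().await;
            let keys = inner.keys().cloned().collect::<Vec<_>>();
            if !keys.is_empty() {
                inner.clear();
            }
            keys
        }
    }

    #[tokio::main]
    async fn main() {
        let registry = Registry::default();
        registry.0.write().await.insert("dn-1".to_string(), 1);
        let removed = registry.clear().await;
        // The returned keys can drive gauge adjustments, e.g. sub(removed.len() as i64).
        assert_eq!(removed, vec!["dn-1".to_string()]);
    }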
#[derive(Clone)]
@@ -304,10 +313,11 @@ impl HeartbeatHandlerGroup {
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
///
/// Returns the [`Pusher`] if it exists.
pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
pub async fn deregister_push(&self, pusher_id: PusherId) {
info!("Pusher unregister: {}", pusher_id);
self.pushers.remove(&pusher_id.string_key()).await
if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
}
}
/// Returns the [`Pushers`] of the group.
@@ -516,6 +526,14 @@ impl Mailbox for HeartbeatMailbox {
Ok(())
}
async fn reset(&self) {
let keys = self.pushers.clear().await;
if !keys.is_empty() {
info!("Reset mailbox, deregister pushers: {:?}", keys);
METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
}
}
}
/// The builder to build the group of heartbeat handlers.

View File

@@ -124,7 +124,7 @@ mod test {
use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
@@ -223,7 +223,7 @@ mod test {
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs() as u64,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
None,
@@ -253,7 +253,7 @@ mod test {
assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
default_distributed_time_constants().region_lease.as_secs()
);
assert_region_lease(
@@ -287,7 +287,7 @@ mod test {
assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
default_distributed_time_constants().region_lease.as_secs()
);
assert_region_lease(
@@ -366,7 +366,7 @@ mod test {
});
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
Default::default(),
None,

View File

@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::RangeRequest;
@@ -312,7 +312,9 @@ impl PeerLookupService for MetaPeerLookupService {
lookup_frontends(
&self.meta_peer_client,
// TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
default_distributed_time_constants()
.frontend_heartbeat_interval
.as_secs(),
)
.await
.map_err(BoxedError::new)

View File

@@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
@@ -98,6 +98,27 @@ pub enum BackendImpl {
MysqlStore,
}
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
#[serde(with = "humantime_serde")]
pub keep_alive_timeout: Duration,
#[serde(with = "humantime_serde")]
pub keep_alive_interval: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
}
impl Default for BackendClientOptions {
fn default() -> Self {
Self {
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(3),
connect_timeout: Duration::from_secs(3),
}
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
@@ -113,12 +134,22 @@ pub struct MetasrvOptions {
/// Only applicable when using PostgreSQL or MySQL as the metadata store
#[serde(default)]
pub backend_tls: Option<TlsOption>,
/// The backend client options.
/// Currently, only applicable when using etcd as the metadata store.
#[serde(default)]
pub backend_client: BackendClientOptions,
/// The type of selector.
pub selector: SelectorType,
/// Whether to use the memory store.
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
/// The base heartbeat interval.
///
/// This value is used to calculate the distributed time constants for components.
/// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
#[serde(with = "humantime_serde")]
pub heartbeat_interval: Duration,
/// The delay before starting region failure detection.
/// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
/// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
@@ -211,7 +242,9 @@ impl fmt::Debug for MetasrvOptions {
.field("max_txn_ops", &self.max_txn_ops)
.field("flush_stats_factor", &self.flush_stats_factor)
.field("tracing", &self.tracing)
.field("backend", &self.backend);
.field("backend", &self.backend)
.field("heartbeat_interval", &self.heartbeat_interval)
.field("backend_client", &self.backend_client);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name);
@@ -239,6 +272,7 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
@@ -273,6 +307,7 @@ impl Default for MetasrvOptions {
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
backend_client: BackendClientOptions::default(),
}
}
}
@@ -388,6 +423,7 @@ pub struct MetaStateHandler {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
leadership_change_notifier: LeadershipChangeNotifier,
mailbox: MailboxRef,
state: StateRef,
}
@@ -411,6 +447,9 @@ impl MetaStateHandler {
pub async fn on_leader_stop(&self) {
self.state.write().unwrap().next_state(become_follower());
// Enforces the mailbox to clear all pushers.
// The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
self.mailbox.reset().await;
self.leadership_change_notifier
.notify_on_leader_stop()
.await;
@@ -528,6 +567,7 @@ impl Metasrv {
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
leadership_change_notifier,
mailbox: self.mailbox.clone(),
};
let _handle = common_runtime::spawn_global(async move {
loop {
@@ -655,7 +695,9 @@ impl Metasrv {
lookup_datanode_peer(
peer_id,
&self.meta_peer_client,
distributed_time_constants::DATANODE_LEASE_SECS,
default_distributed_time_constants()
.datanode_lease
.as_secs(),
)
.await
}

View File
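Aside: the MetasrvOptions hunk above documents that the other distributed time constants are derived from the base heartbeat interval (region lease = heartbeat_interval * 3 + 1 s). A hedged sketch of that derivation; the struct name and the assumed 3 s base below are illustrative, only the quoted formula comes from the doc comment:

    use std::time::Duration;

    /// Illustrative container; the real constants live in
    /// `common_meta::distributed_time_constants`.
    struct DerivedConstants {
        region_lease: Duration,
    }

    fn derive(heartbeat_interval: Duration) -> DerivedConstants {
        // Formula quoted in the MetasrvOptions doc comment above.
        DerivedConstants {
            region_lease: heartbeat_interval * 3 + Duration::from_secs(1),
        }
    }

    fn main() {
        // Assuming a 3 s base heartbeat interval, the derived region lease is 10 s.
        assert_eq!(
            derive(Duration::from_secs(3)).region_lease,
            Duration::from_secs(10)
        );
    }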

@@ -27,7 +27,7 @@ use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
@@ -220,8 +220,12 @@ impl MetasrvBuilder {
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
datanode_lease_secs: default_distributed_time_constants()
.datanode_lease
.as_secs(),
flownode_lease_secs: default_distributed_time_constants()
.flownode_lease
.as_secs(),
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
@@ -438,7 +442,7 @@ impl MetasrvBuilder {
Some(handler_group_builder) => handler_group_builder,
None => {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
memory_region_keeper.clone(),
customized_region_lease_renewer,

View File

@@ -770,7 +770,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::Instruction;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -1004,8 +1004,10 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await;
}
@@ -1059,8 +1061,9 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await;
}
@@ -1380,8 +1383,9 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
assert!(timer.elapsed().as_secs() < region_lease);
runner.suite.verify_table_metadata().await;
}
}

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -31,9 +30,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of closing a downgraded region.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;
@@ -111,7 +107,7 @@ impl CloseDowngradedRegion {
let ch = Channel::Datanode(downgrade_leader_datanode.id);
let receiver = ctx
.mailbox
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
.send(&ch, msg, default_distributed_time_constants().region_lease)
.await?;
match receiver.await {

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
let now = Instant::now();
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
.set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
match self.downgrade_region_with_retry(ctx).await {
Ok(_) => {
@@ -250,14 +250,14 @@ impl DowngradeLeaderRegion {
if let Some(last_connection_at) = last_connection_at {
let now = current_time_millis();
let elapsed = now - last_connection_at;
let region_lease = Duration::from_secs(REGION_LEASE_SECS);
let region_lease = default_distributed_time_constants().region_lease;
// It's safe to update the region leader lease deadline here because:
// 1. The old region leader has already been marked as downgraded in metadata,
// which means any attempts to renew its lease will be rejected.
// 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
// establishes a new heartbeat connection stream.
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
if elapsed >= (region_lease.as_secs() * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
@@ -663,7 +663,8 @@ mod tests {
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
let region_lease = default_distributed_time_constants().region_lease.as_secs();
assert!(elapsed < region_lease / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

View File
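Aside: the downgrade-leader hunk above resets the leader-region lease deadline immediately when the datanode has been disconnected for longer than one region lease. A small sketch of that elapsed-time check with illustrative millisecond timestamps (the real code compares against `region_lease.as_secs() * 1000`):

    use std::time::Duration;

    /// Returns true when the last heartbeat connection is older than one region lease.
    fn disconnected_longer_than_lease(
        last_connection_at_ms: i64,
        now_ms: i64,
        region_lease: Duration,
    ) -> bool {
        (now_ms - last_connection_at_ms) >= region_lease.as_millis() as i64
    }

    fn main() {
        let lease = Duration::from_secs(10);
        // Lease lapsed, safe to reset the deadline.
        assert!(disconnected_longer_than_lease(0, 12_000, lease));
        // Still within the lease window.
        assert!(!disconnected_longer_than_lease(0, 5_000, lease));
    }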

@@ -13,10 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -32,9 +31,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of opening a candidate region.
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
@@ -143,7 +139,7 @@ impl OpenCandidateRegion {
let now = Instant::now();
let receiver = ctx
.mailbox
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
.send(&ch, msg, default_distributed_time_constants().region_lease)
.await?;
match receiver.await {

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use rand::prelude::SliceRandom;
@@ -36,8 +36,10 @@ pub fn new_test_selector_context() -> SelectorContext {
SelectorContext {
server_addr: "127.0.0.1:3002".to_string(),
datanode_lease_secs: REGION_LEASE_SECS,
flownode_lease_secs: FLOWNODE_LEASE_SECS,
datanode_lease_secs: default_distributed_time_constants().region_lease.as_secs(),
flownode_lease_secs: default_distributed_time_constants()
.flownode_lease
.as_secs(),
kv_backend,
meta_peer_client,
table_id: None,

View File

@@ -27,10 +27,9 @@ use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Streaming};
use tonic::{Request, Response, Status, Streaming};
use crate::error;
use crate::error::Result;
use crate::error::{self, Result};
use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
@@ -99,6 +98,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
break;
}
}
error!(err; "Sending heartbeat response error");
if tx.send(Err(err)).await.is_err() {
info!("ReceiverStream was dropped; shutting down");
@@ -109,6 +109,12 @@ impl heartbeat_server::Heartbeat for Metasrv {
if is_not_leader {
warn!("Quit because it is no longer the leader");
let _ = tx
.send(Err(Status::aborted(format!(
"The requested metasrv node is not leader, node addr: {}",
ctx.server_addr
))))
.await;
break;
}
}

View File
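Aside: the heartbeat service hunk above now sends an explicit Status::aborted to the client before breaking out of the stream when the node is no longer the leader. A minimal sketch of that pattern over a tokio mpsc channel; the unit payload stands in for the real heartbeat response type:

    use tokio::sync::mpsc::Sender;
    use tonic::Status;

    /// Tell the remote peer why the stream is closing instead of dropping it silently.
    async fn abort_non_leader(tx: &Sender<Result<(), Status>>, server_addr: &str) {
        let _ = tx
            .send(Err(Status::aborted(format!(
                "The requested metasrv node is not leader, node addr: {server_addr}"
            ))))
            .await;
    }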

@@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync {
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
/// Reset all pushers of the mailbox.
async fn reset(&self);
}
#[cfg(test)]

View File

@@ -29,7 +29,6 @@ itertools.workspace = true
lazy_static = "1.4"
mito-codec.workspace = true
mito2.workspace = true
moka.workspace = true
mur3 = "0.1"
object-store.workspace = true
prometheus.workspace = true

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -305,13 +304,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Get value from cache"))]
CacheGet {
source: Arc<Error>,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -370,8 +362,6 @@ impl ErrorExt for Error {
StartRepeatedTask { source, .. } => source.status_code(),
MetricManifestInfo { .. } => StatusCode::Internal,
CacheGet { source, .. } => source.status_code(),
}
}

View File

@@ -13,23 +13,19 @@
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use async_stream::try_stream;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use base64::Engine;
use common_base::readable_size::ReadableSize;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::prelude::{col, lit};
use futures_util::stream::BoxStream;
use futures_util::TryStreamExt;
use mito2::engine::MitoEngine;
use moka::future::Cache;
use moka::policy::EvictionPolicy;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
@@ -43,9 +39,9 @@ use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use crate::error::{
CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
ParseRegionIdSnafu, Result,
};
use crate::utils;
@@ -66,11 +62,6 @@ const COLUMN_PREFIX: &str = "__column_";
/// itself.
pub struct MetadataRegion {
pub(crate) mito: MitoEngine,
/// The cache for contents(key-value pairs) of region metadata.
///
/// The cache should be invalidated when any new values are put into the metadata region or any
/// values are deleted from the metadata region.
cache: Cache<RegionId, RegionMetadataCacheEntry>,
/// Logical lock for operations that need to be serialized. Like update & read region columns.
///
/// Region entry will be registered on creating and opening logical region, and deregistered on
@@ -78,30 +69,10 @@ pub struct MetadataRegion {
logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
#[derive(Clone)]
struct RegionMetadataCacheEntry {
key_values: Arc<BTreeMap<String, String>>,
size: usize,
}
/// The max size of the region metadata cache.
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// The TTL of the region metadata cache.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
impl MetadataRegion {
pub fn new(mito: MitoEngine) -> Self {
let cache = Cache::builder()
.max_capacity(MAX_CACHE_SIZE)
// Use the LRU eviction policy to minimize frequent mito scans.
// Recently accessed items are retained longer in the cache.
.eviction_policy(EvictionPolicy::lru())
.time_to_live(CACHE_TTL)
.weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
.build();
Self {
mito,
cache,
logical_region_lock: RwLock::new(HashMap::new()),
}
}
@@ -380,60 +351,21 @@ impl MetadataRegion {
}
}
fn build_read_request() -> ScanRequest {
let projection = vec![
METADATA_SCHEMA_KEY_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_INDEX,
];
ScanRequest {
projection: Some(projection),
..Default::default()
}
}
async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
let scan_req = MetadataRegion::build_read_request();
pub async fn get_all_with_prefix(
&self,
region_id: RegionId,
prefix: &str,
) -> Result<HashMap<String, String>> {
let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
let record_batch_stream = self
.mito
.scan_to_stream(metadata_region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
.try_collect::<BTreeMap<_, _>>()
.await?;
let mut size = 0;
for (k, v) in kv.iter() {
size += k.len();
size += v.len();
}
let kv = Arc::new(kv);
Ok(RegionMetadataCacheEntry {
key_values: kv,
size,
})
}
async fn get_all_with_prefix(
&self,
metadata_region_id: RegionId,
prefix: &str,
) -> Result<HashMap<String, String>> {
let region_metadata = self
.cache
.try_get_with(metadata_region_id, self.load_all(metadata_region_id))
decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
.try_collect::<HashMap<_, _>>()
.await
.context(CacheGetSnafu)?;
let range = region_metadata.key_values.range(prefix.to_string()..);
let mut result = HashMap::new();
for (k, v) in range {
if !k.starts_with(prefix) {
break;
}
result.insert(k.to_string(), v.to_string());
}
Ok(result)
}
pub async fn get_all_key_with_prefix(
@@ -455,18 +387,15 @@ impl MetadataRegion {
/// Delete the given keys. For performance consideration, this method
/// doesn't check if those keys exist or not.
async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
let delete_request = Self::build_delete_request(keys);
self.mito
.handle_request(
metadata_region_id,
region_id,
store_api::region_request::RegionRequest::Delete(delete_request),
)
.await
.context(MitoWriteOperationSnafu)?;
// Invalidates the region metadata cache if any values are deleted from the metadata region.
self.cache.invalidate(&metadata_region_id).await;
Ok(())
}
@@ -556,7 +485,7 @@ impl MetadataRegion {
write_region_id: bool,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter = logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
@@ -583,13 +512,11 @@ impl MetadataRegion {
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
self.mito
.handle_request(
metadata_region_id,
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
// Invalidates the region metadata cache if any new values are put into the metadata region.
self.cache.invalidate(&metadata_region_id).await;
Ok(())
}

View File

@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
/// -RegionMetadataRef metadata
/// }
/// class RegionEdit {
/// -VersionNumber regoin_version
/// -VersionNumber region_version
/// -Vec~FileMeta~ files_to_add
/// -Vec~FileMeta~ files_to_remove
/// -SequenceNumber flushed_sequence

View File

@@ -371,7 +371,7 @@ impl VersionBuilder {
self
}
/// Sets truncated entty id.
/// Sets truncated entry id.
pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncated_entry_id = entry_id;
self

View File

@@ -344,7 +344,7 @@ mod tests {
#[test]
fn test_collect_and_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -353,7 +353,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let expr = Expr::BinaryExpr(BinaryExpr {

View File

@@ -72,7 +72,7 @@ mod tests {
#[test]
fn test_collect_between_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -80,7 +80,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -113,7 +113,7 @@ mod tests {
#[test]
fn test_collect_between_negated() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_negated_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -122,7 +122,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -138,7 +138,7 @@ mod tests {
#[test]
fn test_collect_between_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -147,7 +147,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -180,7 +180,7 @@ mod tests {
#[test]
fn test_collect_between_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -189,7 +189,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {
@@ -206,7 +206,7 @@ mod tests {
#[test]
fn test_collect_between_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -215,7 +215,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let between = Between {

View File

@@ -227,7 +227,7 @@ mod tests {
),
];
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -236,7 +236,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
for ((left, op, right), _) in &cases {
@@ -255,7 +255,7 @@ mod tests {
#[test]
fn test_collect_comparison_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -264,7 +264,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
@@ -274,7 +274,7 @@ mod tests {
#[test]
fn test_collect_comparison_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -283,7 +283,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -308,7 +308,7 @@ mod tests {
#[test]
fn test_collect_comparison_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -317,7 +317,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_comparison_expr(

View File

@@ -136,7 +136,7 @@ mod tests {
#[test]
fn test_collect_eq_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -144,7 +144,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -172,7 +172,7 @@ mod tests {
#[test]
fn test_collect_eq_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -181,7 +181,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -200,7 +200,7 @@ mod tests {
#[test]
fn test_collect_eq_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -209,7 +209,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
@@ -219,7 +219,7 @@ mod tests {
#[test]
fn test_collect_eq_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -228,7 +228,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
@@ -238,7 +238,7 @@ mod tests {
#[test]
fn test_collect_or_eq_list_basic() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -247,7 +247,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -296,7 +296,7 @@ mod tests {
#[test]
fn test_collect_or_eq_list_invalid_op() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -305,7 +305,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -333,7 +333,7 @@ mod tests {
#[test]
fn test_collect_or_eq_list_multiple_columns() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -342,7 +342,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

View File

@@ -67,7 +67,7 @@ mod tests {
#[test]
fn test_collect_in_list_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -75,7 +75,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -98,7 +98,7 @@ mod tests {
#[test]
fn test_collect_in_list_negated() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -107,7 +107,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -122,7 +122,7 @@ mod tests {
#[test]
fn test_collect_in_list_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -131,7 +131,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -154,7 +154,7 @@ mod tests {
#[test]
fn test_collect_in_list_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -163,7 +163,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {
@@ -179,7 +179,7 @@ mod tests {
#[test]
fn test_collect_in_list_nonexistent_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_");
let metadata = test_region_metadata();
@@ -189,7 +189,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let in_list = InList {

View File

@@ -59,7 +59,7 @@ mod tests {
#[test]
fn test_regex_match_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
@@ -67,7 +67,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -86,7 +86,7 @@ mod tests {
#[test]
fn test_regex_match_field_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -95,7 +95,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -114,7 +114,7 @@ mod tests {
#[test]
fn test_regex_match_type_mismatch() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -123,7 +123,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
builder
@@ -135,7 +135,7 @@ mod tests {
#[test]
fn test_regex_match_type_nonexist_column() {
let (_d, facotry) =
let (_d, factory) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_");
let metadata = test_region_metadata();
let mut builder = InvertedIndexApplierBuilder::new(
@@ -144,7 +144,7 @@ mod tests {
test_object_store(),
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
factory,
);
let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));

View File

@@ -70,7 +70,7 @@ impl ScalarCalculate {
interval: Millisecond,
input: LogicalPlan,
time_index: &str,
tag_colunms: &[String],
tag_columns: &[String],
field_column: &str,
table_name: Option<&str>,
) -> Result<Self> {
@@ -97,7 +97,7 @@ impl ScalarCalculate {
end,
interval,
time_index: time_index.to_string(),
tag_columns: tag_colunms.to_vec(),
tag_columns: tag_columns.to_vec(),
field_column: field_column.to_string(),
input,
output_schema: Arc::new(schema),

View File

@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
// and we only need to do a merge sort, otherwise fallback to quick sort
let can_merge_sort = partition_cnt >= region_cnt;
if can_merge_sort {
// TODO(discord9): use `SortPreversingMergeExec here`
// TODO(discord9): use `SortPreservingMergeExec here`
}
// for now merge sort only exist in logical plan, and have the same effect as `Sort`
// doesn't change the execution plan, this will change in the future

View File

@@ -23,7 +23,6 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::GREPTIME_VALUE;
use common_telemetry::debug;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
@@ -662,30 +661,10 @@ impl PromPlanner {
}
Ok(binary_expr)
};
fn optimize(plan: &LogicalPlan) -> LogicalPlan {
use datafusion_optimizer::OptimizerRule;
let new_plan =
datafusion::optimizer::optimize_projections::OptimizeProjections::new()
.rewrite(
plan.clone(),
&datafusion::optimizer::OptimizerContext::default(),
)
.unwrap()
.data;
if new_plan != *plan {
debug!(
"Optimized projection plan: {new_plan:#?}\n From old plan: {plan:#?}"
);
}
new_plan
}
if is_comparison_op && !should_return_bool {
self.filter_on_field_column(join_plan, bin_expr_builder)
} else {
self.projection_for_each_field_column(join_plan, bin_expr_builder)
.map(|p| optimize(&p))
}
}
}
@@ -3310,8 +3289,6 @@ mod test {
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::test_util::DummyDecoder;
use datafusion::functions_aggregate::count::count;
use datafusion_optimizer::OptimizerContext;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use promql_parser::label::Labels;
@@ -4932,132 +4909,6 @@ Filter: up.field_0 IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
#[tokio::test]
async fn test_nested_aggr_not_exists_table_label() {
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"count(count(node_cpu_seconds_total)) / node_load5"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
let table_provider = build_test_table_provider_with_fields(
&[(DEFAULT_SCHEMA_NAME.to_string(), "metric_exists".to_string())],
&["job"],
)
.await;
let plan =
PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_query_engine_state())
.await
.unwrap();
let expected = r#"Projection: lhs.time, lhs.count(count(.value)) / rhs.value AS lhs.count(count(.value)) / rhs.value [time:Timestamp(Millisecond, None), lhs.count(count(.value)) / rhs.value:Float64;N]
Inner Join: lhs.time = rhs.time [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64, time:Timestamp(Millisecond, None), value:Float64;N]
SubqueryAlias: lhs [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64]
Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64]
Aggregate: groupBy=[[.time, count(.value)]], aggr=[[count(count(.value))]] [time:Timestamp(Millisecond, None), count(.value):Int64, count(count(.value)):Int64]
Sort: .time ASC NULLS LAST [time:Timestamp(Millisecond, None), count(.value):Int64]
Aggregate: groupBy=[[.time]], aggr=[[count(.value)]] [time:Timestamp(Millisecond, None), count(.value):Int64]
EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]
SubqueryAlias: rhs [time:Timestamp(Millisecond, None), value:Float64;N]
EmptyMetric: range=[0..-1], interval=[5000] [time:Timestamp(Millisecond, None), value:Float64;N]"#;
let rhs = LogicalPlanBuilder::from(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
0,
-1,
5000,
"time".to_string(),
"value".to_string(),
Some(lit(0.0f64)),
)
.unwrap(),
),
}))
.alias("rhs")
.unwrap()
.build()
.unwrap();
let full = LogicalPlanBuilder::from(LogicalPlan::Extension(Extension {
node: Arc::new(
EmptyMetric::new(
0,
-1,
5000,
"time".to_string(),
"value".to_string(),
Some(lit(0.0f64)),
)
.unwrap(),
),
}))
.aggregate(
vec![col(Column::new(Some(""), "time"))],
vec![count(col(Column::new(Some(""), "value")))],
)
.unwrap()
.sort(vec![SortExpr::new(
col(Column::new(Some(""), "time")),
true,
false,
)])
.unwrap()
.aggregate(
vec![col(Column::new(Some(""), "time"))],
vec![count(col("count(.value)"))],
)
.unwrap()
.sort(vec![SortExpr::new(
col(Column::new(Some(""), "time")),
true,
false,
)])
.unwrap()
.alias("lhs")
.unwrap()
.project(vec![
col("lhs.time"),
col(Column::new(Some("lhs"), "count(count(.value))")),
])
.unwrap()
.join(
rhs,
JoinType::Inner,
(
vec![Column::new(Some("lhs"), "time")],
vec![Column::new(Some("rhs"), "time")],
),
None,
)
.unwrap()
.build()
.unwrap();
dbg!(&full);
{
let optimizer = datafusion_optimizer::Optimizer::new();
let optimized_full_plan = optimizer
.optimize(full, &OptimizerContext::default(), |_, _| {})
.unwrap();
}
return;
assert_eq!(plan.display_indent_schema().to_string(), expected);
let optimizer = datafusion_optimizer::Optimizer::new();
let optimized_plan = optimizer
.optimize(plan, &OptimizerContext::default(), |_, _| {})
.unwrap();
println!("{}", optimized_plan.display_indent_schema().to_string());
}
#[tokio::test]
async fn test_histogram_quantile_missing_le_column() {
let mut eval_stmt = EvalStmt {

View File

@@ -89,7 +89,10 @@ opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "a1fb4d
opentelemetry-proto.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.32", default-features = false, features = ["server-api-ring"] }
#pgwire = { version = "0.30", default-features = false, features = ["server-api-ring"] }
pgwire = { git = "https://github.com/sunng87/pgwire", rev = "127573d997228cfb70c7699881c568eae8131270", default-features = false, features = [
"server-api-ring",
] }
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }

View File

@@ -23,6 +23,7 @@ pub mod prom_query_gateway;
pub mod region_server;
use std::net::SocketAddr;
use std::time::Duration;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::{HealthCheckRequest, HealthCheckResponse};
@@ -72,6 +73,12 @@ pub struct GrpcOptions {
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
/// The HTTP/2 keep-alive interval.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_interval: Duration,
/// The HTTP/2 keep-alive timeout.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_timeout: Duration,
}
impl GrpcOptions {
@@ -129,6 +136,8 @@ impl Default for GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
http2_keep_alive_interval: Duration::from_secs(10),
http2_keep_alive_timeout: Duration::from_secs(3),
}
}
}

View File
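Aside: the GrpcOptions hunk above introduces HTTP/2 keep-alive interval and timeout settings. A hedged sketch of how such values could be applied to a tonic server builder; whether GreptimeDB wires them through exactly this way is not shown in the diff:

    use std::time::Duration;
    use tonic::transport::Server;

    /// Configure HTTP/2 keep-alive probes on a tonic server builder.
    fn keep_alive_server(interval: Duration, timeout: Duration) -> Server {
        Server::builder()
            .http2_keepalive_interval(Some(interval))
            .http2_keepalive_timeout(Some(timeout))
    }

Matching the defaults added in the hunk would be `keep_alive_server(Duration::from_secs(10), Duration::from_secs(3))`.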

@@ -34,12 +34,10 @@ impl HeartbeatOptions {
pub fn frontend_default() -> Self {
Self {
// Frontend can send heartbeat with a longer interval.
interval: Duration::from_millis(
distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
interval: distributed_time_constants::frontend_heartbeat_interval(
distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
),
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
}
}
}
@@ -47,10 +45,8 @@ impl HeartbeatOptions {
impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
}
}
}

View File
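Aside: the HeartbeatOptions hunk above now derives both intervals from BASE_HEARTBEAT_INTERVAL, with the frontend interval computed by frontend_heartbeat_interval(). A sketch of that shape; the 6x multiplier and the 3 s base below are assumptions for illustration, not values taken from the crate:

    use std::time::Duration;

    /// Illustrative stand-in for `distributed_time_constants::frontend_heartbeat_interval`;
    /// the multiplier here is assumed, not the crate's actual ratio.
    fn frontend_heartbeat_interval(base: Duration) -> Duration {
        base * 6
    }

    fn main() {
        let base = Duration::from_secs(3); // assumed base heartbeat interval
        assert_eq!(frontend_heartbeat_interval(base), Duration::from_secs(18));
    }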

@@ -1066,7 +1066,7 @@ impl HttpServer {
/// Route Prometheus [HTTP API].
///
/// [HTTP API]: https://prometheus.io/docs/prometheus/latest/querying/api/
fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
pub fn route_prometheus<S>(prometheus_handler: PrometheusHandlerRef) -> Router<S> {
Router::new()
.route(
"/format_query",

View File

@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
)
.await?;
let colume_type_key = "colume_type";
let column_type_key = "column_type";
let data_type_key = "data_type";
let name_key = "name";
@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
JsonValue::String(cs.datatype().as_str_name().to_string()),
);
map.insert(
colume_type_key.to_string(),
column_type_key.to_string(),
JsonValue::String(cs.semantic_type().as_str_name().to_string()),
);
map.insert(
@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
);
map.insert(
"semantic_type".to_string(),
schema[idx][colume_type_key].clone(),
schema[idx][column_type_key].clone(),
);
map.insert(
"data_type".to_string(),

View File

@@ -32,9 +32,9 @@ use std::sync::Arc;
use ::auth::UserProviderRef;
use derive_builder::Builder;
use pgwire::api::auth::{ServerParameterProvider, StartupHandler};
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::{ClientInfo, ErrorHandler, PgWireServerHandlers};
use pgwire::api::auth::ServerParameterProvider;
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::{ClientInfo, PgWireServerHandlers};
pub use server::PostgresServer;
use session::context::Channel;
use session::Session;
@@ -92,19 +92,29 @@ pub(crate) struct MakePostgresServerHandler {
pub(crate) struct PostgresServerHandler(Arc<PostgresServerHandlerInner>);
impl PgWireServerHandlers for PostgresServerHandler {
fn simple_query_handler(&self) -> Arc<impl SimpleQueryHandler> {
type StartupHandler = PostgresServerHandlerInner;
type SimpleQueryHandler = PostgresServerHandlerInner;
type ExtendedQueryHandler = PostgresServerHandlerInner;
type CopyHandler = NoopCopyHandler;
type ErrorHandler = PostgresServerHandlerInner;
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
self.0.clone()
}
fn extended_query_handler(&self) -> Arc<impl ExtendedQueryHandler> {
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
self.0.clone()
}
fn startup_handler(&self) -> Arc<impl StartupHandler> {
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
self.0.clone()
}
fn error_handler(&self) -> Arc<impl ErrorHandler> {
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
Arc::new(NoopCopyHandler)
}
fn error_handler(&self) -> Arc<Self::ErrorHandler> {
self.0.clone()
}
}

View File

@@ -24,7 +24,7 @@ use pgwire::api::auth::StartupHandler;
use pgwire::api::{auth, ClientInfo, PgWireConnectionState};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pgwire::messages::response::ErrorResponse;
use pgwire::messages::startup::{Authentication, SecretKey};
use pgwire::messages::startup::Authentication;
use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage};
use session::Session;
use snafu::IntoError;
@@ -127,8 +127,7 @@ where
// pass generated process id and secret key to client, this information will
// be sent to postgres client for query cancellation.
// use all 0 before we actually supported query cancellation
client.set_pid_and_secret_key(0, SecretKey::I32(0));
client.set_pid_and_secret_key(session.process_id() as i32, rand::random::<i32>());
// set userinfo outside
}

Some files were not shown because too many files have changed in this diff.