Compare commits


40 Commits

Author SHA1 Message Date
yihong
09f3d72d2d fix: close issue #6555 return empty result (#6569)
* fix: close issue #6555 return empty result

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* fix: only start one instance for one regex sqlness test (#6570)

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

* refactor: refactor partition mod to use PartitionExpr instead of PartitionDef (#6554)

* refactor: refactor partition mod to use PartitionExpr instead of PartitionDef

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix snafu

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Puts expression into PbPartition

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix compile

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* update proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add serde test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add serde test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: address comments

Signed-off-by: yihong0618 <zouzou0208@gmail.com>

---------

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-24 15:00:32 +08:00
Yingwen
ca0c1282ed chore: bump version to 0.15.3 (#6580)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-24 11:24:07 +08:00
Yingwen
b719c020ba chore: cherry pick #6540, #6550, #6551, #6556, #6563, #6534 to v0.15 branch (#6577)
* feat: add metrics for request wait time and adjust stall metrics (#6540)

* feat: add metric greptime_mito_request_wait_time to observe wait time

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add worker to wait time metric

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename stall gauge to greptime_mito_write_stalling_count

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: change greptime_mito_write_stall_total to total stalled requests

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: merge lazy static blocks

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: estimate mem size for bulk ingester (#6550)

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: flow mirror cache (#6551)

* fix: invalid cache when flownode change address

Signed-off-by: discord9 <discord9@163.com>

* update comments

Signed-off-by: discord9 <discord9@163.com>

* fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: add log&rename

Signed-off-by: discord9 <discord9@163.com>

* stuff

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: impl timestamp function for promql (#6556)

* feat: impl timestamp function for promql

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: style and typo

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* docs: update comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: comment

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: MergeScan print input (#6563)

* feat: MergeScan print input

Signed-off-by: discord9 <discord9@163.com>

* test: fix ut

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: aggr group by all partition cols use partial commutative (#6534)

* fix: aggr group by all partition cols use partial commutative

Signed-off-by: discord9 <discord9@163.com>

* test: bugged case

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness fix

Signed-off-by: discord9 <discord9@163.com>

* test: more redacted

Signed-off-by: discord9 <discord9@163.com>

* more cases

Signed-off-by: discord9 <discord9@163.com>

* even more test cases

Signed-off-by: discord9 <discord9@163.com>

* join testcase

Signed-off-by: discord9 <discord9@163.com>

* fix: column requirement added in correct location

Signed-off-by: discord9 <discord9@163.com>

* fix test

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* track col reqs per stack

Signed-off-by: discord9 <discord9@163.com>

* fix: continue

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* refactor: test mod

Signed-off-by: discord9 <discord9@163.com>

* test utils

Signed-off-by: discord9 <discord9@163.com>

* test: better test

Signed-off-by: discord9 <discord9@163.com>

* more testcases

Signed-off-by: discord9 <discord9@163.com>

* test limit push down

Signed-off-by: discord9 <discord9@163.com>

* more testcases

Signed-off-by: discord9 <discord9@163.com>

* more testcase

Signed-off-by: discord9 <discord9@163.com>

* more test

Signed-off-by: discord9 <discord9@163.com>

* chore: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: update comments

Signed-off-by: discord9 <discord9@163.com>

* fix: check col reqs from bottom to upper

Signed-off-by: discord9 <discord9@163.com>

* chore: more comment

Signed-off-by: discord9 <discord9@163.com>

* docs: more todo

Signed-off-by: discord9 <discord9@163.com>

* chore: comments

Signed-off-by: discord9 <discord9@163.com>

* test: a new failing test that should be fixed

Signed-off-by: discord9 <discord9@163.com>

* fix: part col alias tracking

Signed-off-by: discord9 <discord9@163.com>

* chore: unused

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* docs: comment

Signed-off-by: discord9 <discord9@163.com>

* more testcase

Signed-off-by: discord9 <discord9@163.com>

* more testcase for step/part aggr combine

Signed-off-by: discord9 <discord9@163.com>

* FIXME: a new bug

Signed-off-by: discord9 <discord9@163.com>

* literally unfixable

Signed-off-by: discord9 <discord9@163.com>

* chore: remove some debug print

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: fys <40801205+fengys1996@users.noreply.github.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
2025-07-23 22:29:14 +08:00
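
The first cherry-picked change above (#6540) adds a histogram that records how long a write request waits before a worker picks it up. A minimal sketch of such a metric with the prometheus crate is shown below; the metric and label names mirror the commit message and are assumptions, not the exact GreptimeDB definitions.

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Hypothetical wait-time histogram in the spirit of
    // `greptime_mito_request_wait_time`; the `worker` label is an assumption.
    static ref REQUEST_WAIT_TIME: HistogramVec = register_histogram_vec!(
        "greptime_mito_request_wait_time",
        "Time a write request waits before a worker picks it up",
        &["worker"]
    )
    .unwrap();
}

// Record one observation for a given worker.
fn observe_wait(worker: &str, wait_secs: f64) {
    REQUEST_WAIT_TIME.with_label_values(&[worker]).observe(wait_secs);
}
```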
Ruihang Xia
717c1d1807 feat: update partial execution metrics (#6499)
* feat: update partial execution metrics

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* send data with metrics in distributed mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* only send partial metrics under VERBOSE flag

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* loop to while

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
Zhenchi
291f3c89fe fix: row selection intersection removes trailing rows (#6539)
* fix: row selection intersection removes trailing rows

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
discord9
602cc38056 fix: breaking loop when not retryable (#6538)
fix: breaking when not retryable

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
Lei, HUANG
46b3593021 fix(grpc): check grpc client unavailable (#6488)
* fix/check-grpc-client-unavailable:
 Improve async handling in `greptime_handler.rs`

 - Updated the `DoPut` response handling to use `await` with `result_sender.send` for better asynchronous operation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/check-grpc-client-unavailable:
 ### Improve Error Handling in `greptime_handler.rs`

 - Enhanced error handling for the `DoPut` operation by switching from `send` to `try_send` for the `result_sender`.
 - Added specific logging for unreachable clients, including `request_id` in the warning message.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
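
The commit above switches the DoPut result channel from an awaited send to `try_send`, so a slow or unreachable client cannot block the handler, and logs the failing `request_id` instead. A hedged sketch of that pattern with a tokio mpsc channel follows; the names `result_sender` and `request_id` come from the commit message, the rest is an assumption rather than the actual GreptimeDB handler.

```rust
use tokio::sync::mpsc;

// Hedged sketch: push the DoPut result without awaiting, and warn instead of
// blocking when the receiving side is full or gone.
fn send_do_put_result(
    result_sender: &mpsc::Sender<Result<u64, String>>,
    request_id: u64,
    affected_rows: u64,
) {
    if let Err(err) = result_sender.try_send(Ok(affected_rows)) {
        // Channel full or receiver dropped: the client is likely unreachable.
        eprintln!("failed to send DoPut response, request_id: {request_id}, error: {err}");
    }
}
```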
Yan Tingwang
ff402fd6f6 test: add sqlness test for max execution time (#6517)
* add sqlness test for max_execution_time

Signed-off-by: codephage. <tingwangyan2020@163.com>

* add Pre-line comments SQLNESS PROTOCOL MYSQL

Signed-off-by: codephage. <tingwangyan2020@163.com>

* fix(mysql): support max_execution_time variable

Co-authored-by: evenyag <realevenyag@gmail.com>
Signed-off-by: codephage. <tingwangyan2020@163.com>

* fix: test::test_check & sqlness test mysql

Signed-off-by: codephage. <tingwangyan2020@163.com>

* add sqlness test for max_execution_time

Signed-off-by: codephage. <tingwangyan2020@163.com>

* add Pre-line comments SQLNESS PROTOCOL MYSQL

Signed-off-by: codephage. <tingwangyan2020@163.com>

* fix(mysql): support max_execution_time variable

Co-authored-by: evenyag <realevenyag@gmail.com>
Signed-off-by: codephage. <tingwangyan2020@163.com>

* fix: test::test_check & sqlness test mysql

Signed-off-by: codephage. <tingwangyan2020@163.com>

* chore: Unify the sql style

Signed-off-by: codephage. <tingwangyan2020@163.com>

---------

Signed-off-by: codephage. <tingwangyan2020@163.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
Yan Tingwang
b83e6e2b18 fix: add system variable max_execution_time (#6511)
add system variable: max_execution_time

Signed-off-by: codephage. <tingwangyan2020@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
discord9
cb74337dbe refactor(flow): faster time window expr (#6495)
* refactor: faster window expr

Signed-off-by: discord9 <discord9@163.com>

* docs: explain fast path

Signed-off-by: discord9 <discord9@163.com>

* chore: rm unwrap

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-23 20:54:33 +08:00
shuiyisong
32bffbb668 feat: add filter processor to v0.15 (#6516)
feat: add filter processor

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-07-14 17:43:49 +08:00
evenyag
941906dc74 chore: bump version to v0.15.2
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-11 00:24:21 +08:00
Ruihang Xia
cbf251d0f0 fix: expand on conditional commutative as well (#6484)
* fix: expand on conditional commutative as well

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>

* add logging to figure test failure

Signed-off-by: discord9 <discord9@163.com>

* revert

Signed-off-by: discord9 <discord9@163.com>

* feat: stream drop record metrics

Signed-off-by: discord9 <discord9@163.com>

* Revert "feat: stream drop record metrics"

This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500.

Signed-off-by: discord9 <discord9@163.com>

* feat: stream drop record metrics

Signed-off-by: discord9 <discord9@163.com>

refactor: move logging to drop too

Signed-off-by: discord9 <discord9@163.com>

fix: drop input stream before collect metrics

Signed-off-by: discord9 <discord9@163.com>

* fix: expand differently

Signed-off-by: discord9 <discord9@163.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: more dbg

Signed-off-by: discord9 <discord9@163.com>

* Revert "feat: stream drop record metrics"

This reverts commit 3eda4a2257928d95cf9c1328ae44fae84cfbb017.

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness redacted

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-11 00:24:21 +08:00
shuiyisong
1519379262 chore: skip calc ts in doc 2 with transform (#6509)
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
localhost
4bfe02ec7f chore: remove region id to reduce time series (#6506)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Weny Xu
ecacf1333e fix: correctly update partition key indices during alter table operations (#6494)
* fix: correctly update partition key indices in alter table operations

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add sqlness tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Yingwen
92fa33c250 fix: range query returns range selector error when table not found (#6481)
* test: add sqlness test for range vector with non-existence metric

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: handle empty metric for matrix selector

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update sqlness result

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add newline

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
shuiyisong
8b2d1a3753 fix: skip nan in prom remote write pipeline (#6489)
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Ning Sun
13401c94e0 feat: allow alternative version string (#6472)
* feat: allow alternative version string

* refactor: rename original version function to verbose_version

Signed-off-by: Ning Sun <sunning@greptime.com>

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
shuiyisong
fd637dae47 chore: sort range query return values (#6474)
* chore: sort range query return values

* chore: add comments

* chore: add is_sorted check

* fix: test

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
dennis zhuang
69fac19770 fix: empty statements hang (#6480)
* fix: empty statements hang

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* tests: add cases

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
discord9
6435b97314 fix: stricter win sort condition (#6477)
test: sqlness

test: fix sqlness redacted

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
Weny Xu
726e3909fe fix(metric-engine): handle stale metadata region recovery failures (#6395)
* fix(metric-engine): handle stale metadata region recovery failures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-10 22:40:07 +08:00
evenyag
00d759e828 chore: bump version to v0.15.1
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
Lei, HUANG
0042ea6462 fix: filter empty batch in bulk insert api (#6459)
* fix/filter-empty-batch-in-bulk-insert-api:
 **Add Early Return for Empty Record Batches in `bulk_insert.rs`**

 - Implemented an early return in the `Inserter` implementation to handle cases where `record_batch.num_rows()` is zero, improving efficiency by avoiding unnecessary processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 **Improve Bulk Insert Handling**

 - **`handle_bulk_insert.rs`**: Added a check to handle cases where the batch has zero rows, immediately returning and sending a success response with zero rows processed.
 - **`bulk_insert.rs`**: Enhanced logic to skip processing for masks that select none, optimizing the bulk insert operation by avoiding unnecessary iterations.

 These changes improve the efficiency and robustness of the bulk insert process by handling edge cases more effectively.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 ### Refactor and Error Handling Enhancements

 - **Refactored Timestamp Handling**: Introduced `timestamp_array_to_primitive` function in `timestamp.rs` to streamline conversion of timestamp arrays to primitive arrays, reducing redundancy in `handle_bulk_insert.rs` and `bulk_insert.rs`.
 - **Error Handling**: Added `InconsistentTimestampLength` error in `error.rs` to handle mismatched timestamp column lengths in bulk insert operations.
 - **Bulk Insert Logic**: Updated `handle_bulk_insert.rs` to utilize the new timestamp conversion function and added checks for timestamp length consistency.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/filter-empty-batch-in-bulk-insert-api:
 **Refactor `bulk_insert.rs` to streamline imports**

 - Simplified import statements by removing unused timestamp-related arrays and data types from the `arrow` crate in `bulk_insert.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
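
The commit messages above describe two guards for bulk inserts: an early return when the incoming record batch has zero rows, and a length check between the timestamp column and the batch. A minimal sketch of those guards over an arrow `RecordBatch` is shown below; the function shape and the error message are assumptions, standing in for the real `InconsistentTimestampLength` error and bulk insert APIs.

```rust
use arrow::array::{Array, TimestampMillisecondArray};
use arrow::record_batch::RecordBatch;

// Hedged sketch of the guards described above: skip empty batches and reject
// timestamp columns whose length does not match the batch.
fn check_bulk_insert_batch(
    batch: &RecordBatch,
    timestamps: &TimestampMillisecondArray,
) -> Result<Option<usize>, String> {
    if batch.num_rows() == 0 {
        // Nothing to ingest; report zero affected rows immediately.
        return Ok(None);
    }
    if timestamps.len() != batch.num_rows() {
        // Mirrors the `InconsistentTimestampLength` error added in the commit.
        return Err(format!(
            "timestamp column length {} does not match batch rows {}",
            timestamps.len(),
            batch.num_rows()
        ));
    }
    Ok(Some(batch.num_rows()))
}
```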
Zhenchi
d06450715f fix: add backward compatibility for SkippingIndexOptions deserialization (#6458)
* fix: add backward compatibility for `SkippingIndexOptions` deserialization

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-04 22:53:46 +08:00
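
Backward-compatible deserialization of an options struct is typically achieved by giving added or renamed fields serde defaults and aliases, so payloads written by older versions still parse. The sketch below is illustrative only; the field names are made up and merely stand in for whatever `SkippingIndexOptions` actually contains.

```rust
use serde::Deserialize;

// Illustrative only: the real `SkippingIndexOptions` fields are different.
#[derive(Debug, Deserialize)]
struct SkippingIndexOptions {
    // Older payloads may omit this field entirely.
    #[serde(default = "default_granularity")]
    granularity: u32,
    // Older payloads may use a previous field name.
    #[serde(default, alias = "index_type")]
    kind: Option<String>,
}

fn default_granularity() -> u32 {
    10240
}

fn main() {
    // An "old" payload without `granularity` and with the legacy field name.
    let old: SkippingIndexOptions = serde_json::from_str(r#"{"index_type":"bloom"}"#).unwrap();
    println!("{old:?}");
}
```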
evenyag
8612bb066f chore: fix statement compile errors
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 23:05:21 +08:00
Yingwen
467593d329 fix: enable max_execution time for other read only statements (#6454)
Also disable the timeout when timeout is 0

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 23:05:21 +08:00
Ruihang Xia
9e4ae070b2 feat: skip rule checker on ingestion (#6453)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-07-03 23:05:21 +08:00
Ruihang Xia
d8261dda51 feat!: point matrix based partition rule checker (#6431)
* bare implementation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* stateful generator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* error report

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix remap checkpoint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use matrix generator as iterator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* pre-calculate suffix product

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update existing test cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix ut

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 23:05:21 +08:00
dennis zhuang
7ab9b335a1 fix: label_replace and label_join functions when used as sub-expressions (#6443)
* fix: label_replace and label_join functions in expressions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove update_fields

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: tql eval -> TQL EVAL

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: empty regex and not existing source label

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: simplify test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 23:05:21 +08:00
Ruihang Xia
60835afb47 feat: Collider for playing with PartitionRule (#6399)
* skeleton

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* initial impl and tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor and reorganize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* error handling

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* explain naming

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 23:05:21 +08:00
fys
aba5bf7431 refactor: avoid adding feature to parameter (#6391)
* refactor: avoid adding feature to parameter

* avoid `cfg(not(feature = ...))` block
2025-07-03 15:48:22 +08:00
Yingwen
7897fe8dbe fix: correct MAX_EXECUTION_TIME timeout calculation (#6444)
* feat: implement statement timeout in frontend instance

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fail fast when timeout is 0

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: update start time

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 10:46:35 +08:00
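
The commit above recomputes the remaining execution budget from the statement's start instant rather than from a stale timestamp. A hedged sketch of that bookkeeping with std::time follows; treating a zero timeout as "no limit" follows the later #6454 change, and the function shape is an assumption.

```rust
use std::time::{Duration, Instant};

// Hedged sketch of the timeout calculation described above: a zero timeout is
// treated as "no limit", otherwise the remaining budget is derived from the
// statement's start time.
fn remaining_timeout(start: Instant, max_execution_time: Duration) -> Option<Duration> {
    if max_execution_time.is_zero() {
        return None; // no limit configured
    }
    Some(max_execution_time.saturating_sub(start.elapsed()))
}
```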
Ruihang Xia
cc8ec706a1 fix: remap column indices on overriding logical table partitions (#6446)
* fix: remap column indices on overriding logical table partitions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor map query

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 10:46:35 +08:00
Weny Xu
7c688718db fix: fix dest_keys chunks bug in TombstoneManager (#6432)
* fix(meta): fix dest_keys_chunks bug in TombstoneManager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix typo

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix sqlness tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 10:46:35 +08:00
shuiyisong
8a0e554e5a feat(pipeline): support Loki API (#6390)
* chore: use schema_info

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: abstract loki item generator

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: introduce middle item

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: introduce pipeline in loki api

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* test: add tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update prefix and test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change recursion to loop

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: cr issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 10:46:35 +08:00
jeremyhi
80fae1c559 feat: override logical table's partition key indices (#6385)
* feat: Override logical table's partition key indices with physical table's

* chore: by comment

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-07-03 10:46:35 +08:00
Zhenchi
c37c4df20d feat: pick #6416 to release/0.15 (#6445)
* feat: pick #6416 to release/0.15

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* upgrade proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-07-02 08:55:54 +00:00
Yingwen
f712c1b356 feat: cherry-pick #6384 #6388 #6396 #6403 #6412 #6405 to 0.15 branch (#6414)
* feat: supports CsvWithNames and CsvWithNamesAndTypes formats (#6384)

* feat: supports CsvWithNames and CsvWithNamesAndTypes formats and object/array types

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: added and fixed tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: fix test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add json type csv tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: introduce /v1/health for healthcheck from external (#6388)

Signed-off-by: Ning Sun <sunning@greptime.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: update dashboard to v0.10.1 (#6396)

Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: complete partial index search results in cache (#6403)

* fix: complete partial index search results in cache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add initial tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* cover issue case

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* TestEnv new -> async

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: skip failing nodes when gathering process info (#6412)

* fix/process-manager-skip-fail-nodes:
 - **Enhance Error Handling in `process_manager.rs`:**
   Improved error handling by adding a warning log for failing nodes in the `list_process` method. This ensures that the process listing continues even if some nodes fail to respond.

 - **Add Error Type Import in `process_manager.rs`:**
   Included the `Error` type from the `error` module to handle errors more effectively within the `ProcessManager` implementation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/process-manager-skip-fail-nodes:
 **Enhancements to Debugging and Trait Implementation**

 - **`process_manager.rs`**: Improved logging by adding more detailed error messages when skipping failing nodes.
 - **`selector.rs`**: Enhanced the `FrontendClient` trait by adding the `Debug` trait bound to improve debugging capabilities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: pass pipeline name through http header and get db from query context (#6405)

Signed-off-by: zyy17 <zyylsxm@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: Ning Sun <sunning@greptime.com>
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
Co-authored-by: ZonaHe <zonahe@qq.com>
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2025-06-27 20:11:28 +08:00
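
Among the cherry-picked changes above, #6412 makes process listing tolerant of individual nodes that fail to respond: the error is logged as a warning and collection continues with the remaining nodes. A hedged sketch of that loop is shown below; the node and process types are placeholders, not the actual `ProcessManager` API.

```rust
// Hedged sketch of the "skip failing nodes" behaviour from #6412; types and
// function names are placeholders for the real ProcessManager API.
fn list_processes(nodes: &[String]) -> Vec<String> {
    let mut processes = Vec::new();
    for node in nodes {
        match fetch_processes_from(node) {
            Ok(mut procs) => processes.append(&mut procs),
            Err(err) => {
                // Keep going even if one node is unreachable.
                eprintln!("skipping failing node {node}: {err}");
            }
        }
    }
    processes
}

fn fetch_processes_from(node: &str) -> Result<Vec<String>, String> {
    // Placeholder for an RPC to the node; always succeeds in this sketch.
    Ok(vec![format!("process on {node}")])
}
```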
324 changed files with 12935 additions and 8244 deletions


@@ -12,3 +12,6 @@ fetch = true
checkout = true
list_files = true
internal_use_git2 = false
[env]
CARGO_WORKSPACE_DIR = { value = "", relative = true }

Cargo.lock

@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"common-base",
"common-decimal",
@@ -944,7 +944,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"catalog",
"common-error",
@@ -1602,6 +1602,17 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbc26382d871df4b7442e3df10a9402bf3cf5e55cbd66f12be38861425f0564"
[[package]]
name = "cargo-manifest"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d8af896b707212cd0e99c112a78c9497dd32994192a463ed2f7419d29bd8c6"
dependencies = [
"serde",
"thiserror 2.0.12",
"toml 0.8.19",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -1610,7 +1621,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1948,7 +1959,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-stream",
"async-trait",
@@ -1986,14 +1997,14 @@ dependencies = [
"operator",
"query",
"rand 0.9.0",
"reqwest 0.12.9",
"reqwest",
"serde",
"serde_json",
"servers",
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tempfile",
"tokio",
@@ -2002,7 +2013,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arc-swap",
@@ -2032,7 +2043,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.15.0",
"substrait 0.15.3",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2073,7 +2084,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"auth",
@@ -2118,14 +2129,13 @@ dependencies = [
"mito2",
"moka",
"nu-ansi-term",
"object-store",
"plugins",
"prometheus",
"prost 0.13.5",
"query",
"rand 0.9.0",
"regex",
"reqwest 0.12.9",
"reqwest",
"rexpect",
"serde",
"serde_json",
@@ -2135,7 +2145,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"temp-env",
"tempfile",
@@ -2182,7 +2192,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"anymap2",
"async-trait",
@@ -2204,11 +2214,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.15.0"
version = "0.15.3"
[[package]]
name = "common-config"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"common-base",
"common-error",
@@ -2221,7 +2231,6 @@ dependencies = [
"humantime-serde",
"meta-client",
"num_cpus",
"object-store",
"serde",
"serde_json",
"serde_with",
@@ -2234,7 +2243,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2271,7 +2280,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2284,7 +2293,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2295,7 +2304,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"common-error",
@@ -2311,7 +2320,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2334,7 +2343,6 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-functions-aggregate-common",
"datatypes",
"derive_more",
"geo",
@@ -2365,7 +2373,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"common-runtime",
@@ -2373,7 +2381,7 @@ dependencies = [
"common-test-util",
"common-version",
"hyper 0.14.30",
"reqwest 0.12.9",
"reqwest",
"serde",
"tempfile",
"tokio",
@@ -2382,7 +2390,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arrow-flight",
@@ -2414,7 +2422,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"common-base",
@@ -2433,7 +2441,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"arc-swap",
"common-query",
@@ -2447,7 +2455,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"anyhow",
"common-error",
@@ -2463,7 +2471,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"anymap2",
"api",
@@ -2528,7 +2536,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2537,11 +2545,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.15.0"
version = "0.15.3"
[[package]]
name = "common-pprof"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"common-error",
"common-macro",
@@ -2553,7 +2561,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-stream",
"async-trait",
@@ -2580,7 +2588,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"common-procedure",
@@ -2589,7 +2597,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -2615,7 +2623,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"arc-swap",
"common-error",
@@ -2635,7 +2643,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2665,17 +2673,18 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"backtrace",
"common-error",
"common-version",
"console-subscriber",
"greptime-proto",
"humantime-serde",
@@ -2699,7 +2708,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"client",
"common-grpc",
@@ -2712,7 +2721,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2730,9 +2739,10 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"build-data",
"cargo-manifest",
"const_format",
"serde",
"shadow-rs",
@@ -2740,7 +2750,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"common-base",
"common-error",
@@ -2763,7 +2773,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"common-telemetry",
@@ -3719,7 +3729,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arrow-flight",
@@ -3765,14 +3775,14 @@ dependencies = [
"prometheus",
"prost 0.13.5",
"query",
"reqwest 0.12.9",
"reqwest",
"serde",
"serde_json",
"servers",
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tokio",
"toml 0.8.19",
@@ -3781,7 +3791,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4441,7 +4451,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -4578,7 +4588,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4643,7 +4653,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tokio",
"tonic 0.12.3",
@@ -4698,10 +4708,11 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arc-swap",
"async-stream",
"async-trait",
"auth",
"bytes",
@@ -4738,7 +4749,6 @@ dependencies = [
"log-store",
"meta-client",
"num_cpus",
"object-store",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
@@ -4758,7 +4768,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tokio",
"tokio-util",
@@ -5148,7 +5158,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=464226cf8a4a22696503536a123d0b9e318582f4#464226cf8a4a22696503536a123d0b9e318582f4"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96c733f8472284d3c83a4c011dc6de9cf830c353#96c733f8472284d3c83a4c011dc6de9cf830c353"
dependencies = [
"prost 0.13.5",
"serde",
@@ -5919,7 +5929,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6699,7 +6709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -6804,7 +6814,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"chrono",
"common-error",
@@ -6816,7 +6826,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-stream",
"async-trait",
@@ -7114,7 +7124,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -7142,7 +7152,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -7233,9 +7243,8 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"api",
"aquamarine",
"async-stream",
@@ -7324,7 +7333,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"bytes",
@@ -7347,7 +7356,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"aquamarine",
@@ -8097,25 +8106,18 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"anyhow",
"bytes",
"common-base",
"common-error",
"common-macro",
"common-telemetry",
"common-test-util",
"futures",
"humantime-serde",
"lazy_static",
"md5",
"moka",
"opendal",
"prometheus",
"reqwest 0.12.9",
"serde",
"snafu 0.8.5",
"tokio",
"uuid",
]
@@ -8250,7 +8252,7 @@ dependencies = [
"prometheus",
"quick-xml 0.36.2",
"reqsign",
"reqwest 0.12.9",
"reqwest",
"serde",
"serde_json",
"sha2",
@@ -8322,19 +8324,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e"
dependencies = [
"async-trait",
"bytes",
"http 0.2.12",
"opentelemetry 0.21.0",
"reqwest 0.11.27",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.14.0"
@@ -8345,12 +8334,10 @@ dependencies = [
"futures-core",
"http 0.2.12",
"opentelemetry 0.21.0",
"opentelemetry-http",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.2",
"prost 0.11.9",
"reqwest 0.11.27",
"thiserror 1.0.64",
"tokio",
"tonic 0.9.2",
@@ -8433,7 +8420,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8488,7 +8475,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tokio",
"tokio-util",
@@ -8755,7 +8742,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -9043,7 +9030,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9186,7 +9173,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9499,7 +9486,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9595,7 +9582,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck 0.5.0",
"itertools 0.14.0",
"itertools 0.11.0",
"log",
"multimap",
"once_cell",
@@ -9641,7 +9628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"itertools 0.11.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -9781,7 +9768,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9823,7 +9810,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9889,7 +9876,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tokio",
"tokio-stream",
@@ -10337,7 +10324,7 @@ dependencies = [
"percent-encoding",
"quick-xml 0.35.0",
"rand 0.8.5",
"reqwest 0.12.9",
"reqwest",
"rsa",
"rust-ini 0.21.1",
"serde",
@@ -10346,42 +10333,6 @@ dependencies = [
"sha2",
]
[[package]]
name = "reqwest"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.30",
"ipnet",
"js-sys",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 0.1.2",
"system-configuration",
"tokio",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "reqwest"
version = "0.12.9"
@@ -11211,7 +11162,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11232,7 +11183,6 @@ dependencies = [
"common-base",
"common-catalog",
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-grpc",
@@ -11275,23 +11225,16 @@ dependencies = [
"local-ip-address",
"log-query",
"loki-proto",
"metric-engine",
"mime_guess",
"mito-codec",
"mito2",
"mysql_async",
"notify",
"object-pool",
"object-store",
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.3",
"parquet",
"partition",
"permutation",
"pgwire",
"pin-project",
@@ -11305,7 +11248,7 @@ dependencies = [
"quoted-string",
"rand 0.9.0",
"regex",
"reqwest 0.12.9",
"reqwest",
"rust-embed",
"rustls",
"rustls-pemfile",
@@ -11340,7 +11283,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arc-swap",
@@ -11679,7 +11622,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"chrono",
@@ -11734,7 +11677,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11749,7 +11692,7 @@ dependencies = [
"local-ip-address",
"mysql",
"num_cpus",
"reqwest 0.12.9",
"reqwest",
"serde",
"serde_json",
"sha2",
@@ -12034,7 +11977,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"nix 0.30.1",
]
@@ -12060,7 +12003,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"aquamarine",
@@ -12221,7 +12164,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"async-trait",
"bytes",
@@ -12399,30 +12342,9 @@ dependencies = [
"nom",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "table"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"async-trait",
@@ -12683,7 +12605,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"arbitrary",
"async-trait",
@@ -12710,7 +12632,7 @@ dependencies = [
"paste",
"rand 0.9.0",
"rand_chacha 0.9.0",
"reqwest 0.12.9",
"reqwest",
"schemars",
"serde",
"serde_json",
@@ -12727,7 +12649,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.15.0"
version = "0.15.3"
dependencies = [
"api",
"arrow-flight",
@@ -12794,7 +12716,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.15.0",
"substrait 0.15.3",
"table",
"tempfile",
"time",
@@ -13164,6 +13086,7 @@ version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e"
dependencies = [
"indexmap 2.9.0",
"serde",
"serde_spanned",
"toml_datetime",
@@ -13866,13 +13789,12 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.17.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
dependencies = [
"getrandom 0.3.2",
"js-sys",
"rand 0.9.0",
"getrandom 0.2.15",
"rand 0.8.5",
"serde",
"wasm-bindgen",
]
@@ -14276,7 +14198,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -14594,16 +14516,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"


@@ -71,7 +71,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.0"
version = "0.15.3"
edition = "2021"
license = "Apache-2.0"
@@ -121,7 +121,6 @@ datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions-aggregate-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
@@ -135,7 +134,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "464226cf8a4a22696503536a123d0b9e318582f4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96c733f8472284d3c83a4c011dc6de9cf830c353" }
hex = "0.4"
http = "1"
humantime = "2.1"


@@ -185,11 +185,10 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
@@ -289,11 +288,10 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
@@ -325,7 +323,6 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `region_failure_detector_initialization_delay` | String | `10m` | Delay before initializing region failure detectors.<br/>This delay helps prevent premature initialization of region failure detectors in cases where<br/>cluster maintenance mode is enabled right after metasrv starts, especially when the cluster<br/>is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration<br/>may trigger unnecessary region failovers during datanode startup. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
@@ -373,11 +370,10 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -538,11 +534,10 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing that will be sampled and exported.<br/>Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1; fractions < 0 are treated as 0. |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send them to a Prometheus-compatible service (e.g. `greptimedb` itself) via the remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It is different from Prometheus scrape. |
@@ -589,11 +584,10 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum number of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing that will be sampled and exported.<br/>Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.<br/>Ratios > 1 are treated as 1; fractions < 0 are treated as 0. |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effective when compiled with the `tokio-console` feature. |

View File

@@ -629,7 +629,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318"
otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
@@ -640,9 +640,6 @@ log_format = "text"
## The maximum number of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing that will be sampled and exported.
## Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
## Ratios > 1 are treated as 1; fractions < 0 are treated as 0.

View File

@@ -83,7 +83,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318"
otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
@@ -94,9 +94,6 @@ log_format = "text"
## The maximum number of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing that will be sampled and exported.
## Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
## Ratios > 1 are treated as 1; fractions < 0 are treated as 0.

View File

@@ -218,7 +218,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318"
otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
@@ -229,9 +229,6 @@ log_format = "text"
## The maximum number of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing that will be sampled and exported.
## Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
## Ratios > 1 are treated as 1; fractions < 0 are treated as 0.

View File

@@ -43,13 +43,6 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Delay before initializing region failure detectors.
## This delay helps prevent premature initialization of region failure detectors in cases where
## cluster maintenance mode is enabled right after metasrv starts, especially when the cluster
## is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration
## may trigger unnecessary region failovers during datanode startup.
region_failure_detector_initialization_delay = '10m'
## Whether to allow region failover on local WAL.
## **This option is not recommended to be set to true, because it may lead to data loss during failover.**
allow_region_failover_on_local_wal = false
@@ -227,7 +220,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318"
otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
@@ -238,9 +231,6 @@ log_format = "text"
## The maximum number of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing that will be sampled and exported.
## Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
## Ratios > 1 are treated as 1; fractions < 0 are treated as 0.

View File

@@ -720,7 +720,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4318"
otlp_endpoint = "http://localhost:4317"
## Whether to append logs to stdout.
append_stdout = true
@@ -731,9 +731,6 @@ log_format = "text"
## The maximum number of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing that will be sampled and exported.
## Valid range `[0, 1]`: 1 means all traces are sampled, 0 means no traces are sampled; the default value is 1.
## Ratios > 1 are treated as 1; fractions < 0 are treated as 0.

View File

@@ -31,7 +31,6 @@ excludes = [
"src/operator/src/expr_helper/trigger.rs",
"src/sql/src/statements/create/trigger.rs",
"src/sql/src/statements/show/trigger.rs",
"src/sql/src/statements/drop/trigger.rs",
"src/sql/src/parsers/create_parser/trigger.rs",
"src/sql/src/parsers/show_parser/trigger.rs",
]

View File

@@ -22,7 +22,6 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1;
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extensions: HashMap<String, Vec<u8>>,
pub metadata: Vec<u8>,
}
impl RegionResponse {
@@ -30,7 +29,6 @@ impl RegionResponse {
Self {
affected_rows: region_response.affected_rows as _,
extensions: region_response.extensions,
metadata: region_response.metadata,
}
}
@@ -39,16 +37,6 @@ impl RegionResponse {
Self {
affected_rows,
extensions: Default::default(),
metadata: Vec::new(),
}
}
/// Creates one response with metadata.
pub fn from_metadata(metadata: Vec<u8>) -> Self {
Self {
affected_rows: 0,
extensions: Default::default(),
metadata,
}
}
}

View File

@@ -226,18 +226,20 @@ mod tests {
assert!(options.is_none());
let mut schema = ColumnSchema::new("test", ConcreteDataType::string_datatype(), true)
.with_fulltext_options(FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
})
.with_fulltext_options(FulltextOptions::new_unchecked(
true,
FulltextAnalyzer::English,
false,
FulltextBackend::Bloom,
10240,
0.01,
))
.unwrap();
schema.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}"
);
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
@@ -247,16 +249,18 @@ mod tests {
#[test]
fn test_options_with_fulltext() {
let fulltext = FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
};
let fulltext = FulltextOptions::new_unchecked(
true,
FulltextAnalyzer::English,
false,
FulltextBackend::Bloom,
10240,
0.01,
);
let options = options_from_fulltext(&fulltext).unwrap().unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}"
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}"
);
}

View File

@@ -28,7 +28,7 @@ use common_meta::cache::{
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -39,6 +39,7 @@ use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
@@ -142,6 +143,61 @@ impl KvBackendCatalogManager {
pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
self.procedure_manager.clone()
}
// Override logical table's partition key indices with physical table's.
async fn override_logical_table_partition_key_indices(
table_route_cache: &TableRouteCacheRef,
table_info_manager: &TableInfoManager,
table: TableRef,
) -> Result<TableRef> {
// If the table is not a metric table, return the table directly.
if table.table_info().meta.engine != METRIC_ENGINE_NAME {
return Ok(table);
}
if let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = table_info_manager
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut new_table_info = (*table.table_info()).clone();
// Remap partition key indices from physical table to logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter_map(|&physical_index| {
// Get the column name from the physical table using the physical index
physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(physical_index)
.and_then(|physical_column| {
// Find the corresponding index in the logical table schema
new_table_info
.meta
.schema
.column_index_by_name(physical_column.name.as_str())
})
})
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(new_table);
}
Ok(table)
}
}
#[async_trait::async_trait]
@@ -268,10 +324,7 @@ impl CatalogManager for KvBackendCatalogManager {
let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_cache",
})?;
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
let table = table_cache
.get_by_ref(&TableName {
catalog_name: catalog_name.to_string(),
@@ -281,55 +334,18 @@ impl CatalogManager for KvBackendCatalogManager {
.await
.context(GetTableCacheSnafu)?;
// Override logical table's partition key indices with physical table's.
if let Some(table) = &table
&& let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: std::collections::HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
if let Some(physical_column) = physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
{
logical_column_names.contains(&physical_column.name)
} else {
false
}
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(Some(new_table));
if let Some(table) = table {
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
return Self::override_logical_table_partition_key_indices(
&table_route_cache,
self.table_metadata_manager.table_info_manager(),
table,
)
.await
.map(Some);
}
if channel == Channel::Postgres {
@@ -342,7 +358,7 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
Ok(table)
Ok(None)
}
async fn tables_by_ids(
@@ -394,8 +410,20 @@ impl CatalogManager for KvBackendCatalogManager {
let catalog = catalog.to_string();
let schema = schema.to_string();
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let table_route_cache: Result<TableRouteCacheRef> =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
});
common_runtime::spawn_global(async move {
let table_route_cache = match table_route_cache {
Ok(table_route_cache) => table_route_cache,
Err(e) => {
let _ = tx.send(Err(e)).await;
return;
}
};
let table_id_stream = metadata_manager
.table_name_manager()
.tables(&catalog, &schema)
@@ -422,6 +450,7 @@ impl CatalogManager for KvBackendCatalogManager {
let metadata_manager = metadata_manager.clone();
let tx = tx.clone();
let semaphore = semaphore.clone();
let table_route_cache = table_route_cache.clone();
common_runtime::spawn_global(async move {
// we don't explicitly close the semaphore so just ignore the potential error.
let _ = semaphore.acquire().await;
@@ -439,6 +468,16 @@ impl CatalogManager for KvBackendCatalogManager {
};
for table in table_info_values.into_values().map(build_table) {
let table = if let Ok(table) = table {
Self::override_logical_table_partition_key_indices(
&table_route_cache,
metadata_manager.table_info_manager(),
table,
)
.await
} else {
table
};
if tx.send(table).await.is_err() {
return;
}

View File

@@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::{ensure, OptionExt, ResultExt};
@@ -141,14 +141,20 @@ impl ProcessManager {
.await
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
let result = f
.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::InvokeFrontendSnafu)?
.processes,
);
.context(error::InvokeFrontendSnafu);
match result {
Ok(resp) => {
processes.extend(resp.processes);
}
Err(e) => {
warn!(e; "Skipping failing node: {:?}", f)
}
}
}
}
processes.extend(self.local_processes(catalog)?);

View File

@@ -19,7 +19,7 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
pub mod process_list;
mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;

View File

@@ -39,14 +39,14 @@ use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;
/// Column names of `information_schema.process_list`
pub const ID: &str = "id";
pub const CATALOG: &str = "catalog";
pub const SCHEMAS: &str = "schemas";
pub const QUERY: &str = "query";
pub const CLIENT: &str = "client";
pub const FRONTEND: &str = "frontend";
pub const START_TIMESTAMP: &str = "start_timestamp";
pub const ELAPSED_TIME: &str = "elapsed_time";
const ID: &str = "id";
const CATALOG: &str = "catalog";
const SCHEMAS: &str = "schemas";
const QUERY: &str = "query";
const CLIENT: &str = "client";
const FRONTEND: &str = "frontend";
const START_TIMESTAMP: &str = "start_timestamp";
const ELAPSED_TIME: &str = "elapsed_time";
/// `information_schema.process_list` table implementation that tracks running
/// queries in current cluster.

View File

@@ -211,12 +211,18 @@ impl Database {
retries += 1;
warn!("Retrying {} times with error = {:?}", retries, err);
continue;
} else {
error!(
err; "Failed to send request to grpc handle, retries = {}, not retryable error, aborting",
retries
);
return Err(err.into());
}
}
(Err(err), false) => {
error!(
"Failed to send request to grpc handle after {} retries, error = {:?}",
retries, err
err; "Failed to send request to grpc handle after {} retries",
retries,
);
return Err(err.into());
}

View File

@@ -163,19 +163,70 @@ impl RegionRequester {
let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
"poll_flight_data_stream"
));
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut buffered_message: Option<FlightMessage> = None;
let mut stream_ended = false;
while !stream_ended {
// get the next message from the buffered message or read from the flight message stream
let flight_message_item = if let Some(msg) = buffered_message.take() {
Some(Ok(msg))
} else {
flight_message_stream.next().await
};
let flight_message = match flight_message_item {
Some(Ok(message)) => message,
Some(Err(e)) => {
yield Err(BoxedError::new(e)).context(ExternalSnafu);
break;
}
None => break,
};
match flight_message {
FlightMessage::RecordBatch(record_batch) => {
yield RecordBatch::try_from_df_record_batch(
let result_to_yield = RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
record_batch,
)
);
// get the next message from the stream. normally it should be a metrics message.
if let Some(next_flight_message_result) = flight_message_stream.next().await
{
match next_flight_message_result {
Ok(FlightMessage::Metrics(s)) => {
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
}
Ok(FlightMessage::RecordBatch(rb)) => {
// for some reason it's not a metrics message, so we need to buffer this record batch
// and yield it in the next iteration.
buffered_message = Some(FlightMessage::RecordBatch(rb));
}
Ok(_) => {
yield IllegalFlightMessagesSnafu {
reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
}
Err(e) => {
yield Err(BoxedError::new(e)).context(ExternalSnafu);
break;
}
}
} else {
// the stream has ended
stream_ended = true;
}
yield result_to_yield;
}
FlightMessage::Metrics(s) => {
// just a branch in case a metrics message arrives after other messages.
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
break;

View File

@@ -67,7 +67,6 @@ metric-engine.workspace = true
mito2.workspace = true
moka.workspace = true
nu-ansi-term = "0.46"
object-store.workspace = true
plugins.workspace = true
prometheus.workspace = true
prost.workspace = true

View File

@@ -20,11 +20,11 @@ use cmd::error::{InitTlsProviderSnafu, Result};
use cmd::options::GlobalOptions;
use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_base::Plugins;
use common_version::version;
use common_version::{verbose_version, version};
use servers::install_ring_crypto_provider;
#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(name = "greptime", author, version, long_version = verbose_version(), about)]
#[command(propagate_version = true)]
pub(crate) struct Command {
#[clap(subcommand)]
@@ -143,10 +143,8 @@ async fn start(cli: Command) -> Result<()> {
}
fn setup_human_panic() {
human_panic::setup_panic!(
human_panic::Metadata::new("GreptimeDB", env!("CARGO_PKG_VERSION"))
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions")
);
human_panic::setup_panic!(human_panic::Metadata::new("GreptimeDB", version())
.homepage("https://github.com/GreptimeTeam/greptimedb/discussions"));
common_telemetry::set_panic_hook();
}

View File

@@ -280,7 +280,7 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use super::*;

View File

@@ -19,7 +19,7 @@ use catalog::kvbackend::MetaKvBackend;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::info;
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use datanode::datanode::DatanodeBuilder;
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientType;
@@ -67,7 +67,7 @@ impl InstanceBuilder {
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)

View File

@@ -32,7 +32,7 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use flow::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
FrontendClient, FrontendInvoker,
@@ -279,7 +279,7 @@ impl StartCommand {
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Flownode start command: {:#?}", self);

View File

@@ -33,7 +33,7 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
@@ -282,7 +282,7 @@ impl StartCommand {
opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Frontend start command: {:#?}", self);

View File

@@ -112,7 +112,7 @@ pub trait App: Send {
pub fn log_versions(version: &str, short_version: &str, app: &str) {
// Report app version as gauge.
APP_VERSION
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version, app])
.with_label_values(&[common_version::version(), short_version, app])
.inc();
// Log version and argument flags.

View File

@@ -22,7 +22,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
@@ -320,7 +320,7 @@ impl StartCommand {
None,
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Metasrv start command: {:#?}", self);

View File

@@ -51,7 +51,7 @@ use common_telemetry::logging::{
LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_version::{short_version, verbose_version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
@@ -257,34 +257,15 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// The components of standalone, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub plugins: Plugins,
pub kv_backend: KvBackendRef,
pub frontend_client: Arc<FrontendClient>,
pub catalog_manager: catalog::CatalogManagerRef,
}
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait]
@@ -485,7 +466,7 @@ impl StartCommand {
opts.component.slow_query.as_ref(),
);
log_versions(version(), short_version(), APP_NAME);
log_versions(verbose_version(), short_version(), APP_NAME);
create_resource_limit_metrics(APP_NAME);
info!("Standalone start command: {:#?}", self);
@@ -565,14 +546,13 @@ impl StartCommand {
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
frontend_client.clone(),
Arc::new(frontend_client.clone()),
);
let flownode = flow_builder
.build()
@@ -682,7 +662,7 @@ impl StartCommand {
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
let servers = Services::new(opts, fe_instance.clone(), plugins)
.build()
.context(error::StartFrontendSnafu)?;
@@ -693,22 +673,12 @@ impl StartCommand {
export_metrics_task,
};
#[cfg(feature = "enterprise")]
let components = Components {
plugins,
kv_backend,
frontend_client,
catalog_manager,
};
Ok(Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_options_allocator,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
})
}
@@ -848,7 +818,7 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use object_store::config::{FileConfig, GcsConfig};
use datanode::config::{FileConfig, GcsConfig};
use super::*;
use crate::options::GlobalOptions;
@@ -967,15 +937,15 @@ mod tests {
assert!(matches!(
&dn_opts.storage.store,
object_store::config::ObjectStoreConfig::File(FileConfig { .. })
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(dn_opts.storage.providers.len(), 2);
assert!(matches!(
dn_opts.storage.providers[0],
object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
));
match &dn_opts.storage.providers[1] {
object_store::config::ObjectStoreConfig::S3(s3_config) => {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"SecretBox<alloc::string::String>([REDACTED])".to_string(),
format!("{:?}", s3_config.access_key_id)

View File

@@ -18,7 +18,7 @@ use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -81,7 +81,7 @@ fn test_load_datanode_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -124,7 +124,7 @@ fn test_load_frontend_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -172,7 +172,7 @@ fn test_load_metasrv_example_config() {
logging: LoggingOptions {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -229,7 +229,7 @@ fn test_load_standalone_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},

View File

@@ -14,7 +14,6 @@ common-macro.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
object-store.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -106,7 +106,7 @@ mod tests {
use common_telemetry::logging::LoggingOptions;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::StorageConfig;
use datanode::config::{ObjectStoreConfig, StorageConfig};
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
@@ -212,7 +212,7 @@ mod tests {
// Check the configs from environment variables.
match &opts.storage.store {
object_store::config::ObjectStoreConfig::S3(s3_config) => {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
_ => panic!("unexpected store type"),

View File

@@ -21,7 +21,6 @@ pub mod error;
pub mod file_format;
pub mod lister;
pub mod object_store;
pub mod parquet_writer;
pub mod share_buffer;
#[cfg(test)]
pub mod test_util;

View File

@@ -1,52 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::Writer;
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::errors::ParquetError;
/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
pub struct AsyncWriter {
inner: Writer,
}
impl AsyncWriter {
/// Create a [`AsyncWriter`] by given [`Writer`].
pub fn new(writer: Writer) -> Self {
Self { inner: writer }
}
}
impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -30,7 +31,7 @@ use crate::error::{MetaSnafu, Result};
pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
pub trait FrontendClient: Send + Debug {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;

View File

@@ -33,7 +33,6 @@ common-version.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-functions-aggregate-common.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
geo = { version = "0.29", optional = true }

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub mod approximate;
pub mod count_hash;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;

View File

@@ -1,647 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! `CountHash` / `count_hash` is a hash-based approximate distinct count function.
//!
//! It is a variant of `CountDistinct` that uses a hash function to approximate the
//! distinct count.
//! It is designed to be more efficient than `CountDistinct` for large datasets,
//! but it is not as accurate, since hash values may collide.
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use ahash::RandomState;
use datafusion_common::cast::as_list_array;
use datafusion_common::error::Result;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{internal_err, not_impl_err, ScalarValue};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
use datafusion_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF,
SetMonotonicity, Signature, TypeSignature, Volatility,
};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask;
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, UInt64Array,
};
use datatypes::arrow::buffer::{OffsetBuffer, ScalarBuffer};
use datatypes::arrow::datatypes::{DataType, Field};
use crate::function_registry::FunctionRegistry;
type HashValueType = u64;
// read from /dev/urandom 4047821dc6144e4b2abddf23ad4171126a52eeecd26eff2191cf673b965a7875
const RANDOM_SEED_0: u64 = 0x4047821dc6144e4b;
const RANDOM_SEED_1: u64 = 0x2abddf23ad417112;
const RANDOM_SEED_2: u64 = 0x6a52eeecd26eff21;
const RANDOM_SEED_3: u64 = 0x91cf673b965a7875;
impl CountHash {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(CountHash::udf_impl());
}
pub fn udf_impl() -> AggregateUDF {
AggregateUDF::new_from_impl(CountHash {
signature: Signature::one_of(
vec![TypeSignature::VariadicAny, TypeSignature::Nullary],
Volatility::Immutable,
),
})
}
}
#[derive(Debug, Clone)]
pub struct CountHash {
signature: Signature,
}
impl AggregateUDFImpl for CountHash {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn name(&self) -> &str {
"count_hash"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}
fn is_nullable(&self) -> bool {
false
}
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
Ok(vec![Field::new_list(
format_state_name(args.name, "count_hash"),
Field::new_list_field(DataType::UInt64, true),
// For count_hash accumulator, null list item stands for an
// empty value set (i.e., all NULL values so far for that group).
true,
)])
}
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
if acc_args.exprs.len() > 1 {
return not_impl_err!("count_hash with multiple arguments");
}
Ok(Box::new(CountHashAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}))
}
fn aliases(&self) -> &[String] {
&[]
}
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
true
}
fn create_groups_accumulator(
&self,
args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
if args.exprs.len() > 1 {
return not_impl_err!("count_hash with multiple arguments");
}
Ok(Box::new(CountHashGroupAccumulator::new()))
}
fn reverse_expr(&self) -> ReversedUDAF {
ReversedUDAF::Identical
}
fn order_sensitivity(&self) -> AggregateOrderSensitivity {
AggregateOrderSensitivity::Insensitive
}
fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(0)))
}
fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
SetMonotonicity::Increasing
}
}
/// GroupsAccumulator for `count_hash` aggregate function
#[derive(Debug)]
pub struct CountHashGroupAccumulator {
/// One HashSet per group to track distinct values
distinct_sets: Vec<HashSet<HashValueType, RandomState>>,
random_state: RandomState,
batch_hashes: Vec<HashValueType>,
}
impl Default for CountHashGroupAccumulator {
fn default() -> Self {
Self::new()
}
}
impl CountHashGroupAccumulator {
pub fn new() -> Self {
Self {
distinct_sets: vec![],
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}
}
fn ensure_sets(&mut self, total_num_groups: usize) {
if self.distinct_sets.len() < total_num_groups {
self.distinct_sets
.resize_with(total_num_groups, HashSet::default);
}
}
}
impl GroupsAccumulator for CountHashGroupAccumulator {
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "count_hash expects a single argument");
self.ensure_sets(total_num_groups);
let array = &values[0];
self.batch_hashes.clear();
self.batch_hashes.resize(array.len(), 0);
let hashes = create_hashes(
&[ArrayRef::clone(array)],
&self.random_state,
&mut self.batch_hashes,
)?;
// Use a pattern similar to accumulate_indices to process rows
// that are not null and pass the filter
let nulls = array.logical_nulls();
match (nulls.as_ref(), opt_filter) {
(None, None) => {
// No nulls, no filter - process all rows
for (row_idx, &group_idx) in group_indices.iter().enumerate() {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
(Some(nulls), None) => {
// Has nulls, no filter
for (row_idx, (&group_idx, is_valid)) in
group_indices.iter().zip(nulls.iter()).enumerate()
{
if is_valid {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
(None, Some(filter)) => {
// No nulls, has filter
for (row_idx, (&group_idx, filter_value)) in
group_indices.iter().zip(filter.iter()).enumerate()
{
if let Some(true) = filter_value {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
(Some(nulls), Some(filter)) => {
// Has nulls and filter
let iter = filter
.iter()
.zip(group_indices.iter())
.zip(nulls.iter())
.enumerate();
for (row_idx, ((filter_value, &group_idx), is_valid)) in iter {
if is_valid && filter_value == Some(true) {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
}
Ok(())
}
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let distinct_sets: Vec<HashSet<u64, RandomState>> =
emit_to.take_needed(&mut self.distinct_sets);
let counts = distinct_sets
.iter()
.map(|set| set.len() as i64)
.collect::<Vec<_>>();
Ok(Arc::new(Int64Array::from(counts)))
}
fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(
values.len(),
1,
"count_hash merge expects a single state array"
);
self.ensure_sets(total_num_groups);
let list_array = as_list_array(&values[0])?;
// For each group in the incoming batch
for (i, &group_idx) in group_indices.iter().enumerate() {
if i < list_array.len() {
let inner_array = list_array.value(i);
let inner_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
// Add each value to our set for this group
for j in 0..inner_array.len() {
if !inner_array.is_null(j) {
self.distinct_sets[group_idx].insert(inner_array.value(j));
}
}
}
}
Ok(())
}
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let distinct_sets: Vec<HashSet<u64, RandomState>> =
emit_to.take_needed(&mut self.distinct_sets);
let mut offsets = Vec::with_capacity(distinct_sets.len() + 1);
offsets.push(0);
let mut curr_len = 0i32;
let mut value_iter = distinct_sets
.into_iter()
.flat_map(|set| {
// build offset
curr_len += set.len() as i32;
offsets.push(curr_len);
// convert into iter
set.into_iter()
})
.peekable();
let data_array: ArrayRef = if value_iter.peek().is_none() {
arrow::array::new_empty_array(&DataType::UInt64) as _
} else {
Arc::new(UInt64Array::from_iter_values(value_iter))
};
let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
let list_array = ListArray::new(
Arc::new(Field::new_list_field(DataType::UInt64, true)),
offset_buffer,
data_array,
None,
);
Ok(vec![Arc::new(list_array) as _])
}
fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
// For a single hash value per row, create a list array with that value
assert_eq!(values.len(), 1, "count_hash expects a single argument");
let values = ArrayRef::clone(&values[0]);
let offsets = OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1));
let nulls = filtered_null_mask(opt_filter, &values);
let list_array = ListArray::new(
Arc::new(Field::new_list_field(DataType::UInt64, true)),
offsets,
values,
nulls,
);
Ok(vec![Arc::new(list_array)])
}
fn supports_convert_to_state(&self) -> bool {
true
}
fn size(&self) -> usize {
// Base size of the struct
let mut size = size_of::<Self>();
// Size of the vector holding the HashSets
size += size_of::<Vec<HashSet<HashValueType, RandomState>>>()
+ self.distinct_sets.capacity() * size_of::<HashSet<HashValueType, RandomState>>();
// Estimate HashSet contents size more efficiently
// Instead of iterating through all values which is expensive, use an approximation
for set in &self.distinct_sets {
// Base size of the HashSet
size += set.capacity() * size_of::<HashValueType>();
}
size
}
}
#[derive(Debug)]
struct CountHashAccumulator {
values: HashSet<HashValueType, RandomState>,
random_state: RandomState,
batch_hashes: Vec<HashValueType>,
}
impl CountHashAccumulator {
// calculating the size for fixed length values, taking first batch size *
// number of batches.
fn fixed_size(&self) -> usize {
size_of_val(self) + (size_of::<HashValueType>() * self.values.capacity())
}
}
impl Accumulator for CountHashAccumulator {
/// Returns the distinct values seen so far as (one element) ListArray.
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let values = self.values.iter().cloned().collect::<Vec<_>>();
let arr = Arc::new(UInt64Array::from(values)) as _;
let list_scalar = SingleRowListArrayBuilder::new(arr).build_list_scalar();
Ok(vec![list_scalar])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let arr = &values[0];
if arr.data_type() == &DataType::Null {
return Ok(());
}
self.batch_hashes.clear();
self.batch_hashes.resize(arr.len(), 0);
let hashes = create_hashes(
&[ArrayRef::clone(arr)],
&self.random_state,
&mut self.batch_hashes,
)?;
for hash in hashes.as_slice() {
self.values.insert(*hash);
}
Ok(())
}
/// Merges multiple sets of distinct values into the current set.
///
/// The input to this function is a `ListArray` with **multiple** rows,
/// where each row contains the values from a partial aggregate's phase (e.g.
/// the result of calling `Self::state` on multiple accumulators).
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(states.len(), 1, "array_agg states must be singleton!");
let array = &states[0];
let list_array = array.as_list::<i32>();
for inner_array in list_array.iter() {
let Some(inner_array) = inner_array else {
return internal_err!(
"Intermediate results of count_hash should always be non null"
);
};
let hash_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..hash_array.len() {
self.values.insert(hash_array.value(i));
}
}
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
}
fn size(&self) -> usize {
self.fixed_size()
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::{Array, BooleanArray, Int32Array, Int64Array};
use super::*;
fn create_test_accumulator() -> CountHashAccumulator {
CountHashAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}
}
#[test]
fn test_count_hash_accumulator() -> Result<()> {
let mut acc = create_test_accumulator();
// Test with some data
let array = Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
Some(3),
Some(1),
Some(2),
None,
])) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(4)));
// Test with empty data
let mut acc = create_test_accumulator();
let array = Arc::new(Int32Array::from(vec![] as Vec<Option<i32>>)) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(0)));
// Test with only nulls
let mut acc = create_test_accumulator();
let array = Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(1)));
Ok(())
}
#[test]
fn test_count_hash_accumulator_merge() -> Result<()> {
// Accumulator 1
let mut acc1 = create_test_accumulator();
let array1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef;
acc1.update_batch(&[array1])?;
let state1 = acc1.state()?;
// Accumulator 2
let mut acc2 = create_test_accumulator();
let array2 = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)])) as ArrayRef;
acc2.update_batch(&[array2])?;
let state2 = acc2.state()?;
// Merge state1 and state2 into a new accumulator
let mut acc_merged = create_test_accumulator();
let state_array1 = state1[0].to_array()?;
let state_array2 = state2[0].to_array()?;
acc_merged.merge_batch(&[state_array1])?;
acc_merged.merge_batch(&[state_array2])?;
let result = acc_merged.evaluate()?;
// Distinct values are {1, 2, 3, 4, 5}, so count is 5
assert_eq!(result, ScalarValue::Int64(Some(5)));
Ok(())
}
fn create_test_group_accumulator() -> CountHashGroupAccumulator {
CountHashGroupAccumulator::new()
}
#[test]
fn test_count_hash_group_accumulator() -> Result<()> {
let mut acc = create_test_group_accumulator();
let values = Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 4, 5])) as ArrayRef;
let group_indices = vec![0, 1, 0, 0, 1, 2, 0];
let total_num_groups = 3;
acc.update_batch(&[values], &group_indices, None, total_num_groups)?;
let result_array = acc.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Group 0: {1, 3, 5} -> 3
// Group 1: {2} -> 1
// Group 2: {4} -> 1
assert_eq!(result.value(0), 3);
assert_eq!(result.value(1), 1);
assert_eq!(result.value(2), 1);
Ok(())
}
#[test]
fn test_count_hash_group_accumulator_with_filter() -> Result<()> {
let mut acc = create_test_group_accumulator();
let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef;
let group_indices = vec![0, 0, 1, 1, 2, 2];
let filter = BooleanArray::from(vec![true, false, true, true, false, true]);
let total_num_groups = 3;
acc.update_batch(&[values], &group_indices, Some(&filter), total_num_groups)?;
let result_array = acc.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Group 0: {1} (2 is filtered out) -> 1
// Group 1: {3, 4} -> 2
// Group 2: {6} (5 is filtered out) -> 1
assert_eq!(result.value(0), 1);
assert_eq!(result.value(1), 2);
assert_eq!(result.value(2), 1);
Ok(())
}
#[test]
fn test_count_hash_group_accumulator_merge() -> Result<()> {
// Accumulator 1
let mut acc1 = create_test_group_accumulator();
let values1 = Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef;
let group_indices1 = vec![0, 0, 1, 1];
acc1.update_batch(&[values1], &group_indices1, None, 2)?;
// acc1 state: group 0 -> {1, 2}, group 1 -> {3, 4}
let state1 = acc1.state(EmitTo::All)?;
// Accumulator 2
let mut acc2 = create_test_group_accumulator();
let values2 = Arc::new(Int32Array::from(vec![5, 6, 1, 3])) as ArrayRef;
// Merge into different group indices
let group_indices2 = vec![2, 2, 0, 1];
acc2.update_batch(&[values2], &group_indices2, None, 3)?;
// acc2 state: group 0 -> {1}, group 1 -> {3}, group 2 -> {5, 6}
// Merge state from acc1 into acc2
// We will merge acc1's group 0 into acc2's group 0
// and acc1's group 1 into acc2's group 2
let merge_group_indices = vec![0, 2];
acc2.merge_batch(&state1, &merge_group_indices, None, 3)?;
let result_array = acc2.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Final state of acc2:
// Group 0: {1} U {1, 2} -> {1, 2}, count = 2
// Group 1: {3}, count = 1
// Group 2: {5, 6} U {3, 4} -> {3, 4, 5, 6}, count = 4
assert_eq!(result.value(0), 2);
assert_eq!(result.value(1), 1);
assert_eq!(result.value(2), 4);
Ok(())
}
#[test]
fn test_size() {
let acc = create_test_group_accumulator();
// Just test it doesn't crash and returns a value.
assert!(acc.size() > 0);
}
}

View File

@@ -21,7 +21,6 @@ use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction;
use crate::aggrs::count_hash::CountHash;
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
@@ -145,9 +144,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Approximate functions
ApproximateFunction::register(&function_registry);
// CountHash function
CountHash::register(&function_registry);
Arc::new(function_registry)
});

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::{env, fmt};
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
@@ -47,7 +47,7 @@ impl Function for PGVersionFunction {
fn eval(&self, _func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let result = StringVector::from(vec![format!(
"PostgreSQL 16.3 GreptimeDB {}",
env!("CARGO_PKG_VERSION")
common_version::version()
)]);
Ok(Arc::new(result))
}

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use std::{env, fmt};
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
@@ -52,13 +52,13 @@ impl Function for VersionFunction {
"{}-greptimedb-{}",
std::env::var("GREPTIMEDB_MYSQL_SERVER_VERSION")
.unwrap_or_else(|_| "8.4.2".to_string()),
env!("CARGO_PKG_VERSION")
common_version::version()
)
}
Channel::Postgres => {
format!("16.3-greptimedb-{}", env!("CARGO_PKG_VERSION"))
format!("16.3-greptimedb-{}", common_version::version())
}
_ => env!("CARGO_PKG_VERSION").to_string(),
_ => common_version::version().to_string(),
};
let result = StringVector::from(vec![version]);
Ok(Arc::new(result))

View File

@@ -34,7 +34,7 @@ use table::requests::{
};
use crate::error::{
InvalidColumnDefSnafu, InvalidSetFulltextOptionRequestSnafu,
InvalidColumnDefSnafu, InvalidIndexOptionSnafu, InvalidSetFulltextOptionRequestSnafu,
InvalidSetSkippingIndexOptionRequestSnafu, InvalidSetTableOptionRequestSnafu,
InvalidUnsetTableOptionRequestSnafu, MissingAlterIndexOptionSnafu, MissingFieldSnafu,
MissingTimestampColumnSnafu, Result, UnknownLocationTypeSnafu,
@@ -126,18 +126,21 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
api::v1::set_index::Options::Fulltext(f) => AlterKind::SetIndex {
options: SetIndexOptions::Fulltext {
column_name: f.column_name.clone(),
options: FulltextOptions {
enable: f.enable,
analyzer: as_fulltext_option_analyzer(
options: FulltextOptions::new(
f.enable,
as_fulltext_option_analyzer(
Analyzer::try_from(f.analyzer)
.context(InvalidSetFulltextOptionRequestSnafu)?,
),
case_sensitive: f.case_sensitive,
backend: as_fulltext_option_backend(
f.case_sensitive,
as_fulltext_option_backend(
PbFulltextBackend::try_from(f.backend)
.context(InvalidSetFulltextOptionRequestSnafu)?,
),
},
f.granularity as u32,
f.false_positive_rate,
)
.context(InvalidIndexOptionSnafu)?,
},
},
api::v1::set_index::Options::Inverted(i) => AlterKind::SetIndex {
@@ -148,13 +151,15 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
api::v1::set_index::Options::Skipping(s) => AlterKind::SetIndex {
options: SetIndexOptions::Skipping {
column_name: s.column_name,
options: SkippingIndexOptions {
granularity: s.granularity as u32,
index_type: as_skipping_index_type(
options: SkippingIndexOptions::new(
s.granularity as u32,
s.false_positive_rate,
as_skipping_index_type(
PbSkippingIndexType::try_from(s.skipping_index_type)
.context(InvalidSetSkippingIndexOptionRequestSnafu)?,
),
},
)
.context(InvalidIndexOptionSnafu)?,
},
},
},

View File

@@ -153,6 +153,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid index option"))]
InvalidIndexOption {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -180,7 +188,8 @@ impl ErrorExt for Error {
| Error::InvalidUnsetTableOptionRequest { .. }
| Error::InvalidSetFulltextOptionRequest { .. }
| Error::InvalidSetSkippingIndexOptionRequest { .. }
| Error::MissingAlterIndexOption { .. } => StatusCode::InvalidArguments,
| Error::MissingAlterIndexOption { .. }
| Error::InvalidIndexOption { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::info;
use futures::future::BoxFuture;
use moka::future::Cache;
use moka::ops::compute::Op;
@@ -89,6 +90,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
// we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
.map(Arc::new)
.map(Some)
.inspect(|set| {
info!(
"Initialized table_flownode cache for table_id: {}, set: {:?}",
table_id, set
);
})
})
})
}
@@ -167,6 +174,13 @@ fn invalidator<'a>(
match ident {
CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
CacheIdent::FlowNodeAddressChange(node_id) => {
info!(
"Invalidate flow node cache for node_id in table_flownode: {}",
node_id
);
cache.invalidate_all();
}
_ => {}
}
Ok(())
@@ -174,7 +188,10 @@ fn invalidator<'a>(
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
matches!(
ident,
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
)
}
#[cfg(test)]

View File

@@ -22,6 +22,7 @@ use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::node_address::NodeAddressKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -53,6 +54,10 @@ pub struct Context {
#[async_trait::async_trait]
pub trait CacheInvalidator: Send + Sync {
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
fn name(&self) -> &'static str {
std::any::type_name::<Self>()
}
}
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
@@ -137,6 +142,13 @@ where
let key = FlowInfoKey::new(*flow_id);
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::FlowNodeAddressChange(node_id) => {
// other caches don't need to be invalidated
// since this is only a flownode address change, not an id change
common_telemetry::info!("Invalidate flow node cache for node_id: {}", node_id);
let key = NodeAddressKey::with_flownode(*node_id);
self.invalidate_key(&key.to_bytes()).await;
}
}
}
Ok(())

View File

@@ -32,7 +32,6 @@ impl MockDatanodeHandler for () {
Ok(RegionResponse {
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}

View File

@@ -50,11 +50,7 @@ use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::DropTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::DropTrigger;
use crate::rpc::ddl::DdlTask::{
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
@@ -95,14 +91,6 @@ pub trait TriggerDdlManager: Send + Sync {
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
async fn drop_trigger(
&self,
drop_trigger_task: DropTriggerTask,
procedure_manager: ProcedureManagerRef,
ddl_context: DdlContext,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
fn as_any(&self) -> &dyn std::any::Any;
}
@@ -660,28 +648,6 @@ async fn handle_drop_flow_task(
})
}
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
ddl_manager: &DdlManager,
drop_trigger_task: DropTriggerTask,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
return UnsupportedSnafu {
operation: "drop trigger",
}
.fail();
};
m.drop_trigger(
drop_trigger_task,
ddl_manager.procedure_manager.clone(),
ddl_manager.ddl_context.clone(),
query_context,
)
.await
}
async fn handle_drop_view_task(
ddl_manager: &DdlManager,
drop_view_task: DropViewTask,
@@ -869,11 +835,6 @@ impl ProcedureExecutor for DdlManager {
handle_create_flow_task(self, create_flow_task, request.query_context.into())
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
@@ -883,11 +844,11 @@ impl ProcedureExecutor for DdlManager {
)
.await
}
#[cfg(feature = "enterprise")]
DropTrigger(drop_trigger_task) => {
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
.await
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
}
}
.trace(span)

View File

@@ -174,6 +174,8 @@ pub struct UpgradeRegion {
/// The identifier of cache.
pub enum CacheIdent {
FlowId(FlowId),
/// Indicates a change of a flownode's address.
FlowNodeAddressChange(u64),
FlowName(FlowName),
TableId(TableId),
TableName(TableName),

View File

@@ -48,11 +48,6 @@ impl TableRouteKey {
pub fn new(table_id: TableId) -> Self {
Self { table_id }
}
/// Returns the range prefix of the table route key.
pub fn range_prefix() -> Vec<u8> {
format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]

View File

@@ -14,13 +14,14 @@
use std::collections::HashMap;
use common_telemetry::debug;
use snafu::ensure;
use crate::error::{self, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};
/// [TombstoneManager] provides the ability to:
/// - logically delete values
@@ -28,6 +29,9 @@ use crate::rpc::store::BatchGetRequest;
pub struct TombstoneManager {
kv_backend: KvBackendRef,
tombstone_prefix: String,
// Only used for testing.
#[cfg(test)]
max_txn_ops: Option<usize>,
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
@@ -35,10 +39,7 @@ const TOMBSTONE_PREFIX: &str = "__tombstone/";
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
kv_backend,
tombstone_prefix: TOMBSTONE_PREFIX.to_string(),
}
Self::new_with_prefix(kv_backend, TOMBSTONE_PREFIX)
}
/// Returns [TombstoneManager] with a custom tombstone prefix.
@@ -46,6 +47,8 @@ impl TombstoneManager {
Self {
kv_backend,
tombstone_prefix: prefix.to_string(),
#[cfg(test)]
max_txn_ops: None,
}
}
@@ -53,6 +56,11 @@ impl TombstoneManager {
[self.tombstone_prefix.as_bytes(), key].concat()
}
#[cfg(test)]
pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
self.max_txn_ops = Some(max_txn_ops);
}
/// Moves value to `dest_key`.
///
/// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
@@ -83,7 +91,11 @@ impl TombstoneManager {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: "The length of keys does not match the length of dest_keys."
err_msg: format!(
"The length of keys({}) does not match the length of dest_keys({}).",
keys.len(),
dest_keys.len()
),
}
);
// The key -> dest key mapping.
@@ -136,19 +148,45 @@ impl TombstoneManager {
.fail()
}
fn max_txn_ops(&self) -> usize {
#[cfg(test)]
if let Some(max_txn_ops) = self.max_txn_ops {
return max_txn_ops;
}
self.kv_backend.max_txn_ops()
}
/// Moves values to `dest_keys`.
///
/// Returns the number of keys that were moved.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
let chunk_size = self.kv_backend.max_txn_ops() / 2;
if keys.len() > chunk_size {
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
self.move_values_inner(keys, dest_keys).await?;
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: format!(
"The length of keys({}) does not match the length of dest_keys({}).",
keys.len(),
dest_keys.len()
),
}
Ok(keys.len())
);
if keys.is_empty() {
return Ok(0);
}
let chunk_size = self.max_txn_ops() / 2;
if keys.len() > chunk_size {
debug!(
"Moving values with multiple chunks, keys len: {}, chunk_size: {}",
keys.len(),
chunk_size
);
let mut moved_keys = 0;
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
moved_keys += self.move_values_inner(keys, dest_keys).await?;
}
Ok(moved_keys)
} else {
self.move_values_inner(&keys, &dest_keys).await
}
@@ -196,15 +234,18 @@ impl TombstoneManager {
///
/// Returns the number of keys that were deleted.
pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let operations = keys
let keys = keys
.iter()
.map(|key| TxnOp::Delete(self.to_tombstone(key)))
.map(|key| self.to_tombstone(key))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
// Always success.
let _ = self.kv_backend.txn(txn).await?;
Ok(keys.len())
let num_keys = keys.len();
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
.await?;
Ok(num_keys)
}
}
@@ -392,16 +433,73 @@ mod tests {
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(0, moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_max_txn_ops() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::default());
let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
tombstone_manager.set_max_txn_ops(4);
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
(b"qux".to_vec(), b"world".to_vec()),
(b"quux".to_vec(), b"world".to_vec()),
(b"quuux".to_vec(), b"world".to_vec()),
(b"quuuux".to_vec(), b"world".to_vec()),
(b"quuuuux".to_vec(), b"world".to_vec()),
(b"quuuuuux".to_vec(), b"world".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
assert_eq!(0, moved_keys);
check_moved_values(kv_backend.clone(), &move_values).await;
}
@@ -439,17 +537,19 @@ mod tests {
.unzip();
keys.push(b"non-exists".to_vec());
dest_keys.push(b"hi/non-exists".to_vec());
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
assert_eq!(3, moved_keys);
// Moves again
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
assert_eq!(0, moved_keys);
}
#[tokio::test]
@@ -490,10 +590,11 @@ mod tests {
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
let moved_keys = tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap();
assert_eq!(kvs.len(), moved_keys);
}
#[tokio::test]
@@ -571,4 +672,24 @@ mod tests {
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_different_lengths() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
let err = tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap_err();
assert!(err
.to_string()
.contains("The length of keys(2) does not match the length of dest_keys(3)."),);
let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
assert_eq!(0, moved_keys);
}
}
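A side note on the chunked `move_values` above: `keys` and `dest_keys` must have the same length, are split into chunks of `max_txn_ops() / 2` entries, and each chunk pair is moved in its own transaction. A minimal, self-contained sketch (illustrative only, not the TombstoneManager implementation) of how the paired chunking splits the work:

/// Pairs up chunks of two same-length key lists, mirroring how the chunked
/// move path issues one transaction per (keys, dest_keys) chunk pair.
fn chunk_pairs(keys: &[Vec<u8>], dest_keys: &[Vec<u8>], chunk_size: usize) -> Vec<(usize, usize)> {
    assert_eq!(keys.len(), dest_keys.len());
    keys.chunks(chunk_size)
        .zip(dest_keys.chunks(chunk_size))
        .map(|(k, d)| (k.len(), d.len()))
        .collect()
}

fn main() {
    let keys: Vec<Vec<u8>> = (0u8..9).map(|i| vec![i]).collect();
    let dest_keys = keys.clone();
    // With max_txn_ops = 4 (as in the test above), chunk_size = 4 / 2 = 2,
    // so 9 keys become 5 chunk pairs, the last holding a single key.
    let pairs = chunk_pairs(&keys, &dest_keys, 2);
    assert_eq!(pairs.len(), 5);
    assert_eq!(pairs.last(), Some(&(1, 1)));
}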

View File

@@ -69,8 +69,6 @@ pub enum DdlTask {
AlterDatabase(AlterDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
#[cfg(feature = "enterprise")]
DropTrigger(trigger::DropTriggerTask),
CreateView(CreateViewTask),
DropView(DropViewTask),
#[cfg(feature = "enterprise")]
@@ -261,18 +259,6 @@ impl TryFrom<Task> for DdlTask {
.fail()
}
}
Task::DropTriggerTask(drop_trigger) => {
#[cfg(feature = "enterprise")]
return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
#[cfg(not(feature = "enterprise"))]
{
let _ = drop_trigger;
crate::error::UnsupportedSnafu {
operation: "drop trigger",
}
.fail()
}
}
}
}
}
@@ -325,8 +311,6 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::DropView(task) => Task::DropViewTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
};
Ok(Self {

View File

@@ -1,13 +1,10 @@
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::{
CreateTriggerTask as PbCreateTriggerTask, DropTriggerTask as PbDropTriggerTask,
};
use api::v1::meta::CreateTriggerTask as PbCreateTriggerTask;
use api::v1::notify_channel::ChannelType as PbChannelType;
use api::v1::{
CreateTriggerExpr as PbCreateTriggerExpr, DropTriggerExpr as PbDropTriggerExpr,
NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
CreateTriggerExpr, NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
@@ -59,7 +56,7 @@ impl From<CreateTriggerTask> for PbCreateTriggerTask {
.map(PbNotifyChannel::from)
.collect();
let expr = PbCreateTriggerExpr {
let expr = CreateTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
create_if_not_exists: task.if_not_exists,
@@ -142,86 +139,17 @@ impl TryFrom<PbNotifyChannel> for NotifyChannel {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropTriggerTask {
pub catalog_name: String,
pub trigger_name: String,
pub drop_if_exists: bool,
}
impl From<DropTriggerTask> for PbDropTriggerTask {
fn from(task: DropTriggerTask) -> Self {
let expr = PbDropTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
drop_if_exists: task.drop_if_exists,
};
PbDropTriggerTask {
drop_trigger: Some(expr),
}
}
}
impl TryFrom<PbDropTriggerTask> for DropTriggerTask {
type Error = error::Error;
fn try_from(task: PbDropTriggerTask) -> Result<Self> {
let expr = task.drop_trigger.context(error::InvalidProtoMsgSnafu {
err_msg: "expected drop_trigger",
})?;
Ok(DropTriggerTask {
catalog_name: expr.catalog_name,
trigger_name: expr.trigger_name,
drop_if_exists: expr.drop_if_exists,
})
}
}
impl DdlTask {
/// Creates a [`DdlTask`] to create a trigger.
pub fn new_create_trigger(expr: CreateTriggerTask) -> Self {
DdlTask::CreateTrigger(expr)
}
/// Creates a [`DdlTask`] to drop a trigger.
pub fn new_drop_trigger(expr: DropTriggerTask) -> Self {
DdlTask::DropTrigger(expr)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_convert_drop_trigger_task() {
let original = DropTriggerTask {
catalog_name: "test_catalog".to_string(),
trigger_name: "test_trigger".to_string(),
drop_if_exists: true,
};
let pb_task: PbDropTriggerTask = original.clone().into();
let expr = pb_task.drop_trigger.as_ref().unwrap();
assert_eq!(expr.catalog_name, "test_catalog");
assert_eq!(expr.trigger_name, "test_trigger");
assert!(expr.drop_if_exists);
let round_tripped = DropTriggerTask::try_from(pb_task).unwrap();
assert_eq!(original.catalog_name, round_tripped.catalog_name);
assert_eq!(original.trigger_name, round_tripped.trigger_name);
assert_eq!(original.drop_if_exists, round_tripped.drop_if_exists);
// Test invalid case where drop_trigger is None
let invalid_task = PbDropTriggerTask { drop_trigger: None };
let result = DropTriggerTask::try_from(invalid_task);
assert!(result.is_err());
}
#[test]
fn test_convert_create_trigger_task() {
let original = CreateTriggerTask {

View File

@@ -14,7 +14,7 @@
pub mod columnar_value;
pub mod error;
pub mod function;
mod function;
pub mod logical_plan;
pub mod prelude;
pub mod request;

View File

@@ -222,6 +222,7 @@ pub struct RecordBatchStreamAdapter {
enum Metrics {
Unavailable,
Unresolved(Arc<dyn ExecutionPlan>),
PartialResolved(Arc<dyn ExecutionPlan>, RecordBatchMetrics),
Resolved(RecordBatchMetrics),
}
@@ -275,7 +276,9 @@ impl RecordBatchStream for RecordBatchStreamAdapter {
fn metrics(&self) -> Option<RecordBatchMetrics> {
match &self.metrics_2 {
Metrics::Resolved(metrics) => Some(metrics.clone()),
Metrics::Resolved(metrics) | Metrics::PartialResolved(_, metrics) => {
Some(metrics.clone())
}
Metrics::Unavailable | Metrics::Unresolved(_) => None,
}
}
@@ -299,13 +302,25 @@ impl Stream for RecordBatchStreamAdapter {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {
let df_record_batch = df_record_batch?;
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
&self.metrics_2
{
let mut metric_collector = MetricCollector::new(self.explain_verbose);
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::PartialResolved(
df_plan.clone(),
metric_collector.record_batch_metrics,
);
}
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
self.schema(),
df_record_batch,
)))
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =
&self.metrics_2
{
let mut metric_collector = MetricCollector::new(self.explain_verbose);
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);

View File

@@ -178,8 +178,6 @@ pub enum Error {
StreamTimeout {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: tokio::time::error::Elapsed,
},
#[snafu(display("RecordBatch slice index overflow: {visit_index} > {size}"))]

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
backtrace = "0.3"
common-error.workspace = true
common-version.workspace = true
console-subscriber = { version = "0.1", optional = true }
greptime-proto.workspace = true
humantime-serde.workspace = true
@@ -22,7 +23,7 @@ once_cell.workspace = true
opentelemetry = { version = "0.21.0", default-features = false, features = [
"trace",
] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio", "http-proto", "reqwest-client"] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot.workspace = true

View File

@@ -20,7 +20,7 @@ use std::time::Duration;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::{Protocol, SpanExporterBuilder, WithExportConfig};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_semantic_conventions::resource;
@@ -36,11 +36,7 @@ use tracing_subscriber::{filter, EnvFilter, Registry};
use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
/// The default endpoint when use gRPC exporter protocol.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";
/// The default endpoint when use HTTP exporter protocol.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318";
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";
@@ -71,25 +67,11 @@ pub struct LoggingOptions {
/// Whether to enable tracing with OTLP. Default is false.
pub enable_otlp_tracing: bool,
/// The endpoint of OTLP. Default is "http://localhost:4318".
/// The endpoint of OTLP. Default is "http://localhost:4317".
pub otlp_endpoint: Option<String>,
/// The tracing sample ratio.
pub tracing_sample_ratio: Option<TracingSampleOptions>,
/// The protocol of OTLP export.
pub otlp_export_protocol: Option<OtlpExportProtocol>,
}
/// The protocol of OTLP export.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
/// GRPC protocol.
Grpc,
/// HTTP protocol with binary protobuf.
Http,
}
/// The options of slow query.
@@ -165,7 +147,6 @@ impl Default for LoggingOptions {
append_stdout: true,
// Rotation hourly, 24 files per day, keeps info log files of 30 days
max_log_files: 720,
otlp_export_protocol: None,
}
}
}
@@ -403,13 +384,26 @@ pub fn init_global_logging(
resource::SERVICE_INSTANCE_ID,
node_id.unwrap_or("none".to_string()),
),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::SERVICE_VERSION, common_version::version()),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]));
let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(build_otlp_exporter(opts))
.with_exporter(exporter)
.with_trace_config(trace_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
@@ -427,42 +421,6 @@ pub fn init_global_logging(
guards
}
fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporterBuilder {
let protocol = opts
.otlp_export_protocol
.clone()
.unwrap_or(OtlpExportProtocol::Http);
let endpoint = opts
.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or_else(|| match protocol {
OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
});
match protocol {
OtlpExportProtocol::Grpc => SpanExporterBuilder::Tonic(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
),
OtlpExportProtocol::Http => SpanExporterBuilder::Http(
opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(endpoint)
.with_protocol(Protocol::HttpBinary),
),
}
}
fn build_slow_query_logger<S>(
opts: &LoggingOptions,
slow_query_opts: Option<&SlowQueryOptions>,

View File

@@ -17,4 +17,5 @@ shadow-rs.workspace = true
[build-dependencies]
build-data = "0.2"
cargo-manifest = "0.19"
shadow-rs.workspace = true

View File

@@ -14,8 +14,10 @@
use std::collections::BTreeSet;
use std::env;
use std::path::PathBuf;
use build_data::{format_timestamp, get_source_time};
use cargo_manifest::Manifest;
use shadow_rs::{BuildPattern, ShadowBuilder, CARGO_METADATA, CARGO_TREE};
fn main() -> shadow_rs::SdResult<()> {
@@ -33,6 +35,24 @@ fn main() -> shadow_rs::SdResult<()> {
// solve the problem where the "CARGO_MANIFEST_DIR" is not what we want when this repo is
// made as a submodule in another repo.
let src_path = env::var("CARGO_WORKSPACE_DIR").or_else(|_| env::var("CARGO_MANIFEST_DIR"))?;
let manifest = Manifest::from_path(PathBuf::from(&src_path).join("Cargo.toml"))
.expect("Failed to parse Cargo.toml");
if let Some(product_version) = manifest.workspace.as_ref().and_then(|w| {
w.metadata.as_ref().and_then(|m| {
m.get("greptime")
.and_then(|g| g.get("product_version").and_then(|v| v.as_str()))
})
}) {
println!(
"cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}",
product_version
);
} else {
let version = env::var("CARGO_PKG_VERSION").unwrap();
println!("cargo:rustc-env=GREPTIME_PRODUCT_VERSION={}", version,);
}
let out_path = env::var("OUT_DIR")?;
let _ = ShadowBuilder::builder()

View File

@@ -105,13 +105,17 @@ pub const fn build_info() -> BuildInfo {
build_time: env!("BUILD_TIMESTAMP"),
rustc: build::RUST_VERSION,
target: build::BUILD_TARGET,
version: build::PKG_VERSION,
version: env!("GREPTIME_PRODUCT_VERSION"),
}
}
const BUILD_INFO: BuildInfo = build_info();
pub const fn version() -> &'static str {
BUILD_INFO.version
}
pub const fn verbose_version() -> &'static str {
const_format::formatcp!(
"\nbranch: {}\ncommit: {}\nclean: {}\nversion: {}",
BUILD_INFO.branch,

View File

@@ -475,7 +475,7 @@ mod test {
async fn region_alive_keeper() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());

View File

@@ -14,7 +14,10 @@
//! Datanode configurations
use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::{Configurable, DEFAULT_DATA_HOME};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
@@ -24,7 +27,6 @@ use file_engine::config::EngineConfig as FileEngineConfig;
use meta_client::MetaClientOptions;
use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
pub(crate) use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
@@ -34,6 +36,53 @@ use servers::http::HttpOptions;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File(FileConfig),
S3(S3Config),
Oss(OssConfig),
Azblob(AzblobConfig),
Gcs(GcsConfig),
}
impl ObjectStoreConfig {
/// Returns the object storage type name, such as `S3`, `Oss` etc.
pub fn provider_name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Self::Oss(_) => "Oss",
Self::Azblob(_) => "Azblob",
Self::Gcs(_) => "Gcs",
}
}
/// Returns true when it's a remote object storage such as AWS S3.
pub fn is_object_storage(&self) -> bool {
!matches!(self, Self::File(_))
}
/// Returns the object storage configuration name, or the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
// file storage doesn't support name
Self::File(_) => self.provider_name(),
Self::S3(s3) => &s3.name,
Self::Oss(oss) => &oss.name,
Self::Azblob(az) => &az.name,
Self::Gcs(gcs) => &gcs.name,
};
if name.trim().is_empty() {
return self.provider_name();
}
name
}
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
@@ -63,6 +112,252 @@ impl Default for StorageConfig {
}
}
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
/// The local file cache directory
pub cache_path: Option<String>,
/// The cache capacity in bytes
pub cache_capacity: Option<ReadableSize>,
}
/// The HTTP client options for the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum number of idle connections per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,
/// The timeout for only the connect phase of an HTTP client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,
/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,
/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
/// Skip SSL certificate validation (insecure)
pub skip_ssl_validation: bool,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
skip_ssl_validation: false,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
/// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
/// By default, opendal sends API requests to https://s3.us-east-1.amazonaws.com/bucket_name
/// When enabled, opendal sends API requests to https://bucket_name.s3.us-east-1.amazonaws.com
pub enable_virtual_host_style: bool,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.enable_virtual_host_style == other.enable_virtual_host_style
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub access_key_secret: SecretString,
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub name: String,
pub container: String,
pub root: String,
#[serde(skip_serializing)]
pub account_name: SecretString,
#[serde(skip_serializing)]
pub account_key: SecretString,
pub endpoint: String,
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.container == other.container
&& self.root == other.root
&& self.account_name.expose_secret() == other.account_name.expose_secret()
&& self.account_key.expose_secret() == other.account_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub name: String,
pub root: String,
pub bucket: String,
pub scope: String,
#[serde(skip_serializing)]
pub credential_path: SecretString,
#[serde(skip_serializing)]
pub credential: SecretString,
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.root == other.root
&& self.bucket == other.bucket
&& self.scope == other.scope
&& self.credential_path.expose_secret() == other.credential_path.expose_secret()
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
impl Default for S3Config {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
secret_access_key: SecretString::from(String::default()),
enable_virtual_host_style: false,
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for OssConfig {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for AzblobConfig {
fn default() -> Self {
Self {
name: String::default(),
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
account_key: SecretString::from(String::default()),
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for GcsConfig {
fn default() -> Self {
Self {
name: String::default(),
root: String::default(),
bucket: String::default(),
scope: String::default(),
credential_path: SecretString::from(String::default()),
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {})
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
@@ -172,6 +467,37 @@ mod tests {
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}
#[test]
fn test_config_name() {
let object_store_config = ObjectStoreConfig::default();
assert_eq!("File", object_store_config.config_name());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert_eq!("S3", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
let s3_config = ObjectStoreConfig::S3(S3Config {
name: "test".to_string(),
..Default::default()
});
assert_eq!("test", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
}
#[test]
fn test_is_object_storage() {
let store = ObjectStoreConfig::default();
assert!(!store.is_object_storage());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert!(s3_config.is_object_storage());
let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
assert!(oss_config.is_object_storage());
let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
assert!(gcs_config.is_object_storage());
let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
assert!(azblob_config.is_object_storage());
}
#[test]
fn test_secstr() {
let toml_str = r#"

View File

@@ -142,6 +142,14 @@ pub enum Error {
source: Box<log_store::error::Error>,
},
#[snafu(display("Failed to init backend"))]
InitBackend {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -379,29 +387,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize json"))]
SerializeJson {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed object store operation"))]
ObjectStore {
source: object_store::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build cache store"))]
BuildCacheStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -454,6 +439,8 @@ impl ErrorExt for Error {
StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),
InitBackend { .. } => StatusCode::StorageUnavailable,
OpenLogStore { source, .. } => source.status_code(),
MetaClientInit { source, .. } => source.status_code(),
UnsupportedOutput { .. } => StatusCode::Unsupported,
@@ -470,10 +457,6 @@ impl ErrorExt for Error {
StatusCode::RegionBusy
}
MissingCache { .. } => StatusCode::Internal,
SerializeJson { .. } => StatusCode::Internal,
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -278,7 +278,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("close-region");
let mut engine_env = TestEnv::with_prefix("close-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -326,7 +326,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-region");
let mut engine_env = TestEnv::with_prefix("open-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -374,7 +374,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -406,7 +406,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("downgrade-region");
let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

View File

@@ -27,14 +27,14 @@ lazy_static! {
pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_datanode_handle_region_request_elapsed",
"datanode handle region request elapsed",
&[REGION_ID, REGION_REQUEST_TYPE]
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// The number of rows in region requests received by the region server, labeled with request type.
pub static ref REGION_CHANGED_ROW_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_region_changed_row_count",
"datanode region changed row count",
&[REGION_ID, REGION_REQUEST_TYPE]
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// The elapsed time since the last received heartbeat.

View File

@@ -20,14 +20,12 @@ use std::time::Duration;
use api::region::RegionResponse;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
};
use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_query::request::QueryRequest;
use common_query::OutputData;
@@ -49,12 +47,11 @@ pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::QueryEngineRef;
use serde_json;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use servers::grpc::FlightCompression;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
@@ -74,10 +71,10 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -141,12 +138,12 @@ impl RegionServer {
/// Finds the region's engine by its id. If the region is not ready, returns `None`.
pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
match self.inner.get_engine(region_id, &RegionChange::None) {
Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
Err(error::Error::RegionNotFound { .. }) => Ok(None),
Err(err) => Err(err),
}
self.inner
.get_engine(region_id, &RegionChange::None)
.map(|x| match x {
CurrentEngine::Engine(engine) => Some(engine),
CurrentEngine::EarlyReturn(_) => None,
})
}
#[tracing::instrument(skip_all)]
@@ -197,6 +194,7 @@ impl RegionServer {
pub async fn handle_remote_read(
&self,
request: api::v1::region::QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
let _permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
@@ -204,12 +202,6 @@ impl RegionServer {
None
};
let query_ctx: QueryContextRef = request
.header
.as_ref()
.map(|h| Arc::new(h.into()))
.unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
@@ -217,7 +209,7 @@ impl RegionServer {
let decoder = self
.inner
.query_engine
.engine_context(query_ctx)
.engine_context(query_ctx.clone())
.new_plan_decoder()
.context(NewPlanDecoderSnafu)?;
@@ -227,11 +219,14 @@ impl RegionServer {
.context(DecodeLogicalPlanSnafu)?;
self.inner
.handle_read(QueryRequest {
header: request.header,
region_id,
plan,
})
.handle_read(
QueryRequest {
header: request.header,
region_id,
plan,
},
query_ctx,
)
.await
}
@@ -246,6 +241,7 @@ impl RegionServer {
let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
struct RegionDataSourceInjector {
source: Arc<dyn TableSource>,
@@ -274,7 +270,7 @@ impl RegionServer {
.data;
self.inner
.handle_read(QueryRequest { plan, ..request })
.handle_read(QueryRequest { plan, ..request }, query_ctx)
.await
}
@@ -415,7 +411,6 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -445,7 +440,6 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -478,48 +472,6 @@ impl RegionServer {
.map(|_| RegionResponse::new(AffectedRows::default()))
}
/// Handles the ListMetadata request and retrieves metadata for specified regions.
///
/// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
/// non-existing regions as `null`.
#[tracing::instrument(skip_all)]
async fn handle_list_metadata_request(
&self,
request: &ListMetadataRequest,
) -> Result<RegionResponse> {
let mut region_metadatas = Vec::new();
// Collect metadata for each region
for region_id in &request.region_ids {
let region_id = RegionId::from_u64(*region_id);
// Get the engine.
let Some(engine) = self.find_engine(region_id)? else {
region_metadatas.push(None);
continue;
};
match engine.get_metadata(region_id).await {
Ok(metadata) => region_metadatas.push(Some(metadata)),
Err(err) => {
if err.status_code() == StatusCode::RegionNotFound {
region_metadatas.push(None);
} else {
Err(err).with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
}
}
}
}
// Serialize metadata to JSON
let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
let response = RegionResponse::from_metadata(json_result);
Ok(response)
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -551,10 +503,6 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
region_request::Body::ListMetadata(list_metadata_request) => {
self.handle_list_metadata_request(list_metadata_request)
.await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -569,7 +517,6 @@ impl RegionServerHandler for RegionServer {
}),
affected_rows: response.affected_rows as _,
extensions: response.extensions,
metadata: response.metadata,
})
}
}
@@ -588,9 +535,14 @@ impl FlightCraft for RegionServer {
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default();
let query_ctx = request
.header
.as_ref()
.map(|h| Arc::new(QueryContext::from(h)))
.unwrap_or(QueryContext::arc());
let result = self
.handle_remote_read(request)
.handle_remote_read(request, query_ctx.clone())
.trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
.await?;
@@ -598,6 +550,7 @@ impl FlightCraft for RegionServer {
result,
tracing_context,
self.flight_compression,
query_ctx,
));
Ok(Response::new(stream))
}
@@ -949,7 +902,6 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -968,9 +920,8 @@ impl RegionServerInner {
request: RegionRequest,
) -> Result<RegionResponse> {
let request_type = request.request_type();
let region_id_str = region_id.to_string();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[&region_id_str, request_type])
.with_label_values(&[request_type])
.start_timer();
let region_change = match &request {
@@ -1010,7 +961,7 @@ impl RegionServerInner {
// Update metrics
if matches!(region_change, RegionChange::Ingest) {
crate::metrics::REGION_CHANGED_ROW_COUNT
.with_label_values(&[&region_id_str, request_type])
.with_label_values(&[request_type])
.inc_by(result.affected_rows as u64);
}
// Sets corresponding region status to ready.
@@ -1020,7 +971,6 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -1178,16 +1128,13 @@ impl RegionServerInner {
Ok(())
}
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
pub async fn handle_read(
&self,
request: QueryRequest,
query_ctx: QueryContextRef,
) -> Result<SendableRecordBatchStream> {
// TODO(ruihang): add metrics and set trace id
// Build query context from gRPC header
let query_ctx: QueryContextRef = request
.header
.as_ref()
.map(|h| Arc::new(h.into()))
.unwrap_or_else(|| QueryContextBuilder::default().build().into());
let result = self
.query_engine
.execute(request.plan, query_ctx)
@@ -1296,11 +1243,8 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use mito2::test_util::CreateRequestBuilder;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
use store_api::storage::RegionId;
@@ -1662,175 +1606,4 @@ mod tests {
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
let mut metadata_builder = RegionMetadataBuilder::new(region_id);
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"timestamp",
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"file",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 1,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"message",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
});
metadata_builder.primary_key(vec![1]);
metadata_builder.build().unwrap()
}
#[tokio::test]
async fn test_handle_list_metadata_request() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadata_2 = mock_region_metadata(region_id_2);
let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
let metadata_1 = Arc::new(metadata_1);
let metadata_2 = Arc::new(metadata_2);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else if region_id == region_id_2 {
Ok(metadata_2.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
// All regions exist.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_not_found() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadatas = vec![Some(metadata_1.clone()), None];
let metadata_1 = Arc::new(metadata_1);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Not in region map.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
// Not in region engine.
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_failed() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
error::UnexpectedSnafu {
violated: format!("Failed to get region {region_id}"),
}
.fail()
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Failed to get.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64()],
};
mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap_err();
}
}

View File

@@ -14,22 +14,45 @@
//! object storage utilities
mod azblob;
pub mod fs;
mod gcs;
mod oss;
mod s3;
use std::path;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use object_store::factory::new_raw_object_store;
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers};
use object_store::{
Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
};
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, CreateDirSnafu, Result};
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
ObjectStoreConfig::Azblob(azblob_config) => {
azblob::new_azblob_object_store(azblob_config).await
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
@@ -43,9 +66,7 @@ pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
let object_store = new_raw_object_store(store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
// Adds retry layer
@@ -62,9 +83,7 @@ pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
let object_store = new_raw_object_store(&store, data_home).await?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
@@ -151,20 +170,20 @@ async fn build_cache_layer(
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
clean_temp_dir(&atomic_temp_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
clean_temp_dir(&old_atomic_temp_dir)?;
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::BuildCacheStoreSnafu)?;
.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.context(error::BuildCacheStoreSnafu)?;
.context(error::InitBackendSnafu)?;
cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
@@ -177,6 +196,31 @@ async fn build_cache_layer(
}
}
pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
if path::Path::new(&dir).exists() {
info!("Begin to clean temp storage directory: {}", dir);
std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
info!("Cleaned temp storage directory: {}", dir);
}
Ok(())
}
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
if config.skip_ssl_validation {
common_telemetry::warn!("Skipping SSL validation for object storage HTTP client. Please ensure the environment is trusted.");
}
let client = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
.connect_timeout(config.connect_timeout)
.pool_idle_timeout(config.pool_idle_timeout)
.timeout(config.timeout)
.danger_accept_invalid_certs(config.skip_ssl_validation)
.build()
.context(BuildHttpClientSnafu)?;
Ok(HttpClient::with(client))
}
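For reference, a minimal sketch of wiring a config into this helper (assuming `HttpClientConfig` derives `Default`; the field names are only the ones read above, anything else is hypothetical):

use std::time::Duration;

let config = HttpClientConfig {
    pool_max_idle_per_host: 16,
    connect_timeout: Duration::from_secs(5),
    pool_idle_timeout: Duration::from_secs(60),
    timeout: Duration::from_secs(30),
    skip_ssl_validation: false,
    ..Default::default()
};
// The resulting client is what the S3/OSS/GCS/Azblob builders below receive.
let client = build_http_client(&config).expect("failed to build http client");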
struct PrintDetailedError;
// PrintDetailedError is a retry interceptor that prints the error in Debug format when retrying.

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Azblob;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::AzblobConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&azblob_config.root);
info!(
"The azure storage container is: {}, root is: {}",
azblob_config.container, &root
);
let client = build_http_client(&azblob_config.http_client)?;
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);
};
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs, path};
use common_telemetry::info;
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use snafu::prelude::*;
use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {
fs::create_dir_all(path::Path::new(&data_home))
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
store::clean_temp_dir(&atomic_write_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
store::clean_temp_dir(&old_atomic_temp_dir)?;
let builder = Fs::default()
.root(data_home)
.atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
Ok(object_store)
}

View File

@@ -0,0 +1,46 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Gcs;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::GcsConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&gcs_config.root);
info!(
"The gcs storage bucket is: {}, root is: {}",
gcs_config.bucket, &root
);
let client = build_http_client(&gcs_config.http_client);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(client?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Oss;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::OssConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&oss_config.root);
info!(
"The oss storage bucket is: {}, root is: {}",
oss_config.bucket, &root
);
let client = build_http_client(&oss_config.http_client)?;
let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(client);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::S3;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::S3Config;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
let root = util::normalize_dir(&s3_config.root);
info!(
"The s3 storage bucket is: {}, root is: {}",
s3_config.bucket, &root
);
let client = build_http_client(&s3_config.http_client)?;
let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(client);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
if s3_config.region.is_some() {
builder = builder.region(s3_config.region.as_ref().unwrap());
}
if s3_config.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -108,15 +108,11 @@ pub type MockRequestHandler =
pub type MockSetReadonlyGracefullyHandler =
Box<dyn Fn(RegionId) -> Result<SetRegionRoleStateResponse, Error> + Send + Sync>;
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -131,7 +127,6 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -151,27 +146,6 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
rx,
)
}
pub fn with_metadata_mock_fn(
engine: &str,
mock_fn: MockGetMetadataHandler,
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
let (tx, rx) = tokio::sync::mpsc::channel(8);
(
Arc::new(Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
mock_role: None,
engine: engine.to_string(),
}),
@@ -192,7 +166,6 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -235,11 +208,7 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
if let Some(mock_fn) = &self.handle_get_metadata_mock_fn {
return mock_fn(region_id).map_err(BoxedError::new);
};
async fn get_metadata(&self, _region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
unimplemented!()
}

View File

@@ -31,9 +31,10 @@ pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

View File

@@ -47,13 +47,18 @@ pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
pub const COLUMN_FULLTEXT_OPT_KEY_ANALYZER: &str = "analyzer";
pub const COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE: &str = "case_sensitive";
pub const COLUMN_FULLTEXT_OPT_KEY_BACKEND: &str = "backend";
pub const COLUMN_FULLTEXT_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";
/// Keys used in SKIPPING index options
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY: &str = "granularity";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE: &str = "false_positive_rate";
pub const COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE: &str = "type";
pub const DEFAULT_GRANULARITY: u32 = 10240;
pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSchema {
@@ -504,7 +509,7 @@ impl TryFrom<&ColumnSchema> for Field {
}
/// Fulltext options for a column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default, Visit, VisitMut)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
#[serde(rename_all = "kebab-case")]
pub struct FulltextOptions {
/// Whether the fulltext index is enabled.
@@ -518,6 +523,92 @@ pub struct FulltextOptions {
/// The fulltext backend to use.
#[serde(default)]
pub backend: FulltextBackend,
/// The granularity of the fulltext index (for bloom backend only)
#[serde(default = "fulltext_options_default_granularity")]
pub granularity: u32,
/// The false positive rate of the fulltext index, stored in ten-thousandths (e.g., 100 = 1%; for bloom backend only)
#[serde(default = "index_options_default_false_positive_rate_in_10000")]
pub false_positive_rate_in_10000: u32,
}
fn fulltext_options_default_granularity() -> u32 {
DEFAULT_GRANULARITY
}
fn index_options_default_false_positive_rate_in_10000() -> u32 {
(DEFAULT_FALSE_POSITIVE_RATE * 10000.0) as u32
}
impl FulltextOptions {
/// Creates a new fulltext options.
pub fn new(
enable: bool,
analyzer: FulltextAnalyzer,
case_sensitive: bool,
backend: FulltextBackend,
granularity: u32,
false_positive_rate: f64,
) -> Result<Self> {
ensure!(
0.0 < false_positive_rate && false_positive_rate <= 1.0,
error::InvalidFulltextOptionSnafu {
msg: format!(
"Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"
),
}
);
ensure!(
granularity > 0,
error::InvalidFulltextOptionSnafu {
msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
}
);
Ok(Self::new_unchecked(
enable,
analyzer,
case_sensitive,
backend,
granularity,
false_positive_rate,
))
}
/// Creates a new fulltext options without checking `false_positive_rate` and `granularity`.
pub fn new_unchecked(
enable: bool,
analyzer: FulltextAnalyzer,
case_sensitive: bool,
backend: FulltextBackend,
granularity: u32,
false_positive_rate: f64,
) -> Self {
Self {
enable,
analyzer,
case_sensitive,
backend,
granularity,
false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
}
}
/// Gets the false positive rate.
pub fn false_positive_rate(&self) -> f64 {
self.false_positive_rate_in_10000 as f64 / 10000.0
}
}
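A short illustration of the storage scheme above, based only on the constructors shown (a sketch, not a test from this patch): the rate is kept as an integer number of ten-thousandths, presumably so the struct can keep its integer-friendly derives and a stable serde form.

let opts = FulltextOptions::new(
    true,
    FulltextAnalyzer::default(),
    false,
    FulltextBackend::Bloom,
    DEFAULT_GRANULARITY,
    0.001,
)
.unwrap();
assert_eq!(opts.false_positive_rate_in_10000, 10); // 0.001 * 10000
assert_eq!(opts.false_positive_rate(), 0.001);
// Out-of-range rates are rejected by `new`:
assert!(FulltextOptions::new(
    true,
    FulltextAnalyzer::default(),
    false,
    FulltextBackend::Bloom,
    DEFAULT_GRANULARITY,
    0.0,
)
.is_err());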
impl Default for FulltextOptions {
fn default() -> Self {
Self::new_unchecked(
false,
FulltextAnalyzer::default(),
false,
FulltextBackend::default(),
DEFAULT_GRANULARITY,
DEFAULT_FALSE_POSITIVE_RATE,
)
}
}
impl fmt::Display for FulltextOptions {
@@ -527,6 +618,10 @@ impl fmt::Display for FulltextOptions {
write!(f, ", analyzer={}", self.analyzer)?;
write!(f, ", case_sensitive={}", self.case_sensitive)?;
write!(f, ", backend={}", self.backend)?;
if self.backend == FulltextBackend::Bloom {
write!(f, ", granularity={}", self.granularity)?;
write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
}
}
Ok(())
}
@@ -611,6 +706,45 @@ impl TryFrom<HashMap<String, String>> for FulltextOptions {
}
}
if fulltext_options.backend == FulltextBackend::Bloom {
// Parse granularity with default value 10240
let granularity = match options.get(COLUMN_FULLTEXT_OPT_KEY_GRANULARITY) {
Some(value) => value
.parse::<u32>()
.ok()
.filter(|&v| v > 0)
.ok_or_else(|| {
error::InvalidFulltextOptionSnafu {
msg: format!(
"Invalid granularity: {value}, expected: positive integer"
),
}
.build()
})?,
None => DEFAULT_GRANULARITY,
};
fulltext_options.granularity = granularity;
// Parse false positive rate with default value 0.01
let false_positive_rate = match options.get(COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE)
{
Some(value) => value
.parse::<f64>()
.ok()
.filter(|&v| v > 0.0 && v <= 1.0)
.ok_or_else(|| {
error::InvalidFulltextOptionSnafu {
msg: format!(
"Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
),
}
.build()
})?,
None => DEFAULT_FALSE_POSITIVE_RATE,
};
fulltext_options.false_positive_rate_in_10000 = (false_positive_rate * 10000.0) as u32;
}
Ok(fulltext_options)
}
}
@@ -638,23 +772,73 @@ impl fmt::Display for FulltextAnalyzer {
pub struct SkippingIndexOptions {
/// The granularity of the skip index.
pub granularity: u32,
/// The false positive rate of the skip index (in ten-thousandths, e.g., 100 = 1%).
#[serde(default = "index_options_default_false_positive_rate_in_10000")]
pub false_positive_rate_in_10000: u32,
/// The type of the skip index.
#[serde(default)]
pub index_type: SkippingIndexType,
}
impl SkippingIndexOptions {
/// Creates a new skipping index options without checking `false_positive_rate` and `granularity`.
pub fn new_unchecked(
granularity: u32,
false_positive_rate: f64,
index_type: SkippingIndexType,
) -> Self {
Self {
granularity,
false_positive_rate_in_10000: (false_positive_rate * 10000.0) as u32,
index_type,
}
}
/// Creates a new skipping index options.
pub fn new(
granularity: u32,
false_positive_rate: f64,
index_type: SkippingIndexType,
) -> Result<Self> {
ensure!(
0.0 < false_positive_rate && false_positive_rate <= 1.0,
error::InvalidSkippingIndexOptionSnafu {
msg: format!("Invalid false positive rate: {false_positive_rate}, expected: 0.0 < rate <= 1.0"),
}
);
ensure!(
granularity > 0,
error::InvalidSkippingIndexOptionSnafu {
msg: format!("Invalid granularity: {granularity}, expected: positive integer"),
}
);
Ok(Self::new_unchecked(
granularity,
false_positive_rate,
index_type,
))
}
/// Gets the false positive rate.
pub fn false_positive_rate(&self) -> f64 {
self.false_positive_rate_in_10000 as f64 / 10000.0
}
}
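A sketch of the validation that `new` adds, mirroring the `ensure!` guards above:

let opts = SkippingIndexOptions::new(8192, 0.05, SkippingIndexType::BloomFilter).unwrap();
assert_eq!(opts.false_positive_rate_in_10000, 500);
assert_eq!(opts.false_positive_rate(), 0.05);
// Zero granularity and out-of-range rates are rejected:
assert!(SkippingIndexOptions::new(0, 0.05, SkippingIndexType::BloomFilter).is_err());
assert!(SkippingIndexOptions::new(8192, 1.5, SkippingIndexType::BloomFilter).is_err());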
impl Default for SkippingIndexOptions {
fn default() -> Self {
Self {
granularity: DEFAULT_GRANULARITY,
index_type: SkippingIndexType::default(),
}
Self::new_unchecked(
DEFAULT_GRANULARITY,
DEFAULT_FALSE_POSITIVE_RATE,
SkippingIndexType::default(),
)
}
}
impl fmt::Display for SkippingIndexOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "granularity={}", self.granularity)?;
write!(f, ", false_positive_rate={}", self.false_positive_rate())?;
write!(f, ", index_type={}", self.index_type)?;
Ok(())
}
@@ -681,15 +865,37 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
fn try_from(options: HashMap<String, String>) -> Result<Self> {
// Parse granularity with default value DEFAULT_GRANULARITY
let granularity = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY) {
Some(value) => value.parse::<u32>().map_err(|_| {
error::InvalidSkippingIndexOptionSnafu {
msg: format!("Invalid granularity: {value}, expected: positive integer"),
}
.build()
})?,
Some(value) => value
.parse::<u32>()
.ok()
.filter(|&v| v > 0)
.ok_or_else(|| {
error::InvalidSkippingIndexOptionSnafu {
msg: format!("Invalid granularity: {value}, expected: positive integer"),
}
.build()
})?,
None => DEFAULT_GRANULARITY,
};
// Parse false positive rate with default value DEFAULT_FALSE_POSITIVE_RATE (0.01)
let false_positive_rate =
match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE) {
Some(value) => value
.parse::<f64>()
.ok()
.filter(|&v| v > 0.0 && v <= 1.0)
.ok_or_else(|| {
error::InvalidSkippingIndexOptionSnafu {
msg: format!(
"Invalid false positive rate: {value}, expected: 0.0 < rate <= 1.0"
),
}
.build()
})?,
None => DEFAULT_FALSE_POSITIVE_RATE,
};
// Parse index type with default value BloomFilter
let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
Some(typ) => match typ.to_ascii_uppercase().as_str() {
@@ -704,10 +910,11 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
None => SkippingIndexType::default(),
};
Ok(SkippingIndexOptions {
Ok(SkippingIndexOptions::new_unchecked(
granularity,
false_positive_rate,
index_type,
})
))
}
}
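A hypothetical parse of an options map through the code above. The key names are the COLUMN_SKIPPING_INDEX_OPT_KEY_* constants; the accepted spelling of the "type" value is not shown in this hunk, so it is omitted here and falls back to the default:

use std::collections::HashMap;

let mut raw = HashMap::new();
raw.insert(COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(), "8192".to_string());
raw.insert(COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(), "0.05".to_string());
let opts = SkippingIndexOptions::try_from(raw).unwrap();
assert_eq!(opts.granularity, 8192);
assert_eq!(opts.false_positive_rate(), 0.05);
assert_eq!(opts.index_type, SkippingIndexType::default()); // "type" key omitted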
@@ -973,4 +1180,59 @@ mod tests {
assert!(column_schema.default_constraint.is_none());
assert!(column_schema.metadata.is_empty());
}
#[test]
fn test_skipping_index_options_deserialization() {
let original_options = "{\"granularity\":1024,\"false-positive-rate-in-10000\":10,\"index-type\":\"BloomFilter\"}";
let options = serde_json::from_str::<SkippingIndexOptions>(original_options).unwrap();
assert_eq!(1024, options.granularity);
assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
assert_eq!(0.001, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, original_options);
}
#[test]
fn test_skipping_index_options_deserialization_v0_14_to_v0_15() {
let options = "{\"granularity\":10240,\"index-type\":\"BloomFilter\"}";
let options = serde_json::from_str::<SkippingIndexOptions>(options).unwrap();
assert_eq!(10240, options.granularity);
assert_eq!(SkippingIndexType::BloomFilter, options.index_type);
assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, "{\"granularity\":10240,\"false-positive-rate-in-10000\":100,\"index-type\":\"BloomFilter\"}");
}
#[test]
fn test_fulltext_options_deserialization() {
let original_options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":1024,\"false-positive-rate-in-10000\":10}";
let options = serde_json::from_str::<FulltextOptions>(original_options).unwrap();
assert!(!options.case_sensitive);
assert!(options.enable);
assert_eq!(FulltextBackend::Bloom, options.backend);
assert_eq!(FulltextAnalyzer::default(), options.analyzer);
assert_eq!(1024, options.granularity);
assert_eq!(0.001, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, original_options);
}
#[test]
fn test_fulltext_options_deserialization_v0_14_to_v0_15() {
// 0.14 to 0.15
let options = "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\"}";
let options = serde_json::from_str::<FulltextOptions>(options).unwrap();
assert!(!options.case_sensitive);
assert!(options.enable);
assert_eq!(FulltextBackend::Bloom, options.backend);
assert_eq!(FulltextAnalyzer::default(), options.analyzer);
assert_eq!(DEFAULT_GRANULARITY, options.granularity);
assert_eq!(DEFAULT_FALSE_POSITIVE_RATE, options.false_positive_rate());
let options_str = serde_json::to_string(&options).unwrap();
assert_eq!(options_str, "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false,\"backend\":\"bloom\",\"granularity\":10240,\"false-positive-rate-in-10000\":100}");
}
}

View File

@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow_array::{
ArrayRef, PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow_schema::DataType;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use paste::paste;
@@ -138,6 +143,41 @@ define_timestamp_with_unit!(Millisecond);
define_timestamp_with_unit!(Microsecond);
define_timestamp_with_unit!(Nanosecond);
pub fn timestamp_array_to_primitive(
ts_array: &ArrayRef,
) -> Option<(
PrimitiveArray<arrow_array::types::Int64Type>,
arrow::datatypes::TimeUnit,
)> {
let DataType::Timestamp(unit, _) = ts_array.data_type() else {
return None;
};
let ts_primitive = match unit {
arrow_schema::TimeUnit::Second => ts_array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Millisecond => ts_array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Microsecond => ts_array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
arrow_schema::TimeUnit::Nanosecond => ts_array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.reinterpret_cast::<arrow_array::types::Int64Type>(),
};
Some((ts_primitive, *unit))
}
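For example, converting a millisecond timestamp array back to its raw i64 values might look like this (sketch only; array construction follows the usual arrow-rs API):

use std::sync::Arc;
use arrow_array::TimestampMillisecondArray;

let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1_690_000_000_000i64]));
let (values, unit) = timestamp_array_to_primitive(&ts).expect("expected a timestamp array");
assert_eq!(unit, arrow_schema::TimeUnit::Millisecond);
assert_eq!(values.value(0), 1_690_000_000_000);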
#[cfg(test)]
mod tests {
use common_time::timezone::set_default_timezone;

View File

@@ -899,7 +899,7 @@ impl StreamingEngine {
let rows_send = self.run_available(true).await?;
let row = self.send_writeback_requests().await?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
flow_id, flushed_input_rows, rows_send, row
);
Ok(row)

View File

@@ -14,7 +14,7 @@
//! Batching mode engine
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
@@ -142,7 +142,7 @@ impl BatchingEngine {
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
let mut all_dirty_windows = HashSet::new();
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
@@ -155,7 +155,7 @@ impl BatchingEngine {
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
all_dirty_windows.push(align_start);
all_dirty_windows.insert(align_start);
}
}
}

View File

@@ -50,7 +50,8 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::adapter::util::from_proto_to_data_type;
use crate::error::{
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu,
UnexpectedSnafu,
};
use crate::expr::error::DataTypeSnafu;
use crate::Error;
@@ -74,6 +75,7 @@ pub struct TimeWindowExpr {
logical_expr: Expr,
df_schema: DFSchema,
eval_time_window_size: Option<std::time::Duration>,
eval_time_original: Option<Timestamp>,
}
impl std::fmt::Display for TimeWindowExpr {
@@ -106,10 +108,11 @@ impl TimeWindowExpr {
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
eval_time_window_size: None,
eval_time_original: None,
};
let test_ts = DEFAULT_TEST_TIMESTAMP;
let (l, u) = zelf.eval(test_ts)?;
let time_window_size = match (l, u) {
let (lower, upper) = zelf.eval(test_ts)?;
let time_window_size = match (lower, upper) {
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
UnexpectedSnafu {
reason: format!(
@@ -121,13 +124,59 @@ impl TimeWindowExpr {
_ => None,
};
zelf.eval_time_window_size = time_window_size;
zelf.eval_time_original = lower;
Ok(zelf)
}
/// TODO(discord9): add `eval_batch` too
pub fn eval(
&self,
current: Timestamp,
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 {
if stride_ns == 0 {
return time_diff_ns;
}
// a - (a % n) floors the difference down to the nearest multiple of the stride
let time_delta = time_diff_ns - (time_diff_ns % stride_ns);
if time_diff_ns < 0 && time_delta != time_diff_ns {
// The origin is later than the source timestamp, round down to the previous bin
time_delta - stride_ns
} else {
time_delta
}
}
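// Example: with stride_ns = 60s and time_diff_ns = 150s this yields 120s
// (the start of the window containing `current`); with time_diff_ns = -30s
// the first branch gives 0, which differs from -30, so the result is rounded
// down to -60s. Timestamps earlier than the origin therefore still land on
// the lower edge of their own window rather than on the origin itself.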
// FAST PATH: if we have eval_time_original and eval_time_window_size,
// we can compute the bounds directly
if let (Some(original), Some(window_size)) =
(self.eval_time_original, self.eval_time_window_size)
{
// Align `current` to the window's lower bound (same rounding as date_bin)
let time_diff_ns = current.sub(&original).and_then(|s| s.num_nanoseconds()).with_context(|| UnexpectedSnafu {
reason: format!(
"Failed to compute time difference between current {current:?} and original {original:?}"
),
})?;
let window_size_ns = window_size.as_nanos() as i64;
let distance_ns = compute_distance(time_diff_ns, window_size_ns);
let lower_bound = if distance_ns >= 0 {
original.add_duration(std::time::Duration::from_nanos(distance_ns as u64))
} else {
original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64))
}
.context(TimeSnafu)?;
let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?;
return Ok((Some(lower_bound), Some(upper_bound)));
}
let lower_bound =
calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
let upper_bound =

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
@@ -37,7 +37,6 @@ use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertReq
use itertools::Itertools;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::schema_helper::SchemaHelper;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
@@ -547,14 +546,8 @@ impl FrontendInvoker {
name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?;
let schema_helper = SchemaHelper::new(
catalog_manager.clone(),
Arc::new(TableMetadataManager::new(kv_backend.clone())),
procedure_executor.clone(),
layered_cache_registry.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
catalog_manager.clone(),
partition_manager.clone(),
node_manager.clone(),
table_flownode_cache,
@@ -595,7 +588,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, false, false)
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.0"
async-stream.workspace = true
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
@@ -49,7 +50,6 @@ log-query.workspace = true
log-store.workspace = true
meta-client.workspace = true
num_cpus.workspace = true
object-store.workspace = true
opentelemetry-proto.workspace = true
operator.workspace = true
otel-arrow-rust.workspace = true

View File

@@ -363,6 +363,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Canceling statement due to statement timeout"))]
StatementTimeout {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -443,6 +449,8 @@ impl ErrorExt for Error {
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
Error::Cancelled { .. } => StatusCode::Cancelled,
Error::StatementTimeout { .. } => StatusCode::Cancelled,
}
}

View File

@@ -19,7 +19,6 @@ use common_config::config::Configurable;
use common_options::datanode::DatanodeClientOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use meta_client::MetaClientOptions;
use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
@@ -63,7 +62,6 @@ pub struct FrontendOptions {
pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>,
pub store: ObjectStoreConfig,
}
impl Default for FrontendOptions {
@@ -90,7 +88,6 @@ impl Default for FrontendOptions {
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()),
store: ObjectStoreConfig::default(),
}
}
}
@@ -119,7 +116,8 @@ impl Frontend {
if let Some(t) = self.export_metrics_task.as_ref() {
if t.send_by_handler {
let inserter = self.instance.inserter().clone();
let handler = ExportMetricHandler::new_handler(inserter);
let statement_executor = self.instance.statement_executor().clone();
let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
t.start(Some(handler)).context(error::StartServerSnafu)?
} else {
t.start(None).context(error::StartServerSnafu)?;

View File

@@ -25,9 +25,11 @@ mod promql;
mod region_query;
pub mod standalone;
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};
use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::process_manager::ProcessManagerRef;
@@ -39,20 +41,20 @@ use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::{debug, error, info, tracing};
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
@@ -61,7 +63,6 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
use servers::access_layer::AccessLayerFactory;
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
@@ -69,20 +70,21 @@ use servers::interceptor::{
};
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
TableOperationSnafu,
StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::slow_query_recorder::SlowQueryRecorder;
@@ -104,7 +106,6 @@ pub struct Instance {
slow_query_recorder: Option<SlowQueryRecorder>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
access_layer_factory: AccessLayerFactory,
}
impl Instance {
@@ -166,27 +167,6 @@ impl Instance {
pub fn process_manager(&self) -> &ProcessManagerRef {
&self.process_manager
}
pub fn create_schema_helper(&self) -> SchemaHelper {
SchemaHelper::new(
self.catalog_manager.clone(),
self.table_metadata_manager.clone(),
self.statement_executor.procedure_executor().clone(),
self.statement_executor.cache_invalidator().clone(),
)
}
pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
self.inserter.partition_manager()
}
pub fn node_manager(&self) -> &NodeManagerRef {
self.inserter.node_manager()
}
pub fn access_layer_factory(&self) -> &AccessLayerFactory {
&self.access_layer_factory
}
}
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -214,56 +194,7 @@ impl Instance {
Some(query_ctx.process_id()),
);
let query_fut = async {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
}
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await?;
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
self.statement_executor
.exec_plan(plan, query_ctx)
.await
.context(TableOperationSnafu)
}
Statement::Tql(tql) => {
let plan = self
.statement_executor
.plan_tql(tql.clone(), &query_ctx)
.await?;
query_interceptor.pre_execute(
&Statement::Tql(tql),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor
.exec_plan(plan, query_ctx)
.await
.context(TableOperationSnafu)
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.statement_executor
.execute_sql(stmt, query_ctx)
.await
.context(TableOperationSnafu)
}
}
};
let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
@@ -280,6 +211,153 @@ impl Instance {
Output { data, meta }
})
}
async fn exec_statement_with_timeout(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
) -> Result<Output> {
let timeout = derive_timeout(&stmt, &query_ctx);
match timeout {
Some(timeout) => {
let start = tokio::time::Instant::now();
let output = tokio::time::timeout(
timeout,
self.exec_statement(stmt, query_ctx, query_interceptor),
)
.await
.map_err(|_| StatementTimeoutSnafu.build())??;
// compute remaining timeout
let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
attach_timeout(output, remaining_timeout)
}
None => {
self.exec_statement(stmt, query_ctx, query_interceptor)
.await
}
}
}
async fn exec_statement(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
) -> Result<Output> {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
// TODO: remove this when format is supported in datafusion
if let Statement::Explain(explain) = &stmt {
if let Some(format) = explain.format() {
query_ctx.set_explain_format(format.to_string());
}
}
self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
.await
}
Statement::Tql(tql) => {
self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
.await
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
self.statement_executor
.execute_sql(stmt, query_ctx)
.await
.context(TableOperationSnafu)
}
}
}
async fn plan_and_exec_sql(
&self,
stmt: Statement,
query_ctx: &QueryContextRef,
query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
) -> Result<Output> {
let stmt = QueryStatement::Sql(stmt);
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await?;
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
self.statement_executor
.exec_plan(plan, query_ctx.clone())
.await
.context(TableOperationSnafu)
}
async fn plan_and_exec_tql(
&self,
query_ctx: &QueryContextRef,
query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
tql: Tql,
) -> Result<Output> {
let plan = self
.statement_executor
.plan_tql(tql.clone(), query_ctx)
.await?;
query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
self.statement_executor
.exec_plan(plan, query_ctx.clone())
.await
.context(TableOperationSnafu)
}
}
/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
/// For MySQL, it applies only to read-only statements.
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
let query_timeout = query_ctx.query_timeout()?;
if query_timeout.is_zero() {
return None;
}
match query_ctx.channel() {
Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
Channel::Postgres => Some(query_timeout),
_ => None,
}
}
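// Behaviour of derive_timeout, per the match above:
//   query_timeout unset or zero          -> None (no timeout enforced)
//   MySQL channel + read-only statement  -> Some(query_timeout)
//   MySQL channel + write statement      -> None
//   Postgres channel + any statement     -> Some(query_timeout)
//   other channels                       -> None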
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
if timeout.is_zero() {
return StatementTimeoutSnafu.fail();
}
let output = match output.data {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
OutputData::Stream(mut stream) => {
let schema = stream.schema();
let s = Box::pin(stream! {
let mut start = tokio::time::Instant::now();
while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
yield item;
let now = tokio::time::Instant::now();
timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
start = now;
// tokio::time::timeout may not return an error immediately when timeout is 0.
if timeout.is_zero() {
StreamTimeoutSnafu.fail()?;
}
}
}) as Pin<Box<dyn Stream<Item = _> + Send>>;
let stream = RecordBatchStreamWrapper {
schema,
stream: s,
output_ordering: None,
metrics: Default::default(),
};
Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
}
};
Ok(output)
}
#[async_trait]
@@ -302,6 +380,13 @@ impl SqlQueryHandler for Instance {
.and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
{
Ok(stmts) => {
if stmts.is_empty() {
return vec![InvalidSqlSnafu {
err_msg: "empty statements",
}
.fail()];
}
let mut results = Vec::with_capacity(stmts.len());
for stmt in stmts {
if let Err(e) = checker
@@ -539,8 +624,6 @@ pub fn check_permission(
| Statement::AlterDatabase(_)
| Statement::DropFlow(_)
| Statement::Use(_) => {}
#[cfg(feature = "enterprise")]
Statement::DropTrigger(_) => {}
Statement::ShowCreateDatabase(stmt) => {
validate_database(&stmt.database_name, query_ctx)?;
}
@@ -644,8 +727,6 @@ pub fn check_permission(
Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
// User can only kill process in their own catalog.
Statement::Kill(_) => {}
// SHOW PROCESSLIST
Statement::ShowProcesslist(_) => {}
}
Ok(())
}

View File

@@ -30,14 +30,12 @@ use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::region_query::RegionQueryHandlerFactoryRef;
use query::QueryEngineFactory;
use servers::access_layer::AccessLayerFactory;
use snafu::OptionExt;
use crate::error::{self, Result};
@@ -132,15 +130,8 @@ impl FrontendBuilder {
name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?;
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let schema_helper = SchemaHelper::new(
self.catalog_manager.clone(),
table_metadata_manager.clone(),
self.procedure_executor.clone(),
local_cache_invalidator.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
self.catalog_manager.clone(),
partition_manager.clone(),
node_manager.clone(),
table_flownode_cache,
@@ -185,7 +176,7 @@ impl FrontendBuilder {
self.catalog_manager.clone(),
query_engine.clone(),
self.procedure_executor,
kv_backend,
kv_backend.clone(),
local_cache_invalidator,
inserter.clone(),
table_route_cache,
@@ -220,7 +211,6 @@ impl FrontendBuilder {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
});
let access_layer_factory = AccessLayerFactory::new(&self.options.store).await.unwrap();
Ok(Instance {
catalog_manager: self.catalog_manager,
pipeline_operator,
@@ -229,11 +219,10 @@ impl FrontendBuilder {
plugins,
inserter,
deleter,
table_metadata_manager,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
slow_query_recorder,
limiter,
process_manager,
access_layer_factory,
})
}
}

View File

@@ -408,7 +408,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_column_inserts(requests, ctx)
.handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
.await
.context(TableOperationSnafu)
}
@@ -422,7 +422,13 @@ impl Instance {
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(requests, ctx, accommodate_existing_schema, is_single_value)
.handle_row_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.await
.context(TableOperationSnafu)
}
@@ -435,7 +441,10 @@ impl Instance {
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(
requests, ctx, true,
requests,
ctx,
self.statement_executor.as_ref(),
true,
// Influx protocol may write multiple fields (values).
false,
)
@@ -451,7 +460,7 @@ impl Instance {
physical_table: String,
) -> Result<Output> {
self.inserter
.handle_metric_row_inserts(requests, ctx, physical_table)
.handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
.await
.context(TableOperationSnafu)
}

View File

@@ -135,7 +135,7 @@ impl Instance {
};
self.inserter
.handle_log_inserts(log, ctx)
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
@@ -157,7 +157,7 @@ impl Instance {
};
self.inserter
.handle_trace_inserts(rows, ctx)
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)

View File

@@ -28,6 +28,7 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
@@ -270,11 +271,18 @@ impl PromStoreProtocolHandler for Instance {
/// so only implement `PromStoreProtocolHandler::write` method.
pub struct ExportMetricHandler {
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
}
impl ExportMetricHandler {
pub fn new_handler(inserter: InserterRef) -> PromStoreProtocolHandlerRef {
Arc::new(Self { inserter })
pub fn new_handler(
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
) -> PromStoreProtocolHandlerRef {
Arc::new(Self {
inserter,
statement_executor,
})
}
}
@@ -287,7 +295,12 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
_: bool,
) -> ServerResult<Output> {
self.inserter
.handle_metric_row_inserts(request, ctx, GREPTIME_PHYSICAL_TABLE.to_string())
.handle_metric_row_inserts(
request,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -24,7 +24,6 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef;
use servers::http::prom_store::{PromBulkState, PromStoreState};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
use servers::metrics_handler::MetricsHandler;
@@ -96,30 +95,13 @@ where
}
if opts.prom_store.enable {
let bulk_state = if opts.prom_store.bulk_mode {
let mut state = PromBulkState {
schema_helper: self.instance.create_schema_helper(),
partition_manager: self.instance.partition_manager().clone(),
node_manager: self.instance.node_manager().clone(),
access_layer_factory: self.instance.access_layer_factory().clone(),
tx: None,
};
state.start_background_task();
Some(state)
} else {
None
};
let state = PromStoreState {
prom_store_handler: self.instance.clone(),
pipeline_handler: Some(self.instance.clone()),
prom_store_with_metric_engine: opts.prom_store.with_metric_engine,
prom_validation_mode: opts.http.prom_validation_mode,
bulk_state,
};
builder = builder
.with_prom_handler(state)
.with_prom_handler(
self.instance.clone(),
Some(self.instance.clone()),
opts.prom_store.with_metric_engine,
opts.http.prom_validation_mode,
)
.with_prometheus_handler(self.instance.clone());
}

View File

@@ -18,7 +18,6 @@ use serde::{Deserialize, Serialize};
pub struct PromStoreOptions {
pub enable: bool,
pub with_metric_engine: bool,
pub bulk_mode: bool,
}
impl Default for PromStoreOptions {
@@ -26,7 +25,6 @@ impl Default for PromStoreOptions {
Self {
enable: true,
with_metric_engine: true,
bulk_mode: false,
}
}
}
@@ -39,7 +37,6 @@ mod tests {
fn test_prom_store_options() {
let default = PromStoreOptions::default();
assert!(default.enable);
assert!(default.with_metric_engine);
assert!(!default.bulk_mode);
assert!(default.with_metric_engine)
}
}

View File

@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into();
self.inserter
.handle_row_inserts(requests, query_ctx, false, false)
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false)
.await
.context(TableOperationSnafu)?;

View File

@@ -218,6 +218,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
4,
0.01,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,

View File

@@ -30,9 +30,6 @@ use crate::bloom_filter::SEED;
use crate::external_provider::ExternalTempFileProvider;
use crate::Bytes;
/// The false positive rate of the Bloom filter.
pub const FALSE_POSITIVE_RATE: f64 = 0.01;
/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments and creates
/// bloom filters for each segment.
@@ -79,6 +76,7 @@ impl BloomFilterCreator {
/// `rows_per_segment` <= 0
pub fn new(
rows_per_segment: usize,
false_positive_rate: f64,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
@@ -95,6 +93,7 @@ impl BloomFilterCreator {
cur_seg_distinct_elems_mem_usage: 0,
global_memory_usage: global_memory_usage.clone(),
finalized_bloom_filters: FinalizedBloomFilterStorage::new(
false_positive_rate,
intermediate_provider,
global_memory_usage,
global_memory_usage_threshold,
@@ -263,6 +262,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
0.01,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
@@ -337,6 +337,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator: BloomFilterCreator = BloomFilterCreator::new(
2,
0.01,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
@@ -418,6 +419,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
0.01,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,

View File

@@ -23,7 +23,7 @@ use futures::{stream, AsyncWriteExt, Stream};
use snafu::ResultExt;
use crate::bloom_filter::creator::intermediate_codec::IntermediateBloomFilterCodecV1;
use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
use crate::bloom_filter::creator::SEED;
use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
use crate::external_provider::ExternalTempFileProvider;
use crate::Bytes;
@@ -33,6 +33,9 @@ const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB
/// Storage for finalized Bloom filters.
pub struct FinalizedBloomFilterStorage {
/// The false positive rate of the Bloom filter.
false_positive_rate: f64,
/// Indices of the segments in the sequence of finalized Bloom filters.
segment_indices: Vec<usize>,
@@ -65,12 +68,14 @@ pub struct FinalizedBloomFilterStorage {
impl FinalizedBloomFilterStorage {
/// Creates a new `FinalizedBloomFilterStorage`.
pub fn new(
false_positive_rate: f64,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
Self {
false_positive_rate,
segment_indices: Vec::new(),
in_memory: Vec::new(),
intermediate_file_id_counter: 0,
@@ -96,7 +101,7 @@ impl FinalizedBloomFilterStorage {
elems: impl IntoIterator<Item = Bytes>,
element_count: usize,
) -> Result<()> {
let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
let mut bf = BloomFilter::with_false_pos(self.false_positive_rate)
.seed(&SEED)
.expected_items(element_count);
for elem in elems.into_iter() {
@@ -284,6 +289,7 @@ mod tests {
let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
let provider = Arc::new(mock_provider);
let mut storage = FinalizedBloomFilterStorage::new(
0.01,
provider,
global_memory_usage.clone(),
global_memory_usage_threshold,
@@ -340,6 +346,7 @@ mod tests {
let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
let provider = Arc::new(mock_provider);
let mut storage = FinalizedBloomFilterStorage::new(
0.01,
provider,
global_memory_usage.clone(),
global_memory_usage_threshold,

View File

@@ -222,6 +222,7 @@ mod tests {
let mut writer = Cursor::new(vec![]);
let mut creator = BloomFilterCreator::new(
2,
0.01,
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,

View File

@@ -45,6 +45,7 @@ impl BloomFilterFulltextIndexCreator {
pub fn new(
config: Config,
rows_per_segment: usize,
false_positive_rate: f64,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
@@ -57,6 +58,7 @@ impl BloomFilterFulltextIndexCreator {
let inner = BloomFilterCreator::new(
rows_per_segment,
false_positive_rate,
intermediate_provider,
global_memory_usage,
global_memory_usage_threshold,

View File

@@ -54,6 +54,14 @@ pub enum Error {
peer_id: u64,
},
#[snafu(display("Failed to lookup peer: {}", peer_id))]
LookupPeer {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
peer_id: u64,
},
#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
#[snafu(implicit)]
@@ -1025,6 +1033,7 @@ impl ErrorExt for Error {
}
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
#[cfg(feature = "pg_kvbackend")]

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use common_meta::instruction::CacheIdent;
use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::rpc::store::PutRequest;
@@ -80,7 +81,19 @@ async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
match ctx.leader_cached_kv_backend.put(put).await {
Ok(_) => {
info!("Successfully updated flow `NodeAddressValue`: {:?}", peer);
// TODO(discord): broadcast invalidating cache to all frontends
// broadcast invalidating cache to all frontends
let cache_idents = vec![CacheIdent::FlowNodeAddressChange(peer.id)];
info!(
"Invalidate flow node cache for new address with cache idents: {:?}",
cache_idents
);
if let Err(e) = ctx
.cache_invalidator
.invalidate(&Default::default(), &cache_idents)
.await
{
error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
}
}
Err(e) => {
error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);

View File

@@ -110,14 +110,6 @@ pub struct MetasrvOptions {
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
/// Delay before initializing region failure detectors.
///
/// This delay helps prevent premature initialization of region failure detectors in cases where
/// cluster maintenance mode is enabled right after metasrv starts, especially when the cluster
/// is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration
/// may trigger unnecessary region failovers during datanode startup.
#[serde(with = "humantime_serde")]
pub region_failure_detector_initialization_delay: Duration,
/// Whether to allow region failover on local WAL.
///
/// If it's true, the region failover will be allowed even if the local WAL is used.
@@ -227,7 +219,6 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
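
The doc comment in this hunk describes the region_failure_detector_initialization_delay option and its ten-minute default. As a rough illustration only, here is a hedged sketch of overriding it in code, assuming the field exists on the branch being built and that MetasrvOptions keeps the Default impl shown above; the helper name is made up for the example.

use std::time::Duration;

// Hedged sketch: shorten the post-startup delay (default is 10 * 60 seconds)
// for a test cluster where region failure detectors should register quickly.
fn test_metasrv_options() -> MetasrvOptions {
    MetasrvOptions {
        enable_region_failover: true,
        region_failure_detector_initialization_delay: Duration::from_secs(60),
        ..Default::default()
    }
}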

View File

@@ -64,7 +64,7 @@ use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
@@ -299,8 +299,6 @@ impl MetasrvBuilder {
Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
Some(Arc::new(RegionSupervisorTicker::new(
DEFAULT_TICK_INTERVAL,
options.region_failure_detector_initialization_delay,
DEFAULT_INITIALIZATION_RETRY_PERIOD,
tx.clone(),
))),
)
@@ -343,7 +341,6 @@ impl MetasrvBuilder {
region_migration_manager.clone(),
maintenance_mode_manager.clone(),
peer_lookup_service.clone(),
leader_cached_kv_backend.clone(),
);
Some(RegionFailureHandler::new(

View File

@@ -23,7 +23,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info, warn};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::table_name::TableName;
@@ -253,12 +253,10 @@ impl RegionMigrationManager {
}
/// Throws an error if `leader_peer` is not the `from_peer`.
///
/// If `from_peer` is unknown, use the leader peer as the `from_peer`.
fn verify_region_leader_peer(
&self,
region_route: &RegionRoute,
task: &mut RegionMigrationProcedureTask,
task: &RegionMigrationProcedureTask,
) -> Result<()> {
let leader_peer = region_route
.leader_peer
@@ -277,15 +275,6 @@ impl RegionMigrationManager {
}
);
if task.from_peer.addr.is_empty() {
warn!(
"The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
leader_peer, task.region_id
);
// The peer id is the same as the leader peer id.
task.from_peer = leader_peer.clone();
}
Ok(())
}
@@ -311,7 +300,7 @@ impl RegionMigrationManager {
/// Submits a new region migration procedure.
pub async fn submit_procedure(
&self,
mut task: RegionMigrationProcedureTask,
task: RegionMigrationProcedureTask,
) -> Result<Option<ProcedureId>> {
let Some(guard) = self.insert_running_procedure(&task) else {
return error::MigrationRunningSnafu {
@@ -344,7 +333,7 @@ impl RegionMigrationManager {
.fail();
}
self.verify_region_leader_peer(&region_route, &mut task)?;
self.verify_region_leader_peer(&region_route, &task)?;
self.verify_region_follower_peers(&region_route, &task)?;
let table_info = self.retrieve_table_info(region_id).await?;
let TableName {
@@ -352,6 +341,12 @@ impl RegionMigrationManager {
schema_name,
..
} = table_info.table_name();
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["src", &task.from_peer.id.to_string()])
.inc();
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["desc", &task.to_peer.id.to_string()])
.inc();
let RegionMigrationProcedureTask {
region_id,
from_peer,
@@ -382,12 +377,6 @@ impl RegionMigrationManager {
return;
}
};
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["src", &task.from_peer.id.to_string()])
.inc();
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["desc", &task.to_peer.id.to_string()])
.inc();
if let Err(e) = watcher::wait(watcher).await {
error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");

View File

@@ -103,7 +103,6 @@ pub mod mock {
}),
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}
}

Some files were not shown because too many files have changed in this diff.