Compare commits

...

222 Commits

Author SHA1 Message Date
ZonaHe
9038e1b769 feat: update dashboard to v0.4.10 (#3663)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2024-04-08 16:35:41 +08:00
JeremyHi
12286f07ac feat: cluster information (#3631)
* chore: keep the same method order in KvBackend

* feat: make meta client can get all node info of cluster

* feat: cluster info data model

* feat: frontend and datanode info

* feat: list node info

* chore: remove the method: is_started

* fix: scan key prefix

* chore: impl From for NodeInfoKey

* chore: doc for trait and struct

* chore: reuse the error

* chore: refactor two collec cluster info handlers

* chore: remove inline

* chore: refactor two collec cluster info handlers
2024-04-08 07:48:36 +00:00
tison
e920f95902 refactor: drop Table trait (#3654)
* refactor: drop Table trait

Signed-off-by: tison <wander4096@gmail.com>

* finish rename

Signed-off-by: tison <wander4096@gmail.com>

* Apply suggestions from code review

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* Update time_range_filter_test.rs

* Update src/query/src/tests/time_range_filter_test.rs

* apply comments

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
2024-04-08 07:28:55 +00:00
Yohan Wal
c4798d1913 refactor: move create database to procedure (#3626)
* refactor: move create database to procedure

* feat: enable database creation of rpc

* chore: update the commit hash of greptime-proto
2024-04-08 07:05:55 +00:00
Ruihang Xia
2ede968c2b chore: bump version to 0.7.2 (#3658)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-04-08 06:33:29 +00:00
Yingwen
89db8c18c8 feat: Add timers to more mito methods (#3659)
* feat: add timers for more mito methods

* refactor: combine methods to get type name
2024-04-08 05:53:34 +00:00
LFC
aa0af6135d chore: add manifest related metrics (#3634)
* chore: add two manifest related metrics

* Update src/mito2/src/manifest/manager.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/mito2/src/metrics.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: resolve PR comments

* update cargo lock

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-04-08 05:53:08 +00:00
dennis zhuang
87e0189e58 fix!: columns table in information_schema misses some columns (#3639)
* fix: columns table in information_schema misses some columns

* fix: test_information_schema_dot_columns

* fix: fuzz test

* feat: adds srs_id and refactor some columns with constant vector

* fix: test_information_schema_dot_columns

* chore: update comment

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* build(deps): bump h2 from 0.3.24 to 0.3.26 (#3642)

Bumps [h2](https://github.com/hyperium/h2) from 0.3.24 to 0.3.26.
- [Release notes](https://github.com/hyperium/h2/releases)
- [Changelog](https://github.com/hyperium/h2/blob/v0.3.26/CHANGELOG.md)
- [Commits](https://github.com/hyperium/h2/compare/v0.3.24...v0.3.26)

---
updated-dependencies:
- dependency-name: h2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* build(deps): bump whoami from 1.4.1 to 1.5.1 (#3643)

Bumps [whoami](https://github.com/ardaku/whoami) from 1.4.1 to 1.5.1.
- [Changelog](https://github.com/ardaku/whoami/blob/v1/CHANGELOG.md)
- [Commits](https://github.com/ardaku/whoami/compare/v1.4.1...v1.5.1)

---
updated-dependencies:
- dependency-name: whoami
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* feat: adding victoriametrics remote write (#3641)

* feat: adding victoria metrics remote write

* test: add e2e tests for prom and vm remote writes

* fix: construct correct pk list with pre-existing pk (#3614)

* fix: construct correct pk list with pre-existing pk

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update UT

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test(sqlness): release databases after tests (#3648)

* refactor: rename Greptime_Type to Greptime_type

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-04-08 03:20:49 +00:00
tison
7e8e9aba9d chore: generate release notes with git-cliff (#3650)
* chore: generate release notes with git-cliff

Signed-off-by: tison <wander4096@gmail.com>

* chore: newlines

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-04-08 03:09:35 +00:00
tison
c93b76ae5f ci: bump license header checker action version (#3655) 2024-04-08 10:38:03 +08:00
Weny Xu
097a0371dc test(sqlness): release databases after tests (#3648) 2024-04-07 09:35:34 +00:00
Ruihang Xia
b9890ab870 fix: construct correct pk list with pre-existing pk (#3614)
* fix: construct correct pk list with pre-existing pk

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update UT

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-04-07 08:11:52 +00:00
Ning Sun
b32e0bba9c feat: adding victoriametrics remote write (#3641)
* feat: adding victoria metrics remote write

* test: add e2e tests for prom and vm remote writes
2024-04-07 07:09:21 +00:00
dependabot[bot]
fe1a0109d8 build(deps): bump whoami from 1.4.1 to 1.5.1 (#3643)
Bumps [whoami](https://github.com/ardaku/whoami) from 1.4.1 to 1.5.1.
- [Changelog](https://github.com/ardaku/whoami/blob/v1/CHANGELOG.md)
- [Commits](https://github.com/ardaku/whoami/compare/v1.4.1...v1.5.1)

---
updated-dependencies:
- dependency-name: whoami
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-05 19:02:36 -07:00
dependabot[bot]
11995eb52e build(deps): bump h2 from 0.3.24 to 0.3.26 (#3642)
Bumps [h2](https://github.com/hyperium/h2) from 0.3.24 to 0.3.26.
- [Release notes](https://github.com/hyperium/h2/releases)
- [Changelog](https://github.com/hyperium/h2/blob/v0.3.26/CHANGELOG.md)
- [Commits](https://github.com/hyperium/h2/compare/v0.3.24...v0.3.26)

---
updated-dependencies:
- dependency-name: h2
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-05 19:02:19 -07:00
dimbtp
86d377d028 fix: move object store read/write timer into inner (#3627)
* fix: move object store read/write timer into inner

* add Drop for PrometheusMetricWrapper

* call await on async read/write

* apply review comments

* git rid of option on timer
2024-04-03 12:24:34 +00:00
Lei, HUANG
ddeb73fbb7 fix: mistakely removes compaction inputs on failure (#3635)
* fix: mistakely removes compaction inputs on failure

* test: add test for compaction failure

---------

Co-authored-by: evenyag <realevenyag@gmail.com>
2024-04-03 11:54:20 +00:00
niebayes
d33435fa84 feat: introduce wal benchmarker (#3446)
* feat: introduce wal benchmarker

* chore: add log store metrics

* chore: add some comments to wal benchmarker

* fix: ci

* chore: add more metrics for kafka logstore

* chore: add more timers for kafka logstore

* chore: add more configs

* chore: move humantime to common dependencies

* refactor: refactor wal benchmarker

* fix: apply suggestions from code review

* doc: add a simple README for wal benchmarker

* fix: Cargo.toml

* fix: clippy

* chore: rename wal.rs to wal_bench.rs

* fix: compile
2024-04-03 03:16:05 +00:00
Weny Xu
a0f243c128 feat(procedure): enable auto split large value (#3628)
* chore: add comments

* chore: remove `pub`

* chore: rename to `merge_multiple_values`

* chore: fix typo

* feat(procedure): enable auto split large value

* chore: apply suggestions from CR

* chore: rename to `max_metadata_value_size`

* chore: remove the NoneAsEmptyString

* chore: set default max_metadata_value_size to 1500KiB
2024-04-02 12:13:59 +00:00
JeremyHi
a61fb98e4a refactor: alter logical tables (#3618)
* refactor: on prepare

* refactor: on create regions

* refactor: update metadata
2024-04-02 06:21:34 +00:00
Weny Xu
6c316d268f feat(procedure): auto split large value to multiple values (#3605)
* feat: implement MultipleValuesStream

* refactor: move KeySet to common-procedure

* refactor: move MultipleValuesStream to common-procedure

* refactor: refactor String to KeySet

* fix: fix dropping `collecting` unexpectedly

* fix: fix typo

* refactor: add the fast path of put

* refactor: remove `single_value_collector`

* refactor: use `extend` instead of `push`

* test: add more tests for `KvStateStore`

* test(etcd_store): add more tests for `KvStateStore`

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: refactor with async_stream

* Update src/common/procedure/src/store/util.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-04-01 12:04:29 +00:00
Lei, HUANG
5e24448b96 feat: reject invalid timestamp ranges in copy statement (#3623)
* chore: reject invalid timestamp ranges in copy statement

* tests: add unit tests
2024-04-01 08:25:31 +00:00
JohnsonLee
d6b2d1dfb8 feat: Support outputting various date styles for postgresql (#3602)
* test: add integration_test for datetime style

* feat: support various datestyle for postgres

* doc: rewrite the comment about merge_datestyle_value

* test: add more test to illustrate valid datestyle input
2024-04-01 07:31:36 +00:00
Yingwen
bfd32571d9 fix: run purge jobs in another scheduler (#3621) 2024-04-01 03:18:14 +00:00
JeremyHi
0eb023bb23 feat: group requests by peer (#3619) 2024-04-01 03:10:22 +00:00
dennis zhuang
4a5bb698a9 feat: impl show index and show columns (#3577)
* feat: impl show index and show columns

* fix: show index from database

* fix: canonicalize table name

* refactor: show parsers
2024-03-29 18:34:52 +00:00
Eugene Tolbakov
18d676802a feat(function): add timestamp epoch integer support for to_timezone (#3620)
* feat(function): add timestamp epoch integer support for to_timezone

* chore: fmt
2024-03-29 18:33:24 +00:00
JeremyHi
93da45f678 feat: let alter table procedure can only alter physical table (#3613)
* feat: let alter table procedure can only alter physicale table

* chore: rm unnecessary todo
2024-03-29 09:50:33 +00:00
Ruihang Xia
7a19f66be0 ci: ignore type in sqlness sql and result files (#3616)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-29 09:39:37 +00:00
Ning Sun
500f9f10fc feat: allow cross-schema query in promql (#3545)
* feat: add __schema__ tag for promql parser

* feat: disable matcher op other than equals

* test: add more test to ensure context getting reset

* test: add integration test

* test: refactor tests

* refactor: remove duplicated test code

* refactor: update according to review comments

* test: add sqlness test for cross schema scenario

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-29 07:41:01 +00:00
JeremyHi
f49cd0ca18 refactor: cache invalidator (#3611)
* chore: remove some alias

* refactor: cache invalidator
2024-03-29 07:33:51 +00:00
Yingwen
ffbb132f27 feat: Implement an unordered scanner for append mode (#3598)
* feat: ScanInput

* refactor: seq scan use scan input

* chore: implement unordered scan

* feat: use unordered scan for append table

* fix: unordered scan panic

* docs: update mermaid

* chore: address comment

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2024-03-29 07:25:35 +00:00
Eugene Tolbakov
14267c2aed feat(tql): add initial support for start,stop,step as sql functions (#3507)
* feat(tql): add initial support for start,stop,step as sql functions

* fix(tql): remove unwraps, adjust fmt

* fix(tql): address taplo issue

* feat(tql): update parse_tql_query logic

* fix(tql): change query parsing logic to use parser instead of delimiter

* fix(tql): add timestamp function support, add sqlness tests

* fix(tql): add lookback optional param for tql eval

* fix(tql): adjust tests for now() function

* fix(tql): introduce the tqlerror to differentiate failures on parsing, evaluation and simplification stages

* fix(tql): add tests for explain/analyze

* feat(tql): add lookback support for explain/analyze, update tests

* feat(tql): add more sqlness tests

* chore(tql): extract common logic for eval, analyze and explain into a single function

* feat(tql): address CR points

* feat(tql): use snafu for tql errors, add more docs

* feat(tql): address CR points
2024-03-29 06:37:25 +00:00
Ruihang Xia
77cc7216af feat: support 2+2 and /status/buildinfo (#3604)
* feat: implement buildinfo endpoint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor prom result struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format toml file

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/servers/src/http/prometheus_resp.rs

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-29 06:31:39 +00:00
Zhenchi
63681f0e4d refactor(table): remove unused table requests (#3603)
* refactor(table): remove unused requests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* update comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: compile

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-03-28 11:31:14 +00:00
Ruihang Xia
06a90527a3 fix: adjust status code to http error code map (#3601)
* fix: adjust status code to http error code map

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-28 08:54:51 +00:00
niebayes
d5ba2fcf9d test: add more integration test for kafka wal (#3190)
* test: add integration tests for kafka wal

* chore: rebase main

* chore: unify naming convention for wal config

* chore: add register loaders switch

* chore: alter tables by adding a new column

* chore: move rand to dev-dependencies

* chore: update Cargo.lock
2024-03-28 06:55:18 +00:00
dennis zhuang
e3b37ee2c9 fix: canonicalize catalog and schema names (#3600) 2024-03-28 06:40:15 +00:00
dennis zhuang
5d7ce08358 feat: adds metric engine to information_schema engines table (#3599)
* feat: adds metric engine to information_schema engines table

* fix: support value for metric engine
2024-03-28 06:37:34 +00:00
JeremyHi
92a8e863de chore: do not reply for broadcast msg (#3595) 2024-03-27 11:39:23 +00:00
JeremyHi
9428cb8e7c feat: remove support for logical tables in the create table procedure (#3592)
* feat: Remove support for logical tables in the create table procedure

* chore: remove the redandent table ids alloc

* chore: minor fix
2024-03-27 10:03:42 +00:00
Weny Xu
5addb7d75a test: add tests for drop databases (#3594)
* refactor: minimize visibility of drop database steps

* feat: implement as_any

* refactor: move common functions to test_util

* test: add tests for drop databases

* fix: fix deteling physical table route unexpectedly
2024-03-27 09:18:37 +00:00
Weny Xu
623c930736 refactor: refactor drop table executor (#3589)
* refactor: refactor drop table executor

* chore: apply suggestions from CR
2024-03-27 06:29:54 +00:00
JeremyHi
5fa01e7a96 feat: create regions persist true (#3590)
* feat: change open-region-step's status persist as true

* feat: avoid cloning

* fix: fix unit test
2024-03-27 06:26:58 +00:00
Yingwen
922b1a9b66 feat: Implement append mode for a region (#3558)
* feat: add dedup option to merge reader

* test: test merger

* feat: append mode option

* feat: implement append mode for regions

* feat: only allow put under append mode

* feat: always create builder

* test: test append mode

* style: fix clippy

* test: trigger compaction

* chore: fix compiler errors
2024-03-27 03:21:22 +00:00
shuiyisong
653697f1d5 chore: add back core dependency (#3588) 2024-03-26 19:53:22 -07:00
JohnsonLee
83643eb195 feat: Support printing postgresql's bytea data type in its "hex" and "escape" format (#3567)
* feat: support set variable statement of session

* feat: support printing postgresql's bytea data type in its "hex" and "escape" format in ugly way

* refactor: add 'SessionConfigValue' type and unify the name

* doc: add license header

* refactor: confine coupling with 'sql::ast::Value' in SessionConfigValue

* refactor: move all bytea wrapper into bytea.rs

* fix: remove unused import in context.rs and postgres.rs

* refactor: rename 'set_configuration_parameter' to 'set_session_config'

rename 'set_configuration_parameter' in statement_.rs to 'set_session_config'

* refactor: use mod to organize options via macro

* refactor: re-model the session config value with static type

* test: add integration test

* refactor: move the encode bytea by format type logic into encoder

refactor: use Arc<DashMap> instead of DashMap in QueryContext

refactor: use Arc<DashMap> instead of DashMap in QueryContext

    Avoid expensive clone

refactor: use unreachable!() instead of unimplemented!()

refactor: move the encode bytea by format type logic into encoder

test: add binary format integration test case

* test: add ut for byte related type

* doc: remove TODO of bytea_output

* refactor: simplify the implementation with simple struct instead of complex typing

* fix: typo of 'Available'

* fix compile

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
2024-03-27 01:54:41 +00:00
tison
d83279567b feat(auth): watch file user provider (#3566)
* feat(auth): watch file user provider

Signed-off-by: tison <wander4096@gmail.com>

* impl

Signed-off-by: tison <wander4096@gmail.com>

* use debouncer

Signed-off-by: tison <wander4096@gmail.com>

* add test

Signed-off-by: tison <wander4096@gmail.com>

* clippy

Signed-off-by: tison <wander4096@gmail.com>

* add path for FileWatch snafu

Signed-off-by: tison <wander4096@gmail.com>

* Apply comments

Signed-off-by: tison <wander4096@gmail.com>

* fix compile

Signed-off-by: tison <wander4096@gmail.com>

* drop notify-debouncer-full dep

Signed-off-by: tison <wander4096@gmail.com>

* empty to allow all

Signed-off-by: tison <wander4096@gmail.com>

* more test and log

Signed-off-by: tison <wander4096@gmail.com>

* relax the wait period

Signed-off-by: tison <wander4096@gmail.com>

* avoid sleep

Signed-off-by: tison <wander4096@gmail.com>

* Revert "avoid sleep"

This reverts commit d7a0be1dea.

* avoid sleep

Signed-off-by: tison <wander4096@gmail.com>

* cargo fmt

Signed-off-by: tison <wander4096@gmail.com>

* tidy dep

Signed-off-by: tison <wander4096@gmail.com>

* adjust

Signed-off-by: tison <wander4096@gmail.com>

* try be stable on CI

Signed-off-by: tison <wander4096@gmail.com>

* deugging

Signed-off-by: tison <wander4096@gmail.com>

* debugging

Signed-off-by: tison <wander4096@gmail.com>

* watch on the dir

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-27 01:19:18 +00:00
tison
150454b1fd chore: Delete CODE_OF_CONDUCT.md (#3578)
Leverage GitHub's feature to reuse `GreptimeTeam/.github` content.

This depends on https://github.com/GreptimeTeam/.github/pull/5.
2024-03-26 09:43:05 -07:00
JeremyHi
58c7858cd4 feat: update physical table schema on alter logical tables (#3585)
* feat: update physical table schema on alter

* feat: alter logical table in sql path

* feat: invalidate cache step1

* feat: invalidate cache step2

* feat: invalidate cache step3

* feat: invalidate cache step4

* fix: failed ut

* fix: standalone cache invalidator

* feat: log the count of already finished

* feat: re-invalidate cache

* chore: by comment

* chore: Update src/common/meta/src/ddl/create_logical_tables.rs

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-03-26 14:29:53 +00:00
dimbtp
dd18d8c97b build(deps): remove some unused dependencies (#3582)
* build(deps): remove some unused dependencies

* add `arc-swap` dependency back
2024-03-26 12:48:28 +00:00
Lei, HUANG
175929426a feat: support time range in copy table (#3583)
* feat: support specifying time range in copy table statement

* chore: update sqlness results

* fix: sqlness
2024-03-26 11:24:28 +00:00
Ruihang Xia
8f9676aad2 fix: incorrect version info in (#3586)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-26 09:31:01 +00:00
Ruihang Xia
74565151e9 fix: update pk_cache in compat reader (#3576)
* fix: update pk_cache in compat reader

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update document

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* avoid mysterious bug

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-26 08:31:00 +00:00
Ruihang Xia
83c1b485ea chore: limit OpenDAL's feature gates (#3584)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-26 07:54:06 +00:00
JeremyHi
c2dd1136fe feat: batch alter logical tables (#3569)
* feat: add unit test for alter logical tables

* Update src/common/meta/src/ddl/alter_table.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: add some comments

* chore: add debug_assert_eq

* chore: fix some nits

* chore: remove the method batch_get_table_routes

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-26 07:07:23 +00:00
tison
7c1c6e8b8c refactor: try upgrade regex-automata (#3575)
* refactor: try upgrade regex-automata

Signed-off-by: tison <wander4096@gmail.com>

* try fix

Signed-off-by: tison <wander4096@gmail.com>

* always check match with next_eoi_state

Signed-off-by: tison <wander4096@gmail.com>

* add a guard to prevent over moving the state

Signed-off-by: tison <wander4096@gmail.com>

* tidy

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-26 04:28:14 +00:00
Yingwen
62d8bbb10c ci: use single commit on the deployment branch (#3580) 2024-03-25 21:04:57 -07:00
Weny Xu
bf14d33962 feat: implement the drop database procedure (#3541)
* refactor: remove Sync trait of Procedure

* refactor: remove unnecessary async

* feat: implement the drop database procedure

* refactor: refactor DdlManager register_loaders

* feat: register the DropDatabaseProcedureLoader

* chore: fmt toml

* feat: support to submit DropDatabaseTask

* feat: support drop database stmt

* fix: empty the tables stream

* fix: ensure the factory always exists

* test: update sqlness results

* chore: correct comments

* test: update sqlness results

* test: update sqlness results

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2024-03-25 06:12:47 +00:00
tison
0f1747b80d chore: retain original headers (#3572)
Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-03-25 03:53:51 +00:00
Ruihang Xia
992c7ec71b feat: update physical table's schema on creating logical table (#3570)
* feat: update physical table's schema on creating logical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove debug code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak ut const

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* invalid physical table cache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-25 03:19:30 +00:00
x³u³
2ad0b24efa fix: set http response chartset to utf-8 when using table format (#3571) 2024-03-25 03:13:01 +00:00
Ruihang Xia
2b2fd80bf4 feat: return new added columns in region server's extension response (#3533)
* feat: adapt the new proto response

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* write columns to extension

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use physical column's schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort logical columns by name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* format code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* return physical table's column

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/common/meta/src/datanode_manager.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* implement sort column logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* proxy create table procedure to create logical table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test for sort_columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
2024-03-23 09:31:16 +00:00
x³u³
24886b9530 test: add a parameter type mismatch test case to sql integration test (#3568) 2024-03-22 17:43:20 +00:00
tison
8345f1753c chore: avoid confusing TryFrom (#3565)
Signed-off-by: tison <wander4096@gmail.com>
2024-03-22 11:16:36 +00:00
tison
3420a010e6 refactor: reduce one clone by carefully pass ready boundary (#3543)
* refactor: reduce one clone by carefully pass ready boundary

Signed-off-by: tison <wander4096@gmail.com>

* defensive handle None

Signed-off-by: tison <wander4096@gmail.com>

* tidy code a bit

Signed-off-by: tison <wander4096@gmail.com>

* except batch exist

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-22 04:46:17 +00:00
discord9
9f020aa414 fix(flow): Arrange get range with batch unaligned (#3552)
* fix: Arrange get range with batch unaligned

* chore: per review

* refactor: sort at apply_updates
2024-03-22 04:08:37 +00:00
tison
c9ac72e7f8 ci: use a PAT to list all writers (#3559)
Signed-off-by: tison <wander4096@gmail.com>
2024-03-21 20:25:01 -07:00
Lei, HUANG
86fb9d8ac7 refactor: remove redudant PromStoreProtocolHandler::write (#3553)
refactor: remove redudant PromStoreProtocolHandler::write API and rename PromStoreProtocolHandler::write_fast to write
2024-03-22 02:09:00 +00:00
Lei, HUANG
1f0fc40287 fix: performance degradation caused by config change (#3556) 2024-03-21 12:23:52 +00:00
tison
8b7a5aaa4a refactor: handle error for http format (#3548)
* refactor: handle error for http format

Signed-off-by: tison <wander4096@gmail.com>

* finish format handling

Signed-off-by: tison <wander4096@gmail.com>

* simplify auth error

Signed-off-by: tison <wander4096@gmail.com>

* fix

Signed-off-by: tison <wander4096@gmail.com>

* clippy format

Signed-off-by: tison <wander4096@gmail.com>

* no longer set greptime-db-format on influxdb error

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-21 07:29:11 +00:00
Weny Xu
856a4e1e4f refactor: refactor CacheInvalidator (#3550)
* refactor: refactor InvalidateCache Instruction

* refactor: refactor CacheInvalidator
2024-03-20 10:18:28 +00:00
Yingwen
39b69f1e3b refactor!: Renames the new memtable to PartitionTreeMemtable (#3547)
* refactor: rename mod merge_tree to partition_tree

* refactor: rename merge_tree

* refactor: change merge tree comment

* refactor: rename merge tree struct

* refactor: memtable options
2024-03-20 06:40:41 +00:00
tison
bbcdb28b7c chore: fix comment in fetch-dashboard-assets.sh (#3546) 2024-03-20 06:18:14 +00:00
YCCD
6377982501 feat: Able to pretty print sql query result in http output (#3539)
* feat: Able to pretty print sql query result in http output

* fix: add some tests

* fix: add some space, delete fn into_payload, and impl Display for TableResponse
2024-03-20 03:25:17 +00:00
Lei, HUANG
ddbcff68dd feat: support append-only mode in time-series memtable (#3540)
* feat: support append-only mode in time-series memtable

* fix: rename sort_and_dedup to sort
2024-03-19 20:37:54 +00:00
WU Jingdi
5b315c2d40 feat: support multi params in promql range function macro (#3464)
feat: support multi params in promql range function
2024-03-19 20:36:51 +00:00
Ruihang Xia
9816d2a08b fix: clone data instead of moving it - homemade future is dangerous (#3542)
* fix: clone data instead of moving it - homemade future is dangerous

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add comment

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-19 13:46:55 +00:00
Ning Sun
a99d6eb3f9 feat: update pgwire to 0.20 for improved performance (#3538) 2024-03-19 10:11:05 +00:00
discord9
2c115bc22a feat(flow): shared in-memory state for dataflow operator (#3508)
* feat: Arrangement shared state

* feat: arrange&tests

* docs: detailed&tests for get

* chore: license

* refactor: opt out ts expr&tests: internal ts

* docs: remove some TODOs

* feat: use smallvec size of 2

* refactor: per review

* chore: per review

* chore: per review

* chore: remove reduant clone

* feat: return max expire time&docs: more explain cur expire config
2024-03-19 10:03:05 +00:00
Yingwen
641592644d feat: support per table memtable options (#3524)
* feat: add memtable builder to region

* refactor: rename memtable_builder in worker to default_memtable_builder

* fix: return error instead of using default compaction options

Support deserializing memtable and compaction options from the option
map

* feat: optional memtable options

* feat: add MemtableBuilderProvider to create builders

* feat: change default memtable and skip deserializing dedup

* chore: update test and comment

* chore: test invalid type

* feat: metric engine use new memtable manually

* feat: expose more memtable configs

* feat: add memtable options to valid option list

* test: add test

* test: sqlness test

* chore: serde workspace

* chore: remove comments
2024-03-19 08:50:10 +00:00
Weny Xu
fa0f3555d4 refactor: introduce the DropTableExecutor (#3534)
* refactor: introduce the DropTableExecutor

* fix: register the dropping regions

* test: add tests for on_prepare

* chroe: add TODO comment
2024-03-19 08:29:12 +00:00
ZonaHe
3cad844acd feat: update dashboard to v0.4.9 (#3531)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2024-03-19 03:18:42 +00:00
JeremyHi
cf25cf984b chore: avoid unnecessary cloning (#3537) 2024-03-18 13:24:13 +00:00
shuiyisong
3acd5bfad0 chore: http header with metrics (#3536)
* chore: bring write cost to output

* chore: add write cost to greptimev1result

* chore: add metrics to influxdb write resp header

* chore: add metrics to prom store

* chore: add metrics to otlp

* chore: add debug log

* fix: prom remote read with output

* fix: prom queries don't output metrics header

* chore: extract header value

* chore: refactor code

* chore: fix cr issue
2024-03-18 11:21:19 +00:00
Weny Xu
343525dab8 refactor: remove removed-prefixed keys (#3535) 2024-03-18 11:07:30 +00:00
tison
0afac58e4d feat(metasrv): implement maintenance (#3527)
* feat(metasrv): implement maintenance

Signed-off-by: tison <wander4096@gmail.com>

* fixup and test

Signed-off-by: tison <wander4096@gmail.com>

* Add coauthors

Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: xifyang <595482900@qq.com>

* tidy code

Signed-off-by: tison <wander4096@gmail.com>

* Apply suggestions from code review

Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>

* always read kv_backend maintenance state

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: xifyang <595482900@qq.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
2024-03-18 09:41:14 +00:00
tison
393ea44de0 docs: improve fn comments (#3526)
Signed-off-by: tison <wander4096@gmail.com>
2024-03-18 03:18:01 +00:00
tison
44731fd653 docs: readme style and project status (#3528)
* docs: readme style

Signed-off-by: tison <wander4096@gmail.com>

* more opening

Signed-off-by: tison <wander4096@gmail.com>

* Project Status

Signed-off-by: tison <wander4096@gmail.com>

* tidy

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-18 03:03:25 +00:00
tison
d36a5a74d3 ci: unassign issues stale 14 days ago (#3529)
This closes https://github.com/GreptimeTeam/greptimedb/issues/3525.
2024-03-18 03:01:10 +00:00
Yingwen
74862f8c3f feat(mito): Checks whether a region should flush periodically (#3459)
* feat: handle flush periodically

* chore: call periodical method in loop

* feat: check periodical tasks on channel timeout

* refactor: use time provider to get time

Mock a time provider to test auto flush

* chore: fix typos

* refactor: rename mock time provider

* style: fix cilppy

* chore: address comment
2024-03-15 06:41:28 +00:00
Weny Xu
a52aedec5b feat: implement the drop database parser (#3521)
* refactor: refactor drop table parser

* feat: implement drop database parser

* fix: canonicalize name of create database

* test: update sqlness result

* Update src/operator/src/statement.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-15 06:15:18 +00:00
tison
b6fac619a6 docs: revise README file (#3522)
* docs: revise README file

Signed-off-by: tison <wander4096@gmail.com>

* build prerequisite

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-15 04:22:35 +00:00
Weny Xu
a29e7ebb7d feat: acquire all locks in procedure (#3514)
* feat: acquire catalog and schema lock in region failover

* chore: remove unused code

* feat!: acquire catalog and schema lock in region migration

* feat: acquire catalog and schema lock in create table
2024-03-14 11:41:23 +00:00
Yingwen
8ca9e01455 feat: Partition memtables by time if compaction window is provided (#3501)
* feat: define time partitions

* feat: adapt time partitions to version

* feat: implement non write methods

* feat: add write one to memtable

* feat: implement write

* chore: fix warning

* fix: inner not set

* refactor: add collect_iter_timestamps

* test: test partitions

* chore: debug log

* chore: fix typos

* chore: log memtable id

* fix: empty check

* chore: log total parts

* chore: update comments
2024-03-14 11:13:01 +00:00
Weny Xu
3a326775ee ci: add bin options to reduce build burden (#3518)
chore: add bin options
2024-03-14 11:05:35 +00:00
Yingwen
5ad3b7984e docs: add v0.7 TSBS benchmark result (#3512)
* docs: add v0.7 TSBS benchmark result

* docs: add OS

* docs: fix format
2024-03-14 08:29:52 +00:00
Yingwen
4fc27bdc75 chore: bump version to v0.7.1 (#3510)
chore: bump version
2024-03-14 07:43:47 +00:00
LFC
e3c82568e5 fix: correctly generate sequences when the value is pre-existed (#3502) 2024-03-14 06:55:12 +00:00
tison
61f0703af8 feat: support decode gzip if influxdb write specify it (#3494)
* feat: support dedoce gzip if influxdb write specify it

Signed-off-by: tison <wander4096@gmail.com>

* address comments

Signed-off-by: tison <wander4096@gmail.com>

* simplify with tower_http DecompressionLayer

Signed-off-by: tison <wander4096@gmail.com>

* tidy some code

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-14 04:26:26 +00:00
Ruihang Xia
b85d7bb575 fix: decoding prometheus remote write proto doesn't reset the value (#3505)
* reset Sample

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* accomplish test assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert toml format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-14 03:08:14 +00:00
Ning Sun
d334d74986 fix!: remove error message from http header to avoid panic (#3506)
fix: remove error message from http header
2024-03-14 01:43:38 +00:00
Ning Sun
5ca8521e87 ci: attempt to setup docker cache for etcd (#3488)
* ci: attempt to setup docker cache for etcd

* ci: do not use file hash for cache key
2024-03-14 00:48:02 +00:00
Weny Xu
e4333969b4 feat(fuzz): add alter table target (#3503)
* feat(fuzz): validate semantic type of column

* feat(fuzz): add fuzz_alter_table target

* feat(fuzz): validate columns

* chore(ci): add fuzz_alter_table ci cfg
2024-03-13 14:11:47 +00:00
Zhenchi
b55905cf66 feat(fuzz): add insert target (#3499)
* fix(common-time): allow building nanos timestamp from parts split from i64::MIN

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(fuzz): add insert target

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: cleanup cargo.toml and polish comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-03-13 10:03:03 +00:00
WU Jingdi
fb4da05f25 fix: adjust fill behavior of range query (#3489) 2024-03-13 09:20:34 +00:00
Zhenchi
904484b525 fix(common-time): allow building nanos timestamp from parts split from i64::MIN (#3493)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-03-13 02:46:00 +00:00
tison
cafb4708ce refactor: validate constraints eagerly (#3472)
* chore: validate constraints eagerly

Signed-off-by: tison <wander4096@gmail.com>

* use timestamp column

Signed-off-by: tison <wander4096@gmail.com>

* fixup

Signed-off-by: tison <wander4096@gmail.com>

* lint

Signed-off-by: tison <wander4096@gmail.com>

* compile

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-03-12 13:09:34 +00:00
Yingwen
7c895e2605 perf: more benchmarks for memtables (#3491)
* chore: remove duplicate bench

* refactor: rename bench

* perf: add full scan bench for memtable

* feat: filter bench and add time series to bench group

* chore: comment

* refactor: rename

* style: fix clippy
2024-03-12 12:02:58 +00:00
Lei, HUANG
9afe327bca feat: improve prom write requests decode performance (#3478)
* feat: optimize decode performance

* fix: some cr comments
2024-03-12 12:00:38 +00:00
discord9
58bd065c6b feat(flow): plan def (#3490)
* feat: plan def

* chore: add license

* docs: remove TODO done

* chore: add derive Ord
2024-03-12 10:59:07 +00:00
Yingwen
9aa8f756ab fix: allow passing extra table options (#3484)
* fix: do not check options in parser

* test: fix tests

* test: fix sqlness

* test: add sqlness test

* chore: log options

* chore: must specify compaction type

* feat: validate option key

* feat: add option key validation back
2024-03-12 07:03:52 +00:00
discord9
7639c227ca feat(flow): accumlator for aggr func (#3396)
* feat: Accumlator trait

* feat: add `OrdValue` accum&use enum_dispatch

* test: more accum test

* feat: eval aggr funcs

* chore: refactor test&fmt clippy

* refactor: less verbose

* test: more tests

* refactor: better err handling&use OrdValue for Count

* refactor: ignore null&more tests for error handle

* refactor: OrdValue accum

* chore: extract null check

* refactor: def&use fn signature

* chore: use extra cond with match guard

* chore: per review
2024-03-12 02:09:27 +00:00
tison
1255c1fc9e feat: to_timezone function (#3470)
* feat: to_timezone function

Signed-off-by: tison <wander4096@gmail.com>

* impl Function for ToTimezoneFunction

Signed-off-by: tison <wander4096@gmail.com>

* add test

Signed-off-by: tison <wander4096@gmail.com>

* Add original authors

Co-authored-by: parkma99 <park-ma@hotmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>

* fixup

Signed-off-by: tison <wander4096@gmail.com>

* address comments

Signed-off-by: tison <wander4096@gmail.com>

* add issue link

Signed-off-by: tison <wander4096@gmail.com>

* code refactor

Signed-off-by: tison <wander4096@gmail.com>

* further tidy

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: parkma99 <park-ma@hotmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2024-03-12 01:46:19 +00:00
Yingwen
06dcd0f6ed fix: freeze data buffer in shard (#3468)
* feat: call freeze if the active data buffer in a shard is full

* chore: more metrics

* chore: print metrics

* chore: enlarge freeze threshold

* test: test freeze

* test: fix config test
2024-03-11 14:51:06 +00:00
Weny Xu
0a4444a43a feat(fuzz): validate columns (#3485) 2024-03-11 11:34:50 +00:00
Ruihang Xia
b7ac8d6aa8 ci: use another mirror for etcd image (#3486)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-11 10:40:19 +00:00
Weny Xu
e767f37241 fix: fix f64 has no sufficient precision during parsing (#3483) 2024-03-11 09:28:40 +00:00
JeremyHi
da098f5568 fix: make max-txn-ops limit valid (#3481) 2024-03-11 09:27:51 +00:00
shuiyisong
aa953dcc34 fix: impl RecordBatchStream method explicitly (#3482)
fix: impl RecordBatchStream method explicitly
2024-03-11 09:07:10 +00:00
crwen
aa125a50f9 refactor: make http api returns non-200 status code (#3473)
* refactor: make http api returns non-200 status code

* recover some code
2024-03-11 03:38:36 +00:00
Ruihang Xia
d8939eb891 feat: clamp function (#3465)
* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* a little type exercise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-11 03:26:10 +00:00
shuiyisong
0bb949787c refactor: introduce new Output with OutputMeta (#3466)
* refactor: introduce new output struct

* chore: add helper function

* chore: update comment

* chore: update commit

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: rename according to cr

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-11 02:24:09 +00:00
WU Jingdi
8c37c3fc0f feat: support first_value/last_value in range query (#3448)
* feat: support `first_value/last_value` in range query

* chore: add sqlness test on `count`

* chore: add test
2024-03-11 01:30:39 +00:00
gcmutator
21ff3620be chore: remove repetitive words (#3469)
remove repetitive words

Signed-off-by: gcmutator <329964069@qq.com>
2024-03-09 04:18:47 +00:00
Eugene Tolbakov
aeca0d8e8a feat(influxdb): add db query param support for v2 write api (#3445)
* feat(influxdb): add db query param support for v2 write api

* fix(influxdb): update authorize logic to get catalog and schema from query string

* fix(influxdb): address CR suggestions

* fix(influxdb): use the correct import
2024-03-08 08:17:57 +00:00
Weny Xu
a309cd018a fix: fix incorrect COM_STMT_PREPARE reply (#3463)
* fix: fix incorrect `COM_STMT_PREPARE` reply

* chore: use column name instead of index
2024-03-08 07:31:20 +00:00
Yingwen
3ee53360ee perf: Reduce decode overhead during pruning keys in the memtable (#3415)
* feat: reuse value buf

* feat: skip values to decode

* feat: prune shard

chore: fix compiler errors

refactor: shard prune metrics

* fix: panic on DedupReader::try_new

* fix: prune after next

* chore: num parts metrics

* feat: metrics and logs

* chore: data build cost

* chore: more logs

* feat: cache skip result

* chore: todo

* fix: index out of bound

* test: test codec

* fix: invalid offsets

* fix: skip binary

* fix: offset buffer reuse

* chore: comment

* test: test memtable filter

* style: fix clippy

* chore: fix compiler error
2024-03-08 02:54:00 +00:00
JeremyHi
352bd7b6fd feat: max-txn-ops option (#3458)
* feat: max-txn-ops limit

* chore: by comment
2024-03-08 02:34:40 +00:00
Weny Xu
3f3ef2e7af refactor: separate the quote char and value (#3455)
refactor: use ident instead of string
2024-03-07 08:24:09 +00:00
Weny Xu
a218f12bd9 test: add fuzz test for create table (#3441)
* feat: add create table fuzz test

* chore: add ci cfg for fuzz tests

* refactor: remove redundant nightly config

* chore: run fuzz test in debug mode

* chore: use ubuntu-latest

* fix: close connection

* chore: add cache in fuzz test ci

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: refactor the fuzz test action
2024-03-07 06:51:19 +00:00
ZonaHe
c884c56151 feat: update dashboard to v0.4.8 (#3450)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2024-03-07 04:06:07 +00:00
Weny Xu
9ec288cab9 chore: specify binary name (#3449) 2024-03-07 03:56:24 +00:00
LFC
1f1491e429 feat: impl some "set"s to adapt to some client apps (#3443) 2024-03-06 13:15:48 +00:00
Weny Xu
c52bc613e0 chore: add bin opt to build cmd (#3440) 2024-03-06 08:24:55 +00:00
shuiyisong
a9d42f7b87 fix: add support for influxdb basic auth (#3437) 2024-03-06 03:56:25 +00:00
tison
86ce2d8713 build(deps): upgrade opendal to 0.45.1 (#3432)
* build(deps): upgrade opendal to 0.45.1

Signed-off-by: tison <wander4096@gmail.com>

* Update src/object-store/Cargo.toml

Co-authored-by: Weny Xu <wenymedia@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-03-06 03:08:59 +00:00
Yingwen
5d644c0b7f chore: bump version to v0.7.0 (#3433) 2024-03-05 12:07:37 +00:00
Ruihang Xia
020635063c feat: implement multi-dim partition rule (#3409)
* generate expr rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement show create for new partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement row spliter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: fix failed tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix lint issues

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: ignore tests for deprecated partition rule

* chore: remove unused partition rule tests setup

* test(sqlness): add basic partition tests

* test(multi_dim): add basic find region test

* address CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
2024-03-05 11:39:15 +00:00
dependabot[bot]
97cbfcfe23 build(deps): bump mio from 0.8.10 to 0.8.11 (#3434)
Bumps [mio](https://github.com/tokio-rs/mio) from 0.8.10 to 0.8.11.
- [Release notes](https://github.com/tokio-rs/mio/releases)
- [Changelog](https://github.com/tokio-rs/mio/blob/master/CHANGELOG.md)
- [Commits](https://github.com/tokio-rs/mio/compare/v0.8.10...v0.8.11)

---
updated-dependencies:
- dependency-name: mio
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-03-05 11:04:14 +00:00
Lei, HUANG
7183fa198c refactor: make MergeTreeMemtable the default choice (#3430)
* refactor: make MergeTreeMemtable the default choice

* refactor: reformat

* chore: add doc to config
2024-03-05 10:00:08 +00:00
Lei, HUANG
02b18fbca1 feat: decode prom requests to grpc (#3425)
* hack: inline decode

* move to servers

* fix: samples lost

* add bench

* remove useless functions

* wip

* feat: remove object pools

* fix: minor issues

* fix: remove useless dep

* chore: rebase main

* format

* finish

* fix: format

* feat: introduce request pool

* try to fix license issue

* fix: clippy

* resolve comments

* fix:typo

* remove useless comments
2024-03-05 09:47:32 +00:00
shuiyisong
7b1c3503d0 fix: complete interceptors for all frontend entry (#3428) 2024-03-05 09:38:47 +00:00
liyang
6fd2ff49d5 ci: refine windows output env (#3431) 2024-03-05 08:38:28 +00:00
WU Jingdi
53f2a5846c feat: support tracing rule sampler (#3405)
* feat: support tracing rule sampler

* chore: simplify code
2024-03-05 15:40:02 +08:00
Yingwen
49157868f9 feat: Correct server metrics and add more metrics for scan (#3426)
* feat: drop timer on stream terminated

* refactor: combine metrics into a histogram vec

* refactor: frontend grpc metrics

* feat: add metrics middleware layer to grpc server

* refactor: move http metrics layer to metrics mod

* feat: bucket for grpc/http elapsed

* feat: remove duplicate metrics

* style: fix cilppy

* fix: incorrect bucket of promql series

* feat: more metrics for mito

* feat: convert cost

* test: fix metrics test
2024-03-04 10:15:10 +00:00
Ruihang Xia
ae2c18e1cf docs(rfcs): multi-dimension partition rule (#3350)
* docs(rfcs): multi-dimension partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change math block type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update tracking issue

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update discussion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-03-04 08:10:54 +00:00
dennis zhuang
e6819412c5 refactor: show tables and show databases (#3423)
* refactor: show tables and show databases

* chore: clean code
2024-03-04 06:15:17 +00:00
tison
2a675e0794 docs: update pull_request_template.md (#3421) 2024-03-03 09:51:44 +00:00
JeremyHi
0edf1bbacc feat: reduce a clone of string (#3422) 2024-03-03 08:09:17 +00:00
Eugene Tolbakov
8609977b52 feat: add verbose support for tql explain/analyze (#3390)
* feat: add verbose support for tql explain/analyze

* chore: apply clippy suggestions

* feat: add sqlness tests

* fix: adjust sqlness replace rules

* fix: address CR (move tql explain/analyze inside common folder)

* fix: address CR(improve comments to indicate that verbose is optional)
2024-03-02 11:18:22 +00:00
JeremyHi
2d975e4f22 feat: tableref cache (#3420)
* feat: tableref cache

* chore: minor refactor

* chore: avoid to string

* chore: change log level

* feat: add metrics for prometheus remote write decode
2024-03-02 07:37:31 +00:00
Kould
00cbbc97ae feat: support Create Table ... Like (#3372)
* feat: support `Create Table ... Like`

* fix: `check_permission` for `Create Table ... Like`

* style: renaming `name` -> `table_name` & `target` -> `source_name` and make `Create Table ... Like` testcase more complicated

* rebase

* avoid _ fn

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
2024-03-02 06:34:13 +00:00
niebayes
7d30c2484b fix: mitigate memory spike during startup (#3418)
* fix: fix memory spike during startup

* fix: allocate a region write ctx for each wal entry
2024-03-01 07:46:05 +00:00
Lei, HUANG
376409b857 feat: employ sparse key encoding for shard lookup (#3410)
* feat: employ short key encoding for shard lookup

* fix: license

* chore: simplify code

* refactor: only enable sparse encoding to speed lookup on metric engine

* fix: names
2024-03-01 06:22:15 +00:00
Ning Sun
d4a54a085b feat: add configuration for tls watch option (#3395)
* feat: add configuration for tls watch option

* test: sleep longer to ensure async task run

* test: update config api integration test

* refactor: rename function
2024-03-01 03:49:54 +00:00
dennis zhuang
c1a370649e fix: show table names not complete from information_schema (#3417) 2024-03-01 02:51:46 +00:00
JeremyHi
3cad9d989d fix: partition region id (#3414) 2024-02-29 09:09:59 +00:00
JohnsonLee
a50025269f feat: Support automatic DNS lookup for kafka bootstrap servers (#3379)
* feat: Support automatic DNS lookup for kafka bootstrap servers

* Revert "feat: Support automatic DNS lookup for kafka bootstrap servers"

This reverts commit 5baed7b01d.

* feat: Support automatic DNS lookup for Kafka broker

* fix: resolve broker endpoint in client manager

* fix: apply clippy lints

* refactor: slimplify the code with clippy hint

* refactor: move resolve_broker_endpoint to common/wal/src/lib.rs

* test: add mock test for resolver_broker_endpoint

* refactor: accept niebayes's advice

* refactor: rename EndpointIpNotFound to EndpointIPV4NotFound

* refactor: remove mock test and simplify the implementation

* docs: add comments about test_vallid_host_ipv6

* Apply suggestions from code review

Co-authored-by: niebayes <niebayes@gmail.com>

* move more common code

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
Co-authored-by: niebayes <niebayes@gmail.com>
2024-02-29 07:29:20 +00:00
JeremyHi
a3533c4ea0 feat: zero copy on split rows (#3407) 2024-02-28 13:27:52 +00:00
Lei, HUANG
3413fc0781 refactor: move some costly methods in DataBuffer::read out of read lock (#3406)
* refactor: move some costly methods in DataBuffer::read out of read lock

* refactor: also replace ShardReader with ShardReaderBuilder
2024-02-28 12:22:44 +00:00
tison
dc205a2c5d feat: enable ArrowFlight compression (#3403)
* feat: enable ArrowFlight compression

Signed-off-by: tison <wander4096@gmail.com>

* turn on features

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-28 08:55:44 +00:00
Lei, HUANG
a0a8e8c587 fix: some read metrics (#3404)
* fix: some read metrics

* chore: fix some metrics

* fix
2024-02-28 08:47:49 +00:00
Zhenchi
c3c80b92c8 feat(index): measure memory usage in global instead of single-column and add metrics (#3383)
* feat(index): measure memory usage in global instead of single-column and add metrics

* feat: add leading zeros to streamline memory usage

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove println

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-02-28 06:49:24 +00:00
Weny Xu
a8cbec824c refactor: refactor TableRouteManager (#3392)
* feat: introduce TableRouteStorage

* refactor: remove get & batch_get in TableRouteManager

* refactor: move txn related fn to TableRouteStorage

* chore: apply suggestions from CR

* chore(codecov): ingore tests-integration dir
2024-02-28 06:18:09 +00:00
tison
33d894c1f0 build: do not retry for connrefused (#3402)
* build: do not retry for connrefused

Signed-off-by: tison <wander4096@gmail.com>

* simplify layout

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-28 06:15:23 +00:00
Lei, HUANG
7942b8fae9 chore: add metris for memtable read path (#3397)
* chore: add metris for read path

* chore: add more metrics
2024-02-28 03:37:19 +00:00
Yingwen
b97f957489 feat: Use a partition level map to look up pk index (#3400)
* feat: partition level map

* test: test shard and builder

* fix: do not use pk index from shard builder

* feat: add multi key test

* fix: freeze shard before finding pk in shards
2024-02-28 03:17:09 +00:00
tison
f3d69e9563 chore: retry fetch dashboard assets (#3394)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-27 10:07:21 +00:00
dennis zhuang
4b36c285f1 feat: flush or compact table and region functions (#3363)
* feat: adds Requester to process table flush and compaction request

* feat: admin_fn macros for administration functions

* test: add query result

* feat: impl flush_region, flush_table, compact_region, and flush_region functions

* docs: add Arguments to admin_fn macro

* chore: apply suggestion

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: apply suggestion

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: group_requests_by_peer and adds log

* Update src/common/macro/src/admin_fn.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: adds todo for spawan thread

* feat: rebase with main

---------

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-27 08:57:38 +00:00
discord9
dbb1ce1a9b feat(flow): impl for MapFilterProject (#3359)
* feat: mfp impls

* fix: after rebase

* test: temporal filter mfp

* refactor: more comments&test

* test: permute

* fix: check input len when eval

* refactor: err handle&docs: more explain graph

* docs: better flowchart map,filter,project

* refactor: visit_* falliable

* chore: better temp lint allow

* fix: permute partially

* chore: remove duplicated checks

* docs: more explain&tests for clarity

* refactor: use ensure! instead
2024-02-27 08:13:55 +00:00
Ruihang Xia
3544c9334c feat!: new partition grammar - parser part (#3347)
* parser part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test in sql

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* comment out and ignore some logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update region migration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* temporary disable region migration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* allow dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-27 07:20:16 +00:00
Lei, HUANG
492a00969d feat: enable zstd compression and encodings in merge tree data part (#3380)
* feat: enable zstd compression in merge tree data part to save memory

* feat: also enable customized column encoding in DataPartEncoder
2024-02-27 06:54:56 +00:00
Yingwen
206666bff6 feat: Implement partition eviction and only add value size to write buffer size (#3393)
* feat: track key bytes in dict

* chore: done allocating on finish

* feat: evict keys

* chore: do not add to write buffer

* chore: only count value bytes

* fix: reset key bytes

* feat: remove write buffer manager from shards

* feat: change dict size compute method

* chore: adjust dictionary size by os memory
2024-02-27 06:28:57 +00:00
Weny Xu
7453d9779d fix: throw errors instead of panic (#3391)
* fix: throw errors instead of panic

* chore: apply suggestions from CR
2024-02-27 03:46:12 +00:00
liyang
8e3e0fd528 ci: add builder result outputs in release action (#3381) 2024-02-27 03:43:16 +00:00
dimbtp
b1e290f959 fix: range fix in modulo function tests (#3389)
fix: range fix for modulo tests
2024-02-26 15:50:23 +00:00
Ruihang Xia
d8dc93fccc feat(grafana): enable shared tooltip, add raft engine throughput (#3387)
feat: enable shared tooltip, add raft engine throughput

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-26 11:31:15 +00:00
Ning Sun
3887d207b6 feat: make tls certificates/keys reloadable (part 1) (#3335)
* feat: make tls certificates/keys reloadable (part 1)

* feat: add notify watcher for cert/key files

* test: add unit test for watcher

* fix: correct usage of watcher

* fix: skip watch when tls disabled
2024-02-26 09:37:54 +00:00
Ruihang Xia
e859f0e67d chore: skip reorder workspace tables in taplo (#3388)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-26 08:57:49 +00:00
Ruihang Xia
ce397ebcc6 feat: change how region id maps to region worker (#3384)
* feat: change how region id maps to region worker

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add overflow test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-26 08:42:29 +00:00
Yingwen
26011ed0b6 fix: resets dict builder keys counter and avoid unnecessary pruning (#3386)
* fix: dict builder resets num_keys on finish

* feat: skip empty shard and builder

* feat: avoid pruning if possible

Implementations:
- Apply all filters on the partition column
- If no filter to prune, skip decoding keys
2024-02-26 08:24:46 +00:00
Lei, HUANG
8087822ab2 refactor: change the receivers of merge tree components (#3378)
* refactor: change the receivers of Shard::read/DataBuffer::read/DataParts::read to &self instead of &mut self

* refactor: remove allow(dead_code) in merge tree
2024-02-26 06:50:55 +00:00
Yingwen
e481f073f5 feat: Implement dedup for the new memtable and expose the config (#3377)
* fix: KeyValues num_fields() is incorrect

* chore: fix warnings

* feat: support dedup

* feat: allow using the new memtable

* feat: serde default for config

* fix: resets pk index after finishing a dict
2024-02-25 13:06:01 +00:00
Lei, HUANG
606309f49a fix: remove unused imports in memtable_util.rs (#3376) 2024-02-25 09:23:28 +00:00
Yingwen
8059b95e37 feat: Implement iter for the new memtable (#3373)
* chore: read shard builder

* chore: reuse pk weights

* chore: prune key

* chore: shard reader wip

* refactor: shard builder DataBatch

* feat: merge shard readers

* feat: return shard id in shard readers

* feat: impl partition reader

* chore: impl partition read

* feat: impl iter tree

* chore: save last yield pk id

* style: fix clippy

* refactor: rename ShardReaderImpl to ShardReader

* chore: address CR comment
2024-02-25 07:42:16 +00:00
Lei, HUANG
afe4633320 feat: merge tree dedup reader (#3375)
* feat: add dedup option to merge tree component

* feat: impl dedup reader for shard reader

* refactor: DedupReader::new to DedupReader::try_new

* refactor: remove DedupReader::current_key field

* fix: some cr comments

* fix: fmt

* fix: remove shard_id method from DedupSource
2024-02-24 13:50:49 +00:00
Yingwen
abbfd23d4b feat: Add freeze and fork method to the memtable (#3374)
* feat: add fork method to the memtable

* feat: allow mark immutable returns result

* feat: use fork to create the mutable memtable

* feat: remove memtable builder from freeze

* chore: warninigs

* fix: inspect error

* feat: iter returns result

* chore: maintains memtable id in region

* chore: update comment

* fix: remove region status if failed to freeze a memtable

* chroe: update comment

* chore: iter should not require sync

* chore: implement freeze and fork for the new memtable
2024-02-24 12:11:16 +00:00
Yingwen
1df64f294b refactor: Remove Item from merger's Node trait (#3371)
* refactor: data reader returns reference to data batch

* refactor: use range to create merger

* chore: Reference RecordBatch in DataBatch

* fix: top node not read if no next node

* refactor: move timestamp_array_to_i64_slice to data mod

* style: fix cilppy

* chore: derive copy for DataBatch

* chore: address CR comments
2024-02-24 07:19:48 +00:00
LFC
a6564e72b4 fix: treat "0" and "1" as valid boolean values. (#3370)
* Treat "0" and "1" as valid boolean values.

* Update src/sql/src/statements.rs

Co-authored-by: tison <wander4096@gmail.com>

* Fix tests.

---------

Co-authored-by: tison <wander4096@gmail.com>
2024-02-23 14:34:27 +00:00
Lei, HUANG
1f1d1b4f57 feat: distinguish between different read paths (#3369)
* feat: distinguish between different read paths

* fix: reformat code
2024-02-23 12:40:39 +00:00
Yingwen
b144836935 feat: Implement write and fork for the new memtable (#3357)
* feat: write to a shard or a shard builder

* feat: freeze and fork for partition and shards

* chore: shard builder

* chore: change dict reader to support random access

* test: test write shard

* test: test write

* test: test memtable

* feat: add new and write_row to DataParts

* refactor: partition freeze shards

* refactor: write_with_pk_id

* style: fix clippy

* chore: add methods to get pk weights

* chroe: fix compiler errors
2024-02-23 07:20:55 +00:00
dependabot[bot]
93d9f48dd7 build(deps): bump libgit2-sys from 0.16.1+1.7.1 to 0.16.2+1.7.2 (#3367)
Bumps [libgit2-sys](https://github.com/rust-lang/git2-rs) from 0.16.1+1.7.1 to 0.16.2+1.7.2.
- [Changelog](https://github.com/rust-lang/git2-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/git2-rs/commits)

---
updated-dependencies:
- dependency-name: libgit2-sys
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-02-23 14:30:09 +08:00
Lei, HUANG
90e9b69035 feat: impl merge reader for DataParts (#3361)
* feat: impl merge reader for DataParts

* fix: fmt

* fix: sort rows with pk and ts according to sequnce desc

* fix: remove pk weight as pk index are already replace by weights

* fix: format

* fix: some cr comments

* fix: some cr comments

* refactor: simply trait's associated types

* fix: some cr comments
2024-02-23 06:07:55 +00:00
LFC
2035e7bf4c refactor: set the actual bound port in server handler (#3353)
* refactor: set the actual bound port so we can use port 0 in testing

* Update src/servers/src/server.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fmt

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
2024-02-23 02:49:11 +00:00
Ruihang Xia
7341f23019 feat: skip filling NULL for put and delete requests (#3364)
* feat: optimize for sparse data

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove old structures

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-22 14:30:43 +00:00
tison
41ee0cdd5a build(deps): Upgrade opensrv to 0.7.0 (#3362)
* build(deps): Upgrade opensrv to 0.7.0

Signed-off-by: tison <wander4096@gmail.com>

* workaround X is not X by casting

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-22 13:11:26 +00:00
Kould
578dd8f87a feat: add isnull function (#3360)
* code fmt

* feat: add isnull function

* feat: add isnull function
2024-02-22 12:41:25 +00:00
Weny Xu
1dc4fec662 refactor: allocate table ids in the procedure (#3293)
* refactor: refactor the create logical tables

* test(create_logical_tables): add tests for on_prepare

* test(create_logical_tables): add tests for on_create_metadata

* refactor: rename to create_logical_tables_metadata

* chore: fmt toml

* chore: apply suggestions from CR
2024-02-22 10:53:28 +00:00
Ruihang Xia
f26505b625 fix: typo in lint config (#3358)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-22 08:56:33 +00:00
Ruihang Xia
8289b0dec2 ci: align docs workflow jobs with develop.yml (#3356)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-22 07:01:15 +00:00
Yingwen
53105b99e7 test: fix list_files_and_parse_table_name path issue on windows (#3349)
* fix: always converts path to slash

* chore: print

* chore: normalize dir

* chore: compile

* chore: rm print
2024-02-22 06:16:41 +00:00
dennis zhuang
564fe3beca feat: impl migrate_region and procedure_state SQL function (#3325)
* fix: logical region can't find region routes

* feat: fetch partitions info in batch

* refactor: rename batch functions

* refactor: rename DdlTaskExecutor to ProcedureExecutor

* feat: impl migrate_region and query_procedure_state for ProcedureExecutor

* feat: adds SQL function procedure_state and finish migrate_region impl

* fix: constant vector

* feat: unit tests for migrate_region and procedure_state

* test: test region migration by SQL

* fix: compile error after rebeasing

* fix: clippy warnings

* feat: ensure procedure_state and migrate_region can be only called under greptime catalog

* fix: license header
2024-02-22 02:37:11 +00:00
SteveLauC
e9a2b0a9ee chore: use workspace-wide lints (#3352)
* chore: use workspace-wide lints

* respond to review
2024-02-22 01:01:10 +00:00
discord9
860b1e9d9e feat(flow): impl ScalarExpr&Scalar Function (#3283)
* feat: impl for ScalarExpr

* feat: plain functions

* refactor: simpler trait bound&tests

* chore: remove unused imports

* chore: fmt

* refactor: early ret on first error

* refactor: remove abunant match arm

* chore: per review

* doc: `support` fn

* chore: per review more

* chore: more per review

* fix: extract_bound

* chore: per review

* refactor: reduce nest
2024-02-21 12:53:16 +00:00
Yingwen
7c88d721c2 Merge pull request #3348
* feat: define functions for partitions

* feat: write partitions

* feat: fork and freeze partition

* feat: create iter by partition

* style: fix clippy

* chore: typos

* feat: add scan method to builder

* feat: check whether the builder should freeze first
2024-02-21 20:50:34 +08:00
Lei, HUANG
90169c868d feat: merge tree data parts (#3346)
* feat: add iter method for DataPart

* chore: rename iter to reader

* chore: some doc

* fix: resolve some comments

* fix: remove metadata in DataPart
2024-02-21 11:37:29 +00:00
tison
4c07606da6 refactor: put together HTTP headers (#3337)
* refactor: put together HTTP headers

Signed-off-by: tison <wander4096@gmail.com>

* do refactor

Signed-off-by: tison <wander4096@gmail.com>

* drop dirty commit

Signed-off-by: tison <wander4096@gmail.com>

* reduce changeset

Signed-off-by: tison <wander4096@gmail.com>

* fixup compilations

Signed-off-by: tison <wander4096@gmail.com>

* tidy files

Signed-off-by: tison <wander4096@gmail.com>

* drop common-api

Signed-off-by: tison <wander4096@gmail.com>

* fmt

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 09:51:10 +00:00
tison
a7bf458a37 chore: remove unused deprecated table_dir_with_catalog_and_schema (#3341) 2024-02-21 08:46:36 +00:00
tison
fa08085119 ci: upgrade actions to node20-based version (#3345)
* ci: upgrade actions to node20-based version

Signed-off-by: tison <wander4096@gmail.com>

* distinguish artifact name

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 08:09:09 +00:00
Lei, HUANG
86a98c80f5 feat: replace pk index with pk_weight during freeze (#3343)
* feat: replace pk index with pk_weight during freeze

* chore: add parameter to control pk_index replacement

* fix: dedup pk weights also

* fix: generate pk array before dedup
2024-02-21 08:05:25 +00:00
tison
085a380019 build(deps): axum-tets-helper has included patch-1 (#3333)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 07:49:42 +00:00
tison
d9a96344ee ci: try fix log location (#3342)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 07:01:51 +00:00
Weny Xu
41656c8635 refactor: allocate table id in the procedure (#3271)
* refactor: replace TableMetadataManager with TableNameManager

* refactor: allocate table id in the procedure

* refactor: refactor client logical of handling retries

* feat(test_util): add TestCreateTableExprBuilder

* feat(test_util): add MockDatanodeManager

* feat(test_util): add new_ddl_context

* feat(test_util): add build_raw_table_info_from_expr

* feat(test_util): add MockDatanodeManager::new

* feat(procedure): add downcast_output_ref to Status

* test(create_table): add tests for CreateTableProcedure on_prepare

* refactor(ddl): rename handle_operate_region_error to add_peer_context_if_need

* test(create_table): add tests for CreateTableProcedure on_datanode_create_regions

* test(create_table): add tests for CreateTableProcedure on_create_metadata

* refactor(meta): use CreateTableExprBuilder

* feat(create_table): ensure number of partitions is greater than 0

* refactor: rename to add_peer_context_if_needed

* feat: add context for panic

* refactor: simplify the should_retry

* refactor: use Option<&T> instead of &Option<T>

* refactor: move downcast_output_ref under cfg(test)

* chore: fmt toml
2024-02-21 04:38:46 +00:00
tison
cf08a3de6b chore: support configure GITHUB_PROXY_URL when fetch dashboard assets (#3340)
Signed-off-by: tison <wander4096@gmail.com>
2024-02-21 02:38:14 +00:00
Yingwen
f087a843bb feat: Implement KeyDictBuilder for the merge tree memtable (#3334)
* feat: dict builder

* feat: write and scan dict builder

* chore: address CR comments
2024-02-20 15:39:17 +00:00
Lei, HUANG
450dfe324d feat: data buffer and related structs (#3329)
* feat: data buffer and related structs

* fix: some cr comments

* chore: remove freeze_threshold in DataBuffer

* fix: use LazyMutableVectorBuilder instead of two vector; add option to control dedup

* fix: dedup rows according to both pk weights and timestamps

* fix: assembly DataBatch on demand
2024-02-20 09:22:45 +00:00
tison
3dfe4a2e5a chore: check dirs before create RaftEngine store (#3327)
* chore: check dirs before create RaftEngine store

Signed-off-by: tison <wander4096@gmail.com>

* fix impl

Signed-off-by: tison <wander4096@gmail.com>

* improve naming

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
2024-02-20 07:48:15 +00:00
LFC
eded08897d test: add data compatibility test (#3109)
* test: data files compatibility test

* rework compatibility test

* revert unneeded changes

* revert unneeded changes

* debug CI

* Update .github/workflows/develop.yml

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2024-02-20 07:44:04 +00:00
Ruihang Xia
b1f54d8a03 fix: disable ansi contorl char when stdout is redirected (#3332)
* fix: disable ansi contorl char when stdout is redirected

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't touch file logging layer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* disable ansi for two file layers

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Co-authored-by: LFC <bayinamine@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: LFC <bayinamine@gmail.com>
2024-02-20 06:42:56 +00:00
shuiyisong
bf5e1905cd refactor: bring metrics to http output (#3247)
* refactor: bring metrics to http output

* chore: remove unwrap

* chore: make walk plan accumulate

* chore: change field name and comment

* chore: add metrics to http resp header

* chore: move PrometheusJsonResponse to a separate file and impl IntoResponse

* chore: put metrics in prometheus resp header too
2024-02-20 03:25:18 +00:00
Zhenchi
6628c41c36 feat(metric-engine): set index options for data region (#3330)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-02-20 02:38:35 +00:00
720 changed files with 44753 additions and 9898 deletions

View File

@@ -3,13 +3,3 @@ linker = "aarch64-linux-gnu-gcc"
[alias]
sqlness = "run --bin sqlness-runner --"
[build]
rustflags = [
# lints
# TODO: use lint configuration in cargo https://github.com/rust-lang/cargo/issues/5034
"-Wclippy::print_stdout",
"-Wclippy::print_stderr",
"-Wclippy::implicit_clone",
]

10
.editorconfig Normal file
View File

@@ -0,0 +1,10 @@
root = true
[*]
end_of_line = lf
indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true
[{Makefile,**.mk}]
indent_style = tab

View File

@@ -21,3 +21,6 @@ GT_GCS_CREDENTIAL_PATH = GCS credential path
GT_GCS_ENDPOINT = GCS end point
# Settings for kafka wal test
GT_KAFKA_ENDPOINTS = localhost:9092
# Setting for fuzz tests
GT_MYSQL_ADDR = localhost:4002

View File

@@ -34,7 +34,7 @@ runs:
- name: Upload sqlness logs
if: ${{ failure() && inputs.disable-run-tests == 'false' }} # Only upload logs when the integration tests failed.
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: /tmp/greptime-*.log

View File

@@ -67,7 +67,7 @@ runs:
- name: Upload sqlness logs
if: ${{ failure() }} # Only upload logs when the integration tests failed.
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: /tmp/greptime-*.log

View File

@@ -62,15 +62,15 @@ runs:
- name: Upload sqlness logs
if: ${{ failure() }} # Only upload logs when the integration tests failed.
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
path: /tmp/greptime-*.log
retention-days: 3
- name: Build greptime binary
shell: pwsh
run: cargo build --profile ${{ inputs.cargo-profile }} --features ${{ inputs.features }} --target ${{ inputs.arch }}
run: cargo build --profile ${{ inputs.cargo-profile }} --features ${{ inputs.features }} --target ${{ inputs.arch }} --bin greptime
- name: Upload artifacts
uses: ./.github/actions/upload-artifacts

13
.github/actions/fuzz-test/action.yaml vendored Normal file
View File

@@ -0,0 +1,13 @@
name: Fuzz Test
description: 'Fuzz test given setup and service'
inputs:
target:
description: "The fuzz target to test"
runs:
using: composite
steps:
- name: Run Fuzz Test
shell: bash
run: cargo fuzz run ${{ inputs.target }} --fuzz-dir tests-fuzz -D -s none -- -max_total_time=120
env:
GT_MYSQL_ADDR: 127.0.0.1:4002

View File

@@ -1,8 +1,10 @@
I hereby agree to the terms of the [GreptimeDB CLA](https://gist.github.com/xtang/6378857777706e568c1949c7578592cc)
I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeTeam/.github/blob/main/CLA.md).
## Refer to a related PR or issue link (optional)
## What's changed and what's your intention?
_PLEASE DO NOT LEAVE THIS EMPTY !!!_
__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
Please explain IN DETAIL what the changes are in this PR and why they are needed:
@@ -16,5 +18,3 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed
- [ ] I have written the necessary rustdoc comments.
- [ ] I have added the necessary unit tests and integration tests.
- [x] This PR does not require documentation updates.
## Refer to a related PR or issue link (optional)

View File

@@ -40,3 +40,4 @@ jobs:
uses: JamesIves/github-pages-deploy-action@v4
with:
folder: target/doc
single-commit: true

View File

@@ -1,7 +1,7 @@
on:
merge_group:
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
types: [ opened, synchronize, reopened, ready_for_review ]
paths-ignore:
- 'docs/**'
- 'config/**'
@@ -57,7 +57,7 @@ jobs:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
# Shares with `Clippy` job
shared-key: "check-lint"
@@ -75,7 +75,7 @@ jobs:
toolchain: stable
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
@@ -102,7 +102,7 @@ jobs:
shared-key: "build-binaries"
- name: Build greptime binaries
shell: bash
run: cargo build
run: cargo build --bin greptime --bin sqlness-runner
- name: Pack greptime binaries
shell: bash
run: |
@@ -117,6 +117,46 @@ jobs:
artifacts-dir: bins
version: current
fuzztest:
name: Fuzz Test
needs: build
runs-on: ubuntu-latest
strategy:
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table" ]
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
name: bins
path: .
- name: Unzip binaries
run: tar -xvf ./bins.tar.gz
- name: Run GreptimeDB
run: |
./bins/greptime standalone start&
- name: Fuzz Test
uses: ./.github/actions/fuzz-test
env:
CUSTOM_LIBFUZZER_PATH: /usr/lib/llvm-14/lib/libFuzzer.a
with:
target: ${{ matrix.target }}
sqlness:
name: Sqlness Test
needs: build
@@ -136,13 +176,12 @@ jobs:
run: tar -xvf ./bins.tar.gz
- name: Run sqlness
run: RUST_BACKTRACE=1 ./bins/sqlness-runner -c ./tests/cases --bins-dir ./bins
# FIXME: Logs cannot found be on failure (or even success). Need to figure out the cause.
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
path: /tmp/greptime-*.log
retention-days: 3
sqlness-kafka-wal:
@@ -167,13 +206,12 @@ jobs:
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run sqlness
run: RUST_BACKTRACE=1 ./bins/sqlness-runner -w kafka -k 127.0.0.1:9092 -c ./tests/cases --bins-dir ./bins
# FIXME: Logs cannot be found on failure (or even success). Need to figure out the cause.
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
name: sqlness-logs-with-kafka-wal
path: /tmp/greptime-*.log
retention-days: 3
fmt:
@@ -191,7 +229,7 @@ jobs:
components: rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
shared-key: "check-rust-fmt"
- name: Run cargo fmt
@@ -212,7 +250,7 @@ jobs:
components: clippy
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
with:
# Shares across multiple jobs
# Shares with `Check` job
shared-key: "check-lint"
@@ -241,6 +279,10 @@ jobs:
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
- name: Docker Cache
uses: ScribeMD/docker-cache@0.3.7
with:
key: docker-${{ runner.os }}-coverage
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov
@@ -271,10 +313,28 @@ jobs:
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Codecov upload
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./lcov.info
flags: rust
fail_ci_if_error: false
verbose: true
compat:
name: Compatibility Test
needs: build
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
name: bins
path: .
- name: Unzip binaries
run: |
mkdir -p ./bins/current
tar -xvf ./bins.tar.gz --strip-components=1 -C ./bins/current
- run: ./tests/compat/test-compat.sh 0.6.0

View File

@@ -61,6 +61,18 @@ jobs:
sqlness:
name: Sqlness Test
runs-on: ubuntu-20.04
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-20.04 ]
steps:
- run: 'echo "No action required"'
sqlness-kafka-wal:
name: Sqlness Test with Kafka Wal
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ ubuntu-20.04 ]
steps:
- run: 'echo "No action required"'

View File

@@ -13,4 +13,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check License Header
uses: korandoru/hawkeye@v4
uses: korandoru/hawkeye@v5

View File

@@ -45,10 +45,10 @@ jobs:
{"text": "Nightly CI failed for sqlness tests"}
- name: Upload sqlness logs
if: always()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: sqlness-logs
path: ${{ runner.temp }}/greptime-*.log
path: /tmp/greptime-*.log
retention-days: 3
test-on-windows:

View File

@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.7.0
NEXT_RELEASE_VERSION: v0.8.0
jobs:
allocate-runners:
@@ -221,6 +221,8 @@ jobs:
arch: x86_64-apple-darwin
artifacts-dir-prefix: greptime-darwin-amd64-pyo3
runs-on: ${{ matrix.os }}
outputs:
build-macos-result: ${{ steps.set-build-macos-result.outputs.build-macos-result }}
needs: [
allocate-runners,
]
@@ -260,6 +262,8 @@ jobs:
features: pyo3_backend,servers/dashboard
artifacts-dir-prefix: greptime-windows-amd64-pyo3
runs-on: ${{ matrix.os }}
outputs:
build-windows-result: ${{ steps.set-build-windows-result.outputs.build-windows-result }}
needs: [
allocate-runners,
]
@@ -284,7 +288,7 @@ jobs:
- name: Set build windows result
id: set-build-windows-result
run: |
echo "build-windows-result=success" >> $GITHUB_OUTPUT
echo "build-windows-result=success" >> $Env:GITHUB_OUTPUT
release-images-to-dockerhub:
name: Build and push images to DockerHub
@@ -295,6 +299,8 @@ jobs:
build-linux-arm64-artifacts,
]
runs-on: ubuntu-2004-16-cores
outputs:
build-image-result: ${{ steps.set-build-image-result.outputs.build-image-result }}
steps:
- uses: actions/checkout@v4
with:
@@ -310,7 +316,7 @@ jobs:
version: ${{ needs.allocate-runners.outputs.version }}
- name: Set build image result
id: set-image-build-result
id: set-build-image-result
run: |
echo "build-image-result=success" >> $GITHUB_OUTPUT

21
.github/workflows/unassign.yml vendored Normal file
View File

@@ -0,0 +1,21 @@
name: Auto Unassign
on:
schedule:
- cron: '4 2 * * *'
workflow_dispatch:
permissions:
contents: read
issues: write
pull-requests: write
jobs:
auto-unassign:
name: Auto Unassign
runs-on: ubuntu-latest
steps:
- name: Auto Unassign
uses: tisonspieces/auto-unassign@main
with:
token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
repository: ${{ github.repository }}

4
.gitignore vendored
View File

@@ -46,3 +46,7 @@ benchmarks/data
*.code-workspace
venv/
# Fuzz tests
tests-fuzz/artifacts/
tests-fuzz/corpus/

View File

@@ -1,132 +0,0 @@
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the overall
community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or advances of
any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email address,
without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
info@greptime.com.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series of
actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or permanent
ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within the
community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.1, available at
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
[https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

805
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -18,6 +18,7 @@ members = [
"src/common/grpc-expr",
"src/common/mem-prof",
"src/common/meta",
"src/common/plugins",
"src/common/procedure",
"src/common/procedure-test",
"src/common/query",
@@ -61,17 +62,23 @@ members = [
resolver = "2"
[workspace.package]
version = "0.6.0"
version = "0.7.2"
edition = "2021"
license = "Apache-2.0"
[workspace.lints]
clippy.print_stdout = "warn"
clippy.print_stderr = "warn"
clippy.implicit_clone = "warn"
rust.unknown_lints = "deny"
[workspace.dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "47.0" }
arrow-array = "47.0"
arrow-flight = "47.0"
arrow-ipc = "47.0"
arrow-ipc = { version = "47.0", features = ["lz4"] }
arrow-schema = { version = "47.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
@@ -92,17 +99,20 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1bd2398b686e5ac6c1eef6daf615867ce27f75c1" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" }
mockall = "0.11.4"
moka = "0.12"
notify = "6.1"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.git", rev = "33841b38dda79b15f2024952be5f32533325ca02", features = [
@@ -118,7 +128,7 @@ prost = "0.12"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
regex = "1.8"
regex-automata = { version = "0.2", features = ["transducer"] }
regex-automata = { version = "0.4" }
reqwest = { version = "0.11", default-features = false, features = [
"json",
"rustls-tls-native-roots",
@@ -126,8 +136,9 @@ reqwest = { version = "0.11", default-features = false, features = [
] }
rskafka = "0.5"
rust_decimal = "1.33"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.7"
@@ -144,6 +155,7 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.10", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"
## workspaces members
api = { path = "src/api" }
@@ -164,6 +176,7 @@ common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-meta = { path = "src/common/meta" }
common-plugins = { path = "src/common/plugins" }
common-procedure = { path = "src/common/procedure" }
common-procedure-test = { path = "src/common/procedure-test" }
common-query = { path = "src/common/query" }
@@ -201,7 +214,7 @@ table = { path = "src/table" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "abbd357c1e193cd270ea65ee7652334a150b628f"
rev = "80b72716dcde47ec4161478416a5c6c21343364d"
[profile.release]
debug = 1

View File

@@ -3,6 +3,7 @@ CARGO_PROFILE ?=
FEATURES ?=
TARGET_DIR ?=
TARGET ?=
BUILD_BIN ?= greptime
CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
@@ -45,6 +46,10 @@ ifneq ($(strip $(TARGET)),)
CARGO_BUILD_OPTS += --target ${TARGET}
endif
ifneq ($(strip $(BUILD_BIN)),)
CARGO_BUILD_OPTS += --bin ${BUILD_BIN}
endif
ifneq ($(strip $(RELEASE)),)
CARGO_BUILD_OPTS += --release
endif

219
README.md
View File

@@ -6,145 +6,154 @@
</picture>
</p>
<h1 align="center">Cloud-scale, Fast and Efficient Time Series Database</h1>
<div align="center">
<h3 align="center">
The next-generation hybrid time-series/analytics processing database in the cloud
</h3>
<a href="https://greptime.com/product/cloud">GreptimeCloud</a> |
<a href="https://docs.greptime.com/">User guide</a> |
<a href="https://greptimedb.rs/">API Docs</a> |
<a href="https://github.com/GreptimeTeam/greptimedb/issues/3412">Roadmap 2024</a>
</h4>
<p align="center">
<a href="https://codecov.io/gh/GrepTimeTeam/greptimedb"><img src="https://codecov.io/gh/GrepTimeTeam/greptimedb/branch/main/graph/badge.svg?token=FITFDI3J3C"></img></a>
&nbsp;
<a href="https://github.com/GreptimeTeam/greptimedb/actions/workflows/develop.yml"><img src="https://github.com/GreptimeTeam/greptimedb/actions/workflows/develop.yml/badge.svg" alt="CI"></img></a>
&nbsp;
<a href="https://github.com/greptimeTeam/greptimedb/blob/main/LICENSE"><img src="https://img.shields.io/github/license/greptimeTeam/greptimedb"></a>
</p>
<a href="https://github.com/GreptimeTeam/greptimedb/releases/latest">
<img src="https://img.shields.io/github/v/release/GreptimeTeam/greptimedb.svg" alt="Version"/>
</a>
<a href="https://github.com/GreptimeTeam/greptimedb/releases/latest">
<img src="https://img.shields.io/github/release-date/GreptimeTeam/greptimedb.svg" alt="Releases"/>
</a>
<a href="https://hub.docker.com/r/greptime/greptimedb/">
<img src="https://img.shields.io/docker/pulls/greptime/greptimedb.svg" alt="Docker Pulls"/>
</a>
<a href="https://github.com/GreptimeTeam/greptimedb/actions/workflows/develop.yml">
<img src="https://github.com/GreptimeTeam/greptimedb/actions/workflows/develop.yml/badge.svg" alt="GitHub Actions"/>
</a>
<a href="https://codecov.io/gh/GrepTimeTeam/greptimedb">
<img src="https://codecov.io/gh/GrepTimeTeam/greptimedb/branch/main/graph/badge.svg?token=FITFDI3J3C" alt="Codecov"/>
</a>
<a href="https://github.com/greptimeTeam/greptimedb/blob/main/LICENSE">
<img src="https://img.shields.io/github/license/greptimeTeam/greptimedb" alt="License"/>
</a>
<p align="center">
<a href="https://twitter.com/greptime"><img src="https://img.shields.io/badge/twitter-follow_us-1d9bf0.svg"></a>
&nbsp;
<a href="https://www.linkedin.com/company/greptime/"><img src="https://img.shields.io/badge/linkedin-connect_with_us-0a66c2.svg"></a>
&nbsp;
<a href="https://greptime.com/slack"><img src="https://img.shields.io/badge/slack-GreptimeDB-0abd59?logo=slack" alt="slack" /></a>
</p>
<br/>
## What is GreptimeDB
<a href="https://greptime.com/slack">
<img src="https://img.shields.io/badge/slack-GreptimeDB-0abd59?logo=slack&style=for-the-badge" alt="Slack"/>
</a>
<a href="https://twitter.com/greptime">
<img src="https://img.shields.io/badge/twitter-follow_us-1d9bf0.svg?style=for-the-badge" alt="Twitter"/>
</a>
<a href="https://www.linkedin.com/company/greptime/">
<img src="https://img.shields.io/badge/linkedin-connect_with_us-0a66c2.svg?style=for-the-badge" alt="LinkedIn"/>
</a>
</div>
GreptimeDB is an open-source time-series database focusing on efficiency, scalability, and analytical capabilities.
It's designed to work on infrastructure of the cloud era, and users benefit from its elasticity and commodity storage.
## Introduction
Our core developers have been building time-series data platforms for years. Based on their best-practices, GreptimeDB is born to give you:
**GreptimeDB** is an open-source time-series database focusing on efficiency, scalability, and analytical capabilities.
Designed to work on infrastructure of the cloud era, GreptimeDB benefits users with its elasticity and commodity storage, offering a fast and cost-effective **alternative to InfluxDB** and a **long-term storage for Prometheus**.
- Optimized columnar layout for handling time-series data; compacted, compressed, and stored on various storage backends, particularly cloud object storage with 50x cost efficiency.
- Fully open-source distributed cluster architecture that harnesses the power of cloud-native elastic computing resources.
- Seamless scalability from a standalone binary at edge to a robust, highly available distributed cluster in cloud, with a transparent experience for both developers and administrators.
- Native SQL and PromQL for queries, and Python scripting to facilitate complex analytical tasks.
- Flexible indexing capabilities and distributed, parallel-processing query engine, tackling high cardinality issues down.
- Widely adopted database protocols and APIs, including MySQL, PostgreSQL, and Prometheus Remote Storage, etc.
## Why GreptimeDB
## Quick Start
Our core developers have been building time-series data platforms for years. Based on our best-practices, GreptimeDB is born to give you:
### [GreptimePlay](https://greptime.com/playground)
* **Easy horizontal scaling**
Seamless scalability from a standalone binary at edge to a robust, highly available distributed cluster in cloud, with a transparent experience for both developers and administrators.
* **Analyzing time-series data**
Query your time-series data with SQL and PromQL. Use Python scripts to facilitate complex analytical tasks.
* **Cloud-native distributed database**
Fully open-source distributed cluster architecture that harnesses the power of cloud-native elastic computing resources.
* **Performance and Cost-effective**
Flexible indexing capabilities and distributed, parallel-processing query engine, tackling high cardinality issues down. Optimized columnar layout for handling time-series data; compacted, compressed, and stored on various storage backends, particularly cloud object storage with 50x cost efficiency.
* **Compatible with InfluxDB, Prometheus and more protocols**
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, and Prometheus Remote Storage, etc. [Read more](https://docs.greptime.com/user-guide/clients/overview).
## Try GreptimeDB
### 1. [GreptimePlay](https://greptime.com/playground)
Try out the features of GreptimeDB right from your browser.
### Build
### 2. [GreptimeCloud](https://console.greptime.cloud/)
#### Build from Source
Start instantly with a free cluster.
To compile GreptimeDB from source, you'll need:
### 3. Docker Image
- C/C++ Toolchain: provides basic tools for compiling and linking. This is
available as `build-essential` on ubuntu and similar name on other platforms.
- Rust: the easiest way to install Rust is to use
[`rustup`](https://rustup.rs/), which will check our `rust-toolchain` file and
install correct Rust version for you.
- Protobuf: `protoc` is required for compiling `.proto` files. `protobuf` is
available from major package manager on macos and linux distributions. You can
find an installation instructions [here](https://grpc.io/docs/protoc-installation/).
**Note that `protoc` version needs to be >= 3.15** because we have used the `optional`
keyword. You can check it with `protoc --version`.
- python3-dev or python3-devel(Optional feature, only needed if you want to run scripts
in CPython, and also need to enable `pyo3_backend` feature when compiling(by `cargo run -F pyo3_backend` or add `pyo3_backend` to src/script/Cargo.toml 's `features.default` like `default = ["python", "pyo3_backend]`)): this install a Python shared library required for running Python
scripting engine(In CPython Mode). This is available as `python3-dev` on
ubuntu, you can install it with `sudo apt install python3-dev`, or
`python3-devel` on RPM based distributions (e.g. Fedora, Red Hat, SuSE). Mac's
`Python3` package should have this shared library by default. More detail for compiling with PyO3 can be found in [PyO3](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version)'s documentation.
To install GreptimeDB locally, the recommended way is via Docker:
#### Build with Docker
A docker image with necessary dependencies is provided:
```
docker build --network host -f docker/Dockerfile -t greptimedb .
```shell
docker pull greptime/greptimedb
```
### Run
Start GreptimeDB from source code, in standalone mode:
Start a GreptimeDB container with:
```shell
docker run --rm --name greptime --net=host greptime/greptimedb standalone start
```
Read more about [Installation](https://docs.greptime.com/getting-started/installation/overview) on docs.
## Getting Started
* [Quickstart](https://docs.greptime.com/getting-started/quick-start/overview)
* [Write Data](https://docs.greptime.com/user-guide/clients/overview)
* [Query Data](https://docs.greptime.com/user-guide/query-data/overview)
* [Operations](https://docs.greptime.com/user-guide/operations/overview)
## Build
Check the prerequisite:
* [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
* [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
* Python toolchain (optional): Required only if built with PyO3 backend. More detail for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
Build GreptimeDB binary:
```shell
make
```
Run a standalone server:
```shell
cargo run -- standalone start
```
Or if you built from docker:
```
docker run -p 4002:4002 -v "$(pwd):/tmp/greptimedb" greptime/greptimedb standalone start
```
Please see the online document site for more installation options and [operations info](https://docs.greptime.com/user-guide/operations/overview).
### Get started
Read the [complete getting started guide](https://docs.greptime.com/getting-started/overview) on our [official document site](https://docs.greptime.com/).
To write and query data, GreptimeDB is compatible with multiple [protocols and clients](https://docs.greptime.com/user-guide/clients/overview).
## Resources
### Installation
- [Pre-built Binaries](https://greptime.com/download):
For Linux and macOS, you can easily download pre-built binaries including official releases and nightly builds that are ready to use.
In most cases, downloading the version without PyO3 is sufficient. However, if you plan to run scripts in CPython (and use Python packages like NumPy and Pandas), you will need to download the version with PyO3 and install a Python with the same version as the Python in the PyO3 version.
We recommend using virtualenv for the installation process to manage multiple Python versions.
- [Docker Images](https://hub.docker.com/r/greptime/greptimedb)(**recommended**): pre-built
Docker images, this is the easiest way to try GreptimeDB. By default it runs CPython script with `pyo3_backend` enabled.
- [`gtctl`](https://github.com/GreptimeTeam/gtctl): the command-line tool for
Kubernetes deployment
### Documentation
- GreptimeDB [User Guide](https://docs.greptime.com/user-guide/concepts/overview)
- GreptimeDB [Developer
Guide](https://docs.greptime.com/developer-guide/overview.html)
- GreptimeDB [internal code document](https://greptimedb.rs)
## Extension
### Dashboard
- [The dashboard UI for GreptimeDB](https://github.com/GreptimeTeam/dashboard)
### SDK
- [GreptimeDB C++ Client](https://github.com/GreptimeTeam/greptimedb-client-cpp)
- [GreptimeDB Erlang Client](https://github.com/GreptimeTeam/greptimedb-client-erl)
- [GreptimeDB Go Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-go)
- [GreptimeDB Java Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-java)
- [GreptimeDB Python Client](https://github.com/GreptimeTeam/greptimedb-client-py) (WIP)
- [GreptimeDB Rust Client](https://github.com/GreptimeTeam/greptimedb-client-rust)
- [GreptimeDB JavaScript Client](https://github.com/GreptimeTeam/greptime-js-sdk)
- [GreptimeDB C++ Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-cpp)
- [GreptimeDB Erlang Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-erl)
- [GreptimeDB Rust Ingester](https://github.com/GreptimeTeam/greptimedb-ingester-rust)
- [GreptimeDB JavaScript Ingester](https://github.com/GreptimeTeam/greptime-ingester-js)
### Grafana Dashboard
Our official Grafana dashboard is available at [grafana](./grafana/README.md) directory.
Our official Grafana dashboard is available at [grafana](grafana/README.md) directory.
## Project Status
This project is in its early stage and under heavy development. We move fast and
break things. Benchmark on development branch may not represent its potential
performance. We release pre-built binaries constantly for functional
evaluation. Do not use it in production at the moment.
For future plans, check out [GreptimeDB roadmap](https://github.com/GreptimeTeam/greptimedb/issues/669).
The current version has not yet reached General Availability version standards.
In line with our Greptime 2024 Roadmap, we plan to achieve a production-level
version with the update to v1.0 in August. [[Join Force]](https://github.com/GreptimeTeam/greptimedb/issues/3412)
## Community
@@ -154,12 +163,12 @@ and what went wrong. If you have any questions or if you would like to get invol
community, please check out:
- GreptimeDB Community on [Slack](https://greptime.com/slack)
- GreptimeDB GitHub [Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
- Greptime official [Website](https://greptime.com)
- GreptimeDB [GitHub Discussions forum](https://github.com/GreptimeTeam/greptimedb/discussions)
- Greptime official [website](https://greptime.com)
In addition, you may:
- View our official [Blog](https://greptime.com/blogs/index)
- View our official [Blog](https://greptime.com/blogs/)
- Connect us with [Linkedin](https://www.linkedin.com/company/greptime/)
- Follow us on [Twitter](https://twitter.com/greptime)
@@ -170,7 +179,7 @@ open contributions and allowing you to use the software however you want.
## Contributing
Please refer to [contribution guidelines](CONTRIBUTING.md) for more information.
Please refer to [contribution guidelines](CONTRIBUTING.md) and [internal concepts docs](https://docs.greptime.com/contributor-guide/overview.html) for more information.
## Acknowledgement

View File

@@ -4,13 +4,35 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
api.workspace = true
arrow.workspace = true
chrono.workspace = true
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-telemetry.workspace = true
common-wal.workspace = true
dotenv.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
indicatif = "0.17.1"
itertools.workspace = true
lazy_static.workspace = true
log-store.workspace = true
mito2.workspace = true
num_cpus.workspace = true
parquet.workspace = true
prometheus.workspace = true
rand.workspace = true
rskafka.workspace = true
serde.workspace = true
store-api.workspace = true
tokio.workspace = true
toml.workspace = true
uuid.workspace = true

11
benchmarks/README.md Normal file
View File

@@ -0,0 +1,11 @@
Benchmarkers for GreptimeDB
--------------------------------
## Wal Benchmarker
The wal benchmarker serves to evaluate the performance of GreptimeDB's Write-Ahead Log (WAL) component. It meticulously assesses the read/write performance of the WAL under diverse workloads generated by the benchmarker.
### How to use
To compile the benchmarker, navigate to the `greptimedb/benchmarks` directory and execute `cargo build --release`. Subsequently, you'll find the compiled target located at `greptimedb/target/release/wal_bench`.
The `./wal_bench -h` command reveals numerous arguments that the target accepts. Among these, a notable one is the `cfg-file` argument. By utilizing a configuration file in the TOML format, you can bypass the need to repeatedly specify cumbersome arguments.

View File

@@ -0,0 +1,21 @@
# Refers to the documents of `Args` in benchmarks/src/wal.rs`.
wal_provider = "kafka"
bootstrap_brokers = ["localhost:9092"]
num_workers = 10
num_topics = 32
num_regions = 1000
num_scrapes = 1000
num_rows = 5
col_types = "ifs"
max_batch_size = "512KB"
linger = "1ms"
backoff_init = "10ms"
backoff_max = "1ms"
backoff_base = 2
backoff_deadline = "3s"
compression = "zstd"
rng_seed = 42
skip_read = false
skip_write = false
random_topics = true
report_metrics = false

View File

@@ -29,7 +29,7 @@ use client::api::v1::column::Values;
use client::api::v1::{
Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType,
};
use client::{Client, Database, Output, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures_util::TryStreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -502,9 +502,9 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
for i in 0..num_iter {
let now = Instant::now();
let res = db.sql(&query).await.unwrap();
match res {
Output::AffectedRows(_) | Output::RecordBatches(_) => (),
Output::Stream(stream) => {
match res.data {
OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => (),
OutputData::Stream(stream) => {
stream.try_collect::<Vec<_>>().await.unwrap();
}
}

View File

@@ -0,0 +1,326 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(int_roundings)]
use std::fs;
use std::sync::Arc;
use std::time::Instant;
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use benchmarks::metrics;
use benchmarks::wal_bench::{Args, Config, Region, WalProvider};
use clap::Parser;
use common_telemetry::info;
use common_wal::config::kafka::common::BackoffConfig;
use common_wal::config::kafka::DatanodeKafkaConfig as KafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::options::{KafkaWalOptions, WalOptions};
use itertools::Itertools;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use mito2::wal::Wal;
use prometheus::{Encoder, TextEncoder};
use rand::distributions::{Alphanumeric, DistString};
use rand::rngs::SmallRng;
use rand::SeedableRng;
use rskafka::client::partition::Compression;
use rskafka::client::ClientBuilder;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
async fn run_benchmarker<S: LogStore>(cfg: &Config, topics: &[String], wal: Arc<Wal<S>>) {
let chunk_size = cfg.num_regions.div_ceil(cfg.num_workers);
let region_chunks = (0..cfg.num_regions)
.map(|id| {
build_region(
id as u64,
topics,
&mut SmallRng::seed_from_u64(cfg.rng_seed),
cfg,
)
})
.chunks(chunk_size as usize)
.into_iter()
.map(|chunk| Arc::new(chunk.collect::<Vec<_>>()))
.collect::<Vec<_>>();
let mut write_elapsed = 0;
let mut read_elapsed = 0;
if !cfg.skip_write {
info!("Benchmarking write ...");
let num_scrapes = cfg.num_scrapes;
let timer = Instant::now();
futures::future::join_all((0..cfg.num_workers).map(|i| {
let wal = wal.clone();
let regions = region_chunks[i as usize].clone();
tokio::spawn(async move {
for _ in 0..num_scrapes {
let mut wal_writer = wal.writer();
regions
.iter()
.for_each(|region| region.add_wal_entry(&mut wal_writer));
wal_writer.write_to_wal().await.unwrap();
}
})
}))
.await;
write_elapsed += timer.elapsed().as_millis();
}
if !cfg.skip_read {
info!("Benchmarking read ...");
let timer = Instant::now();
futures::future::join_all((0..cfg.num_workers).map(|i| {
let wal = wal.clone();
let regions = region_chunks[i as usize].clone();
tokio::spawn(async move {
for region in regions.iter() {
region.replay(&wal).await;
}
})
}))
.await;
read_elapsed = timer.elapsed().as_millis();
}
dump_report(cfg, write_elapsed, read_elapsed);
}
fn build_region(id: u64, topics: &[String], rng: &mut SmallRng, cfg: &Config) -> Region {
let wal_options = match cfg.wal_provider {
WalProvider::Kafka => {
assert!(!topics.is_empty());
WalOptions::Kafka(KafkaWalOptions {
topic: topics.get(id as usize % topics.len()).cloned().unwrap(),
})
}
WalProvider::RaftEngine => WalOptions::RaftEngine,
};
Region::new(
RegionId::from_u64(id),
build_schema(&parse_col_types(&cfg.col_types), rng),
wal_options,
cfg.num_rows,
cfg.rng_seed,
)
}
fn build_schema(col_types: &[ColumnDataType], mut rng: &mut SmallRng) -> Vec<ColumnSchema> {
col_types
.iter()
.map(|col_type| ColumnSchema {
column_name: Alphanumeric.sample_string(&mut rng, 5),
datatype: *col_type as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
})
.chain(vec![ColumnSchema {
column_name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Tag as i32,
datatype_extension: None,
}])
.collect()
}
fn dump_report(cfg: &Config, write_elapsed: u128, read_elapsed: u128) {
let cost_report = format!(
"write costs: {} ms, read costs: {} ms",
write_elapsed, read_elapsed,
);
let total_written_bytes = metrics::METRIC_WAL_WRITE_BYTES_TOTAL.get() as u128;
let write_throughput = if write_elapsed > 0 {
(total_written_bytes * 1000).div_floor(write_elapsed)
} else {
0
};
let total_read_bytes = metrics::METRIC_WAL_READ_BYTES_TOTAL.get() as u128;
let read_throughput = if read_elapsed > 0 {
(total_read_bytes * 1000).div_floor(read_elapsed)
} else {
0
};
let throughput_report = format!(
"total written bytes: {} bytes, total read bytes: {} bytes, write throuput: {} bytes/s ({} mb/s), read throughput: {} bytes/s ({} mb/s)",
total_written_bytes,
total_read_bytes,
write_throughput,
write_throughput.div_floor(1 << 20),
read_throughput,
read_throughput.div_floor(1 << 20),
);
let metrics_report = if cfg.report_metrics {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metrics = prometheus::gather();
encoder.encode(&metrics, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
} else {
String::new()
};
info!(
r#"
Benchmark config:
{cfg:?}
Benchmark report:
{cost_report}
{throughput_report}
{metrics_report}"#
);
}
async fn create_topics(cfg: &Config) -> Vec<String> {
// Creates topics.
let client = ClientBuilder::new(cfg.bootstrap_brokers.clone())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();
let (topics, tasks): (Vec<_>, Vec<_>) = (0..cfg.num_topics)
.map(|i| {
let topic = if cfg.random_topics {
format!(
"greptime_wal_bench_topic_{}_{}",
uuid::Uuid::new_v4().as_u128(),
i
)
} else {
format!("greptime_wal_bench_topic_{}", i)
};
let task = ctrl_client.create_topic(
topic.clone(),
1,
cfg.bootstrap_brokers.len() as i16,
2000,
);
(topic, task)
})
.unzip();
// Must ignore errors since we allow topics being created more than once.
let _ = futures::future::try_join_all(tasks).await;
topics
}
fn parse_compression(comp: &str) -> Compression {
match comp {
"no" => Compression::NoCompression,
"gzip" => Compression::Gzip,
"lz4" => Compression::Lz4,
"snappy" => Compression::Snappy,
"zstd" => Compression::Zstd,
other => unreachable!("Unrecognized compression {other}"),
}
}
fn parse_col_types(col_types: &str) -> Vec<ColumnDataType> {
let parts = col_types.split('x').collect::<Vec<_>>();
assert!(parts.len() <= 2);
let pattern = parts[0];
let repeat = parts
.get(1)
.map(|r| r.parse::<usize>().unwrap())
.unwrap_or(1);
pattern
.chars()
.map(|c| match c {
'i' | 'I' => ColumnDataType::Int64,
'f' | 'F' => ColumnDataType::Float64,
's' | 'S' => ColumnDataType::String,
other => unreachable!("Cannot parse {other} as a column data type"),
})
.cycle()
.take(pattern.len() * repeat)
.collect()
}
fn main() {
// Sets the global logging to INFO and suppress loggings from rskafka other than ERROR and upper ones.
std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error");
common_telemetry::init_default_ut_logging();
let args = Args::parse();
let cfg = if !args.cfg_file.is_empty() {
toml::from_str(&fs::read_to_string(&args.cfg_file).unwrap()).unwrap()
} else {
Config::from(args)
};
// Validates arguments.
if cfg.num_regions < cfg.num_workers {
panic!("num_regions must be greater than or equal to num_workers");
}
if cfg
.num_workers
.min(cfg.num_topics)
.min(cfg.num_regions)
.min(cfg.num_scrapes)
.min(cfg.max_batch_size.as_bytes() as u32)
.min(cfg.bootstrap_brokers.len() as u32)
== 0
{
panic!("Invalid arguments");
}
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
match cfg.wal_provider {
WalProvider::Kafka => {
let topics = create_topics(&cfg).await;
let kafka_cfg = KafkaConfig {
broker_endpoints: cfg.bootstrap_brokers.clone(),
max_batch_size: cfg.max_batch_size,
linger: cfg.linger,
backoff: BackoffConfig {
init: cfg.backoff_init,
max: cfg.backoff_max,
base: cfg.backoff_base,
deadline: Some(cfg.backoff_deadline),
},
compression: parse_compression(&cfg.compression),
..Default::default()
};
let store = Arc::new(KafkaLogStore::try_new(&kafka_cfg).await.unwrap());
let wal = Arc::new(Wal::new(store));
run_benchmarker(&cfg, &topics, wal).await;
}
WalProvider::RaftEngine => {
// The benchmarker assumes the raft engine directory exists.
let store = RaftEngineLogStore::try_new(
"/tmp/greptimedb/raft-engine-wal".to_string(),
RaftEngineConfig::default(),
)
.await
.map(Arc::new)
.unwrap();
let wal = Arc::new(Wal::new(store));
run_benchmarker(&cfg, &[], wal).await;
}
}
});
}

16
benchmarks/src/lib.rs Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod metrics;
pub mod wal_bench;

39
benchmarks/src/metrics.rs Normal file
View File

@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use prometheus::*;
/// Logstore label.
pub const LOGSTORE_LABEL: &str = "logstore";
/// Operation type label.
pub const OPTYPE_LABEL: &str = "optype";
lazy_static! {
/// Counters of bytes of each operation on a logstore.
pub static ref METRIC_WAL_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_bench_wal_op_bytes_total",
"wal operation bytes total",
&[OPTYPE_LABEL],
)
.unwrap();
/// Counter of bytes of the append_batch operation.
pub static ref METRIC_WAL_WRITE_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values(
&["write"],
);
/// Counter of bytes of the read operation.
pub static ref METRIC_WAL_READ_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values(
&["read"],
);
}

361
benchmarks/src/wal_bench.rs Normal file
View File

@@ -0,0 +1,361 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::mem::size_of;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, Value, WalEntry};
use clap::{Parser, ValueEnum};
use common_base::readable_size::ReadableSize;
use common_wal::options::WalOptions;
use futures::StreamExt;
use mito2::wal::{Wal, WalWriter};
use rand::distributions::{Alphanumeric, DistString, Uniform};
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use serde::{Deserialize, Serialize};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::metrics;
/// The wal provider.
#[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalProvider {
#[default]
RaftEngine,
Kafka,
}
#[derive(Parser)]
pub struct Args {
/// The provided configuration file.
/// The example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`.
#[clap(long, short = 'c')]
pub cfg_file: String,
/// The wal provider.
#[clap(long, value_enum, default_value_t = WalProvider::default())]
pub wal_provider: WalProvider,
/// The advertised addresses of the kafka brokers.
/// If there're multiple bootstrap brokers, their addresses should be separated by comma, for e.g. "localhost:9092,localhost:9093".
#[clap(long, short = 'b', default_value = "localhost:9092")]
pub bootstrap_brokers: String,
/// The number of workers each running in a dedicated thread.
#[clap(long, default_value_t = num_cpus::get() as u32)]
pub num_workers: u32,
/// The number of kafka topics to be created.
#[clap(long, default_value_t = 32)]
pub num_topics: u32,
/// The number of regions.
#[clap(long, default_value_t = 1000)]
pub num_regions: u32,
/// The number of times each region is scraped.
#[clap(long, default_value_t = 1000)]
pub num_scrapes: u32,
/// The number of rows in each wal entry.
/// Each time a region is scraped, a wal entry containing will be produced.
#[clap(long, default_value_t = 5)]
pub num_rows: u32,
/// The column types of the schema for each region.
/// Currently, three column types are supported:
/// - i = ColumnDataType::Int64
/// - f = ColumnDataType::Float64
/// - s = ColumnDataType::String
/// For e.g., "ifs" will be parsed as three columns: i64, f64, and string.
///
/// Additionally, a "x" sign can be provided to repeat the column types for a given number of times.
/// For e.g., "iix2" will be parsed as 4 columns: i64, i64, i64, and i64.
/// This feature is useful if you want to specify many columns.
#[clap(long, default_value = "ifs")]
pub col_types: String,
/// The maximum size of a batch of kafka records.
/// The default value is 1mb.
#[clap(long, default_value = "512KB")]
pub max_batch_size: ReadableSize,
/// The minimum latency the kafka client issues a batch of kafka records.
/// However, a batch of kafka records would be immediately issued if a record cannot be fit into the batch.
#[clap(long, default_value = "1ms")]
pub linger: String,
/// The initial backoff delay of the kafka consumer.
#[clap(long, default_value = "10ms")]
pub backoff_init: String,
/// The maximum backoff delay of the kafka consumer.
#[clap(long, default_value = "1s")]
pub backoff_max: String,
/// The exponential backoff rate of the kafka consumer. The next back off = base * the current backoff.
#[clap(long, default_value_t = 2)]
pub backoff_base: u32,
/// The deadline of backoff. The backoff ends if the total backoff delay reaches the deadline.
#[clap(long, default_value = "3s")]
pub backoff_deadline: String,
/// The client-side compression algorithm for kafka records.
#[clap(long, default_value = "zstd")]
pub compression: String,
/// The seed of random number generators.
#[clap(long, default_value_t = 42)]
pub rng_seed: u64,
/// Skips the read phase, aka. region replay, if set to true.
#[clap(long, default_value_t = false)]
pub skip_read: bool,
/// Skips the write phase if set to true.
#[clap(long, default_value_t = false)]
pub skip_write: bool,
/// Randomly generates topic names if set to true.
/// Useful when you want to run the benchmarker without worrying about the topics created before.
#[clap(long, default_value_t = false)]
pub random_topics: bool,
/// Logs out the gathered prometheus metrics when the benchmarker ends.
#[clap(long, default_value_t = false)]
pub report_metrics: bool,
}
/// Benchmarker config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub wal_provider: WalProvider,
pub bootstrap_brokers: Vec<String>,
pub num_workers: u32,
pub num_topics: u32,
pub num_regions: u32,
pub num_scrapes: u32,
pub num_rows: u32,
pub col_types: String,
pub max_batch_size: ReadableSize,
#[serde(with = "humantime_serde")]
pub linger: Duration,
#[serde(with = "humantime_serde")]
pub backoff_init: Duration,
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
pub backoff_base: u32,
#[serde(with = "humantime_serde")]
pub backoff_deadline: Duration,
pub compression: String,
pub rng_seed: u64,
pub skip_read: bool,
pub skip_write: bool,
pub random_topics: bool,
pub report_metrics: bool,
}
impl From<Args> for Config {
fn from(args: Args) -> Self {
let cfg = Self {
wal_provider: args.wal_provider,
bootstrap_brokers: args
.bootstrap_brokers
.split(',')
.map(ToString::to_string)
.collect::<Vec<_>>(),
num_workers: args.num_workers.min(num_cpus::get() as u32),
num_topics: args.num_topics,
num_regions: args.num_regions,
num_scrapes: args.num_scrapes,
num_rows: args.num_rows,
col_types: args.col_types,
max_batch_size: args.max_batch_size,
linger: humantime::parse_duration(&args.linger).unwrap(),
backoff_init: humantime::parse_duration(&args.backoff_init).unwrap(),
backoff_max: humantime::parse_duration(&args.backoff_max).unwrap(),
backoff_base: args.backoff_base,
backoff_deadline: humantime::parse_duration(&args.backoff_deadline).unwrap(),
compression: args.compression,
rng_seed: args.rng_seed,
skip_read: args.skip_read,
skip_write: args.skip_write,
random_topics: args.random_topics,
report_metrics: args.report_metrics,
};
cfg
}
}
/// The region used for wal benchmarker.
pub struct Region {
id: RegionId,
schema: Vec<ColumnSchema>,
wal_options: WalOptions,
next_sequence: AtomicU64,
next_entry_id: AtomicU64,
next_timestamp: AtomicI64,
rng: Mutex<Option<SmallRng>>,
num_rows: u32,
}
impl Region {
/// Creates a new region.
pub fn new(
id: RegionId,
schema: Vec<ColumnSchema>,
wal_options: WalOptions,
num_rows: u32,
rng_seed: u64,
) -> Self {
Self {
id,
schema,
wal_options,
next_sequence: AtomicU64::new(1),
next_entry_id: AtomicU64::new(1),
next_timestamp: AtomicI64::new(1655276557000),
rng: Mutex::new(Some(SmallRng::seed_from_u64(rng_seed))),
num_rows,
}
}
/// Scrapes the region and adds the generated entry to wal.
pub fn add_wal_entry<S: LogStore>(&self, wal_writer: &mut WalWriter<S>) {
let mutation = Mutation {
op_type: OpType::Put as i32,
sequence: self
.next_sequence
.fetch_add(self.num_rows as u64, Ordering::Relaxed),
rows: Some(self.build_rows()),
};
let entry = WalEntry {
mutations: vec![mutation],
};
metrics::METRIC_WAL_WRITE_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64);
wal_writer
.add_entry(
self.id,
self.next_entry_id.fetch_add(1, Ordering::Relaxed),
&entry,
&self.wal_options,
)
.unwrap();
}
/// Replays the region.
pub async fn replay<S: LogStore>(&self, wal: &Arc<Wal<S>>) {
let mut wal_stream = wal.scan(self.id, 0, &self.wal_options).unwrap();
while let Some(res) = wal_stream.next().await {
let (_, entry) = res.unwrap();
metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64);
}
}
/// Computes the estimated size in bytes of the entry.
pub fn entry_estimated_size(entry: &WalEntry) -> usize {
let wrapper_size = size_of::<WalEntry>()
+ entry.mutations.capacity() * size_of::<Mutation>()
+ size_of::<Rows>();
let rows = entry.mutations[0].rows.as_ref().unwrap();
let schema_size = rows.schema.capacity() * size_of::<ColumnSchema>()
+ rows
.schema
.iter()
.map(|s| s.column_name.capacity())
.sum::<usize>();
let values_size = (rows.rows.capacity() * size_of::<Row>())
+ rows
.rows
.iter()
.map(|r| r.values.capacity() * size_of::<Value>())
.sum::<usize>();
wrapper_size + schema_size + values_size
}
fn build_rows(&self) -> Rows {
let cols = self
.schema
.iter()
.map(|col_schema| {
let col_data_type = ColumnDataType::try_from(col_schema.datatype).unwrap();
self.build_col(&col_data_type, self.num_rows)
})
.collect::<Vec<_>>();
let rows = (0..self.num_rows)
.map(|i| {
let values = cols.iter().map(|col| col[i as usize].clone()).collect();
Row { values }
})
.collect();
Rows {
schema: self.schema.clone(),
rows,
}
}
fn build_col(&self, col_data_type: &ColumnDataType, num_rows: u32) -> Vec<Value> {
let mut rng_guard = self.rng.lock().unwrap();
let rng = rng_guard.as_mut().unwrap();
match col_data_type {
ColumnDataType::TimestampMillisecond => (0..num_rows)
.map(|_| {
let ts = self.next_timestamp.fetch_add(1000, Ordering::Relaxed);
Value {
value_data: Some(ValueData::TimestampMillisecondValue(ts)),
}
})
.collect(),
ColumnDataType::Int64 => (0..num_rows)
.map(|_| {
let v = rng.sample(Uniform::new(0, 10_000));
Value {
value_data: Some(ValueData::I64Value(v)),
}
})
.collect(),
ColumnDataType::Float64 => (0..num_rows)
.map(|_| {
let v = rng.sample(Uniform::new(0.0, 5000.0));
Value {
value_data: Some(ValueData::F64Value(v)),
}
})
.collect(),
ColumnDataType::String => (0..num_rows)
.map(|_| {
let v = Alphanumeric.sample_string(rng, 10);
Value {
value_data: Some(ValueData::StringValue(v)),
}
})
.collect(),
_ => unreachable!(),
}
}
}

117
cliff.toml Normal file
View File

@@ -0,0 +1,117 @@
# https://git-cliff.org/docs/configuration
[remote.github]
owner = "GreptimeTeam"
repo = "greptimedb"
[changelog]
header = ""
footer = ""
# template for the changelog body
# https://keats.github.io/tera/docs/#introduction
body = """
# {{ version }}
Release date: {{ timestamp | date(format="%B %d, %Y") }}
{%- set breakings = commits | filter(attribute="breaking", value=true) -%}
{%- if breakings | length > 0 %}
## Breaking changes
{% for commit in breakings %}
* {{ commit.github.pr_title }}\
{% if commit.github.username %} by \
{% set author = commit.github.username -%}
[@{{ author }}](https://github.com/{{ author }})
{%- endif -%}
{% if commit.github.pr_number %} in \
{% set number = commit.github.pr_number -%}
[#{{ number }}]({{ self::remote_url() }}/pull/{{ number }})
{%- endif %}
{%- endfor %}
{%- endif -%}
{%- set grouped_commits = commits | filter(attribute="breaking", value=false) | group_by(attribute="group") -%}
{% for group, commits in grouped_commits %}
### {{ group | striptags | trim | upper_first }}
{% for commit in commits %}
* {{ commit.github.pr_title }}\
{% if commit.github.username %} by \
{% set author = commit.github.username -%}
[@{{ author }}](https://github.com/{{ author }})
{%- endif -%}
{% if commit.github.pr_number %} in \
{% set number = commit.github.pr_number -%}
[#{{ number }}]({{ self::remote_url() }}/pull/{{ number }})
{%- endif %}
{%- endfor -%}
{% endfor %}
{%- if github.contributors | filter(attribute="is_first_time", value=true) | length != 0 %}
{% raw %}\n{% endraw -%}
## New Contributors
{% endif -%}
{% for contributor in github.contributors | filter(attribute="is_first_time", value=true) %}
* @{{ contributor.username }} made their first contribution
{%- if contributor.pr_number %} in \
[#{{ contributor.pr_number }}]({{ self::remote_url() }}/pull/{{ contributor.pr_number }}) \
{%- endif %}
{%- endfor -%}
{% if github.contributors | length != 0 %}
{% raw %}\n{% endraw -%}
## All Contributors
We would like to thank the following contributors from the GreptimeDB community:
{{ github.contributors | map(attribute="username") | join(sep=", ") }}
{%- endif %}
{% raw %}\n{% endraw %}
{%- macro remote_url() -%}
https://github.com/{{ remote.github.owner }}/{{ remote.github.repo }}
{%- endmacro -%}
"""
trim = true
[git]
# parse the commits based on https://www.conventionalcommits.org
conventional_commits = true
# filter out the commits that are not conventional
filter_unconventional = true
# process each line of a commit as an individual commit
split_commits = false
# regex for parsing and grouping commits
commit_parsers = [
{ message = "^feat", group = "<!-- 0 -->🚀 Features" },
{ message = "^fix", group = "<!-- 1 -->🐛 Bug Fixes" },
{ message = "^doc", group = "<!-- 3 -->📚 Documentation" },
{ message = "^perf", group = "<!-- 4 -->⚡ Performance" },
{ message = "^refactor", group = "<!-- 2 -->🚜 Refactor" },
{ message = "^style", group = "<!-- 5 -->🎨 Styling" },
{ message = "^test", group = "<!-- 6 -->🧪 Testing" },
{ message = "^chore\\(release\\): prepare for", skip = true },
{ message = "^chore\\(deps.*\\)", skip = true },
{ message = "^chore\\(pr\\)", skip = true },
{ message = "^chore\\(pull\\)", skip = true },
{ message = "^chore|^ci", group = "<!-- 7 -->⚙️ Miscellaneous Tasks" },
{ body = ".*security", group = "<!-- 8 -->🛡️ Security" },
{ message = "^revert", group = "<!-- 9 -->◀️ Revert" },
]
# protect breaking changes from being skipped due to matching a skipping commit_parser
protect_breaking_commits = false
# filter out the commits that are not matched by commit parsers
filter_commits = false
# regex for matching git tags
# tag_pattern = "v[0-9].*"
# regex for skipping tags
# skip_tags = ""
# regex for ignoring tags
ignore_tags = ".*-nightly-.*"
# sort the tags topologically
topo_order = false
# sort the commits inside sections by oldest/newest order
sort_commits = "oldest"
# limit the number of commits included in the changelog.
# limit_commits = 42

View File

@@ -8,5 +8,6 @@ coverage:
ignore:
- "**/error*.rs" # ignore all error.rs files
- "tests/runner/*.rs" # ignore integration test runner
- "tests-integration/**/*.rs" # ignore integration tests
comment: # this is a top-level key
layout: "diff"

View File

@@ -134,10 +134,22 @@ create_on_compaction = "auto"
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""
[region_engine.mito.memtable]
# Memtable type.
# - "partition_tree": partition tree memtable
# - "time_series": time-series memtable (deprecated)
type = "partition_tree"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.
data_freeze_threshold = 32768
# Max dictionary bytes.
fork_dictionary_bytes = "1GiB"
# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"

View File

@@ -31,6 +31,7 @@ runtime_size = 2
mode = "disable"
cert_path = ""
key_path = ""
watch = false
# PostgresSQL server options, see `standalone.example.toml`.
[postgres]
@@ -43,6 +44,7 @@ runtime_size = 2
mode = "disable"
cert_path = ""
key_path = ""
watch = false
# OpenTSDB protocol options, see `standalone.example.toml`.
[opentsdb]

View File

@@ -29,6 +29,12 @@ store_key_prefix = ""
max_retry_times = 12
# Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
# Auto split large value
# GreptimeDB procedure uses etcd as the default metadata storage backend.
# The etcd the maximum size of any request is 1.5 MiB
# 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key)
# Comments out the `max_metadata_value_size`, for don't split large value (no limit).
max_metadata_value_size = "1500KiB"
# Failure detectors options.
[failure_detector]

View File

@@ -44,6 +44,8 @@ mode = "disable"
cert_path = ""
# Private key file path.
key_path = ""
# Watch for Certificate and key file change and auto reload
watch = false
# PostgresSQL server options.
[postgres]
@@ -62,6 +64,8 @@ mode = "disable"
cert_path = ""
# private key file path.
key_path = ""
# Watch for Certificate and key file change and auto reload
watch = false
# OpenTSDB protocol options.
[opentsdb]
@@ -118,7 +122,7 @@ sync_period = "1000ms"
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# The prefix of topic name.
@@ -240,6 +244,18 @@ mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""
[region_engine.mito.memtable]
# Memtable type.
# - "partition_tree": partition tree memtable
# - "time_series": time-series memtable (deprecated)
type = "partition_tree"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.
data_freeze_threshold = 32768
# Max dictionary bytes.
fork_dictionary_bytes = "1GiB"
# Log options
# [logging]
# Specify logs directory.
@@ -250,10 +266,11 @@ intermediate_path = ""
# enable_otlp_tracing = false
# tracing exporter endpoint with format `ip:port`, we use grpc oltp as exporter, default endpoint is `localhost:4317`
# otlp_endpoint = "localhost:4317"
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# tracing_sample_ratio = 1.0
# Whether to append logs to stdout. Defaults to true.
# append_stdout = true
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# [logging.tracing_sample_ratio]
# default_ratio = 0.0
# Standalone export the metrics generated by itself
# encoded to Prometheus remote-write format

View File

@@ -0,0 +1,50 @@
# TSBS benchmark - v0.7.0
## Environment
### Local
| | |
| ------ | ---------------------------------- |
| CPU | AMD Ryzen 7 7735HS (8 core 3.2GHz) |
| Memory | 32GB |
| Disk | SOLIDIGM SSDPFKNU010TZ |
| OS | Ubuntu 22.04.2 LTS |
### Amazon EC2
| | |
| ------- | -------------- |
| Machine | c5d.2xlarge |
| CPU | 8 core |
| Memory | 16GB |
| Disk | 50GB (GP3) |
| OS | Ubuntu 22.04.1 |
## Write performance
| Environment | Ingest rate (rows/s) |
| ------------------ | --------------------- |
| Local | 3695814.64 |
| EC2 c5d.2xlarge | 2987166.64 |
## Query performance
| Query type | Local (ms) | EC2 c5d.2xlarge (ms) |
| --------------------- | ---------- | ---------------------- |
| cpu-max-all-1 | 30.56 | 54.74 |
| cpu-max-all-8 | 52.69 | 70.50 |
| double-groupby-1 | 664.30 | 1366.63 |
| double-groupby-5 | 1391.26 | 2141.71 |
| double-groupby-all | 2828.94 | 3389.59 |
| groupby-orderby-limit | 718.92 | 1213.90 |
| high-cpu-1 | 29.21 | 52.98 |
| high-cpu-all | 5514.12 | 7194.91 |
| lastpoint | 7571.40 | 9423.41 |
| single-groupby-1-1-1 | 19.09 | 7.77 |
| single-groupby-1-1-12 | 27.28 | 51.64 |
| single-groupby-1-8-1 | 31.85 | 11.64 |
| single-groupby-5-1-1 | 16.14 | 9.67 |
| single-groupby-5-1-12 | 27.21 | 53.62 |
| single-groupby-5-8-1 | 39.62 | 14.96 |

View File

@@ -79,7 +79,7 @@ This RFC proposes to add a new expression node `MergeScan` to merge result from
│ │ │ │
└─Frontend──────┘ └─Remote-Sources──────────────┘
```
This merge operation simply chains all the the underlying remote data sources and return `RecordBatch`, just like a coalesce op. And each remote sources is a gRPC query to datanode via the substrait logical plan interface. The plan is transformed and divided from the original query that comes to frontend.
This merge operation simply chains all the underlying remote data sources and return `RecordBatch`, just like a coalesce op. And each remote sources is a gRPC query to datanode via the substrait logical plan interface. The plan is transformed and divided from the original query that comes to frontend.
## Commutativity of MergeScan

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

View File

@@ -0,0 +1,101 @@
---
Feature Name: Multi-dimension Partition Rule
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/3351
Date: 2024-02-21
Author: "Ruihang Xia <waynestxia@gmail.com>"
---
# Summary
A new region partition scheme that runs on multiple dimensions of the key space. The partition rule is defined by a set of simple expressions on the partition key columns.
# Motivation
The current partition rule is from MySQL's [`RANGE Partition`](https://dev.mysql.com/doc/refman/8.0/en/partitioning-range.html), which is based on a single dimension. It is sort of a [Hilbert Curve](https://en.wikipedia.org/wiki/Hilbert_curve) and pick several point on the curve to divide the space. It is neither easy to understand how the data get partitioned nor flexible enough to handle complex partitioning requirements.
Considering the future requirements like region repartitioning or autonomous rebalancing, where both workload and partition may change frequently. Here proposes a new region partition scheme that uses a set of simple expressions on the partition key columns to divide the key space.
# Details
## Partition rule
First, we define a simple expression that can be used to define the partition rule. The simple expression is a binary expression expression on the partition key columns that can be evaluated to a boolean value. The binary operator is limited to comparison operators only, like `=`, `!=`, `>`, `>=`, `<`, `<=`. And the operands are limited either literal value or partition column.
Example of valid simple expressions are $`col_A = 10`$, $`col_A \gt 10 \& col_B \gt 20`$ or $`col_A \ne 10`$.
Those expressions can be used as predicates to divide the key space into different regions. The following example have two partition columns `Col A` and `Col B`, and four partitioned regions.
```math
\left\{\begin{aligned}
&col_A \le 10 &Region_1 \\
&10 \lt col_A \& col_A \le 20 &Region_2 \\
&20 \lt col_A \space \& \space col_B \lt 100 &Region_3 \\
&20 \lt col_A \space \& \space col_B \ge 100 &Region_4
\end{aligned}\right\}
```
An advantage of this scheme is that it is easy to understand how the data get partitioned. The above example can be visualized in a 2D space (two partition column is involved in the example).
![example](2d-example.png)
Here each expression draws a line in the 2D space. Managing data partitioning becomes a matter of drawing lines in the key space.
To make it easy to use, there is a "default region" which catches all the data that doesn't match any of previous expressions. The default region exist by default and do not need to specify. It is also possible to remove this default region if the DB finds it is not necessary.
## SQL interface
The SQL interface is in response to two parts: specifying the partition columns and the partition rule. Thouth we are targeting an autonomous system, it's still allowed to give some bootstrap rules or hints on creating table.
Partition column is specified by `PARTITION ON COLUMNS` sub-clause in `CREATE TABLE`:
```sql
CREATE TABLE t (...)
PARTITION ON COLUMNS (...) ();
```
Two following brackets are for partition columns and partition rule respectively.
Columns provided here are only used as an allow-list of how the partition rule can be defined. Which means (a) the sequence between columns doesn't matter, (b) the columns provided here are not necessarily being used in the partition rule.
The partition rule part is a list of comma-separated simple expressions. Expressions here are not corresponding to region, as they might be changed by system to fit various workload.
A full example of `CREATE TABLE` with partition rule is:
```sql
CREATE TABLE IF NOT EXISTS demo (
a STRING,
b STRING,
c STRING,
d STRING,
ts TIMESTAMP,
memory DOUBLE,
TIME INDEX (ts),
PRIMARY KEY (a, b, c, d)
)
PARTITION ON COLUMNS (c, b, a) (
a < 10,
10 >= a AND a < 20,
20 >= a AND b < 100,
20 >= a AND b > 100
)
```
## Combine with storage
Examining columns separately suits our columnar storage very well in two aspects.
1. The simple expression can be pushed down to storage and file format, and is likely to hit existing index. Makes pruning operation very efficient.
2. Columns in columnar storage are not tightly coupled like in the traditional row storages, which means we can easily add or remove columns from partition rule without much impact (like a global reshuffle) on data.
The data file itself can be "projected" to the key space as a polyhedron, it is guaranteed that each plane is in parallel with some coordinate planes (in a 2D scenario, this is saying that all the files can be projected to a rectangle). Thus partition or repartition also only need to consider related columns.
![sst-project](sst-project.png)
An additional limitation is that considering how the index works and how we organize the primary keys at present, the partition columns are limited to be a subset of primary keys for better performance.
# Drawbacks
This is a breaking change.

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

View File

@@ -66,7 +66,7 @@
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"graphTooltip": 1,
"id": null,
"links": [],
"liveNow": false,
@@ -2116,7 +2116,7 @@
}
]
},
"unit": "bytes"
"unit": "none"
},
"overrides": []
},
@@ -2126,7 +2126,7 @@
"x": 0,
"y": 61
},
"id": 12,
"id": 17,
"interval": "1s",
"options": {
"legend": {
@@ -2147,8 +2147,8 @@
"uid": "${DS_PROMETHEUS-1}"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "histogram_quantile(0.95, sum by(le) (rate(raft_engine_write_size_bucket[$__rate_interval])))",
"editorMode": "code",
"expr": "rate(raft_engine_sync_log_duration_seconds_count[2s])",
"fullMetaSearch": false,
"includeNullMetadata": false,
"instant": false,
@@ -2158,7 +2158,7 @@
"useBackend": false
}
],
"title": "wal write size",
"title": "raft engine sync count",
"type": "timeseries"
},
{
@@ -2378,6 +2378,120 @@
],
"title": "raft engine write duration seconds",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS-1}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "bytes"
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 68
},
"id": 12,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS-1}"
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.95, sum by(le) (rate(raft_engine_write_size_bucket[$__rate_interval])))",
"fullMetaSearch": false,
"includeNullMetadata": false,
"instant": false,
"legendFormat": "req-size-p95",
"range": true,
"refId": "A",
"useBackend": false
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS-1}"
},
"editorMode": "code",
"expr": "rate(raft_engine_write_size_sum[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "throughput",
"range": true,
"refId": "B"
}
],
"title": "wal write size",
"type": "timeseries"
}
],
"refresh": "10s",
@@ -2387,13 +2501,13 @@
"list": []
},
"time": {
"from": "now-3h",
"from": "now-30m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "GreptimeDB",
"uid": "e7097237-669b-4f8d-b751-13067afbfb68",
"version": 9,
"version": 12,
"weekStart": ""
}

View File

@@ -19,6 +19,12 @@ includes = [
"*.py",
]
excludes = [
# copied sources
"src/common/base/src/readable_size.rs",
"src/servers/src/repeated_field.rs",
]
[properties]
inceptionYear = 2023
copyrightOwner = "Greptime Team"

View File

@@ -1,8 +1,7 @@
#!/usr/bin/env bash
# This script is used to download built dashboard assets from the "GreptimeTeam/dashboard" repository.
set -e -x
set -ex
declare -r SCRIPT_DIR=$(cd $(dirname ${0}) >/dev/null 2>&1 && pwd)
declare -r ROOT_DIR=$(dirname ${SCRIPT_DIR})
@@ -13,13 +12,34 @@ RELEASE_VERSION="$(cat $STATIC_DIR/VERSION | tr -d '\t\r\n ')"
echo "Downloading assets to dir: $OUT_DIR"
cd $OUT_DIR
if [[ -z "$GITHUB_PROXY_URL" ]]; then
GITHUB_URL="https://github.com"
else
GITHUB_URL="${GITHUB_PROXY_URL%/}"
fi
function retry_fetch() {
local url=$1
local filename=$2
curl --connect-timeout 10 --retry 3 -fsSL $url --output $filename || {
echo "Failed to download $url"
echo "You may try to set http_proxy and https_proxy environment variables."
if [[ -z "$GITHUB_PROXY_URL" ]]; then
echo "You may try to set GITHUB_PROXY_URL=http://mirror.ghproxy.com/https://github.com/"
fi
exit 1
}
}
# Download the SHA256 checksum attached to the release. To verify the integrity
# of the download, this checksum will be used to check the download tar file
# containing the built dashboard assets.
curl -Ls https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/sha256.txt --output sha256.txt
retry_fetch "${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/${RELEASE_VERSION}/sha256.txt" sha256.txt
# Download the tar file containing the built dashboard assets.
curl -L https://github.com/GreptimeTeam/dashboard/releases/download/$RELEASE_VERSION/build.tar.gz --output build.tar.gz
retry_fetch "${GITHUB_URL}/GreptimeTeam/dashboard/releases/download/${RELEASE_VERSION}/build.tar.gz" build.tar.gz
# Verify the checksums match; exit if they don't.
case "$(uname -s)" in

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-base.workspace = true
common-decimal.workspace = true
@@ -15,7 +18,6 @@ greptime-proto.workspace = true
paste = "1.0"
prost.workspace = true
snafu.workspace = true
tonic.workspace = true
[build-dependencies]
tonic-build = "0.9"

View File

@@ -707,7 +707,6 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
}
pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
// TODO(fys): use macros to optimize code
match data_type {
ConcreteDataType::Int64(_) => values
.i64_values

View File

@@ -8,13 +8,17 @@ license.workspace = true
default = []
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
digest = "0.10"
hex = { version = "0.4" }
notify.workspace = true
secrecy = { version = "0.8", features = ["serde", "alloc"] }
sha1 = "0.10"
snafu.workspace = true

View File

@@ -22,6 +22,9 @@ use snafu::{ensure, OptionExt};
use crate::error::{IllegalParamSnafu, InvalidConfigSnafu, Result, UserPasswordMismatchSnafu};
use crate::user_info::DefaultUserInfo;
use crate::user_provider::static_user_provider::{StaticUserProvider, STATIC_USER_PROVIDER};
use crate::user_provider::watch_file_user_provider::{
WatchFileUserProvider, WATCH_FILE_USER_PROVIDER,
};
use crate::{UserInfoRef, UserProviderRef};
pub(crate) const DEFAULT_USERNAME: &str = "greptime";
@@ -40,9 +43,12 @@ pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
match name {
STATIC_USER_PROVIDER => {
let provider =
StaticUserProvider::try_from(content).map(|p| Arc::new(p) as UserProviderRef)?;
StaticUserProvider::new(content).map(|p| Arc::new(p) as UserProviderRef)?;
Ok(provider)
}
WATCH_FILE_USER_PROVIDER => {
WatchFileUserProvider::new(content).map(|p| Arc::new(p) as UserProviderRef)
}
_ => InvalidConfigSnafu {
value: name.to_string(),
msg: "Invalid UserProviderOption",

View File

@@ -64,6 +64,13 @@ pub enum Error {
username: String,
},
#[snafu(display("Failed to initialize a watcher for file {}", path))]
FileWatch {
path: String,
#[snafu(source)]
error: notify::Error,
},
#[snafu(display("User is not authorized to perform this action"))]
PermissionDenied { location: Location },
}
@@ -73,6 +80,7 @@ impl ErrorExt for Error {
match self {
Error::InvalidConfig { .. } => StatusCode::InvalidArguments,
Error::IllegalParam { .. } => StatusCode::InvalidArguments,
Error::FileWatch { .. } => StatusCode::InvalidArguments,
Error::InternalState { .. } => StatusCode::Unexpected,
Error::Io { .. } => StatusCode::Internal,
Error::AuthBackend { .. } => StatusCode::Internal,

View File

@@ -13,10 +13,24 @@
// limitations under the License.
pub(crate) mod static_user_provider;
pub(crate) mod watch_file_user_provider;
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::path::Path;
use secrecy::ExposeSecret;
use snafu::{ensure, OptionExt, ResultExt};
use crate::common::{Identity, Password};
use crate::error::Result;
use crate::UserInfoRef;
use crate::error::{
IllegalParamSnafu, InvalidConfigSnafu, IoSnafu, Result, UnsupportedPasswordTypeSnafu,
UserNotFoundSnafu, UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
use crate::{auth_mysql, UserInfoRef};
#[async_trait::async_trait]
pub trait UserProvider: Send + Sync {
@@ -44,3 +58,88 @@ pub trait UserProvider: Send + Sync {
Ok(user_info)
}
}
fn load_credential_from_file(filepath: &str) -> Result<Option<HashMap<String, Vec<u8>>>> {
// check valid path
let path = Path::new(filepath);
if !path.exists() {
return Ok(None);
}
ensure!(
path.is_file(),
InvalidConfigSnafu {
value: filepath,
msg: "UserProvider file must be a file",
}
);
let file = File::open(path).context(IoSnafu)?;
let credential = io::BufReader::new(file)
.lines()
.map_while(std::result::Result::ok)
.filter_map(|line| {
if let Some((k, v)) = line.split_once('=') {
Some((k.to_string(), v.as_bytes().to_vec()))
} else {
None
}
})
.collect::<HashMap<String, Vec<u8>>>();
ensure!(
!credential.is_empty(),
InvalidConfigSnafu {
value: filepath,
msg: "UserProvider's file must contains at least one valid credential",
}
);
Ok(Some(credential))
}
fn authenticate_with_credential(
users: &HashMap<String, Vec<u8>>,
input_id: Identity<'_>,
input_pwd: Password<'_>,
) -> Result<UserInfoRef> {
match input_id {
Identity::UserId(username, _) => {
ensure!(
!username.is_empty(),
IllegalParamSnafu {
msg: "blank username"
}
);
let save_pwd = users.get(username).context(UserNotFoundSnafu {
username: username.to_string(),
})?;
match input_pwd {
Password::PlainText(pwd) => {
ensure!(
!pwd.expose_secret().is_empty(),
IllegalParamSnafu {
msg: "blank password"
}
);
if save_pwd == pwd.expose_secret().as_bytes() {
Ok(DefaultUserInfo::with_name(username))
} else {
UserPasswordMismatchSnafu {
username: username.to_string(),
}
.fail()
}
}
Password::MysqlNativePassword(auth_data, salt) => {
auth_mysql(auth_data, salt, username, save_pwd)
.map(|_| DefaultUserInfo::with_name(username))
}
Password::PgMD5(_, _) => UnsupportedPasswordTypeSnafu {
password_type: "pg_md5",
}
.fail(),
}
}
}
}

View File

@@ -13,60 +13,34 @@
// limitations under the License.
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::BufRead;
use std::path::Path;
use async_trait::async_trait;
use secrecy::ExposeSecret;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::OptionExt;
use crate::error::{
Error, IllegalParamSnafu, InvalidConfigSnafu, IoSnafu, Result, UnsupportedPasswordTypeSnafu,
UserNotFoundSnafu, UserPasswordMismatchSnafu,
};
use crate::user_info::DefaultUserInfo;
use crate::{auth_mysql, Identity, Password, UserInfoRef, UserProvider};
use crate::error::{InvalidConfigSnafu, Result};
use crate::user_provider::{authenticate_with_credential, load_credential_from_file};
use crate::{Identity, Password, UserInfoRef, UserProvider};
pub(crate) const STATIC_USER_PROVIDER: &str = "static_user_provider";
impl TryFrom<&str> for StaticUserProvider {
type Error = Error;
pub(crate) struct StaticUserProvider {
users: HashMap<String, Vec<u8>>,
}
fn try_from(value: &str) -> Result<Self> {
impl StaticUserProvider {
pub(crate) fn new(value: &str) -> Result<Self> {
let (mode, content) = value.split_once(':').context(InvalidConfigSnafu {
value: value.to_string(),
msg: "StaticUserProviderOption must be in format `<option>:<value>`",
})?;
return match mode {
"file" => {
// check valid path
let path = Path::new(content);
ensure!(path.exists() && path.is_file(), InvalidConfigSnafu {
value: content.to_string(),
msg: "StaticUserProviderOption file must be a valid file path",
});
let file = File::open(path).context(IoSnafu)?;
let credential = io::BufReader::new(file)
.lines()
.map_while(std::result::Result::ok)
.filter_map(|line| {
if let Some((k, v)) = line.split_once('=') {
Some((k.to_string(), v.as_bytes().to_vec()))
} else {
None
}
})
.collect::<HashMap<String, Vec<u8>>>();
ensure!(!credential.is_empty(), InvalidConfigSnafu {
value: content.to_string(),
msg: "StaticUserProviderOption file must contains at least one valid credential",
});
Ok(StaticUserProvider { users: credential, })
let users = load_credential_from_file(content)?
.context(InvalidConfigSnafu {
value: content.to_string(),
msg: "StaticFileUserProvider must be a valid file path",
})?;
Ok(StaticUserProvider { users })
}
"cmd" => content
.split(',')
@@ -83,66 +57,19 @@ impl TryFrom<&str> for StaticUserProvider {
value: mode.to_string(),
msg: "StaticUserProviderOption must be in format `file:<path>` or `cmd:<values>`",
}
.fail(),
.fail(),
};
}
}
pub(crate) struct StaticUserProvider {
users: HashMap<String, Vec<u8>>,
}
#[async_trait]
impl UserProvider for StaticUserProvider {
fn name(&self) -> &str {
STATIC_USER_PROVIDER
}
async fn authenticate(
&self,
input_id: Identity<'_>,
input_pwd: Password<'_>,
) -> Result<UserInfoRef> {
match input_id {
Identity::UserId(username, _) => {
ensure!(
!username.is_empty(),
IllegalParamSnafu {
msg: "blank username"
}
);
let save_pwd = self.users.get(username).context(UserNotFoundSnafu {
username: username.to_string(),
})?;
match input_pwd {
Password::PlainText(pwd) => {
ensure!(
!pwd.expose_secret().is_empty(),
IllegalParamSnafu {
msg: "blank password"
}
);
return if save_pwd == pwd.expose_secret().as_bytes() {
Ok(DefaultUserInfo::with_name(username))
} else {
UserPasswordMismatchSnafu {
username: username.to_string(),
}
.fail()
};
}
Password::MysqlNativePassword(auth_data, salt) => {
auth_mysql(auth_data, salt, username, save_pwd)
.map(|_| DefaultUserInfo::with_name(username))
}
Password::PgMD5(_, _) => UnsupportedPasswordTypeSnafu {
password_type: "pg_md5",
}
.fail(),
}
}
}
async fn authenticate(&self, id: Identity<'_>, pwd: Password<'_>) -> Result<UserInfoRef> {
authenticate_with_credential(&self.users, id, pwd)
}
async fn authorize(
@@ -181,7 +108,7 @@ pub mod test {
#[tokio::test]
async fn test_authorize() {
let user_info = DefaultUserInfo::with_name("root");
let provider = StaticUserProvider::try_from("cmd:root=123456,admin=654321").unwrap();
let provider = StaticUserProvider::new("cmd:root=123456,admin=654321").unwrap();
provider
.authorize("catalog", "schema", &user_info)
.await
@@ -190,7 +117,7 @@ pub mod test {
#[tokio::test]
async fn test_inline_provider() {
let provider = StaticUserProvider::try_from("cmd:root=123456,admin=654321").unwrap();
let provider = StaticUserProvider::new("cmd:root=123456,admin=654321").unwrap();
test_authenticate(&provider, "root", "123456").await;
test_authenticate(&provider, "admin", "654321").await;
}
@@ -214,7 +141,7 @@ admin=654321",
}
let param = format!("file:{file_path}");
let provider = StaticUserProvider::try_from(param.as_str()).unwrap();
let provider = StaticUserProvider::new(param.as_str()).unwrap();
test_authenticate(&provider, "root", "123456").await;
test_authenticate(&provider, "admin", "654321").await;
}

View File

@@ -0,0 +1,215 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::Path;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common_telemetry::{info, warn};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::{ensure, ResultExt};
use crate::error::{FileWatchSnafu, InvalidConfigSnafu, Result};
use crate::user_info::DefaultUserInfo;
use crate::user_provider::{authenticate_with_credential, load_credential_from_file};
use crate::{Identity, Password, UserInfoRef, UserProvider};
pub(crate) const WATCH_FILE_USER_PROVIDER: &str = "watch_file_user_provider";
type WatchedCredentialRef = Arc<Mutex<Option<HashMap<String, Vec<u8>>>>>;
/// A user provider that reads user credential from a file and watches the file for changes.
///
/// Empty file is invalid; but file not exist means every user can be authenticated.
pub(crate) struct WatchFileUserProvider {
users: WatchedCredentialRef,
}
impl WatchFileUserProvider {
pub fn new(filepath: &str) -> Result<Self> {
let credential = load_credential_from_file(filepath)?;
let users = Arc::new(Mutex::new(credential));
let this = WatchFileUserProvider {
users: users.clone(),
};
let (tx, rx) = channel::<notify::Result<notify::Event>>();
let mut debouncer =
notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
let mut dir = Path::new(filepath).to_path_buf();
ensure!(
dir.pop(),
InvalidConfigSnafu {
value: filepath,
msg: "UserProvider path must be a file path",
}
);
debouncer
.watch(&dir, RecursiveMode::NonRecursive)
.context(FileWatchSnafu { path: filepath })?;
let filepath = filepath.to_string();
std::thread::spawn(move || {
let filename = Path::new(&filepath).file_name();
let _hold = debouncer;
while let Ok(res) = rx.recv() {
if let Ok(event) = res {
let is_this_file = event.paths.iter().any(|p| p.file_name() == filename);
let is_relevant_event = matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
);
if is_this_file && is_relevant_event {
info!(?event.kind, "User provider file {} changed", &filepath);
match load_credential_from_file(&filepath) {
Ok(credential) => {
let mut users =
users.lock().expect("users credential must be valid");
#[cfg(not(test))]
info!("User provider file {filepath} reloaded");
#[cfg(test)]
info!("User provider file {filepath} reloaded: {credential:?}");
*users = credential;
}
Err(err) => {
warn!(
?err,
"Fail to load credential from file {filepath}; keep the old one",
)
}
}
}
}
}
});
Ok(this)
}
}
#[async_trait]
impl UserProvider for WatchFileUserProvider {
fn name(&self) -> &str {
WATCH_FILE_USER_PROVIDER
}
async fn authenticate(&self, id: Identity<'_>, password: Password<'_>) -> Result<UserInfoRef> {
let users = self.users.lock().expect("users credential must be valid");
if let Some(users) = users.as_ref() {
authenticate_with_credential(users, id, password)
} else {
match id {
Identity::UserId(id, _) => {
warn!(id, "User provider file not exist, allow all users");
Ok(DefaultUserInfo::with_name(id))
}
}
}
}
async fn authorize(&self, _: &str, _: &str, _: &UserInfoRef) -> Result<()> {
// default allow all
Ok(())
}
}
#[cfg(test)]
pub mod test {
use std::time::{Duration, Instant};
use common_test_util::temp_dir::create_temp_dir;
use tokio::time::sleep;
use crate::user_provider::watch_file_user_provider::WatchFileUserProvider;
use crate::user_provider::{Identity, Password};
use crate::UserProvider;
async fn test_authenticate(
provider: &dyn UserProvider,
username: &str,
password: &str,
ok: bool,
timeout: Option<Duration>,
) {
if let Some(timeout) = timeout {
let deadline = Instant::now().checked_add(timeout).unwrap();
loop {
let re = provider
.authenticate(
Identity::UserId(username, None),
Password::PlainText(password.to_string().into()),
)
.await;
if re.is_ok() == ok {
break;
} else if Instant::now() < deadline {
sleep(Duration::from_millis(100)).await;
} else {
panic!("timeout (username: {username}, password: {password}, expected: {ok})");
}
}
} else {
let re = provider
.authenticate(
Identity::UserId(username, None),
Password::PlainText(password.to_string().into()),
)
.await;
assert_eq!(
re.is_ok(),
ok,
"username: {}, password: {}",
username,
password
);
}
}
#[tokio::test]
async fn test_file_provider() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("test_file_provider");
let file_path = format!("{}/test_file_provider", dir.path().to_str().unwrap());
// write a tmp file
assert!(std::fs::write(&file_path, "root=123456\nadmin=654321\n").is_ok());
let provider = WatchFileUserProvider::new(file_path.as_str()).unwrap();
let timeout = Duration::from_secs(60);
test_authenticate(&provider, "root", "123456", true, None).await;
test_authenticate(&provider, "admin", "654321", true, None).await;
test_authenticate(&provider, "root", "654321", false, None).await;
// update the tmp file
assert!(std::fs::write(&file_path, "root=654321\n").is_ok());
test_authenticate(&provider, "root", "123456", false, Some(timeout)).await;
test_authenticate(&provider, "root", "654321", true, Some(timeout)).await;
test_authenticate(&provider, "admin", "654321", false, Some(timeout)).await;
// remove the tmp file
assert!(std::fs::remove_file(&file_path).is_ok());
test_authenticate(&provider, "root", "123456", true, Some(timeout)).await;
test_authenticate(&provider, "root", "654321", true, Some(timeout)).await;
test_authenticate(&provider, "admin", "654321", true, Some(timeout)).await;
// recreate the tmp file
assert!(std::fs::write(&file_path, "root=123456\n").is_ok());
test_authenticate(&provider, "root", "123456", true, Some(timeout)).await;
test_authenticate(&provider, "root", "654321", false, Some(timeout)).await;
test_authenticate(&provider, "admin", "654321", false, Some(timeout)).await;
}
}

View File

@@ -7,21 +7,21 @@ license.workspace = true
[features]
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.0"
arrow.workspace = true
arrow-schema.workspace = true
async-stream.workspace = true
async-trait = "0.1"
common-catalog.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
@@ -34,15 +34,13 @@ itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
moka = { workspace = true, features = ["future", "sync"] }
parking_lot = "0.12"
partition.workspace = true
paste = "1.0"
prometheus.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true

View File

@@ -164,11 +164,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to find table partitions: #{table}"))]
FindPartitions {
source: partition::error::Error,
table: String,
},
#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },
#[snafu(display("Failed to find region routes"))]
FindRegionRoutes { source: partition::error::Error },
@@ -254,6 +251,12 @@ pub enum Error {
source: common_meta::error::Error,
location: Location,
},
#[snafu(display("Get null from table cache, key: {}", key))]
TableCacheNotGet { key: String, location: Location },
#[snafu(display("Failed to get table cache, err: {}", err_msg))]
GetTableCache { err_msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -314,6 +317,7 @@ impl ErrorExt for Error {
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::Datafusion { .. } => StatusCode::EngineExecuteQuery,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal,
}
}

View File

@@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod columns;
mod key_column_usage;
pub mod columns;
pub mod key_column_usage;
mod memory_table;
mod partitions;
mod predicate;
mod region_peers;
mod runtime_metrics;
mod schemata;
pub mod schemata;
mod table_names;
mod tables;
pub mod tables;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
@@ -41,8 +41,7 @@ use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::{
FilterPushDownType, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
use table::{Table, TableRef};
pub use table_names::*;
use self::columns::InformationSchemaColumns;
@@ -187,10 +186,9 @@ impl InformationSchemaProvider {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Inexact;
let thin_table = ThinTable::new(table_info, filter_pushdown);
let data_source = Arc::new(InformationTableDataSource::new(table));
Arc::new(ThinTableAdapter::new(thin_table, data_source)) as _
let table = Table::new(table_info, filter_pushdown, data_source);
Arc::new(table)
})
}

View File

@@ -26,13 +26,16 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::prelude::{ConcreteDataType, DataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use datatypes::vectors::{
ConstantVector, Int64Vector, Int64VectorBuilder, StringVector, StringVectorBuilder, VectorRef,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use sql::statements;
use store_api::storage::{ScanRequest, TableId};
use super::{InformationTable, COLUMNS};
@@ -48,18 +51,42 @@ pub(super) struct InformationSchemaColumns {
catalog_manager: Weak<dyn CatalogManager>,
}
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const DATA_TYPE: &str = "data_type";
const SEMANTIC_TYPE: &str = "semantic_type";
const COLUMN_DEFAULT: &str = "column_default";
const IS_NULLABLE: &str = "is_nullable";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
const ORDINAL_POSITION: &str = "ordinal_position";
const CHARACTER_MAXIMUM_LENGTH: &str = "character_maximum_length";
const CHARACTER_OCTET_LENGTH: &str = "character_octet_length";
const NUMERIC_PRECISION: &str = "numeric_precision";
const NUMERIC_SCALE: &str = "numeric_scale";
const DATETIME_PRECISION: &str = "datetime_precision";
const CHARACTER_SET_NAME: &str = "character_set_name";
pub const COLLATION_NAME: &str = "collation_name";
pub const COLUMN_KEY: &str = "column_key";
pub const EXTRA: &str = "extra";
pub const PRIVILEGES: &str = "privileges";
const GENERATION_EXPRESSION: &str = "generation_expression";
// Extension field to keep greptime data type name
pub const GREPTIME_DATA_TYPE: &str = "greptime_data_type";
pub const DATA_TYPE: &str = "data_type";
pub const SEMANTIC_TYPE: &str = "semantic_type";
pub const COLUMN_DEFAULT: &str = "column_default";
pub const IS_NULLABLE: &str = "is_nullable";
const COLUMN_TYPE: &str = "column_type";
const COLUMN_COMMENT: &str = "column_comment";
pub const COLUMN_COMMENT: &str = "column_comment";
const SRS_ID: &str = "srs_id";
const INIT_CAPACITY: usize = 42;
// The maximum length of string type
const MAX_STRING_LENGTH: i64 = 2147483647;
const UTF8_CHARSET_NAME: &str = "utf8";
const UTF8_COLLATE_NAME: &str = "utf8_bin";
const PRI_COLUMN_KEY: &str = "PRI";
const TIME_INDEX_COLUMN_KEY: &str = "TIME INDEX";
const DEFAULT_PRIVILEGES: &str = "select,insert";
const EMPTY_STR: &str = "";
impl InformationSchemaColumns {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
@@ -75,12 +102,46 @@ impl InformationSchemaColumns {
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::int64_datatype(), false),
ColumnSchema::new(
CHARACTER_MAXIMUM_LENGTH,
ConcreteDataType::int64_datatype(),
true,
),
ColumnSchema::new(
CHARACTER_OCTET_LENGTH,
ConcreteDataType::int64_datatype(),
true,
),
ColumnSchema::new(NUMERIC_PRECISION, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(NUMERIC_SCALE, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(DATETIME_PRECISION, ConcreteDataType::int64_datatype(), true),
ColumnSchema::new(
CHARACTER_SET_NAME,
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(COLLATION_NAME, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(COLUMN_KEY, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(EXTRA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(PRIVILEGES, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
GENERATION_EXPRESSION,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
GREPTIME_DATA_TYPE,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(DATA_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(SEMANTIC_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_DEFAULT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(IS_NULLABLE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_COMMENT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(SRS_ID, ConcreteDataType::int64_datatype(), true),
]))
}
@@ -136,9 +197,18 @@ struct InformationSchemaColumnsBuilder {
schema_names: StringVectorBuilder,
table_names: StringVectorBuilder,
column_names: StringVectorBuilder,
ordinal_positions: Int64VectorBuilder,
character_maximum_lengths: Int64VectorBuilder,
character_octet_lengths: Int64VectorBuilder,
numeric_precisions: Int64VectorBuilder,
numeric_scales: Int64VectorBuilder,
datetime_precisions: Int64VectorBuilder,
character_set_names: StringVectorBuilder,
collation_names: StringVectorBuilder,
column_keys: StringVectorBuilder,
greptime_data_types: StringVectorBuilder,
data_types: StringVectorBuilder,
semantic_types: StringVectorBuilder,
column_defaults: StringVectorBuilder,
is_nullables: StringVectorBuilder,
column_types: StringVectorBuilder,
@@ -159,6 +229,16 @@ impl InformationSchemaColumnsBuilder {
schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
character_maximum_lengths: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
character_octet_lengths: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
numeric_precisions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
numeric_scales: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
datetime_precisions: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
character_set_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_keys: StringVectorBuilder::with_capacity(INIT_CAPACITY),
greptime_data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
semantic_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_defaults: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -194,6 +274,7 @@ impl InformationSchemaColumnsBuilder {
};
self.add_column(
idx,
&predicates,
&catalog_name,
&schema_name,
@@ -208,8 +289,10 @@ impl InformationSchemaColumnsBuilder {
self.finish()
}
#[allow(clippy::too_many_arguments)]
fn add_column(
&mut self,
index: usize,
predicates: &Predicates,
catalog_name: &str,
schema_name: &str,
@@ -217,7 +300,16 @@ impl InformationSchemaColumnsBuilder {
semantic_type: &str,
column_schema: &ColumnSchema,
) {
let data_type = &column_schema.data_type.name();
// Use sql data type name
let data_type = statements::concrete_data_type_to_sql_data_type(&column_schema.data_type)
.map(|dt| dt.to_string().to_lowercase())
.unwrap_or_else(|_| column_schema.data_type.name());
let column_key = match semantic_type {
SEMANTIC_TYPE_PRIMARY_KEY => PRI_COLUMN_KEY,
SEMANTIC_TYPE_TIME_INDEX => TIME_INDEX_COLUMN_KEY,
_ => EMPTY_STR,
};
let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
@@ -226,6 +318,8 @@ impl InformationSchemaColumnsBuilder {
(COLUMN_NAME, &Value::from(column_schema.name.as_str())),
(DATA_TYPE, &Value::from(data_type.as_str())),
(SEMANTIC_TYPE, &Value::from(semantic_type)),
(ORDINAL_POSITION, &Value::from((index + 1) as i64)),
(COLUMN_KEY, &Value::from(column_key)),
];
if !predicates.eval(&row) {
@@ -236,7 +330,63 @@ impl InformationSchemaColumnsBuilder {
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.column_names.push(Some(&column_schema.name));
self.data_types.push(Some(data_type));
// Starts from 1
self.ordinal_positions.push(Some((index + 1) as i64));
if column_schema.data_type.is_string() {
self.character_maximum_lengths.push(Some(MAX_STRING_LENGTH));
self.character_octet_lengths.push(Some(MAX_STRING_LENGTH));
self.numeric_precisions.push(None);
self.numeric_scales.push(None);
self.datetime_precisions.push(None);
self.character_set_names.push(Some(UTF8_CHARSET_NAME));
self.collation_names.push(Some(UTF8_COLLATE_NAME));
} else if column_schema.data_type.is_numeric() || column_schema.data_type.is_decimal() {
self.character_maximum_lengths.push(None);
self.character_octet_lengths.push(None);
self.numeric_precisions.push(
column_schema
.data_type
.numeric_precision()
.map(|x| x as i64),
);
self.numeric_scales
.push(column_schema.data_type.numeric_scale().map(|x| x as i64));
self.datetime_precisions.push(None);
self.character_set_names.push(None);
self.collation_names.push(None);
} else {
self.character_maximum_lengths.push(None);
self.character_octet_lengths.push(None);
self.numeric_precisions.push(None);
self.numeric_scales.push(None);
match &column_schema.data_type {
ConcreteDataType::DateTime(datetime_type) => {
self.datetime_precisions
.push(Some(datetime_type.precision() as i64));
}
ConcreteDataType::Timestamp(ts_type) => {
self.datetime_precisions
.push(Some(ts_type.precision() as i64));
}
ConcreteDataType::Time(time_type) => {
self.datetime_precisions
.push(Some(time_type.precision() as i64));
}
_ => self.datetime_precisions.push(None),
}
self.character_set_names.push(None);
self.collation_names.push(None);
}
self.column_keys.push(Some(column_key));
self.greptime_data_types
.push(Some(&column_schema.data_type.name()));
self.data_types.push(Some(&data_type));
self.semantic_types.push(Some(semantic_type));
self.column_defaults.push(
column_schema
@@ -249,23 +399,52 @@ impl InformationSchemaColumnsBuilder {
} else {
self.is_nullables.push(Some("No"));
}
self.column_types.push(Some(data_type));
self.column_types.push(Some(&data_type));
self.column_comments
.push(column_schema.column_comment().map(|x| x.as_ref()));
}
fn finish(&mut self) -> Result<RecordBatch> {
let rows_num = self.collation_names.len();
let privileges = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec![DEFAULT_PRIVILEGES])),
rows_num,
));
let empty_string = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec![EMPTY_STR])),
rows_num,
));
let srs_ids = Arc::new(ConstantVector::new(
Arc::new(Int64Vector::from(vec![None])),
rows_num,
));
let columns: Vec<VectorRef> = vec![
Arc::new(self.catalog_names.finish()),
Arc::new(self.schema_names.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.column_names.finish()),
Arc::new(self.ordinal_positions.finish()),
Arc::new(self.character_maximum_lengths.finish()),
Arc::new(self.character_octet_lengths.finish()),
Arc::new(self.numeric_precisions.finish()),
Arc::new(self.numeric_scales.finish()),
Arc::new(self.datetime_precisions.finish()),
Arc::new(self.character_set_names.finish()),
Arc::new(self.collation_names.finish()),
Arc::new(self.column_keys.finish()),
empty_string.clone(),
privileges,
empty_string,
Arc::new(self.greptime_data_types.finish()),
Arc::new(self.data_types.finish()),
Arc::new(self.semantic_types.finish()),
Arc::new(self.column_defaults.finish()),
Arc::new(self.is_nullables.finish()),
Arc::new(self.column_types.finish()),
Arc::new(self.column_comments.finish()),
srs_ids,
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)

View File

@@ -37,13 +37,16 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const ORDINAL_POSITION: &str = "ordinal_position";
pub const CONSTRAINT_SCHEMA: &str = "constraint_schema";
pub const CONSTRAINT_NAME: &str = "constraint_name";
// It's always `def` in MySQL
pub const TABLE_CATALOG: &str = "table_catalog";
// The real catalog name for this key column.
pub const REAL_TABLE_CATALOG: &str = "real_table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const COLUMN_NAME: &str = "column_name";
pub const ORDINAL_POSITION: &str = "ordinal_position";
const INIT_CAPACITY: usize = 42;
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
@@ -76,6 +79,11 @@ impl InformationSchemaKeyColumnUsage {
),
ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
REAL_TABLE_CATALOG,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
@@ -158,6 +166,7 @@ struct InformationSchemaKeyColumnUsageBuilder {
constraint_schema: StringVectorBuilder,
constraint_name: StringVectorBuilder,
table_catalog: StringVectorBuilder,
real_table_catalog: StringVectorBuilder,
table_schema: StringVectorBuilder,
table_name: StringVectorBuilder,
column_name: StringVectorBuilder,
@@ -179,6 +188,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
constraint_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
constraint_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
real_table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
column_name: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -223,6 +233,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
&predicates,
&schema_name,
"TIME INDEX",
&catalog_name,
&schema_name,
&table_name,
&column.name,
@@ -231,6 +242,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
}
if keys.contains(&idx) {
primary_constraints.push((
catalog_name.clone(),
schema_name.clone(),
table_name.clone(),
column.name.clone(),
@@ -244,13 +256,14 @@ impl InformationSchemaKeyColumnUsageBuilder {
}
}
for (i, (schema_name, table_name, column_name)) in
for (i, (catalog_name, schema_name, table_name, column_name)) in
primary_constraints.into_iter().enumerate()
{
self.add_key_column_usage(
&predicates,
&schema_name,
"PRIMARY",
&catalog_name,
&schema_name,
&table_name,
&column_name,
@@ -269,6 +282,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
predicates: &Predicates,
constraint_schema: &str,
constraint_name: &str,
table_catalog: &str,
table_schema: &str,
table_name: &str,
column_name: &str,
@@ -277,6 +291,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
let row = [
(CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
(CONSTRAINT_NAME, &Value::from(constraint_name)),
(REAL_TABLE_CATALOG, &Value::from(table_catalog)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(COLUMN_NAME, &Value::from(column_name)),
@@ -291,6 +306,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
self.constraint_schema.push(Some(constraint_schema));
self.constraint_name.push(Some(constraint_name));
self.table_catalog.push(Some("def"));
self.real_table_catalog.push(Some(table_catalog));
self.table_schema.push(Some(table_schema));
self.table_name.push(Some(table_name));
self.column_name.push(Some(column_name));
@@ -310,6 +326,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
Arc::new(self.constraint_schema.finish()),
Arc::new(self.constraint_name.finish()),
Arc::new(self.table_catalog.finish()),
Arc::new(self.real_table_catalog.finish()),
Arc::new(self.table_schema.finish()),
Arc::new(self.table_name.finish()),
Arc::new(self.column_name.finish()),

View File

@@ -14,13 +14,15 @@
use std::sync::Arc;
use common_catalog::consts::MITO_ENGINE;
use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Int64Vector, StringVector};
use crate::information_schema::table_names::*;
const NO_VALUE: &str = "NO";
/// Find the schema and columns by the table_name, only valid for memory tables.
/// Safety: the user MUST ensure the table schema exists, panic otherwise.
pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
@@ -59,14 +61,15 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
"SAVEPOINTS",
]),
vec![
Arc::new(StringVector::from(vec![MITO_ENGINE])),
Arc::new(StringVector::from(vec!["DEFAULT"])),
Arc::new(StringVector::from(vec![MITO_ENGINE, METRIC_ENGINE])),
Arc::new(StringVector::from(vec!["DEFAULT", "YES"])),
Arc::new(StringVector::from(vec![
"Storage engine for time-series data",
"Storage engine for observability scenarios, which is adept at handling a large number of small tables, making it particularly suitable for cloud-native monitoring",
])),
Arc::new(StringVector::from(vec!["NO"])),
Arc::new(StringVector::from(vec!["NO"])),
Arc::new(StringVector::from(vec!["NO"])),
Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])),
Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])),
Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])),
],
),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::pin::pin;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -31,7 +32,7 @@ use datatypes::vectors::{
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
@@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info))
}
});
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
const BATCH_SIZE: usize = 128;
if table_info.table_type == TableType::Temporary {
continue;
}
// Split table infos into chunks
let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));
let table_id = table_info.ident.table_id;
let partitions = if let Some(partition_manager) = &partition_manager {
while let Some(table_infos) = table_info_chunks.next().await {
let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect();
let mut table_partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_table_partitions(table_id)
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu {
table: &table_info.name,
})?
.context(FindPartitionsSnafu)?
} else {
// Current node must be a standalone instance, contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}]
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
for table_info in table_infos {
let partitions = table_partitions
.remove(&table_info.ident.table_id)
.unwrap_or(vec![]);
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
}
}
}

View File

@@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_region_routes_batch(&table_ids)
.batch_find_region_routes(&table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {

View File

@@ -37,8 +37,8 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const CATALOG_NAME: &str = "catalog_name";
const SCHEMA_NAME: &str = "schema_name";
pub const CATALOG_NAME: &str = "catalog_name";
pub const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
const INIT_CAPACITY: usize = 42;

View File

@@ -39,10 +39,10 @@ use crate::error::{
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const TABLE_TYPE: &str = "table_type";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;

View File

@@ -82,12 +82,10 @@ impl CachedMetaKvBackendBuilder {
let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
let cache = Arc::new(
CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build(),
);
let cache = CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build();
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
@@ -104,7 +102,7 @@ impl CachedMetaKvBackendBuilder {
}
}
pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// A wrapper of `MetaKvBackend` with cache support.
///
@@ -117,7 +115,7 @@ pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
/// TTL and TTI for cache.
pub struct CachedMetaKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackendRef,
cache: CacheBackend,
name: String,
version: AtomicUsize,
}
@@ -317,12 +315,10 @@ impl CachedMetaKvBackend {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
let cache = Arc::new(
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build(),
);
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let name = format!("CachedKvBackend({})", kv_backend.name());
Self {
@@ -333,7 +329,7 @@ impl CachedMetaKvBackend {
}
}
pub fn cache(&self) -> &CacheBackendRef {
pub fn cache(&self) -> &CacheBackend {
&self.cache
}
@@ -368,6 +364,10 @@ impl KvBackend for MetaKvBackend {
"MetaKvBackend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.client
.range(req)
@@ -376,27 +376,6 @@ impl KvBackend for MetaKvBackend {
.context(ExternalSnafu)
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let mut response = self
.client
.range(RangeRequest::new().with_key(key))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
key: kv.take_key(),
value: kv.take_value(),
}))
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.client
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
self.client
.put(req)
@@ -405,17 +384,9 @@ impl KvBackend for MetaKvBackend {
.context(ExternalSnafu)
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.client
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.client
.batch_delete(req)
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
@@ -440,8 +411,33 @@ impl KvBackend for MetaKvBackend {
.context(ExternalSnafu)
}
fn as_any(&self) -> &dyn Any {
self
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.client
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.client
.batch_delete(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let mut response = self
.client
.range(RangeRequest::new().with_key(key))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
key: kv.take_key(),
value: kv.take_value(),
}))
}
}

View File

@@ -15,32 +15,36 @@
use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};
use std::time::Duration;
use async_stream::try_stream;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator};
use common_meta::instruction::CacheIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::future::{Cache as AsyncCache, CacheBuilder};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;
use crate::error::Error::{GetTableCache, TableCacheNotGet};
use crate::error::{
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
Result as CatalogResult, TableMetadataManagerSnafu,
InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result,
TableCacheNotGetSnafu, TableMetadataManagerSnafu,
};
use crate::information_schema::InformationSchemaProvider;
use crate::CatalogManager;
@@ -52,56 +56,74 @@ use crate::CatalogManager;
/// comes from `SystemCatalog`, which is static and read-only.
#[derive(Clone)]
pub struct KvBackendCatalogManager {
// TODO(LFC): Maybe use a real implementation for Standalone mode.
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
cache_invalidator: CacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
table_cache: AsyncCache<String, TableRef>,
}
fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
struct TableCacheInvalidator {
table_cache: AsyncCache<String, TableRef>,
}
impl TableCacheInvalidator {
pub fn new(table_cache: AsyncCache<String, TableRef>) -> Self {
Self { table_cache }
}
}
#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_id(ctx, table_id)
.await
}
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
impl CacheInvalidator for TableCacheInvalidator {
async fn invalidate(
&self,
_ctx: &Context,
caches: Vec<CacheIdent>,
) -> common_meta::error::Result<()> {
for cache in caches {
if let CacheIdent::TableName(table_name) = cache {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.table_cache.invalidate(&table_cache_key).await;
}
}
Ok(())
}
}
const DEFAULT_CACHED_CATALOG: u64 = 128;
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
const TABLE_CACHE_MAX_CAPACITY: u64 = 65536;
const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
impl KvBackendCatalogManager {
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
pub async fn new(
backend: KvBackendRef,
multi_cache_invalidator: Arc<MultiCacheInvalidator>,
) -> Arc<Self> {
let table_cache: AsyncCache<String, TableRef> = CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
.time_to_live(TABLE_CACHE_TTL)
.time_to_idle(TABLE_CACHE_TTI)
.build();
multi_cache_invalidator
.add_invalidator(Arc::new(TableCacheInvalidator::new(table_cache.clone())))
.await;
Arc::new_cyclic(|me| Self {
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
cache_invalidator,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(DEFAULT_CACHED_CATALOG),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
// The catalog name is not used in system_catalog, so let it empty
String::default(),
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
},
table_cache,
})
}
@@ -120,12 +142,11 @@ impl CatalogManager for KvBackendCatalogManager {
self
}
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
async fn catalog_names(&self) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.catalog_manager()
.catalog_names()
.await;
.catalog_names();
let keys = stream
.try_collect::<Vec<_>>()
@@ -136,12 +157,11 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(keys)
}
async fn schema_names(&self, catalog: &str) -> CatalogResult<Vec<String>> {
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.schema_manager()
.schema_names(catalog)
.await;
.schema_names(catalog);
let mut keys = stream
.try_collect::<BTreeSet<_>>()
.await
@@ -153,12 +173,11 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(keys.into_iter().collect())
}
async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await;
.tables(catalog, schema);
let mut tables = stream
.try_collect::<Vec<_>>()
.await
@@ -172,7 +191,7 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(tables.into_iter().collect())
}
async fn catalog_exists(&self, catalog: &str) -> CatalogResult<bool> {
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.table_metadata_manager
.catalog_manager()
.exists(CatalogNameKey::new(catalog))
@@ -180,7 +199,7 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)
}
async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
if self.system_catalog.schema_exist(schema) {
return Ok(true);
}
@@ -192,7 +211,7 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)
}
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
if self.system_catalog.table_exist(schema, table) {
return Ok(true);
}
@@ -211,41 +230,64 @@ impl CatalogManager for KvBackendCatalogManager {
catalog: &str,
schema: &str,
table_name: &str,
) -> CatalogResult<Option<TableRef>> {
) -> Result<Option<TableRef>> {
if let Some(table) = self.system_catalog.table(catalog, schema, table_name) {
return Ok(Some(table));
}
let key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(TableMetadataManagerSnafu)?
else {
return Ok(None);
};
let table_id = table_name_value.table_id();
let init = async {
let table_name_key = TableNameKey::new(catalog, schema, table_name);
let Some(table_name_value) = self
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await
.context(TableMetadataManagerSnafu)?
else {
return TableCacheNotGetSnafu {
key: table_name_key.to_string(),
}
.fail();
};
let table_id = table_name_value.table_id();
let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return Ok(None);
let Some(table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(TableMetadataManagerSnafu)?
.map(|v| v.into_inner())
else {
return TableCacheNotGetSnafu {
key: table_name_key.to_string(),
}
.fail();
};
build_table(table_info_value)
};
make_table(table_info_value).map(Some)
match self
.table_cache
.try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init)
.await
{
Ok(table) => Ok(Some(table)),
Err(err) => match err.as_ref() {
TableCacheNotGet { .. } => Ok(None),
_ => Err(err),
},
}
.map_err(|err| GetTableCache {
err_msg: err.to_string(),
})
}
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, CatalogResult<TableRef>> {
) -> BoxStream<'a, Result<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);
@@ -260,7 +302,6 @@ impl CatalogManager for KvBackendCatalogManager {
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await
.map_ok(|(_, v)| v.table_id());
const BATCH_SIZE: usize = 128;
let user_tables = try_stream!({
@@ -270,7 +311,7 @@ impl CatalogManager for KvBackendCatalogManager {
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids
.into_iter()
.collect::<Result<Vec<_>, _>>()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?;
@@ -282,7 +323,7 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)?;
for table_info_value in table_info_values.into_values() {
yield make_table(table_info_value)?;
yield build_table(table_info_value)?;
}
}
});
@@ -291,6 +332,14 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
}
// TODO: This struct can hold a static map of all system tables when
// the upper layer (e.g., procedure) can inform the catalog manager
// a new catalog is created.

View File

@@ -19,10 +19,10 @@ use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use api::v1::CreateTableExpr;
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;
use crate::error::Result;
@@ -75,9 +75,9 @@ pub type OpenSystemTableHook =
/// Register system table request:
/// - When system table is already created and registered, the hook will be called
/// with table ref after opening the system table
/// - When system table is not exists, create and register the table by create_table_request and calls open_hook with the created table.
/// - When system table is not exists, create and register the table by `create_table_expr` and calls `open_hook` with the created table.
pub struct RegisterSystemTableRequest {
pub create_table_request: CreateTableRequest,
pub create_table_expr: CreateTableExpr,
pub open_hook: Option<OpenSystemTableHook>,
}

View File

@@ -7,13 +7,15 @@ license.workspace = true
[features]
testing = []
[lints]
workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.6"
arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-grpc.workspace = true
@@ -22,10 +24,6 @@ common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
enum_dispatch = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
@@ -34,7 +32,7 @@ parking_lot = "0.12"
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
session.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }

View File

@@ -307,7 +307,7 @@ impl Database {
reason: "Expect 'AffectedRows' Flight messages to be the one and the only!"
}
);
Ok(Output::AffectedRows(rows))
Ok(Output::new_with_affected_rows(rows))
}
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu {
@@ -340,7 +340,7 @@ impl Database {
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
}
}
}

View File

@@ -134,10 +134,17 @@ impl From<Status> for Error {
impl Error {
pub fn should_retry(&self) -> bool {
!matches!(
// TODO(weny): figure out each case of these codes.
matches!(
self,
Self::RegionServer {
code: Code::InvalidArgument,
code: Code::Cancelled,
..
} | Self::RegionServer {
code: Code::DeadlineExceeded,
..
} | Self::RegionServer {
code: Code::Unavailable,
..
}
)

View File

@@ -26,7 +26,7 @@ use api::v1::greptime_response::Response;
use api::v1::{AffectedRows, GreptimeResponse};
pub use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
pub use common_query::Output;
pub use common_query::{Output, OutputData, OutputMeta};
pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
@@ -23,7 +23,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::datanode_manager::{Datanode, HandleResponse};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
@@ -46,7 +46,7 @@ pub struct RegionRequester {
#[async_trait]
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
async fn handle(&self, request: RegionRequest) -> MetaResult<HandleResponse> {
self.handle_inner(request).await.map_err(|err| {
if err.should_retry() {
meta_error::Error::RetryLater {
@@ -123,8 +123,8 @@ impl RegionRequester {
.fail();
};
let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();
let metrics = Arc::new(ArcSwapOption::from(None));
let metrics_ref = metrics.clone();
let tracing_context = TracingContext::from_current_span();
@@ -140,7 +140,8 @@ impl RegionRequester {
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
ref_str.swap(Some(Arc::new(s)));
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
break;
}
_ => {
@@ -159,12 +160,12 @@ impl RegionRequester {
schema,
stream,
output_ordering: None,
metrics: metrics_str,
metrics,
};
Ok(Box::pin(record_batch_stream))
}
async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
async fn handle_inner(&self, request: RegionRequest) -> Result<HandleResponse> {
let request_type = request
.body
.as_ref()
@@ -177,10 +178,7 @@ impl RegionRequester {
let mut client = self.client.raw_region_client()?;
let RegionResponse {
header,
affected_rows,
} = client
let response = client
.handle(request)
.await
.map_err(|e| {
@@ -194,19 +192,20 @@ impl RegionRequester {
})?
.into_inner();
check_response_header(header)?;
check_response_header(&response.header)?;
Ok(affected_rows)
Ok(HandleResponse::from_region_response(response))
}
pub async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
pub async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
self.handle_inner(request).await
}
}
pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
let status = header
.and_then(|header| header.status)
.as_ref()
.and_then(|header| header.status.as_ref())
.context(IllegalDatabaseResponseSnafu {
err_msg: "either response header or status is missing",
})?;
@@ -220,7 +219,7 @@ pub fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
})?;
ServerSnafu {
code,
msg: status.err_msg,
msg: status.err_msg.clone(),
}
.fail()
}
@@ -235,19 +234,19 @@ mod test {
#[test]
fn test_check_response_header() {
let result = check_response_header(None);
let result = check_response_header(&None);
assert!(matches!(
result.unwrap_err(),
IllegalDatabaseResponse { .. }
));
let result = check_response_header(Some(ResponseHeader { status: None }));
let result = check_response_header(&Some(ResponseHeader { status: None }));
assert!(matches!(
result.unwrap_err(),
IllegalDatabaseResponse { .. }
));
let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Success as u32,
err_msg: String::default(),
@@ -255,7 +254,7 @@ mod test {
}));
assert!(result.is_ok());
let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: u32::MAX,
err_msg: String::default(),
@@ -266,7 +265,7 @@ mod test {
IllegalDatabaseResponse { .. }
));
let result = check_response_header(Some(ResponseHeader {
let result = check_response_header(&Some(ResponseHeader {
status: Some(PbStatus {
status_code: StatusCode::Internal as u32,
err_msg: "blabla".to_string(),

View File

@@ -12,8 +12,10 @@ path = "src/bin/greptime.rs"
[features]
tokio-console = ["common-telemetry/tokio-console"]
[lints]
workspace = true
[dependencies]
anymap = "1.0.0-beta.2"
async-trait.workspace = true
auth.workspace = true
catalog.workspace = true
@@ -49,7 +51,6 @@ meta-client.workspace = true
meta-srv.workspace = true
mito2.workspace = true
nu-ansi-term = "0.46"
partition.workspace = true
plugins.workspace = true
prometheus.workspace = true
prost.workspace = true

View File

@@ -13,5 +13,8 @@
// limitations under the License.
fn main() {
// Trigger this script if the git branch/commit changes
println!("cargo:rerun-if-changed=.git/refs/heads");
common_version::setup_build_info();
}

View File

@@ -62,7 +62,9 @@ pub struct BenchTableMetadataCommand {
impl BenchTableMetadataCommand {
pub async fn build(&self) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr]).await.unwrap();
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
let table_metadata_manager = Arc::new(TableMetadataManager::new(etcd_store));

View File

@@ -106,9 +106,15 @@ impl TableMetadataBencher {
.await
.unwrap();
let start = Instant::now();
let table_info = table_info.unwrap();
let table_id = table_info.table_info.ident.table_id;
let _ = self
.table_metadata_manager
.delete_table_metadata(&table_info.unwrap(), &table_route.unwrap())
.delete_table_metadata(
table_id,
&table_info.table_name(),
table_route.unwrap().region_routes().unwrap(),
)
.await;
start.elapsed()
},

View File

@@ -19,8 +19,7 @@ use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use client::api::v1::auth_header::AuthScheme;
use client::api::v1::Basic;
use client::{Client, Database, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use client::{Client, Database, OutputData, DEFAULT_SCHEMA_NAME};
use common_recordbatch::util::collect;
use common_telemetry::{debug, error, info, warn};
use datatypes::scalars::ScalarVector;
@@ -142,7 +141,7 @@ impl Export {
.with_context(|_| RequestDatabaseSnafu {
sql: "show databases".to_string(),
})?;
let Output::Stream(stream) = result else {
let OutputData::Stream(stream) = result.data else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)
@@ -183,7 +182,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream) = result else {
let OutputData::Stream(stream) = result.data else {
NotDataFromOutputSnafu.fail()?
};
let Some(record_batch) = collect(stream)
@@ -235,7 +234,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream) = result else {
let OutputData::Stream(stream) = result.data else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)

View File

@@ -19,9 +19,10 @@ use std::time::Instant;
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
@@ -184,15 +185,15 @@ impl Repl {
}
.context(RequestDatabaseSnafu { sql: &sql })?;
let either = match output {
Output::Stream(s) => {
let either = match output.data {
OutputData::Stream(s) => {
let x = RecordBatches::try_collect(s)
.await
.context(CollectRecordBatchesSnafu)?;
Either::Left(x)
}
Output::RecordBatches(x) => Either::Left(x),
Output::AffectedRows(rows) => Either::Right(rows),
OutputData::RecordBatches(x) => Either::Left(x),
OutputData::AffectedRows(rows) => Either::Right(rows),
};
let end = Instant::now();
@@ -252,14 +253,17 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
cached_meta_backend.clone(),
]));
let catalog_list =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);
KvBackendCatalogManager::new(cached_meta_backend.clone(), multi_cache_invalidator).await;
let plugins: Plugins = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
None,
None,
None,
false,
plugins.clone(),
));

View File

@@ -70,7 +70,7 @@ impl UpgradeCommand {
etcd_addr: &self.etcd_addr,
})?;
let tool = MigrateTableMetadata {
etcd_store: EtcdStore::with_etcd_client(client),
etcd_store: EtcdStore::with_etcd_client(client, 128),
dryrun: self.dryrun,
skip_catalog_keys: self.skip_catalog_keys,
skip_table_global_keys: self.skip_table_global_keys,

View File

@@ -43,6 +43,10 @@ impl Instance {
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
}
#[async_trait]
@@ -235,6 +239,7 @@ impl StartCommand {
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

View File

@@ -16,9 +16,10 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::CachedMetaKvBackendBuilder;
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
@@ -43,13 +44,17 @@ pub struct Instance {
}
impl Instance {
fn new(frontend: FeInstance) -> Self {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
}
pub fn mut_inner(&mut self) -> &mut FeInstance {
&mut self.frontend
}
pub fn inner(&self) -> &FeInstance {
&self.frontend
}
}
#[async_trait]
@@ -243,11 +248,19 @@ impl StartCommand {
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
cached_meta_backend.clone(),
]));
let catalog_manager = KvBackendCatalogManager::new(
cached_meta_backend.clone(),
multi_cache_invalidator.clone(),
)
.await;
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
multi_cache_invalidator.clone(),
)),
]);
@@ -259,11 +272,12 @@ impl StartCommand {
let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(cached_meta_backend)
.with_plugin(plugins.clone())
.with_cache_invalidator(multi_cache_invalidator)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
@@ -271,6 +285,7 @@ impl StartCommand {
let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(opts, servers)

View File

@@ -32,11 +32,11 @@ lazy_static::lazy_static! {
}
#[async_trait]
pub trait App {
pub trait App: Send {
fn name(&self) -> &str;
/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
fn pre_start(&mut self) -> error::Result<()> {
async fn pre_start(&mut self) -> error::Result<()> {
Ok(())
}
@@ -46,24 +46,21 @@ pub trait App {
}
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
let name = app.name().to_string();
info!("Starting app: {}", app.name());
app.pre_start()?;
app.pre_start().await?;
tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Failed to start app {name}!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Failed to stop app {name}!");
}
info!("Goodbye!");
}
app.start().await?;
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
app.stop().await?;
info!("Goodbye!");
Ok(())
}

View File

@@ -117,10 +117,12 @@ struct StartCommand {
/// The working home directory of this metasrv instance.
#[clap(long)]
data_home: Option<String>,
/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long, default_value = "")]
store_key_prefix: String,
/// The max operations per txn
#[clap(long)]
max_txn_ops: Option<usize>,
}
impl StartCommand {
@@ -181,6 +183,10 @@ impl StartCommand {
opts.store_key_prefix = self.store_key_prefix.clone()
}
if let Some(max_txn_ops) = self.max_txn_ops {
opts.max_txn_ops = max_txn_ops;
}
// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;
@@ -212,6 +218,7 @@ impl StartCommand {
mod tests {
use std::io::Write;
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use meta_srv::selector::SelectorType;
@@ -291,6 +298,10 @@ mod tests {
.first_heartbeat_estimate
.as_millis()
);
assert_eq!(
options.procedure.max_metadata_value_size,
Some(ReadableSize::kb(1500))
);
}
#[test]

View File

@@ -16,13 +16,14 @@ use std::sync::Arc;
use std::{fs, path};
use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -399,6 +400,10 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
let catalog_manager =
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;
let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
@@ -419,28 +424,34 @@ impl StartCommand {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let table_meta_allocator = TableMetadataAllocator::new(
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_metadata_manager.clone(),
);
));
let ddl_task_executor = Self::create_ddl_task_executor(
table_metadata_manager,
procedure_manager.clone(),
datanode_manager.clone(),
multi_cache_invalidator,
table_meta_allocator,
)
.await?;
let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(fe_plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
let mut frontend = FrontendBuilder::new(
kv_backend,
catalog_manager,
datanode_manager,
ddl_task_executor,
)
.with_plugin(fe_plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await
.context(StartFrontendSnafu)?;
frontend
.build_servers(fe_opts, servers)
@@ -458,21 +469,23 @@ impl StartCommand {
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocator,
) -> Result<DdlTaskExecutorRef> {
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
cache_invalidator: CacheInvalidatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
Arc::new(DummyCacheInvalidator),
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
true,
)
.context(InitDdlManagerSnafu)?,
);
Ok(ddl_task_executor)
Ok(procedure_executor)
}
pub async fn create_table_metadata_manager(

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
anymap = "1.0.0-beta.2"
bitvec = "1.0"

View File

@@ -21,6 +21,8 @@ pub mod readable_size;
use core::any::Any;
use std::sync::{Arc, Mutex, MutexGuard};
pub type AffectedRows = usize;
pub use bit_vec::BitVec;
/// [`Plugins`] is a wrapper of Arc contents.

View File

@@ -1,20 +1,6 @@
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is copied from https://github.com/tikv/raft-engine/blob/8dd2a39f359ff16f5295f35343f626e0c10132fa/src/util.rs
// This file is copied from https://github.com/tikv/raft-engine/blob/0.3.0/src/util.rs
use std::fmt::{self, Debug, Display, Write};
use std::ops::{Div, Mul};

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true

View File

@@ -55,10 +55,10 @@ pub fn build_db_string(catalog: &str, schema: &str) -> String {
/// schema name
/// - if `[<catalog>-]` is provided, we split database name with `-` and use
/// `<catalog>` and `<schema>`.
pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) {
pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (String, String) {
match parse_optional_catalog_and_schema_from_db_string(db) {
(Some(catalog), schema) => (catalog, schema),
(None, schema) => (DEFAULT_CATALOG_NAME, schema),
(None, schema) => (DEFAULT_CATALOG_NAME.to_string(), schema),
}
}
@@ -66,12 +66,12 @@ pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) {
///
/// Similar to [`parse_catalog_and_schema_from_db_string`] but returns an optional
/// catalog if it's not provided in the database name.
pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<&str>, &str) {
pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<String>, String) {
let parts = db.splitn(2, '-').collect::<Vec<&str>>();
if parts.len() == 2 {
(Some(parts[0]), parts[1])
(Some(parts[0].to_lowercase()), parts[1].to_lowercase())
} else {
(None, db)
(None, db.to_lowercase())
}
}
@@ -88,32 +88,37 @@ mod tests {
#[test]
fn test_parse_catalog_and_schema() {
assert_eq!(
(DEFAULT_CATALOG_NAME, "fullschema"),
(DEFAULT_CATALOG_NAME.to_string(), "fullschema".to_string()),
parse_catalog_and_schema_from_db_string("fullschema")
);
assert_eq!(
("catalog", "schema"),
("catalog".to_string(), "schema".to_string()),
parse_catalog_and_schema_from_db_string("catalog-schema")
);
assert_eq!(
("catalog", "schema1-schema2"),
("catalog".to_string(), "schema1-schema2".to_string()),
parse_catalog_and_schema_from_db_string("catalog-schema1-schema2")
);
assert_eq!(
(None, "fullschema"),
(None, "fullschema".to_string()),
parse_optional_catalog_and_schema_from_db_string("fullschema")
);
assert_eq!(
(Some("catalog"), "schema"),
(Some("catalog".to_string()), "schema".to_string()),
parse_optional_catalog_and_schema_from_db_string("catalog-schema")
);
assert_eq!(
(Some("catalog"), "schema1-schema2"),
(Some("catalog".to_string()), "schema".to_string()),
parse_optional_catalog_and_schema_from_db_string("CATALOG-SCHEMA")
);
assert_eq!(
(Some("catalog".to_string()), "schema1-schema2".to_string()),
parse_optional_catalog_and_schema_from_db_string("catalog-schema1-schema2")
);
}

View File

@@ -4,9 +4,11 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
common-base.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
serde.workspace = true
sysinfo.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
arrow.workspace = true
arrow-schema.workspace = true

View File

@@ -28,12 +28,15 @@ const REGION: &str = "region";
const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
pub fn is_supported_in_s3(key: &str) -> bool {
key == ENDPOINT
|| key == ACCESS_KEY_ID
|| key == SECRET_ACCESS_KEY
|| key == SESSION_TOKEN
|| key == REGION
|| key == ENABLE_VIRTUAL_HOST_STYLE
[
ENDPOINT,
ACCESS_KEY_ID,
SECRET_ACCESS_KEY,
SESSION_TOKEN,
REGION,
ENABLE_VIRTUAL_HOST_STYLE,
]
.contains(&key)
}
pub fn build_s3_backend(

View File

@@ -4,8 +4,10 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
arrow.workspace = true
bigdecimal.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
snafu.workspace = true
strum.workspace = true

View File

@@ -19,7 +19,9 @@ pub mod format;
pub mod mock;
pub mod status_code;
pub use snafu;
// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
// please define in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
pub use snafu;

View File

@@ -4,13 +4,18 @@ edition.workspace = true
version.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
@@ -18,14 +23,16 @@ common-time.workspace = true
common-version.workspace = true
datafusion.workspace = true
datatypes.workspace = true
libc = "0.2"
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true
[dev-dependencies]

View File

@@ -30,6 +30,17 @@ pub struct FunctionContext {
pub state: Arc<FunctionState>,
}
impl FunctionContext {
/// Create a mock [`FunctionContext`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
Self {
query_ctx: QueryContextBuilder::default().build(),
state: Arc::new(FunctionState::mock()),
}
}
}
impl Default for FunctionContext {
fn default() -> Self {
Self {

View File

@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
use crate::function::FunctionRef;
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
use crate::scalars::date::DateFunction;
use crate::scalars::expression::ExpressionFunction;
use crate::scalars::math::MathFunction;
use crate::scalars::numpy::NumpyFunction;
use crate::scalars::timestamp::TimestampFunction;
@@ -80,6 +81,7 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
NumpyFunction::register(&function_registry);
TimestampFunction::register(&function_registry);
DateFunction::register(&function_registry);
ExpressionFunction::register(&function_registry);
// Aggregate functions
AggregateFunctions::register(&function_registry);

View File

@@ -13,42 +13,58 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::ProcedureStateResponse;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};
pub type AffectedRows = usize;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
pub trait TableMutationHandler: Send + Sync {
/// Inserts rows into the table.
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output>;
/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(
/// Trigger a flush task for table.
async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
-> Result<AffectedRows>;
/// Trigger a compaction task for table.
async fn compact(
&self,
region_id: u64,
from_peer: u64,
to_peer: u64,
replay_timeout: Duration,
) -> Result<String>;
request: CompactTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;
/// Trigger a compaction task for a table region.
async fn compact_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
}
/// A trait for handling meta service requests in `QueryEngine`.
/// A trait for handling procedure service requests in `QueryEngine`.
#[async_trait]
pub trait MetaServiceHandler: Send + Sync {
pub trait ProcedureServiceHandler: Send + Sync {
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;

View File

@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::ResultExt;
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -27,3 +31,15 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)
Signature::one_of(sigs, Volatility::Immutable)
}
/// Cast a [`ValueRef`] to u64, returns `None` if fails
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
cast((*value).into(), &ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
value.data_type(),
),
})
.map(|v| v.as_u64())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod macros;
pub mod scalars;
mod system;
mod table;

View File

@@ -12,17 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Shard in a partition.
use crate::memtable::merge_tree::data::DataParts;
use crate::memtable::merge_tree::dict::KeyDictRef;
use crate::memtable::merge_tree::ShardId;
/// Shard stores data related to the same key dictionary.
pub struct Shard {
shard_id: ShardId,
/// Key dictionary of the shard. `None` if the schema of the tree doesn't have a primary key.
key_dict: Option<KeyDictRef>,
/// Data in the shard.
data_parts: DataParts,
/// Ensure current function is invokded under `greptime` catalog.
#[macro_export]
macro_rules! ensure_greptime {
($func_ctx: expr) => {{
use common_catalog::consts::DEFAULT_CATALOG_NAME;
snafu::ensure!(
$func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
common_query::error::PermissionDeniedSnafu {
err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
}
);
}};
}

View File

@@ -14,8 +14,22 @@
mod binary;
mod ctx;
mod is_null;
mod unary;
use std::sync::Arc;
pub use binary::scalar_binary_op;
pub use ctx::EvalContext;
pub use unary::scalar_unary_op;
use crate::function_registry::FunctionRegistry;
use crate::scalars::expression::is_null::IsNullFunction;
pub(crate) struct ExpressionFunction;
impl ExpressionFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(IsNullFunction));
}
}

Some files were not shown because too many files have changed in this diff Show More