Compare commits

..

33 Commits

Author SHA1 Message Date
Ruihang Xia
d8d29fd86a feat: always canonicalize partition expr
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-09 17:03:13 +08:00
Weny Xu
4891d7ceef fix: fix SQL table identifier quoting for election (#7541)
fix: fix SQL table identifier quoting for election and RDS kv-backend

- Quote MySQL table names with backticks and PostgreSQL tables with double quotes in election and RDS kv-backend SQL
- Update related tests to use quoted identifiers and cover hyphenated table names
- Ensure dynamic SQL using table names is safe for special characters in identifiers

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-09 03:15:53 +00:00
Weny Xu
aadfcd7821 feat(repartition): implement validation logic for repartition table (#7538)
* feat(repartition): implement validation logic for repartition_table

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: minor refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: update sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-08 12:18:39 +00:00
Weny Xu
f3e2d333e4 feat(repartition): implement region allocation for repartition procedure (#7534)
* refactor: rename WalOptionsAllocator to WalProvider

The name "WalOptionsAllocator" was misleading because:
- For RaftEngine variant, it doesn't actually allocate anything
- The actual allocation logic lives in KafkaTopicPool

"WalProvider" better describes its role as providing WAL options
based on the configured WAL backend (RaftEngine or Kafka).

Changes:
- Rename `WalOptionsAllocator` to `WalProvider`
- Rename `WalOptionsAllocatorRef` to `WalProviderRef`
- Rename `build_wal_options_allocator` to `build_wal_provider`
- Rename module `wal_options_allocator` to `wal_provider`
- Rename error types: `BuildWalOptionsAllocator` -> `BuildWalProvider`,
  `StartWalOptionsAllocator` -> `StartWalProvider`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): extract allocator traits from TableMetadataAllocator

Refactor TableMetadataAllocator to use trait-based dependency injection
for better testability and separation of concerns.

Changes:
- Add `ResourceIdAllocator` trait to abstract ID allocation
- Add `WalOptionsAllocator` trait to abstract WAL options allocation
- Implement traits for `Sequence` and `WalProvider`
- Remove duplicate `allocate_region_wal_options` function
- Rename `table_id_sequence` to `table_id_allocator` for consistency
- Rename `TableIdSequenceHandler` to `TableIdAllocatorHandler`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta): add max_region_number tracking to PhysicalTableRouteValue

Add `max_region_number` field to track the highest region number ever
allocated for a table. This value only increases when regions are added
and never decreases when regions are dropped, ensuring unique region
numbers across the table's lifetime.

Changes:
- Add `max_region_number` field to `PhysicalTableRouteValue`
- Implement custom `Deserialize` for backward compatibility
- Update `update_region_routes` to maintain max_region_number
- Calculate max_region_number from region_routes in `new()`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: extract TableRouteAllocator trait from TableMetadataAllocator

- Add TableRouteAllocator trait for abstracting region route allocation
- Implement blanket impl for all PeerAllocator types
- Add PeerAllocator impl for Arc<T> to support trait object delegation
- Update TableMetadataAllocator to use TableRouteAllocatorRef

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename TableRouteAllocator to RegionRoutesAllocator

- Rename table_route.rs to region_routes.rs
- Rename TableRouteAllocator trait to RegionRoutesAllocator
- Rename wal_option.rs to wal_options.rs for consistency
- Update TableMetadataAllocator to use new naming

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta-srv): implement region allocation for repartition procedure

This commit implements the region allocation phase of the repartition procedure,
which handles allocating new regions when a table needs to be split into more partitions.

Key changes:
- Refactor `RegionRoutesAllocator::allocate` to accept `(region_number, partition_expr)` tuples
  for more flexible region number assignment
- Simplify `AllocationPlanEntry` by removing `regions_to_allocate` and `regions_to_deallocate`
  fields (now derived from source/target counts)
- Add `convert_allocation_plan_to_repartition_plan` function to handle allocation, equal,
  and deallocation cases
- Fix `RepartitionPlanEntry::allocate_regions()` to return target regions (was incorrectly
  returning source regions)
- Implement complete `AllocateRegion` state with:
  - Region route allocation via `RegionRoutesAllocator`
  - WAL options allocation via `WalOptionsAllocator`
  - Operating region registration for concurrency control
  - Region creation on datanodes via `CreateTableExecutor`
  - Table route metadata update
- Add `TableRouteValue::max_region_number()` helper method
- Add comprehensive unit tests for plan conversion and allocation logic

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-08 11:03:58 +00:00
discord9
06f9a4c80c chore: sqlness redact time properly (#7543)
chore

Signed-off-by: discord9 <discord9@163.com>
2026-01-08 08:24:28 +00:00
shuiyisong
8e2c2e6e9a chore: add information extension to the plugins in frontend (#7542)
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2026-01-08 08:02:56 +00:00
fys
28255d8ade chore: add grafana dashboard about trigger (#7536)
* chore: add grafana dashboard about trigger

* fix: ci

* fix: ci

* fix: row title

* fix: ci

* modify desc

* fix: wrong legend

* revert some
2026-01-08 06:47:46 +00:00
Ning Sun
90deaae844 feat: update special remote write label name (#7527)
* feat: update special remote write label name

* chore: mark schema_label as deprecated
2026-01-08 06:13:31 +00:00
Alan Tang
a32326c887 chore: check for redundant pre-commit hooks (#7506)
Signed-off-by: StandingMan <jmtangcs@gmail.com>
2026-01-07 13:46:42 +00:00
Ruihang Xia
fce1687fa7 fix: incorrect timestamp index inference (#7530)
* add sqlness case, but can't reproduce

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reproduction

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix wildcard rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-07 11:18:25 +00:00
Yingwen
ef6dd5b99f fix: precise filter time index if not in projection (#7531)
* fix: precise filter time index if not in projection

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add sqlness test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-01-07 11:15:34 +00:00
discord9
ac6d68aa2d fix: simp expr recursively (#7523)
* fix: simp expr recursively

Signed-off-by: discord9 <discord9@163.com>

* test: some simple constant folding case

Signed-off-by: discord9 <discord9@163.com>

* fix: literal ts cast to UTC

Signed-off-by: discord9 <discord9@163.com>

* fix: patch merge scan batch col tz instead

Signed-off-by: discord9 <discord9@163.com>

* test: fix

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-07 09:22:26 +00:00
Ruihang Xia
d39895a970 feat: tune query traces (#7524)
* feat: add partition and region id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: instrument mito

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* connect region scan span

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* instrument streams

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-07 08:11:09 +00:00
jeremyhi
59867cd5b6 fix: remove log_env_flags (#7529)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2026-01-07 08:08:35 +00:00
Ruihang Xia
9a4b7cbb32 feat: bump promql-parser to v0.7.1 (#7521)
* feat: bump promql-parser to v0.7.0

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update other sqlness results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update tests/cases/standalone/common/tql/case_sensitive.result

Co-authored-by: Ning Sun <sunng@protonmail.com>

* remove escape on greptimedb side

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update to v0.7.1

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
2026-01-07 07:23:40 +00:00
Weny Xu
2f242927a8 feat(repartition): implement region deallocation for repartition procedure (#7522)
* feat: implement deallocate regions for repartition procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(metric-engine): add force flag to drop physical regions with associated logical regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: update table metadata after deallocating regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-07 06:13:48 +00:00
Weny Xu
77310ec5bd refactor: refactor CreateTableProcedure to extract reusable components (#7526)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-07 01:58:53 +00:00
Weny Xu
ada4666e10 refactor: remove region_numbers from TableMeta and TableInfo (#7519)
* refactor: remove `region_numbers` from `TableMeta` and `TableInfo`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: create partitions from region route

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix build

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-06 13:21:36 +00:00
jeremyhi
898e84898c feat!: make heartbeat config only in metasrv (#7510)
* feat: make heartbeat config only in metasrv

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* feat: refine config doc

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: make the heartbeat setup simple

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: revert config

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: proto update

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: fix sqlness wrong cfg

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-01-06 09:43:36 +00:00
discord9
6f86a22e6f feat: adjust some args to gc worker (#7469)
* chore: less stuff sent

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: clarify comment on manifest file removal for GC worker

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-06 07:37:05 +00:00
Ruihang Xia
5162c1de4d feat: repartition grammar candy (#7518)
* feat: repartition grammar candy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* align keyword

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-06 04:44:13 +00:00
LFC
522ca99cd6 feat: ingest jsonbench data through pipeline (#7312)
Signed-off-by: luofucong <luofc@foxmail.com>
2026-01-05 12:12:34 +00:00
Weny Xu
2d756b24c8 feat: implement RemapManifest and ApplyStagingManifest for repartition procedure (#7509)
* feat: add RemapManifest and ApplyStagingManifest heartbeat handler

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add `RemapManifest` and `ApplyStagingManifest` states for repartition

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-05 08:33:44 +00:00
shuiyisong
527a1c03f3 fix: pipeline loading issue (#7491)
* fix: pipeline loading

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change string to str

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor fix to save returned version

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: introduce PipelineContent

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: use found schema

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update CR

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: CR issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2026-01-05 06:49:44 +00:00
discord9
7e243632c7 fix: dist planner rm col req when rm sort (#7512)
* aha!

Signed-off-by: discord9 <discord9@163.com>

* fix: rm col_req in pql sort

Signed-off-by: discord9 <discord9@163.com>

* ut

Signed-off-by: discord9 <discord9@163.com>

* docs

Signed-off-by: discord9 <discord9@163.com>

* typo

Signed-off-by: discord9 <discord9@163.com>

* more typo

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-05 03:27:11 +00:00
Ruihang Xia
3556eb4476 chore: add tests to comment column on information_schema (#7514)
* feat: show comment on information_schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add to information schema for columns, add sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplications

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-04 09:05:50 +00:00
Weny Xu
9343da7fe8 feat(meta-srv): fallback to non-TLS connection when etcd TLS prefer mode fail (#7507)
* feat(meta-srv): fallback to non-TLS connection when etcd TLS prefer mode fail

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore(ci): set timeout for deploy cluster

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: simplify etcd TLS prefer mode handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-31 10:03:34 +00:00
Alan Tang
8a07dbf605 fix: fix sqlness test error about double precision (#7476)
* fix: fix sqlness test error about double precision

Signed-off-by: StandingMan <jmtangcs@gmail.com>

* fix: use round method to truncate the result

Signed-off-by: StandingMan <jmtangcs@gmail.com>

---------

Signed-off-by: StandingMan <jmtangcs@gmail.com>
2025-12-31 04:55:22 +00:00
Weny Xu
83932c8c9e fix: align backend_tls default value with example config (#7496)
* fix: align backend_tls default value with example config

Signed-off-by: WenyXu <wenymedia@gmail.com>

* Update src/common/meta/src/kv_backend/rds/postgres.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
2025-12-31 03:31:08 +00:00
LFC
dc9fc582a0 feat: impl json_get_int for new json type (#7495)
Update src/common/function/src/scalars/json/json_get.rs



impl `json_get_int` for new json type

Signed-off-by: luofucong <luofc@foxmail.com>
2025-12-30 09:42:16 +00:00
Weny Xu
b1d81913f5 feat: update ApplyStagingManifestRequest to fetch manifest from central region (#7493)
* feat: update ApplyStagingManifestRequest to fetch manifest from central region

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(mito2): rename `StagingDataStorage` to `StagingBlobStorage`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-30 07:29:56 +00:00
Yingwen
554f3943b6 ci: update breaking change title level (#7497)
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-12-30 06:17:51 +00:00
dennis zhuang
e4b5ef275f feat: impl vector index building (#7468)
* feat: impl vector index building

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: supports flat format

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* ci: add vector_index feature to test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: apply suggestions from copilot

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-12-30 03:38:51 +00:00
280 changed files with 11939 additions and 2035 deletions

View File

@@ -70,19 +70,23 @@ runs:
--wait \
--wait-for-jobs
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
break
else
echo "Cluster is not ready yet: Current phase: $PHASE"
kubectl get pods -n my-greptimedb
sleep 5 # wait for 5 seconds before check again.
fi
done
uses: nick-fields/retry@v3
with:
timeout_minutes: 3
max_attempts: 1
shell: bash
command: |
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
break
else
echo "Cluster is not ready yet: Current phase: $PHASE"
kubectl get pods -n my-greptimedb
sleep 5 # wait for 5 seconds before check again.
fi
done
- name: Print GreptimeDB info
if: always()
shell: bash

View File

@@ -755,7 +755,7 @@ jobs:
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend -F vector_index
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
@@ -813,7 +813,7 @@ jobs:
run: ../../.github/scripts/pull-test-deps-images.sh && docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend -F vector_index
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1

View File

@@ -15,8 +15,11 @@ repos:
rev: v1.0
hooks:
- id: fmt
args: ["--", "--check"]
stages: [commit-msg]
- id: clippy
args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
stages: [pre-push]
stages: [commit-msg]
- id: cargo-check
args: ["--workspace", "--all-targets", "--all-features"]
stages: [commit-msg]

22
Cargo.lock generated
View File

@@ -4062,6 +4062,7 @@ dependencies = [
"mito2",
"num_cpus",
"object-store",
"partition",
"prometheus",
"prost 0.13.5",
"query",
@@ -5466,7 +5467,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=520fa524f9d590752ea327683e82ffd65721b27c#520fa524f9d590752ea327683e82ffd65721b27c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0e316b86d765e4718d6f0ca77b1ad179f222b822#0e316b86d765e4718d6f0ca77b1ad179f222b822"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -7779,7 +7780,6 @@ dependencies = [
"either",
"futures",
"greptime-proto",
"humantime",
"humantime-serde",
"index",
"itertools 0.14.0",
@@ -7798,6 +7798,7 @@ dependencies = [
"rand 0.9.1",
"rayon",
"regex",
"roaring",
"rskafka",
"rstest",
"rstest_reuse",
@@ -7816,6 +7817,7 @@ dependencies = [
"tokio-util",
"toml 0.8.23",
"tracing",
"usearch",
"uuid",
]
@@ -9473,6 +9475,7 @@ dependencies = [
"ahash 0.8.12",
"api",
"arrow",
"arrow-schema",
"async-trait",
"catalog",
"chrono",
@@ -9950,9 +9953,9 @@ dependencies = [
[[package]]
name = "promql-parser"
version = "0.6.0"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328fe69c2443ec4f8e6c33ea925dde04a1026e6c95928e89ed02343944cac9bf"
checksum = "6c3c2199b84e1253aade469e92ae16cd8dbe1de031c66a00f4f5cdd650290a86"
dependencies = [
"cfgrammar",
"chrono",
@@ -9962,7 +9965,6 @@ dependencies = [
"regex",
"serde",
"serde_json",
"unescaper",
]
[[package]]
@@ -10323,7 +10325,6 @@ dependencies = [
"tokio",
"tokio-stream",
"tracing",
"unescaper",
"uuid",
]
@@ -14166,15 +14167,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "unescaper"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c01d12e3a56a4432a8b436f293c25f4808bdf9e9f9f98f9260bba1f1bc5a1f26"
dependencies = [
"thiserror 2.0.17",
]
[[package]]
name = "unicase"
version = "2.8.1"

View File

@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "520fa524f9d590752ea327683e82ffd65721b27c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -189,7 +189,7 @@ paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.6", features = ["ser"] }
promql-parser = { version = "0.7.1", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-types = "0.13"
raft-engine = { version = "0.4.1", default-features = false }

View File

@@ -17,7 +17,7 @@ Release date: {{ timestamp | date(format="%B %d, %Y") }}
{%- set breakings = commits | filter(attribute="breaking", value=true) -%}
{%- if breakings | length > 0 %}
## Breaking changes
### Breaking changes
{% for commit in breakings %}
* {{ commit.github.pr_title }}\
{% if commit.github.username %} by \

View File

@@ -8863,7 +8863,7 @@
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed of Reconciliation steps ",
"description": "Elapsed of Reconciliation steps",
"fieldConfig": {
"defaults": {
"color": {
@@ -9366,7 +9366,7 @@
"editorMode": "code",
"expr": "greptime_flow_input_buf_size",
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}]",
"legendFormat": "[{{instance}}]-[{{pod}}]",
"range": true,
"refId": "A"
}
@@ -9472,6 +9472,755 @@
],
"title": "Flownode",
"type": "row"
},
{
"collapsed": true,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 187
},
"id": 357,
"panels": [],
"title": "Trigger",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "bf9fzta69bhtsa"
},
"description": "Total number of triggers currently defined.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 188
},
"id": 358,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "greptime_trigger_count{}",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time for trigger evaluation, including query execution and condition evaluation.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 196
},
"id": 359,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Trigger Eval Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failed trigger evaluations.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 196
},
"id": 360,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_evaluate_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Eval Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to send trigger alerts to notification channels.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 204
},
"id": 361,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Send Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when sending trigger alerts.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 204
},
"id": 364,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_send_alert_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Send Alert Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to persist trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 212
},
"id": 363,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Save Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when persisting trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 212
},
"id": 362,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Save Alert Failure Rate",
"type": "timeseries"
}
],
"preload": false,
@@ -9613,4 +10362,4 @@
"title": "GreptimeDB",
"uid": "dejf3k5e7g2kgb",
"version": 15
}
}

View File

@@ -111,12 +111,34 @@
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Flow Ingest / Output Rate | `sum by(instance, pod, direction) (rate(greptime_flow_processed_rows[$__rate_interval]))` | `timeseries` | Flow Ingest / Output Rate. | `prometheus` | -- | `[{{pod}}]-[{{instance}}]-[{{direction}}]` |
| Flow Ingest Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))` | `timeseries` | Flow Ingest Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-p95` |
| Flow Operation Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))` | `timeseries` | Flow Operation Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{type}}]-p95` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}]` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]` |
| Flow Processing Error per Instance | `sum by(instance,pod,code) (rate(greptime_flow_errors[$__rate_interval]))` | `timeseries` | Flow Processing Error per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{code}}]` |
# Trigger
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Trigger Count | `greptime_trigger_count{}` | `timeseries` | Total number of triggers currently defined. | `prometheus` | -- | `__auto` |
| Trigger Eval Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time for trigger evaluation, including query execution and condition evaluation. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Trigger Eval Failure Rate | `rate(greptime_trigger_evaluate_failure_count[$__rate_interval])` | `timeseries` | Rate of failed trigger evaluations. | `prometheus` | `none` | `__auto` |
| Send Alert Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to send trigger alerts to notification channels. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99` |
| Send Alert Failure Rate | `rate(greptime_trigger_send_alert_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when sending trigger alerts. | `prometheus` | `none` | `__auto` |
| Save Alert Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to persist trigger alert records. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99` |
| Save Alert Failure Rate | `rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when persisting trigger alert records. | `prometheus` | `none` | `__auto` |

View File

@@ -1002,7 +1002,7 @@ groups:
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
description: Elapsed of Reconciliation steps
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
@@ -1057,7 +1057,7 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}]'
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Flow Processing Error per Instance
type: timeseries
description: Flow Processing Error per Instance.
@@ -1067,3 +1067,89 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{code}}]'
- title: Trigger
panels:
- title: Trigger Count
type: timeseries
description: Total number of triggers currently defined.
queries:
- expr: greptime_trigger_count{}
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Trigger Eval Elapsed
type: timeseries
description: Elapsed time for trigger evaluation, including query execution and condition evaluation.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p75'
- title: Trigger Eval Failure Rate
type: timeseries
description: Rate of failed trigger evaluations.
unit: none
queries:
- expr: rate(greptime_trigger_evaluate_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Send Alert Elapsed
type: timeseries
description: Elapsed time to send trigger alerts to notification channels.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75'
- title: Send Alert Failure Rate
type: timeseries
description: Rate of failures when sending trigger alerts.
unit: none
queries:
- expr: rate(greptime_trigger_send_alert_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Save Alert Elapsed
type: timeseries
description: Elapsed time to persist trigger alert records.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75'
- title: Save Alert Failure Rate
type: timeseries
description: Rate of failures when persisting trigger alert records.
unit: none
queries:
- expr: rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto

View File

@@ -8863,7 +8863,7 @@
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed of Reconciliation steps ",
"description": "Elapsed of Reconciliation steps",
"fieldConfig": {
"defaults": {
"color": {
@@ -9366,7 +9366,7 @@
"editorMode": "code",
"expr": "greptime_flow_input_buf_size",
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}]",
"legendFormat": "[{{instance}}]-[{{pod}}]",
"range": true,
"refId": "A"
}
@@ -9472,6 +9472,755 @@
],
"title": "Flownode",
"type": "row"
},
{
"collapsed": true,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 187
},
"id": 357,
"panels": [],
"title": "Trigger",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "bf9fzta69bhtsa"
},
"description": "Total number of triggers currently defined.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 188
},
"id": 358,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "greptime_trigger_count{}",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time for trigger evaluation, including query execution and condition evaluation.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 196
},
"id": 359,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Trigger Eval Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failed trigger evaluations.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 196
},
"id": 360,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_evaluate_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Eval Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to send trigger alerts to notification channels.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 204
},
"id": 361,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Send Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when sending trigger alerts.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 204
},
"id": 364,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_send_alert_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Send Alert Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to persist trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 212
},
"id": 363,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Save Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when persisting trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 212
},
"id": 362,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Save Alert Failure Rate",
"type": "timeseries"
}
],
"preload": false,
@@ -9613,4 +10362,4 @@
"title": "GreptimeDB",
"uid": "dejf3k5e7g2kgb",
"version": 15
}
}

View File

@@ -111,12 +111,34 @@
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Flow Ingest / Output Rate | `sum by(instance, pod, direction) (rate(greptime_flow_processed_rows[$__rate_interval]))` | `timeseries` | Flow Ingest / Output Rate. | `prometheus` | -- | `[{{pod}}]-[{{instance}}]-[{{direction}}]` |
| Flow Ingest Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))` | `timeseries` | Flow Ingest Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-p95` |
| Flow Operation Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))` | `timeseries` | Flow Operation Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{type}}]-p95` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}]` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]` |
| Flow Processing Error per Instance | `sum by(instance,pod,code) (rate(greptime_flow_errors[$__rate_interval]))` | `timeseries` | Flow Processing Error per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{code}}]` |
# Trigger
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Trigger Count | `greptime_trigger_count{}` | `timeseries` | Total number of triggers currently defined. | `prometheus` | -- | `__auto` |
| Trigger Eval Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time for trigger evaluation, including query execution and condition evaluation. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Trigger Eval Failure Rate | `rate(greptime_trigger_evaluate_failure_count[$__rate_interval])` | `timeseries` | Rate of failed trigger evaluations. | `prometheus` | `none` | `__auto` |
| Send Alert Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to send trigger alerts to notification channels. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99` |
| Send Alert Failure Rate | `rate(greptime_trigger_send_alert_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when sending trigger alerts. | `prometheus` | `none` | `__auto` |
| Save Alert Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to persist trigger alert records. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99` |
| Save Alert Failure Rate | `rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when persisting trigger alert records. | `prometheus` | `none` | `__auto` |

View File

@@ -1002,7 +1002,7 @@ groups:
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
description: Elapsed of Reconciliation steps
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
@@ -1057,7 +1057,7 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}]'
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Flow Processing Error per Instance
type: timeseries
description: Flow Processing Error per Instance.
@@ -1067,3 +1067,89 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{code}}]'
- title: Trigger
panels:
- title: Trigger Count
type: timeseries
description: Total number of triggers currently defined.
queries:
- expr: greptime_trigger_count{}
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Trigger Eval Elapsed
type: timeseries
description: Elapsed time for trigger evaluation, including query execution and condition evaluation.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p75'
- title: Trigger Eval Failure Rate
type: timeseries
description: Rate of failed trigger evaluations.
unit: none
queries:
- expr: rate(greptime_trigger_evaluate_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Send Alert Elapsed
type: timeseries
description: Elapsed time to send trigger alerts to notification channels.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75'
- title: Send Alert Failure Rate
type: timeseries
description: Rate of failures when sending trigger alerts.
unit: none
queries:
- expr: rate(greptime_trigger_send_alert_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Save Alert Elapsed
type: timeseries
description: Elapsed time to persist trigger alert records.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75'
- title: Save Alert Failure Rate
type: timeseries
description: Rate of failures when persisting trigger alert records.
unit: none
queries:
- expr: rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto

View File

@@ -895,7 +895,7 @@ pub fn is_column_type_value_eq(
.unwrap_or(false)
}
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
fn helper(json: JsonVariant) -> v1::JsonValue {
let value = match json {
JsonVariant::Null => None,

View File

@@ -17,8 +17,8 @@ use std::collections::HashMap;
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
SkippingIndexOptions, SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -36,6 +36,14 @@ const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
/// Key used to store skip index options in gRPC column options.
const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
const COLUMN_OPTION_MAPPINGS: [(&str, &str); 5] = [
(FULLTEXT_GRPC_KEY, FULLTEXT_KEY),
(INVERTED_INDEX_GRPC_KEY, INVERTED_INDEX_KEY),
(SKIPPING_INDEX_GRPC_KEY, SKIPPING_INDEX_KEY),
(EXTENSION_TYPE_NAME_KEY, EXTENSION_TYPE_NAME_KEY),
(EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_METADATA_KEY),
];
/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(
@@ -131,6 +139,21 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
})
}
/// Collect the [ColumnOptions] into the [Metadata] that can be used in, for example, [ColumnSchema].
pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
let Some(ColumnOptions { options }) = column_options else {
return Metadata::default();
};
let mut metadata = Metadata::with_capacity(options.len());
for (x, y) in COLUMN_OPTION_MAPPINGS {
if let Some(v) = options.get(x) {
metadata.insert(y.to_string(), v.clone());
}
}
metadata
}
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
let mut options = ColumnOptions::default();

View File

@@ -32,6 +32,7 @@ use crate::error::Result;
pub mod error;
pub mod information_extension;
pub mod kvbackend;
#[cfg(any(test, feature = "testing"))]
pub mod memory;
mod metrics;
pub mod system_schema;

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) const METRIC_DB_LABEL: &str = "db";
use lazy_static::lazy_static;
use prometheus::*;
@@ -25,7 +23,7 @@ lazy_static! {
pub static ref METRIC_CATALOG_MANAGER_TABLE_COUNT: IntGaugeVec = register_int_gauge_vec!(
"greptime_catalog_table_count",
"catalog table count",
&[METRIC_DB_LABEL]
&["db"]
)
.unwrap();
pub static ref METRIC_CATALOG_KV_REMOTE_GET: Histogram =

View File

@@ -24,6 +24,7 @@ use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
@@ -163,6 +164,7 @@ impl DataSource for SystemTableDataSource {
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Ok(Box::pin(stream))

View File

@@ -399,8 +399,8 @@ impl InformationSchemaColumnsBuilder {
self.is_nullables.push(Some("No"));
}
self.column_types.push(Some(&data_type));
self.column_comments
.push(column_schema.column_comment().map(|x| x.as_ref()));
let column_comment = column_schema.column_comment().map(|x| x.as_ref());
self.column_comments.push(column_comment);
}
fn finish(&mut self) -> Result<RecordBatch> {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::pin::pin;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -31,15 +32,17 @@ use datatypes::value::Value;
use datatypes::vectors::{
StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;
@@ -247,6 +250,10 @@ impl InformationSchemaTablesBuilder {
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let partition_manager = catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager());
let predicates = Predicates::from_scan_request(&request);
let information_extension = utils::information_extension(&self.catalog_manager)?;
@@ -267,37 +274,59 @@ impl InformationSchemaTablesBuilder {
};
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
let table_stream = catalog_manager.tables(&catalog_name, &schema_name, None);
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
const BATCH_SIZE: usize = 128;
// Split tables into chunks
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
// TODO(dennis): make it working for metric engine
let table_region_stats =
if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.flat_map(|region_id| {
region_stats
.binary_search_by_key(&region_id, |x| x.id)
.map(|i| &region_stats[i])
})
.collect::<Vec<_>>()
} else {
vec![]
};
while let Some(tables) = table_chunks.next().await {
let tables = tables.into_iter().collect::<Result<Vec<_>>>()?;
let mito_or_physical_table_ids = tables
.iter()
.filter(|table| {
table.table_info().meta.engine == MITO_ENGINE
|| table.table_info().is_physical_table()
})
.map(|table| table.table_info().ident.table_id)
.collect::<Vec<_>>();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
table_info,
table.table_type(),
&table_region_stats,
);
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.batch_find_region_routes(&mito_or_physical_table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {
mito_or_physical_table_ids
.into_iter()
.map(|id| (id, vec![]))
.collect()
};
for table in tables {
let table_region_stats =
match table_routes.get(&table.table_info().ident.table_id) {
Some(routes) => routes
.iter()
.flat_map(|route| {
let region_id = route.region.id;
region_stats
.binary_search_by_key(&region_id, |x| x.id)
.map(|i| &region_stats[i])
})
.collect::<Vec<_>>(),
None => vec![],
};
self.add_table(
&predicates,
&catalog_name,
&schema_name,
table.table_info(),
table.table_type(),
&table_region_stats,
);
}
}
}

View File

@@ -337,7 +337,7 @@ mod tests {
.build();
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024);
view_info.table_type = TableType::View;
let logical_plan = vec![1, 2, 3];
// Create view metadata

View File

@@ -162,7 +162,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
next_column_id: columns as u32 + 1,
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),
partition_key_indices: vec![],
column_ids: vec![],
};

View File

@@ -92,7 +92,7 @@ impl StoreConfig {
pub fn tls_config(&self) -> Option<TlsOption> {
if self.backend_tls_mode != TlsMode::Disable {
Some(TlsOption {
mode: self.backend_tls_mode.clone(),
mode: self.backend_tls_mode,
cert_path: self.backend_tls_cert_path.clone(),
key_path: self.backend_tls_key_path.clone(),
ca_cert_path: self.backend_tls_ca_cert_path.clone(),

View File

@@ -68,8 +68,8 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(display("Failed to start wal provider"))]
StartWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -343,7 +343,7 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::StartWalProvider { source, .. } => source.status_code(),
Error::HttpQuerySql { .. } => StatusCode::Internal,
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()

View File

@@ -37,6 +37,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
@@ -456,6 +457,7 @@ impl Database {
stream,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
}

View File

@@ -30,6 +30,7 @@ use common_query::request::QueryRequest;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
@@ -242,6 +243,7 @@ impl RegionRequester {
stream,
output_ordering: None,
metrics,
span: Span::current(),
};
Ok(Box::pin(record_batch_stream))
}

View File

@@ -18,6 +18,7 @@ default = [
]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
tokio-console = ["common-telemetry/tokio-console"]
vector_index = ["mito2/vector_index"]
[lints]
workspace = true

View File

@@ -330,7 +330,6 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use super::*;
use crate::options::GlobalOptions;
@@ -374,9 +373,6 @@ mod tests {
hostname = "127.0.0.1"
runtime_size = 8
[heartbeat]
interval = "300ms"
[meta_client]
metasrv_addrs = ["127.0.0.1:3002"]
timeout = "3s"
@@ -434,13 +430,6 @@ mod tests {
);
assert!(!raft_engine_config.sync_write);
let HeartbeatOptions {
interval: heart_beat_interval,
..
} = options.heartbeat;
assert_eq!(300, heart_beat_interval.as_millis());
let MetaClientOptions {
metasrv_addrs: metasrv_addr,
timeout,

View File

@@ -233,6 +233,8 @@ impl ObjbenchCommand {
inverted_index_config: MitoConfig::default().inverted_index,
fulltext_index_config,
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
#[cfg(feature = "vector_index")]
vector_index_config: Default::default(),
};
// Write SST

View File

@@ -64,8 +64,8 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(display("Failed to start wal provider"))]
StartWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -289,8 +289,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(display("Failed to build wal provider"))]
BuildWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -350,8 +350,9 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::BuildWalOptionsAllocator { source, .. }
| Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::BuildWalProvider { source, .. } | Error::StartWalProvider { source, .. } => {
source.status_code()
}
Error::HttpQuerySql { .. } => StatusCode::Internal,
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()

View File

@@ -358,7 +358,6 @@ impl StartCommand {
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Arc::new(resource_stat),
);

View File

@@ -20,6 +20,7 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
@@ -236,7 +237,7 @@ impl StartCommand {
};
let tls_opts = TlsOption::new(
self.tls_mode.clone(),
self.tls_mode,
self.tls_cert_path.clone(),
self.tls_key_path.clone(),
self.tls_watch,
@@ -412,6 +413,7 @@ impl StartCommand {
meta_client.clone(),
client.clone(),
));
plugins.insert::<InformationExtensionRef>(information_extension.clone());
let process_manager = Arc::new(ProcessManager::new(
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),

View File

@@ -108,7 +108,7 @@ pub trait App: Send {
}
}
/// Log the versions of the application, and the arguments passed to the cli.
/// Log the versions of the application.
///
/// `version` should be the same as the output of cli "--version";
/// and the `short_version` is the short version of the codes, often consist of git branch and commit.
@@ -118,10 +118,7 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
.with_label_values(&[common_version::version(), short_version, app])
.inc();
// Log version and argument flags.
info!("GreptimeDB version: {}", version);
log_env_flags();
}
pub fn create_resource_limit_metrics(app: &str) {
@@ -144,13 +141,6 @@ pub fn create_resource_limit_metrics(app: &str) {
}
}
fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {
info!("argument: {}", argument);
}
}
pub fn maybe_activate_heap_profile(memory_options: &common_options::memory::MemoryOptions) {
if memory_options.enable_heap_profiling {
match activate_heap_profile() {

View File

@@ -40,7 +40,7 @@ use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
use common_telemetry::info;
@@ -120,7 +120,7 @@ pub struct Instance {
frontend: Frontend,
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
wal_provider: WalProviderRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -146,10 +146,10 @@ impl App for Instance {
.await
.context(error::StartProcedureManagerSnafu)?;
self.wal_options_allocator
self.wal_provider
.start()
.await
.context(error::StartWalOptionsAllocatorSnafu)?;
.context(error::StartWalProviderSnafu)?;
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
.await
@@ -261,7 +261,7 @@ impl StartCommand {
};
let tls_opts = TlsOption::new(
self.tls_mode.clone(),
self.tls_mode,
self.tls_cert_path.clone(),
self.tls_key_path.clone(),
self.tls_watch,
@@ -468,7 +468,7 @@ impl StartCommand {
flow_server: flownode.flow_engine(),
});
let table_id_sequence = Arc::new(
let table_id_allocator = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
@@ -485,13 +485,13 @@ impl StartCommand {
.clone()
.try_into()
.context(error::InvalidWalProviderSnafu)?;
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
.context(error::BuildWalProviderSnafu)?;
let wal_provider = Arc::new(wal_provider);
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_id_allocator,
wal_provider.clone(),
));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
@@ -585,7 +585,7 @@ impl StartCommand {
frontend,
flownode,
procedure_manager,
wal_options_allocator,
wal_provider,
_guard: guard,
})
}

View File

@@ -228,7 +228,6 @@ fn test_load_flownode_example_config() {
..Default::default()
},
tracing: Default::default(),
heartbeat: Default::default(),
// flownode deliberately use a slower query parallelism
// to avoid overwhelming the frontend with too many queries
query: QueryOptions {

View File

@@ -27,7 +27,7 @@ use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::string_array_value_at_index;
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
use datatypes::json::JsonStructureSettings;
use jsonpath_rust::JsonPath;
use serde_json::Value;
@@ -131,13 +131,6 @@ macro_rules! json_get {
};
}
json_get!(
JsonGetInt,
Int64,
i64,
"Get the value from the JSONB by the given path and return it as an integer."
);
json_get!(
JsonGetFloat,
Float64,
@@ -152,17 +145,65 @@ json_get!(
"Get the value from the JSONB by the given path and return it as a boolean."
);
/// Get the value from the JSONB by the given path and return it as a string.
#[derive(Clone, Debug)]
pub struct JsonGetString {
enum JsonResultValue<'a> {
Jsonb(Vec<u8>),
JsonStructByColumn(&'a ArrayRef, usize),
JsonStructByValue(&'a Value),
}
trait JsonGetResultBuilder {
fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()>;
fn append_null(&mut self);
fn build(&mut self) -> ArrayRef;
}
/// Common implementation for JSON get scalar functions.
///
/// `JsonGet` encapsulates the logic for extracting values from JSON inputs
/// based on a path expression. Different JSON get functions reuse this
/// implementation by supplying their own `JsonGetResultBuilder` to control
/// how the resulting values are materialized into an Arrow array.
struct JsonGet {
signature: Signature,
}
impl JsonGetString {
pub const NAME: &'static str = "json_get_string";
impl JsonGet {
fn invoke<F, B>(&self, args: ScalarFunctionArgs, builder_factory: F) -> Result<ColumnarValue>
where
F: Fn(usize) -> B,
B: JsonGetResultBuilder,
{
let [arg0, arg1] = extract_args("JSON_GET", &args)?;
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view();
let mut builder = (builder_factory)(arg0.len());
match arg0.data_type() {
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
jsonb_get(jsons, paths, &mut builder)?;
}
DataType::Struct(_) => {
let jsons = arg0.as_struct();
json_struct_get(jsons, paths, &mut builder)?
}
_ => {
return Err(DataFusionError::Execution(format!(
"JSON_GET not supported argument type {}",
arg0.data_type(),
)));
}
};
Ok(ColumnarValue::Array(builder.build()))
}
}
impl Default for JsonGetString {
impl Default for JsonGet {
fn default() -> Self {
Self {
signature: Signature::any(2, Volatility::Immutable),
@@ -170,6 +211,13 @@ impl Default for JsonGetString {
}
}
#[derive(Default)]
pub struct JsonGetString(JsonGet);
impl JsonGetString {
pub const NAME: &'static str = "json_get_string";
}
impl Function for JsonGetString {
fn name(&self) -> &str {
Self::NAME
@@ -180,61 +228,142 @@ impl Function for JsonGetString {
}
fn signature(&self) -> &Signature {
&self.signature
&self.0.signature
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let [arg0, arg1] = extract_args(self.name(), &args)?;
struct StringResultBuilder(StringViewBuilder);
let arg1 = compute::cast(&arg1, &DataType::Utf8View)?;
let paths = arg1.as_string_view();
impl JsonGetResultBuilder for StringResultBuilder {
fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
match value {
JsonResultValue::Jsonb(value) => {
self.0.append_option(jsonb::to_str(&value).ok())
}
JsonResultValue::JsonStructByColumn(column, i) => {
if let Some(v) = string_array_value_at_index(column, i) {
self.0.append_value(v);
} else {
self.0
.append_value(arrow_cast::display::array_value_to_string(
column, i,
)?);
}
}
JsonResultValue::JsonStructByValue(value) => {
if let Some(s) = value.as_str() {
self.0.append_value(s)
} else {
self.0.append_value(value.to_string())
}
}
}
Ok(())
}
let result = match arg0.data_type() {
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let arg0 = compute::cast(&arg0, &DataType::BinaryView)?;
let jsons = arg0.as_binary_view();
jsonb_get_string(jsons, paths)?
fn append_null(&mut self) {
self.0.append_null();
}
DataType::Struct(_) => {
let jsons = arg0.as_struct();
json_struct_get_string(jsons, paths)?
}
_ => {
return Err(DataFusionError::Execution(format!(
"{} not supported argument type {}",
Self::NAME,
arg0.data_type(),
)));
}
};
Ok(ColumnarValue::Array(result))
fn build(&mut self) -> ArrayRef {
Arc::new(self.0.finish())
}
}
self.0.invoke(args, |len: usize| {
StringResultBuilder(StringViewBuilder::with_capacity(len))
})
}
}
fn jsonb_get_string(jsons: &BinaryViewArray, paths: &StringViewArray) -> Result<ArrayRef> {
let size = jsons.len();
let mut builder = StringViewBuilder::with_capacity(size);
#[derive(Default)]
pub struct JsonGetInt(JsonGet);
impl JsonGetInt {
pub const NAME: &'static str = "json_get_int";
}
impl Function for JsonGetInt {
fn name(&self) -> &str {
Self::NAME
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}
fn signature(&self) -> &Signature {
&self.0.signature
}
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
struct IntResultBuilder(Int64Builder);
impl JsonGetResultBuilder for IntResultBuilder {
fn append_value(&mut self, value: JsonResultValue<'_>) -> Result<()> {
match value {
JsonResultValue::Jsonb(value) => {
self.0.append_option(jsonb::to_i64(&value).ok())
}
JsonResultValue::JsonStructByColumn(column, i) => {
self.0.append_option(int_array_value_at_index(column, i))
}
JsonResultValue::JsonStructByValue(value) => {
self.0.append_option(value.as_i64())
}
}
Ok(())
}
fn append_null(&mut self) {
self.0.append_null();
}
fn build(&mut self) -> ArrayRef {
Arc::new(self.0.finish())
}
}
self.0.invoke(args, |len: usize| {
IntResultBuilder(Int64Builder::with_capacity(len))
})
}
}
impl Display for JsonGetInt {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", Self::NAME.to_ascii_uppercase())
}
}
fn jsonb_get(
jsons: &BinaryViewArray,
paths: &StringViewArray,
builder: &mut impl JsonGetResultBuilder,
) -> Result<()> {
let size = jsons.len();
for i in 0..size {
let json = jsons.is_valid(i).then(|| jsons.value(i));
let path = paths.is_valid(i).then(|| paths.value(i));
let result = match (json, path) {
(Some(json), Some(path)) => {
get_json_by_path(json, path).and_then(|json| jsonb::to_str(&json).ok())
}
(Some(json), Some(path)) => get_json_by_path(json, path),
_ => None,
};
builder.append_option(result);
if let Some(v) = result {
builder.append_value(JsonResultValue::Jsonb(v))?;
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
Ok(())
}
fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Result<ArrayRef> {
fn json_struct_get(
jsons: &StructArray,
paths: &StringViewArray,
builder: &mut impl JsonGetResultBuilder,
) -> Result<()> {
let size = jsons.len();
let mut builder = StringViewBuilder::with_capacity(size);
for i in 0..size {
if jsons.is_null(i) || paths.is_null(i) {
builder.append_null();
@@ -247,11 +376,7 @@ fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Resul
let column = jsons.column_by_name(&field_path);
if let Some(column) = column {
if let Some(v) = string_array_value_at_index(column, i) {
builder.append_value(v);
} else {
builder.append_value(arrow_cast::display::array_value_to_string(column, i)?);
}
builder.append_value(JsonResultValue::JsonStructByColumn(column, i))?;
} else {
let Some(raw) = jsons
.column_by_name(JsonStructureSettings::RAW_FIELD)
@@ -272,27 +397,15 @@ fn json_struct_get_string(jsons: &StructArray, paths: &StringViewArray) -> Resul
Value::Null => builder.append_null(),
Value::Array(values) => match values.as_slice() {
[] => builder.append_null(),
[x] => {
if let Some(s) = x.as_str() {
builder.append_value(s)
} else {
builder.append_value(x.to_string())
}
}
x => builder.append_value(
x.iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(", "),
),
[x] => builder.append_value(JsonResultValue::JsonStructByValue(x))?,
_ => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
},
// Safety: guarded by the returns of `path.find` as documented
_ => unreachable!(),
value => builder.append_value(JsonResultValue::JsonStructByValue(&value))?,
}
}
}
Ok(Arc::new(builder.finish()))
Ok(())
}
fn json_struct_to_value(raw: &str, jsons: &StructArray, i: usize) -> Result<Value> {
@@ -479,6 +592,50 @@ mod tests {
use super::*;
/// Create a JSON object like this (as a one element struct array for testing):
///
/// ```JSON
/// {
/// "kind": "foo",
/// "payload": {
/// "code": 404,
/// "success": false,
/// "result": {
/// "error": "not found",
/// "time_cost": 1.234
/// }
/// }
/// }
/// ```
fn test_json_struct() -> ArrayRef {
Arc::new(StructArray::new(
vec![
Field::new("kind", DataType::Utf8, true),
Field::new("payload.code", DataType::Int64, true),
Field::new("payload.result.time_cost", DataType::Float64, true),
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
]
.into(),
vec![
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
Arc::new(Int64Array::from_iter([Some(404)])),
Arc::new(Float64Array::from_iter([Some(1.234)])),
Arc::new(StringViewArray::from_iter([Some(
json! ({
"payload": {
"success": false,
"result": {
"error": "not found"
}
}
})
.to_string(),
)])),
],
None,
))
}
#[test]
fn test_json_get_int() {
let json_get_int = JsonGetInt::default();
@@ -496,37 +653,55 @@ mod tests {
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
r#"{"a": 7, "b": 8, "c": {"a": 7}}"#,
];
let paths = vec!["$.a.b", "$.a", "$.c"];
let results = [Some(2), Some(4), None];
let json_struct = test_json_struct();
let jsonbs = json_strings
let path_expects = vec![
("$.a.b", Some(2)),
("$.a", Some(4)),
("$.c", None),
("$.kind", None),
("$.payload.code", Some(404)),
("$.payload.success", None),
("$.payload.result.time_cost", None),
("$.payload.not-exists", None),
("$.not-exists", None),
("$", None),
];
let mut jsons = json_strings
.iter()
.map(|s| {
let value = jsonb::parse_value(s.as_bytes()).unwrap();
value.to_vec()
Arc::new(BinaryArray::from_iter_values([value.to_vec()])) as ArrayRef
})
.collect::<Vec<_>>();
let json_struct_arrays =
std::iter::repeat_n(json_struct, path_expects.len() - jsons.len()).collect::<Vec<_>>();
jsons.extend(json_struct_arrays);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(BinaryArray::from_iter_values(jsonbs))),
ColumnarValue::Array(Arc::new(StringArray::from_iter_values(paths))),
],
arg_fields: vec![],
number_rows: 3,
return_field: Arc::new(Field::new("x", DataType::Int64, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_int
.invoke_with_args(args)
.and_then(|x| x.to_array(3))
.unwrap();
let vector = result.as_primitive::<Int64Type>();
for i in 0..jsons.len() {
let json = &jsons[i];
let (path, expect) = path_expects[i];
assert_eq!(3, vector.len());
for (i, gt) in results.iter().enumerate() {
let result = vector.is_valid(i).then(|| vector.value(i));
assert_eq!(*gt, result);
let args = ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(json.clone()),
ColumnarValue::Scalar(path.into()),
],
arg_fields: vec![],
number_rows: 1,
return_field: Arc::new(Field::new("x", DataType::Int64, false)),
config_options: Arc::new(Default::default()),
};
let result = json_get_int
.invoke_with_args(args)
.and_then(|x| x.to_array(1))
.unwrap();
let result = result.as_primitive::<Int64Type>();
assert_eq!(1, result.len());
let actual = result.is_valid(0).then(|| result.value(0));
assert_eq!(actual, expect);
}
}
@@ -649,45 +824,7 @@ mod tests {
r#"{"a": "d", "b": {"c": "e"}, "c": "f"}"#,
r#"{"a": "g", "b": "h", "c": {"a": "g"}}"#,
];
// complete JSON is:
// {
// "kind": "foo",
// "payload": {
// "code": 404,
// "success": false,
// "result": {
// "error": "not found",
// "time_cost": 1.234
// }
// }
// }
let json_struct: ArrayRef = Arc::new(StructArray::new(
vec![
Field::new("kind", DataType::Utf8, true),
Field::new("payload.code", DataType::Int64, true),
Field::new("payload.result.time_cost", DataType::Float64, true),
Field::new(JsonStructureSettings::RAW_FIELD, DataType::Utf8View, true),
]
.into(),
vec![
Arc::new(StringArray::from_iter([Some("foo")])) as ArrayRef,
Arc::new(Int64Array::from_iter([Some(404)])),
Arc::new(Float64Array::from_iter([Some(1.234)])),
Arc::new(StringViewArray::from_iter([Some(
json! ({
"payload": {
"success": false,
"result": {
"error": "not found"
}
}
})
.to_string(),
)])),
],
None,
));
let json_struct = test_json_struct();
let paths = vec![
"$.a.b",

View File

@@ -28,6 +28,7 @@ use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
pub mod allocator;
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
@@ -36,8 +37,7 @@ pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub(crate) use create_table_template::{CreateRequestBuilder, build_template_from_raw_table_info};
pub(crate) use create_table::{CreateRequestBuilder, build_template_from_raw_table_info};
pub mod create_view;
pub mod drop_database;
pub mod drop_flow;

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod region_routes;
pub mod resource_id;
pub mod wal_options;

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::debug;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::Result;
use crate::peer::PeerAllocator;
use crate::rpc::router::{Region, RegionRoute};
pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
#[async_trait::async_trait]
pub trait RegionRoutesAllocator: Send + Sync {
async fn allocate(
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
) -> Result<Vec<RegionRoute>>;
}
#[async_trait::async_trait]
impl<T: PeerAllocator> RegionRoutesAllocator for T {
async fn allocate(
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
) -> Result<Vec<RegionRoute>> {
let regions = regions_and_partitions.len().max(1);
let peers = self.alloc(regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id,);
let mut region_routes = regions_and_partitions
.iter()
.enumerate()
.map(|(i, (region_number, partition))| {
let region = Region {
id: RegionId::new(table_id, *region_number),
partition_expr: partition.to_string(),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.collect::<Vec<_>>();
// If the table has no partitions, we need to create a default region.
if region_routes.is_empty() {
region_routes.push(RegionRoute {
region: Region {
id: RegionId::new(table_id, 0),
..Default::default()
},
leader_peer: Some(peers[0].clone()),
..Default::default()
});
}
Ok(region_routes)
}
}

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use crate::error::Result;
pub type ResourceIdAllocatorRef = Arc<dyn ResourceIdAllocator>;
#[async_trait::async_trait]
pub trait ResourceIdAllocator: Send + Sync {
/// Returns the next value and increments the sequence.
async fn next(&self) -> Result<u64>;
/// Returns the current value stored in the remote storage without incrementing the sequence.
async fn peek(&self) -> Result<u64>;
/// Jumps to the given value.
async fn jump_to(&self, next: u64) -> Result<()>;
/// Returns the range of available sequences.
async fn min_max(&self) -> Range<u64>;
}

View File

@@ -0,0 +1,31 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use store_api::storage::RegionNumber;
use crate::error::Result;
pub type WalOptionsAllocatorRef = Arc<dyn WalOptionsAllocator>;
#[async_trait::async_trait]
pub trait WalOptionsAllocator: Send + Sync {
async fn allocate(
&self,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>>;
}

View File

@@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
@@ -286,14 +286,7 @@ impl CreateTablesData {
.flat_map(|(task, table_id)| {
if table_id.is_none() {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| {
RegionId::new(table_info.ident.table_id, *region_number)
})
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
let table_route = TableRouteValue::logical(self.physical_table_id);
Some((table_info, table_route))
} else {
None

View File

@@ -22,7 +22,7 @@ use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table_template::{
use crate::ddl::create_table::template::{
CreateRequestBuilder, build_template, build_template_from_raw_table_info,
};
use crate::ddl::utils::region_storage_path;
@@ -97,7 +97,7 @@ pub fn create_region_request_builder(
/// Builds a [CreateRequestBuilder] from a [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,

View File

@@ -12,74 +12,99 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod executor;
pub mod template;
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use futures::future::join_all;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;
pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
use crate::ddl::create_table_template::{CreateRequestBuilder, build_template};
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, map_to_procedure_error, region_storage_path,
};
use crate::ddl::create_table::executor::CreateTableExecutor;
use crate::ddl::create_table::template::build_template;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::{DdlContext, TableMetadata};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
RegionRoute, find_leader_regions, find_leaders, operating_leader_regions,
};
use crate::rpc::router::{RegionRoute, operating_leader_regions};
pub struct CreateTableProcedure {
pub context: DdlContext,
pub creator: TableCreator,
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening.
pub opening_regions: Vec<OperatingRegionGuard>,
/// The executor of the procedure.
pub executor: CreateTableExecutor,
}
fn build_executor_from_create_table_data(
create_table_expr: &CreateTableExpr,
) -> Result<CreateTableExecutor> {
let template = build_template(create_table_expr)?;
let builder = CreateRequestBuilder::new(template, None);
let table_name = TableName::new(
create_table_expr.catalog_name.clone(),
create_table_expr.schema_name.clone(),
create_table_expr.table_name.clone(),
);
let executor =
CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
Ok(executor)
}
impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
Self {
pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
let executor = build_executor_from_create_table_data(&task.create_table)?;
Ok(Self {
context,
creator: TableCreator::new(task),
}
data: CreateTableData::new(task),
opening_regions: vec![],
executor,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let create_table_expr = &data.task.create_table;
let executor = build_executor_from_create_table_data(create_table_expr)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
Ok(CreateTableProcedure {
context,
creator: TableCreator {
data,
opening_regions: vec![],
},
data,
opening_regions: vec![],
executor,
})
}
fn table_info(&self) -> &RawTableInfo {
&self.creator.data.task.table_info
&self.data.task.table_info
}
pub(crate) fn table_id(&self) -> TableId {
@@ -87,8 +112,7 @@ impl CreateTableProcedure {
}
fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
self.creator
.data
self.data
.region_wal_options
.as_ref()
.context(error::UnexpectedSnafu {
@@ -97,8 +121,7 @@ impl CreateTableProcedure {
}
fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
self.creator
.data
self.data
.table_route
.as_ref()
.context(error::UnexpectedSnafu {
@@ -106,17 +129,6 @@ impl CreateTableProcedure {
})
}
#[cfg(any(test, feature = "testing"))]
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options)
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Allocates the table id.
@@ -125,31 +137,16 @@ impl CreateTableProcedure {
/// - TableName exists and `create_if_not_exists` is false.
/// - Failed to allocate [TableMetadata].
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
let table_name_value = self
.context
.table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
))
let table_id = self
.executor
.on_prepare(&self.context.table_metadata_manager)
.await?;
if let Some(value) = table_name_value {
ensure!(
expr.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.creator.data.table_ref().to_string(),
}
);
let table_id = value.table_id();
// Return the table id if the table already exists.
if let Some(table_id) = table_id {
return Ok(Status::done_with_output(table_id));
}
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
self.data.state = CreateTableState::DatanodeCreateRegions;
let TableMetadata {
table_id,
table_route,
@@ -157,23 +154,13 @@ impl CreateTableProcedure {
} = self
.context
.table_metadata_allocator
.create(&self.creator.data.task)
.create(&self.data.task)
.await?;
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options);
self.set_allocated_metadata(table_id, table_route, region_wal_options);
Ok(Status::executing(true))
}
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let template = build_template(create_table_expr)?;
Ok(CreateRequestBuilder::new(template, physical_table_id))
}
/// Creates regions on datanodes
///
/// Abort(non-retry):
@@ -187,90 +174,29 @@ impl CreateTableProcedure {
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let table_route = self.table_route()?.clone();
let request_builder = self.new_region_request_builder(None)?;
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, &table_route.region_routes)?;
let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
self.opening_regions = guards;
}
self.create_regions(&table_route.region_routes, request_builder)
.await
self.create_regions(&table_route.region_routes).await
}
async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
let create_table_data = &self.creator.data;
// Safety: the region_wal_options must be allocated
async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
let table_id = self.table_id();
let region_wal_options = self.region_wal_options()?;
let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
let column_metadatas = self
.executor
.on_create_regions(
&self.context.node_manager,
table_id,
region_routes,
region_wal_options,
)
.await?;
let partition_exprs = region_routes
.iter()
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
.collect();
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
&partition_exprs,
);
requests.push(PbRegionRequest::Create(create_region_request));
}
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let mut results = join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
self.creator.data.column_metadatas = column_metadatas;
} else {
warn!(
"creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
);
}
self.creator.data.state = CreateTableState::CreateMetadata;
self.data.column_metadatas = column_metadatas;
self.data.state = CreateTableState::CreateMetadata;
Ok(Status::executing(true))
}
@@ -280,107 +206,33 @@ impl CreateTableProcedure {
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
let table_id = self.table_id();
let table_ref = self.creator.data.table_ref();
let table_ref = self.data.table_ref();
let manager = &self.context.table_metadata_manager;
let mut raw_table_info = self.table_info().clone();
if !self.creator.data.column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas);
}
let raw_table_info = self.table_info().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let physical_table_route = self.table_route()?.clone();
let detecting_regions =
convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
let table_route = TableRouteValue::Physical(physical_table_route);
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
self.executor
.on_create_metadata(
manager,
&self.context.region_failure_detector_controller,
raw_table_info,
&self.data.column_metadatas,
physical_table_route,
region_wal_options,
)
.await?;
self.context
.register_failure_detectors(detecting_regions)
.await;
info!(
"Successfully created table: {}, table_id: {}, procedure_id: {}",
table_ref, table_id, pid
);
self.creator.opening_regions.clear();
self.opening_regions.clear();
Ok(Status::done_with_output(table_id))
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.creator.data.table_route {
self.creator.opening_regions = self
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
pub struct TableCreator {
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening.
pub opening_regions: Vec<OperatingRegionGuard>,
}
impl TableCreator {
pub fn new(task: CreateTableTask) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
table_route: None,
region_wal_options: None,
},
opening_regions: vec![],
}
}
/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
@@ -389,7 +241,6 @@ impl TableCreator {
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(vec![]);
}
@@ -409,7 +260,7 @@ impl TableCreator {
Ok(opening_region_guards)
}
fn set_allocated_metadata(
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: PhysicalTableRouteValue,
@@ -421,6 +272,56 @@ impl TableCreator {
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.data.table_route {
self.opening_regions = self
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
/// Prepares to create the table
@@ -444,6 +345,16 @@ pub struct CreateTableData {
}
impl CreateTableData {
pub fn new(task: CreateTableTask) -> Self {
CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
table_route: None,
region_wal_options: None,
}
}
fn table_ref(&self) -> TableReference<'_> {
self.task.table_ref()
}

View File

@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{RegionRequest, RegionRequestHeader};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use futures::future::join_all;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, region_storage_path,
};
use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
use crate::error::{self, Result};
use crate::key::TableMetadataManagerRef;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::node_manager::NodeManagerRef;
use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
/// [CreateTableExecutor] performs:
/// - Creates the metadata of the table.
/// - Creates the regions on the Datanode nodes.
pub struct CreateTableExecutor {
create_if_not_exists: bool,
table_name: TableName,
builder: CreateRequestBuilder,
}
impl CreateTableExecutor {
/// Creates a new [`CreateTableExecutor`].
pub fn new(
table_name: TableName,
create_if_not_exists: bool,
builder: CreateRequestBuilder,
) -> Self {
Self {
create_if_not_exists,
table_name,
builder,
}
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Returns the table id if the table exists.
///
/// Abort(non-retry):
/// - Table exists and `create_if_not_exists` is `false`.
/// - Failed to get the table name value.
pub async fn on_prepare(
&self,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<Option<TableId>> {
let table_name_value = table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&self.table_name.catalog_name,
&self.table_name.schema_name,
&self.table_name.table_name,
))
.await?;
if let Some(value) = table_name_value {
ensure!(
self.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.table_name.to_string(),
}
);
return Ok(Some(value.table_id()));
}
Ok(None)
}
pub async fn on_create_regions(
&self,
node_manager: &NodeManagerRef,
table_id: TableId,
region_routes: &[RegionRoute],
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<Vec<ColumnMetadata>> {
let storage_path =
region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
let partition_exprs = region_routes
.iter()
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
.collect::<HashMap<_, _>>();
for datanode in leaders {
let requester = node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(table_id, region_number);
let create_region_request = self.builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
&partition_exprs,
);
requests.push(PbRegionRequest::Create(create_region_request));
}
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let mut results = join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
let column_metadatas = if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
column_metadatas
} else {
warn!(
"creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
);
vec![]
};
Ok(column_metadatas)
}
/// Creates table metadata
///
/// Abort(non-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(
&self,
table_metadata_manager: &TableMetadataManagerRef,
region_failure_detector_controller: &RegionFailureDetectorControllerRef,
mut raw_table_info: RawTableInfo,
column_metadatas: &[ColumnMetadata],
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
if !column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, column_metadatas);
}
let detecting_regions =
convert_region_routes_to_detecting_regions(&table_route.region_routes);
let table_route = TableRouteValue::Physical(table_route);
table_metadata_manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
Ok(())
}
/// Returns the builder of the executor.
pub fn builder(&self) -> &CreateRequestBuilder {
&self.builder
}
}

View File

@@ -20,19 +20,17 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::wal_options_allocator::prepare_wal_options;
use crate::wal_provider::prepare_wal_options;
/// Builds a [CreateRequest] from a [RawTableInfo].
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
pub(crate) fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
) -> Result<CreateRequest> {
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
.meta
@@ -57,7 +55,7 @@ pub(crate) fn build_template_from_raw_table_info(
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: METRIC_ENGINE_NAME.to_string(),
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
path: String::new(),
@@ -138,7 +136,7 @@ pub struct CreateRequestBuilder {
}
impl CreateRequestBuilder {
pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
Self {
template,
physical_table_id,

View File

@@ -120,7 +120,13 @@ impl State for DropDatabaseExecutor {
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor
.on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
.on_drop_regions(
&ddl_ctx.node_manager,
&ddl_ctx.leader_region_registry,
&self.physical_region_routes,
true,
false,
)
.await?;
info!("Table: {}({}) is dropped", self.table_name, self.table_id);

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod executor;
pub mod executor;
mod metadata;
use std::collections::HashMap;
@@ -156,7 +156,13 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
self.executor
.on_drop_regions(&self.context, &self.data.physical_region_routes, false)
.on_drop_regions(
&self.context.node_manager,
&self.context.leader_region_registry,
&self.data.physical_region_routes,
false,
false,
)
.await?;
self.data.state = DropTableState::DeleteTombstone;
Ok(Status::executing(true))

View File

@@ -36,6 +36,8 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::node_manager::NodeManagerRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::router::{
RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
operating_leader_regions,
@@ -212,16 +214,18 @@ impl DropTableExecutor {
/// Drops region on datanode.
pub async fn on_drop_regions(
&self,
ctx: &DdlContext,
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
region_routes: &[RegionRoute],
fast_path: bool,
force: bool,
) -> Result<()> {
// Drops leader regions on datanodes.
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
let table_id = self.table_id;
for datanode in leaders {
let requester = ctx.node_manager.datanode(&datanode).await;
let requester = node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
@@ -238,6 +242,7 @@ impl DropTableExecutor {
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
fast_path,
force,
})),
};
let datanode = datanode.clone();
@@ -262,7 +267,7 @@ impl DropTableExecutor {
let followers = find_followers(region_routes);
let mut close_region_tasks = Vec::with_capacity(followers.len());
for datanode in followers {
let requester = ctx.node_manager.datanode(&datanode).await;
let requester = node_manager.datanode(&datanode).await;
let regions = find_follower_regions(region_routes, &datanode);
let region_ids = regions
.iter()
@@ -307,8 +312,7 @@ impl DropTableExecutor {
// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
Ok(())
}

View File

@@ -17,47 +17,47 @@ use std::sync::Arc;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};
use store_api::storage::{RegionNumber, TableId};
use crate::ddl::TableMetadata;
use crate::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal_options_allocator::{WalOptionsAllocatorRef, allocate_region_wal_options};
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
#[derive(Clone)]
pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
region_routes_allocator: RegionRoutesAllocatorRef,
}
impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
table_id_allocator,
wal_options_allocator,
Arc::new(NoopPeerAllocator),
)
}
pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
table_id_allocator,
wal_options_allocator,
peer_allocator,
region_routes_allocator: Arc::new(peer_allocator) as _,
}
}
@@ -70,7 +70,7 @@ impl TableMetadataAllocator {
ensure!(
!self
.table_id_sequence
.table_id_allocator
.min_max()
.await
.contains(&(table_id as u64)),
@@ -89,65 +89,35 @@ impl TableMetadataAllocator {
table_id
} else {
self.table_id_sequence.next().await? as TableId
self.table_id_allocator.next().await? as TableId
};
Ok(table_id)
}
fn create_wal_options(
async fn create_wal_options(
&self,
table_route: &PhysicalTableRouteValue,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
self.wal_options_allocator
.allocate(region_numbers, skip_wal)
.await
}
async fn create_table_route(
&self,
table_id: TableId,
task: &CreateTableTask,
partition_exprs: &[&str],
) -> Result<PhysicalTableRouteValue> {
let regions = task.partitions.len().max(1);
let peers = self.peer_allocator.alloc(regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id);
let mut region_routes = task
.partitions
let region_number_and_partition_exprs = partition_exprs
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition_expr: partition.expression.clone(),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.map(|(i, partition)| (i as u32, *partition))
.collect::<Vec<_>>();
// If the table has no partitions, we need to create a default region.
if region_routes.is_empty() {
region_routes.push(RegionRoute {
region: Region {
id: RegionId::new(table_id, 0),
..Default::default()
},
leader_peer: Some(peers[0].clone()),
..Default::default()
});
}
let region_routes = self
.region_routes_allocator
.allocate(table_id, &region_number_and_partition_exprs)
.await?;
Ok(PhysicalTableRouteValue::new(region_routes))
}
@@ -164,10 +134,20 @@ impl TableMetadataAllocator {
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let table_route = self.create_table_route(table_id, task).await?;
let region_wal_options =
self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
let partition_exprs = task
.partitions
.iter()
.map(|p| p.expression.as_str())
.collect::<Vec<_>>();
let table_route = self.create_table_route(table_id, &partition_exprs).await?;
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect::<Vec<_>>();
let region_wal_options = self
.create_wal_options(&region_numbers, task.table_info.meta.options.skip_wal)
.await?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -181,7 +161,7 @@ impl TableMetadataAllocator {
})
}
pub fn table_id_sequence(&self) -> SequenceRef {
self.table_id_sequence.clone()
pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
self.table_id_allocator.clone()
}
}

View File

@@ -128,7 +128,6 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
value_indices: vec![],
engine: expr.engine.clone(),
next_column_id: expr.column_defs.len() as u32,
region_numbers: vec![],
options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
created_on: DateTime::default(),
updated_on: DateTime::default(),

View File

@@ -166,7 +166,7 @@ async fn test_on_prepare_logical_table_exists_err() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -208,7 +208,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -252,7 +252,7 @@ async fn test_on_prepare_part_logical_tables_exist() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -392,7 +392,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -496,10 +496,7 @@ async fn test_on_create_metadata_err() {
task.table_info.ident.table_id = 1025;
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info,
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
)])
.create_logical_tables_metadata(vec![(task.table_info, TableRouteValue::logical(512))])
.await
.unwrap();
// Triggers procedure to create table metadata

View File

@@ -162,7 +162,7 @@ async fn test_on_prepare_table_exists_err() {
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::TableAlreadyExists { .. });
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
@@ -185,7 +185,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: Some(..) });
let table_id = *status.downcast_output_ref::<u32>().unwrap();
@@ -198,7 +198,7 @@ async fn test_on_prepare_without_create_if_table_exists() {
let ddl_context = new_ddl_context(node_manager);
let mut task = test_create_table_task("foo");
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
@@ -217,7 +217,7 @@ async fn test_on_datanode_create_regions_should_retry() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -234,7 +234,7 @@ async fn test_on_datanode_create_regions_should_not_retry() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -251,7 +251,7 @@ async fn test_on_create_metadata_error() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -284,7 +284,7 @@ async fn test_on_create_metadata() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -312,16 +312,16 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);
let task = test_create_table_task("foo");
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
execute_procedure_until(&mut procedure, |p| {
p.creator.data.state == CreateTableState::CreateMetadata
p.data.state == CreateTableState::CreateMetadata
})
.await;
// Ensure that after running to the state `CreateMetadata`(just past `DatanodeCreateRegions`),
// the opening regions should be recorded:
let guards = &procedure.creator.opening_regions;
let guards = &procedure.opening_regions;
assert_eq!(guards.len(), 1);
let (datanode_id, region_id) = (0, RegionId::new(procedure.table_id(), 0));
assert_eq!(guards[0].info(), (datanode_id, region_id));
@@ -334,7 +334,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
execute_procedure_until_done(&mut procedure).await;
// Ensure that when run to the end, the opening regions should be cleared:
let guards = &procedure.creator.opening_regions;
let guards = &procedure.opening_regions;
assert!(guards.is_empty());
assert!(
!ddl_context

View File

@@ -259,7 +259,7 @@ async fn test_replace_table() {
{
// Create a `foo` table.
let task = test_create_table_task("foo");
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),

View File

@@ -231,7 +231,7 @@ impl DdlManager {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateTableProcedure::new(create_table_task, context);
let procedure = CreateTableProcedure::new(create_table_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -968,7 +968,7 @@ mod tests {
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::wal_provider::WalProvider;
/// A dummy implemented [NodeManager].
pub struct DummyDatanodeManager;
@@ -993,7 +993,7 @@ mod tests {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(

View File

@@ -530,6 +530,49 @@ impl Display for EnterStagingRegion {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
pub region_id: RegionId,
/// Regions to remap manifests from.
pub input_regions: Vec<RegionId>,
/// For each old region, which new regions should receive its files
pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
/// New partition expressions for the new regions.
pub new_partition_exprs: HashMap<RegionId, String>,
}
impl Display for RemapManifest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApplyStagingManifest {
/// The region ID to apply the staging manifest to.
pub region_id: RegionId,
/// The partition expression of the staging region.
pub partition_expr: String,
/// The region that stores the staging manifests in its staging blob storage.
pub central_region_id: RegionId,
/// The relative path to the staging manifest within the central region's staging blob storage.
pub manifest_path: String,
}
impl Display for ApplyStagingManifest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
self.region_id, self.partition_expr, self.central_region_id, self.manifest_path
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens regions.
@@ -559,6 +602,10 @@ pub enum Instruction {
Suspend,
/// Makes regions enter staging state.
EnterStagingRegions(Vec<EnterStagingRegion>),
/// Remaps manifests for a region.
RemapManifest(RemapManifest),
/// Applies staging manifests for a region.
ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
impl Instruction {
@@ -737,6 +784,48 @@ impl EnterStagingRegionsReply {
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
/// Returns false if the region does not exist.
pub exists: bool,
/// A map from region IDs to their corresponding remapped manifest paths.
pub manifest_paths: HashMap<RegionId, String>,
/// Return error if any during the operation.
pub error: Option<String>,
}
impl Display for RemapManifestReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RemapManifestReply(manifest_paths={:?}, error={:?})",
self.manifest_paths, self.error
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
pub replies: Vec<ApplyStagingManifestReply>,
}
impl ApplyStagingManifestsReply {
pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
Self { replies }
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
pub region_id: RegionId,
/// Returns true if the region is ready to serve reads and writes.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// Return error if any during the operation.
pub error: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
@@ -758,6 +847,8 @@ pub enum InstructionReply {
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
EnterStagingRegions(EnterStagingRegionsReply),
RemapManifest(RemapManifestReply),
ApplyStagingManifests(ApplyStagingManifestsReply),
}
impl Display for InstructionReply {
@@ -781,6 +872,12 @@ impl Display for InstructionReply {
reply.replies
)
}
Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
Self::ApplyStagingManifests(reply) => write!(
f,
"InstructionReply::ApplyStagingManifests({:?})",
reply.replies
),
}
}
}
@@ -828,6 +925,20 @@ impl InstructionReply {
_ => panic!("Expected EnterStagingRegion reply"),
}
}
pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
match self {
Self::RemapManifest(reply) => reply,
_ => panic!("Expected RemapManifest reply"),
}
}
pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
match self {
Self::ApplyStagingManifests(reply) => reply.replies,
_ => panic!("Expected ApplyStagingManifest reply"),
}
}
}
#[cfg(test)]

View File

@@ -747,12 +747,10 @@ impl TableMetadataManager {
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn create_table_metadata(
&self,
mut table_info: RawTableInfo,
table_info: RawTableInfo,
table_route_value: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
let region_numbers = table_route_value.region_numbers();
table_info.meta.region_numbers = region_numbers;
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.clone();
@@ -851,8 +849,7 @@ impl TableMetadataManager {
on_create_table_route_failure: F2,
}
let mut on_failures = Vec::with_capacity(len);
for (mut table_info, table_route_value) in tables_data {
table_info.meta.region_numbers = table_route_value.region_numbers();
for (table_info, table_route_value) in tables_data {
let table_id = table_info.ident.table_id;
// Creates table name.
@@ -1477,6 +1474,7 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::allocator::wal_options::WalOptionsAllocator;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
@@ -1493,7 +1491,7 @@ mod tests {
use crate::peer::Peer;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::rpc::store::RangeRequest;
use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
use crate::wal_provider::WalProvider;
#[test]
fn test_deserialized_value_with_bytes() {
@@ -1543,8 +1541,8 @@ mod tests {
}
}
fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
test_utils::new_test_table_info(10, region_numbers)
fn new_test_table_info() -> TableInfo {
test_utils::new_test_table_info(10)
}
fn new_test_table_names() -> HashSet<TableName> {
@@ -1602,12 +1600,10 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let wal_allocator = WalOptionsAllocator::RaftEngine;
let regions = (0..16).collect();
let region_wal_options =
allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
let table_info: RawTableInfo = new_test_table_info().into();
let wal_provider = WalProvider::RaftEngine;
let regions: Vec<_> = (0..16).collect();
let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
@@ -1630,8 +1626,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let region_wal_options = create_mock_region_wal_options()
.into_iter()
.map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
@@ -1713,8 +1708,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let table_route_value = TableRouteValue::physical(region_routes.clone());
@@ -1779,7 +1773,6 @@ mod tests {
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
table_id,
&format!("my_table_{}", table_id),
region_routes.iter().map(|r| r.region.id.region_number()),
)
.into();
let table_route_value = TableRouteValue::physical(region_routes.clone());
@@ -1800,8 +1793,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let region_wal_options = create_mock_region_wal_options();
@@ -1907,8 +1899,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
@@ -1984,8 +1975,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
@@ -2070,8 +2060,7 @@ mod tests {
leader_down_since: None,
},
];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
@@ -2153,8 +2142,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.as_str();
let region_storage_path =
@@ -2408,7 +2396,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
let view_info: RawTableInfo = new_test_table_info().into();
let view_id = view_info.ident.table_id;

View File

@@ -338,7 +338,6 @@ mod tests {
next_column_id: 3,
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],
partition_key_indices: vec![],
column_ids: vec![],
};

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
@@ -62,16 +62,54 @@ pub enum TableRouteValue {
Logical(LogicalTableRouteValue),
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
#[derive(Debug, PartialEq, Serialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
// The region routes of the table.
pub region_routes: Vec<RegionRoute>,
// Tracks the highest region number ever allocated for the table.
// This value only increases: adding a region updates it if needed,
// and dropping regions does not decrease it.
pub max_region_number: RegionNumber,
// The version of the table route.
version: u64,
}
impl<'de> Deserialize<'de> for PhysicalTableRouteValue {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct Helper {
region_routes: Vec<RegionRoute>,
#[serde(default)]
max_region_number: Option<RegionNumber>,
version: u64,
}
let mut helper = Helper::deserialize(deserializer)?;
// If the max region number is not provided, we will calculate it from the region routes.
if helper.max_region_number.is_none() {
let max_region = helper
.region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
helper.max_region_number = Some(max_region);
}
Ok(PhysicalTableRouteValue {
region_routes: helper.region_routes,
max_region_number: helper.max_region_number.unwrap_or_default(),
version: helper.version,
})
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
physical_table_id: TableId,
region_ids: Vec<RegionId>,
}
impl TableRouteValue {
@@ -85,14 +123,7 @@ impl TableRouteValue {
if table_id == physical_table_id {
TableRouteValue::physical(region_routes)
} else {
let region_routes = region_routes
.into_iter()
.map(|region| {
debug_assert_eq!(region.region.id.table_id(), physical_table_id);
RegionId::new(table_id, region.region.id.region_number())
})
.collect();
TableRouteValue::logical(physical_table_id, region_routes)
TableRouteValue::logical(physical_table_id)
}
}
@@ -100,8 +131,8 @@ impl TableRouteValue {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
pub fn logical(physical_table_id: TableId) -> Self {
Self::Logical(LogicalTableRouteValue::new(physical_table_id))
}
/// Returns a new version [TableRouteValue] with `region_routes`.
@@ -112,9 +143,19 @@ impl TableRouteValue {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
let version = self.as_physical_table_route_ref().version;
let physical_table_route = self.as_physical_table_route_ref();
let original_max_region_number = physical_table_route.max_region_number;
let new_max_region_number = region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
let version = physical_table_route.version;
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
// If region routes are added, we will update the max region number.
// If region routes are removed, we will keep the original max region number.
max_region_number: original_max_region_number.max(new_max_region_number),
version: version + 1,
}))
}
@@ -167,6 +208,20 @@ impl TableRouteValue {
Ok(&self.as_physical_table_route_ref().region_routes)
}
/// Returns the max region number of this [TableRouteValue::Physical].
///
/// # Panic
/// If it is not the [`PhysicalTableRouteValue`].
pub fn max_region_number(&self) -> Result<RegionNumber> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
Ok(self.as_physical_table_route_ref().max_region_number)
}
/// Returns the reference of [`PhysicalTableRouteValue`].
///
/// # Panic
@@ -207,11 +262,9 @@ impl TableRouteValue {
.iter()
.map(|region_route| region_route.region.id.region_number())
.collect(),
TableRouteValue::Logical(x) => x
.region_ids()
.iter()
.map(|region_id| region_id.region_number())
.collect(),
TableRouteValue::Logical(_) => {
vec![]
}
}
}
}
@@ -237,28 +290,27 @@ impl MetadataValue for TableRouteValue {
impl PhysicalTableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
let max_region_number = region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
Self {
region_routes,
max_region_number,
version: 0,
}
}
}
impl LogicalTableRouteValue {
pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
Self {
physical_table_id,
region_ids,
}
pub fn new(physical_table_id: TableId) -> Self {
Self { physical_table_id }
}
pub fn physical_table_id(&self) -> TableId {
self.physical_table_id
}
pub fn region_ids(&self) -> &Vec<RegionId> {
&self.region_ids
}
}
impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
@@ -823,6 +875,57 @@ mod tests {
use crate::rpc::router::Region;
use crate::rpc::store::PutRequest;
#[test]
fn test_update_table_route_max_region_number() {
let table_route = PhysicalTableRouteValue::new(vec![
RegionRoute {
region: Region {
id: RegionId::new(0, 1),
..Default::default()
},
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(0, 2),
..Default::default()
},
..Default::default()
},
]);
assert_eq!(table_route.max_region_number, 2);
// Shouldn't change the max region number.
let new_table_route = TableRouteValue::Physical(table_route)
.update(vec![RegionRoute {
region: Region {
id: RegionId::new(0, 1),
..Default::default()
},
..Default::default()
}])
.unwrap();
assert_eq!(
new_table_route
.as_physical_table_route_ref()
.max_region_number,
2
);
// Should increase the max region number.
let new_table_route = new_table_route
.update(vec![RegionRoute {
region: Region {
id: RegionId::new(0, 3),
..Default::default()
},
..Default::default()
}])
.unwrap()
.into_physical_table_route();
assert_eq!(new_table_route.max_region_number, 3);
}
#[test]
fn test_table_route_compatibility() {
let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
@@ -863,6 +966,7 @@ mod tests {
leader_down_since: None,
},
],
max_region_number: 1,
version: 0,
});
@@ -900,7 +1004,6 @@ mod tests {
let table_route_manager = TableRouteManager::new(kv.clone());
let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 1)],
});
let (txn, _) = table_route_manager
.table_route_storage()
@@ -930,14 +1033,12 @@ mod tests {
1024,
TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 1)],
}),
),
(
1025,
TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 2)],
}),
),
];
@@ -976,6 +1077,7 @@ mod tests {
}],
..Default::default()
}],
max_region_number: 0,
version: 0,
});

View File

@@ -19,11 +19,7 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
table_id: TableId,
table_name: &str,
region_numbers: I,
) -> TableInfo {
pub fn new_test_table_info_with_name(table_id: TableId, table_name: &str) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -45,7 +41,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
@@ -56,9 +51,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
.build()
.unwrap()
}
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
new_test_table_info_with_name(table_id, "mytable", region_numbers)
pub fn new_test_table_info(table_id: TableId) -> TableInfo {
new_test_table_info_with_name(table_id, "mytable")
}

View File

@@ -613,7 +613,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_put() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("put-test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -623,7 +623,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("range-test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -633,7 +633,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_range_2() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("range2-test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
@@ -642,7 +642,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_all_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("simple_range-test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
@@ -652,7 +652,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_get() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("batch_get-test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -662,7 +662,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_delete() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("batch_delete-test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -672,7 +672,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_delete_with_prefix() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix-test")
.await
.unwrap();
let prefix = b"batch_delete/";
@@ -684,7 +684,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_delete_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("delete_range-test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -694,7 +694,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_compare_and_put() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("compare_and_put_test")
let kv_backend = build_mysql_kv_backend("compare_and_put-test")
.await
.unwrap();
let prefix = b"compare_and_put/";
@@ -705,7 +705,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_txn() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("txn-test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;

View File

@@ -868,6 +868,8 @@ impl PgStore {
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
// We need to log the debug for the error to help diagnose the issue.
common_telemetry::error!(e; "Failed to get Postgres connection.");
return GetPostgresConnectionSnafu {
reason: e.to_string(),
}
@@ -1103,7 +1105,7 @@ mod tests {
#[tokio::test]
async fn test_pg_put() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("put_test").await.unwrap();
let kv_backend = build_pg_kv_backend("put-test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1113,7 +1115,7 @@ mod tests {
#[tokio::test]
async fn test_pg_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("range_test").await.unwrap();
let kv_backend = build_pg_kv_backend("range-test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1123,7 +1125,7 @@ mod tests {
#[tokio::test]
async fn test_pg_range_2() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("range2_test").await.unwrap();
let kv_backend = build_pg_kv_backend("range2-test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
@@ -1132,7 +1134,7 @@ mod tests {
#[tokio::test]
async fn test_pg_all_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("simple_range_test").await.unwrap();
let kv_backend = build_pg_kv_backend("simple_range-test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
@@ -1142,7 +1144,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_get() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_get_test").await.unwrap();
let kv_backend = build_pg_kv_backend("batch_get-test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1152,7 +1154,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_delete() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_delete_test").await.unwrap();
let kv_backend = build_pg_kv_backend("batch_delete-test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1162,7 +1164,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_delete_with_prefix() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_delete_with_prefix_test")
let kv_backend = build_pg_kv_backend("batch_delete_with_prefix-test")
.await
.unwrap();
let prefix = b"batch_delete/";
@@ -1174,7 +1176,7 @@ mod tests {
#[tokio::test]
async fn test_pg_delete_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("delete_range_test").await.unwrap();
let kv_backend = build_pg_kv_backend("delete_range-test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1184,7 +1186,7 @@ mod tests {
#[tokio::test]
async fn test_pg_compare_and_put() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("compare_and_put_test").await.unwrap();
let kv_backend = build_pg_kv_backend("compare_and_put-test").await.unwrap();
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
@@ -1193,7 +1195,7 @@ mod tests {
#[tokio::test]
async fn test_pg_txn() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("txn_test").await.unwrap();
let kv_backend = build_pg_kv_backend("txn-test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;

View File

@@ -48,7 +48,7 @@ pub mod stats;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod util;
pub mod wal_options_allocator;
pub mod wal_provider;
// The id of the datanode.
pub type DatanodeId = u64;

View File

@@ -81,6 +81,13 @@ pub trait PeerAllocator: Send + Sync {
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
#[async_trait::async_trait]
impl<T: PeerAllocator + ?Sized> PeerAllocator for Arc<T> {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error> {
T::alloc(self, num).await
}
}
pub struct NoopPeerAllocator;
#[async_trait::async_trait]

View File

@@ -144,6 +144,8 @@ impl ReconcileRegions {
}
/// Creates a region request builder from a raw table info.
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,

View File

@@ -1639,7 +1639,6 @@ mod tests {
value_indices: vec![2],
engine: METRIC_ENGINE_NAME.to_string(),
next_column_id: 0,
region_numbers: vec![0],
options: Default::default(),
created_on: Default::default(),
updated_on: Default::default(),

View File

@@ -19,6 +19,7 @@ use common_telemetry::{debug, warn};
use snafu::ensure;
use tokio::sync::Mutex;
use crate::ddl::allocator::resource_id::ResourceIdAllocator;
use crate::error::{self, Result};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::CompareAndPutRequest;
@@ -82,6 +83,25 @@ pub struct Sequence {
inner: Mutex<Inner>,
}
#[async_trait::async_trait]
impl ResourceIdAllocator for Sequence {
async fn next(&self) -> Result<u64> {
self.next().await
}
async fn peek(&self) -> Result<u64> {
self.peek().await
}
async fn jump_to(&self, next: u64) -> Result<()> {
self.jump_to(next).await
}
async fn min_max(&self) -> Range<u64> {
self.min_max().await
}
}
impl Sequence {
/// Returns the next value and increments the sequence.
pub async fn next(&self) -> Result<u64> {

View File

@@ -40,8 +40,8 @@ use crate::peer::{Peer, PeerResolver};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
use crate::wal_options_allocator::{WalOptionsAllocator, build_kafka_topic_creator};
use crate::wal_provider::topic_pool::KafkaTopicPool;
use crate::wal_provider::{WalProvider, build_kafka_topic_creator};
use crate::{DatanodeId, FlownodeId};
#[async_trait::async_trait]
@@ -187,7 +187,7 @@ pub fn new_ddl_context_with_kv_backend(
.initial(1024)
.build(),
),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator =

View File

@@ -26,28 +26,46 @@ use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use snafu::{ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
use crate::ddl::allocator::wal_options::WalOptionsAllocator;
use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::TOPIC_NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
pub use crate::wal_options_allocator::topic_creator::{
build_kafka_client, build_kafka_topic_creator,
};
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
use crate::wal_provider::topic_pool::KafkaTopicPool;
/// Allocates wal options in region granularity.
/// Provides wal options in region granularity.
#[derive(Default, Debug)]
pub enum WalOptionsAllocator {
pub enum WalProvider {
#[default]
RaftEngine,
Kafka(KafkaTopicPool),
}
/// Arc wrapper of WalOptionsAllocator.
pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
/// Arc wrapper of WalProvider.
pub type WalProviderRef = Arc<WalProvider>;
impl WalOptionsAllocator {
/// Tries to start the allocator.
#[async_trait::async_trait]
impl WalOptionsAllocator for WalProvider {
async fn allocate(
&self,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = self
.alloc_batch(region_numbers.len(), skip_wal)?
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
})
.collect::<Result<Vec<_>>>()?;
Ok(region_numbers.iter().copied().zip(wal_options).collect())
}
}
impl WalProvider {
/// Tries to start the provider.
pub async fn start(&self) -> Result<()> {
match self {
Self::RaftEngine => Ok(()),
@@ -56,14 +74,14 @@ impl WalOptionsAllocator {
}
/// Allocates a batch of wal options where each wal options goes to a region.
/// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
/// If skip_wal is true, the wal options will be set to Noop regardless of the provider type.
pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
if skip_wal {
return Ok(vec![WalOptions::Noop; num_regions]);
}
match self {
WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalOptionsAllocator::Kafka(topic_manager) => {
WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
WalProvider::Kafka(topic_manager) => {
let options_batch = topic_manager
.select_batch(num_regions)?
.into_iter()
@@ -80,14 +98,14 @@ impl WalOptionsAllocator {
/// Returns true if it's the remote WAL.
pub fn is_remote_wal(&self) -> bool {
matches!(&self, WalOptionsAllocator::Kafka(_))
matches!(&self, WalProvider::Kafka(_))
}
}
#[async_trait]
impl LeadershipChangeListener for WalOptionsAllocator {
impl LeadershipChangeListener for WalProvider {
fn name(&self) -> &str {
"WalOptionsAllocator"
"WalProvider"
}
async fn on_leader_start(&self) -> Result<()> {
@@ -99,13 +117,13 @@ impl LeadershipChangeListener for WalOptionsAllocator {
}
}
/// Builds a wal options allocator based on the given configuration.
pub async fn build_wal_options_allocator(
/// Builds a wal provider based on the given configuration.
pub async fn build_wal_provider(
config: &MetasrvWalConfig,
kv_backend: KvBackendRef,
) -> Result<WalOptionsAllocator> {
) -> Result<WalProvider> {
match config {
MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
MetasrvWalConfig::Kafka(kafka_config) => {
let prefix = &kafka_config.kafka_topic.topic_name_prefix;
ensure!(
@@ -116,28 +134,11 @@ pub async fn build_wal_options_allocator(
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
.await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
Ok(WalOptionsAllocator::Kafka(topic_pool))
Ok(WalProvider::Kafka(topic_pool))
}
}
}
/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
wal_options_allocator: &WalOptionsAllocator,
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let wal_options = wal_options_allocator
.alloc_batch(regions.len(), skip_wal)?
.into_iter()
.map(|wal_options| {
serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
})
.collect::<Result<Vec<_>>>()?;
Ok(regions.into_iter().zip(wal_options).collect())
}
/// Inserts wal options into options.
pub fn prepare_wal_options(
options: &mut HashMap<String, String>,
@@ -182,21 +183,19 @@ mod tests {
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
use crate::wal_provider::selector::RoundRobinTopicSelector;
// Tests that the wal options allocator could successfully allocate raft-engine wal options.
// Tests that the wal provider could successfully allocate raft-engine wal options.
#[tokio::test]
async fn test_allocator_with_raft_engine() {
async fn test_provider_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = MetasrvWalConfig::RaftEngine;
let allocator = build_wal_options_allocator(&wal_config, kv_backend)
.await
.unwrap();
allocator.start().await.unwrap();
let provider = build_wal_provider(&wal_config, kv_backend).await.unwrap();
provider.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let got = provider.allocate(&regions, false).await.unwrap();
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
let expected = regions
@@ -216,14 +215,14 @@ mod tests {
},
..Default::default()
});
let got = build_wal_options_allocator(&wal_config, kv_backend)
let got = build_wal_provider(&wal_config, kv_backend)
.await
.unwrap_err();
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
#[tokio::test]
async fn test_allocator_with_kafka_allocate_wal_options() {
async fn test_provider_with_kafka_allocate_wal_options() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
@@ -240,13 +239,13 @@ mod tests {
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
// Creates an options allocator.
let allocator = WalOptionsAllocator::Kafka(topic_pool);
allocator.start().await.unwrap();
// Creates an options provider.
let provider = WalProvider::Kafka(topic_pool);
provider.start().await.unwrap();
let num_regions = 3;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
let got = provider.allocate(&regions, false).await.unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
@@ -261,13 +260,13 @@ mod tests {
}
#[tokio::test]
async fn test_allocator_with_skip_wal() {
let allocator = WalOptionsAllocator::RaftEngine;
allocator.start().await.unwrap();
async fn test_provider_with_skip_wal() {
let provider = WalProvider::RaftEngine;
provider.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
let got = provider.allocate(&regions, true).await.unwrap();
assert_eq!(got.len(), num_regions as usize);
for wal_options in got.values() {
assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");

View File

@@ -22,9 +22,9 @@ use snafu::ensure;
use crate::error::{InvalidNumTopicsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::wal_options_allocator::selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal_options_allocator::topic_creator::KafkaTopicCreator;
use crate::wal_options_allocator::topic_manager::KafkaTopicManager;
use crate::wal_provider::selector::{RoundRobinTopicSelector, TopicSelectorRef};
use crate::wal_provider::topic_creator::KafkaTopicCreator;
use crate::wal_provider::topic_manager::KafkaTopicManager;
/// Topic pool for kafka remote wal.
/// Responsible for:
@@ -144,7 +144,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::test_util::test_kafka_topic_pool;
use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
use crate::wal_provider::selector::RoundRobinTopicSelector;
#[tokio::test]
async fn test_pool_invalid_number_topics_err() {

View File

@@ -21,6 +21,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use common_base::readable_size::ReadableSize;
use common_telemetry::tracing::{Span, info_span};
use common_time::util::format_nanoseconds_human_readable;
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
@@ -218,6 +219,7 @@ pub struct RecordBatchStreamAdapter {
metrics_2: Metrics,
/// Display plan and metrics in verbose mode.
explain_verbose: bool,
span: Span,
}
/// Json encoded metrics. Contains metric from a whole plan tree.
@@ -238,22 +240,21 @@ impl RecordBatchStreamAdapter {
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
span: Span::current(),
})
}
pub fn try_new_with_metrics_and_df_plan(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
df_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter");
Ok(Self {
schema,
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
metrics: None,
metrics_2: Metrics::Unavailable,
explain_verbose: false,
span: subspan,
})
}
@@ -300,6 +301,8 @@ impl Stream for RecordBatchStreamAdapter {
.map(|m| m.elapsed_compute().clone())
.unwrap_or_default();
let _guard = timer.timer();
let poll_span = info_span!(parent: &self.span, "poll_next");
let _entered = poll_span.enter();
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_record_batch)) => {

View File

@@ -29,6 +29,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use common_base::readable_size::ReadableSize;
use common_telemetry::tracing::Span;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
use datatypes::arrow::compute::SortOptions;
@@ -370,6 +371,7 @@ pub struct RecordBatchStreamWrapper<S> {
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
pub span: Span,
}
impl<S> RecordBatchStreamWrapper<S> {
@@ -380,6 +382,7 @@ impl<S> RecordBatchStreamWrapper<S> {
stream,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
}
}
}
@@ -408,6 +411,7 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStream
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let _entered = self.span.clone().entered();
Pin::new(&mut self.stream).poll_next(ctx)
}
}

View File

@@ -77,4 +77,5 @@ common-query.workspace = true
common-test-util.workspace = true
datafusion-common.workspace = true
mito2 = { workspace = true, features = ["test"] }
partition.workspace = true
session.workspace = true

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use api::v1::meta::GrantedRegion;
use async_trait::async_trait;
@@ -50,7 +50,7 @@ use crate::region_server::RegionServer;
pub struct RegionAliveKeeper {
region_server: RegionServer,
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
heartbeat_interval_millis: u64,
heartbeat_interval_millis: Arc<AtomicU64>,
started: Arc<AtomicBool>,
/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
@@ -67,18 +67,26 @@ impl RegionAliveKeeper {
pub fn new(
region_server: RegionServer,
countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
heartbeat_interval_millis: u64,
heartbeat_interval: Duration,
) -> Self {
Self {
region_server,
tasks: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
heartbeat_interval_millis: Arc::new(AtomicU64::new(
heartbeat_interval.as_millis() as u64
)),
started: Arc::new(AtomicBool::new(false)),
epoch: Instant::now(),
countdown_task_handler_ext,
}
}
/// Update the heartbeat interval with the value received from Metasrv.
pub fn update_heartbeat_interval(&self, heartbeat_interval_millis: u64) {
self.heartbeat_interval_millis
.store(heartbeat_interval_millis, Ordering::Relaxed);
}
async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
self.tasks.lock().await.get(&region_id).cloned()
}
@@ -108,7 +116,9 @@ impl RegionAliveKeeper {
};
if should_start {
handle.start(self.heartbeat_interval_millis).await;
handle
.start(self.heartbeat_interval_millis.load(Ordering::Relaxed))
.await;
info!("Region alive countdown for region {region_id} is started!");
} else {
info!(
@@ -230,8 +240,9 @@ impl RegionAliveKeeper {
}
let tasks = self.tasks.lock().await;
let interval = self.heartbeat_interval_millis.load(Ordering::Relaxed);
for task in tasks.values() {
task.start(self.heartbeat_interval_millis).await;
task.start(interval).await;
}
info!(
@@ -505,7 +516,11 @@ mod test {
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));
let alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
None,
Duration::from_millis(100),
));
let region_id = RegionId::new(1024, 1);
let builder = CreateRequestBuilder::new();

View File

@@ -29,7 +29,6 @@ pub(crate) use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
/// Storage engine config
@@ -71,7 +70,6 @@ pub struct DatanodeOptions {
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub grpc: GrpcOptions,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
pub wal: DatanodeWalConfig,
@@ -134,7 +132,6 @@ impl Default for DatanodeOptions {
RegionEngineConfig::File(FileEngineConfig::default()),
],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::datanode_default(),
enable_telemetry: true,
tracing: TracingOptions::default(),
query: QueryOptions::default(),

View File

@@ -201,6 +201,7 @@ pub enum Error {
ShutdownServer {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: servers::error::Error,
},
@@ -208,6 +209,7 @@ pub enum Error {
ShutdownInstance {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: BoxedError,
},

View File

@@ -22,7 +22,7 @@ use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionR
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
@@ -35,6 +35,7 @@ use common_stat::ResourceStatRef;
use common_telemetry::{debug, error, info, trace, warn};
use common_workload::DatanodeWorkloadType;
use meta_client::MetaClientRef;
use meta_client::client::heartbeat::HeartbeatConfig;
use meta_client::client::{HeartbeatSender, MetaClient};
use servers::addrs;
use snafu::{OptionExt as _, ResultExt};
@@ -61,7 +62,6 @@ pub struct HeartbeatTask {
running: Arc<AtomicBool>,
meta_client: MetaClientRef,
region_server: RegionServer,
interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
region_alive_keeper: Arc<RegionAliveKeeper>,
resource_stat: ResourceStatRef,
@@ -87,7 +87,7 @@ impl HeartbeatTask {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
countdown_task_handler_ext,
opts.heartbeat.interval.as_millis() as u64,
BASE_HEARTBEAT_INTERVAL,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
region_alive_keeper.clone(),
@@ -109,7 +109,6 @@ impl HeartbeatTask {
running: Arc::new(AtomicBool::new(false)),
meta_client,
region_server,
interval: opts.heartbeat.interval.as_millis() as u64,
resp_handler_executor,
region_alive_keeper,
resource_stat,
@@ -123,9 +122,9 @@ impl HeartbeatTask {
mailbox: MailboxRef,
mut notify: Option<Arc<Notify>>,
quit_signal: Arc<Notify>,
) -> Result<HeartbeatSender> {
) -> Result<(HeartbeatSender, HeartbeatConfig)> {
let client_id = meta_client.id();
let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
let mut last_received_lease = Instant::now();
@@ -175,7 +174,7 @@ impl HeartbeatTask {
quit_signal.notify_one();
info!("Heartbeat handling loop exit.");
});
Ok(tx)
Ok((tx, config))
}
async fn handle_response(
@@ -204,13 +203,9 @@ impl HeartbeatTask {
warn!("Heartbeat task started multiple times");
return Ok(());
}
let interval = self.interval;
let node_id = self.node_id;
let node_epoch = self.node_epoch;
let addr = &self.peer_addr;
info!(
"Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
);
let meta_client = self.meta_client.clone();
let region_server_clone = self.region_server.clone();
@@ -222,7 +217,7 @@ impl HeartbeatTask {
let quit_signal = Arc::new(Notify::new());
let mut tx = Self::create_streams(
let (mut tx, config) = Self::create_streams(
&meta_client,
running.clone(),
handler_executor.clone(),
@@ -232,6 +227,17 @@ impl HeartbeatTask {
)
.await?;
let interval = config.interval.as_millis() as u64;
let mut retry_interval = config.retry_interval;
// Update RegionAliveKeeper with the interval from Metasrv
self.region_alive_keeper.update_heartbeat_interval(interval);
info!(
"Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.",
config, node_id, addr
);
let self_peer = Some(Peer {
id: node_id,
addr: addr.clone(),
@@ -244,6 +250,7 @@ impl HeartbeatTask {
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
let resource_stat = self.resource_stat.clone();
let region_alive_keeper = self.region_alive_keeper.clone();
let gc_limiter = self
.region_server
.mito_engine()
@@ -363,20 +370,23 @@ impl HeartbeatTask {
)
.await
{
Ok(new_tx) => {
info!("Reconnected to metasrv");
Ok((new_tx, new_config)) => {
info!("Reconnected to metasrv, heartbeat config: {}", new_config);
tx = new_tx;
// Update retry_interval from new config
retry_interval = new_config.retry_interval;
// Update region_alive_keeper's heartbeat interval
region_alive_keeper.update_heartbeat_interval(
new_config.interval.as_millis() as u64,
);
// Triggers to send heartbeat immediately.
sleep.as_mut().reset(Instant::now());
}
Err(e) => {
// Before the META_LEASE_SECS expires,
// any retries are meaningless, it always reads the old meta leader address.
// Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
sleep.as_mut().reset(
Instant::now()
+ Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
);
// Triggers to retry after retry_interval from Metasrv config.
sleep.as_mut().reset(Instant::now() + retry_interval);
error!(e; "Failed to reconnect to metasrv!");
}
}

View File

@@ -22,6 +22,7 @@ use common_telemetry::error;
use snafu::OptionExt;
use store_api::storage::GcReport;
mod apply_staging_manifest;
mod close_region;
mod downgrade_region;
mod enter_staging;
@@ -29,8 +30,10 @@ mod file_ref;
mod flush_region;
mod gc_worker;
mod open_region;
mod remap_manifest;
mod upgrade_region;
use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
use crate::heartbeat::handler::close_region::CloseRegionsHandler;
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
@@ -38,6 +41,7 @@ use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
@@ -128,6 +132,10 @@ impl RegionHeartbeatResponseHandler {
Instruction::EnterStagingRegions(_) => {
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
}
Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
Instruction::ApplyStagingManifests(_) => {
Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
}
}
}
}
@@ -142,6 +150,8 @@ pub enum InstructionHandlers {
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
EnterStagingRegions(EnterStagingRegionsHandler),
RemapManifest(RemapManifestHandler),
ApplyStagingManifests(ApplyStagingManifestsHandler),
}
macro_rules! impl_from_handler {
@@ -164,7 +174,9 @@ impl_from_handler!(
UpgradeRegionsHandler => UpgradeRegions,
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions,
EnterStagingRegionsHandler => EnterStagingRegions
EnterStagingRegionsHandler => EnterStagingRegions,
RemapManifestHandler => RemapManifest,
ApplyStagingManifestsHandler => ApplyStagingManifests
);
macro_rules! dispatch_instr {
@@ -209,7 +221,9 @@ dispatch_instr!(
UpgradeRegions => UpgradeRegions,
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
EnterStagingRegions => EnterStagingRegions
EnterStagingRegions => EnterStagingRegions,
RemapManifest => RemapManifest,
ApplyStagingManifests => ApplyStagingManifests,
);
#[async_trait]

View File

@@ -0,0 +1,287 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{
ApplyStagingManifest, ApplyStagingManifestReply, ApplyStagingManifestsReply, InstructionReply,
};
use common_telemetry::{error, warn};
use futures::future::join_all;
use store_api::region_request::{ApplyStagingManifestRequest, RegionRequest};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct ApplyStagingManifestsHandler;
#[async_trait::async_trait]
impl InstructionHandler for ApplyStagingManifestsHandler {
type Instruction = Vec<ApplyStagingManifest>;
async fn handle(
&self,
ctx: &HandlerContext,
requests: Self::Instruction,
) -> Option<InstructionReply> {
let results = join_all(
requests
.into_iter()
.map(|request| Self::handle_apply_staging_manifest(ctx, request)),
)
.await;
Some(InstructionReply::ApplyStagingManifests(
ApplyStagingManifestsReply::new(results),
))
}
}
impl ApplyStagingManifestsHandler {
async fn handle_apply_staging_manifest(
ctx: &HandlerContext,
request: ApplyStagingManifest,
) -> ApplyStagingManifestReply {
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
warn!("Region: {} is not found", request.region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
exists: false,
ready: false,
error: None,
};
};
if !leader {
warn!("Region: {} is not leader", request.region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
exists: true,
ready: false,
error: Some("Region is not leader".into()),
};
}
match ctx
.region_server
.handle_request(
request.region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: request.partition_expr,
central_region_id: request.central_region_id,
manifest_path: request.manifest_path,
}),
)
.await
{
Ok(_) => ApplyStagingManifestReply {
region_id: request.region_id,
exists: true,
ready: true,
error: None,
},
Err(err) => {
error!(err; "Failed to apply staging manifest");
ApplyStagingManifestReply {
region_id: request.region_id,
exists: true,
ready: false,
error: Some(format!("{err:?}")),
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::instruction::RemapManifest;
use datatypes::value::Value;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use partition::expr::{PartitionExpr, col};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::EnterStagingRequest;
use store_api::storage::RegionId;
use super::*;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::region_server::RegionServer;
use crate::tests::{MockRegionEngine, mock_region_server};
#[tokio::test]
async fn test_region_not_exist() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let region_id = RegionId::new(1024, 1);
let reply = ApplyStagingManifestsHandler
.handle(
&handler_context,
vec![ApplyStagingManifest {
region_id,
partition_expr: "".to_string(),
central_region_id: RegionId::new(1024, 9999), // use a dummy value
manifest_path: "".to_string(),
}],
)
.await
.unwrap();
let replies = reply.expect_apply_staging_manifests_reply();
let reply = &replies[0];
assert!(!reply.exists);
assert!(!reply.ready);
assert!(reply.error.is_none());
}
#[tokio::test]
async fn test_region_not_leader() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let region_id = RegionId::new(1024, 1);
let reply = ApplyStagingManifestsHandler
.handle(
&handler_context,
vec![ApplyStagingManifest {
region_id,
partition_expr: "".to_string(),
central_region_id: RegionId::new(1024, 2),
manifest_path: "".to_string(),
}],
)
.await
.unwrap();
let replies = reply.expect_apply_staging_manifests_reply();
let reply = &replies[0];
assert!(reply.exists);
assert!(!reply.ready);
assert!(reply.error.is_some());
}
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
async fn prepare_region(region_server: &RegionServer) {
let region_specs = [
(RegionId::new(1024, 1), range_expr("x", 0, 49)),
(RegionId::new(1024, 2), range_expr("x", 49, 100)),
];
for (region_id, partition_expr) in region_specs {
let builder = CreateRequestBuilder::new();
let mut create_req = builder.build();
create_req.table_dir = table_dir("test", 1024);
region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();
region_server
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.as_json_str().unwrap(),
}),
)
.await
.unwrap();
}
}
#[tokio::test]
async fn test_apply_staging_manifest() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let mut engine_env = TestEnv::new().await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let region_id2 = RegionId::new(1024, 2);
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![region_id, region_id2],
region_mapping: HashMap::from([
// [0,49) <- [0, 50)
(region_id, vec![region_id]),
// [49, 100) <- [0, 50), [50,100)
(region_id2, vec![region_id, region_id2]),
]),
new_partition_exprs: HashMap::from([
(region_id, range_expr("x", 0, 49).as_json_str().unwrap()),
(region_id2, range_expr("x", 49, 100).as_json_str().unwrap()),
]),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_none(), "{}", reply.error.unwrap());
assert_eq!(reply.manifest_paths.len(), 2);
let manifest_path_1 = reply.manifest_paths[&region_id].clone();
let manifest_path_2 = reply.manifest_paths[&region_id2].clone();
let reply = ApplyStagingManifestsHandler
.handle(
&handler_context,
vec![ApplyStagingManifest {
region_id,
partition_expr: range_expr("x", 0, 49).as_json_str().unwrap(),
central_region_id: region_id,
manifest_path: manifest_path_1,
}],
)
.await
.unwrap();
let replies = reply.expect_apply_staging_manifests_reply();
let reply = &replies[0];
assert!(reply.exists);
assert!(reply.ready);
assert!(reply.error.is_none());
// partition expr mismatch
let reply = ApplyStagingManifestsHandler
.handle(
&handler_context,
vec![ApplyStagingManifest {
region_id: region_id2,
partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
central_region_id: region_id,
manifest_path: manifest_path_2,
}],
)
.await
.unwrap();
let replies = reply.expect_apply_staging_manifests_reply();
let reply = &replies[0];
assert!(reply.exists);
assert!(!reply.ready);
assert!(reply.error.is_some());
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_options_allocator::prepare_wal_options;
use common_meta::wal_provider::prepare_wal_options;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::storage::RegionId;

View File

@@ -0,0 +1,246 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
use common_telemetry::warn;
use store_api::region_engine::RemapManifestsRequest;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct RemapManifestHandler;
#[async_trait::async_trait]
impl InstructionHandler for RemapManifestHandler {
type Instruction = RemapManifest;
async fn handle(
&self,
ctx: &HandlerContext,
request: Self::Instruction,
) -> Option<InstructionReply> {
let RemapManifest {
region_id,
input_regions,
region_mapping,
new_partition_exprs,
} = request;
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
exists: false,
manifest_paths: Default::default(),
error: None,
}));
};
if !leader {
warn!("Region: {} is not leader", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some("Region is not leader".into()),
}));
}
let reply = match ctx
.region_server
.remap_manifests(RemapManifestsRequest {
region_id,
input_regions,
region_mapping,
new_partition_exprs,
})
.await
{
Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: result.manifest_paths,
error: None,
}),
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
}),
};
Some(reply)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::instruction::RemapManifest;
use datatypes::value::Value;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use partition::expr::{PartitionExpr, col};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::region_server::RegionServer;
use crate::tests::{MockRegionEngine, mock_region_server};
#[tokio::test]
async fn test_region_not_exist() {
let mut mock_region_server = mock_region_server();
let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
mock_region_server.register_engine(mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let region_id = RegionId::new(1024, 1);
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![],
region_mapping: HashMap::new(),
new_partition_exprs: HashMap::new(),
},
)
.await
.unwrap();
let reply = &reply.expect_remap_manifest_reply();
assert!(!reply.exists);
assert!(reply.error.is_none());
assert!(reply.manifest_paths.is_empty());
}
#[tokio::test]
async fn test_region_not_leader() {
let mock_region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let (mock_engine, _) =
MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
region_engine.mock_role = Some(Some(RegionRole::Follower));
region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
});
mock_region_server.register_test_region(region_id, mock_engine);
let handler_context = HandlerContext::new_for_test(mock_region_server);
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![],
region_mapping: HashMap::new(),
new_partition_exprs: HashMap::new(),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_some());
}
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
async fn prepare_region(region_server: &RegionServer) {
let region_specs = [
(RegionId::new(1024, 1), range_expr("x", 0, 50)),
(RegionId::new(1024, 2), range_expr("x", 50, 100)),
];
for (region_id, partition_expr) in region_specs {
let builder = CreateRequestBuilder::new();
let mut create_req = builder.build();
create_req.table_dir = table_dir("test", 1024);
region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.await
.unwrap();
region_server
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: partition_expr.as_json_str().unwrap(),
}),
)
.await
.unwrap();
}
}
#[tokio::test]
async fn test_remap_manifest() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let mut engine_env = TestEnv::new().await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let region_id2 = RegionId::new(1024, 2);
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![region_id, region_id2],
region_mapping: HashMap::from([
(region_id, vec![region_id]),
(region_id2, vec![region_id]),
]),
new_partition_exprs: HashMap::from([(
region_id,
range_expr("x", 0, 100).as_json_str().unwrap(),
)]),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_none(), "{}", reply.error.unwrap());
assert_eq!(reply.manifest_paths.len(), 1);
// Remap failed
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![region_id],
region_mapping: HashMap::from([
(region_id, vec![region_id]),
(region_id2, vec![region_id]),
]),
new_partition_exprs: HashMap::from([(
region_id,
range_expr("x", 0, 100).as_json_str().unwrap(),
)]),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.manifest_paths.is_empty());
}
}

View File

@@ -65,8 +65,9 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncRegionFromRequest,
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest,
};
use store_api::region_request::{
AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
@@ -604,6 +605,25 @@ impl RegionServer {
.await
}
/// Remaps manifests from old regions to new regions.
pub async fn remap_manifests(
&self,
request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse> {
let region_id = request.region_id;
let engine_with_status = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine_with_status
.engine()
.remap_manifests(request)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
fn is_suspended(&self) -> bool {
self.suspend.load(Ordering::Relaxed)
}
@@ -1621,7 +1641,10 @@ mod tests {
let response = mock_region_server
.handle_request(
region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
RegionRequest::Drop(RegionDropRequest {
fast_path: false,
force: false,
}),
)
.await
.unwrap();
@@ -1719,7 +1742,10 @@ mod tests {
mock_region_server
.handle_request(
region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
RegionRequest::Drop(RegionDropRequest {
fast_path: false,
force: false,
}),
)
.await
.unwrap_err();

View File

@@ -18,7 +18,7 @@ use common_meta::DatanodeId;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options};
use common_meta::wal_provider::{extract_topic_from_wal_options, prepare_wal_options};
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::path_utils::table_dir;

View File

@@ -15,9 +15,10 @@
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::{
DataType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
DurationSecondType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
DurationSecondType, Int8Type, Int16Type, Int32Type, Int64Type, Time32MillisecondType,
Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_array::Array;
use common_time::time::Time;
@@ -152,3 +153,62 @@ pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> {
_ => None,
}
}
/// Get the integer value (`i64`) at index `i` for any integer array.
///
/// Returns `None` when:
///
/// - the array type is not an integer type;
/// - the value is larger than `i64::MAX`;
/// - the value is null.
///
/// # Panics
///
/// If index `i` is out of bounds.
pub fn int_array_value_at_index(array: &ArrayRef, i: usize) -> Option<i64> {
match array.data_type() {
DataType::Int8 => {
let array = array.as_primitive::<Int8Type>();
array.is_valid(i).then(|| array.value(i) as i64)
}
DataType::Int16 => {
let array = array.as_primitive::<Int16Type>();
array.is_valid(i).then(|| array.value(i) as i64)
}
DataType::Int32 => {
let array = array.as_primitive::<Int32Type>();
array.is_valid(i).then(|| array.value(i) as i64)
}
DataType::Int64 => {
let array = array.as_primitive::<Int64Type>();
array.is_valid(i).then(|| array.value(i))
}
DataType::UInt8 => {
let array = array.as_primitive::<UInt8Type>();
array.is_valid(i).then(|| array.value(i) as i64)
}
DataType::UInt16 => {
let array = array.as_primitive::<UInt16Type>();
array.is_valid(i).then(|| array.value(i) as i64)
}
DataType::UInt32 => {
let array = array.as_primitive::<UInt32Type>();
array.is_valid(i).then(|| array.value(i) as i64)
}
DataType::UInt64 => {
let array = array.as_primitive::<UInt64Type>();
array
.is_valid(i)
.then(|| {
let i = array.value(i);
if i <= i64::MAX as u64 {
Some(i as i64)
} else {
None
}
})
.flatten()
}
_ => None,
}
}

View File

@@ -816,7 +816,7 @@ mod tests {
let result = encode_by_struct(&json_struct, json);
assert_eq!(
result.unwrap_err().to_string(),
"Cannot cast value bar to Number(I64)"
r#"Cannot cast value bar to "<Number>""#
);
let json = json!({

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
@@ -134,24 +134,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
impl Display for JsonNativeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
JsonNativeType::Null => write!(f, "Null"),
JsonNativeType::Bool => write!(f, "Bool"),
JsonNativeType::Number(t) => {
write!(f, "Number({t:?})")
JsonNativeType::Null => write!(f, r#""<Null>""#),
JsonNativeType::Bool => write!(f, r#""<Bool>""#),
JsonNativeType::Number(_) => {
write!(f, r#""<Number>""#)
}
JsonNativeType::String => write!(f, "String"),
JsonNativeType::String => write!(f, r#""<String>""#),
JsonNativeType::Array(item_type) => {
write!(f, "Array[{}]", item_type)
write!(f, "[{}]", item_type)
}
JsonNativeType::Object(object) => {
write!(
f,
"Object{{{}}}",
"{{{}}}",
object
.iter()
.map(|(k, v)| format!(r#""{k}": {v}"#))
.map(|(k, v)| format!(r#""{k}":{v}"#))
.collect::<Vec<_>>()
.join(", ")
.join(",")
)
}
}
@@ -183,7 +183,11 @@ impl JsonType {
}
}
pub(crate) fn native_type(&self) -> &JsonNativeType {
pub fn is_native_type(&self) -> bool {
matches!(self.format, JsonFormat::Native(_))
}
pub fn native_type(&self) -> &JsonNativeType {
match &self.format {
JsonFormat::Jsonb => &JsonNativeType::String,
JsonFormat::Native(x) => x.as_ref(),
@@ -650,15 +654,16 @@ mod tests {
"list": [1, 2, 3],
"object": {"a": 1}
}"#;
let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
let expected =
r#"Json<{"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
test(json, json_type, Ok(expected))?;
// cannot merge with other non-object json values:
let jsons = [r#""s""#, "1", "[1]"];
let expects = [
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<String>""#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<Number>""#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: ["<Number>"]"#,
];
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
test(json, json_type, Err(expect))?;
@@ -670,7 +675,7 @@ mod tests {
"float": 0.123,
"no": 42
}"#;
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#;
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: "<String>", that: "<Number>""#;
test(json, json_type, Err(expected))?;
// can merge with another json object:
@@ -679,7 +684,7 @@ mod tests {
"float": 0.123,
"int": 42
}"#;
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
test(json, json_type, Ok(expected))?;
// can merge with some complex nested json object:
@@ -689,7 +694,7 @@ mod tests {
"float": 0.456,
"int": 0
}"#;
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64), "foo": String, "l": Array[String], "o": Object{"key": String}}}>"#;
let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>","foo":"<String>","l":["<String>"],"o":{"key":"<String>"}}}>"#;
test(json, json_type, Ok(expected))?;
Ok(())

View File

@@ -321,10 +321,10 @@ mod tests {
Ok(()),
Ok(()),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String",
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: "<String>""#,
),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: ["<Bool>"]"#,
),
];
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
@@ -396,12 +396,12 @@ mod tests {
// test children builders:
assert_eq!(builder.builders.len(), 6);
let expect_types = [
r#"Json<Object{"list": Array[Number(I64)], "s": String}>"#,
r#"Json<Object{"float": Number(F64), "s": String}>"#,
r#"Json<Object{"float": Number(F64), "int": Number(I64)}>"#,
r#"Json<Object{"int": Number(I64), "object": Object{"hello": String, "timestamp": Number(I64)}}>"#,
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"b": Object{"a": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"a": Object{"b": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
r#"Json<{"list":["<Number>"],"s":"<String>"}>"#,
r#"Json<{"float":"<Number>","s":"<String>"}>"#,
r#"Json<{"float":"<Number>","int":"<Number>"}>"#,
r#"Json<{"int":"<Number>","object":{"hello":"<String>","timestamp":"<Number>"}}>"#,
r#"Json<{"nested":{"a":{"b":{"b":{"a":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
r#"Json<{"nested":{"a":{"b":{"a":{"b":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
];
let expect_vectors = [
r#"
@@ -456,7 +456,7 @@ mod tests {
}
// test final merged json type:
let expected = r#"Json<Object{"float": Number(F64), "int": Number(I64), "list": Array[Number(I64)], "nested": Object{"a": Object{"b": Object{"a": Object{"b": String}, "b": Object{"a": String}}}}, "object": Object{"hello": String, "timestamp": Number(I64)}, "s": String}>"#;
let expected = r#"Json<{"float":"<Number>","int":"<Number>","list":["<Number>"],"nested":{"a":{"b":{"a":{"b":"<String>"},"b":{"a":"<String>"}}}},"object":{"hello":"<String>","timestamp":"<Number>"},"s":"<String>"}>"#;
assert_eq!(builder.data_type().to_string(), expected);
// test final produced vector:

View File

@@ -79,7 +79,7 @@ tokio.workspace = true
tonic.workspace = true
[dev-dependencies]
catalog.workspace = true
catalog = { workspace = true, features = ["testing"] }
common-catalog.workspace = true
pretty_assertions.workspace = true
prost.workspace = true

View File

@@ -39,7 +39,6 @@ use query::QueryEngine;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt, ensure};
@@ -111,7 +110,6 @@ pub struct FlownodeOptions {
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
pub tracing: TracingOptions,
pub heartbeat: HeartbeatOptions,
pub query: QueryOptions,
pub user_provider: Option<String>,
pub memory: MemoryOptions,
@@ -127,7 +125,6 @@ impl Default for FlownodeOptions {
meta_client: None,
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
heartbeat: HeartbeatOptions::default(),
// flownode's query option is set to 1 to throttle flow's query so
// that it won't use too much cpu or memory
query: QueryOptions {

View File

@@ -24,7 +24,7 @@ use super::*;
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
table_id: TableId,
table_name: &str,
region_numbers: I,
_region_numbers: I,
) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true),
@@ -46,7 +46,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()

View File

@@ -30,7 +30,6 @@ use common_telemetry::{debug, error, info, warn};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::Duration;
@@ -64,8 +63,6 @@ pub struct HeartbeatTask {
node_epoch: u64,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
@@ -81,7 +78,6 @@ impl HeartbeatTask {
pub fn new(
opts: &FlownodeOptions,
meta_client: Arc<MetaClient>,
heartbeat_opts: HeartbeatOptions,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
resource_stat: ResourceStatRef,
) -> Self {
@@ -90,8 +86,6 @@ impl HeartbeatTask {
node_epoch: common_time::util::current_time_millis() as u64,
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
@@ -113,22 +107,26 @@ impl HeartbeatTask {
}
async fn create_streams(&self) -> Result<(), Error> {
info!("Start to establish the heartbeat connection to metasrv.");
let (req_sender, resp_stream) = self
info!("Establishing heartbeat connection to Metasrv...");
let (req_sender, resp_stream, config) = self
.meta_client
.heartbeat()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
info!("Flownode's heartbeat connection has been established with metasrv");
info!(
"Heartbeat started for flownode {}, Metasrv config: {}",
self.node_id, config
);
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
self.start_handle_resp_stream(resp_stream, mailbox);
self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
self.start_heartbeat_report(req_sender, outgoing_rx);
self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
Ok(())
}
@@ -217,8 +215,8 @@ impl HeartbeatTask {
&self,
req_sender: HeartbeatSender,
mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
report_interval: Duration,
) {
let report_interval = self.report_interval;
let node_epoch = self.node_epoch;
let self_peer = Some(Peer {
id: self.node_id,
@@ -277,9 +275,13 @@ impl HeartbeatTask {
});
}
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
fn start_handle_resp_stream(
&self,
mut resp_stream: HeartbeatStream,
mailbox: MailboxRef,
retry_interval: Duration,
) {
let capture_self = self.clone();
let retry_interval = self.retry_interval;
let _handle = common_runtime::spawn_hb(async move {
loop {

View File

@@ -25,7 +25,6 @@ use meta_client::MetaClientOptions;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::server::ServerHandlers;
use snafu::ResultExt;
@@ -45,7 +44,6 @@ pub struct FrontendOptions {
pub node_id: Option<String>,
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
pub heartbeat: HeartbeatOptions,
/// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
/// Set to 0 to disable the limit. Default: "0" (unlimited)
pub max_in_flight_write_bytes: ReadableSize,
@@ -82,7 +80,6 @@ impl Default for FrontendOptions {
node_id: None,
default_timezone: None,
default_column_prefix: None,
heartbeat: HeartbeatOptions::frontend_default(),
max_in_flight_write_bytes: ReadableSize(0),
write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
http: HttpOptions::default(),
@@ -406,10 +403,6 @@ mod tests {
..Default::default()
},
meta_client: Some(meta_client_options.clone()),
heartbeat: HeartbeatOptions {
interval: Duration::from_secs(1),
..Default::default()
},
..Default::default()
};
@@ -419,7 +412,11 @@ mod tests {
let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
let frontend = create_frontend(&options, meta_client).await?;
let frontend_heartbeat_interval = options.heartbeat.interval;
use common_meta::distributed_time_constants::{
BASE_HEARTBEAT_INTERVAL, frontend_heartbeat_interval,
};
let frontend_heartbeat_interval =
frontend_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + Duration::from_secs(1);
tokio::time::sleep(frontend_heartbeat_interval).await;
// initial state: not suspend:
assert!(!frontend.instance.is_suspended());

View File

@@ -42,8 +42,6 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
resource_stat: ResourceStatRef,
@@ -66,8 +64,6 @@ impl HeartbeatTask {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
},
meta_client,
report_interval: opts.heartbeat.interval,
retry_interval: opts.heartbeat.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
resource_stat,
@@ -75,27 +71,31 @@ impl HeartbeatTask {
}
pub async fn start(&self) -> Result<()> {
let (req_sender, resp_stream) = self
let (req_sender, resp_stream, config) = self
.meta_client
.heartbeat()
.await
.context(error::CreateMetaHeartbeatStreamSnafu)?;
info!("A heartbeat connection has been established with metasrv");
info!("Heartbeat started with Metasrv config: {}", config);
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
self.start_handle_resp_stream(resp_stream, mailbox);
self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
self.start_heartbeat_report(req_sender, outgoing_rx);
self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
Ok(())
}
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
fn start_handle_resp_stream(
&self,
mut resp_stream: HeartbeatStream,
mailbox: MailboxRef,
retry_interval: Duration,
) {
let capture_self = self.clone();
let retry_interval = self.retry_interval;
let _handle = common_runtime::spawn_hb(async move {
loop {
@@ -190,8 +190,8 @@ impl HeartbeatTask {
&self,
req_sender: HeartbeatSender,
mut outgoing_rx: Receiver<OutgoingMessage>,
report_interval: Duration,
) {
let report_interval = self.report_interval;
let start_time_ms = self.start_time_ms;
let self_peer = Some(Peer {
// The node id will be actually calculated from its address (by hashing the address

View File

@@ -91,6 +91,7 @@ use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
use tracing::Span;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
@@ -508,6 +509,7 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
stream: s,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
}

View File

@@ -40,6 +40,7 @@ use servers::query_handler::{
};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use tracing::instrument;
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, PromStoreRemoteQueryPlanSnafu, ReadTableSnafu, Result,
@@ -78,6 +79,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
Ok(ResponseType::try_from(*response_type).unwrap())
}
#[instrument(skip_all, fields(table_name))]
async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
let OutputData::Stream(stream) = output.data else {
unreachable!()
@@ -194,6 +196,7 @@ impl PromStoreProtocolHandler for Instance {
Ok(output)
}
#[instrument(skip_all, fields(table_name))]
async fn read(
&self,
request: ReadRequest,

View File

@@ -23,7 +23,7 @@ use common_telemetry::tracing;
use promql_parser::label::{MatchOp, Matcher, Matchers};
use query::promql;
use query::promql::planner::PromPlanner;
use servers::prom_store::{DATABASE_LABEL, SCHEMA_LABEL};
use servers::prom_store::is_database_selection_label;
use servers::prometheus;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -112,7 +112,7 @@ impl Instance {
let table_schema = matchers
.iter()
.find_map(|m| {
if (m.name == SCHEMA_LABEL || m.name == DATABASE_LABEL) && m.op == MatchOp::Equal {
if is_database_selection_label(&m.name) && m.op == MatchOp::Equal {
Some(m.value.clone())
} else {
None

View File

@@ -97,12 +97,16 @@ impl Datanode for RegionInvoker {
}
async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let region_id = request.region_id.to_string();
let span = request
.header
.as_ref()
.map(|h| TracingContext::from_w3c(&h.tracing_context))
.unwrap_or_default()
.attach(tracing::info_span!("RegionInvoker::handle_query"));
.attach(tracing::info_span!(
"RegionInvoker::handle_query",
region_id = region_id
));
self.region_server
.handle_read(request)
.trace(span)

View File

@@ -44,7 +44,7 @@ async fn run() {
// required only when the heartbeat_client is enabled
meta_client.ask_leader().await.unwrap();
let (sender, mut receiver) = meta_client.heartbeat().await.unwrap();
let (sender, mut receiver, _config) = meta_client.heartbeat().await.unwrap();
// send heartbeats
let _handle = tokio::spawn(async move {

Some files were not shown because too many files have changed in this diff Show More