Compare commits

..

36 Commits

Author SHA1 Message Date
Ning Sun
8853e08a7d chore: use pinned prost 0.14.1 2026-01-15 16:58:24 +08:00
Ning Sun
9cba14f904 chore: update otel libraries 2026-01-15 16:12:34 +08:00
Ning Sun
09ba24b7a9 chore: update otel-arrow 2026-01-15 15:46:40 +08:00
LFC
e64c31e59a chore: upgrade DataFusion family (#7558)
* chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* use main proto

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2026-01-14 14:02:31 +00:00
Ruihang Xia
a5cb0116a2 perf: avoid boundary checks on accessing array items (#7570)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-14 12:56:39 +00:00
Ruihang Xia
170f94fc08 feat: enable pruning for manipulate plans (#7565)
* feat: enable pruning for manipulate plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to other plans and add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix scalar manipulate and histogram fold when some columns are missing

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't drop every column

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unrelated part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-14 08:32:51 +00:00
Yingwen
1c9aa59317 style: remove unused imports (#7567)
* style: remove unused imports

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: import only in test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-01-14 07:59:40 +00:00
Weny Xu
ed171d9264 feat(cli): support RaftEngine store backend for metadata snapshot operations (#7467)
* feat(cli): support RaftEngine store backend for metadata snapshot operations

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: allow empty aksk

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-14 02:24:36 +00:00
Yingwen
4b3bd7317b feat: add per-partition convert, result cache metrics (#7539)
* fix: show convert cost in explain analyze verbose

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: increase puffin metadata cache metric

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add result cache hit/miss to filter metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: print flat format in debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update sqlness test

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: make scan cost contain part/reader build cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect divider cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove unused field in ScannerMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect metadata read bytes

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: collect read metrics in get_parquet_meta_data

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-01-13 09:17:09 +00:00
Weny Xu
279908984d fix: fix topic region mapping sync and handle region_not_found in migration (#7552)
* fix(meta): update topic region mapping during table route updates

Fix a bug in `build_create_txn` where the parameter order was incorrect
(`(topic, region_id)` -> `(region_id, topic)`), and add support for updating
topic region mappings during repartition operations.

- Add `build_update_txn` method to handle topic region mapping updates
- Integrate topic region update into `update_table_route` transaction
- Add WAL options merging and validation logic for repartition
- Update allocate/deallocate procedures to pass WAL options
- Add comprehensive tests for all scenarios

This ensures topic region mappings stay in sync with table routes during
repartition, preventing data inconsistencies.

Signed-off-by: WenyXu <wenymedia@gmail.com>
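
The parameter-order bug called out above (`(topic, region_id)` swapped to `(region_id, topic)`) is the usual hazard of positional arguments whose parts look interchangeable. As a hedged illustration only, not the actual GreptimeDB meta code, a named key struct makes such a swap impossible to write silently:

// Hypothetical stand-ins; the real GreptimeDB types live in the meta crates.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct RegionId(u64);

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Topic(String);

/// Key for the topic-region mapping. Using a named struct instead of a bare
/// tuple makes it impossible to swap the two fields without a type error.
#[derive(Debug, PartialEq, Eq, Hash)]
struct TopicRegionKey {
    region_id: RegionId,
    topic: Topic,
}

fn build_create_key(region_id: RegionId, topic: &Topic) -> TopicRegionKey {
    TopicRegionKey {
        region_id,
        topic: topic.clone(),
    }
}

fn main() {
    let key = build_create_key(RegionId(42), &Topic("wal_topic_0".to_string()));
    // The field names document the order; a swapped call site no longer type-checks.
    assert_eq!(key.region_id, RegionId(42));
    println!("{key:?}");
}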

* feat(meta): handle region_not_found in region migration

Add support for detecting and handling regions that exist in migration
tasks but are no longer present in table routes (e.g., removed after
repartition). This prevents unnecessary retries and cleans up related
resources.

Changes:
- Add `region_not_found` field to `SubmitRegionMigrationTaskResult` and
  `RegionMigrationAnalysis` structs
- Update `analyze_region_migration_task` to detect regions missing from
  current table routes
- Deregister failure detectors for `region_not_found` regions in supervisor
- Change `table_regions()` return type from `HashMap<TableId, Vec<RegionId>>`
  to `HashMap<TableId, HashSet<RegionId>>` for better performance
- Add test cases for `region_not_found` handling

This fixes the issue where migration tasks would continue retrying on
regions that have been removed after repartition operations.

Signed-off-by: WenyXu <wenymedia@gmail.com>
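
The `table_regions()` signature change above (Vec to HashSet) exists to make the "is this region still routed?" check cheap when analyzing migration tasks. A small self-contained sketch, with simplified stand-in types rather than the real meta structures:

use std::collections::{HashMap, HashSet};

type TableId = u32;
type RegionId = u64;

/// Simplified stand-in for the route data a migration task is checked against.
struct TableRoutes {
    regions: HashMap<TableId, HashSet<RegionId>>,
}

impl TableRoutes {
    /// Returns true when the region is no longer present in the current
    /// table routes, e.g. because it was removed by a repartition.
    fn region_not_found(&self, table_id: TableId, region_id: RegionId) -> bool {
        self.regions
            .get(&table_id)
            .map(|set| !set.contains(&region_id)) // O(1) with a HashSet
            .unwrap_or(true)
    }
}

fn main() {
    let mut regions = HashMap::new();
    regions.insert(1, HashSet::from([100_u64, 101]));
    let routes = TableRoutes { regions };

    // A task referring to region 102 should be flagged instead of retried forever.
    assert!(routes.region_not_found(1, 102));
    assert!(!routes.region_not_found(1, 100));
}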

* fix: fix clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-13 04:10:45 +00:00
dennis zhuang
a56a00224f feat: impl vector index scan in storage (#7528)
* feat: impl vector index scan in storage

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: fallback to read remote blob when blob not found

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: refactor encoding and decoding and apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: license

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add apply_with_k tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: forgot to align nulls when the vector column is not in the batch

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add test for when the vector column is not in a batch while building

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2026-01-12 08:30:51 +00:00
discord9
6487f14f70 feat: gc schd update repart mapping (#7517)
* feat(gc): batch gc now also handles routing

Signed-off-by: discord9 <discord9@163.com>

typo

Signed-off-by: discord9 <discord9@163.com>

s

Signed-off-by: discord9 <discord9@163.com>

feat: use batch gc procedure

Signed-off-by: discord9 <discord9@163.com>

feat: cross region refs

Signed-off-by: discord9 <discord9@163.com>

feat: clean up repartition

Signed-off-by: discord9 <discord9@163.com>

chore: cleanup

Signed-off-by: discord9 <discord9@163.com>

per review

Signed-off-by: discord9 <discord9@163.com>

test: update mock test

Signed-off-by: discord9 <discord9@163.com>

refactor: rm unused

Signed-off-by: discord9 <discord9@163.com>

refactor: invert related_regions

Signed-off-by: discord9 <discord9@163.com>

clippy

Signed-off-by: discord9 <discord9@163.com>

pcr

Signed-off-by: discord9 <discord9@163.com>

chore: remove unused

Signed-off-by: discord9 <discord9@163.com>

fix: after invert fix

Signed-off-by: discord9 <discord9@163.com>

chore: rm unused

Signed-off-by: discord9 <discord9@163.com>

refactor: eff

Signed-off-by: discord9 <discord9@163.com>

docs: chore

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* chore

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: missing region

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-12 08:28:34 +00:00
Ruihang Xia
45b4067721 feat: always canonicalize partition expr (#7553)
* feat: always canonicalize partition expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix ut assertion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-12 07:24:29 +00:00
jeremyhi
0c373062c2 feat: make grpc able to handle metric engine requests (#7508)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2026-01-09 09:42:27 +00:00
Weny Xu
567d3e66e9 feat: integrate repartition procedure into DdlManager (#7548)
* feat: add repartition procedure factory support to DdlManager

- Introduce RepartitionProcedureFactory trait for creating and registering
  repartition procedures
- Implement DefaultRepartitionProcedureFactory for metasrv with full support
- Implement StandaloneRepartitionProcedureFactory for standalone (unsupported)
- Add procedure loader registration for RepartitionProcedure and
  RepartitionGroupProcedure
- Add helper methods to TableMetadataAllocator for allocator access
- Add error types for repartition procedure operations
- Update DdlManager to accept and use RepartitionProcedureFactoryRef

Signed-off-by: WenyXu <wenymedia@gmail.com>
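
As a rough illustration of the factory split described above (a working metasrv implementation plus a standalone one that reports the operation as unsupported), the following sketch uses invented, simplified types; it is not the actual `DdlManager` interface:

use std::sync::Arc;

// Hypothetical placeholders for the real procedure and error types.
struct RepartitionTask;
struct Procedure;

#[derive(Debug)]
enum Error {
    Unsupported(&'static str),
}

/// Factory trait so the DDL layer can stay agnostic of the deployment mode.
trait RepartitionProcedureFactory: Send + Sync {
    fn create(&self, task: RepartitionTask) -> Result<Procedure, Error>;
}

/// Distributed (metasrv) flavor: builds a real procedure.
struct DefaultRepartitionProcedureFactory;

impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
    fn create(&self, _task: RepartitionTask) -> Result<Procedure, Error> {
        Ok(Procedure)
    }
}

/// Standalone flavor: repartition is not supported, so it fails fast.
struct StandaloneRepartitionProcedureFactory;

impl RepartitionProcedureFactory for StandaloneRepartitionProcedureFactory {
    fn create(&self, _task: RepartitionTask) -> Result<Procedure, Error> {
        Err(Error::Unsupported("repartition is unavailable in standalone mode"))
    }
}

type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;

fn main() {
    let factory: RepartitionProcedureFactoryRef = Arc::new(StandaloneRepartitionProcedureFactory);
    assert!(factory.create(RepartitionTask).is_err());
}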

* feat: integrate repartition procedure into DdlManager

- Add submit_repartition_task() to handle repartition from alter table
- Route Repartition operations in submit_alter_table_task() to repartition factory
- Refactor: rename submit_procedure() to execute_procedure_and_wait()
- Make all DDL operations wait for completion by default
- Add submit_procedure() for fire-and-forget submissions
- Add CreateRepartitionProcedure error type
- Add placeholder Repartition handling in grpc-expr (unsupported)
- Update greptime-proto dependency

Signed-off-by: WenyXu <wenymedia@gmail.com>
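
The `submit_procedure()` / `execute_procedure_and_wait()` split mentioned above separates fire-and-forget submission from the default wait-for-completion path. A minimal sketch of that shape, using plain threads as a stand-in for the real procedure manager:

use std::thread::{self, JoinHandle};

// Made-up stand-ins; the real DdlManager submits to a procedure manager.
struct ProcedureId(u64);
struct Output(String);

struct DdlManager;

impl DdlManager {
    /// Fire-and-forget: submit the procedure and return immediately.
    fn submit_procedure(&self, id: ProcedureId) -> JoinHandle<Output> {
        thread::spawn(move || Output(format!("procedure {} done", id.0)))
    }

    /// Default path for DDL operations: submit and block until completion.
    fn execute_procedure_and_wait(&self, id: ProcedureId) -> Output {
        self.submit_procedure(id)
            .join()
            .expect("procedure thread panicked")
    }
}

fn main() {
    let ddl = DdlManager;
    let out = ddl.execute_procedure_and_wait(ProcedureId(7));
    println!("{}", out.0);
}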

* feat: implement ALTER TABLE REPARTITION procedure submission

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(repartition): handle central region in apply staging manifest

- Introduce ApplyStagingManifestInstructions struct to organize instructions
- Add special handling for central region when applying staging manifests
- Transition state from UpdateMetadata to RepartitionEnd after applying staging manifests
- Remove next_state() method in RepartitionStart and inline state transitions
- Improve logging and expression serialization in DDL statement executor
- Move repartition tests from standalone to distributed test suite

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-09 08:37:21 +00:00
discord9
63284a5081 chore: sqlness fmt (#7551)
chore

Signed-off-by: discord9 <discord9@163.com>
2026-01-09 07:18:23 +00:00
Weny Xu
4891d7ceef fix: fix SQL table identifier quoting for election (#7541)
fix: fix SQL table identifier quoting for election and RDS kv-backend

- Quote MySQL table names with backticks and PostgreSQL tables with double quotes in election and RDS kv-backend SQL
- Update related tests to use quoted identifiers and cover hyphenated table names
- Ensure dynamic SQL using table names is safe for special characters in identifiers

Signed-off-by: WenyXu <wenymedia@gmail.com>
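
The quoting rule in this fix (backticks for MySQL, double quotes for PostgreSQL) is what lets hyphenated table names survive dynamically built SQL. A hedged sketch of the idea, with a hypothetical helper rather than the actual election/kv-backend code:

/// Which RDS backend the dynamically built SQL targets.
enum SqlDialect {
    MySql,
    PostgreSql,
}

/// Quote a table identifier so names such as "greptime-metasrv-election"
/// survive string-built SQL. Embedded quote characters are doubled, which is
/// the standard escaping rule for both dialects.
fn quote_ident(dialect: &SqlDialect, name: &str) -> String {
    match dialect {
        SqlDialect::MySql => format!("`{}`", name.replace('`', "``")),
        SqlDialect::PostgreSql => format!("\"{}\"", name.replace('"', "\"\"")),
    }
}

fn main() {
    let table = "greptime-metasrv-election";
    assert_eq!(quote_ident(&SqlDialect::MySql, table), "`greptime-metasrv-election`");
    assert_eq!(
        quote_ident(&SqlDialect::PostgreSql, table),
        "\"greptime-metasrv-election\""
    );
    // The quoted name can then be spliced into dynamic statements safely.
    let sql = format!("SELECT k, v FROM {} WHERE k = $1", quote_ident(&SqlDialect::PostgreSql, table));
    println!("{sql}");
}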
2026-01-09 03:15:53 +00:00
Weny Xu
aadfcd7821 feat(repartition): implement validation logic for repartition table (#7538)
* feat(repartition): implement validation logic for repartition_table

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: minor refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: update sqlness

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-08 12:18:39 +00:00
Weny Xu
f3e2d333e4 feat(repartition): implement region allocation for repartition procedure (#7534)
* refactor: rename WalOptionsAllocator to WalProvider

The name "WalOptionsAllocator" was misleading because:
- For RaftEngine variant, it doesn't actually allocate anything
- The actual allocation logic lives in KafkaTopicPool

"WalProvider" better describes its role as providing WAL options
based on the configured WAL backend (RaftEngine or Kafka).

Changes:
- Rename `WalOptionsAllocator` to `WalProvider`
- Rename `WalOptionsAllocatorRef` to `WalProviderRef`
- Rename `build_wal_options_allocator` to `build_wal_provider`
- Rename module `wal_options_allocator` to `wal_provider`
- Rename error types: `BuildWalOptionsAllocator` -> `BuildWalProvider`,
  `StartWalOptionsAllocator` -> `StartWalProvider`

Signed-off-by: WenyXu <wenymedia@gmail.com>
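
To picture why "provider" fits better than "allocator": the RaftEngine flavor simply reports static options, while topic allocation lives in the Kafka topic pool. The shapes below are invented for illustration and only approximate the real `WalProvider`:

// Hypothetical shapes; the real WalProvider lives in the meta crates.
struct KafkaTopicPool {
    topics: Vec<String>,
    next: usize,
}

impl KafkaTopicPool {
    /// The actual allocation logic: hand out topics round-robin.
    fn select_topic(&mut self) -> String {
        let topic = self.topics[self.next % self.topics.len()].clone();
        self.next += 1;
        topic
    }
}

/// Provides WAL options based on the configured backend. The RaftEngine
/// variant has nothing to allocate, which is why "allocator" was a misnomer.
enum WalProvider {
    RaftEngine,
    Kafka(KafkaTopicPool),
}

impl WalProvider {
    fn wal_options(&mut self) -> String {
        match self {
            WalProvider::RaftEngine => r#"{"wal":"raft_engine"}"#.to_string(),
            WalProvider::Kafka(pool) => format!(r#"{{"wal":"kafka","topic":"{}"}}"#, pool.select_topic()),
        }
    }
}

fn main() {
    let mut provider = WalProvider::Kafka(KafkaTopicPool {
        topics: vec!["greptimedb_wal_topic_0".into(), "greptimedb_wal_topic_1".into()],
        next: 0,
    });
    println!("{}", provider.wal_options());

    let mut raft = WalProvider::RaftEngine;
    println!("{}", raft.wal_options());
}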

* refactor(meta): extract allocator traits from TableMetadataAllocator

Refactor TableMetadataAllocator to use trait-based dependency injection
for better testability and separation of concerns.

Changes:
- Add `ResourceIdAllocator` trait to abstract ID allocation
- Add `WalOptionsAllocator` trait to abstract WAL options allocation
- Implement traits for `Sequence` and `WalProvider`
- Remove duplicate `allocate_region_wal_options` function
- Rename `table_id_sequence` to `table_id_allocator` for consistency
- Rename `TableIdSequenceHandler` to `TableIdAllocatorHandler`

Signed-off-by: WenyXu <wenymedia@gmail.com>
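
A hedged sketch of the trait extraction described above: `TableMetadataAllocator` depends on small allocator traits rather than concrete backends, so tests can inject trivial fakes. All names below are simplified stand-ins, not the real signatures:

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

/// Abstracts ID allocation (backed by a `Sequence` in the real code).
trait ResourceIdAllocator: Send + Sync {
    fn next_id(&self) -> u32;
}

/// Abstracts WAL options allocation (backed by a `WalProvider`).
trait WalOptionsAllocator: Send + Sync {
    fn allocate(&self, region_count: usize) -> Vec<String>;
}

/// The metadata allocator only sees the traits, never the concrete backends.
struct TableMetadataAllocator {
    table_id_allocator: Arc<dyn ResourceIdAllocator>,
    wal_options_allocator: Arc<dyn WalOptionsAllocator>,
}

impl TableMetadataAllocator {
    fn allocate(&self, region_count: usize) -> (u32, Vec<String>) {
        let table_id = self.table_id_allocator.next_id();
        let wal_options = self.wal_options_allocator.allocate(region_count);
        (table_id, wal_options)
    }
}

// Trivial in-memory fakes, good enough for unit tests.
struct CountingAllocator(AtomicU32);
impl ResourceIdAllocator for CountingAllocator {
    fn next_id(&self) -> u32 {
        self.0.fetch_add(1, Ordering::Relaxed)
    }
}

struct NoopWal;
impl WalOptionsAllocator for NoopWal {
    fn allocate(&self, region_count: usize) -> Vec<String> {
        vec!["{}".to_string(); region_count]
    }
}

fn main() {
    let allocator = TableMetadataAllocator {
        table_id_allocator: Arc::new(CountingAllocator(AtomicU32::new(1024))),
        wal_options_allocator: Arc::new(NoopWal),
    };
    let (table_id, wal) = allocator.allocate(4);
    assert_eq!(table_id, 1024);
    assert_eq!(wal.len(), 4);
}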

* feat(meta): add max_region_number tracking to PhysicalTableRouteValue

Add `max_region_number` field to track the highest region number ever
allocated for a table. This value only increases when regions are added
and never decreases when regions are dropped, ensuring unique region
numbers across the table's lifetime.

Changes:
- Add `max_region_number` field to `PhysicalTableRouteValue`
- Implement custom `Deserialize` for backward compatibility
- Update `update_region_routes` to maintain max_region_number
- Calculate max_region_number from region_routes in `new()`

Signed-off-by: WenyXu <wenymedia@gmail.com>
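
The backward-compatibility concern above (old `PhysicalTableRouteValue` payloads lack the new field) can be pictured with the simplified sketch below. It assumes `serde` and `serde_json` as dependencies and uses `#[serde(default)]` in place of the custom `Deserialize` impl the commit describes, which is a simplification:

use serde::{Deserialize, Serialize};

/// Simplified stand-in for PhysicalTableRouteValue: payloads written before
/// the field existed still decode, defaulting the new field to zero.
#[derive(Debug, Serialize, Deserialize)]
struct PhysicalTableRouteValue {
    region_numbers: Vec<u32>,
    #[serde(default)]
    max_region_number: u32,
}

impl PhysicalTableRouteValue {
    /// Only ever grows, so region numbers stay unique across drops and adds.
    fn observe(&mut self, region_number: u32) {
        self.max_region_number = self.max_region_number.max(region_number);
    }
}

fn main() {
    // A value written before the field existed still deserializes.
    let old_json = r#"{"region_numbers":[0,1,2]}"#;
    let mut value: PhysicalTableRouteValue = serde_json::from_str(old_json).unwrap();
    assert_eq!(value.max_region_number, 0);

    value.observe(2);
    value.observe(5);
    value.observe(3); // dropping or re-adding a smaller number never shrinks the max
    assert_eq!(value.max_region_number, 5);
}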

* refactor: extract TableRouteAllocator trait from TableMetadataAllocator

- Add TableRouteAllocator trait for abstracting region route allocation
- Implement blanket impl for all PeerAllocator types
- Add PeerAllocator impl for Arc<T> to support trait object delegation
- Update TableMetadataAllocator to use TableRouteAllocatorRef

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: rename TableRouteAllocator to RegionRoutesAllocator

- Rename table_route.rs to region_routes.rs
- Rename TableRouteAllocator trait to RegionRoutesAllocator
- Rename wal_option.rs to wal_options.rs for consistency
- Update TableMetadataAllocator to use new naming

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta-srv): implement region allocation for repartition procedure

This commit implements the region allocation phase of the repartition procedure,
which handles allocating new regions when a table needs to be split into more partitions.

Key changes:
- Refactor `RegionRoutesAllocator::allocate` to accept `(region_number, partition_expr)` tuples
  for more flexible region number assignment
- Simplify `AllocationPlanEntry` by removing `regions_to_allocate` and `regions_to_deallocate`
  fields (now derived from source/target counts)
- Add `convert_allocation_plan_to_repartition_plan` function to handle allocation, equal,
  and deallocation cases
- Fix `RepartitionPlanEntry::allocate_regions()` to return target regions (was incorrectly
  returning source regions)
- Implement complete `AllocateRegion` state with:
  - Region route allocation via `RegionRoutesAllocator`
  - WAL options allocation via `WalOptionsAllocator`
  - Operating region registration for concurrency control
  - Region creation on datanodes via `CreateTableExecutor`
  - Table route metadata update
- Add `TableRouteValue::max_region_number()` helper method
- Add comprehensive unit tests for plan conversion and allocation logic

Signed-off-by: WenyXu <wenymedia@gmail.com>
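
The reshaped `RegionRoutesAllocator::allocate`, driven by `(region_number, partition_expr)` pairs, can be pictured with the toy function below; the types and the round-robin placement are invented for illustration and do not mirror the real trait:

type RegionNumber = u32;

/// Invented stand-ins for illustration only.
#[derive(Debug, Clone)]
struct PartitionExpr(String);

#[derive(Debug)]
struct RegionRoute {
    region_number: RegionNumber,
    partition_expr: PartitionExpr,
    peer: String,
}

/// Allocation driven by explicit (region_number, partition_expr) pairs, so the
/// caller controls which region numbers the new partitions receive.
fn allocate(
    requests: &[(RegionNumber, PartitionExpr)],
    peers: &[String],
) -> Vec<RegionRoute> {
    requests
        .iter()
        .enumerate()
        .map(|(i, (region_number, expr))| RegionRoute {
            region_number: *region_number,
            partition_expr: expr.clone(),
            // Round-robin placement is just a placeholder policy here.
            peer: peers[i % peers.len()].clone(),
        })
        .collect()
}

fn main() {
    // Splitting into more partitions: new regions continue after the table's
    // max_region_number so numbers never collide with dropped regions.
    let requests = vec![
        (2, PartitionExpr("host < 'm'".into())),
        (3, PartitionExpr("host >= 'm'".into())),
    ];
    let peers = vec!["datanode-0".to_string(), "datanode-1".to_string()];
    let routes = allocate(&requests, &peers);
    assert_eq!(routes.len(), 2);
    assert_eq!(routes[0].region_number, 2);
}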

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-08 11:03:58 +00:00
discord9
06f9a4c80c chore: sqlness redact time properly (#7543)
chore

Signed-off-by: discord9 <discord9@163.com>
2026-01-08 08:24:28 +00:00
shuiyisong
8e2c2e6e9a chore: add information extension to the plugins in frontend (#7542)
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2026-01-08 08:02:56 +00:00
fys
28255d8ade chore: add grafana dashboard about trigger (#7536)
* chore: add grafana dashboard about trigger

* fix: ci

* fix: ci

* fix: row title

* fix: ci

* modify desc

* fix: wrong legend

* revert some
2026-01-08 06:47:46 +00:00
Ning Sun
90deaae844 feat: update special remote write label name (#7527)
* feat: update special remote write label name

* chore: mark schema_label as deprecated
2026-01-08 06:13:31 +00:00
Alan Tang
a32326c887 chore: check for redundant pre-commit hooks (#7506)
Signed-off-by: StandingMan <jmtangcs@gmail.com>
2026-01-07 13:46:42 +00:00
Ruihang Xia
fce1687fa7 fix: incorrect timestamp index inference (#7530)
* add sqlness case, but can't reproduce

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reproduction

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix wildcard rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-07 11:18:25 +00:00
Yingwen
ef6dd5b99f fix: precise filter time index if not in projection (#7531)
* fix: precise filter time index if not in projection

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add sqlness test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-01-07 11:15:34 +00:00
discord9
ac6d68aa2d fix: simp expr recursively (#7523)
* fix: simp expr recursively

Signed-off-by: discord9 <discord9@163.com>

* test: some simple constant folding case

Signed-off-by: discord9 <discord9@163.com>

* fix: literal ts cast to UTC

Signed-off-by: discord9 <discord9@163.com>

* fix: patch merge scan batch col tz instead

Signed-off-by: discord9 <discord9@163.com>

* test: fix

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-07 09:22:26 +00:00
Ruihang Xia
d39895a970 feat: tune query traces (#7524)
* feat: add partition and region id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: instrument mito

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* connect region scan span

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* instrument streams

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tweak

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-07 08:11:09 +00:00
jeremyhi
59867cd5b6 fix: remove log_env_flags (#7529)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2026-01-07 08:08:35 +00:00
Ruihang Xia
9a4b7cbb32 feat: bump promql-parser to v0.7.1 (#7521)
* feat: bump promql-parser to v0.7.0

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update other sqlness results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update tests/cases/standalone/common/tql/case_sensitive.result

Co-authored-by: Ning Sun <sunng@protonmail.com>

* remove escape on greptimedb side

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update to v0.7.1

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
2026-01-07 07:23:40 +00:00
Weny Xu
2f242927a8 feat(repartition): implement region deallocation for repartition procedure (#7522)
* feat: implement deallocate regions for repartition procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(metric-engine): add force flag to drop physical regions with associated logical regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: update table metadata after deallocating regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-07 06:13:48 +00:00
Weny Xu
77310ec5bd refactor: refactor CreateTableProcedure to extract reusable components (#7526)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-07 01:58:53 +00:00
Weny Xu
ada4666e10 refactor: remove region_numbers from TableMeta and TableInfo (#7519)
* refactor: remove `region_numbers` from `TableMeta` and `TableInfo`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: create partitions from region route

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix build

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-06 13:21:36 +00:00
jeremyhi
898e84898c feat!: make heartbeat config only in metasrv (#7510)
* feat: make heartbeat config only in metasrv

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* feat: refine config doc

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: make the heartbeat setup simple

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: revert config

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: proto update

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: fix sqlness wrong cfg

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-01-06 09:43:36 +00:00
discord9
6f86a22e6f feat: adjust some args to gc worker (#7469)
* chore: less stuff sent

Signed-off-by: discord9 <discord9@163.com>

* after rebase fix

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* fix: clarify comment on manifest file removal for GC worker

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-06 07:37:05 +00:00
Ruihang Xia
5162c1de4d feat: repartition grammar candy (#7518)
* feat: repartition grammar candy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* align keyword

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-06 04:44:13 +00:00
350 changed files with 13849 additions and 4083 deletions

View File

@@ -15,8 +15,11 @@ repos:
rev: v1.0
hooks:
- id: fmt
args: ["--", "--check"]
stages: [commit-msg]
- id: clippy
args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
stages: [pre-push]
stages: [commit-msg]
- id: cargo-check
args: ["--workspace", "--all-targets", "--all-features"]
stages: [commit-msg]

Cargo.lock (generated, 1615 changed lines)

File diff suppressed because it is too large.

View File

@@ -100,13 +100,13 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "56.2", features = ["prettyprint"] }
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-flight = "56.2"
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.2", features = ["serde"] }
arrow = { version = "57.0", features = ["prettyprint"] }
arrow-array = { version = "57.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.0"
arrow-cast = "57.0"
arrow-flight = "57.0"
arrow-ipc = { version = "57.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -120,38 +120,39 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.7", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10.1", features = ["case-insensitive"] }
chrono-tz = { version = "0.10", features = ["case-insensitive"] }
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
const_format = "0.2"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "50"
datafusion-common = "50"
datafusion-expr = "50"
datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.3"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
datafusion-substrait = "50"
datafusion = "51.0"
datafusion-common = "51.0"
datafusion-datasource = "51.0"
datafusion-expr = "51.0"
datafusion-functions = "51.0"
datafusion-functions-aggregate-common = "51.0"
datafusion-optimizer = "51.0"
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc.git", rev = "35f2e04bf81f2ab7b6f86c0450d6a77b7098d43e" }
datafusion-pg-catalog = "0.13"
datafusion-physical-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-sql = "51.0"
datafusion-substrait = "51.0"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { version = "0.16.1", features = [
etcd-client = { version = "0.17", features = [
"tls",
"tls-roots",
] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a2e5099d72a1cfa8ba41fa4296101eb5f874074a" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1353b0ada9e17890c7ba0e402ba29b2b57816ff1" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -162,7 +163,7 @@ itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "3b7cd33234358b18ece977bf689dc6fb760f29ab" }
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "f69c8924c4babe516373e26a4118be82d976629c" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
moka = "0.12"
@@ -172,7 +173,7 @@ notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.54"
once_cell = "1.18"
opentelemetry-proto = { version = "0.30", features = [
opentelemetry-proto = { version = "0.31", features = [
"gen-tonic",
"metrics",
"trace",
@@ -180,18 +181,18 @@ opentelemetry-proto = { version = "0.30", features = [
"logs",
] }
ordered-float = { version = "4.3", features = ["serde"] }
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "2d64b7c0fa95642028a8205b36fe9ea0b023ec59", features = [
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "452821e455b16e9a397a09d299340e197eb91571", features = [
"server",
] }
parking_lot = "0.12"
parquet = { version = "56.2", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.6", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-types = "0.13"
promql-parser = { version = "0.7.1", features = ["ser"] }
prost = { version = "=0.14.1", features = ["no-recursion-limit"] }
prost-types = "=0.14.1"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -203,6 +204,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
url = "2.3"
# Branch: feat/request-timeout
rskafka = { git = "https://github.com/GreptimeTeam/rskafka.git", rev = "f5688f83e7da591cda3f2674c2408b4c0ed4ed50", features = [
"transport-tls",
@@ -222,7 +224,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlparser = { version = "0.59.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", default-features = false, features = ["any", "macros", "json", "runtime-tokio-rustls"] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
@@ -233,12 +235,12 @@ tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.13", features = ["tls-ring", "gzip", "zstd"] }
tonic = { version = "0.14", features = ["tls-ring", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
tracing-appender = "0.2"
tracing-opentelemetry = "0.31.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] }
@@ -321,19 +323,20 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-pg-catalog = { git = "https://github.com/GreptimeTeam/datafusion-postgres.git", rev = "74ac8e2806be6de91ff192b97f64735392539d16" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "d7d95a44889e099e32d78e9bad9bc00598faef28" } # on branch v0.59.x
[profile.release]
debug = 1

View File

@@ -8863,7 +8863,7 @@
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed of Reconciliation steps ",
"description": "Elapsed of Reconciliation steps",
"fieldConfig": {
"defaults": {
"color": {
@@ -9366,7 +9366,7 @@
"editorMode": "code",
"expr": "greptime_flow_input_buf_size",
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}]",
"legendFormat": "[{{instance}}]-[{{pod}}]",
"range": true,
"refId": "A"
}
@@ -9472,6 +9472,755 @@
],
"title": "Flownode",
"type": "row"
},
{
"collapsed": true,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 187
},
"id": 357,
"panels": [],
"title": "Trigger",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "bf9fzta69bhtsa"
},
"description": "Total number of triggers currently defined.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 188
},
"id": 358,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "greptime_trigger_count{}",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time for trigger evaluation, including query execution and condition evaluation.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 196
},
"id": 359,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Trigger Eval Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failed trigger evaluations.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 196
},
"id": 360,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_evaluate_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Eval Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to send trigger alerts to notification channels.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 204
},
"id": 361,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Send Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when sending trigger alerts.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 204
},
"id": 364,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_send_alert_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Send Alert Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to persist trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 212
},
"id": 363,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Save Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when persisting trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 212
},
"id": 362,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Save Alert Failure Rate",
"type": "timeseries"
}
],
"preload": false,
@@ -9613,4 +10362,4 @@
"title": "GreptimeDB",
"uid": "dejf3k5e7g2kgb",
"version": 15
}
}

View File

@@ -111,12 +111,34 @@
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Flow Ingest / Output Rate | `sum by(instance, pod, direction) (rate(greptime_flow_processed_rows[$__rate_interval]))` | `timeseries` | Flow Ingest / Output Rate. | `prometheus` | -- | `[{{pod}}]-[{{instance}}]-[{{direction}}]` |
| Flow Ingest Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))` | `timeseries` | Flow Ingest Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-p95` |
| Flow Operation Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))` | `timeseries` | Flow Operation Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{type}}]-p95` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}]` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]` |
| Flow Processing Error per Instance | `sum by(instance,pod,code) (rate(greptime_flow_errors[$__rate_interval]))` | `timeseries` | Flow Processing Error per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{code}}]` |
# Trigger
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Trigger Count | `greptime_trigger_count{}` | `timeseries` | Total number of triggers currently defined. | `prometheus` | -- | `__auto` |
| Trigger Eval Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time for trigger evaluation, including query execution and condition evaluation. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Trigger Eval Failure Rate | `rate(greptime_trigger_evaluate_failure_count[$__rate_interval])` | `timeseries` | Rate of failed trigger evaluations. | `prometheus` | `none` | `__auto` |
| Send Alert Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to send trigger alerts to notification channels. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99` |
| Send Alert Failure Rate | `rate(greptime_trigger_send_alert_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when sending trigger alerts. | `prometheus` | `none` | `__auto` |
| Save Alert Elapsed | `histogram_quantile(0.99,
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)`<br/>`histogram_quantile(0.75,
rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])
)` | `timeseries` | Elapsed time to persist trigger alert records. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99` |
| Save Alert Failure Rate | `rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when persisting trigger alert records. | `prometheus` | `none` | `__auto` |

View File

@@ -1002,7 +1002,7 @@ groups:
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
description: Elapsed of Reconciliation steps
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
@@ -1057,7 +1057,7 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}]'
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Flow Processing Error per Instance
type: timeseries
description: Flow Processing Error per Instance.
@@ -1067,3 +1067,89 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{code}}]'
- title: Trigger
panels:
- title: Trigger Count
type: timeseries
description: Total number of triggers currently defined.
queries:
- expr: greptime_trigger_count{}
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Trigger Eval Elapsed
type: timeseries
description: Elapsed time for trigger evaluation, including query execution and condition evaluation.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p75'
- title: Trigger Eval Failure Rate
type: timeseries
description: Rate of failed trigger evaluations.
unit: none
queries:
- expr: rate(greptime_trigger_evaluate_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Send Alert Elapsed
type: timeseries
description: Elapsed time to send trigger alerts to notification channels.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75'
- title: Send Alert Failure Rate
type: timeseries
description: Rate of failures when sending trigger alerts.
unit: none
queries:
- expr: rate(greptime_trigger_send_alert_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Save Alert Elapsed
type: timeseries
description: Elapsed time to persist trigger alert records.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75'
- title: Save Alert Failure Rate
type: timeseries
description: Rate of failures when persisting trigger alert records.
unit: none
queries:
- expr: rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto

View File

@@ -8863,7 +8863,7 @@
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed of Reconciliation steps ",
"description": "Elapsed of Reconciliation steps",
"fieldConfig": {
"defaults": {
"color": {
@@ -9366,7 +9366,7 @@
"editorMode": "code",
"expr": "greptime_flow_input_buf_size",
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}]",
"legendFormat": "[{{instance}}]-[{{pod}}]",
"range": true,
"refId": "A"
}
@@ -9472,6 +9472,755 @@
],
"title": "Flownode",
"type": "row"
},
{
"collapsed": true,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 187
},
"id": 357,
"panels": [],
"title": "Trigger",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "bf9fzta69bhtsa"
},
"description": "Total number of triggers currently defined.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 188
},
"id": 358,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "greptime_trigger_count{}",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Count",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time for trigger evaluation, including query execution and condition evaluation.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 196
},
"id": 359,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Trigger Eval Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failed trigger evaluations.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 196
},
"id": 360,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_evaluate_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Trigger Eval Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to send trigger alerts to notification channels.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 204
},
"id": 361,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Send Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when sending trigger alerts.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 204
},
"id": 364,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_send_alert_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Send Alert Failure Rate",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Elapsed time to persist trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 212
},
"id": 363,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99",
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)",
"hide": false,
"instant": false,
"legendFormat": "[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75",
"range": true,
"refId": "B"
}
],
"title": "Save Alert Elapsed",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"description": "Rate of failures when persisting trigger alert records.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 212
},
"id": 362,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"hideZeros": false,
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.6.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${metrics}"
},
"editorMode": "code",
"expr": "rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])",
"hide": false,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Save Alert Failure Rate",
"type": "timeseries"
}
],
"preload": false,
@@ -9613,4 +10362,4 @@
"title": "GreptimeDB",
"uid": "dejf3k5e7g2kgb",
"version": 15
}
}

View File

@@ -111,12 +111,34 @@
| Rate of meta KV Ops | `rate(greptime_meta_kv_request_elapsed_count[$__rate_interval])` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `none` | `{{pod}}-{{op}} p99` |
| DDL Latency | `histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_tables_bucket))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_view))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_create_flow))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_drop_table))`<br/>`histogram_quantile(0.9, sum by(le, pod, step) (greptime_meta_procedure_alter_table))` | `timeseries` | Gauge of load information of each datanode, collected via heartbeat between datanode and metasrv. This information is for metasrv to schedule workloads. | `prometheus` | `s` | `CreateLogicalTables-{{step}} p90` |
| Reconciliation stats | `greptime_meta_reconciliation_stats` | `timeseries` | Reconciliation stats | `prometheus` | `s` | `{{pod}}-{{table_type}}-{{type}}` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
| Reconciliation steps | `histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)` | `timeseries` | Elapsed of Reconciliation steps | `prometheus` | `s` | `{{procedure_name}}-{{step}}-P90` |
# Flownode
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Flow Ingest / Output Rate | `sum by(instance, pod, direction) (rate(greptime_flow_processed_rows[$__rate_interval]))` | `timeseries` | Flow Ingest / Output Rate. | `prometheus` | -- | `[{{pod}}]-[{{instance}}]-[{{direction}}]` |
| Flow Ingest Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_insert_elapsed_bucket[$__rate_interval])) by (le, instance, pod))` | `timeseries` | Flow Ingest Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-p95` |
| Flow Operation Latency | `histogram_quantile(0.95, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))`<br/>`histogram_quantile(0.99, sum(rate(greptime_flow_processing_time_bucket[$__rate_interval])) by (le,instance,pod,type))` | `timeseries` | Flow Operation Latency. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{type}}]-p95` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}]` |
| Flow Buffer Size per Instance | `greptime_flow_input_buf_size` | `timeseries` | Flow Buffer Size per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]` |
| Flow Processing Error per Instance | `sum by(instance,pod,code) (rate(greptime_flow_errors[$__rate_interval]))` | `timeseries` | Flow Processing Error per Instance. | `prometheus` | -- | `[{{instance}}]-[{{pod}}]-[{{code}}]` |
# Trigger
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
| Trigger Count | `greptime_trigger_count{}` | `timeseries` | Total number of triggers currently defined. | `prometheus` | -- | `__auto` |
| Trigger Eval Elapsed | `histogram_quantile(0.99, rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval]))`<br/>`histogram_quantile(0.75, rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval]))` | `timeseries` | Elapsed time for trigger evaluation, including query execution and condition evaluation. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Trigger Eval Failure Rate | `rate(greptime_trigger_evaluate_failure_count[$__rate_interval])` | `timeseries` | Rate of failed trigger evaluations. | `prometheus` | `none` | `__auto` |
| Send Alert Elapsed | `histogram_quantile(0.99, rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval]))`<br/>`histogram_quantile(0.75, rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval]))` | `timeseries` | Elapsed time to send trigger alerts to notification channels. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99` |
| Send Alert Failure Rate | `rate(greptime_trigger_send_alert_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when sending trigger alerts. | `prometheus` | `none` | `__auto` |
| Save Alert Elapsed | `histogram_quantile(0.99, rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval]))`<br/>`histogram_quantile(0.75, rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval]))` | `timeseries` | Elapsed time to persist trigger alert records. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99` |
| Save Alert Failure Rate | `rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])` | `timeseries` | Rate of failures when persisting trigger alert records. | `prometheus` | `none` | `__auto` |

View File

@@ -1002,7 +1002,7 @@ groups:
legendFormat: '{{pod}}-{{table_type}}-{{type}}'
- title: Reconciliation steps
type: timeseries
description: 'Elapsed of Reconciliation steps '
description: Elapsed of Reconciliation steps
unit: s
queries:
- expr: histogram_quantile(0.9, greptime_meta_reconciliation_procedure_bucket)
@@ -1057,7 +1057,7 @@ groups:
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}]'
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Flow Processing Error per Instance
type: timeseries
description: Flow Processing Error per Instance.
@@ -1067,3 +1067,89 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{code}}]'
- title: Trigger
panels:
- title: Trigger Count
type: timeseries
description: Total number of triggers currently defined.
queries:
- expr: greptime_trigger_count{}
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Trigger Eval Elapsed
type: timeseries
description: Elapsed time for trigger evaluation, including query execution and condition evaluation.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_evaluate_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-p75'
- title: Trigger Eval Failure Rate
type: timeseries
description: Rate of failed trigger evaluations.
unit: none
queries:
- expr: rate(greptime_trigger_evaluate_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Send Alert Elapsed
type: timeseries
description: Elapsed time to send trigger alerts to notification channels.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_send_alert_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{channel_type}}]-p75'
- title: Send Alert Failure Rate
type: timeseries
description: Rate of failures when sending trigger alerts.
unit: none
queries:
- expr: rate(greptime_trigger_send_alert_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto
- title: Save Alert Elapsed
type: timeseries
description: Elapsed time to persist trigger alert records.
unit: s
queries:
- expr: "histogram_quantile(0.99, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p99'
- expr: "histogram_quantile(0.75, \n rate(greptime_trigger_save_alert_record_elapsed_bucket[$__rate_interval])\n)"
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{storage_type}}]-p75'
- title: Save Alert Failure Rate
type: timeseries
description: Rate of failures when persisting trigger alert records.
unit: none
queries:
- expr: rate(greptime_trigger_save_alert_record_failure_count[$__rate_interval])
datasource:
type: prometheus
uid: ${metrics}
legendFormat: __auto

View File

@@ -32,6 +32,7 @@ use crate::error::Result;
pub mod error;
pub mod information_extension;
pub mod kvbackend;
#[cfg(any(test, feature = "testing"))]
pub mod memory;
mod metrics;
pub mod system_schema;

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) const METRIC_DB_LABEL: &str = "db";
use lazy_static::lazy_static;
use prometheus::*;
@@ -25,7 +23,7 @@ lazy_static! {
pub static ref METRIC_CATALOG_MANAGER_TABLE_COUNT: IntGaugeVec = register_int_gauge_vec!(
"greptime_catalog_table_count",
"catalog table count",
&[METRIC_DB_LABEL]
&["db"]
)
.unwrap();
pub static ref METRIC_CATALOG_KV_REMOTE_GET: Histogram =

View File

@@ -24,6 +24,7 @@ use std::sync::Arc;
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
@@ -163,6 +164,7 @@ impl DataSource for SystemTableDataSource {
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Ok(Box::pin(stream))

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::pin::pin;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -31,15 +32,17 @@ use datatypes::value::Value;
use datatypes::vectors::{
StringVectorBuilder, TimestampSecondVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use futures::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates, TABLES};
use crate::system_schema::utils;
@@ -247,6 +250,10 @@ impl InformationSchemaTablesBuilder {
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let partition_manager = catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager());
let predicates = Predicates::from_scan_request(&request);
let information_extension = utils::information_extension(&self.catalog_manager)?;
@@ -267,37 +274,59 @@ impl InformationSchemaTablesBuilder {
};
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
let table_stream = catalog_manager.tables(&catalog_name, &schema_name, None);
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
const BATCH_SIZE: usize = 128;
// Split tables into chunks
let mut table_chunks = pin!(table_stream.ready_chunks(BATCH_SIZE));
// TODO(dennis): make it working for metric engine
let table_region_stats =
if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.flat_map(|region_id| {
region_stats
.binary_search_by_key(&region_id, |x| x.id)
.map(|i| &region_stats[i])
})
.collect::<Vec<_>>()
} else {
vec![]
};
while let Some(tables) = table_chunks.next().await {
let tables = tables.into_iter().collect::<Result<Vec<_>>>()?;
let mito_or_physical_table_ids = tables
.iter()
.filter(|table| {
table.table_info().meta.engine == MITO_ENGINE
|| table.table_info().is_physical_table()
})
.map(|table| table.table_info().ident.table_id)
.collect::<Vec<_>>();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
table_info,
table.table_type(),
&table_region_stats,
);
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.batch_find_region_routes(&mito_or_physical_table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {
mito_or_physical_table_ids
.into_iter()
.map(|id| (id, vec![]))
.collect()
};
for table in tables {
let table_region_stats =
match table_routes.get(&table.table_info().ident.table_id) {
Some(routes) => routes
.iter()
.flat_map(|route| {
let region_id = route.region.id;
region_stats
.binary_search_by_key(&region_id, |x| x.id)
.map(|i| &region_stats[i])
})
.collect::<Vec<_>>(),
None => vec![],
};
self.add_table(
&predicates,
&catalog_name,
&schema_name,
table.table_info(),
table.table_type(),
&table_region_stats,
);
}
}
}

View File

@@ -337,7 +337,7 @@ mod tests {
.build();
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024);
view_info.table_type = TableType::View;
let logical_plan = vec![1, 2, 3];
// Create view metadata

View File

@@ -60,6 +60,7 @@ serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
standalone.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true

View File

@@ -162,7 +162,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
next_column_id: columns as u32 + 1,
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),
partition_key_indices: vec![],
column_ids: vec![],
};

View File

@@ -267,8 +267,6 @@ impl PrefixedS3Connection {
name: "S3",
required: [
(&self.s3_bucket, "bucket"),
(&self.s3_access_key_id, "access key ID"),
(&self.s3_secret_access_key, "secret access key"),
(&self.s3_region, "region"),
]
)

View File

@@ -14,16 +14,38 @@
use std::sync::Arc;
use clap::Parser;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use meta_srv::metasrv::{BackendClientOptions, BackendImpl};
use meta_srv::metasrv::BackendClientOptions;
use meta_srv::utils::etcd::create_etcd_client_with_tls;
use serde::{Deserialize, Serialize};
use servers::tls::{TlsMode, TlsOption};
use snafu::OptionExt;
use crate::error::EmptyStoreAddrsSnafu;
use crate::error::{EmptyStoreAddrsSnafu, InvalidArgumentsSnafu};
// The datastores that implement the metadata kvbackend.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
#[allow(clippy::enum_variant_names)]
pub enum BackendImpl {
// Etcd as metadata storage.
#[default]
EtcdStore,
// In memory metadata storage - mostly used for testing.
MemoryStore,
#[cfg(feature = "pg_kvbackend")]
// Postgres as metadata storage.
PostgresStore,
#[cfg(feature = "mysql_kvbackend")]
// MySQL as metadata storage.
MysqlStore,
// RaftEngine as metadata storage.
RaftEngineStore,
}
#[derive(Debug, Default, Parser)]
pub struct StoreConfig {
@@ -179,6 +201,18 @@ impl StoreConfig {
Ok(Arc::new(MemoryKvBackend::default()) as _)
}
BackendImpl::RaftEngineStore => {
let url = store_addrs
.first()
.context(InvalidArgumentsSnafu {
msg: "empty store addresses".to_string(),
})
.map_err(BoxedError::new)?;
let kvbackend =
standalone::build_metadata_kv_from_url(url).map_err(BoxedError::new)?;
Ok(kvbackend)
}
};
if self.store_key_prefix.is_empty() {
kvbackend
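
For context on the new `--backend raft-engine-store` option: clap's `ValueEnum` derive exposes enum variants as kebab-case CLI values, which is how `raft-engine-store` selects `BackendImpl::RaftEngineStore`. A minimal, self-contained sketch of that mapping, assuming clap 4 with the `derive` feature (the enum and the path below are trimmed stand-ins for illustration, not the real `BackendImpl`):

```rust
use clap::{Parser, ValueEnum};

// Trimmed stand-in for the backend enum; the ValueEnum derive exposes the
// variants as kebab-case values: "etcd-store", "memory-store", "raft-engine-store".
#[derive(Clone, ValueEnum)]
enum Backend {
    EtcdStore,
    MemoryStore,
    RaftEngineStore,
}

#[derive(Parser)]
struct Cli {
    /// Metadata store backend.
    #[arg(long, value_enum)]
    backend: Backend,
    /// Store addresses, e.g. a `raftengine:///<data-home>/metadata` URL
    /// for the raft-engine backend (hypothetical path, for illustration only).
    #[arg(long)]
    store_addrs: Vec<String>,
}

fn main() {
    let cli = Cli::parse_from([
        "tool",
        "--backend",
        "raft-engine-store",
        "--store-addrs",
        "raftengine:///tmp/metadata",
    ]);
    assert!(matches!(cli.backend, Backend::RaftEngineStore));
    assert_eq!(cli.store_addrs.len(), 1);
}
```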

View File

@@ -900,67 +900,6 @@ mod tests {
// ==================== Gap 2: Empty string vs missing tests ====================
#[tokio::test]
async fn test_export_command_build_with_s3_empty_access_key() {
// Test S3 with empty access key ID (empty string, not missing)
let cmd = ExportCommand::parse_from([
"export",
"--addr",
"127.0.0.1:4000",
"--s3",
"--s3-bucket",
"test-bucket",
"--s3-root",
"test-root",
"--s3-access-key-id",
"", // Empty string
"--s3-secret-access-key",
"test-secret",
"--s3-region",
"us-west-2",
]);
let result = cmd.build().await;
assert!(result.is_err());
if let Err(err) = result {
assert!(
err.to_string().contains("S3 access key ID must be set"),
"Actual error: {}",
err
);
}
}
#[tokio::test]
async fn test_export_command_build_with_s3_missing_secret_key() {
// Test S3 with empty secret access key
let cmd = ExportCommand::parse_from([
"export",
"--addr",
"127.0.0.1:4000",
"--s3",
"--s3-bucket",
"test-bucket",
"--s3-root",
"test-root",
"--s3-access-key-id",
"test-key",
// Missing --s3-secret-access-key
"--s3-region",
"us-west-2",
]);
let result = cmd.build().await;
assert!(result.is_err());
if let Err(err) = result {
assert!(
err.to_string().contains("S3 secret access key must be set"),
"Actual error: {}",
err
);
}
}
#[tokio::test]
async fn test_export_command_build_with_s3_empty_root() {
// Empty root should be allowed (it's optional path component)

View File

@@ -68,8 +68,8 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(display("Failed to start wal provider"))]
StartWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -343,7 +343,7 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::StartWalProvider { source, .. } => source.status_code(),
Error::HttpQuerySql { .. } => StatusCode::Internal,
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()

View File

@@ -288,9 +288,16 @@ fn build_object_store_and_resolve_file_path(
#[cfg(test)]
mod tests {
use std::env;
use std::sync::Arc;
use std::time::Duration;
use clap::Parser;
use common_meta::kv_backend::KvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::rpc::store::PutRequest;
use object_store::ObjectStore;
use super::*;
use crate::metadata::snapshot::RestoreCommand;
#[tokio::test]
@@ -334,4 +341,97 @@ mod tests {
let tool = cmd.build().await.unwrap();
assert_eq!(tool.file_path, file_path.to_string_lossy().to_string());
}
async fn setup_backup_file(object_store: ObjectStore, file_path: &str) {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
// Put some data into the kv backend
kv_backend
.put(
PutRequest::new()
.with_key(b"test".to_vec())
.with_value(b"test".to_vec()),
)
.await
.unwrap();
manager.dump(file_path).await.unwrap();
}
#[tokio::test]
async fn test_restore_raft_engine_store() {
common_telemetry::init_default_ut_logging();
let temp_dir = tempfile::tempdir().unwrap();
let root = temp_dir.path().display().to_string();
let object_store = new_fs_object_store(&root).unwrap();
setup_backup_file(object_store, "/backup/metadata_snapshot.metadata.fb").await;
{
let cmd = RestoreCommand::parse_from([
"",
"--file_name",
format!("{}/backup/metadata_snapshot.metadata.fb", root).as_str(),
"--backend",
"raft-engine-store",
"--store-addrs",
format!("raftengine:///{}/metadata", root).as_str(),
]);
let tool = cmd.build().await.unwrap();
tool.do_work().await.unwrap();
}
// Waits for the raft engine to release the file lock.
tokio::time::sleep(Duration::from_secs(1)).await;
let kv =
standalone::build_metadata_kvbackend(format!("{}/metadata", root), Default::default())
.unwrap();
let value = kv.get(b"test").await.unwrap().unwrap().value;
assert_eq!(value, b"test");
}
#[tokio::test]
async fn test_save_raft_engine_store() {
common_telemetry::init_default_ut_logging();
let temp_dir = tempfile::tempdir().unwrap();
let root = temp_dir.path().display().to_string();
{
let kv = standalone::build_metadata_kvbackend(
format!("{}/metadata", root),
Default::default(),
)
.unwrap();
kv.put(
PutRequest::new()
.with_key(b"test".to_vec())
.with_value(b"test".to_vec()),
)
.await
.unwrap();
}
// Waits for the raft engine to release the file lock.
tokio::time::sleep(Duration::from_secs(1)).await;
{
let cmd = SaveCommand::parse_from([
"",
"--file_name",
format!("{}/backup/metadata_snapshot.metadata.fb", root).as_str(),
"--backend",
"raft-engine-store",
"--store-addrs",
format!("raftengine:///{}/metadata", root).as_str(),
]);
let tool = cmd.build().await.unwrap();
tool.do_work().await.unwrap();
}
// Reads the snapshot file from the object store.
let object_store = new_fs_object_store(&root).unwrap();
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
manager
.restore("/backup/metadata_snapshot.metadata.fb")
.await
.unwrap();
let value = kv_backend.get(b"test").await.unwrap().unwrap().value;
assert_eq!(value, b"test");
}
}

View File

@@ -37,6 +37,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
@@ -456,6 +457,7 @@ impl Database {
stream,
output_ordering: None,
metrics: Default::default(),
span: Span::current(),
};
Ok(Output::new_with_stream(Box::pin(record_batch_stream)))
}

View File

@@ -30,6 +30,7 @@ use common_query::request::QueryRequest;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
@@ -242,6 +243,7 @@ impl RegionRequester {
stream,
output_ordering: None,
metrics,
span: Span::current(),
};
Ok(Box::pin(record_batch_stream))
}

View File

@@ -330,7 +330,6 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use super::*;
use crate::options::GlobalOptions;
@@ -374,9 +373,6 @@ mod tests {
hostname = "127.0.0.1"
runtime_size = 8
[heartbeat]
interval = "300ms"
[meta_client]
metasrv_addrs = ["127.0.0.1:3002"]
timeout = "3s"
@@ -434,13 +430,6 @@ mod tests {
);
assert!(!raft_engine_config.sync_write);
let HeartbeatOptions {
interval: heart_beat_interval,
..
} = options.heartbeat;
assert_eq!(300, heart_beat_interval.as_millis());
let MetaClientOptions {
metasrv_addrs: metasrv_addr,
timeout,

View File

@@ -35,6 +35,7 @@ use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use parquet::file::metadata::{FooterTail, KeyValue};
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
@@ -463,7 +464,6 @@ fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> error::Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
@@ -608,7 +608,7 @@ async fn load_parquet_metadata(
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let footer = FooterTail::try_new(&footer)?;
let metadata_len = footer.metadata_length() as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());

View File

@@ -64,8 +64,8 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(display("Failed to start wal provider"))]
StartWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -289,8 +289,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build wal options allocator"))]
BuildWalOptionsAllocator {
#[snafu(display("Failed to build wal provider"))]
BuildWalProvider {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -350,8 +350,9 @@ impl ErrorExt for Error {
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::BuildWalOptionsAllocator { source, .. }
| Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::BuildWalProvider { source, .. } | Error::StartWalProvider { source, .. } => {
source.status_code()
}
Error::HttpQuerySql { .. } => StatusCode::Internal,
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()

View File

@@ -358,7 +358,6 @@ impl StartCommand {
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Arc::new(resource_stat),
);

View File

@@ -20,6 +20,7 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
@@ -412,6 +413,7 @@ impl StartCommand {
meta_client.clone(),
client.clone(),
));
plugins.insert::<InformationExtensionRef>(information_extension.clone());
let process_manager = Arc::new(ProcessManager::new(
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),

View File

@@ -108,7 +108,7 @@ pub trait App: Send {
}
}
/// Log the versions of the application, and the arguments passed to the cli.
/// Log the versions of the application.
///
/// `version` should be the same as the output of cli "--version";
/// and the `short_version` is the short version of the codes, often consist of git branch and commit.
@@ -118,10 +118,7 @@ pub fn log_versions(version: &str, short_version: &str, app: &str) {
.with_label_values(&[common_version::version(), short_version, app])
.inc();
// Log version and argument flags.
info!("GreptimeDB version: {}", version);
log_env_flags();
}
pub fn create_resource_limit_metrics(app: &str) {
@@ -144,13 +141,6 @@ pub fn create_resource_limit_metrics(app: &str) {
}
}
fn log_env_flags() {
info!("command line arguments");
for argument in std::env::args() {
info!("argument: {}", argument);
}
}
pub fn maybe_activate_heap_profile(memory_options: &common_options::memory::MemoryOptions) {
if memory_options.enable_heap_profiling {
match activate_heap_profile() {

View File

@@ -40,7 +40,7 @@ use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
use common_telemetry::info;
@@ -64,8 +64,8 @@ use plugins::frontend::context::{
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
@@ -120,7 +120,7 @@ pub struct Instance {
frontend: Frontend,
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
wal_provider: WalProviderRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -146,10 +146,10 @@ impl App for Instance {
.await
.context(error::StartProcedureManagerSnafu)?;
self.wal_options_allocator
self.wal_provider
.start()
.await
.context(error::StartWalOptionsAllocatorSnafu)?;
.context(error::StartWalProviderSnafu)?;
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
.await
@@ -468,7 +468,7 @@ impl StartCommand {
flow_server: flownode.flow_engine(),
});
let table_id_sequence = Arc::new(
let table_id_allocator = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
@@ -485,13 +485,13 @@ impl StartCommand {
.clone()
.try_into()
.context(error::InvalidWalProviderSnafu)?;
let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
.context(error::BuildWalProviderSnafu)?;
let wal_provider = Arc::new(wal_provider);
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_id_allocator,
wal_provider.clone(),
));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
@@ -509,8 +509,13 @@ impl StartCommand {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
};
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = DdlManager::try_new(
ddl_context,
procedure_manager.clone(),
Arc::new(StandaloneRepartitionProcedureFactory),
true,
)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = if let Some(configurator) =
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
@@ -585,7 +590,7 @@ impl StartCommand {
frontend,
flownode,
procedure_manager,
wal_options_allocator,
wal_provider,
_guard: guard,
})
}

View File

@@ -228,7 +228,6 @@ fn test_load_flownode_example_config() {
..Default::default()
},
tracing: Default::default(),
heartbeat: Default::default(),
// flownode deliberately use a slower query parallelism
// to avoid overwhelming the frontend with too many queries
query: QueryOptions {

View File

@@ -27,13 +27,14 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-datasource.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
orc-rust = { version = "0.6.3", default-features = false, features = ["async"] }
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
regex.workspace = true
@@ -42,7 +43,7 @@ snafu.workspace = true
strum.workspace = true
tokio.workspace = true
tokio-util.workspace = true
url = "2.3"
url.workspace = true
[dev-dependencies]
common-test-util.workspace = true

View File

@@ -14,7 +14,7 @@
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::parquet::format::FileMetaData;
use parquet::file::metadata::ParquetMetaData;
use crate::error::Result;
@@ -24,5 +24,5 @@ pub trait DfRecordBatchEncoder {
#[async_trait]
pub trait ArrowWriterCloser {
async fn close(mut self) -> Result<FileMetaData>;
async fn close(mut self) -> Result<ParquetMetaData>;
}

View File

@@ -40,7 +40,6 @@ use datafusion::datasource::physical_plan::{
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
@@ -303,24 +302,20 @@ where
pub async fn file_to_stream(
store: &ObjectStore,
filename: &str,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
projection: Option<Vec<usize>>,
compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
let df_compression: DfCompressionType = compression_type.into();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection(projection)
.with_file_compression_type(df_compression)
.build();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection_indices(projection)
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source

View File

@@ -440,14 +440,11 @@ mod tests {
.await
.unwrap(),
);
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(8192);
let csv_source = CsvSource::new(schema).with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
csv_source.clone(),
None,
compression_type,

View File

@@ -347,14 +347,11 @@ mod tests {
.await
.unwrap(),
);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(8192);
let json_source = JsonSource::new(schema).with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
json_source.clone(),
None,
compression_type,

View File

@@ -18,15 +18,15 @@ use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
@@ -100,11 +100,11 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
file_meta: FileMeta,
partitioned_file: PartitionedFile,
_metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
let path = file_meta.location().to_string();
let path = partitioned_file.path().to_string();
let object_store = self.object_store.clone();
Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
@@ -180,7 +180,7 @@ impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
async fn close(self) -> Result<FileMetaData> {
async fn close(self) -> Result<ParquetMetaData> {
self.close().context(error::EncodeRecordBatchSnafu)
}
}

View File

@@ -67,14 +67,14 @@ impl Test<'_> {
async fn test_json_opener() {
let store = test_store("/");
let schema = test_basic_schema();
let file_source = Arc::new(JsonSource::new()).with_batch_size(test_util::TEST_BATCH_SIZE);
let file_source = Arc::new(JsonSource::new(schema)).with_batch_size(test_util::TEST_BATCH_SIZE);
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
@@ -87,7 +87,7 @@ async fn test_json_opener() {
],
},
Test {
config: scan_config(schema, Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
@@ -112,13 +112,11 @@ async fn test_csv_opener() {
.display()
.to_string();
let file_source = CsvSource::new(true, b',', b'"')
.with_batch_size(test_util::TEST_BATCH_SIZE)
.with_schema(schema.clone());
let file_source = CsvSource::new(schema).with_batch_size(test_util::TEST_BATCH_SIZE);
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+---------------------+----------+------------+",
@@ -131,7 +129,7 @@ async fn test_csv_opener() {
],
},
Test {
config: scan_config(schema, Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+---------------------+----------+------------+",
@@ -158,10 +156,10 @@ async fn test_parquet_exec() {
.display()
.to_string();
let parquet_source = ParquetSource::default()
let parquet_source = ParquetSource::new(schema)
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let config = scan_config(schema, None, path, Arc::new(parquet_source));
let config = scan_config(None, path, Arc::new(parquet_source));
let exec = DataSourceExec::from_data_source(config);
let ctx = SessionContext::new();
@@ -197,11 +195,11 @@ async fn test_orc_opener() {
let store = test_store("/");
let schema = Arc::new(OrcFormat.infer_schema(&store, path).await.unwrap());
let file_source = Arc::new(OrcSource::default());
let file_source = Arc::new(OrcSource::new(schema.into()));
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
@@ -216,7 +214,7 @@ async fn test_orc_opener() {
],
},
Test {
config: scan_config(schema.clone(), Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",

View File

@@ -80,7 +80,6 @@ pub fn csv_basic_schema() -> SchemaRef {
}
pub(crate) fn scan_config(
file_schema: SchemaRef,
limit: Option<usize>,
filename: &str,
file_source: Arc<dyn FileSource>,
@@ -89,7 +88,7 @@ pub(crate) fn scan_config(
let filename = &filename.replace('\\', "/");
let file_group = FileGroup::new(vec![PartitionedFile::new(filename.clone(), 4096)]);
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
.with_file_group(file_group)
.with_limit(limit)
.build()
@@ -109,7 +108,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let size = store.read(origin_path).await.unwrap().len();
let config = scan_config(schema, None, origin_path, Arc::new(JsonSource::new()));
let config = scan_config(None, origin_path, Arc::new(JsonSource::new(schema)));
let stream = FileStream::new(
&config,
0,
@@ -151,10 +150,8 @@ pub async fn setup_stream_to_csv_test(
let schema = csv_basic_schema();
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(schema, None, origin_path, csv_source.clone());
let csv_source = CsvSource::new(schema).with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let csv_opener = csv_source.create_file_opener(

View File

@@ -104,7 +104,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &ConcreteDataType::numerics().into_iter().map(|dt| { use datatypes::data_type::DataType; dt.as_arrow_type() }).collect::<Vec<_>>()));
}

View File

@@ -331,7 +331,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &vec![ArrowDataType::Utf8]));
}

View File

@@ -145,7 +145,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if sigs.len() == 2));
}

View File

@@ -341,6 +341,7 @@ impl AggregateUDFImpl for StateWrapper {
name: acc_args.name,
is_distinct: acc_args.is_distinct,
exprs: acc_args.exprs,
expr_fields: acc_args.expr_fields,
};
self.inner.accumulator(acc_args)?
};

View File

@@ -650,7 +650,7 @@ async fn test_last_value_order_by_udaf() {
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
true
), // ordering field is added to state fields too
Field::new("is_set", DataType::Boolean, true)
Field::new("last_value[last_value_is_set]", DataType::Boolean, true)
]
.into()
),
@@ -735,7 +735,7 @@ async fn test_last_value_order_by_udaf() {
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
true,
),
Field::new("is_set", DataType::Boolean, true),
Field::new("last_value[last_value_is_set]", DataType::Boolean, true),
]
.into(),
vec![

View File

@@ -453,8 +453,8 @@ impl Accumulator for CountHashAccumulator {
);
};
let hash_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..hash_array.len() {
self.values.insert(hash_array.value(i));
for &hash in hash_array.values().iter().take(hash_array.len()) {
self.values.insert(hash);
}
}
Ok(())

View File

@@ -152,9 +152,9 @@ impl DfAccumulator for JsonEncodePathAccumulator {
let lng_array = lng_array.as_primitive::<Float64Type>();
let mut coords = Vec::with_capacity(len);
for i in 0..len {
let lng = lng_array.value(i);
let lat = lat_array.value(i);
let lng_values = lng_array.values();
let lat_values = lat_array.values();
for (&lng, &lat) in lng_values.iter().zip(lat_values.iter()).take(len) {
coords.push(vec![lng, lat]);
}

View File

@@ -122,7 +122,8 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 15),
"{:?}",
f.signature()

View File

@@ -193,7 +193,8 @@ mod tests {
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 6));
}

View File

@@ -120,7 +120,8 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 15),
"{:?}",
f.signature()

View File

@@ -25,7 +25,6 @@ use datafusion_common::arrow::array::{
};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
use datatypes::json::JsonStructureSettings;
@@ -519,7 +518,7 @@ impl Default for JsonGetObject {
DataType::LargeBinary,
DataType::BinaryView,
],
STRINGS.to_vec(),
vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
),
}
}

View File

@@ -99,7 +99,8 @@ mod tests {
assert!(matches!(rate.signature(),
Signature {
type_signature: TypeSignature::Uniform(2, valid_types),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if valid_types == NUMERICS
));
let values = vec![1.0, 3.0, 6.0];

View File

@@ -208,9 +208,9 @@ fn decode_dictionary(
let mut rows = Vec::with_capacity(number_rows);
let keys = dict.keys();
for i in 0..number_rows {
let dict_index = keys.value(i) as usize;
rows.push(decoded_values[dict_index].clone());
let dict_indices = keys.values();
for &dict_index in dict_indices[..number_rows].iter() {
rows.push(decoded_values[dict_index as usize].clone());
}
Ok(rows)
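
The dictionary-decoding hunk above, like the count-hash accumulator and path-encoder changes earlier, swaps per-index `value(i)` calls for iteration over the array's contiguous `values()` buffer, avoiding a bounds check on every access. A minimal standalone sketch of the pattern against the `arrow` crate (illustrative only, not the repository's code; null slots are ignored here):

```rust
use std::collections::HashSet;

use arrow::array::UInt64Array;

// Collect distinct hashes by walking the array's contiguous value buffer.
// `values()` yields the underlying `&[u64]`, so the loop is a plain slice
// iteration instead of repeated `value(i)` calls that bounds-check each index.
fn collect_hashes(hash_array: &UInt64Array) -> HashSet<u64> {
    let mut set = HashSet::with_capacity(hash_array.len());
    for &hash in hash_array.values().iter() {
        set.insert(hash);
    }
    set
}

fn main() {
    let arr = UInt64Array::from(vec![1, 2, 2, 3]);
    assert_eq!(collect_hashes(&arr).len(), 3);
}
```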

View File

@@ -19,8 +19,10 @@ use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::type_coercion::aggregates::BINARYS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::logical_binary;
use datafusion_expr::{
Coercion, ColumnarValue, ScalarFunctionArgs, Signature, TypeSignatureClass, Volatility,
};
use datatypes::types::vector_type_value_to_string;
use crate::function::{Function, extract_args};
@@ -35,11 +37,10 @@ pub struct VectorToStringFunction {
impl Default for VectorToStringFunction {
fn default() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
signature: Signature::coercible(
vec![Coercion::new_exact(TypeSignatureClass::Native(
logical_binary(),
))],
Volatility::Immutable,
),
}
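
The new signature above replaces the `TypeSignature::Uniform` lists built from `BINARYS` with a single coercible argument. The sketch below mirrors the calls shown in the hunk and assumes the datafusion-expr version pinned by this branch, where `Signature::coercible`, `Coercion::new_exact`, `TypeSignatureClass::Native`, and `logical_binary` are available:

use datafusion_common::types::logical_binary;
use datafusion_expr::{Coercion, Signature, TypeSignatureClass, Volatility};

// One argument that coerces any binary-flavoured input (Binary, LargeBinary,
// BinaryView) to the native logical binary type.
fn binary_arg_signature() -> Signature {
    Signature::coercible(
        vec![Coercion::new_exact(TypeSignatureClass::Native(
            logical_binary(),
        ))],
        Volatility::Immutable,
    )
}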

View File

@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -36,9 +36,12 @@ impl Default for ElemAvgFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),
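
The vector functions below repeat the same migration inside `Signature::one_of`: either one binary-like argument or one string-like argument, expressed as coercion classes instead of the old `STRINGS`/`BINARYS`/`BinaryView` enumerations. A sketch using only calls that appear in these hunks:

use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{Coercion, Signature, TypeSignature, TypeSignatureClass, Volatility};

// Accepts a single argument that is either binary-like or string-like.
fn binary_or_string_signature() -> Signature {
    Signature::one_of(
        vec![
            TypeSignature::Coercible(vec![Coercion::new_exact(TypeSignatureClass::Native(
                logical_binary(),
            ))]),
            TypeSignature::Coercible(vec![Coercion::new_exact(TypeSignatureClass::Native(
                logical_string(),
            ))]),
        ],
        Volatility::Immutable,
    )
}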

View File

@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -49,9 +49,12 @@ impl Default for ElemProductFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -36,9 +36,12 @@ impl Default for ElemSumFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use crate::function::Function;
@@ -49,8 +49,12 @@ impl Default for VectorDimFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -52,9 +52,12 @@ impl Default for VectorNormFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -106,7 +106,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &vec![ArrowDataType::Utf8]));
}

View File

@@ -34,7 +34,7 @@ use table::requests::{
};
use crate::error::{
ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
self, ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu,
InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu,
MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu,
@@ -251,6 +251,10 @@ pub fn alter_expr_to_request(
.collect::<Result<Vec<_>>>()?;
AlterKind::SetDefaults { defaults }
}
Kind::Repartition(_) => error::UnexpectedSnafu {
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
}
.fail()?,
};
let request = AlterTableRequest {

View File

@@ -161,6 +161,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected: {err_msg}"))]
Unexpected {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -188,6 +195,7 @@ impl ErrorExt for Error {
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::SqlCommon { source, .. } => source.status_code(),
Error::MissingTableMeta { .. } => StatusCode::Unexpected,
Error::Unexpected { .. } => StatusCode::Unexpected,
}
}

View File

@@ -103,10 +103,11 @@ impl FlightEncoder {
FlightMessage::RecordBatch(record_batch) => {
let (encoded_dictionaries, encoded_batch) = self
.data_gen
.encoded_batch(
.encode(
&record_batch,
&mut self.dictionary_tracker,
&self.write_options,
&mut Default::default(),
)
.expect("DictionaryTracker configured above to not fail on replacement");

View File

@@ -28,6 +28,7 @@ use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
pub mod allocator;
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
@@ -36,8 +37,7 @@ pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub(crate) use create_table_template::{CreateRequestBuilder, build_template_from_raw_table_info};
pub(crate) use create_table::{CreateRequestBuilder, build_template_from_raw_table_info};
pub mod create_view;
pub mod drop_database;
pub mod drop_flow;

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod region_routes;
pub mod resource_id;
pub mod wal_options;

View File

@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::debug;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::Result;
use crate::peer::PeerAllocator;
use crate::rpc::router::{Region, RegionRoute};
pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
#[async_trait::async_trait]
pub trait RegionRoutesAllocator: Send + Sync {
async fn allocate(
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
) -> Result<Vec<RegionRoute>>;
}
#[async_trait::async_trait]
impl<T: PeerAllocator> RegionRoutesAllocator for T {
async fn allocate(
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
) -> Result<Vec<RegionRoute>> {
let regions = regions_and_partitions.len().max(1);
let peers = self.alloc(regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id,);
let mut region_routes = regions_and_partitions
.iter()
.enumerate()
.map(|(i, (region_number, partition))| {
let region = Region {
id: RegionId::new(table_id, *region_number),
partition_expr: partition.to_string(),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.collect::<Vec<_>>();
// If the table has no partitions, we need to create a default region.
if region_routes.is_empty() {
region_routes.push(RegionRoute {
region: Region {
id: RegionId::new(table_id, 0),
..Default::default()
},
leader_peer: Some(peers[0].clone()),
..Default::default()
});
}
Ok(region_routes)
}
}
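
The blanket impl above turns any `PeerAllocator` into a `RegionRoutesAllocator`: regions are spread across the allocated peers round-robin, and a table with no partitions still gets one default region. A plain sketch of that assignment, with `u64` peer ids standing in for the crate's `Peer` type:

fn assign_leaders(region_numbers: &[u32], peers: &[u64]) -> Vec<(u32, u64)> {
    if region_numbers.is_empty() {
        // A table with no partitions still gets one default region, led by the
        // first allocated peer.
        return vec![(0, peers[0])];
    }
    region_numbers
        .iter()
        .enumerate()
        .map(|(i, &region_number)| (region_number, peers[i % peers.len()]))
        .collect()
}

fn main() {
    // Three regions spread over two peers round-robin.
    assert_eq!(assign_leaders(&[0, 1, 2], &[1, 2]), vec![(0, 1), (1, 2), (2, 1)]);
}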

View File

@@ -0,0 +1,35 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use crate::error::Result;
pub type ResourceIdAllocatorRef = Arc<dyn ResourceIdAllocator>;
#[async_trait::async_trait]
pub trait ResourceIdAllocator: Send + Sync {
/// Returns the next value and increments the sequence.
async fn next(&self) -> Result<u64>;
/// Returns the current value stored in the remote storage without incrementing the sequence.
async fn peek(&self) -> Result<u64>;
/// Jumps to the given value.
async fn jump_to(&self, next: u64) -> Result<()>;
/// Returns the range of available sequences.
async fn min_max(&self) -> Range<u64>;
}
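
A minimal in-memory stand-in may help read this trait's contract; the real allocators are backed by the metasrv's remote sequence storage, and the return-then-advance reading of `next` below is an assumption for illustration only:

use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};

struct LocalIdAllocator {
    next: AtomicU64,
    range: Range<u64>,
}

impl LocalIdAllocator {
    fn next(&self) -> u64 {
        // Hand out the current value, then advance the sequence.
        self.next.fetch_add(1, Ordering::SeqCst)
    }
    fn peek(&self) -> u64 {
        self.next.load(Ordering::SeqCst)
    }
    fn jump_to(&self, next: u64) {
        self.next.store(next, Ordering::SeqCst);
    }
    fn min_max(&self) -> Range<u64> {
        self.range.clone()
    }
}

fn main() {
    let alloc = LocalIdAllocator {
        next: AtomicU64::new(1024),
        range: 1024..u32::MAX as u64,
    };
    assert_eq!(alloc.next(), 1024);
    assert_eq!(alloc.peek(), 1025);
    alloc.jump_to(2048);
    assert_eq!(alloc.next(), 2048);
    assert!(alloc.min_max().contains(&2048));
}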

View File

@@ -0,0 +1,31 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use store_api::storage::RegionNumber;
use crate::error::Result;
pub type WalOptionsAllocatorRef = Arc<dyn WalOptionsAllocator>;
#[async_trait::async_trait]
pub trait WalOptionsAllocator: Send + Sync {
async fn allocate(
&self,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>>;
}

View File

@@ -22,7 +22,7 @@ use snafu::OptionExt;
use table::metadata::RawTableInfo;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
use crate::error::{self, InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
/// Makes alter kind proto that all regions can reuse.
@@ -112,6 +112,10 @@ fn create_proto_alter_kind(
Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
Kind::Repartition(_) => error::UnexpectedSnafu {
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
}
.fail()?,
}
}

View File

@@ -30,7 +30,7 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
@@ -286,14 +286,7 @@ impl CreateTablesData {
.flat_map(|(task, table_id)| {
if table_id.is_none() {
let table_info = task.table_info.clone();
let region_ids = self
.physical_region_numbers
.iter()
.map(|region_number| {
RegionId::new(table_info.ident.table_id, *region_number)
})
.collect();
let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
let table_route = TableRouteValue::logical(self.physical_table_id);
Some((table_info, table_route))
} else {
None

View File

@@ -22,7 +22,7 @@ use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table_template::{
use crate::ddl::create_table::template::{
CreateRequestBuilder, build_template, build_template_from_raw_table_info,
};
use crate::ddl::utils::region_storage_path;
@@ -97,7 +97,7 @@ pub fn create_region_request_builder(
/// Builds a [CreateRequestBuilder] from a [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,

View File

@@ -12,74 +12,99 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod executor;
pub mod template;
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use futures::future::join_all;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;
pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
use crate::ddl::create_table_template::{CreateRequestBuilder, build_template};
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, map_to_procedure_error, region_storage_path,
};
use crate::ddl::create_table::executor::CreateTableExecutor;
use crate::ddl::create_table::template::build_template;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::{DdlContext, TableMetadata};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
RegionRoute, find_leader_regions, find_leaders, operating_leader_regions,
};
use crate::rpc::router::{RegionRoute, operating_leader_regions};
pub struct CreateTableProcedure {
pub context: DdlContext,
pub creator: TableCreator,
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening.
pub opening_regions: Vec<OperatingRegionGuard>,
/// The executor of the procedure.
pub executor: CreateTableExecutor,
}
fn build_executor_from_create_table_data(
create_table_expr: &CreateTableExpr,
) -> Result<CreateTableExecutor> {
let template = build_template(create_table_expr)?;
let builder = CreateRequestBuilder::new(template, None);
let table_name = TableName::new(
create_table_expr.catalog_name.clone(),
create_table_expr.schema_name.clone(),
create_table_expr.table_name.clone(),
);
let executor =
CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
Ok(executor)
}
impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
Self {
pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
let executor = build_executor_from_create_table_data(&task.create_table)?;
Ok(Self {
context,
creator: TableCreator::new(task),
}
data: CreateTableData::new(task),
opening_regions: vec![],
executor,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let create_table_expr = &data.task.create_table;
let executor = build_executor_from_create_table_data(create_table_expr)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
Ok(CreateTableProcedure {
context,
creator: TableCreator {
data,
opening_regions: vec![],
},
data,
opening_regions: vec![],
executor,
})
}
fn table_info(&self) -> &RawTableInfo {
&self.creator.data.task.table_info
&self.data.task.table_info
}
pub(crate) fn table_id(&self) -> TableId {
@@ -87,8 +112,7 @@ impl CreateTableProcedure {
}
fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
self.creator
.data
self.data
.region_wal_options
.as_ref()
.context(error::UnexpectedSnafu {
@@ -97,8 +121,7 @@ impl CreateTableProcedure {
}
fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
self.creator
.data
self.data
.table_route
.as_ref()
.context(error::UnexpectedSnafu {
@@ -106,17 +129,6 @@ impl CreateTableProcedure {
})
}
#[cfg(any(test, feature = "testing"))]
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options)
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Allocates the table id.
@@ -125,31 +137,16 @@ impl CreateTableProcedure {
/// - TableName exists and `create_if_not_exists` is false.
/// - Failed to allocate [TableMetadata].
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
let table_name_value = self
.context
.table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
))
let table_id = self
.executor
.on_prepare(&self.context.table_metadata_manager)
.await?;
if let Some(value) = table_name_value {
ensure!(
expr.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.creator.data.table_ref().to_string(),
}
);
let table_id = value.table_id();
// Return the table id if the table already exists.
if let Some(table_id) = table_id {
return Ok(Status::done_with_output(table_id));
}
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
self.data.state = CreateTableState::DatanodeCreateRegions;
let TableMetadata {
table_id,
table_route,
@@ -157,23 +154,13 @@ impl CreateTableProcedure {
} = self
.context
.table_metadata_allocator
.create(&self.creator.data.task)
.create(&self.data.task)
.await?;
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options);
self.set_allocated_metadata(table_id, table_route, region_wal_options);
Ok(Status::executing(true))
}
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let template = build_template(create_table_expr)?;
Ok(CreateRequestBuilder::new(template, physical_table_id))
}
/// Creates regions on datanodes
///
/// Abort(non-retry):
@@ -187,90 +174,29 @@ impl CreateTableProcedure {
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let table_route = self.table_route()?.clone();
let request_builder = self.new_region_request_builder(None)?;
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, &table_route.region_routes)?;
let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
self.opening_regions = guards;
}
self.create_regions(&table_route.region_routes, request_builder)
.await
self.create_regions(&table_route.region_routes).await
}
async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
let create_table_data = &self.creator.data;
// Safety: the region_wal_options must be allocated
async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
let table_id = self.table_id();
let region_wal_options = self.region_wal_options()?;
let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
let column_metadatas = self
.executor
.on_create_regions(
&self.context.node_manager,
table_id,
region_routes,
region_wal_options,
)
.await?;
let partition_exprs = region_routes
.iter()
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
.collect();
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
&partition_exprs,
);
requests.push(PbRegionRequest::Create(create_region_request));
}
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let mut results = join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
self.creator.data.column_metadatas = column_metadatas;
} else {
warn!(
"creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
);
}
self.creator.data.state = CreateTableState::CreateMetadata;
self.data.column_metadatas = column_metadatas;
self.data.state = CreateTableState::CreateMetadata;
Ok(Status::executing(true))
}
@@ -280,107 +206,33 @@ impl CreateTableProcedure {
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
let table_id = self.table_id();
let table_ref = self.creator.data.table_ref();
let table_ref = self.data.table_ref();
let manager = &self.context.table_metadata_manager;
let mut raw_table_info = self.table_info().clone();
if !self.creator.data.column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas);
}
let raw_table_info = self.table_info().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let physical_table_route = self.table_route()?.clone();
let detecting_regions =
convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
let table_route = TableRouteValue::Physical(physical_table_route);
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
self.executor
.on_create_metadata(
manager,
&self.context.region_failure_detector_controller,
raw_table_info,
&self.data.column_metadatas,
physical_table_route,
region_wal_options,
)
.await?;
self.context
.register_failure_detectors(detecting_regions)
.await;
info!(
"Successfully created table: {}, table_id: {}, procedure_id: {}",
table_ref, table_id, pid
);
self.creator.opening_regions.clear();
self.opening_regions.clear();
Ok(Status::done_with_output(table_id))
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.creator.data.table_route {
self.creator.opening_regions = self
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
pub struct TableCreator {
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening.
pub opening_regions: Vec<OperatingRegionGuard>,
}
impl TableCreator {
pub fn new(task: CreateTableTask) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
table_route: None,
region_wal_options: None,
},
opening_regions: vec![],
}
}
/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
@@ -389,7 +241,6 @@ impl TableCreator {
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(vec![]);
}
@@ -409,7 +260,7 @@ impl TableCreator {
Ok(opening_region_guards)
}
fn set_allocated_metadata(
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: PhysicalTableRouteValue,
@@ -421,6 +272,56 @@ impl TableCreator {
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.data.table_route {
self.opening_regions = self
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
/// Prepares to create the table
@@ -444,6 +345,16 @@ pub struct CreateTableData {
}
impl CreateTableData {
pub fn new(task: CreateTableTask) -> Self {
CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
table_route: None,
region_wal_options: None,
}
}
fn table_ref(&self) -> TableReference<'_> {
self.task.table_ref()
}

View File

@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{RegionRequest, RegionRequestHeader};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use futures::future::join_all;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, region_storage_path,
};
use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
use crate::error::{self, Result};
use crate::key::TableMetadataManagerRef;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::node_manager::NodeManagerRef;
use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
/// [CreateTableExecutor] performs:
/// - Creates the metadata of the table.
/// - Creates the regions on the Datanode nodes.
pub struct CreateTableExecutor {
create_if_not_exists: bool,
table_name: TableName,
builder: CreateRequestBuilder,
}
impl CreateTableExecutor {
/// Creates a new [`CreateTableExecutor`].
pub fn new(
table_name: TableName,
create_if_not_exists: bool,
builder: CreateRequestBuilder,
) -> Self {
Self {
create_if_not_exists,
table_name,
builder,
}
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Returns the table id if the table exists.
///
/// Abort(non-retry):
/// - Table exists and `create_if_not_exists` is `false`.
/// - Failed to get the table name value.
pub async fn on_prepare(
&self,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<Option<TableId>> {
let table_name_value = table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&self.table_name.catalog_name,
&self.table_name.schema_name,
&self.table_name.table_name,
))
.await?;
if let Some(value) = table_name_value {
ensure!(
self.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.table_name.to_string(),
}
);
return Ok(Some(value.table_id()));
}
Ok(None)
}
pub async fn on_create_regions(
&self,
node_manager: &NodeManagerRef,
table_id: TableId,
region_routes: &[RegionRoute],
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<Vec<ColumnMetadata>> {
let storage_path =
region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
let partition_exprs = region_routes
.iter()
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
.collect::<HashMap<_, _>>();
for datanode in leaders {
let requester = node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(table_id, region_number);
let create_region_request = self.builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
&partition_exprs,
);
requests.push(PbRegionRequest::Create(create_region_request));
}
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let mut results = join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
let column_metadatas = if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
column_metadatas
} else {
warn!(
"creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
);
vec![]
};
Ok(column_metadatas)
}
/// Creates table metadata
///
/// Abort(non-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(
&self,
table_metadata_manager: &TableMetadataManagerRef,
region_failure_detector_controller: &RegionFailureDetectorControllerRef,
mut raw_table_info: RawTableInfo,
column_metadatas: &[ColumnMetadata],
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
if !column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, column_metadatas);
}
let detecting_regions =
convert_region_routes_to_detecting_regions(&table_route.region_routes);
let table_route = TableRouteValue::Physical(table_route);
table_metadata_manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
Ok(())
}
/// Returns the builder of the executor.
pub fn builder(&self) -> &CreateRequestBuilder {
&self.builder
}
}

View File

@@ -20,19 +20,17 @@ use api::v1::region::{CreateRequest, RegionColumnDef};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::wal_options_allocator::prepare_wal_options;
use crate::wal_provider::prepare_wal_options;
/// Builds a [CreateRequest] from a [RawTableInfo].
/// Constructs a [CreateRequest] based on the provided [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
pub(crate) fn build_template_from_raw_table_info(
raw_table_info: &RawTableInfo,
) -> Result<CreateRequest> {
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
let primary_key_indices = &raw_table_info.meta.primary_key_indices;
let column_defs = raw_table_info
.meta
@@ -57,7 +55,7 @@ pub(crate) fn build_template_from_raw_table_info(
let options = HashMap::from(&raw_table_info.meta.options);
let template = CreateRequest {
region_id: 0,
engine: METRIC_ENGINE_NAME.to_string(),
engine: raw_table_info.meta.engine.clone(),
column_defs,
primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
path: String::new(),
@@ -138,7 +136,7 @@ pub struct CreateRequestBuilder {
}
impl CreateRequestBuilder {
pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
Self {
template,
physical_table_id,

View File

@@ -120,7 +120,13 @@ impl State for DropDatabaseExecutor {
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor
.on_drop_regions(ddl_ctx, &self.physical_region_routes, true)
.on_drop_regions(
&ddl_ctx.node_manager,
&ddl_ctx.leader_region_registry,
&self.physical_region_routes,
true,
false,
)
.await?;
info!("Table: {}({}) is dropped", self.table_name, self.table_id);

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod executor;
pub mod executor;
mod metadata;
use std::collections::HashMap;
@@ -156,7 +156,13 @@ impl DropTableProcedure {
pub async fn on_datanode_drop_regions(&mut self) -> Result<Status> {
self.executor
.on_drop_regions(&self.context, &self.data.physical_region_routes, false)
.on_drop_regions(
&self.context.node_manager,
&self.context.leader_region_registry,
&self.data.physical_region_routes,
false,
false,
)
.await?;
self.data.state = DropTableState::DeleteTombstone;
Ok(Status::executing(true))

View File

@@ -36,6 +36,8 @@ use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::node_manager::NodeManagerRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::router::{
RegionRoute, find_follower_regions, find_followers, find_leader_regions, find_leaders,
operating_leader_regions,
@@ -212,16 +214,18 @@ impl DropTableExecutor {
/// Drops region on datanode.
pub async fn on_drop_regions(
&self,
ctx: &DdlContext,
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
region_routes: &[RegionRoute],
fast_path: bool,
force: bool,
) -> Result<()> {
// Drops leader regions on datanodes.
let leaders = find_leaders(region_routes);
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
let table_id = self.table_id;
for datanode in leaders {
let requester = ctx.node_manager.datanode(&datanode).await;
let requester = node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
@@ -238,6 +242,7 @@ impl DropTableExecutor {
body: Some(region_request::Body::Drop(PbDropRegionRequest {
region_id: region_id.as_u64(),
fast_path,
force,
})),
};
let datanode = datanode.clone();
@@ -262,7 +267,7 @@ impl DropTableExecutor {
let followers = find_followers(region_routes);
let mut close_region_tasks = Vec::with_capacity(followers.len());
for datanode in followers {
let requester = ctx.node_manager.datanode(&datanode).await;
let requester = node_manager.datanode(&datanode).await;
let regions = find_follower_regions(region_routes, &datanode);
let region_ids = regions
.iter()
@@ -307,8 +312,7 @@ impl DropTableExecutor {
// Deletes the leader region from registry.
let region_ids = operating_leader_regions(region_routes);
ctx.leader_region_registry
.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
leader_region_registry.batch_delete(region_ids.into_iter().map(|(region_id, _)| region_id));
Ok(())
}

View File

@@ -17,47 +17,47 @@ use std::sync::Arc;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};
use store_api::storage::{RegionNumber, TableId};
use crate::ddl::TableMetadata;
use crate::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal_options_allocator::{WalOptionsAllocatorRef, allocate_region_wal_options};
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
#[derive(Clone)]
pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
region_routes_allocator: RegionRoutesAllocatorRef,
}
impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
table_id_allocator,
wal_options_allocator,
Arc::new(NoopPeerAllocator),
)
}
pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
table_id_allocator: ResourceIdAllocatorRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
table_id_allocator,
wal_options_allocator,
peer_allocator,
region_routes_allocator: Arc::new(peer_allocator) as _,
}
}
@@ -70,7 +70,7 @@ impl TableMetadataAllocator {
ensure!(
!self
.table_id_sequence
.table_id_allocator
.min_max()
.await
.contains(&(table_id as u64)),
@@ -89,65 +89,35 @@ impl TableMetadataAllocator {
table_id
} else {
self.table_id_sequence.next().await? as TableId
self.table_id_allocator.next().await? as TableId
};
Ok(table_id)
}
fn create_wal_options(
async fn create_wal_options(
&self,
table_route: &PhysicalTableRouteValue,
region_numbers: &[RegionNumber],
skip_wal: bool,
) -> Result<HashMap<RegionNumber, String>> {
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator, skip_wal)
self.wal_options_allocator
.allocate(region_numbers, skip_wal)
.await
}
async fn create_table_route(
&self,
table_id: TableId,
task: &CreateTableTask,
partition_exprs: &[&str],
) -> Result<PhysicalTableRouteValue> {
let regions = task.partitions.len().max(1);
let peers = self.peer_allocator.alloc(regions).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id);
let mut region_routes = task
.partitions
let region_number_and_partition_exprs = partition_exprs
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition_expr: partition.expression.clone(),
..Default::default()
};
let peer = peers[i % peers.len()].clone();
RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.map(|(i, partition)| (i as u32, *partition))
.collect::<Vec<_>>();
// If the table has no partitions, we need to create a default region.
if region_routes.is_empty() {
region_routes.push(RegionRoute {
region: Region {
id: RegionId::new(table_id, 0),
..Default::default()
},
leader_peer: Some(peers[0].clone()),
..Default::default()
});
}
let region_routes = self
.region_routes_allocator
.allocate(table_id, &region_number_and_partition_exprs)
.await?;
Ok(PhysicalTableRouteValue::new(region_routes))
}
@@ -164,10 +134,20 @@ impl TableMetadataAllocator {
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let table_route = self.create_table_route(table_id, task).await?;
let region_wal_options =
self.create_wal_options(&table_route, task.table_info.meta.options.skip_wal)?;
let partition_exprs = task
.partitions
.iter()
.map(|p| p.expression.as_str())
.collect::<Vec<_>>();
let table_route = self.create_table_route(table_id, &partition_exprs).await?;
let region_numbers = table_route
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect::<Vec<_>>();
let region_wal_options = self
.create_wal_options(&region_numbers, task.table_info.meta.options.skip_wal)
.await?;
debug!(
"Allocated region wal options {:?} for table {}",
@@ -181,7 +161,18 @@ impl TableMetadataAllocator {
})
}
pub fn table_id_sequence(&self) -> SequenceRef {
self.table_id_sequence.clone()
/// Returns the table id allocator.
pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
self.table_id_allocator.clone()
}
/// Returns the wal options allocator.
pub fn wal_options_allocator(&self) -> WalOptionsAllocatorRef {
self.wal_options_allocator.clone()
}
/// Returns the region routes allocator.
pub fn region_routes_allocator(&self) -> RegionRoutesAllocatorRef {
self.region_routes_allocator.clone()
}
}
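
The refactored allocation flow above shapes its inputs in two hops: the task's partition expressions become `(region_number, expr)` pairs for the region-routes allocator, and the allocated routes' region numbers then feed the WAL-options allocator. A small sketch of the first hop, outside the crate's types (names are illustrative):

fn number_partitions(partition_exprs: &[&str]) -> Vec<(u32, String)> {
    partition_exprs
        .iter()
        .enumerate()
        .map(|(i, expr)| (i as u32, expr.to_string()))
        .collect()
}

fn main() {
    let pairs = number_partitions(&["host < 'h100'", "host >= 'h100'"]);
    assert_eq!(
        pairs,
        vec![(0, "host < 'h100'".to_string()), (1, "host >= 'h100'".to_string())]
    );
}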

View File

@@ -128,7 +128,6 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
value_indices: vec![],
engine: expr.engine.clone(),
next_column_id: expr.column_defs.len() as u32,
region_numbers: vec![],
options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
created_on: DateTime::default(),
updated_on: DateTime::default(),

View File

@@ -166,7 +166,7 @@ async fn test_on_prepare_logical_table_exists_err() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(1025, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -208,7 +208,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -252,7 +252,7 @@ async fn test_on_prepare_part_logical_tables_exist() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -392,7 +392,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() {
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info.clone(),
TableRouteValue::logical(1024, vec![RegionId::new(8192, 1)]),
TableRouteValue::logical(1024),
)])
.await
.unwrap();
@@ -496,10 +496,7 @@ async fn test_on_create_metadata_err() {
task.table_info.ident.table_id = 1025;
ddl_context
.table_metadata_manager
.create_logical_tables_metadata(vec![(
task.table_info,
TableRouteValue::logical(512, vec![RegionId::new(1026, 1)]),
)])
.create_logical_tables_metadata(vec![(task.table_info, TableRouteValue::logical(512))])
.await
.unwrap();
// Triggers procedure to create table metadata

View File

@@ -162,7 +162,7 @@ async fn test_on_prepare_table_exists_err() {
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::TableAlreadyExists { .. });
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
@@ -185,7 +185,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: Some(..) });
let table_id = *status.downcast_output_ref::<u32>().unwrap();
@@ -198,7 +198,7 @@ async fn test_on_prepare_without_create_if_table_exists() {
let ddl_context = new_ddl_context(node_manager);
let mut task = test_create_table_task("foo");
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
@@ -217,7 +217,7 @@ async fn test_on_datanode_create_regions_should_retry() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -234,7 +234,7 @@ async fn test_on_datanode_create_regions_should_not_retry() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -251,7 +251,7 @@ async fn test_on_create_metadata_error() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -284,7 +284,7 @@ async fn test_on_create_metadata() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -312,16 +312,16 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);
let task = test_create_table_task("foo");
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
execute_procedure_until(&mut procedure, |p| {
p.creator.data.state == CreateTableState::CreateMetadata
p.data.state == CreateTableState::CreateMetadata
})
.await;
// Ensure that after running to the state `CreateMetadata`(just past `DatanodeCreateRegions`),
// the opening regions should be recorded:
let guards = &procedure.creator.opening_regions;
let guards = &procedure.opening_regions;
assert_eq!(guards.len(), 1);
let (datanode_id, region_id) = (0, RegionId::new(procedure.table_id(), 0));
assert_eq!(guards[0].info(), (datanode_id, region_id));
@@ -334,7 +334,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
execute_procedure_until_done(&mut procedure).await;
// Ensure that when run to the end, the opening regions should be cleared:
let guards = &procedure.creator.opening_regions;
let guards = &procedure.opening_regions;
assert!(guards.is_empty());
assert!(
!ddl_context

View File

@@ -259,7 +259,7 @@ async fn test_replace_table() {
{
// Create a `foo` table.
let task = test_create_table_task("foo");
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),

View File

@@ -14,15 +14,19 @@
use std::sync::Arc;
use api::v1::Repartition;
use api::v1::alter_table_expr::Kind;
use common_error::ext::BoxedError;
use common_procedure::{
BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
ProcedureWithId, watcher,
};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, info, tracing};
use derive_builder::Builder;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
@@ -40,7 +44,8 @@ use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, utils};
use crate::error::{
EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
};
@@ -90,6 +95,7 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade
pub struct DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
repartition_procedure_factory: RepartitionProcedureFactoryRef,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
@@ -143,16 +149,37 @@ macro_rules! procedure_loader {
};
}
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
pub trait RepartitionProcedureFactory: Send + Sync {
fn create(
&self,
ddl_ctx: &DdlContext,
table_name: TableName,
table_id: TableId,
from_exprs: Vec<String>,
to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError>;
fn register_loaders(
&self,
ddl_ctx: &DdlContext,
procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError>;
}
impl DdlManager {
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
repartition_procedure_factory: RepartitionProcedureFactoryRef,
register_loaders: bool,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
repartition_procedure_factory,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: None,
};
@@ -204,9 +231,63 @@ impl DdlManager {
.context(RegisterProcedureLoaderSnafu { type_name })?;
}
self.repartition_procedure_factory
.register_loaders(&self.ddl_context, &self.procedure_manager)
.context(RegisterRepartitionProcedureLoaderSnafu)?;
Ok(())
}
/// Submits a repartition procedure for the specified table.
///
/// This creates a repartition procedure using the provided `table_id`,
/// `table_name`, and `Repartition` configuration, and then either executes it
/// to completion or just submits it for asynchronous execution.
///
/// The `Repartition` argument contains the original (`from_partition_exprs`)
/// and target (`into_partition_exprs`) partition expressions that define how
/// the table should be repartitioned.
///
/// The `wait` flag controls whether this method waits for the repartition
/// procedure to finish:
/// - If `wait` is `true`, the procedure is executed and this method awaits
/// its completion, returning both the generated `ProcedureId` and the
/// final `Output` of the procedure.
/// - If `wait` is `false`, the procedure is only submitted to the procedure
/// manager for asynchronous execution, and this method returns the
/// `ProcedureId` along with `None` as the output.
async fn submit_repartition_task(
&self,
table_id: TableId,
table_name: TableName,
Repartition {
from_partition_exprs,
into_partition_exprs,
wait,
}: Repartition,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = self
.repartition_procedure_factory
.create(
&context,
table_name,
table_id,
from_partition_exprs,
into_partition_exprs,
)
.context(CreateRepartitionProcedureSnafu)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
if wait {
self.execute_procedure_and_wait(procedure_with_id).await
} else {
self.submit_procedure(procedure_with_id)
.await
.map(|p| (p, None))
}
}
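In short, the presence of the returned `Output` is what distinguishes the two modes. A minimal stand-alone sketch of that contract (plain types stand in for `ProcedureId` and `Output`; the names are illustrative only, not part of the diff):

    fn handle_submission(result: (u64, Option<String>)) {
        match result {
            // `wait == true`: the procedure already ran to completion; inspect its output.
            (procedure_id, Some(output)) => {
                println!("procedure {procedure_id} finished: {output}");
            }
            // `wait == false`: only submitted; track progress via the procedure id.
            (procedure_id, None) => {
                println!("procedure {procedure_id} submitted for asynchronous execution");
            }
        }
    }

    fn main() {
        handle_submission((1, Some("repartitioned".to_string())));
        handle_submission((2, None));
    }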
/// Submits and executes an alter table task.
#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(
@@ -214,13 +295,28 @@ impl DdlManager {
table_id: TableId,
alter_table_task: AlterTableTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
// Make `alter_table_task` mutable so we can call `.take()` on its nested `kind` field.
let mut alter_table_task = alter_table_task;
if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
&& let Kind::Repartition(repartition) =
alter_table_task.alter_table.kind.take().unwrap()
{
let table_name = TableName::new(
alter_table_task.alter_table.catalog_name,
alter_table_task.alter_table.schema_name,
alter_table_task.alter_table.table_name,
);
return self
.submit_repartition_task(table_id, table_name, repartition)
.await;
}
let context = self.create_context();
let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create table task.
@@ -231,11 +327,11 @@ impl DdlManager {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateTableProcedure::new(create_table_task, context);
let procedure = CreateTableProcedure::new(create_table_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a `[CreateViewTask]`.
@@ -250,7 +346,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create multiple logical table tasks.
@@ -267,7 +363,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes alter multiple table tasks.
@@ -284,7 +380,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop table task.
@@ -299,7 +395,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create database task.
@@ -318,7 +414,7 @@ impl DdlManager {
CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop table task.
@@ -335,7 +431,7 @@ impl DdlManager {
let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
pub async fn submit_alter_database(
@@ -346,7 +442,7 @@ impl DdlManager {
let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create flow task.
@@ -360,7 +456,7 @@ impl DdlManager {
let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop flow task.
@@ -373,7 +469,7 @@ impl DdlManager {
let procedure = DropFlowProcedure::new(drop_flow, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop view task.
@@ -386,7 +482,7 @@ impl DdlManager {
let procedure = DropViewProcedure::new(drop_view, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a truncate table task.
@@ -407,7 +503,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a comment on task.
@@ -420,10 +516,11 @@ impl DdlManager {
let procedure = CommentOnProcedure::new(comment_on_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
async fn submit_procedure(
/// Executes a procedure and waits for the result.
async fn execute_procedure_and_wait(
&self,
procedure_with_id: ProcedureWithId,
) -> Result<(ProcedureId, Option<Output>)> {
@@ -442,6 +539,18 @@ impl DdlManager {
Ok((procedure_id, output))
}
/// Submits a procedure and returns the procedure id.
async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
let procedure_id = procedure_with_id.id;
let _ = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(SubmitProcedureSnafu)?;
Ok(procedure_id)
}
pub async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
@@ -947,8 +1056,12 @@ async fn handle_comment_on_task(
mod tests {
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_procedure::local::LocalManager;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{BoxedProcedure, ProcedureManagerRef};
use store_api::storage::TableId;
use table::table_name::TableName;
use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
@@ -959,6 +1072,7 @@ mod tests {
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use crate::ddl_manager::RepartitionProcedureFactory;
use crate::key::TableMetadataManager;
use crate::key::flow::FlowMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -968,7 +1082,7 @@ mod tests {
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::wal_provider::WalProvider;
/// A dummy implemented [NodeManager].
pub struct DummyDatanodeManager;
@@ -987,13 +1101,37 @@ mod tests {
}
}
struct DummyRepartitionProcedureFactory;
#[async_trait::async_trait]
impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
fn create(
&self,
_ddl_ctx: &DdlContext,
_table_name: TableName,
_table_id: TableId,
_from_exprs: Vec<String>,
_to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError> {
unimplemented!()
}
fn register_loaders(
&self,
_ddl_ctx: &DdlContext,
_procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError> {
Ok(())
}
}
#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
@@ -1023,6 +1161,7 @@ mod tests {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
Arc::new(DummyRepartitionProcedureFactory),
true,
);

View File

@@ -104,6 +104,20 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to register repartition procedure loader"))]
RegisterRepartitionProcedureLoader {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to create repartition procedure"))]
CreateRepartitionProcedure {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to submit procedure"))]
SubmitProcedure {
#[snafu(implicit)]
@@ -1170,6 +1184,8 @@ impl ErrorExt for Error {
PutPoison { source, .. } => source.status_code(),
ConvertColumnDef { source, .. } => source.status_code(),
ProcedureStateReceiver { source, .. } => source.status_code(),
RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
CreateRepartitionProcedure { source, .. } => source.status_code(),
ParseProcedureId { .. }
| InvalidNumTopics { .. }

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::time::Duration;
@@ -432,11 +432,11 @@ where
pub struct GetFileRefs {
/// List of region IDs to get file references from active FileHandles (in-memory).
pub query_regions: Vec<RegionId>,
/// Mapping from the source region ID (where to read the manifest) to
/// the target region IDs (whose file references to look for).
/// Key: The region ID of the manifest.
/// Value: The list of region IDs to find references for in that manifest.
pub related_regions: HashMap<RegionId, Vec<RegionId>>,
/// Mapping from a source region ID (whose file references to look for) to
/// the destination region IDs (where to read the manifests).
/// Key: The source region ID (where the files originally came from).
/// Value: The set of destination region IDs (whose manifests need to be read).
pub related_regions: HashMap<RegionId, HashSet<RegionId>>,
}
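A small stand-alone sketch of how the reworked `related_regions` map is meant to be read (plain `u64` values stand in for `RegionId`; the concrete numbers are illustrative only):

    use std::collections::{HashMap, HashSet};

    fn main() {
        // Key: source region whose file references we are looking for.
        // Value: destination regions whose manifests must be read to find those references.
        let mut related_regions: HashMap<u64, HashSet<u64>> = HashMap::new();
        related_regions
            .entry(4096) // source region
            .or_default()
            .extend([4097, 4098]); // manifests to scan
        for (src, dsts) in &related_regions {
            println!("source region {src}: read manifests of {dsts:?}");
        }
    }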
impl Display for GetFileRefs {

View File

@@ -747,12 +747,10 @@ impl TableMetadataManager {
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn create_table_metadata(
&self,
mut table_info: RawTableInfo,
table_info: RawTableInfo,
table_route_value: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
let region_numbers = table_route_value.region_numbers();
table_info.meta.region_numbers = region_numbers;
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.clone();
@@ -851,8 +849,7 @@ impl TableMetadataManager {
on_create_table_route_failure: F2,
}
let mut on_failures = Vec::with_capacity(len);
for (mut table_info, table_route_value) in tables_data {
table_info.meta.region_numbers = table_route_value.region_numbers();
for (table_info, table_route_value) in tables_data {
let table_id = table_info.ident.table_id;
// Creates table name.
@@ -1283,6 +1280,11 @@ impl TableMetadataManager {
region_distribution(current_table_route_value.region_routes()?);
let new_region_distribution = region_distribution(&new_region_routes);
let update_topic_region_txn = self.topic_region_manager.build_update_txn(
table_id,
&region_info.region_wal_options,
new_region_wal_options,
)?;
let update_datanode_table_txn = self.datanode_table_manager().build_update_txn(
table_id,
region_info,
@@ -1294,13 +1296,16 @@ impl TableMetadataManager {
// Updates the table_route.
let new_table_route_value = current_table_route_value.update(new_region_routes)?;
let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
.table_route_storage()
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);
let txn = Txn::merge_all(vec![
update_datanode_table_txn,
update_table_route_txn,
update_topic_region_txn,
]);
let mut r = self.kv_backend.txn(txn).await?;
@@ -1477,6 +1482,7 @@ mod tests {
use super::datanode_table::DatanodeTableKey;
use super::test_utils;
use crate::ddl::allocator::wal_options::WalOptionsAllocator;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
@@ -1484,6 +1490,7 @@ mod tests {
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::topic_region::TopicRegionKey;
use crate::key::{
DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX,
TableMetadataManager, ViewInfoValue,
@@ -1493,7 +1500,7 @@ mod tests {
use crate::peer::Peer;
use crate::rpc::router::{LeaderState, Region, RegionRoute, region_distribution};
use crate::rpc::store::RangeRequest;
use crate::wal_options_allocator::{WalOptionsAllocator, allocate_region_wal_options};
use crate::wal_provider::WalProvider;
#[test]
fn test_deserialized_value_with_bytes() {
@@ -1543,8 +1550,8 @@ mod tests {
}
}
fn new_test_table_info(region_numbers: impl Iterator<Item = u32>) -> TableInfo {
test_utils::new_test_table_info(10, region_numbers)
fn new_test_table_info() -> TableInfo {
test_utils::new_test_table_info(10)
}
fn new_test_table_names() -> HashSet<TableName> {
@@ -1602,12 +1609,10 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let wal_allocator = WalOptionsAllocator::RaftEngine;
let regions = (0..16).collect();
let region_wal_options =
allocate_region_wal_options(regions, &wal_allocator, false).unwrap();
let table_info: RawTableInfo = new_test_table_info().into();
let wal_provider = WalProvider::RaftEngine;
let regions: Vec<_> = (0..16).collect();
let region_wal_options = wal_provider.allocate(&regions, false).await.unwrap();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
@@ -1630,8 +1635,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let region_wal_options = create_mock_region_wal_options()
.into_iter()
.map(|(k, v)| (k, serde_json::to_string(&v).unwrap()))
@@ -1713,8 +1717,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let table_route_value = TableRouteValue::physical(region_routes.clone());
@@ -1779,7 +1782,6 @@ mod tests {
let table_info: RawTableInfo = test_utils::new_test_table_info_with_name(
table_id,
&format!("my_table_{}", table_id),
region_routes.iter().map(|r| r.region.id.region_number()),
)
.into();
let table_route_value = TableRouteValue::physical(region_routes.clone());
@@ -1800,8 +1802,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = &vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let datanode_id = 2;
let region_wal_options = create_mock_region_wal_options();
@@ -1907,8 +1908,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
@@ -1984,8 +1984,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
// creates metadata.
create_physical_table_metadata(
@@ -2070,8 +2069,7 @@ mod tests {
leader_down_since: None,
},
];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
@@ -2153,8 +2151,7 @@ mod tests {
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo =
new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.as_str();
let region_storage_path =
@@ -2274,6 +2271,218 @@ mod tests {
);
}
#[tokio::test]
async fn test_update_table_route_with_topic_region_mapping() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let region_route = new_test_region_route();
let region_routes = vec![region_route.clone()];
let table_info: RawTableInfo = new_test_table_info().into();
let table_id = table_info.ident.table_id;
let engine = table_info.meta.engine.as_str();
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);
// Create initial metadata with Kafka WAL options
let old_region_wal_options: HashMap<RegionNumber, String> = vec![
(
1,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(),
}))
.unwrap(),
),
(
2,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_2".to_string(),
}))
.unwrap(),
),
]
.into_iter()
.collect();
create_physical_table_metadata(
&table_metadata_manager,
table_info.clone(),
region_routes.clone(),
old_region_wal_options.clone(),
)
.await
.unwrap();
let current_table_route_value = DeserializedValueWithBytes::from_inner(
TableRouteValue::physical(region_routes.clone()),
);
// Verify initial topic region mappings exist
let region_id_1 = RegionId::new(table_id, 1);
let region_id_2 = RegionId::new(table_id, 2);
let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1");
let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2");
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_1_key.clone())
.await
.unwrap()
.is_some()
);
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_2_key.clone())
.await
.unwrap()
.is_some()
);
// Test 1: Add new region with new topic
let new_region_routes = vec![
new_region_route(1, 1),
new_region_route(2, 2),
new_region_route(3, 3), // New region
];
let new_region_wal_options: HashMap<RegionNumber, String> = vec![
(
1,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(), // Unchanged
}))
.unwrap(),
),
(
2,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_2".to_string(), // Unchanged
}))
.unwrap(),
),
(
3,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_3".to_string(), // New topic
}))
.unwrap(),
),
]
.into_iter()
.collect();
let current_table_route_value_updated = DeserializedValueWithBytes::from_inner(
current_table_route_value
.inner
.update(new_region_routes.clone())
.unwrap(),
);
table_metadata_manager
.update_table_route(
table_id,
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
region_options: HashMap::new(),
region_wal_options: old_region_wal_options.clone(),
},
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
&new_region_wal_options,
)
.await
.unwrap();
// Verify new topic region mapping was created
let region_id_3 = RegionId::new(table_id, 3);
let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3");
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_3_key)
.await
.unwrap()
.is_some()
);
// Test 2: Remove a region and change topic for another
let newer_region_routes = vec![
new_region_route(1, 1),
// Region 2 removed
// Region 3 now has different topic
];
let newer_region_wal_options: HashMap<RegionNumber, String> = vec![
(
1,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(), // Unchanged
}))
.unwrap(),
),
(
3,
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "topic_3_new".to_string(), // Changed topic
}))
.unwrap(),
),
]
.into_iter()
.collect();
table_metadata_manager
.update_table_route(
table_id,
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
region_options: HashMap::new(),
region_wal_options: new_region_wal_options.clone(),
},
&current_table_route_value_updated,
newer_region_routes.clone(),
&HashMap::new(),
&newer_region_wal_options,
)
.await
.unwrap();
// Verify region 2 mapping was deleted
let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2");
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_2_key_new)
.await
.unwrap()
.is_none()
);
// Verify region 3 old topic mapping was deleted
let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3");
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_3_key_old)
.await
.unwrap()
.is_none()
);
// Verify region 3 new topic mapping was created
let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new");
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_3_key_new)
.await
.unwrap()
.is_some()
);
// Verify region 1 mapping still exists (unchanged)
assert!(
table_metadata_manager
.topic_region_manager
.get(topic_1_key)
.await
.unwrap()
.is_some()
);
}
#[tokio::test]
async fn test_destroy_table_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -2408,7 +2617,7 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv);
let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
let view_info: RawTableInfo = new_test_table_info().into();
let view_id = view_info.ident.table_id;

View File

@@ -338,7 +338,6 @@ mod tests {
next_column_id: 3,
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],
partition_key_indices: vec![],
column_ids: vec![],
};

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
@@ -62,16 +62,54 @@ pub enum TableRouteValue {
Logical(LogicalTableRouteValue),
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
#[derive(Debug, PartialEq, Serialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
// The region routes of the table.
pub region_routes: Vec<RegionRoute>,
// Tracks the highest region number ever allocated for the table.
// This value only increases: adding a region updates it if needed,
// and dropping regions does not decrease it.
pub max_region_number: RegionNumber,
// The version of the table route.
version: u64,
}
impl<'de> Deserialize<'de> for PhysicalTableRouteValue {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
struct Helper {
region_routes: Vec<RegionRoute>,
#[serde(default)]
max_region_number: Option<RegionNumber>,
version: u64,
}
let mut helper = Helper::deserialize(deserializer)?;
// If the max region number is not provided, we will calculate it from the region routes.
if helper.max_region_number.is_none() {
let max_region = helper
.region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
helper.max_region_number = Some(max_region);
}
Ok(PhysicalTableRouteValue {
region_routes: helper.region_routes,
max_region_number: helper.max_region_number.unwrap_or_default(),
version: helper.version,
})
}
}
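The same backfill idea as a self-contained sketch with simplified types (this is not the real `PhysicalTableRouteValue`; it only mirrors the `Helper`-based approach so that old payloads without `max_region_number` still deserialize, assuming the `serde` and `serde_json` crates):

    use serde::{Deserialize, Deserializer};

    #[derive(Debug)]
    struct RouteValue {
        region_numbers: Vec<u32>,
        max_region_number: u32,
        version: u64,
    }

    impl<'de> Deserialize<'de> for RouteValue {
        fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
            #[derive(Deserialize)]
            struct Helper {
                region_numbers: Vec<u32>,
                #[serde(default)]
                max_region_number: Option<u32>,
                version: u64,
            }
            let helper = Helper::deserialize(deserializer)?;
            // Old payloads lack the field: recompute it from the routes.
            let max_region_number = helper.max_region_number.unwrap_or_else(|| {
                helper.region_numbers.iter().copied().max().unwrap_or_default()
            });
            Ok(RouteValue {
                region_numbers: helper.region_numbers,
                max_region_number,
                version: helper.version,
            })
        }
    }

    fn main() {
        // An old-format payload without `max_region_number` still deserializes.
        let old_payload = r#"{"region_numbers":[1,2],"version":0}"#;
        let value: RouteValue = serde_json::from_str(old_payload).unwrap();
        assert_eq!(value.max_region_number, 2);
        println!("{value:?}");
    }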
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
physical_table_id: TableId,
region_ids: Vec<RegionId>,
}
impl TableRouteValue {
@@ -85,14 +123,7 @@ impl TableRouteValue {
if table_id == physical_table_id {
TableRouteValue::physical(region_routes)
} else {
let region_routes = region_routes
.into_iter()
.map(|region| {
debug_assert_eq!(region.region.id.table_id(), physical_table_id);
RegionId::new(table_id, region.region.id.region_number())
})
.collect();
TableRouteValue::logical(physical_table_id, region_routes)
TableRouteValue::logical(physical_table_id)
}
}
@@ -100,8 +131,8 @@ impl TableRouteValue {
Self::Physical(PhysicalTableRouteValue::new(region_routes))
}
pub fn logical(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
Self::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
pub fn logical(physical_table_id: TableId) -> Self {
Self::Logical(LogicalTableRouteValue::new(physical_table_id))
}
/// Returns a new version [TableRouteValue] with `region_routes`.
@@ -112,9 +143,19 @@ impl TableRouteValue {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
let version = self.as_physical_table_route_ref().version;
let physical_table_route = self.as_physical_table_route_ref();
let original_max_region_number = physical_table_route.max_region_number;
let new_max_region_number = region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
let version = physical_table_route.version;
Ok(Self::Physical(PhysicalTableRouteValue {
region_routes,
// If region routes are added, we will update the max region number.
// If region routes are removed, we will keep the original max region number.
max_region_number: original_max_region_number.max(new_max_region_number),
version: version + 1,
}))
}
@@ -167,6 +208,20 @@ impl TableRouteValue {
Ok(&self.as_physical_table_route_ref().region_routes)
}
/// Returns the max region number of this [TableRouteValue::Physical].
///
/// # Errors
/// Returns an error if this is not a [`PhysicalTableRouteValue`].
pub fn max_region_number(&self) -> Result<RegionNumber> {
ensure!(
self.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
}
);
Ok(self.as_physical_table_route_ref().max_region_number)
}
/// Returns the reference of [`PhysicalTableRouteValue`].
///
/// # Panics
@@ -207,11 +262,9 @@ impl TableRouteValue {
.iter()
.map(|region_route| region_route.region.id.region_number())
.collect(),
TableRouteValue::Logical(x) => x
.region_ids()
.iter()
.map(|region_id| region_id.region_number())
.collect(),
TableRouteValue::Logical(_) => {
vec![]
}
}
}
}
@@ -237,28 +290,27 @@ impl MetadataValue for TableRouteValue {
impl PhysicalTableRouteValue {
pub fn new(region_routes: Vec<RegionRoute>) -> Self {
let max_region_number = region_routes
.iter()
.map(|r| r.region.id.region_number())
.max()
.unwrap_or_default();
Self {
region_routes,
max_region_number,
version: 0,
}
}
}
impl LogicalTableRouteValue {
pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
Self {
physical_table_id,
region_ids,
}
pub fn new(physical_table_id: TableId) -> Self {
Self { physical_table_id }
}
pub fn physical_table_id(&self) -> TableId {
self.physical_table_id
}
pub fn region_ids(&self) -> &Vec<RegionId> {
&self.region_ids
}
}
impl MetadataKey<'_, TableRouteKey> for TableRouteKey {
@@ -823,6 +875,57 @@ mod tests {
use crate::rpc::router::Region;
use crate::rpc::store::PutRequest;
#[test]
fn test_update_table_route_max_region_number() {
let table_route = PhysicalTableRouteValue::new(vec![
RegionRoute {
region: Region {
id: RegionId::new(0, 1),
..Default::default()
},
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(0, 2),
..Default::default()
},
..Default::default()
},
]);
assert_eq!(table_route.max_region_number, 2);
// Shouldn't change the max region number.
let new_table_route = TableRouteValue::Physical(table_route)
.update(vec![RegionRoute {
region: Region {
id: RegionId::new(0, 1),
..Default::default()
},
..Default::default()
}])
.unwrap();
assert_eq!(
new_table_route
.as_physical_table_route_ref()
.max_region_number,
2
);
// Should increase the max region number.
let new_table_route = new_table_route
.update(vec![RegionRoute {
region: Region {
id: RegionId::new(0, 3),
..Default::default()
},
..Default::default()
}])
.unwrap()
.into_physical_table_route();
assert_eq!(new_table_route.max_region_number, 3);
}
#[test]
fn test_table_route_compatibility() {
let old_raw_v = r#"{"region_routes":[{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]},{"region":{"id":1,"name":"r1","partition":null,"attrs":{}},"leader_peer":{"id":2,"addr":"a2"},"follower_peers":[]}],"version":0}"#;
@@ -863,6 +966,7 @@ mod tests {
leader_down_since: None,
},
],
max_region_number: 1,
version: 0,
});
@@ -900,7 +1004,6 @@ mod tests {
let table_route_manager = TableRouteManager::new(kv.clone());
let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 1)],
});
let (txn, _) = table_route_manager
.table_route_storage()
@@ -930,14 +1033,12 @@ mod tests {
1024,
TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 1)],
}),
),
(
1025,
TableRouteValue::Logical(LogicalTableRouteValue {
physical_table_id: 1023,
region_ids: vec![RegionId::new(1023, 2)],
}),
),
];
@@ -976,6 +1077,7 @@ mod tests {
}],
..Default::default()
}],
max_region_number: 0,
version: 0,
});

View File

@@ -19,11 +19,7 @@ use datatypes::schema::{ColumnSchema, SchemaBuilder};
use store_api::storage::TableId;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
table_id: TableId,
table_name: &str,
region_numbers: I,
) -> TableInfo {
pub fn new_test_table_info_with_name(table_id: TableId, table_name: &str) -> TableInfo {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
@@ -45,7 +41,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.region_numbers(region_numbers.into_iter().collect::<Vec<_>>())
.build()
.unwrap();
TableInfoBuilder::default()
@@ -56,9 +51,6 @@ pub fn new_test_table_info_with_name<I: IntoIterator<Item = u32>>(
.build()
.unwrap()
}
pub fn new_test_table_info<I: IntoIterator<Item = u32>>(
table_id: TableId,
region_numbers: I,
) -> TableInfo {
new_test_table_info_with_name(table_id, "mytable", region_numbers)
pub fn new_test_table_info(table_id: TableId) -> TableInfo {
new_test_table_info_with_name(table_id, "mytable")
}

View File

@@ -243,7 +243,7 @@ impl TopicRegionManager {
let topic_region_mapping = self.get_topic_region_mapping(table_id, &region_wal_options);
let topic_region_keys = topic_region_mapping
.iter()
.map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id))
.map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
.collect::<Vec<_>>();
let operations = topic_region_keys
.into_iter()
@@ -252,6 +252,55 @@ impl TopicRegionManager {
Ok(Txn::new().and_then(operations))
}
/// Builds a transaction that updates the topic-region mapping.
pub fn build_update_txn(
&self,
table_id: TableId,
old_region_wal_options: &HashMap<RegionNumber, String>,
new_region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<Txn> {
let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?;
let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?;
let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed);
let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed);
// Convert to HashMap for easier lookup: RegionId -> Topic
let old_map: HashMap<RegionId, &str> = old_mapping.into_iter().collect();
let new_map: HashMap<RegionId, &str> = new_mapping.into_iter().collect();
let mut ops = Vec::new();
// Check for deletes (in old but not in new, or topic changed)
for (region_id, old_topic) in &old_map {
match new_map.get(region_id) {
Some(new_topic) if *new_topic == *old_topic => {
// Same topic, do nothing (preserve checkpoint)
}
_ => {
// Removed or topic changed -> Delete old
let key = TopicRegionKey::new(*region_id, old_topic);
ops.push(TxnOp::Delete(key.to_bytes()));
}
}
}
// Check for adds (in new but not in old, or topic changed)
for (region_id, new_topic) in &new_map {
match old_map.get(region_id) {
Some(old_topic) if *old_topic == *new_topic => {
// Same topic, already handled (do nothing)
}
_ => {
// New or topic changed -> Put new
let key = TopicRegionKey::new(*region_id, new_topic);
// Initialize with empty value (default TopicRegionValue)
ops.push(TxnOp::Put(key.to_bytes(), vec![]));
}
}
}
Ok(Txn::new().and_then(ops))
}
/// Returns the map of [`RegionId`]s to their corresponding [`TopicRegionValue`]s for the given topic.
pub async fn regions(&self, topic: &str) -> Result<HashMap<RegionId, TopicRegionValue>> {
let prefix = TopicRegionKey::range_topic_key(topic);
@@ -431,4 +480,420 @@ mod tests {
RegionId::from_u64(4410931412992)
);
}
#[test]
fn test_build_create_txn() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(),
}),
),
(2, WalOptions::RaftEngine), // Should be ignored
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_create_txn(table_id, &region_wal_options)
.unwrap();
// Verify the transaction contains correct operations
// Should create mappings for region 0 and 1, but not region 2 (RaftEngine)
let ops = txn.req().success.clone();
assert_eq!(ops.len(), 2);
let keys: Vec<_> = ops
.iter()
.filter_map(|op| {
if let TxnOp::Put(key, _) = op {
TopicRegionKey::from_bytes(key).ok()
} else {
None
}
})
.collect();
assert_eq!(keys.len(), 2);
let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect();
assert!(region_ids.contains(&RegionId::new(table_id, 0)));
assert!(region_ids.contains(&RegionId::new(table_id, 1)));
assert!(!region_ids.contains(&RegionId::new(table_id, 2)));
// Verify topics are correct
for key in keys {
match key.region_id.region_number() {
0 => assert_eq!(key.topic, "topic_0"),
1 => assert_eq!(key.topic, "topic_1"),
_ => panic!("Unexpected region number"),
}
}
}
#[test]
fn test_build_update_txn_add_new_region() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let old_region_wal_options = vec![(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
)]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let new_region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(),
}),
),
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
.unwrap();
let ops = txn.req().success.clone();
// Should only have Put for new region 1 (region 0 unchanged)
assert_eq!(ops.len(), 1);
if let TxnOp::Put(key, _) = &ops[0] {
let topic_key = TopicRegionKey::from_bytes(key).unwrap();
assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
assert_eq!(topic_key.topic, "topic_1");
} else {
panic!("Expected Put operation");
}
}
#[test]
fn test_build_update_txn_remove_region() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let old_region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(),
}),
),
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let new_region_wal_options = vec![(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
)]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
.unwrap();
let ops = txn.req().success.clone();
// Should only have Delete for removed region 1 (region 0 unchanged)
assert_eq!(ops.len(), 1);
match &ops[0] {
TxnOp::Delete(key) => {
let topic_key = TopicRegionKey::from_bytes(key).unwrap();
assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
assert_eq!(topic_key.topic, "topic_1");
}
TxnOp::Put(_, _) | TxnOp::Get(_) => {
panic!("Expected Delete operation");
}
}
}
#[test]
fn test_build_update_txn_change_topic() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let old_region_wal_options = vec![(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
)]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let new_region_wal_options = vec![(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0_new".to_string(),
}),
)]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
.unwrap();
let ops = txn.req().success.clone();
// Should have Delete for old topic and Put for new topic
assert_eq!(ops.len(), 2);
let mut delete_found = false;
let mut put_found = false;
for op in ops {
match op {
TxnOp::Delete(key) => {
let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
assert_eq!(topic_key.topic, "topic_0");
delete_found = true;
}
TxnOp::Put(key, _) => {
let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
assert_eq!(topic_key.region_id, RegionId::new(table_id, 0));
assert_eq!(topic_key.topic, "topic_0_new");
put_found = true;
}
TxnOp::Get(_) => {
// Get operations shouldn't appear in this context
panic!("Unexpected Get operation in update transaction");
}
}
}
assert!(delete_found, "Expected Delete operation for old topic");
assert!(put_found, "Expected Put operation for new topic");
}
#[test]
fn test_build_update_txn_no_change() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(),
}),
),
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_update_txn(table_id, &region_wal_options, &region_wal_options)
.unwrap();
// Should have no operations when nothing changes (preserves checkpoint)
let ops = txn.req().success.clone();
assert_eq!(ops.len(), 0);
}
#[test]
fn test_build_update_txn_mixed_scenarios() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let old_region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(),
}),
),
(
2,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_2".to_string(),
}),
),
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let new_region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(), // Unchanged
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1_new".to_string(), // Topic changed
}),
),
// Region 2 removed
(
3,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_3".to_string(), // New region
}),
),
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
.unwrap();
let ops = txn.req().success.clone();
// Should have:
// - Delete for region 2 (removed)
// - Delete for region 1 old topic (topic changed)
// - Put for region 1 new topic (topic changed)
// - Put for region 3 (new)
// Region 0 unchanged, so no operation
assert_eq!(ops.len(), 4);
let mut delete_ops = 0;
let mut put_ops = 0;
let mut delete_region_2 = false;
let mut delete_region_1_old = false;
let mut put_region_1_new = false;
let mut put_region_3 = false;
for op in ops {
match op {
TxnOp::Delete(key) => {
delete_ops += 1;
let topic_key = TopicRegionKey::from_bytes(&key).unwrap();
match topic_key.region_id.region_number() {
1 => {
assert_eq!(topic_key.topic, "topic_1");
delete_region_1_old = true;
}
2 => {
assert_eq!(topic_key.topic, "topic_2");
delete_region_2 = true;
}
_ => panic!("Unexpected delete operation for region"),
}
}
TxnOp::Put(key, _) => {
put_ops += 1;
let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap();
match topic_key.region_id.region_number() {
1 => {
assert_eq!(topic_key.topic, "topic_1_new");
put_region_1_new = true;
}
3 => {
assert_eq!(topic_key.topic, "topic_3");
put_region_3 = true;
}
_ => panic!("Unexpected put operation for region"),
}
}
TxnOp::Get(_) => {
panic!("Unexpected Get operation in update transaction");
}
}
}
assert_eq!(delete_ops, 2);
assert_eq!(put_ops, 2);
assert!(delete_region_2, "Expected delete for removed region 2");
assert!(
delete_region_1_old,
"Expected delete for region 1 old topic"
);
assert!(put_region_1_new, "Expected put for region 1 new topic");
assert!(put_region_3, "Expected put for new region 3");
}
#[test]
fn test_build_update_txn_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let manager = TopicRegionManager::new(kv_backend.clone());
let table_id = 1;
let old_region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(1, WalOptions::RaftEngine), // Should be ignored
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let new_region_wal_options = vec![
(
0,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_0".to_string(),
}),
),
(
1,
WalOptions::Kafka(KafkaWalOptions {
topic: "topic_1".to_string(), // Changed from RaftEngine to Kafka
}),
),
]
.into_iter()
.map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap()))
.collect::<HashMap<_, _>>();
let txn = manager
.build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options)
.unwrap();
let ops = txn.req().success.clone();
// Should only have Put for region 1 (new Kafka topic)
// Region 0 unchanged, so no operation
// Region 1 was RaftEngine before (not tracked), so only Put needed
assert_eq!(ops.len(), 1);
match &ops[0] {
TxnOp::Put(key, _) => {
let topic_key = TopicRegionKey::from_bytes(key).unwrap();
assert_eq!(topic_key.region_id, RegionId::new(table_id, 1));
assert_eq!(topic_key.topic, "topic_1");
}
TxnOp::Delete(_) | TxnOp::Get(_) => {
panic!("Expected Put operation for new Kafka region");
}
}
}
}

View File

@@ -613,7 +613,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_put() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("put-test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -623,7 +623,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("range-test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -633,7 +633,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_range_2() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("range2-test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
@@ -642,7 +642,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_all_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("simple_range-test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
@@ -652,7 +652,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_get() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("batch_get-test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -662,7 +662,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_delete() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("batch_delete-test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -672,7 +672,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_delete_with_prefix() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix-test")
.await
.unwrap();
let prefix = b"batch_delete/";
@@ -684,7 +684,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_delete_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("delete_range-test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -694,7 +694,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_compare_and_put() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("compare_and_put_test")
let kv_backend = build_mysql_kv_backend("compare_and_put-test")
.await
.unwrap();
let prefix = b"compare_and_put/";
@@ -705,7 +705,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_txn() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
let kv_backend = build_mysql_kv_backend("txn-test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;

View File

@@ -1105,7 +1105,7 @@ mod tests {
#[tokio::test]
async fn test_pg_put() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("put_test").await.unwrap();
let kv_backend = build_pg_kv_backend("put-test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1115,7 +1115,7 @@ mod tests {
#[tokio::test]
async fn test_pg_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("range_test").await.unwrap();
let kv_backend = build_pg_kv_backend("range-test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1125,7 +1125,7 @@ mod tests {
#[tokio::test]
async fn test_pg_range_2() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("range2_test").await.unwrap();
let kv_backend = build_pg_kv_backend("range2-test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
@@ -1134,7 +1134,7 @@ mod tests {
#[tokio::test]
async fn test_pg_all_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("simple_range_test").await.unwrap();
let kv_backend = build_pg_kv_backend("simple_range-test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
@@ -1144,7 +1144,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_get() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_get_test").await.unwrap();
let kv_backend = build_pg_kv_backend("batch_get-test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1154,7 +1154,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_delete() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_delete_test").await.unwrap();
let kv_backend = build_pg_kv_backend("batch_delete-test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1164,7 +1164,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_delete_with_prefix() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_delete_with_prefix_test")
let kv_backend = build_pg_kv_backend("batch_delete_with_prefix-test")
.await
.unwrap();
let prefix = b"batch_delete/";
@@ -1176,7 +1176,7 @@ mod tests {
#[tokio::test]
async fn test_pg_delete_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("delete_range_test").await.unwrap();
let kv_backend = build_pg_kv_backend("delete_range-test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -1186,7 +1186,7 @@ mod tests {
#[tokio::test]
async fn test_pg_compare_and_put() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("compare_and_put_test").await.unwrap();
let kv_backend = build_pg_kv_backend("compare_and_put-test").await.unwrap();
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
@@ -1195,7 +1195,7 @@ mod tests {
#[tokio::test]
async fn test_pg_txn() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("txn_test").await.unwrap();
let kv_backend = build_pg_kv_backend("txn-test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;

View File

@@ -48,7 +48,7 @@ pub mod stats;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod util;
pub mod wal_options_allocator;
pub mod wal_provider;
// The id of the datanode.
pub type DatanodeId = u64;

View File

@@ -81,6 +81,13 @@ pub trait PeerAllocator: Send + Sync {
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
#[async_trait::async_trait]
impl<T: PeerAllocator + ?Sized> PeerAllocator for Arc<T> {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error> {
T::alloc(self, num).await
}
}
pub struct NoopPeerAllocator;
#[async_trait::async_trait]

View File

@@ -144,6 +144,8 @@ impl ReconcileRegions {
}
/// Creates a region request builder from a raw table info.
///
/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions.
fn create_region_request_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,

View File

@@ -1639,7 +1639,6 @@ mod tests {
value_indices: vec![2],
engine: METRIC_ENGINE_NAME.to_string(),
next_column_id: 0,
region_numbers: vec![0],
options: Default::default(),
created_on: Default::default(),
updated_on: Default::default(),

View File

@@ -19,6 +19,7 @@ use common_telemetry::{debug, warn};
use snafu::ensure;
use tokio::sync::Mutex;
use crate::ddl::allocator::resource_id::ResourceIdAllocator;
use crate::error::{self, Result};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::CompareAndPutRequest;
@@ -82,6 +83,25 @@ pub struct Sequence {
inner: Mutex<Inner>,
}
#[async_trait::async_trait]
impl ResourceIdAllocator for Sequence {
async fn next(&self) -> Result<u64> {
self.next().await
}
async fn peek(&self) -> Result<u64> {
self.peek().await
}
async fn jump_to(&self, next: u64) -> Result<()> {
self.jump_to(next).await
}
async fn min_max(&self) -> Range<u64> {
self.min_max().await
}
}
impl Sequence {
/// Returns the next value and increments the sequence.
pub async fn next(&self) -> Result<u64> {

View File

@@ -40,8 +40,8 @@ use crate::peer::{Peer, PeerResolver};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
use crate::wal_options_allocator::{WalOptionsAllocator, build_kafka_topic_creator};
use crate::wal_provider::topic_pool::KafkaTopicPool;
use crate::wal_provider::{WalProvider, build_kafka_topic_creator};
use crate::{DatanodeId, FlownodeId};
#[async_trait::async_trait]
@@ -187,7 +187,7 @@ pub fn new_ddl_context_with_kv_backend(
.initial(1024)
.build(),
),
Arc::new(WalOptionsAllocator::default()),
Arc::new(WalProvider::default()),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_allocator =

View File

@@ -26,28 +26,46 @@ use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions};
use snafu::{ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber};
+use crate::ddl::allocator::wal_options::WalOptionsAllocator;
use crate::error::{EncodeWalOptionsSnafu, InvalidTopicNamePrefixSnafu, Result};
use crate::key::TOPIC_NAME_PATTERN_REGEX;
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
-pub use crate::wal_options_allocator::topic_creator::{
-build_kafka_client, build_kafka_topic_creator,
-};
-use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
+pub use crate::wal_provider::topic_creator::{build_kafka_client, build_kafka_topic_creator};
+use crate::wal_provider::topic_pool::KafkaTopicPool;
-/// Allocates wal options in region granularity.
+/// Provides wal options in region granularity.
#[derive(Default, Debug)]
-pub enum WalOptionsAllocator {
+pub enum WalProvider {
#[default]
RaftEngine,
Kafka(KafkaTopicPool),
}
-/// Arc wrapper of WalOptionsAllocator.
-pub type WalOptionsAllocatorRef = Arc<WalOptionsAllocator>;
+/// Arc wrapper of WalProvider.
+pub type WalProviderRef = Arc<WalProvider>;
-impl WalOptionsAllocator {
-/// Tries to start the allocator.
+#[async_trait::async_trait]
+impl WalOptionsAllocator for WalProvider {
+async fn allocate(
+&self,
+region_numbers: &[RegionNumber],
+skip_wal: bool,
+) -> Result<HashMap<RegionNumber, String>> {
+let wal_options = self
+.alloc_batch(region_numbers.len(), skip_wal)?
+.into_iter()
+.map(|wal_options| {
+serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
+})
+.collect::<Result<Vec<_>>>()?;
+Ok(region_numbers.iter().copied().zip(wal_options).collect())
+}
+}
+impl WalProvider {
+/// Tries to start the provider.
pub async fn start(&self) -> Result<()> {
match self {
Self::RaftEngine => Ok(()),
@@ -56,14 +74,14 @@ impl WalOptionsAllocator {
}
/// Allocates a batch of wal options where each wal options goes to a region.
-/// If skip_wal is true, the wal options will be set to Noop regardless of the allocator type.
+/// If skip_wal is true, the wal options will be set to Noop regardless of the provider type.
pub fn alloc_batch(&self, num_regions: usize, skip_wal: bool) -> Result<Vec<WalOptions>> {
if skip_wal {
return Ok(vec![WalOptions::Noop; num_regions]);
}
match self {
-WalOptionsAllocator::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
-WalOptionsAllocator::Kafka(topic_manager) => {
+WalProvider::RaftEngine => Ok(vec![WalOptions::RaftEngine; num_regions]),
+WalProvider::Kafka(topic_manager) => {
let options_batch = topic_manager
.select_batch(num_regions)?
.into_iter()
@@ -80,14 +98,14 @@ impl WalOptionsAllocator {
/// Returns true if it's the remote WAL.
pub fn is_remote_wal(&self) -> bool {
-matches!(&self, WalOptionsAllocator::Kafka(_))
+matches!(&self, WalProvider::Kafka(_))
}
}
#[async_trait]
-impl LeadershipChangeListener for WalOptionsAllocator {
+impl LeadershipChangeListener for WalProvider {
fn name(&self) -> &str {
"WalOptionsAllocator"
"WalProvider"
}
async fn on_leader_start(&self) -> Result<()> {
@@ -99,13 +117,13 @@ impl LeadershipChangeListener for WalOptionsAllocator {
}
}
-/// Builds a wal options allocator based on the given configuration.
-pub async fn build_wal_options_allocator(
+/// Builds a wal provider based on the given configuration.
+pub async fn build_wal_provider(
config: &MetasrvWalConfig,
kv_backend: KvBackendRef,
-) -> Result<WalOptionsAllocator> {
+) -> Result<WalProvider> {
match config {
-MetasrvWalConfig::RaftEngine => Ok(WalOptionsAllocator::RaftEngine),
+MetasrvWalConfig::RaftEngine => Ok(WalProvider::RaftEngine),
MetasrvWalConfig::Kafka(kafka_config) => {
let prefix = &kafka_config.kafka_topic.topic_name_prefix;
ensure!(
@@ -116,28 +134,11 @@ pub async fn build_wal_options_allocator(
build_kafka_topic_creator(&kafka_config.connection, &kafka_config.kafka_topic)
.await?;
let topic_pool = KafkaTopicPool::new(kafka_config, kv_backend, topic_creator);
-Ok(WalOptionsAllocator::Kafka(topic_pool))
+Ok(WalProvider::Kafka(topic_pool))
}
}
}
-/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
-pub fn allocate_region_wal_options(
-regions: Vec<RegionNumber>,
-wal_options_allocator: &WalOptionsAllocator,
-skip_wal: bool,
-) -> Result<HashMap<RegionNumber, String>> {
-let wal_options = wal_options_allocator
-.alloc_batch(regions.len(), skip_wal)?
-.into_iter()
-.map(|wal_options| {
-serde_json::to_string(&wal_options).context(EncodeWalOptionsSnafu { wal_options })
-})
-.collect::<Result<Vec<_>>>()?;
-Ok(regions.into_iter().zip(wal_options).collect())
-}
/// Inserts wal options into options.
pub fn prepare_wal_options(
options: &mut HashMap<String, String>,
@@ -182,21 +183,19 @@ mod tests {
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::test_util::test_kafka_topic_pool;
-use crate::wal_options_allocator::selector::RoundRobinTopicSelector;
+use crate::wal_provider::selector::RoundRobinTopicSelector;
-// Tests that the wal options allocator could successfully allocate raft-engine wal options.
+// Tests that the wal provider could successfully allocate raft-engine wal options.
#[tokio::test]
-async fn test_allocator_with_raft_engine() {
+async fn test_provider_with_raft_engine() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let wal_config = MetasrvWalConfig::RaftEngine;
-let allocator = build_wal_options_allocator(&wal_config, kv_backend)
-.await
-.unwrap();
-allocator.start().await.unwrap();
+let provider = build_wal_provider(&wal_config, kv_backend).await.unwrap();
+provider.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
-let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
+let got = provider.allocate(&regions, false).await.unwrap();
let encoded_wal_options = serde_json::to_string(&WalOptions::RaftEngine).unwrap();
let expected = regions
@@ -216,14 +215,14 @@ mod tests {
},
..Default::default()
});
-let got = build_wal_options_allocator(&wal_config, kv_backend)
+let got = build_wal_provider(&wal_config, kv_backend)
.await
.unwrap_err();
assert_matches!(got, Error::InvalidTopicNamePrefix { .. });
}
#[tokio::test]
-async fn test_allocator_with_kafka_allocate_wal_options() {
+async fn test_provider_with_kafka_allocate_wal_options() {
common_telemetry::init_default_ut_logging();
maybe_skip_kafka_integration_test!();
let num_topics = 5;
@@ -240,13 +239,13 @@ mod tests {
let topic_creator = topic_pool.topic_creator();
topic_creator.delete_topics(&topics).await.unwrap();
-// Creates an options allocator.
-let allocator = WalOptionsAllocator::Kafka(topic_pool);
-allocator.start().await.unwrap();
+// Creates an options provider.
+let provider = WalProvider::Kafka(topic_pool);
+provider.start().await.unwrap();
let num_regions = 3;
let regions = (0..num_regions).collect::<Vec<_>>();
-let got = allocate_region_wal_options(regions.clone(), &allocator, false).unwrap();
+let got = provider.allocate(&regions, false).await.unwrap();
// Check the allocated wal options contain the expected topics.
let expected = (0..num_regions)
@@ -261,13 +260,13 @@ mod tests {
}
#[tokio::test]
-async fn test_allocator_with_skip_wal() {
-let allocator = WalOptionsAllocator::RaftEngine;
-allocator.start().await.unwrap();
+async fn test_provider_with_skip_wal() {
+let provider = WalProvider::RaftEngine;
+provider.start().await.unwrap();
let num_regions = 32;
let regions = (0..num_regions).collect::<Vec<_>>();
-let got = allocate_region_wal_options(regions.clone(), &allocator, true).unwrap();
+let got = provider.allocate(&regions, true).await.unwrap();
assert_eq!(got.len(), num_regions as usize);
for wal_options in got.values() {
assert_eq!(wal_options, &"{\"wal.provider\":\"noop\"}");

Some files were not shown because too many files have changed in this diff.