Compare commits

...

69 Commits

Author SHA1 Message Date
discord9
3d17d195a3 feat: flownode to frontend load balance with guess 2025-06-08 14:17:32 +08:00
Weny Xu
0d4f27a699 fix: convert JSON type to JSON string in COPY TABLE TO statment (#6255)
* fix: convert JSON type to JSON string in COPY TABLE TO statement

* chore: apply suggestions from CR

* chore: apply suggestions from CR
2025-06-06 02:23:57 +00:00
Ruihang Xia
c4da8bb69d feat: don't allow creating logical table with partitions (#6249)
* feat: don't allow creating logical table with partitions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-05 12:38:47 +00:00
discord9
0bd8856e2f chore: pub flow info (#6253)
* chore: make all flow info's field public

* chore: expose flow_route

* chore: more pub
2025-06-05 12:34:11 +00:00
Lei, HUANG
92c5a9f5f4 chore: allow numberic values in alter statements (#6252)
chore/allow-numberic-values-in-alter:
 ### Commit Message

 Enhance `alter_parser.rs` to Support Numeric Values

 - Updated `parse_string_options` function in `alter_parser.rs` to handle numeric literals in addition to string literals and `NULL` for alter table statements.
 - Added a new test `test_parse_alter_with_numeric_value` in `alter_parser.rs` to verify the parsing of numeric values in alter table options.
2025-06-05 02:16:53 +00:00
Weny Xu
80c5af0ecf fix: ignore incomplete WAL entries during read (#6251)
* fix: ignore incomplete entry

* fix: fix unit tests
2025-06-04 11:16:42 +00:00
LFC
7afb77fd35 fix: add "query" options to standalone (#6248) 2025-06-04 08:47:31 +00:00
discord9
0b9af77fe9 chore: test sleep longer (#6247)
* chore: test sleep longer

* win timer resolution is 15.6ms, need longer
2025-06-04 08:18:44 +00:00
discord9
cbafb6e00b feat(flow): flow streaming mode in list expr support (#6229)
* feat: flow streaming in list support

* chore: per review

* chore: per review

* fix: expr correct type
2025-06-04 08:05:20 +00:00
LFC
744a754246 fix: add missing features (#6245) 2025-06-04 07:13:39 +00:00
fys
9cd4a2c525 feat: add trigger ddl manager (#6228)
* feat: add trigger ddl manager

* chore: reduce the number of cfg feature code blocks

* upgrade greptime-proto

* chore: upgrade greptime-proto
2025-06-04 06:38:02 +00:00
liyang
180920327b ci: add option to choose whether upload artifacts to S3 in the development build (#6232)
ci: add option to choose whether to upload artifacts to S3 in the development build
2025-06-04 03:49:53 +00:00
Yingwen
ee4f830be6 fix: do not accommodate fields for multi-value protocol (#6237) 2025-06-04 01:10:52 +00:00
shuiyisong
69975f1f71 feat: pipeline with insert options (#6192)
* feat: pipeline recognize hints from exec

* chore: rename and add test

* chore: minor improve

* chore: rename and add comments

* fix: typos

* chore: remove unnecessory clone fn

* chore: group metrics

* chore: use struct in transform output enum

* chore: update hint prefix
2025-06-03 18:46:48 +00:00
discord9
38cac301f2 refactor(flow): limit the size of query (#6216)
* refactor: not wait for slow query

* chore: clippy

* chore: fmt

* WIP: time range lock

* WIP

* refactor: rm over-complicated query pool

* chore: add more metrics& rm sql from slow query metrics
2025-06-03 12:27:07 +00:00
Yuhan Wang
083c22b90a refactor: extract some common functions and structs in election module (#6172)
* refactor: extract some common functions and structs in election module

* chore: add comments and modify a function name

* chore: add comments and modify a function name

* fix: missing 2 lines in license header

* fix: acqrel

* chore: apply comment suggestions

* Update src/meta-srv/src/election.rs

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2025-06-03 11:31:30 +00:00
Lei, HUANG
fdd164c0fa fix(mito): revert initial builder capacity for TimeSeriesMemtable (#6231)
* fix/initial-builder-cap:
 ### Enhance Series Initialization and Capacity Management

 - **`simple_bulk_memtable.rs`**: Updated the `Series` initialization to use `with_capacity` with a specified capacity of 8192, improving memory management.
 - **`time_series.rs`**: Introduced `with_capacity` method in `Series` to allow custom initial capacity for `ValueBuilder`. Adjusted `INITIAL_BUILDER_CAPACITY` to 16 for more efficient memory usage. Added a new `new` method to maintain backward compatibility.

* fix/initial-builder-cap:
 ### Adjust Memory Allocation in Memtable

 - **`simple_bulk_memtable.rs`**: Reduced the initial capacity of `Series` from 8192 to 1024 to optimize memory usage.
 - **`time_series.rs`**: Decreased `INITIAL_BUILDER_CAPACITY` from 16 to 4 to improve efficiency in vector building.
2025-06-03 08:25:02 +00:00
Zhenchi
078afb2bd6 feat: bloom filter index applier support or eq chain (#6227)
* feat: bloom filter index applier support or eq chain

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-06-03 08:08:19 +00:00
localhost
477e4cc344 chore: add pg mysql be default feature in cli (#6230) 2025-06-03 07:09:26 +00:00
Lei, HUANG
078d83cec2 chore: add some metrics to grafana dashboard (#6169)
* add compaction elapsed time avg and bulk request convert elapsed time to grafana dashboard

* fix: standalone dashboard conversion

* chore: newline

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-06-03 03:33:11 +00:00
liyang
7705d84d83 docs: fix bad link (#6222)
* docs: fix bad link

* Update how-to-profile-memory.md
2025-06-03 03:19:10 +00:00
dennis zhuang
0d81400bb4 feat: supports select @@session.time_zone (#6212) 2025-06-03 02:32:19 +00:00
Weny Xu
1d7ae66e75 fix: remove stale region failover detectors (#6221)
* fix: remove stale region failover detectors

* fix: fix unit tests
2025-05-30 10:27:06 +00:00
shuiyisong
af6cf999c1 chore: shared pipeline under same catalog with compatibility (#6143)
* chore: support shared pipeline under catalog with compatibility

* test: add test for cross schema ref

* chore: use empty string schema by default

* chore: remove unwrap in the patch

* fix: df check
2025-05-30 07:19:32 +00:00
jeremyhi
54869a1329 chore: clear metadata filed after updating metadata (#6215)
chore: clear metadata filed after updatng metadata
2025-05-30 07:06:39 +00:00
jeremyhi
3104d49434 chore: example of http config in metasrv (#6218)
* chore: example of http config in metasrv

* docs: make config-docs
2025-05-30 03:27:54 +00:00
fys
b4d00fb499 feat: support SQL parsing for trigger show (#6217)
* feat: support SQL parsing for trigger show

* add excludes in licenserc

* refine comment

* fix: typo

* fix: add show/trigger.rs to excludes in licenserc
2025-05-29 12:00:36 +00:00
Ning Sun
4ae6df607b feat: update pgwire to 0.30 (#6209) 2025-05-29 11:47:00 +00:00
Lei, HUANG
183e1dc031 feat(http): lossy string validation in prom remote write (#6213)
* feat/lossy-string-validation-in-prom-remote-write:
 ### Commit Message

 #### Refactor Prometheus Validation Mode

 - **Replace `is_strict_mode` with `PromValidationMode` Enum:**
   - Updated `HttpOptions` and related structures to use `PromValidationMode` enum instead of the boolean `is_strict_mode`.
   - Modified functions and tests to accommodate the new enum, ensuring flexible validation modes (`Strict`, `Lossy`, `Unchecked`).
   - Affected files: `server.rs`, `prom_decode.rs`, `http.rs`, `prom_store.rs`, `prom_row_builder.rs`, `proto.rs`, `prom_store_test.rs`, `test_util.rs`, `http.rs`.

 - **Enhance UTF-8 String Decoding:**
   - Introduced `decode_string` function to handle UTF-8 string decoding based on the selected `PromValidationMode`.
   - Affected files: `proto.rs`, `prom_row_builder.rs`.

 This refactor improves the flexibility and clarity of Prometheus request handling by allowing different validation strategies.

* feat/lossy-string-validation-in-prom-remote-write:
 - **Add Prometheus Validation Mode Configuration:**
   - Updated `config/config.md`, `config/frontend.example.toml`, and `config/standalone.example.toml` to include `http.prom_validation_mode` setting for Prometheus remote write requests.

 - **Enhance Benchmarking for Prometheus Requests:**
   - Modified `src/servers/benches/prom_decode.rs` to benchmark different Prometheus validation modes (`Strict`, `Lossy`, `Unchecked`).

 - **Implement and Test String Decoding:**
   - Added `decode_string` function and comprehensive tests in `src/servers/src/proto.rs` to handle string decoding with different validation modes.

* feat/lossy-string-validation-in-prom-remote-write:
 ### Add Histogram Buckets to Metrics

 - **Files Modified**: `src/servers/src/metrics.rs`
 - **Key Changes**:
   - Added specific histogram buckets to `METRIC_MYSQL_QUERY_TIMER`, `METRIC_POSTGRES_QUERY_TIMER`, and `METRIC_SERVER_GRPC_PROM_REQUEST_TIMER` to enhance granularity in query elapsed time metrics.

* feat/lossy-string-validation-in-prom-remote-write:
 ### Update Prometheus Validation Mode Default

 - **Config Documentation**: Updated the default description for `http.prom_validation_mode` to indicate that "strict" is the default option in `config.md`, `frontend.example.toml`, and `standalone.example.toml`.
 - **HTTP Server Implementation**: Changed the default `prom_validation_mode` to `PromValidationMode::Strict` in `src/servers/src/http.rs`.

* feat/lossy-string-validation-in-prom-remote-write:
 **Commit Message:**

 Update Prometheus Validation Mode to Strict

 - Changed `http.prom_validation_mode` from `unchecked` to `strict` in `config.md`, `frontend.example.toml`, and
 `standalone.example.toml` to enforce strict validation of Prometheus remote write requests.
2025-05-29 11:08:57 +00:00
localhost
886c2dba76 chore: fix rds kv backend test (#6214)
* chore: fix rds kv backend test

* Revert "chore: fix rds kv backend test"

This reverts commit 9b5b6bacc0.

* chore: introduce helper macro

---------

Co-authored-by: WenyXu <wenymedia@gmail.com>
2025-05-29 09:12:31 +00:00
Lei, HUANG
4e615e8906 feat(wal): support bulk wal entries (#6178)
* feat/bulk-wal:
 ### Refactor: Simplify Data Handling in LogStore Implementations

 - **`kafka/log_store.rs`, `raft_engine/log_store.rs`, `wal.rs`, `raw_entry_reader.rs`, `logstore.rs`:**
   - Refactored `entry` and `build_entry` functions to accept `Vec<u8>` directly instead of `&mut Vec<u8>`.
   - Removed usage of `std::mem::take` for data handling, simplifying the code and improving readability.
   - Updated test cases to align with the new function signatures.

* feat/bulk-wal:
 ### Add Support for Bulk WAL Entries and Flight Data Encoding

 - **Add `raw_data` field to `BulkPart` and related structs**: Updated `BulkPart` and related structures in `src/mito2/src/memtable/bulk/part.rs`, `src/mito2/src/memtable/simple_bulk_memtable.rs`, `src/mito2/src/memtable/time_partition.rs`, `src/mito2/src/region_write_ctx.rs`,
 `src/mito2/src/worker/handle_bulk_insert.rs`, and `src/store-api/src/region_request.rs` to include a new `raw_data` field for handling Arrow IPC data.
 - **Implement Flight Data Encoding**: Added a new module `flight` in `src/common/test-util/src/flight.rs` to encode record batches to Flight data format.
 - **Update `greptime-proto` dependency**: Changed the revision of the `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml`.
 - **Enhance WAL Writer and Tests**: Modified `src/mito2/src/wal.rs` and related test files to support bulk WAL entries and added tests for encoding and handling bulk data.

* feat/bulk-wal:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Add `common-grpc` Dependency**: Added `common-grpc` as a dependency in `Cargo.lock` and `src/mito2/Cargo.toml`.
 - **Refactor `BulkPart` Structure**: Removed `num_rows` field and added `num_rows()` method in `src/mito2/src/memtable/bulk/part.rs`. Updated related usages in `src/mito2/src/memtable/simple_bulk_memtable.rs`, `src/mito2/src/memtable/time_partition.rs`, `src/mito2/src/memtable/time_series.rs`,
 `src/mito2/src/region_write_ctx.rs`, and `src/mito2/src/worker/handle_bulk_insert.rs`.
 - **Implement `TryFrom` and `From` for `BulkWalEntry`**: Added implementations for converting between `BulkPart` and `BulkWalEntry` in `src/mito2/src/memtable/bulk/part.rs`.
 - **Handle Bulk Entries in Region Opener**: Added logic to process bulk entries in `src/mito2/src/region/opener.rs`.
 - **Fix `BulkInsertRequest` Handling**: Corrected `region_id` handling in `src/operator/src/bulk_insert.rs` and `src/store-api/src/region_request.rs`.
 - **Add Error Variant for `ConvertBulkWalEntry`**: Added a new error variant in `src/mito2/src/error.rs` for handling bulk WAL entry conversion errors.

* fix: ci

* feat/bulk-wal:
 Add bulk write operation in `opener.rs`

 - Enhanced the region write context by adding a call to `write_bulk()` after `write_memtable()` in `opener.rs`.
 - This change aims to improve the efficiency of writing operations by enabling bulk writes.

* feat/bulk-wal:
 Enhance error handling and metrics in `bulk_insert.rs`

 - Updated `Inserter` to improve error handling by capturing the result of `datanode.handle(request)` and incrementing the `DIST_INGEST_ROW_COUNT` metric with the number of affected rows.

* feat/bulk-wal:
 ### Remove Encode Error Handling for WAL Entries

 - **`error.rs`**: Removed the `EncodeWal` error variant and its associated handling.
 - **`wal.rs`**: Eliminated the `entry_encode_buf` buffer and its usage for encoding WAL entries. Replaced with direct encoding to a vector using `encode_to_vec()`.
2025-05-29 09:10:30 +00:00
dennis zhuang
9afc61f778 feat: supports @@session.time_zone for mysql (#6210)
* feat: supports @@session.time_zone for mysql

* test: assert timezone
2025-05-29 05:26:49 +00:00
fys
d22084e90c feat: support parsing trigger create sql (#6197)
* feat: support parsing trigger create sql

* chore: add context for TryFromInt error

* chore: refine error msg about int convert

* avoid clone of token
2025-05-29 04:14:27 +00:00
Weny Xu
5e9b5d981f chore: fix feature gates for pg and mysql kvbackend (#6211) 2025-05-29 03:58:06 +00:00
Weny Xu
b01fce95a0 fix: remove poison key before retrying procedure on retryable errors (#6189)
* fix(meta): remove poison key before retrying procedure on retriable errors

* refactor: enhance error handling in DDL procedures
2025-05-29 01:17:38 +00:00
Ning Sun
9fbcf9b7e7 chore: switch nix index to 25.05 release (#6181)
chore: switch to nix index to 25.05 release
2025-05-29 01:13:35 +00:00
localhost
dc3591655e chore: add metrics for rds kv backend (#6201)
* chore: add metrics for rds kv backend

* chore: make clippy happy

* chore: remove useless rds timer

* chore: remove  in record_rds_sql_execute_elapsed macro

* chore: change some str literal to constant

* chore: fix import issue

* chore: remove impl Display for RangeTemplateType
2025-05-28 13:11:11 +00:00
jeremyhi
aca7ad82b1 chore: correct some CAS ordering args (#6200)
chore: corect some CAS ordering args
2025-05-28 09:21:46 +00:00
yihong
10fa6d8736 docs: nit from github -> GitHub (#6199)
doc: nit from github -> GitHub

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-05-28 07:20:02 +00:00
localhost
92422dafca feat: add CLI tool to export metadata (#6150)
* chore: add tool to export db meta

* chore: add meta restore command

* chore: fmt code

* chore: remove useless error

* chore: support key prefix

* chore: add clean check for meta restore

* chore: add more log for meta restore

* chore: resolve s3 and local file root in command meta-snapshot

* chore: remove the pg mysql features from the build script as they are already in the default feature

* chore: fix by pr comment
2025-05-28 03:18:00 +00:00
discord9
53752e4f6c fix: alter table update table column default (#6155)
* fix: alter table update table column default

* fix: fuzz test also cast default value

* chore: more testcase

* test: non-zero value

* refactor: per review

* tests: unexpected alter result(WIP on fix)

* ub

* ub more

* test: update sqlness
2025-05-27 09:42:27 +00:00
Weny Xu
40bfa98d4b fix(promql): handle field column projection with correct qualifier (#6183)
* fix(promql): handle field column projection with correct qualifier

* test: add sqlness tests
2025-05-27 03:26:23 +00:00
dennis zhuang
49986b03d6 chore: change info to debug for scanning physical table (#6180) 2025-05-26 18:23:31 +00:00
Lei, HUANG
493440a802 refactor: replace FlightMessage with arrow RecordBatch and Schema (#6175)
* refactor/flight-codec:
 ### Refactor and Enhance Schema and RecordBatch Handling

 - **Add `datatypes` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `datatypes` dependency.
 - **Schema Conversion and Error Handling**:
   - Updated `src/client/src/database.rs` and `src/client/src/region.rs` to handle schema conversion using `Arc` and added error handling for schema conversion.
   - Enhanced error handling in `src/client/src/error.rs` and `src/common/grpc/src/error.rs` by adding `ConvertSchema` error and removing unused errors.
 - **FlightMessage and RecordBatch Refactoring**:
   - Refactored `FlightMessage` enum in `src/common/grpc/src/flight.rs` to use `RecordBatch` instead of `Recordbatch`.
   - Updated related functions and tests in `src/common/grpc/benches/bench_flight_decoder.rs`, `src/operator/src/bulk_insert.rs`, `src/servers/src/grpc/flight/stream.rs`, and `tests-integration/src/grpc/flight.rs` to align with the new `FlightMessage` structure.

* refactor/flight-codec:
 Remove `ConvertArrowSchema` Error Variant

 - Removed the `ConvertArrowSchema` error variant from `error.rs`.
 - Updated the `ErrorExt` implementation to exclude `ConvertArrowSchema`.
 - Affected file: `src/common/query/src/error.rs`.

* fix: cr
2025-05-26 10:06:50 +00:00
localhost
77e2fee755 fix: add simple test for rds kv backend (#6167)
* chore: add simple test for rds kv backend

* chore: add test for etcd and mem

* chore: remove etcd simple range test

* chore: add more test case
2025-05-26 06:32:36 +00:00
dennis zhuang
b85429c0f1 fix: set column index can't work in physical table (#6179) 2025-05-26 04:44:05 +00:00
Lei, HUANG
3d942f6763 fix: bulk insert case sensitive (#6165)
* fix/bulk-insert-case-sensitive:
 Add error inspection for gRPC bulk insert in `greptime_handler.rs`

 - Enhanced error handling by adding `inspect_err` to log errors during the `put_record_batch` operation in `greptime_handler.rs`.

* fix: silient error while bulk ingest with uppercase columns
2025-05-24 07:02:42 +00:00
discord9
3901863432 chore: metasrv starting not blocking (#6158)
* chore: metasrv starting not blocking

* chore: fmt

* chore: expose actual bind_addr
2025-05-23 09:53:42 +00:00
Lei, HUANG
27e339f628 perf: optimize bulk encode decode (#6161)
* main:
 **Enhancements to Flight Data Handling and Error Management**

 - **Flight Data Handling:**
   - Added `bytes` dependency in `Cargo.lock` and `Cargo.toml`.
   - Introduced `try_from_schema_bytes` and `try_decode_record_batch` methods in `FlightDecoder` to handle schema and record batch decoding more efficiently in `src/common/grpc/src/flight.rs`.
   - Updated `Inserter` in `src/operator/src/bulk_insert.rs` to utilize schema bytes directly, improving bulk insert operations.

 - **Error Management:**
   - Added `ArrowError` handling in `src/common/grpc/src/error.rs` to manage errors related to Arrow operations.

 - **Region Request Processing:**
   - Modified `make_region_bulk_inserts` in `src/store-api/src/region_request.rs` to use the new `FlightDecoder` methods for decoding Arrow IPC data.

* - **Flight Data Handling:**
 - Added `bytes` dependency in `Cargo.lock` and `Cargo.toml`.
 - Introduced `try_from_schema_bytes` and `try_decode_record_batch` methods in `FlightDecoder` to handle schema and record batch decoding more efficiently in `src/common/grpc/src/flight.rs`.
 - Updated `Inserter` in `src/operator/src/bulk_insert.rs` to utilize schema bytes directly, improving bulk insert operations.
- **Error Management:**
 - Added `ArrowError` handling in `src/common/grpc/src/error.rs` to manage errors related to Arrow operations.
- **Region Request Processing:**
 - Modified `make_region_bulk_inserts` in `src/store-api/src/region_request.rs` to use the new `FlightDecoder` methods for decoding Arrow IPC data.

* perf/optimize-bulk-encode-decode:
 Update `greptime-proto` dependency and refactor error handling

 - **Dependency Update**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Error Handling Refactor**: Removed the `Prost` error variant from `MetadataError` in `src/store-api/src/metadata.rs`.
 - **Error Handling Improvement**: Replaced `unwrap` with `context(FlightCodecSnafu)` for error handling in `make_region_bulk_inserts` function in `src/store-api/src/region_request.rs`.

* fix: clippy

* fix: toml

* perf/optimize-bulk-encode-decode:
 ### Update `Cargo.toml` Dependencies

 - Updated the `bytes` dependency to use the workspace version in `Cargo.toml`.

* perf/optimize-bulk-encode-decode:
 **Fix payload assignment in `bulk_insert.rs`**

 - Corrected the assignment of the `payload` field in the `ArrowIpc` struct within the `Inserter` implementation in `bulk_insert.rs`.

* use main branch proto
2025-05-23 07:22:10 +00:00
discord9
cf2712e6f4 chore: invalid table flow mapping cache (#6135)
* chore: invalid table flow mapping

* chore: exists

* fix: invalid all related keys in kv cache when drop flow&refactor: per review

* fix: flow not found status code

* chore: rm unused error code

* chore: stuff

* chore: unused
2025-05-23 03:40:10 +00:00
Lei, HUANG
4b71e493f7 feat!: revise compaction picker (#6121)
* - **Refactor `RegionFilePathFactory` to `RegionFilePathProvider`:** Updated references and implementations in `access_layer.rs`, `write_cache.rs`, and related test files to use the new struct name.
 - **Add `max_file_size` support in compaction:** Introduced `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`.
 - **Enhance Parquet writing logic:** Modified `parquet.rs` and `parquet/writer.rs` to support optional `max_file_size` and added a test case `test_write_multiple_files` to verify writing multiple files based on size constraints.

 **Refactor Parquet Writer Initialization and File Handling**
 - Updated `ParquetWriter` in `writer.rs` to handle `current_indexer` as an `Option`, allowing for more flexible initialization and management.
 - Introduced `finish_current_file` method to encapsulate logic for completing and transitioning between SST files, improving code clarity and maintainability.
 - Enhanced error handling and logging with `debug` statements for better traceability during file operations.

 - **Removed Output Size Enforcement in `twcs.rs`:**
   - Deleted the `enforce_max_output_size` function and related logic to simplify compaction input handling.

 - **Added Max File Size Option in `parquet.rs`:**
   - Introduced `max_file_size` in `WriteOptions` to control the maximum size of output files.

 - **Refactored Indexer Management in `parquet/writer.rs`:**
   - Changed `current_indexer` from an `Option` to a direct `Indexer` type.
   - Implemented `roll_to_next_file` to handle file transitions when exceeding `max_file_size`.
   - Simplified indexer initialization and management logic.

 - **Refactored SST File Handling**:
   - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths.
   - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
   - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
   - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.

 - **Enhanced Indexer Management**:
   - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation.
   - Updated `ParquetWriter` to handle multiple indexers and file IDs.
   - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.

 - **Removed Redundant File ID Handling**:
   - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
   - Updated related logic to dynamically generate file IDs where necessary.
   - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.

 - **Test Adjustments**:
   - Updated tests to align with new path and indexer management.
   - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
   - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.

* chore: rebase main

* feat/multiple-compaction-output:
 ### Add Benchmarking and Refactor Compaction Logic

 - **Benchmarking**: Added a new benchmark `run_bench` in `Cargo.toml` and implemented benchmarks in `benches/run_bench.rs` using Criterion for `find_sorted_runs` and `reduce_runs` functions.
 - **Compaction Module Enhancements**:
   - Made `run.rs` public and refactored the `Ranged` and `Item` traits to be public.
   - Simplified the logic in `find_sorted_runs` and `reduce_runs` by removing `MergeItems` and related functions.
   - Introduced `find_overlapping_items` for identifying overlapping items.
 - **Code Cleanup**: Removed redundant code and tests related to `MergeItems` in `run.rs`.

* feat/multiple-compaction-output:
 ### Enhance Compaction Logic and Add Benchmarks

 - **Compaction Logic Improvements**:
   - Updated `reduce_runs` function in `src/mito2/src/compaction/run.rs` to remove the target parameter and improve the logic for selecting files to merge based on minimum penalty.
   - Enhanced `find_overlapping_items` to handle unsorted inputs and improve overlap detection efficiency.

 - **Benchmark Enhancements**:
   - Added `bench_find_overlapping_items` in `src/mito2/benches/run_bench.rs` to benchmark the new `find_overlapping_items` function.
   - Extended existing benchmarks to include larger data sizes.

 - **Testing Enhancements**:
   - Updated tests in `src/mito2/src/compaction/run.rs` to reflect changes in `reduce_runs` and added new tests for `find_overlapping_items`.

 - **Logging and Debugging**:
   - Improved logging in `src/mito2/src/compaction/twcs.rs` to provide more detailed information about compaction decisions.

* feat/multiple-compaction-output:
 ### Refactor and Enhance Compaction Logic

 - **Refactor `find_overlapping_items` Function**: Changed the function signature to accept slices instead of mutable vectors in `run.rs`.
 - **Rename and Update Struct Fields**: Renamed `penalty` to `size` in `SortedRun` struct and updated related logic in `run.rs`.
 - **Enhance `reduce_runs` Function**: Improved logic to sort runs by size and limit probe runs to 100 in `run.rs`.
 - **Add `merge_seq_files` Function**: Introduced a new function `merge_seq_files` in `run.rs` for merging sequential files.
 - **Modify `TwcsPicker` Logic**: Updated the compaction logic to use `merge_seq_files` when only one run is found in `twcs.rs`.
 - **Remove `enforce_file_num` Function**: Deleted the `enforce_file_num` function and its related test cases in `twcs.rs`.

* feat/multiple-compaction-output:
 ### Enhance Compaction Logic and Testing

 - **Add `merge_seq_files` Functionality**: Implemented the `merge_seq_files` function in `run.rs` to optimize file merging based on scoring systems. Updated
 benchmarks in `run_bench.rs` to include `bench_merge_seq_files`.
 - **Improve Compaction Strategy in `twcs.rs`**: Modified the compaction logic to handle file merging more effectively, considering file size and overlap.
 - **Update Tests**: Enhanced test coverage in `compaction_test.rs` and `append_mode_test.rs` to validate new compaction logic and file merging strategies.
 - **Remove Unused Function**: Deleted `new_file_handles` from `test_util.rs` as it was no longer needed.

* feat/multiple-compaction-output:
 ### Refactor TWCS Compaction Options

 - **Refactor Compaction Logic**: Simplified the TWCS compaction logic by replacing multiple parameters (`max_active_window_runs`, `max_active_window_files`, `max_inactive_window_runs`, `max_inactive_window_files`) with a single `trigger_file_num` parameter in `picker.rs`, `twcs.rs`, and `options.rs`.
 - **Update Tests**: Adjusted test cases to reflect the new compaction logic in `append_mode_test.rs`, `compaction_test.rs`, `filter_deleted_test.rs`, `merge_mode_test.rs`, and various test files under `tests/cases`.
 - **Modify Engine Options**: Updated engine option keys to use `trigger_file_num` in `mito_engine_options.rs` and `region_request.rs`.
 - **Fuzz Testing**: Updated fuzz test generators and translators to accommodate the new compaction parameter in `alter_expr.rs` and related files.

 This refactor aims to streamline the compaction configuration by reducing the number of parameters and simplifying the codebase.

* chore: add trailing space

* fix license header

* feat/revise-compaction-picker:
 **Limit File Processing and Optimize Merge Logic in `run.rs`**

 - Introduced a limit to process a maximum of 100 files in `merge_seq_files` to control time complexity.
 - Adjusted logic to calculate `target_size` and iterate over files using the limited set of files.
 - Updated scoring calculations to use the limited file set, ensuring efficient file merging.

* feat/revise-compaction-picker:
 ### Add Compaction Metrics and Remove Debug Logging

 - **Compaction Metrics**: Introduced new histograms `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` to track compaction input and output file sizes in `metrics.rs`. Updated `compactor.rs` to observe these metrics during the compaction process.
 - **Logging Cleanup**: Removed debug logging of file ranges during the merge process in `twcs.rs`.

* feat/revise-compaction-picker:
 ## Enhance Compaction Logic and Metrics

 - **Compaction Logic Improvements**:
   - Added methods `input_file_size` and `output_file_size` to `MergeOutput` in `compactor.rs` to streamline file size calculations.
   - Updated `Compactor` implementation to use these methods for metrics tracking.
   - Modified `Ranged` trait logic in `run.rs` to improve range comparison.
   - Enhanced test cases in `run.rs` to reflect changes in compaction logic.

 - **Metrics Enhancements**:
   - Changed `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` from histograms to counters in `metrics.rs` for better performance tracking.

 - **Debugging and Logging**:
   - Added detailed logging for compaction pick results in `twcs.rs`.
   - Implemented custom `Debug` trait for `FileMeta` in `file.rs` to improve debugging output.

 - **Testing Enhancements**:
   - Added new test `test_compaction_overlapping_files` in `compaction_test.rs` to verify compaction behavior with overlapping files.
   - Updated `merge_mode_test.rs` to reflect changes in file handling during scans.

* feat/revise-compaction-picker:
 ### Update `FileHandle` Debug Implementation

 - **Refactor Debug Output**: Simplified the `fmt::Debug` implementation for `FileHandle` in `src/mito2/src/sst/file.rs` by consolidating multiple fields into a single `meta` field using `meta_ref()`.
 - **Atomic Operations**: Updated the `deleted` field to use atomic loading with `Ordering::Relaxed`.

* Trigger CI

* feat/revise-compaction-picker:
 **Update compaction logic and default options**

 - **`twcs.rs`**: Enhanced logging for compaction pick results by improving the formatting for better readability.
 - **`options.rs`**: Modified the default `max_output_file_size` in `TwcsOptions` from 2GB to 512MB to optimize file handling and performance.

* feat/revise-compaction-picker:
 Refactor `find_overlapping_items` to use an external result vector

 - Updated `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to accept a mutable result vector instead of returning a new vector, improving memory efficiency.
 - Modified benchmarks in `src/mito2/benches/bench_compaction_picker.rs` to accommodate the new function signature.
 - Adjusted tests in `src/mito2/src/compaction/run.rs` to use the updated function signature, ensuring correct functionality with the new approach.

* feat/revise-compaction-picker:
 Improve file merging logic in `run.rs`

 - Refactor the loop logic in `merge_seq_files` to simplify the iteration over file groups.
 - Adjust the range for `end_idx` to include the endpoint, allowing for more flexible group selection.
 - Remove the condition that skips groups with only one file, enabling more comprehensive processing of file sequences.

* feat/revise-compaction-picker:
 Enhance `find_overlapping_items` with `SortedRun` and Update Tests

 - Refactor `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to utilize the `SortedRun` struct for improved efficiency and clarity.
 - Introduce a `sorted` flag in `SortedRun` to optimize sorting operations.
 - Update test cases in `src/mito2/benches/bench_compaction_picker.rs` to accommodate changes in `find_overlapping_items` by using `SortedRun`.
 - Add `From<Vec<T>>` implementation for `SortedRun` to facilitate easy conversion from vectors.

* feat/revise-compaction-picker:
 **Enhancements in `compaction/run.rs`:**

 - Added `ReadableSize` import to handle size calculations.
 - Modified the logic in `merge_seq_files` to clamp the calculated target size to a maximum of 2GB when `max_file_size` is not provided.

* feat/revise-compaction-picker: Add Default Max Output Size Constant for Compaction

Introduce DEFAULT_MAX_OUTPUT_SIZE constant to define the default maximum compaction output file size as 2GB. Refactor the merge_seq_files function to utilize this constant, ensuring consistent and maintainable code for handling file size limits during compaction.
2025-05-23 03:29:08 +00:00
Ruihang Xia
bf496e05cc ci: turn off fail fast strategy (#6157)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-23 02:38:25 +00:00
zyy17
513ca951ee chore: add the missing v prefix for NEXT_RELEASE_VERSION variable (#6160)
chore: add 'v' prefix for NEXT_RELEASE_VERSION variable
2025-05-22 10:33:14 +00:00
Ruihang Xia
791f530a78 fix: require input ordering in series divide plan (#6148)
* require input ordering in series divide plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finilise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-22 07:04:25 +00:00
Ning Sun
1de6d8c619 fix: ident value in set search_path (#6153)
* fix: ident value in set search_path

* refactor: remove unneeded clone
2025-05-22 03:58:18 +00:00
discord9
a4d0420727 fix(flow): flow task run interval (#6100)
* fix: always check for shutdown signal in flow
chore: correct log msg for flows that shouldn't exist
feat: use time window size/2 as sleep interval

* chore: better slower query refresh time

* chore

* refactor: per review
2025-05-22 03:27:26 +00:00
discord9
fc6300a2ba feat(flow): support prom ql(in tql) in flow (#6063)
* feat: support parse prom ql in create flow

* refactor

* fix: just run tql unmodified

* refactor: determine type faster

* fix: pass original query

* tests: sqlness

* test: fix format&chore

* fix: get raw query

* test: fix sqlness randomness

* chore: what's the box for?

* test: location_to_index

* test: make sqlness more determinstic

* fix: tmp add sleep 1s after flush_flow

* undo test sleep 1s&rm done todo

* chore: more tests
2025-05-22 03:06:09 +00:00
liyang
f55af5838c ci: add issues write permission (#6145)
fixed to: https://github.com/GreptimeTeam/greptimedb/actions/runs/15155518237/job/42610589439
2025-05-21 15:53:01 +00:00
Lei, HUANG
5a0da5b6bb fix: region worker stall metrics (#6149)
fix/stall-metrics:
 Improve stalled request handling in `handle_write.rs`

 - Updated logic to account for both `write_requests` and `bulk_requests` when adjusting `stalled_count`.
 - Modified `reject_region_stalled_requests` and `handle_region_stalled_requests` to correctly subtract the combined length of `requests` and `bulk` from `stalled_count`.
2025-05-21 13:21:50 +00:00
Lei, HUANG
d5f0006864 fix: flaky prom gateway test (#6146)
fix/flaky-prom-gateway-test:
 **Refactor gRPC Test Assertions in `grpc.rs`**

 - Updated test assertions for `test_prom_gateway_query` to improve clarity and maintainability.
 - Replaced direct comparison with expected `PrometheusJsonResponse` objects with individual field assertions.
 - Added sorting for `vector` and `matrix` results to ensure consistent test outcomes.
2025-05-21 09:31:58 +00:00
liyang
ede82331b2 docs: change docker run mount directory (#6142) 2025-05-21 07:05:21 +00:00
Ruihang Xia
56e696bd55 chore: remove stale wal config entries (#6134)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-20 19:42:09 +00:00
ZonaHe
bc0cdf62ba feat: update dashboard to v0.9.2 (#6140)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-05-20 19:41:29 +00:00
Lei, HUANG
eaf7b4b9dd chore: update flush failure metric name and update grafana dashboard (#6138)
* 1. rename `greptime_mito_flush_errors_total` metric to `greptime_mito_flush_errors_total` for consistency
2. update grafana dashboard to add following panel:
  - compaction input/output bytes
  - bulk insert handle elasped time in frontend and region worker
2025-05-20 12:05:54 +00:00
Ruihang Xia
7ae0e150e5 feat: support altering multiple logical table in one remote write request (#6137)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-20 11:22:38 +00:00
ZonaHe
43c30b55ae feat: update dashboard to v0.9.1 (#6132)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-05-20 09:58:44 +00:00
liyang
153e80450a fix: update dev-build image tag (#6136) 2025-05-20 09:08:28 +00:00
jeremyhi
1624dc41c5 chore: reduce unnecessary txns in alter operations (#6133) 2025-05-20 08:29:49 +00:00
Ruihang Xia
300262562b feat: accommodate default column name with pre-created table schema (#6126)
* refactor: prepare_mocked_backend

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify request in place

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to influx line protocol

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* return on empty alter expr list

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* expose to other write paths

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-20 07:22:13 +00:00
306 changed files with 19263 additions and 10499 deletions

View File

@@ -52,7 +52,7 @@ runs:
uses: ./.github/actions/build-greptime-binary
with:
base-image: ubuntu
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
features: servers/dashboard
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
version: ${{ inputs.version }}
@@ -70,7 +70,7 @@ runs:
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
with:
base-image: centos
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
features: servers/dashboard
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
version: ${{ inputs.version }}

View File

@@ -22,7 +22,6 @@ datanode:
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
linger = "2ms"
overwrite_entry_start_id = true
frontend:
configData: |-

View File

@@ -16,7 +16,8 @@ function create_version() {
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
# NOTE: Need a `v` prefix for the version string.
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then

View File

@@ -4,7 +4,7 @@ DEV_BUILDER_IMAGE_TAG=$1
update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
echo "Error: Should specify the dev-builder image tag"
exit 1
fi
@@ -17,7 +17,7 @@ update_dev_builder_version() {
git checkout -b $BRANCH_NAME
# Update the dev-builder image tag in the Makefile.
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
# Commit the changes.
git add Makefile

View File

@@ -55,6 +55,11 @@ on:
description: Build and push images to DockerHub and ACR
required: false
default: true
upload_artifacts_to_s3:
type: boolean
description: Whether upload artifacts to s3
required: false
default: false
cargo_profile:
type: choice
description: The cargo profile to use in building GreptimeDB.
@@ -238,7 +243,7 @@ jobs:
version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: false # Don't push the latest tag to registry.
dev-mode: true # Only build the standard images.
- name: Echo Docker image tag to step summary
run: |
echo "## Docker Image Tag" >> $GITHUB_STEP_SUMMARY
@@ -281,7 +286,7 @@ jobs:
aws-cn-access-key-id: ${{ secrets.AWS_CN_ACCESS_KEY_ID }}
aws-cn-secret-access-key: ${{ secrets.AWS_CN_SECRET_ACCESS_KEY }}
aws-cn-region: ${{ vars.AWS_RELEASE_BUCKET_REGION }}
upload-to-s3: false
upload-to-s3: ${{ inputs.upload_artifacts_to_s3 }}
dev-mode: true # Only build the standard images(exclude centos images).
push-latest-tag: false # Don't push the latest tag to registry.
update-version-info: false # Don't update the version info in S3.

View File

@@ -195,6 +195,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "unstable_fuzz_create_table_standalone" ]
steps:
@@ -299,6 +300,7 @@ jobs:
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
mode:
@@ -431,6 +433,7 @@ jobs:
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
@@ -578,6 +581,7 @@ jobs:
needs: build
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
mode:

View File

@@ -124,9 +124,7 @@ jobs:
fetch-depth: 0
persist-credentials: false
- uses: cachix/install-nix-action@v31
with:
nix_path: nixpkgs=channel:nixos-24.11
- run: nix develop --command cargo build --bin greptime
- run: nix develop --command cargo check --bin greptime
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"

View File

@@ -16,6 +16,7 @@ jobs:
runs-on: ubuntu-latest
permissions:
pull-requests: write # Add permissions to modify PRs
issues: write
timeout-minutes: 10
steps:
- uses: actions/checkout@v4

View File

@@ -108,7 +108,7 @@ of what you were trying to do and what went wrong. You can also reach for help i
The core team will be thrilled if you would like to participate in any way you like. When you are stuck, try to ask for help by filing an issue, with a detailed description of what you were trying to do and what went wrong. If you have any questions or if you would like to get involved in our community, please check out:
- [GreptimeDB Community Slack](https://greptime.com/slack)
- [GreptimeDB Github Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
- [GreptimeDB GitHub Discussions](https://github.com/GreptimeTeam/greptimedb/discussions)
Also, see some extra GreptimeDB content:

88
Cargo.lock generated
View File

@@ -1852,8 +1852,9 @@ dependencies = [
"futures",
"humantime",
"meta-client",
"meta-srv",
"nu-ansi-term",
"opendal 0.51.2",
"object-store",
"query",
"rand 0.9.0",
"reqwest",
@@ -1889,6 +1890,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-telemetry",
"datatypes",
"enum_dispatch",
"futures",
"futures-util",
@@ -2223,6 +2225,7 @@ version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
"bytes",
"common-base",
"common-error",
"common-macro",
@@ -2320,6 +2323,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-telemetry",
"common-test-util",
"common-time",
"common-wal",
"common-workload",
@@ -2330,6 +2334,7 @@ dependencies = [
"deadpool-postgres",
"derive_builder 0.20.1",
"etcd-client",
"flexbuffers",
"futures",
"futures-util",
"hex",
@@ -2338,6 +2343,7 @@ dependencies = [
"itertools 0.14.0",
"lazy_static",
"moka",
"object-store",
"prometheus",
"prost 0.13.5",
"rand 0.9.0",
@@ -2536,6 +2542,7 @@ name = "common-test-util"
version = "0.15.0"
dependencies = [
"client",
"common-grpc",
"common-query",
"common-recordbatch",
"once_cell",
@@ -4258,6 +4265,19 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flexbuffers"
version = "25.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "935627e7bc8f083035d9faad09ffaed9128f73fb1f74a8798f115749c43378e8"
dependencies = [
"bitflags 1.3.2",
"byteorder",
"num_enum 0.5.11",
"serde",
"serde_derive",
]
[[package]]
name = "float-cmp"
version = "0.10.0"
@@ -4352,6 +4372,7 @@ dependencies = [
"session",
"smallvec",
"snafu 0.8.5",
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
@@ -4855,7 +4876,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7668a882d57ca6a2333146e0574b8f0c9d5008ae#7668a882d57ca6a2333146e0574b8f0c9d5008ae"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f"
dependencies = [
"prost 0.13.5",
"serde",
@@ -6955,6 +6976,7 @@ dependencies = [
"common-decimal",
"common-error",
"common-function",
"common-grpc",
"common-macro",
"common-meta",
"common-query",
@@ -7572,13 +7594,34 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
dependencies = [
"num_enum_derive 0.5.11",
]
[[package]]
name = "num_enum"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179"
dependencies = [
"num_enum_derive",
"num_enum_derive 0.7.3",
]
[[package]]
name = "num_enum_derive"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
dependencies = [
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
@@ -7632,7 +7675,7 @@ dependencies = [
"lazy_static",
"md5",
"moka",
"opendal 0.52.0",
"opendal",
"prometheus",
"tokio",
"uuid",
@@ -7671,7 +7714,7 @@ dependencies = [
"futures",
"futures-util",
"object_store",
"opendal 0.52.0",
"opendal",
"pin-project",
"tokio",
]
@@ -7697,35 +7740,6 @@ version = "11.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
[[package]]
name = "opendal"
version = "0.51.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b1063ea459fa9e94584115743b06330f437902dd1d9f692b863ef1875a20548"
dependencies = [
"anyhow",
"async-trait",
"backon",
"base64 0.22.1",
"bytes",
"chrono",
"crc32c",
"futures",
"getrandom 0.2.15",
"http 1.1.0",
"log",
"md-5",
"once_cell",
"percent-encoding",
"quick-xml 0.36.2",
"reqsign",
"reqwest",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]]
name = "opendal"
version = "0.52.0"
@@ -8077,7 +8091,7 @@ dependencies = [
"arrow 53.4.1",
"arrow-ipc 53.4.1",
"lazy_static",
"num_enum",
"num_enum 0.7.3",
"opentelemetry-proto 0.27.0",
"paste",
"prost 0.13.5",
@@ -8414,9 +8428,9 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.29.0"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4e6fcdc2ae2173ef8ee1005b6e46453d45195ac3d97caac0db7ecf64ab4aa85"
checksum = "ec79ee18e6cafde8698885646780b967ecc905120798b8359dd0da64f9688e89"
dependencies = [
"async-trait",
"bytes",

View File

@@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-b2377d4b-20250520045554
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

View File

@@ -121,7 +121,7 @@ docker pull greptime/greptimedb
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/greptimedb_data" \
-v "$(pwd)/greptimedb_data:/greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
@@ -129,7 +129,7 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)
**Troubleshooting:**
@@ -167,7 +167,7 @@ cargo run -- standalone start
## Project Status
> **Status:** Beta.
> **Status:** Beta.
> **GA (v1.0):** Targeted for mid 2025.
- Being used in production by early adopters
@@ -197,8 +197,8 @@ GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/license
## Commercial Support
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
[Contact us](https://greptime.com/contactus) for details.
## Contributing

View File

@@ -27,6 +27,7 @@
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
@@ -226,6 +227,7 @@
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
@@ -329,6 +331,10 @@
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |

View File

@@ -37,6 +37,12 @@ enable_cors = true
## Customize allowed origins for HTTP CORS.
## @toml2docs:none-default
cors_allowed_origins = ["https://example.com"]
## Whether to enable validation for Prometheus remote write requests.
## Available options:
## - strict: deny invalid UTF-8 strings (default).
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
## - unchecked: do not valid strings.
prom_validation_mode = "strict"
## The gRPC server options.
[grpc]

View File

@@ -67,6 +67,17 @@ node_max_idle_time = "24hours"
## The number of threads to execute the runtime for global write operations.
#+ compact_rt_size = 4
## The HTTP server options.
[http]
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "0s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Procedure storage options.
[procedure]

View File

@@ -43,6 +43,13 @@ enable_cors = true
## @toml2docs:none-default
cors_allowed_origins = ["https://example.com"]
## Whether to enable validation for Prometheus remote write requests.
## Available options:
## - strict: deny invalid UTF-8 strings (default).
## - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).
## - unchecked: do not valid strings.
prom_validation_mode = "strict"
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.

View File

@@ -1,6 +1,6 @@
# Profile memory usage of GreptimeDB
This crate provides an easy approach to dump memory profiling info. A set of ready to use scripts is provided in [docs/how-to/memory-profile-scripts](docs/how-to/memory-profile-scripts).
This crate provides an easy approach to dump memory profiling info. A set of ready to use scripts is provided in [docs/how-to/memory-profile-scripts](./memory-profile-scripts/scripts).
## Prerequisites
### jemalloc

8
flake.lock generated
View File

@@ -41,16 +41,16 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1745487689,
"narHash": "sha256-FQoi3R0NjQeBAsEOo49b5tbDPcJSMWc3QhhaIi9eddw=",
"lastModified": 1748162331,
"narHash": "sha256-rqc2RKYTxP3tbjA+PB3VMRQNnjesrT0pEofXQTrMsS8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "5630cf13cceac06cefe9fc607e8dfa8fb342dde3",
"rev": "7c43f080a7f28b2774f3b3f43234ca11661bf334",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-24.11",
"ref": "nixos-25.05",
"repo": "nixpkgs",
"type": "github"
}

View File

@@ -2,7 +2,7 @@
description = "Development environment flake";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11";
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
@@ -51,6 +51,7 @@
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;
NIX_HARDENING_ENABLE = "";
};
});
}

File diff suppressed because it is too large Load Diff

View File

@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{instance=~"$frontend"}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -59,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction P99 per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{instance=~"$datanode"}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{instance=~"$datanode"}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -67,6 +68,9 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -472,7 +487,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{ instance }}]-[{{pod}}]'
- title: Compaction P99 per Instance by Stage
- title: Compaction Elapsed Time per Instance by Stage
type: timeseries
description: Compaction latency by stage
unit: s
@@ -482,6 +497,11 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{instance=~"$datanode"}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{instance=~"$datanode"}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-avg'
- title: Compaction P99 per Instance
type: timeseries
description: Compaction P99 per Instance.
@@ -562,6 +582,51 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction oinput output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: Region Worker Convert Requests
type: timeseries
description: Per-stage elapsed time for region worker to decode requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance

File diff suppressed because it is too large Load Diff

View File

@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -59,7 +60,7 @@
| Read Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_read_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Read Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Write Stage P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_write_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Write Stage P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]` |
| Compaction OPS per Instance | `sum by(instance, pod) (rate(greptime_mito_compaction_total_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction OPS per Instance. | `prometheus` | `ops` | `[{{ instance }}]-[{{pod}}]` |
| Compaction P99 per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction Elapsed Time per Instance by Stage | `histogram_quantile(0.99, sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_bucket{}[$__rate_interval])))`<br/>`sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))` | `timeseries` | Compaction latency by stage | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-p99` |
| Compaction P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le,stage) (rate(greptime_mito_compaction_total_elapsed_bucket{}[$__rate_interval])))` | `timeseries` | Compaction P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-compaction` |
| WAL write size | `histogram_quantile(0.95, sum by(le,instance, pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`histogram_quantile(0.99, sum by(le,instance,pod) (rate(raft_engine_write_size_bucket[$__rate_interval])))`<br/>`sum by (instance, pod)(rate(raft_engine_write_size_sum[$__rate_interval]))` | `timeseries` | Write-ahead logs write size as bytes. This chart includes stats of p95 and p99 size by instance, total WAL write rate. | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-req-size-p95` |
| Cached Bytes per Instance | `greptime_mito_cache_bytes{}` | `timeseries` | Cached Bytes per Instance. | `prometheus` | `decbytes` | `[{{instance}}]-[{{pod}}]-[{{type}}]` |
@@ -67,6 +68,9 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -472,7 +487,7 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{ instance }}]-[{{pod}}]'
- title: Compaction P99 per Instance by Stage
- title: Compaction Elapsed Time per Instance by Stage
type: timeseries
description: Compaction latency by stage
unit: s
@@ -482,6 +497,11 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-p99'
- expr: sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_sum{}[$__rate_interval]))/sum by(instance, pod, le, stage) (rate(greptime_mito_compaction_stage_elapsed_count{}[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-avg'
- title: Compaction P99 per Instance
type: timeseries
description: Compaction P99 per Instance.
@@ -562,6 +582,51 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction oinput output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: Region Worker Convert Requests
type: timeseries
description: Per-stage elapsed time for region worker to decode requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance

View File

@@ -6,7 +6,7 @@ DAC_IMAGE=ghcr.io/zyy17/dac:20250423-522bd35
remove_instance_filters() {
# Remove the instance filters for the standalone dashboards.
sed 's/instance=~\\"$datanode\\",//; s/instance=~\\"$datanode\\"//; s/instance=~\\"$frontend\\",//; s/instance=~\\"$frontend\\"//; s/instance=~\\"$metasrv\\",//; s/instance=~\\"$metasrv\\"//; s/instance=~\\"$flownode\\",//; s/instance=~\\"$flownode\\"//;' $CLUSTER_DASHBOARD_DIR/dashboard.json > $STANDALONE_DASHBOARD_DIR/dashboard.json
sed -E 's/instance=~\\"(\$datanode|\$frontend|\$metasrv|\$flownode)\\",?//g' "$CLUSTER_DASHBOARD_DIR/dashboard.json" > "$STANDALONE_DASHBOARD_DIR/dashboard.json"
}
generate_intermediate_dashboards_and_docs() {

View File

@@ -26,6 +26,13 @@ excludes = [
"src/common/base/src/secrets.rs",
"src/servers/src/repeated_field.rs",
"src/servers/src/http/test_helpers.rs",
# enterprise
"src/common/meta/src/rpc/ddl/trigger.rs",
"src/operator/src/expr_helper/trigger.rs",
"src/sql/src/statements/create/trigger.rs",
"src/sql/src/statements/show/trigger.rs",
"src/sql/src/parsers/create_parser/trigger.rs",
"src/sql/src/parsers/show_parser/trigger.rs",
]
[properties]

View File

@@ -5,8 +5,12 @@ edition.workspace = true
license.workspace = true
[features]
pg_kvbackend = ["common-meta/pg_kvbackend"]
mysql_kvbackend = ["common-meta/mysql_kvbackend"]
default = [
"pg_kvbackend",
"mysql_kvbackend",
]
pg_kvbackend = ["common-meta/pg_kvbackend", "meta-srv/pg_kvbackend"]
mysql_kvbackend = ["common-meta/mysql_kvbackend", "meta-srv/mysql_kvbackend"]
[lints]
workspace = true
@@ -43,11 +47,9 @@ etcd-client.workspace = true
futures.workspace = true
humantime.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
nu-ansi-term = "0.46"
opendal = { version = "0.51.1", features = [
"services-fs",
"services-s3",
] }
object-store.workspace = true
query.workspace = true
rand.workspace = true
reqwest.workspace = true

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use object_store::Error as ObjectStoreError;
use snafu::{Location, Snafu};
#[derive(Snafu)]
@@ -225,7 +226,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: opendal::Error,
error: ObjectStoreError,
},
#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
@@ -237,6 +238,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("KV backend not set: {}", backend))]
KvBackendNotSet {
backend: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -273,8 +280,9 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::OpenDal { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. } => StatusCode::InvalidArguments,
Error::OutputDirNotSet { .. } => StatusCode::InvalidArguments,
Error::S3ConfigNotSet { .. }
| Error::OutputDirNotSet { .. }
| Error::KvBackendNotSet { .. } => StatusCode::InvalidArguments,
Error::BuildRuntime { source, .. } => source.status_code(),

View File

@@ -21,8 +21,8 @@ use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use opendal::layers::LoggingLayer;
use opendal::{services, Operator};
use object_store::layers::LoggingLayer;
use object_store::{services, ObjectStore};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
@@ -470,7 +470,7 @@ impl Export {
Ok(())
}
async fn build_operator(&self) -> Result<Operator> {
async fn build_operator(&self) -> Result<ObjectStore> {
if self.s3 {
self.build_s3_operator().await
} else {
@@ -479,11 +479,11 @@ impl Export {
}
/// build operator with preference for file system
async fn build_prefer_fs_operator(&self) -> Result<Operator> {
async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
// is under s3 mode and s3_ddl_dir is set, use it as root
if self.s3 && self.s3_ddl_local_dir.is_some() {
let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
let op = Operator::new(services::Fs::default().root(&root))
let op = ObjectStore::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
@@ -495,7 +495,7 @@ impl Export {
}
}
async fn build_s3_operator(&self) -> Result<Operator> {
async fn build_s3_operator(&self) -> Result<ObjectStore> {
let mut builder = services::S3::default().bucket(
self.s3_bucket
.as_ref()
@@ -522,20 +522,20 @@ impl Export {
builder = builder.secret_access_key(secret_key);
}
let op = Operator::new(builder)
let op = ObjectStore::new(builder)
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
Ok(op)
}
async fn build_fs_operator(&self) -> Result<Operator> {
async fn build_fs_operator(&self) -> Result<ObjectStore> {
let root = self
.output_dir
.as_ref()
.context(OutputDirNotSetSnafu)?
.clone();
let op = Operator::new(services::Fs::default().root(&root))
let op = ObjectStore::new(services::Fs::default().root(&root))
.context(OpenDalSnafu)?
.layer(LoggingLayer::default())
.finish();
@@ -642,11 +642,14 @@ impl Export {
async fn write_to_storage(
&self,
op: &Operator,
op: &ObjectStore,
file_path: &str,
content: Vec<u8>,
) -> Result<()> {
op.write(file_path, content).await.context(OpenDalSnafu)
op.write(file_path, content)
.await
.context(OpenDalSnafu)
.map(|_| ())
}
fn get_storage_params(&self, schema: &str) -> (String, String) {

View File

@@ -17,6 +17,7 @@ mod database;
pub mod error;
mod export;
mod import;
mod meta_snapshot;
use async_trait::async_trait;
use clap::Parser;
@@ -27,6 +28,7 @@ use error::Result;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
pub use crate::meta_snapshot::{MetaRestoreCommand, MetaSnapshotCommand};
#[async_trait]
pub trait Tool: Send + Sync {

View File

@@ -0,0 +1,329 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use clap::Parser;
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use common_meta::snapshot::MetadataSnapshotManager;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{KvBackendNotSetSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
use crate::Tool;
#[derive(Debug, Default, Parser)]
struct MetaConnection {
/// The endpoint of store. one of etcd, pg or mysql.
#[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
store_addrs: Vec<String>,
/// The database backend.
#[clap(long, value_enum)]
backend: Option<BackendImpl>,
#[clap(long, default_value = "")]
store_key_prefix: String,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[clap(long,default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
#[clap(long, default_value = "128")]
max_txn_ops: usize,
}
impl MetaConnection {
pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
let max_txn_ops = self.max_txn_ops;
let store_addrs = &self.store_addrs;
if store_addrs.is_empty() {
KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new)
} else {
let kvbackend = match self.backend {
Some(BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]
Some(BackendImpl::PostgresStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
#[cfg(feature = "mysql_kvbackend")]
Some(BackendImpl::MysqlStore) => {
let table_name = &self.meta_table_name;
let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
table_name,
max_txn_ops,
)
.await
.map_err(BoxedError::new)?)
}
_ => KvBackendNotSetSnafu { backend: "all" }
.fail()
.map_err(BoxedError::new),
};
if self.store_key_prefix.is_empty() {
kvbackend
} else {
let chroot_kvbackend =
ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
Ok(Arc::new(chroot_kvbackend))
}
}
}
}
// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore
#[derive(Debug, Default, Parser)]
struct S3Config {
/// whether to use s3 as the output directory. default is false.
#[clap(long, default_value = "false")]
s3: bool,
/// The s3 bucket name.
#[clap(long)]
s3_bucket: Option<String>,
/// The s3 region.
#[clap(long)]
s3_region: Option<String>,
/// The s3 access key.
#[clap(long)]
s3_access_key: Option<SecretString>,
/// The s3 secret key.
#[clap(long)]
s3_secret_key: Option<SecretString>,
/// The s3 endpoint. we will automatically use the default s3 decided by the region if not set.
#[clap(long)]
s3_endpoint: Option<String>,
}
impl S3Config {
pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
if !self.s3 {
Ok(None)
} else {
if self.s3_region.is_none()
|| self.s3_access_key.is_none()
|| self.s3_secret_key.is_none()
|| self.s3_bucket.is_none()
{
return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
}
// Safety, unwrap is safe because we have checked the options above.
let mut config = S3::default()
.bucket(self.s3_bucket.as_ref().unwrap())
.region(self.s3_region.as_ref().unwrap())
.access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
.secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
if !root.is_empty() && root != "." {
config = config.root(root);
}
if let Some(endpoint) = &self.s3_endpoint {
config = config.endpoint(endpoint);
}
Ok(Some(
ObjectStore::new(config)
.context(OpenDalSnafu)
.map_err(BoxedError::new)?
.finish(),
))
}
}
}
/// Export metadata snapshot tool.
/// This tool is used to export metadata snapshot from etcd, pg or mysql.
/// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)]
pub struct MetaSnapshotCommand {
/// The connection to the metadata store.
#[clap(flatten)]
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file. we will add the file extension automatically.
#[clap(long, default_value = "metadata_snapshot")]
file_name: String,
/// The directory to store the snapshot file.
/// if target output is s3 bucket, this is the root directory in the bucket.
/// if target output is local file, this is the local directory.
#[clap(long, default_value = "")]
output_dir: String,
}
fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
let root = if root.is_empty() { "." } else { root };
let object_store = ObjectStore::new(Fs::default().root(root))
.context(OpenDalSnafu)
.map_err(BoxedError::new)?
.finish();
Ok(object_store)
}
impl MetaSnapshotCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?;
let output_dir = &self.output_dir;
let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaSnapshotTool {
inner: MetadataSnapshotManager::new(kvbackend, store),
target_file: self.file_name.clone(),
};
Ok(Box::new(tool))
} else {
let object_store = create_local_file_object_store(output_dir)?;
let tool = MetaSnapshotTool {
inner: MetadataSnapshotManager::new(kvbackend, object_store),
target_file: self.file_name.clone(),
};
Ok(Box::new(tool))
}
}
}
pub struct MetaSnapshotTool {
inner: MetadataSnapshotManager,
target_file: String,
}
#[async_trait]
impl Tool for MetaSnapshotTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
self.inner
.dump("", &self.target_file)
.await
.map_err(BoxedError::new)?;
Ok(())
}
}
/// Restore metadata snapshot tool.
/// This tool is used to restore metadata snapshot from etcd, pg or mysql.
/// It will restore the metadata snapshot from local file or s3 bucket.
#[derive(Debug, Default, Parser)]
pub struct MetaRestoreCommand {
/// The connection to the metadata store.
#[clap(flatten)]
connection: MetaConnection,
/// The s3 config.
#[clap(flatten)]
s3_config: S3Config,
/// The name of the target snapshot file.
#[clap(long, default_value = "metadata_snapshot.metadata.fb")]
file_name: String,
/// The directory to store the snapshot file.
#[clap(long, default_value = ".")]
input_dir: String,
#[clap(long, default_value = "false")]
force: bool,
}
impl MetaRestoreCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kvbackend = self.connection.build().await?;
let input_dir = &self.input_dir;
let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
if let Some(store) = object_store {
let tool = MetaRestoreTool::new(
MetadataSnapshotManager::new(kvbackend, store),
self.file_name.clone(),
self.force,
);
Ok(Box::new(tool))
} else {
let object_store = create_local_file_object_store(input_dir)?;
let tool = MetaRestoreTool::new(
MetadataSnapshotManager::new(kvbackend, object_store),
self.file_name.clone(),
self.force,
);
Ok(Box::new(tool))
}
}
}
pub struct MetaRestoreTool {
inner: MetadataSnapshotManager,
source_file: String,
force: bool,
}
impl MetaRestoreTool {
pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
Self {
inner,
source_file,
force,
}
}
}
#[async_trait]
impl Tool for MetaRestoreTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
let clean = self
.inner
.check_target_source_clean()
.await
.map_err(BoxedError::new)?;
if clean {
common_telemetry::info!(
"The target source is clean, we will restore the metadata snapshot."
);
self.inner
.restore(&self.source_file)
.await
.map_err(BoxedError::new)?;
Ok(())
} else if !self.force {
common_telemetry::warn!(
"The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
);
Ok(())
} else {
common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
self.inner
.restore(&self.source_file)
.await
.map_err(BoxedError::new)?;
Ok(())
}
}
}

View File

@@ -25,6 +25,7 @@ common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
futures.workspace = true
futures-util.workspace = true

View File

@@ -14,6 +14,7 @@
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use api::v1::auth_header::AuthScheme;
use api::v1::ddl_request::Expr as DdlExpr;
@@ -35,7 +36,7 @@ use common_grpc::flight::do_put::DoPutResponse;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper};
use common_telemetry::tracing_context::W3cTrace;
use common_telemetry::{error, warn};
use futures::future;
@@ -49,7 +50,7 @@ use crate::error::{
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
InvalidTonicMetadataValueSnafu, ServerSnafu,
};
use crate::{from_grpc_response, Client, Result};
use crate::{error, from_grpc_response, Client, Result};
type FlightDataStream = Pin<Box<dyn Stream<Item = FlightData> + Send>>;
@@ -337,20 +338,30 @@ impl Database {
);
Ok(Output::new_with_affected_rows(rows))
}
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
FlightMessage::RecordBatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch or Metrics message",
}
.fail()
}
FlightMessage::Schema(schema) => {
let schema = Arc::new(
datatypes::schema::Schema::try_from(schema)
.context(error::ConvertSchemaSnafu)?,
);
let schema_cloned = schema.clone();
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::RecordBatch(arrow_batch) => {
yield RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
arrow_batch,
)
}
FlightMessage::Metrics(_) => {}
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}

View File

@@ -117,6 +117,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert Schema"))]
ConvertSchema {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -137,6 +144,7 @@ impl ErrorExt for Error {
| Error::CreateTlsChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
Error::ConvertSchema { source, .. } => source.status_code(),
}
}

View File

@@ -28,7 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::node_manager::Datanode;
use common_query::request::QueryRequest;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
@@ -55,6 +55,7 @@ impl Datanode for RegionRequester {
if err.should_retry() {
meta_error::Error::RetryLater {
source: BoxedError::new(err),
clean_poisons: false,
}
} else {
meta_error::Error::External {
@@ -146,6 +147,10 @@ impl RegionRequester {
let tracing_context = TracingContext::from_current_span();
let schema = Arc::new(
datatypes::schema::Schema::try_from(schema).context(error::ConvertSchemaSnafu)?,
);
let schema_cloned = schema.clone();
let stream = Box::pin(stream!({
let _span = tracing_context.attach(common_telemetry::tracing::info_span!(
"poll_flight_data_stream"
@@ -156,7 +161,12 @@ impl RegionRequester {
.context(ExternalSnafu)?;
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::RecordBatch(record_batch) => {
yield RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
record_batch,
)
}
FlightMessage::Metrics(s) => {
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);

View File

@@ -10,7 +10,13 @@ name = "greptime"
path = "src/bin/greptime.rs"
[features]
default = ["servers/pprof", "servers/mem-prof"]
default = [
"servers/pprof",
"servers/mem-prof",
"meta-srv/pg_kvbackend",
"meta-srv/mysql_kvbackend",
]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
tokio-console = ["common-telemetry/tokio-console"]
[lints]

View File

@@ -35,6 +35,8 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
#[cfg(feature = "enterprise")]
use common_meta::ddl_manager::TriggerDdlManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -69,6 +71,7 @@ use frontend::service_config::{
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
@@ -153,6 +156,7 @@ pub struct StandaloneOptions {
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: Option<SlowQueryOptions>,
pub query: QueryOptions,
}
impl Default for StandaloneOptions {
@@ -185,6 +189,7 @@ impl Default for StandaloneOptions {
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
slow_query: Some(SlowQueryOptions::default()),
query: QueryOptions::default(),
}
}
}
@@ -240,6 +245,7 @@ impl StandaloneOptions {
grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
query: cloned_opts.query,
..Default::default()
}
}
@@ -579,6 +585,8 @@ impl StartCommand {
flow_id_sequence,
));
#[cfg(feature = "enterprise")]
let trigger_ddl_manager: Option<TriggerDdlManagerRef> = plugins.get();
let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(),
node_manager.clone(),
@@ -587,6 +595,8 @@ impl StartCommand {
table_meta_allocator,
flow_metadata_manager,
flow_meta_allocator,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.await?;
@@ -651,6 +661,7 @@ impl StartCommand {
})
}
#[allow(clippy::too_many_arguments)]
pub async fn create_ddl_task_executor(
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
@@ -659,6 +670,7 @@ impl StartCommand {
table_metadata_allocator: TableMetadataAllocatorRef,
flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
@@ -675,6 +687,8 @@ impl StartCommand {
},
procedure_manager,
true,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.context(error::InitDdlManagerSnafu)?,
);

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
arrow-flight.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -0,0 +1,146 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_flight::FlightData;
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::DataType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use prost::Message;
fn schema() -> arrow::datatypes::SchemaRef {
let schema = Schema::new(vec![
ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false),
]);
schema.arrow_schema().clone()
}
/// Generate record batch according to provided schema and num rows.
fn prepare_random_record_batch(
schema: arrow::datatypes::SchemaRef,
num_rows: usize,
) -> DfRecordBatch {
let tag_candidates = (0..10000).map(|i| i.to_string()).collect::<Vec<_>>();
let columns: Vec<ArrayRef> = schema
.fields
.iter()
.map(|col| match col.data_type() {
DataType::Utf8 => {
let array = StringArray::from(
(0..num_rows)
.map(|_| {
let idx: usize = rand::random_range(0..10000);
format!("tag-{}", tag_candidates[idx])
})
.collect::<Vec<_>>(),
);
Arc::new(array) as ArrayRef
}
DataType::Timestamp(_, _) => {
let now = common_time::util::current_time_millis();
let array = TimestampMillisecondArray::from(
(0..num_rows).map(|i| now + i as i64).collect::<Vec<_>>(),
);
Arc::new(array) as ArrayRef
}
DataType::Int64 => {
let array = Int64Array::from((0..num_rows).map(|i| i as i64).collect::<Vec<_>>());
Arc::new(array) as ArrayRef
}
_ => unreachable!(),
})
.collect();
DfRecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) {
let schema = schema();
let mut encoder = FlightEncoder::default();
let schema_data = encoder.encode(FlightMessage::Schema(schema.clone()));
let rb = prepare_random_record_batch(schema, num_rows);
let rb_data = encoder.encode(FlightMessage::RecordBatch(rb));
(schema_data, rb_data)
}
fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecordBatch {
let schema = FlightData::decode(&schema[..]).unwrap();
let payload = FlightData::decode(&payload[..]).unwrap();
let mut decoder = FlightDecoder::default();
let _schema = decoder.try_decode(&schema).unwrap();
let message = decoder.try_decode(&payload).unwrap();
let FlightMessage::RecordBatch(batch) = message else {
unreachable!("unexpected message");
};
batch
}
fn decode_flight_data_from_header_and_body(
schema: &Bytes,
data_header: &Bytes,
data_body: &Bytes,
) -> DfRecordBatch {
let mut decoder = FlightDecoder::try_from_schema_bytes(schema).unwrap();
decoder
.try_decode_record_batch(data_header, data_body)
.unwrap()
}
fn bench_decode_flight_data(c: &mut Criterion) {
let row_counts = [100000, 200000, 1000000];
for row_count in row_counts {
let (schema, payload) = prepare_flight_data(row_count);
// arguments for decode_flight_data_from_protobuf
let schema_bytes = Bytes::from(schema.encode_to_vec());
let payload_bytes = Bytes::from(payload.encode_to_vec());
let mut group = c.benchmark_group(format!("flight_decoder_{}_rows", row_count));
group.bench_function("decode_from_protobuf", |b| {
b.iter(|| decode_flight_data_from_protobuf(&schema_bytes, &payload_bytes));
});
group.bench_function("decode_from_header_and_body", |b| {
b.iter(|| {
decode_flight_data_from_header_and_body(
&schema.data_header,
&payload.data_header,
&payload.data_body,
)
});
});
group.finish();
}
}
criterion_group!(benches, bench_decode_flight_data);
criterion_main!(benches);

View File

@@ -14,8 +14,10 @@
use criterion::criterion_main;
mod bench_flight_decoder;
mod channel_manager;
criterion_main! {
channel_manager::benches
channel_manager::benches,
bench_flight_decoder::benches
}

View File

@@ -18,6 +18,7 @@ use std::io;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
@@ -59,13 +60,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create RecordBatch"))]
CreateRecordBatch {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to convert Arrow type: {}", from))]
Conversion {
from: String,
@@ -88,13 +82,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert Arrow Schema"))]
ConvertArrowSchema {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String },
@@ -105,6 +92,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed arrow operation"))]
Arrow {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ArrowError,
},
}
impl ErrorExt for Error {
@@ -121,8 +116,7 @@ impl ErrorExt for Error {
| Error::DecodeFlightData { .. }
| Error::SerdeJson { .. } => StatusCode::Internal,
Error::CreateRecordBatch { source, .. } => source.status_code(),
Error::ConvertArrowSchema { source, .. } => source.status_code(),
Error::Arrow { .. } => StatusCode::Internal,
}
}

View File

@@ -21,25 +21,24 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_recordbatch::DfRecordBatch;
use datatypes::arrow;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::arrow::buffer::Buffer;
use datatypes::arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
use flatbuffers::FlatBufferBuilder;
use prost::bytes::Bytes as ProstBytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use crate::error::{
ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu,
Result,
};
use crate::error;
use crate::error::{DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result};
#[derive(Debug, Clone)]
pub enum FlightMessage {
Schema(SchemaRef),
Recordbatch(RecordBatch),
RecordBatch(DfRecordBatch),
AffectedRows(usize),
Metrics(String),
}
@@ -67,14 +66,12 @@ impl Default for FlightEncoder {
impl FlightEncoder {
pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData {
match flight_message {
FlightMessage::Schema(schema) => {
SchemaAsIpc::new(schema.arrow_schema(), &self.write_options).into()
}
FlightMessage::Recordbatch(recordbatch) => {
FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(),
FlightMessage::RecordBatch(record_batch) => {
let (encoded_dictionaries, encoded_batch) = self
.data_gen
.encoded_batch(
recordbatch.df_record_batch(),
&record_batch,
&mut self.dictionary_tracker,
&self.write_options,
)
@@ -124,9 +121,58 @@ impl FlightEncoder {
#[derive(Default)]
pub struct FlightDecoder {
schema: Option<SchemaRef>,
schema_bytes: Option<bytes::Bytes>,
}
impl FlightDecoder {
/// Build a [FlightDecoder] instance from provided schema bytes.
pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result<Self> {
let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..])
.context(error::ArrowSnafu)?;
Ok(Self {
schema: Some(Arc::new(arrow_schema)),
schema_bytes: Some(schema_bytes.clone()),
})
}
pub fn try_decode_record_batch(
&mut self,
data_header: &bytes::Bytes,
data_body: &bytes::Bytes,
) -> Result<DfRecordBatch> {
let schema = self
.schema
.as_ref()
.context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!",
})?
.clone();
let message = root_as_message(&data_header[..])
.map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})
.context(error::ArrowSnafu)?;
let result = message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.and_then(|batch| {
reader::read_record_batch(
&Buffer::from(data_body.as_ref()),
batch,
schema,
&HashMap::new(),
None,
&message.version(),
)
})
.context(error::ArrowSnafu)?;
Ok(result)
}
pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
let message = root_as_message(&flight_data.data_header).map_err(|e| {
InvalidFlightDataSnafu {
@@ -152,36 +198,29 @@ impl FlightDecoder {
.fail()
}
MessageHeader::Schema => {
let arrow_schema = ArrowSchema::try_from(flight_data).map_err(|e| {
let arrow_schema = Arc::new(ArrowSchema::try_from(flight_data).map_err(|e| {
InvalidFlightDataSnafu {
reason: e.to_string(),
}
.build()
})?;
let schema =
Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
self.schema = Some(schema.clone());
Ok(FlightMessage::Schema(schema))
})?);
self.schema = Some(arrow_schema.clone());
self.schema_bytes = Some(flight_data.data_header.clone());
Ok(FlightMessage::Schema(arrow_schema))
}
MessageHeader::RecordBatch => {
let schema = self.schema.clone().context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!",
})?;
let arrow_schema = schema.arrow_schema().clone();
let arrow_batch =
flight_data_to_arrow_batch(flight_data, arrow_schema, &HashMap::new())
flight_data_to_arrow_batch(flight_data, schema.clone(), &HashMap::new())
.map_err(|e| {
InvalidFlightDataSnafu {
reason: e.to_string(),
}
.build()
})?;
let recordbatch = RecordBatch::try_from_df_record_batch(schema, arrow_batch)
.context(CreateRecordBatchSnafu)?;
Ok(FlightMessage::Recordbatch(recordbatch))
Ok(FlightMessage::RecordBatch(arrow_batch))
}
other => {
let name = other.variant_name().unwrap_or("UNKNOWN");
@@ -196,16 +235,22 @@ impl FlightDecoder {
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}
pub fn schema_bytes(&self) -> Option<bytes::Bytes> {
self.schema_bytes.clone()
}
}
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {
pub fn flight_messages_to_recordbatches(
messages: Vec<FlightMessage>,
) -> Result<Vec<DfRecordBatch>> {
if messages.is_empty() {
Ok(RecordBatches::empty())
Ok(vec![])
} else {
let mut recordbatches = Vec::with_capacity(messages.len() - 1);
let schema = match &messages[0] {
FlightMessage::Schema(schema) => schema.clone(),
match &messages[0] {
FlightMessage::Schema(_schema) => {}
_ => {
return InvalidFlightDataSnafu {
reason: "First Flight Message must be schema!",
@@ -216,7 +261,7 @@ pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<
for message in messages.into_iter().skip(1) {
match message {
FlightMessage::Recordbatch(recordbatch) => recordbatches.push(recordbatch),
FlightMessage::RecordBatch(recordbatch) => recordbatches.push(recordbatch),
_ => {
return InvalidFlightDataSnafu {
reason: "Expect the following Flight Messages are all Recordbatches!",
@@ -226,7 +271,7 @@ pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<
}
}
RecordBatches::try_new(schema, recordbatches).context(CreateRecordBatchSnafu)
Ok(recordbatches)
}
}
@@ -247,38 +292,33 @@ fn build_none_flight_msg() -> Bytes {
#[cfg(test)]
mod test {
use arrow_flight::utils::batches_to_flight_data;
use datatypes::arrow::datatypes::{DataType, Field};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::Int32Vector;
use datatypes::arrow::array::Int32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use super::*;
use crate::Error;
#[test]
fn test_try_decode() {
let arrow_schema = ArrowSchema::new(vec![Field::new("n", DataType::Int32, true)]);
let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"n",
DataType::Int32,
true,
)]));
let batch1 = RecordBatch::new(
let batch1 = DfRecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Vector::from(vec![Some(1), None, Some(3)])) as _],
vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _],
)
.unwrap();
let batch2 = RecordBatch::new(
let batch2 = DfRecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Vector::from(vec![None, Some(5)])) as _],
vec![Arc::new(Int32Array::from(vec![None, Some(5)])) as _],
)
.unwrap();
let flight_data = batches_to_flight_data(
&arrow_schema,
vec![
batch1.clone().into_df_record_batch(),
batch2.clone().into_df_record_batch(),
],
)
.unwrap();
let flight_data =
batches_to_flight_data(&schema, vec![batch1.clone(), batch2.clone()]).unwrap();
assert_eq!(flight_data.len(), 3);
let [d1, d2, d3] = flight_data.as_slice() else {
unreachable!()
@@ -304,15 +344,15 @@ mod test {
let _ = decoder.schema.as_ref().unwrap();
let message = decoder.try_decode(d2).unwrap();
assert!(matches!(message, FlightMessage::Recordbatch(_)));
let FlightMessage::Recordbatch(actual_batch) = message else {
assert!(matches!(message, FlightMessage::RecordBatch(_)));
let FlightMessage::RecordBatch(actual_batch) = message else {
unreachable!()
};
assert_eq!(actual_batch, batch1);
let message = decoder.try_decode(d3).unwrap();
assert!(matches!(message, FlightMessage::Recordbatch(_)));
let FlightMessage::Recordbatch(actual_batch) = message else {
assert!(matches!(message, FlightMessage::RecordBatch(_)));
let FlightMessage::RecordBatch(actual_batch) = message else {
unreachable!()
};
assert_eq!(actual_batch, batch2);
@@ -320,27 +360,22 @@ mod test {
#[test]
fn test_flight_messages_to_recordbatches() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"m",
ConcreteDataType::int32_datatype(),
true,
)]));
let batch1 = RecordBatch::new(
let schema = Arc::new(Schema::new(vec![Field::new("m", DataType::Int32, true)]));
let batch1 = DfRecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Vector::from(vec![Some(2), None, Some(4)])) as _],
vec![Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as _],
)
.unwrap();
let batch2 = RecordBatch::new(
let batch2 = DfRecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Vector::from(vec![None, Some(6)])) as _],
vec![Arc::new(Int32Array::from(vec![None, Some(6)])) as _],
)
.unwrap();
let recordbatches =
RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
let recordbatches = vec![batch1.clone(), batch2.clone()];
let m1 = FlightMessage::Schema(schema);
let m2 = FlightMessage::Recordbatch(batch1);
let m3 = FlightMessage::Recordbatch(batch2);
let m2 = FlightMessage::RecordBatch(batch1);
let m3 = FlightMessage::RecordBatch(batch2);
let result = flight_messages_to_recordbatches(vec![m2.clone(), m1.clone(), m3.clone()]);
assert!(matches!(result, Err(Error::InvalidFlightData { .. })));

View File

@@ -8,6 +8,7 @@ license.workspace = true
testing = []
pg_kvbackend = ["dep:tokio-postgres", "dep:backon", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "dep:backon"]
enterprise = []
[lints]
workspace = true
@@ -42,6 +43,7 @@ deadpool = { workspace = true, optional = true }
deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true
etcd-client.workspace = true
flexbuffers = "25.2"
futures.workspace = true
futures-util.workspace = true
hex.workspace = true
@@ -49,6 +51,7 @@ humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
moka.workspace = true
object-store.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
@@ -71,6 +74,7 @@ typetag.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-wal = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }

View File

@@ -16,9 +16,12 @@ use std::sync::Arc;
use crate::error::Result;
use crate::flow_name::FlowName;
use crate::instruction::CacheIdent;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -89,9 +92,40 @@ where
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
CacheIdent::CreateFlow(_) => {
// Do nothing
}
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
}) => {
// invalidate flow route/flownode flow/table flow
let mut keys = Vec::with_capacity(
source_table_ids.len() * flow_part2node_id.len()
+ flow_part2node_id.len() * 2,
);
for table_id in source_table_ids {
for (partition_id, node_id) in flow_part2node_id {
let key =
TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
.to_bytes();
keys.push(key);
}
}
for (partition_id, node_id) in flow_part2node_id {
let key =
FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
keys.push(key);
let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
keys.push(key);
}
for key in keys {
self.invalidate_key(&key).await;
}
}
CacheIdent::FlowName(FlowName {
catalog_name,
flow_name,

View File

@@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu};
use crate::instruction::CacheIdent;
@@ -148,7 +148,7 @@ impl Procedure for AlterDatabaseProcedure {
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -32,9 +32,12 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;
use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
use crate::ddl::utils::{
add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::DeserializedValueWithBytes;
@@ -66,6 +69,7 @@ impl AlterLogicalTablesProcedure {
physical_table_info: None,
physical_table_route: None,
physical_columns: vec![],
table_cache_keys_to_invalidate: vec![],
},
}
}
@@ -195,16 +199,19 @@ impl AlterLogicalTablesProcedure {
self.update_physical_table_metadata().await?;
self.update_logical_tables_metadata().await?;
self.data.build_cache_keys_to_invalidate();
self.data.clear_metadata_fields();
self.data.state = AlterTablesState::InvalidateTableCache;
Ok(Status::executing(true))
}
pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let to_invalidate = self.build_table_cache_keys_to_invalidate();
let to_invalidate = &self.data.table_cache_keys_to_invalidate;
self.context
.cache_invalidator
.invalidate(&Default::default(), &to_invalidate)
.invalidate(&Default::default(), to_invalidate)
.await?;
Ok(Status::done())
}
@@ -217,14 +224,6 @@ impl Procedure for AlterLogicalTablesProcedure {
}
async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
let error_handler = |e: Error| {
if e.is_retry_later() {
common_procedure::Error::retry_later(e)
} else {
common_procedure::Error::external(e)
}
};
let state = &self.data.state;
let step = state.as_ref();
@@ -241,7 +240,7 @@ impl Procedure for AlterLogicalTablesProcedure {
AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
}
.map_err(error_handler)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
@@ -280,6 +279,20 @@ pub struct AlterTablesData {
physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
physical_table_route: Option<PhysicalTableRouteValue>,
physical_columns: Vec<ColumnMetadata>,
table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
impl AlterTablesData {
/// Clears all data fields except `state` and `table_cache_keys_to_invalidate` after metadata update.
/// This is done to avoid persisting unnecessary data after the update metadata step.
fn clear_metadata_fields(&mut self) {
self.tasks.clear();
self.table_info_values.clear();
self.physical_table_id = 0;
self.physical_table_info = None;
self.physical_table_route = None;
self.physical_columns.clear();
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]

View File

@@ -15,13 +15,12 @@
use table::metadata::RawTableInfo;
use table::table_name::TableName;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_logical_tables::AlterTablesData;
use crate::instruction::CacheIdent;
impl AlterLogicalTablesProcedure {
pub(crate) fn build_table_cache_keys_to_invalidate(&self) -> Vec<CacheIdent> {
impl AlterTablesData {
pub(crate) fn build_cache_keys_to_invalidate(&mut self) {
let mut cache_keys = self
.data
.table_info_values
.iter()
.flat_map(|table| {
@@ -31,14 +30,14 @@ impl AlterLogicalTablesProcedure {
]
})
.collect::<Vec<_>>();
cache_keys.push(CacheIdent::TableId(self.data.physical_table_id));
cache_keys.push(CacheIdent::TableId(self.physical_table_id));
// Safety: physical_table_info already filled in previous steps
let physical_table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
let physical_table_info = &self.physical_table_info.as_ref().unwrap().table_info;
cache_keys.push(CacheIdent::TableName(extract_table_name(
physical_table_info,
)));
cache_keys
self.table_cache_keys_to_invalidate = cache_keys;
}
}

View File

@@ -40,10 +40,11 @@ use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{
add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
add_peer_context_if_needed, handle_multiple_results, map_to_procedure_error,
sync_follower_regions, MultipleResults,
};
use crate::ddl::DdlContext;
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
@@ -195,7 +196,10 @@ impl AlterTableProcedure {
}
MultipleResults::AllRetryable(error) => {
// Just returns the error, and wait for the next try.
Err(error)
let err = BoxedError::new(error);
Err(err).context(RetryLaterSnafu {
clean_poisons: true,
})
}
MultipleResults::Ok(results) => {
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
@@ -323,16 +327,6 @@ impl Procedure for AlterTableProcedure {
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e: Error| {
if e.is_retry_later() {
ProcedureError::retry_later(e)
} else if e.need_clean_poisons() {
ProcedureError::external_and_clean_poisons(e)
} else {
ProcedureError::external(e)
}
};
let state = &self.data.state;
let step = state.as_ref();
@@ -350,7 +344,7 @@ impl Procedure for AlterTableProcedure {
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
}
.map_err(error_handler)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -22,7 +22,7 @@ use serde_with::{serde_as, DefaultOnNull};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::schema_name::{SchemaNameKey, SchemaNameValue};
@@ -115,7 +115,7 @@ impl Procedure for CreateDatabaseProcedure {
CreateDatabaseState::Prepare => self.on_prepare().await,
CreateDatabaseState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -36,7 +36,7 @@ use strum::AsRefStr;
use table::metadata::TableId;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
@@ -304,7 +304,7 @@ impl Procedure for CreateFlowProcedure {
CreateFlowState::CreateMetadata => self.on_create_metadata().await,
CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -33,7 +33,9 @@ use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
use crate::ddl::utils::{
add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
};
use crate::ddl::DdlContext;
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
use crate::key::table_route::TableRouteValue;
@@ -238,7 +240,7 @@ impl Procedure for CreateLogicalTablesProcedure {
CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTablesState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -34,7 +34,7 @@ use table::table_reference::TableReference;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, map_to_procedure_error,
region_storage_path,
};
use crate::ddl::{DdlContext, TableMetadata};
@@ -319,7 +319,7 @@ impl Procedure for CreateTableProcedure {
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -23,7 +23,7 @@ use table::metadata::{RawTableInfo, TableId, TableType};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::{DdlContext, TableMetadata};
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
@@ -249,7 +249,7 @@ impl Procedure for CreateViewProcedure {
CreateViewState::Prepare => self.on_prepare().await,
CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -21,7 +21,7 @@ use std::any::Any;
use std::fmt::Debug;
use common_error::ext::BoxedError;
use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
@@ -31,6 +31,7 @@ use snafu::ResultExt;
use tonic::async_trait;
use self::start::DropDatabaseStart;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::table_name::TableNameValue;
@@ -141,13 +142,7 @@ impl Procedure for DropDatabaseProcedure {
let (next, status) = state
.next(&self.runtime_context, &mut self.context)
.await
.map_err(|e| {
if e.is_retry_later() {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
})?;
.map_err(map_to_procedure_error)?;
*state = next;
Ok(status)

View File

@@ -323,6 +323,7 @@ mod tests {
}
.build(),
),
clean_poisons: false,
})
}

View File

@@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::flow_name::FlowName;
@@ -201,7 +201,7 @@ impl Procedure for DropFlowProcedure {
DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -35,7 +35,7 @@ use table::metadata::TableId;
use table::table_reference::TableReference;
use self::executor::DropTableExecutor;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
@@ -221,7 +221,7 @@ impl Procedure for DropTableProcedure {
DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -25,7 +25,7 @@ use table::metadata::{RawTableInfo, TableId, TableType};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
@@ -191,7 +191,7 @@ impl Procedure for DropViewProcedure {
DropViewState::DeleteMetadata => self.on_delete_metadata().await,
DropViewState::InvalidateViewCache => self.on_broadcast().await,
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -18,6 +18,7 @@ pub mod create_table;
pub mod datanode_handler;
pub mod flownode_handler;
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use api::v1::meta::Partition;
@@ -75,8 +76,6 @@ pub async fn create_logical_table(
physical_table_id: TableId,
table_name: &str,
) -> TableId {
use std::assert_matches::assert_matches;
let tasks = vec![test_create_logical_table_task(table_name)];
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();

View File

@@ -105,6 +105,7 @@ impl MockDatanodeHandler for RetryErrorDatanodeHandler {
}
.build(),
),
clean_poisons: false,
})
}
@@ -218,6 +219,7 @@ impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
}
.build(),
),
clean_poisons: false,
})
} else {
error::UnexpectedSnafu {
@@ -252,6 +254,7 @@ impl MockDatanodeHandler for AllFailureDatanodeHandler {
}
.build(),
),
clean_poisons: false,
})
} else {
error::UnexpectedSnafu {

View File

@@ -575,6 +575,7 @@ async fn test_on_submit_alter_request_with_partial_success_retryable() {
.await
.unwrap_err();
assert!(result.is_retry_later());
assert!(!result.need_clean_poisons());
// Submits again
let result = procedure
@@ -582,6 +583,7 @@ async fn test_on_submit_alter_request_with_partial_success_retryable() {
.await
.unwrap_err();
assert!(result.is_retry_later());
assert!(!result.need_clean_poisons());
}
#[tokio::test]
@@ -618,12 +620,14 @@ async fn test_on_submit_alter_request_with_all_failure_retrybale() {
.await
.unwrap_err();
assert!(err.is_retry_later());
assert!(err.need_clean_poisons());
// submits again
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(err.is_retry_later());
assert!(err.need_clean_poisons());
}
#[tokio::test]

View File

@@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::ddl::DdlContext;
use crate::error::{Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
@@ -66,7 +66,7 @@ impl Procedure for TruncateTableProcedure {
self.on_datanode_truncate_regions().await
}
}
.map_err(handle_retry_error)
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -60,11 +60,16 @@ pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error
}
}
pub fn handle_retry_error(e: Error) -> ProcedureError {
if e.is_retry_later() {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
/// Maps the error to the corresponding procedure error.
///
/// This function determines whether the error should be retried and if poison cleanup is needed,
/// then maps it to the appropriate procedure error variant.
pub fn map_to_procedure_error(e: Error) -> ProcedureError {
match (e.is_retry_later(), e.need_clean_poisons()) {
(true, true) => ProcedureError::retry_later_and_clean_poisons(e),
(true, false) => ProcedureError::retry_later(e),
(false, true) => ProcedureError::external_and_clean_poisons(e),
(false, false) => ProcedureError::external(e),
}
}

View File

@@ -47,6 +47,10 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
use crate::rpc::ddl::DdlTask::{
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
@@ -70,8 +74,29 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade
pub struct DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
/// This trait is responsible for handling DDL tasks about triggers. e.g.,
/// create trigger, drop trigger, etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerDdlManager: Send + Sync {
async fn create_trigger(
&self,
create_trigger_task: CreateTriggerTask,
procedure_manager: ProcedureManagerRef,
ddl_context: DdlContext,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
fn as_any(&self) -> &dyn std::any::Any;
}
#[cfg(feature = "enterprise")]
pub type TriggerDdlManagerRef = Arc<dyn TriggerDdlManager>;
macro_rules! procedure_loader_entry {
($procedure:ident) => {
(
@@ -100,10 +125,13 @@ impl DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
register_loaders: bool,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
};
if register_loaders {
manager.register_loaders()?;
@@ -669,6 +697,28 @@ async fn handle_create_flow_task(
})
}
#[cfg(feature = "enterprise")]
async fn handle_create_trigger_task(
ddl_manager: &DdlManager,
create_trigger_task: CreateTriggerTask,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
return UnsupportedSnafu {
operation: "create trigger",
}
.fail();
};
m.create_trigger(
create_trigger_task,
ddl_manager.procedure_manager.clone(),
ddl_manager.ddl_context.clone(),
query_context,
)
.await
}
async fn handle_alter_logical_table_tasks(
ddl_manager: &DdlManager,
alter_table_tasks: Vec<AlterTableTask>,
@@ -777,6 +827,15 @@ impl ProcedureExecutor for DdlManager {
handle_create_flow_task(self, create_flow_task, request.query_context.into())
.await
}
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
self,
create_trigger_task,
request.query_context.into(),
)
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
@@ -905,6 +964,8 @@ mod tests {
},
procedure_manager.clone(),
true,
#[cfg(feature = "enterprise")]
None,
);
let expected_loaders = vec![

View File

@@ -454,7 +454,10 @@ pub enum Error {
},
#[snafu(display("Retry later"))]
RetryLater { source: BoxedError },
RetryLater {
source: BoxedError,
clean_poisons: bool,
},
#[snafu(display("Abort procedure"))]
AbortProcedure {
@@ -812,6 +815,68 @@ pub enum Error {
#[snafu(source)]
error: common_time::error::Error,
},
#[snafu(display("Invalid file path: {}", file_path))]
InvalidFilePath {
#[snafu(implicit)]
location: Location,
file_path: String,
},
#[snafu(display("Failed to serialize flexbuffers"))]
SerializeFlexbuffers {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: flexbuffers::SerializationError,
},
#[snafu(display("Failed to deserialize flexbuffers"))]
DeserializeFlexbuffers {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: flexbuffers::DeserializationError,
},
#[snafu(display("Failed to read flexbuffers"))]
ReadFlexbuffers {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: flexbuffers::ReaderError,
},
#[snafu(display("Invalid file name: {}", reason))]
InvalidFileName {
#[snafu(implicit)]
location: Location,
reason: String,
},
#[snafu(display("Invalid file extension: {}", reason))]
InvalidFileExtension {
#[snafu(implicit)]
location: Location,
reason: String,
},
#[snafu(display("Failed to write object, file path: {}", file_path))]
WriteObject {
#[snafu(implicit)]
location: Location,
file_path: String,
#[snafu(source)]
error: object_store::Error,
},
#[snafu(display("Failed to read object, file path: {}", file_path))]
ReadObject {
#[snafu(implicit)]
location: Location,
file_path: String,
#[snafu(source)]
error: object_store::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -834,6 +899,7 @@ impl ErrorExt for Error {
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
SerdeJson { .. }
| ParseOption { .. }
@@ -867,7 +933,10 @@ impl ErrorExt for Error {
| FromUtf8 { .. }
| MetadataCorruption { .. }
| ParseWalOptions { .. }
| KafkaGetOffset { .. } => StatusCode::Unexpected,
| KafkaGetOffset { .. }
| ReadFlexbuffers { .. }
| SerializeFlexbuffers { .. }
| DeserializeFlexbuffers { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
@@ -884,7 +953,10 @@ impl ErrorExt for Error {
| InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. }
| InvalidTopicNamePrefix { .. }
| InvalidTimeZone { .. } => StatusCode::InvalidArguments,
| InvalidTimeZone { .. }
| InvalidFileExtension { .. }
| InvalidFileName { .. }
| InvalidFilePath { .. } => StatusCode::InvalidArguments,
InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
@@ -970,6 +1042,7 @@ impl Error {
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::RetryLater {
source: BoxedError::new(err),
clean_poisons: false,
}
}
@@ -980,7 +1053,13 @@ impl Error {
/// Determine whether it needs to clean poisons.
pub fn need_clean_poisons(&self) -> bool {
matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons)
matches!(
self,
Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
) || matches!(
self,
Error::RetryLater { clean_poisons, .. } if *clean_poisons
)
}
/// Returns true if the response exceeds the size limit.

View File

@@ -256,6 +256,11 @@ impl DatanodeTableManager {
})?
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;
// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;

View File

@@ -14,7 +14,7 @@
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flow_route;
pub mod flow_route;
pub mod flow_state;
mod flownode_addr_helper;
pub(crate) mod flownode_flow;
@@ -45,7 +45,7 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
/// The key of `__flow/` scope.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
inner: T,
}

View File

@@ -114,37 +114,37 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
/// The source tables used by the flow.
pub(crate) source_table_ids: Vec<TableId>,
pub source_table_ids: Vec<TableId>,
/// The sink table used by the flow.
pub(crate) sink_table_name: TableName,
pub sink_table_name: TableName,
/// Which flow nodes this flow is running on.
pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
/// The catalog name.
pub(crate) catalog_name: String,
pub catalog_name: String,
/// The query context used when create flow.
/// Although flow doesn't belong to any schema, this query_context is needed to remember
/// the query context when `create_flow` is executed
/// for recovering flow using the same sql&query_context after db restart.
/// if none, should use default query context
#[serde(default)]
pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
pub query_context: Option<crate::rpc::ddl::QueryContext>,
/// The flow name.
pub(crate) flow_name: String,
pub flow_name: String,
/// The raw sql.
pub(crate) raw_sql: String,
pub raw_sql: String,
/// The expr of expire.
/// Duration in seconds as `i64`.
pub(crate) expire_after: Option<i64>,
pub expire_after: Option<i64>,
/// The comment.
pub(crate) comment: String,
pub comment: String,
/// The options.
pub(crate) options: HashMap<String, String>,
pub options: HashMap<String, String>,
/// The created time
#[serde(default)]
pub(crate) created_time: DateTime<Utc>,
pub created_time: DateTime<Utc>,
/// The updated time.
#[serde(default)]
pub(crate) updated_time: DateTime<Utc>,
pub updated_time: DateTime<Utc>,
}
impl FlowInfoValue {
@@ -153,6 +153,15 @@ impl FlowInfoValue {
&self.flownode_ids
}
/// Insert a new flownode id for a partition.
pub fn insert_flownode_id(
&mut self,
partition: FlowPartitionId,
node: FlownodeId,
) -> Option<FlownodeId> {
self.flownode_ids.insert(partition, node)
}
/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids

View File

@@ -205,7 +205,7 @@ impl FlowNameManager {
catalog: &str,
) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
let start_key = FlowNameKey::range_start_key(catalog);
common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
common_telemetry::trace!("flow_names: start_key: {:?}", start_key);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(

View File

@@ -42,7 +42,7 @@ lazy_static! {
/// The key stores the route info of the flow.
///
/// The layout: `__flow/route/{flow_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
impl FlowRouteKey {
@@ -145,6 +145,12 @@ pub struct FlowRouteValue {
pub(crate) peer: Peer,
}
impl From<Peer> for FlowRouteValue {
fn from(peer: Peer) -> Self {
Self { peer }
}
}
impl FlowRouteValue {
/// Returns the `peer`.
pub fn peer(&self) -> &Peer {

View File

@@ -166,6 +166,17 @@ impl FlownodeFlowManager {
Self { kv_backend }
}
/// Whether given flow exist on this flownode.
pub async fn exists(
&self,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> Result<bool> {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
Ok(self.kv_backend.get(&key).await?.is_some())
}
/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,

View File

@@ -38,6 +38,14 @@ pub mod txn;
pub mod util;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
// The default meta table name, default is "greptime_metakv".
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
// The default lock id for election, default is 1.
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
#[async_trait]
pub trait KvBackend: TxnService
where

View File

@@ -308,10 +308,11 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, test_txn_compare_equal,
test_txn_compare_greater, test_txn_compare_less, test_txn_compare_not_equal,
test_txn_one_compare_op, text_txn_multi_compare_op,
prepare_kv, prepare_kv_with_prefix, test_kv_batch_delete, test_kv_batch_get,
test_kv_compare_and_put, test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
text_txn_multi_compare_op, unprepare_kv,
};
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
@@ -380,4 +381,12 @@ mod tests {
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
#[tokio::test]
async fn test_mem_all_range() {
let kv_backend = MemoryKvBackend::<Error>::new();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
}
}

View File

@@ -33,6 +33,12 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
const RDS_STORE_OP_BATCH_GET: &str = "batch_get";
const RDS_STORE_OP_BATCH_PUT: &str = "batch_put";
const RDS_STORE_OP_RANGE_QUERY: &str = "range_query";
const RDS_STORE_OP_RANGE_DELETE: &str = "range_delete";
const RDS_STORE_OP_BATCH_DELETE: &str = "batch_delete";
#[cfg(feature = "pg_kvbackend")]
mod postgres;
#[cfg(feature = "pg_kvbackend")]
@@ -560,3 +566,21 @@ fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> {
});
Ok(same)
}
#[macro_export]
macro_rules! record_rds_sql_execute_elapsed {
($result:expr, $label_store:expr,$label_op:expr,$label_type:expr) => {{
let timer = std::time::Instant::now();
$result
.inspect(|_| {
$crate::metrics::RDS_SQL_EXECUTE_ELAPSED
.with_label_values(&[$label_store, "success", $label_op, $label_type])
.observe(timer.elapsed().as_millis_f64())
})
.inspect_err(|_| {
$crate::metrics::RDS_SQL_EXECUTE_ELAPSED
.with_label_values(&[$label_store, "error", $label_op, $label_type])
.observe(timer.elapsed().as_millis_f64());
})
}};
}

View File

@@ -20,11 +20,13 @@ use snafu::ResultExt;
use sqlx::mysql::MySqlRow;
use sqlx::pool::Pool;
use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
use strum::AsRefStr;
use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
RDS_STORE_TXN_RETRY_COUNT,
RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT,
RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
@@ -33,6 +35,8 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
const MYSQL_STORE_NAME: &str = "mysql_store";
type MySqlClient = Arc<Pool<MySql>>;
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
@@ -47,7 +51,7 @@ fn key_value_from_row(row: MySqlRow) -> KeyValue {
const EMPTY: &[u8] = &[0];
/// Type of range template.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
Point,
Range,
@@ -58,6 +62,8 @@ enum RangeTemplateType {
/// Builds params for the given range template type.
impl RangeTemplateType {
/// Builds the parameters for the given range template type.
/// You can check out the conventions at [RangeRequest]
fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
match self {
RangeTemplateType::Point => vec![key],
@@ -160,7 +166,7 @@ impl<'a> MySqlTemplateFactory<'a> {
range_template: RangeTemplate {
point: format!("SELECT k, v FROM `{table_name}` WHERE k = ?"),
range: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM `{table_name}` ? ORDER BY k"),
full: format!("SELECT k, v FROM `{table_name}` ORDER BY k"),
left_bounded: format!("SELECT k, v FROM `{table_name}` WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM `{table_name}` WHERE k LIKE ? ORDER BY k"),
},
@@ -343,7 +349,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
let limit = req.limit as usize;
debug!("query: {:?}, params: {:?}", query, params);
let mut kvs = query_executor.query(&query, &params_ref).await?;
let mut kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params_ref).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_RANGE_QUERY,
template_type.as_ref()
)?;
if req.keys_only {
kvs.iter_mut().for_each(|kv| kv.value = vec![]);
}
@@ -381,7 +392,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
// Fast path: if we don't need previous kvs, we can just upsert the keys.
if !req.prev_kv {
query_executor.execute(&update, &values_params).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(&update, &values_params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_PUT,
""
)?;
return Ok(BatchPutResponse::default());
}
// Should use transaction to ensure atomicity.
@@ -392,7 +408,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
txn.commit().await?;
return res;
}
let prev_kvs = query_executor.query(&select, &in_params).await?;
let prev_kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&select, &in_params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_PUT,
""
)?;
query_executor.execute(&update, &values_params).await?;
Ok(BatchPutResponse { prev_kvs })
}
@@ -409,7 +430,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
.sql_template_set
.generate_batch_get_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_GET,
""
)?;
Ok(BatchGetResponse { kvs })
}
@@ -441,7 +467,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
let template = self.sql_template_set.delete_template.get(template_type);
let params = template_type.build_params(req.key, req.range_end);
let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
query_executor.execute(template, &params_ref).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(template, &params_ref).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_RANGE_DELETE,
template_type.as_ref()
)?;
let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
if req.prev_kv {
resp.with_prev_kvs(prev_kvs);
@@ -463,7 +494,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
// Fast path: if we don't need previous kvs, we can just delete the keys.
if !req.prev_kv {
query_executor.execute(&query, &params).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(&query, &params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_DELETE,
""
)?;
return Ok(BatchDeleteResponse::default());
}
// Should use transaction to ensure atomicity.
@@ -483,7 +519,12 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
.await?
.kvs;
// Pure `DELETE` has no return value, so we need to use `execute` instead of `query`.
query_executor.execute(&query, &params).await?;
crate::record_rds_sql_execute_elapsed!(
query_executor.execute(&query, &params).await,
MYSQL_STORE_NAME,
RDS_STORE_OP_BATCH_DELETE,
""
)?;
if req.prev_kv {
Ok(BatchDeleteResponse { prev_kvs })
} else {
@@ -538,10 +579,11 @@ mod tests {
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
unprepare_kv,
test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
text_txn_multi_compare_op, unprepare_kv,
};
use crate::maybe_skip_mysql_integration_test;
async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
init_default_ut_logging();
@@ -568,6 +610,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_put() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -577,6 +620,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -586,14 +630,26 @@ mod tests {
#[tokio::test]
async fn test_mysql_range_2() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_all_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("simple_range_test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_batch_get() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -603,6 +659,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_delete() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -612,6 +669,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_batch_delete_with_prefix() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
.await
.unwrap();
@@ -623,6 +681,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_delete_range() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -632,6 +691,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_compare_and_put() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("compare_and_put_test")
.await
.unwrap();
@@ -642,6 +702,7 @@ mod tests {
#[tokio::test]
async fn test_mysql_txn() {
maybe_skip_mysql_integration_test!();
let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::debug;
use deadpool_postgres::{Config, Pool, Runtime};
use snafu::ResultExt;
use strum::AsRefStr;
use tokio_postgres::types::ToSql;
use tokio_postgres::{IsolationLevel, NoTls, Row};
@@ -27,7 +28,8 @@ use crate::error::{
};
use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
RDS_STORE_TXN_RETRY_COUNT,
RDS_STORE_OP_BATCH_DELETE, RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT,
RDS_STORE_OP_RANGE_DELETE, RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
@@ -36,6 +38,8 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
const PG_STORE_NAME: &str = "pg_store";
pub struct PgClient(deadpool::managed::Object<deadpool_postgres::Manager>);
pub struct PgTxnClient<'a>(deadpool_postgres::Transaction<'a>);
@@ -50,7 +54,7 @@ fn key_value_from_row(r: Row) -> KeyValue {
const EMPTY: &[u8] = &[0];
/// Type of range template.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, AsRefStr)]
enum RangeTemplateType {
Point,
Range,
@@ -61,6 +65,8 @@ enum RangeTemplateType {
/// Builds params for the given range template type.
impl RangeTemplateType {
/// Builds the parameters for the given range template type.
/// You can check out the conventions at [RangeRequest]
fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
match self {
RangeTemplateType::Point => vec![key],
@@ -164,7 +170,7 @@ impl<'a> PgSqlTemplateFactory<'a> {
range: format!(
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
),
full: format!("SELECT k, v FROM \"{table_name}\" $1 ORDER BY k"),
full: format!("SELECT k, v FROM \"{table_name}\" ORDER BY k"),
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
},
@@ -358,7 +364,13 @@ impl KvQueryExecutor<PgClient> for PgStore {
RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
let limit = req.limit as usize;
debug!("query: {:?}, params: {:?}", query, params);
let mut kvs = query_executor.query(&query, &params_ref).await?;
let mut kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params_ref).await,
PG_STORE_NAME,
RDS_STORE_OP_RANGE_QUERY,
template_type.as_ref()
)?;
if req.keys_only {
kvs.iter_mut().for_each(|kv| kv.value = vec![]);
}
@@ -393,7 +405,13 @@ impl KvQueryExecutor<PgClient> for PgStore {
let query = self
.sql_template_set
.generate_batch_upsert_query(req.kvs.len());
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
PG_STORE_NAME,
RDS_STORE_OP_BATCH_PUT,
""
)?;
if req.prev_kv {
Ok(BatchPutResponse { prev_kvs: kvs })
} else {
@@ -414,7 +432,12 @@ impl KvQueryExecutor<PgClient> for PgStore {
.sql_template_set
.generate_batch_get_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
PG_STORE_NAME,
RDS_STORE_OP_BATCH_GET,
""
)?;
Ok(BatchGetResponse { kvs })
}
@@ -427,7 +450,12 @@ impl KvQueryExecutor<PgClient> for PgStore {
let template = self.sql_template_set.delete_template.get(template_type);
let params = template_type.build_params(req.key, req.range_end);
let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(template, &params_ref).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(template, &params_ref).await,
PG_STORE_NAME,
RDS_STORE_OP_RANGE_DELETE,
template_type.as_ref()
)?;
let mut resp = DeleteRangeResponse::new(kvs.len() as i64);
if req.prev_kv {
resp.with_prev_kvs(kvs);
@@ -447,7 +475,13 @@ impl KvQueryExecutor<PgClient> for PgStore {
.sql_template_set
.generate_batch_delete_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
let kvs = crate::record_rds_sql_execute_elapsed!(
query_executor.query(&query, &params).await,
PG_STORE_NAME,
RDS_STORE_OP_BATCH_DELETE,
""
)?;
if req.prev_kv {
Ok(BatchDeleteResponse { prev_kvs: kvs })
} else {
@@ -511,10 +545,11 @@ mod tests {
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
unprepare_kv,
test_simple_kv_range, test_txn_compare_equal, test_txn_compare_greater,
test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
text_txn_multi_compare_op, unprepare_kv,
};
use crate::maybe_skip_postgres_integration_test;
async fn build_pg_kv_backend(table_name: &str) -> Option<PgStore> {
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
@@ -549,6 +584,7 @@ mod tests {
#[tokio::test]
async fn test_pg_put() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("put_test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -558,6 +594,7 @@ mod tests {
#[tokio::test]
async fn test_pg_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("range_test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -567,14 +604,26 @@ mod tests {
#[tokio::test]
async fn test_pg_range_2() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("range2_test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_pg_all_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("simple_range_test").await.unwrap();
let prefix = b"";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_simple_kv_range(&kv_backend).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_pg_batch_get() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_get_test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -584,6 +633,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_delete() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_delete_test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -593,6 +643,7 @@ mod tests {
#[tokio::test]
async fn test_pg_batch_delete_with_prefix() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("batch_delete_with_prefix_test")
.await
.unwrap();
@@ -604,6 +655,7 @@ mod tests {
#[tokio::test]
async fn test_pg_delete_range() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("delete_range_test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
@@ -613,6 +665,7 @@ mod tests {
#[tokio::test]
async fn test_pg_compare_and_put() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("compare_and_put_test").await.unwrap();
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
@@ -621,6 +674,7 @@ mod tests {
#[tokio::test]
async fn test_pg_txn() {
maybe_skip_postgres_integration_test!();
let kv_backend = build_pg_kv_backend("txn_test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;

View File

@@ -108,6 +108,44 @@ pub async fn test_kv_range(kv_backend: &impl KvBackend) {
test_kv_range_with_prefix(kv_backend, vec![]).await;
}
pub async fn test_simple_kv_range(kvbackend: &impl KvBackend) {
{
let full_query = RangeRequest::new().with_range(vec![0], vec![0]);
let response = kvbackend.range(full_query).await.unwrap();
assert_eq!(response.kvs.len(), 4);
}
{
let point_query = RangeRequest::new().with_range(b"key11".to_vec(), vec![]);
let response = kvbackend.range(point_query).await.unwrap();
assert_eq!(response.kvs.len(), 1);
}
{
let left_bounded_query = RangeRequest::new().with_range(b"key1".to_vec(), vec![0]);
let response = kvbackend.range(left_bounded_query).await.unwrap();
assert_eq!(response.kvs.len(), 4);
}
{
let range_query = RangeRequest::new().with_range(b"key1".to_vec(), b"key11".to_vec());
let response = kvbackend.range(range_query).await.unwrap();
assert_eq!(response.kvs.len(), 1);
}
{
let prefix_query = RangeRequest::new().with_range(b"key1".to_vec(), b"key2".to_vec());
let response = kvbackend.range(prefix_query).await.unwrap();
assert_eq!(response.kvs.len(), 2);
}
{
let range_query = RangeRequest::new().with_range(b"key10".to_vec(), b"key100".to_vec());
let response = kvbackend.range(range_query).await.unwrap();
assert_eq!(response.kvs.len(), 0);
}
{
let prefix_query = RangeRequest::new().with_range(b"key10".to_vec(), b"key11".to_vec());
let response = kvbackend.range(prefix_query).await.unwrap();
assert_eq!(response.kvs.len(), 0);
}
}
pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
let key = [prefix.clone(), b"key1".to_vec()].concat();
let key11 = [prefix.clone(), b"key11".to_vec()].concat();

View File

@@ -15,6 +15,7 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(let_chains)]
#![feature(duration_millis_float)]
pub mod cache;
pub mod cache_invalidator;
@@ -41,6 +42,7 @@ pub mod region_keeper;
pub mod region_registry;
pub mod rpc;
pub mod sequence;
pub mod snapshot;
pub mod state_store;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -108,4 +108,10 @@ lazy_static! {
&["name"]
)
.unwrap();
pub static ref RDS_SQL_EXECUTE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_meta_rds_pg_sql_execute_elapsed_ms",
"rds pg sql execute elapsed",
&["backend", "result", "op", "type"]
)
.unwrap();
}

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(feature = "enterprise")]
pub mod trigger;
use std::collections::{HashMap, HashSet};
use std::result;
@@ -68,6 +71,8 @@ pub enum DdlTask {
DropFlow(DropFlowTask),
CreateView(CreateViewTask),
DropView(DropViewTask),
#[cfg(feature = "enterprise")]
CreateTrigger(trigger::CreateTriggerTask),
}
impl DdlTask {
@@ -242,6 +247,18 @@ impl TryFrom<Task> for DdlTask {
Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
Task::CreateTriggerTask(create_trigger) => {
#[cfg(feature = "enterprise")]
return Ok(DdlTask::CreateTrigger(create_trigger.try_into()?));
#[cfg(not(feature = "enterprise"))]
{
let _ = create_trigger;
crate::error::UnsupportedSnafu {
operation: "create trigger",
}
.fail()
}
}
}
}
}
@@ -292,6 +309,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
DdlTask::DropView(task) => Task::DropViewTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
};
Ok(Self {

View File

@@ -0,0 +1,276 @@
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::CreateTriggerTask as PbCreateTriggerTask;
use api::v1::notify_channel::ChannelType as PbChannelType;
use api::v1::{
CreateTriggerExpr, NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error;
use crate::error::Result;
use crate::rpc::ddl::DdlTask;
// Create trigger
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateTriggerTask {
pub catalog_name: String,
pub trigger_name: String,
pub if_not_exists: bool,
pub sql: String,
pub channels: Vec<NotifyChannel>,
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
pub interval: Duration,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct NotifyChannel {
pub name: String,
pub channel_type: ChannelType,
}
/// The available channel enum for sending trigger notifications.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ChannelType {
Webhook(WebhookOptions),
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WebhookOptions {
/// The URL of the AlertManager API endpoint.
///
/// e.g., "http://localhost:9093".
pub url: String,
/// Configuration options for the AlertManager webhook. e.g., timeout, etc.
pub opts: HashMap<String, String>,
}
impl From<CreateTriggerTask> for PbCreateTriggerTask {
fn from(task: CreateTriggerTask) -> Self {
let channels = task
.channels
.into_iter()
.map(PbNotifyChannel::from)
.collect();
let expr = CreateTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
create_if_not_exists: task.if_not_exists,
sql: task.sql,
channels,
labels: task.labels,
annotations: task.annotations,
interval: task.interval.as_secs(),
};
PbCreateTriggerTask {
create_trigger: Some(expr),
}
}
}
impl TryFrom<PbCreateTriggerTask> for CreateTriggerTask {
type Error = error::Error;
fn try_from(task: PbCreateTriggerTask) -> Result<Self> {
let expr = task.create_trigger.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create_trigger",
})?;
let channels = expr
.channels
.into_iter()
.map(NotifyChannel::try_from)
.collect::<Result<Vec<_>>>()?;
let task = CreateTriggerTask {
catalog_name: expr.catalog_name,
trigger_name: expr.trigger_name,
if_not_exists: expr.create_if_not_exists,
sql: expr.sql,
channels,
labels: expr.labels,
annotations: expr.annotations,
interval: Duration::from_secs(expr.interval),
};
Ok(task)
}
}
impl From<NotifyChannel> for PbNotifyChannel {
fn from(channel: NotifyChannel) -> Self {
let NotifyChannel { name, channel_type } = channel;
let channel_type = match channel_type {
ChannelType::Webhook(options) => PbChannelType::Webhook(PbWebhookOptions {
url: options.url,
opts: options.opts,
}),
};
PbNotifyChannel {
name,
channel_type: Some(channel_type),
}
}
}
impl TryFrom<PbNotifyChannel> for NotifyChannel {
type Error = error::Error;
fn try_from(channel: PbNotifyChannel) -> Result<Self> {
let PbNotifyChannel { name, channel_type } = channel;
let channel_type = channel_type.context(error::InvalidProtoMsgSnafu {
err_msg: "expected channel_type",
})?;
let channel_type = match channel_type {
PbChannelType::Webhook(options) => ChannelType::Webhook(WebhookOptions {
url: options.url,
opts: options.opts,
}),
};
Ok(NotifyChannel { name, channel_type })
}
}
impl DdlTask {
/// Creates a [`DdlTask`] to create a trigger.
pub fn new_create_trigger(expr: CreateTriggerTask) -> Self {
DdlTask::CreateTrigger(expr)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_convert_create_trigger_task() {
let original = CreateTriggerTask {
catalog_name: "test_catalog".to_string(),
trigger_name: "test_trigger".to_string(),
if_not_exists: true,
sql: "SELECT * FROM test".to_string(),
channels: vec![
NotifyChannel {
name: "channel1".to_string(),
channel_type: ChannelType::Webhook(WebhookOptions {
url: "http://localhost:9093".to_string(),
opts: HashMap::from([("timeout".to_string(), "30s".to_string())]),
}),
},
NotifyChannel {
name: "channel2".to_string(),
channel_type: ChannelType::Webhook(WebhookOptions {
url: "http://alertmanager:9093".to_string(),
opts: HashMap::new(),
}),
},
],
labels: vec![
("key1".to_string(), "value1".to_string()),
("key2".to_string(), "value2".to_string()),
]
.into_iter()
.collect(),
annotations: vec![
("summary".to_string(), "Test alert".to_string()),
("description".to_string(), "This is a test".to_string()),
]
.into_iter()
.collect(),
interval: Duration::from_secs(60),
};
let pb_task: PbCreateTriggerTask = original.clone().into();
let expr = pb_task.create_trigger.as_ref().unwrap();
assert_eq!(expr.catalog_name, "test_catalog");
assert_eq!(expr.trigger_name, "test_trigger");
assert!(expr.create_if_not_exists);
assert_eq!(expr.sql, "SELECT * FROM test");
assert_eq!(expr.channels.len(), 2);
assert_eq!(expr.labels.len(), 2);
assert_eq!(expr.labels.get("key1").unwrap(), "value1");
assert_eq!(expr.labels.get("key2").unwrap(), "value2");
assert_eq!(expr.annotations.len(), 2);
assert_eq!(expr.annotations.get("summary").unwrap(), "Test alert");
assert_eq!(
expr.annotations.get("description").unwrap(),
"This is a test"
);
assert_eq!(expr.interval, 60);
let round_tripped = CreateTriggerTask::try_from(pb_task).unwrap();
assert_eq!(original.catalog_name, round_tripped.catalog_name);
assert_eq!(original.trigger_name, round_tripped.trigger_name);
assert_eq!(original.if_not_exists, round_tripped.if_not_exists);
assert_eq!(original.sql, round_tripped.sql);
assert_eq!(original.channels.len(), round_tripped.channels.len());
assert_eq!(&original.channels[0], &round_tripped.channels[0]);
assert_eq!(&original.channels[1], &round_tripped.channels[1]);
assert_eq!(original.labels, round_tripped.labels);
assert_eq!(original.annotations, round_tripped.annotations);
assert_eq!(original.interval, round_tripped.interval);
// Invalid, since create_trigger is None and it's required.
let invalid_task = PbCreateTriggerTask {
create_trigger: None,
};
let result = CreateTriggerTask::try_from(invalid_task);
assert!(result.is_err());
}
#[test]
fn test_convert_notify_channel() {
let original = NotifyChannel {
name: "test_channel".to_string(),
channel_type: ChannelType::Webhook(WebhookOptions {
url: "http://localhost:9093".to_string(),
opts: HashMap::new(),
}),
};
let pb_channel: PbNotifyChannel = original.clone().into();
match pb_channel.channel_type.as_ref().unwrap() {
PbChannelType::Webhook(options) => {
assert_eq!(pb_channel.name, "test_channel");
assert_eq!(options.url, "http://localhost:9093");
assert!(options.opts.is_empty());
}
}
let round_tripped = NotifyChannel::try_from(pb_channel).unwrap();
assert_eq!(original, round_tripped);
// Test with timeout is None.
let no_timeout = NotifyChannel {
name: "no_timeout".to_string(),
channel_type: ChannelType::Webhook(WebhookOptions {
url: "http://localhost:9093".to_string(),
opts: HashMap::new(),
}),
};
let pb_no_timeout: PbNotifyChannel = no_timeout.clone().into();
match pb_no_timeout.channel_type.as_ref().unwrap() {
PbChannelType::Webhook(options) => {
assert_eq!(options.url, "http://localhost:9093");
}
}
let round_tripped_no_timeout = NotifyChannel::try_from(pb_no_timeout).unwrap();
assert_eq!(no_timeout, round_tripped_no_timeout);
// Invalid, since channel_type is None and it's required.
let invalid_channel = PbNotifyChannel {
name: "invalid".to_string(),
channel_type: None,
};
let result = NotifyChannel::try_from(invalid_channel);
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,380 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod file;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::time::Instant;
use common_telemetry::info;
use file::{Metadata, MetadataContent};
use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use strum::Display;
use crate::error::{
Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
Result, WriteObjectSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{BatchPutRequest, RangeRequest};
use crate::rpc::KeyValue;
use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
/// The format of the backup file.
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
pub enum FileFormat {
#[strum(serialize = "fb")]
FlexBuffers,
}
impl TryFrom<&str> for FileFormat {
type Error = String;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
match value.to_lowercase().as_str() {
"fb" => Ok(FileFormat::FlexBuffers),
_ => Err(format!("Invalid file format: {}", value)),
}
}
}
#[derive(Debug, PartialEq, Eq, Display)]
#[strum(serialize_all = "lowercase")]
pub enum DataType {
Metadata,
}
impl TryFrom<&str> for DataType {
type Error = String;
fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
match value.to_lowercase().as_str() {
"metadata" => Ok(DataType::Metadata),
_ => Err(format!("Invalid data type: {}", value)),
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct FileExtension {
format: FileFormat,
data_type: DataType,
}
impl FileExtension {
pub fn new(format: FileFormat, data_type: DataType) -> Self {
Self { format, data_type }
}
}
impl Display for FileExtension {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.data_type, self.format)
}
}
impl TryFrom<&str> for FileExtension {
type Error = Error;
fn try_from(value: &str) -> Result<Self> {
let parts = value.split(".").collect::<Vec<&str>>();
if parts.len() != 2 {
return InvalidFileExtensionSnafu {
reason: format!(
"Extension should be in the format of <datatype>.<format>, got: {}",
value
),
}
.fail();
}
let data_type = DataType::try_from(parts[0])
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
let format = FileFormat::try_from(parts[1])
.map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
Ok(FileExtension { format, data_type })
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct FileName {
name: String,
extension: FileExtension,
}
impl Display for FileName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.name, self.extension)
}
}
impl TryFrom<&str> for FileName {
type Error = Error;
fn try_from(value: &str) -> Result<Self> {
let Some((name, extension)) = value.split_once(".") else {
return InvalidFileNameSnafu {
reason: format!(
"The file name should be in the format of <name>.<extension>, got: {}",
value
),
}
.fail();
};
let extension = FileExtension::try_from(extension)?;
Ok(Self {
name: name.to_string(),
extension,
})
}
}
impl FileName {
fn new(name: String, extension: FileExtension) -> Self {
Self { name, extension }
}
}
/// The manager of the metadata snapshot.
///
/// It manages the metadata snapshot, including dumping and restoring.
pub struct MetadataSnapshotManager {
kv_backend: KvBackendRef,
object_store: ObjectStore,
}
/// The maximum size of the request to put metadata, use 1MiB by default.
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
impl MetadataSnapshotManager {
pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
Self {
kv_backend,
object_store,
}
}
/// Restores the metadata from the backup file to the metadata store.
pub async fn restore(&self, file_path: &str) -> Result<u64> {
let path = Path::new(file_path);
let file_name = path
.file_name()
.and_then(|s| s.to_str())
.context(InvalidFilePathSnafu { file_path })?;
let filename = FileName::try_from(file_name)?;
let data = self
.object_store
.read(file_path)
.await
.context(ReadObjectSnafu { file_path })?;
let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
let metadata_content = document.into_metadata_content()?;
let mut req = BatchPutRequest::default();
let mut total_request_size = 0;
let mut count = 0;
let now = Instant::now();
for FileKeyValue { key, value } in metadata_content.into_iter() {
count += 1;
let key_size = key.len();
let value_size = value.len();
if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
self.kv_backend.batch_put(req).await?;
req = BatchPutRequest::default();
total_request_size = 0;
}
req.kvs.push(KeyValue { key, value });
total_request_size += key_size + value_size;
}
if !req.kvs.is_empty() {
self.kv_backend.batch_put(req).await?;
}
info!(
"Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
file_path,
count,
now.elapsed()
);
Ok(count)
}
pub async fn check_target_source_clean(&self) -> Result<bool> {
let req = RangeRequest::new().with_range(vec![0], vec![0]);
let mut stream = Box::pin(
PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
);
let v = stream.as_mut().try_next().await?;
Ok(v.is_none())
}
/// Dumps the metadata to the backup file.
pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
let format = FileFormat::FlexBuffers;
let filename = FileName::new(
filename_str.to_string(),
FileExtension {
format,
data_type: DataType::Metadata,
},
);
let file_path_buf = [path, filename.to_string().as_str()]
.iter()
.collect::<PathBuf>();
let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
})?;
let now = Instant::now();
let req = RangeRequest::new().with_range(vec![0], vec![0]);
let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
Ok(FileKeyValue {
key: kv.key,
value: kv.value,
})
})
.into_stream();
let keyvalues = stream.try_collect::<Vec<_>>().await?;
let num_keyvalues = keyvalues.len();
let document = Document::new(
Metadata::new(),
file::Content::Metadata(MetadataContent::new(keyvalues)),
);
let bytes = document.to_bytes(&format)?;
let r = self
.object_store
.write(file_path, bytes)
.await
.context(WriteObjectSnafu { file_path })?;
info!(
"Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
file_path,
num_keyvalues,
r.content_length(),
now.elapsed()
);
Ok((filename.to_string(), num_keyvalues as u64))
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[test]
fn test_file_name() {
let file_name = FileName::try_from("test.metadata.fb").unwrap();
assert_eq!(file_name.name, "test");
assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
assert_eq!(file_name.extension.data_type, DataType::Metadata);
assert_eq!(file_name.to_string(), "test.metadata.fb");
let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
assert_eq!(
invalid_file_name.to_string(),
"Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
);
let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
assert_eq!(
invalid_file_extension.to_string(),
"Invalid file extension: Invalid file format: hello"
);
}
fn test_env(
prefix: &str,
) -> (
TempDir,
Arc<MemoryKvBackend<Error>>,
MetadataSnapshotManager,
) {
let temp_dir = create_temp_dir(prefix);
let kv_backend = Arc::new(MemoryKvBackend::default());
let temp_path = temp_dir.path();
let data_path = temp_path.join("data").as_path().display().to_string();
let builder = Fs::default().root(&data_path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
(temp_dir, kv_backend, manager)
}
#[tokio::test]
async fn test_dump_and_restore() {
common_telemetry::init_default_ut_logging();
let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
let temp_path = temp_dir.path();
for i in 0..10 {
kv_backend
.put(
PutRequest::new()
.with_key(format!("test_{}", i).as_bytes().to_vec())
.with_value(format!("value_{}", i).as_bytes().to_vec()),
)
.await
.unwrap();
}
let dump_path = temp_path.join("snapshot");
manager
.dump(
&dump_path.as_path().display().to_string(),
"metadata_snapshot",
)
.await
.unwrap();
// Clean up the kv backend
kv_backend.clear();
let restore_path = dump_path
.join("metadata_snapshot.metadata.fb")
.as_path()
.display()
.to_string();
manager.restore(&restore_path).await.unwrap();
for i in 0..10 {
let key = format!("test_{}", i);
let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
assert_eq!(value.value, format!("value_{}", i).as_bytes());
}
}
#[tokio::test]
async fn test_restore_from_nonexistent_file() {
let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
let restore_path = temp_dir
.path()
.join("nonexistent.metadata.fb")
.as_path()
.display()
.to_string();
let err = manager.restore(&restore_path).await.unwrap_err();
assert_matches!(err, Error::ReadObject { .. })
}
}

View File

@@ -0,0 +1,145 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::util::current_time_millis;
use flexbuffers::{FlexbufferSerializer, Reader};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{
DeserializeFlexbuffersSnafu, ReadFlexbuffersSnafu, Result, SerializeFlexbuffersSnafu,
};
use crate::snapshot::FileFormat;
/// The layout of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct Document {
metadata: Metadata,
content: Content,
}
impl Document {
/// Creates a new document.
pub fn new(metadata: Metadata, content: Content) -> Self {
Self { metadata, content }
}
fn serialize_to_flexbuffer(&self) -> Result<Vec<u8>> {
let mut builder = FlexbufferSerializer::new();
self.serialize(&mut builder)
.context(SerializeFlexbuffersSnafu)?;
Ok(builder.take_buffer())
}
/// Converts the [`Document`] to a bytes.
pub(crate) fn to_bytes(&self, format: &FileFormat) -> Result<Vec<u8>> {
match format {
FileFormat::FlexBuffers => self.serialize_to_flexbuffer(),
}
}
fn deserialize_from_flexbuffer(data: &[u8]) -> Result<Self> {
let reader = Reader::get_root(data).context(ReadFlexbuffersSnafu)?;
Document::deserialize(reader).context(DeserializeFlexbuffersSnafu)
}
/// Deserializes the [`Document`] from a bytes.
pub(crate) fn from_slice(format: &FileFormat, data: &[u8]) -> Result<Self> {
match format {
FileFormat::FlexBuffers => Self::deserialize_from_flexbuffer(data),
}
}
/// Converts the [`Document`] to a [`MetadataContent`].
pub(crate) fn into_metadata_content(self) -> Result<MetadataContent> {
match self.content {
Content::Metadata(metadata) => Ok(metadata),
}
}
}
/// The metadata of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct Metadata {
// UNIX_EPOCH in milliseconds.
created_timestamp_mills: i64,
}
impl Metadata {
/// Create a new metadata.
///
/// The `created_timestamp_mills` will be the current time in milliseconds.
pub fn new() -> Self {
Self {
created_timestamp_mills: current_time_millis(),
}
}
}
/// The content of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) enum Content {
Metadata(MetadataContent),
}
/// The content of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct MetadataContent {
values: Vec<KeyValue>,
}
impl MetadataContent {
/// Create a new metadata content.
pub fn new(values: impl IntoIterator<Item = KeyValue>) -> Self {
Self {
values: values.into_iter().collect(),
}
}
/// Returns an iterator over the key-value pairs.
pub fn into_iter(self) -> impl Iterator<Item = KeyValue> {
self.values.into_iter()
}
}
/// The key-value pair of the backup file.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub(crate) struct KeyValue {
pub key: Vec<u8>,
pub value: Vec<u8>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_document() {
let document = Document::new(
Metadata::new(),
Content::Metadata(MetadataContent::new(vec![KeyValue {
key: b"key".to_vec(),
value: b"value".to_vec(),
}])),
);
let bytes = document.to_bytes(&FileFormat::FlexBuffers).unwrap();
let document_deserialized = Document::from_slice(&FileFormat::FlexBuffers, &bytes).unwrap();
assert_eq!(
document.metadata.created_timestamp_mills,
document_deserialized.metadata.created_timestamp_mills
);
assert_eq!(document.content, document_deserialized.content);
}
}

View File

@@ -233,3 +233,35 @@ pub async fn test_kafka_topic_pool(
KafkaTopicPool::new(&config, kv_backend, topic_creator)
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_postgres_integration_test {
() => {
if std::env::var("GT_POSTGRES_ENDPOINTS").is_err() {
common_telemetry::warn!("The endpoints is empty, skipping the test");
return;
}
};
}
#[macro_export]
/// Skip the test if the environment variable `GT_KAFKA_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// GT_KAFKA_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_mysql_integration_test {
() => {
if std::env::var("GT_MYSQL_ENDPOINTS").is_err() {
common_telemetry::warn!("The endpoints is empty, skipping the test");
return;
}
};
}

View File

@@ -138,7 +138,10 @@ pub enum Error {
},
#[snafu(display("Procedure exec failed"))]
RetryLater { source: BoxedError },
RetryLater {
source: BoxedError,
clean_poisons: bool,
},
#[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
ProcedurePanic { procedure_id: ProcedureId },
@@ -298,6 +301,15 @@ impl Error {
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::RetryLater {
source: BoxedError::new(err),
clean_poisons: false,
}
}
/// Creates a new [Error::RetryLater] error from source `err` and clean poisons.
pub fn retry_later_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::RetryLater {
source: BoxedError::new(err),
clean_poisons: true,
}
}
@@ -309,6 +321,7 @@ impl Error {
/// Determine whether it needs to clean poisons.
pub fn need_clean_poisons(&self) -> bool {
matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
|| matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
}
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according

View File

@@ -358,10 +358,11 @@ impl Runner {
Err(e) => {
error!(
e;
"Failed to execute procedure {}-{}, retry: {}",
"Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
self.procedure.type_name(),
self.meta.id,
e.is_retry_later(),
e.need_clean_poisons(),
);
// Don't store state if `ProcedureManager` is stopped.
@@ -378,6 +379,11 @@ impl Runner {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
debug!(
"Procedure {}-{} cleaned poisons",
self.procedure.type_name(),
self.meta.id,
);
}
if e.is_retry_later() {
@@ -581,6 +587,7 @@ impl Runner {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -1598,6 +1605,75 @@ mod tests {
assert_eq!(&procedure_id.to_string(), ROOT_ID);
}
#[tokio::test]
async fn test_execute_exceed_max_retry_after_set_poison() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
Ok(Status::executing(true))
} else {
// Put the poison to the context.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Err(Error::retry_later_and_clean_poisons(MockError::new(
StatusCode::Unexpected,
)))
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("exceed_max_after_set_poison");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
runner.manager_ctx.start();
runner.exponential_builder = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(1))
.with_max_times(3);
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
// Run the runner and execute the procedure.
runner.execute_once_with_retry(&ctx).await;
let err = meta.state().error().unwrap().clone();
assert_matches!(&*err, Error::RetryTimesExceeded { .. });
// Check the poison is deleted.
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
assert_eq!(procedure_id, None);
}
#[tokio::test]
async fn test_execute_poisoned() {
let mut times = 0;

View File

@@ -103,13 +103,6 @@ pub enum Error {
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to convert arrow schema"))]
ConvertArrowSchema {
#[snafu(implicit)]
location: Location,
source: DataTypeError,
},
#[snafu(display("Failed to cast array to {:?}", typ))]
TypeCast {
#[snafu(source)]
@@ -244,7 +237,6 @@ impl ErrorExt for Error {
Error::InvalidInputType { source, .. }
| Error::IntoVector { source, .. }
| Error::FromScalarValue { source, .. }
| Error::ConvertArrowSchema { source, .. }
| Error::FromArrowArray { source, .. }
| Error::InvalidVectorString { source, .. } => source.status_code(),

View File

@@ -133,6 +133,18 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to downcast vector of type '{:?}' to type '{:?}'",
from_type,
to_type
))]
DowncastVector {
from_type: ConcreteDataType,
to_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error occurs when performing arrow computation"))]
ArrowCompute {
#[snafu(source)]
@@ -192,6 +204,8 @@ impl ErrorExt for Error {
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
Error::DowncastVector { .. } => StatusCode::Unexpected,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
Error::ArrowCompute { .. } => StatusCode::IllegalState,

View File

@@ -30,13 +30,16 @@ pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecord
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::VectorRef;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::json_type_value_to_string;
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn name(&self) -> &str {
@@ -58,6 +61,146 @@ pub struct OrderOption {
pub options: SortOptions,
}
/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a function to each [RecordBatch].
///
/// The mapper function is applied to each [RecordBatch] in the stream.
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
/// The output ordering of the new [RecordBatchStream] is the same as the output ordering of the inner [RecordBatchStream].
/// The metrics of the new [RecordBatchStream] is the same as the metrics of the inner [RecordBatchStream] if it is not `None`.
pub struct SendableRecordBatchMapper {
inner: SendableRecordBatchStream,
/// The mapper function is applied to each [RecordBatch] in the stream.
/// The original schema and the mapped schema are passed to the mapper function.
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
/// The schema of the new [RecordBatchStream] is the same as the schema of the inner [RecordBatchStream] after applying the schema mapper function.
schema: SchemaRef,
/// Whether the mapper function is applied to each [RecordBatch] in the stream.
apply_mapper: bool,
}
/// Maps the json type to string in the batch.
///
/// The json type is mapped to string by converting the json value to string.
/// The batch is updated to have the same number of columns as the original batch,
/// but with the json type mapped to string.
pub fn map_json_type_to_string(
batch: RecordBatch,
original_schema: &SchemaRef,
mapped_schema: &SchemaRef,
) -> Result<RecordBatch> {
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
if let ConcreteDataType::Json(j) = schema.data_type {
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
let binary_vector = vector
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| error::DowncastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::binary_datatype(),
})?;
for value in binary_vector.iter_data() {
let Some(value) = value else {
string_vector_builder.push(None);
continue;
};
let string_value =
json_type_value_to_string(value, &j.format).with_context(|_| {
error::CastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::string_datatype(),
}
})?;
string_vector_builder.push(Some(string_value.as_str()));
}
let string_vector = string_vector_builder.finish();
vectors.push(Arc::new(string_vector) as VectorRef);
} else {
vectors.push(vector.clone());
}
}
RecordBatch::new(mapped_schema.clone(), vectors)
}
/// Maps the json type to string in the schema.
///
/// The json type is mapped to string by converting the json value to string.
/// The schema is updated to have the same number of columns as the original schema,
/// but with the json type mapped to string.
///
/// Returns the new schema and whether the schema needs to be mapped to string.
pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
let mut apply_mapper = false;
for column in schema.column_schemas() {
if matches!(column.data_type, ConcreteDataType::Json(_)) {
new_columns.push(ColumnSchema::new(
column.name.to_string(),
ConcreteDataType::string_datatype(),
column.is_nullable(),
));
apply_mapper = true;
} else {
new_columns.push(column.clone());
}
}
(Arc::new(Schema::new(new_columns)), apply_mapper)
}
impl SendableRecordBatchMapper {
/// Creates a new [SendableRecordBatchMapper] with the given inner [RecordBatchStream], mapper function, and schema mapper function.
pub fn new(
inner: SendableRecordBatchStream,
mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
) -> Self {
let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
Self {
inner,
mapper,
schema: mapped_schema,
apply_mapper,
}
}
}
impl RecordBatchStream for SendableRecordBatchMapper {
fn name(&self) -> &str {
"SendableRecordBatchMapper"
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.inner.output_ordering()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.inner.metrics()
}
}
impl Stream for SendableRecordBatchMapper {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.apply_mapper {
Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
opt.map(|result| {
result
.and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
})
})
} else {
Pin::new(&mut self.inner).poll_next(cx)
}
}
}
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {

View File

@@ -9,6 +9,7 @@ workspace = true
[dependencies]
client = { workspace = true, features = ["testing"] }
common-grpc.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
once_cell.workspace = true

View File

@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::DfRecordBatch;
/// Encodes record batch to a Schema message and a RecordBatch message.
pub fn encode_to_flight_data(rb: DfRecordBatch) -> (FlightData, FlightData) {
let mut encoder = FlightEncoder::default();
(
encoder.encode(FlightMessage::Schema(rb.schema())),
encoder.encode(FlightMessage::RecordBatch(rb)),
)
}

View File

@@ -16,6 +16,7 @@ use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::LazyLock;
pub mod flight;
pub mod ports;
pub mod recordbatch;
pub mod temp_dir;

View File

@@ -158,12 +158,7 @@ mod tests {
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
num_topics = 32
num_partitions = 1
selector_type = "round_robin"

View File

@@ -65,6 +65,7 @@ servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
sql.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true

View File

@@ -359,7 +359,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
"Flows do not exist in flownode for node {:?}, flow_ids={:?}",
nodeid, to_be_created
);
}
@@ -379,7 +379,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
"Flows do not exist in metadata for node {:?}, flow_ids={:?}",
nodeid, to_be_dropped
);
}
@@ -826,9 +826,17 @@ fn to_meta_err(
location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
move |err: crate::error::Error| -> common_meta::error::Error {
common_meta::error::Error::External {
location,
source: BoxedError::new(err),
match err {
crate::error::Error::FlowNotFound { id, .. } => {
common_meta::error::Error::FlowNotFound {
flow_name: format!("flow_id={id}"),
location,
}
}
_ => common_meta::error::Error::External {
location,
source: BoxedError::new(err),
},
}
}
}

View File

@@ -39,7 +39,8 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -312,7 +313,7 @@ impl BatchingEngine {
.unwrap_or("None".to_string())
);
let task = BatchingTask::new(
let task = BatchingTask::try_new(
flow_id,
&sql,
plan,
@@ -323,7 +324,7 @@ impl BatchingEngine {
query_ctx,
self.catalog_manager.clone(),
rx,
);
)?;
let task_inner = task.clone();
let engine = self.query_engine.clone();
@@ -349,7 +350,8 @@ impl BatchingEngine {
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
warn!("Flow {flow_id} not found in tasks");
FlowNotFoundSnafu { id: flow_id }.fail()?;
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
@@ -366,9 +368,7 @@ impl BatchingEngine {
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
debug!("Try flush flow {flow_id}");
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"),
})?;
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
task.mark_all_windows_as_dirty()?;

View File

@@ -14,8 +14,9 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
use crate::{Error, FlowAuthHeader, FlowId};
/// Just like [`GrpcQueryHandler`] but use BoxedError
///
@@ -74,6 +76,105 @@ impl<
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
/// Statistics about running query on this frontend from flownode
#[derive(Debug, Default, Clone)]
struct FrontendStat {
/// The query for flow id has been running since this timestamp
since: HashMap<FlowId, Instant>,
/// The average query time for each flow id
/// This is used to calculate the average query time for each flow id
past_query_avg: HashMap<FlowId, (usize, Duration)>,
}
#[derive(Debug, Default, Clone)]
pub struct FrontendStats {
/// The statistics for each flow id
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
}
impl FrontendStats {
pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
let stat = stats.entry(frontend_addr.to_string()).or_default();
stat.since.insert(flow_id, Instant::now());
FrontendStatsGuard {
stats: self.stats.clone(),
frontend_addr: frontend_addr.to_string(),
cur: flow_id,
}
}
/// return frontend addrs sorted by load, from lightest to heaviest
/// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed
pub fn sort_by_load(&self) -> Vec<String> {
let stats = self.stats.lock().expect("Failed to lock frontend stats");
let fe_load_factor = stats
.iter()
.map(|(node_addr, stat)| {
// total expected avg running time for all currently running queries
let total_expect_avg_run_time = stat
.since
.keys()
.map(|f| {
let (count, total_duration) =
stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
if *count == 0 {
0.0
} else {
total_duration.as_secs_f64() / *count as f64
}
})
.sum::<f64>();
let total_cur_running_time = stat
.since
.values()
.map(|since| since.elapsed().as_secs_f64())
.sum::<f64>();
(
node_addr.to_string(),
total_expect_avg_run_time + total_cur_running_time,
)
})
.sorted_by(|(_, load_a), (_, load_b)| {
load_a
.partial_cmp(load_b)
.unwrap_or(std::cmp::Ordering::Equal)
})
.collect::<Vec<_>>();
debug!("Frontend load factor: {:?}", fe_load_factor);
for (node_addr, load) in &fe_load_factor {
METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
.with_label_values(&[&node_addr.to_string()])
.observe(*load);
}
fe_load_factor
.into_iter()
.map(|(addr, _)| addr)
.collect::<Vec<_>>()
}
}
pub struct FrontendStatsGuard {
stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
frontend_addr: String,
cur: FlowId,
}
impl Drop for FrontendStatsGuard {
fn drop(&mut self) {
let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
if let Some(stat) = stats.get_mut(&self.frontend_addr) {
if let Some(since) = stat.since.remove(&self.cur) {
let elapsed = since.elapsed();
let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
*count += 1;
*total_duration += elapsed;
}
}
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
/// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
@@ -83,6 +184,7 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
fe_stats: FrontendStats,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -114,6 +216,7 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
fe_stats: Default::default(),
}
}
@@ -192,6 +295,7 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
fe_stats,
} = self
else {
return UnexpectedSnafu {
@@ -208,8 +312,21 @@ impl FrontendClient {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
let node_addrs_by_load = fe_stats.sort_by_load();
// index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
let addr2load = node_addrs_by_load
.iter()
.enumerate()
.map(|(i, id)| (id.clone(), i + 1))
.collect::<HashMap<_, _>>();
// sort frontends by load, from lightest to heaviest
frontends.sort_by(|(_, a), (_, b)| {
// if not even in stats, treat as 0 load since never been queried
let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
load_a.cmp(load_b)
});
debug!("Frontend nodes sorted by load: {:?}", frontends);
// found node with maximum last_activity_ts
for (_, node_info) in frontends
@@ -257,6 +374,7 @@ impl FrontendClient {
create: CreateTableExpr,
catalog: &str,
schema: &str,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
@@ -265,6 +383,7 @@ impl FrontendClient {
catalog,
schema,
&mut None,
task,
)
.await
}
@@ -276,15 +395,19 @@ impl FrontendClient {
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
FrontendClient::Distributed { fe_stats, .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
.await

View File

@@ -30,6 +30,9 @@ use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
};
use crate::{Error, FlowId};
/// The state of the [`BatchingTask`].
@@ -71,18 +74,33 @@ impl TaskState {
self.last_update_time = Instant::now();
}
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
/// Compute the next query delay based on the time window size or the last query duration.
/// Aiming to avoid too frequent queries. But also not too long delay.
/// The delay is computed as follows:
/// - If `time_window_size` is set, the delay is half the time window size, constrained to be
/// at least `last_query_duration` and at most `max_timeout`.
/// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
/// to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
///
/// if have more dirty time window, exec next query immediately
/// If there are dirty time windows, the function returns an immediate execution time to clean them.
/// TODO: Make this behavior configurable.
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let next_duration = max_timeout
let last_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
@@ -112,10 +130,10 @@ impl DirtyTimeWindows {
/// Time window merge distance
///
/// TODO(discord9): make those configurable
const MERGE_DIST: i32 = 3;
pub const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
const MAX_FILTER_NUM: usize = 20;
pub const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
@@ -139,11 +157,16 @@ impl DirtyTimeWindows {
}
/// Generate all filter expressions consuming all time windows
///
/// there is two limits:
/// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
/// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
pub fn gen_filter_exprs(
&mut self,
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
window_cnt: usize,
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<datafusion_expr::Expr>, Error> {
@@ -181,12 +204,33 @@ impl DirtyTimeWindows {
}
}
// get the first `MAX_FILTER_NUM` time windows
let nth = self
.windows
.iter()
.nth(Self::MAX_FILTER_NUM)
.map(|(key, _)| *key);
// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
@@ -198,6 +242,24 @@ impl DirtyTimeWindows {
}
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64);
let full_time_range = first_nth
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
} else {
acc
}
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()])
.observe(full_time_range);
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
// align using time window exprs
@@ -259,6 +321,8 @@ impl DirtyTimeWindows {
}
/// Merge time windows that overlaps or get too close
///
/// TODO(discord9): not merge and prefer to send smaller time windows? how?
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
@@ -457,7 +521,14 @@ mod test {
.unwrap();
assert_eq!(expected, dirty.windows);
let filter_expr = dirty
.gen_filter_exprs("ts", expire_lower_bound, window_size, 0, None)
.gen_filter_exprs(
"ts",
expire_lower_bound,
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
0,
None,
)
.unwrap();
let unparser = datafusion::sql::unparser::Unparser::default();

Some files were not shown because too many files have changed in this diff Show More