Compare commits

...

82 Commits

Author SHA1 Message Date
Lei, HUANG
7e79b4b2f6 poc/create-alter-for-metrics:
### Commit Message

 Enhance Prometheus Bulk Write Handling

 - **`server.rs`**: Introduced `start_background_task` in `PromBulkState` to handle asynchronous batch processing and SST file writing. Added a new `tx` field to manage task communication.
 - **`access_layer.rs`**: Added `file_id` method to `ParquetWriter` for file identification.
 - **`batch_builder.rs`**: Modified `MetricsBatchBuilder` to utilize session catalog and schema, and updated batch processing logic to handle column metadata.
 - **`prom_store.rs`**: Updated `remote_write` to use `decode_remote_write_request_to_batch` for batch processing and send data to the background task.
 - **`prom_row_builder.rs`**: Made `TableBuilder` and `TablesBuilder` fields public for external access.
 - **`proto.rs`**: Exposed `table_data` in `PromWriteRequest` for batch processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-30 08:27:26 +00:00
Lei, HUANG
4ad40af468 poc/create-alter-for-metrics:
- **Refactor `BatchEncoder` and `Columns`**:
   - Introduced `Columns` and `ColumnsBuilder` to manage primary key, timestamp, and value data more efficiently.
   - Updated `BatchEncoder` to utilize `ColumnsBuilder` for handling multiple record batches.
   - Modified `finish` method to return a vector of record batches instead of a single optional batch.
   - Added methods for estimating size and counting total rows in `BatchEncoder`.

 - **Enhance `MetricsBatchBuilder`**:
   - Changed the structure to store multiple record batches per region in `MetricsBatchBuilder`.

 - **Update `TablesBuilder` in `prom_row_builder.rs`**:
   - Modified to handle multiple record batches per region and collect file metadata for logging.

 - **Remove unnecessary logging in `prom_store.rs`**:
   - Removed the "Use bulk mode" log statement.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-29 14:07:57 +00:00
Lei, HUANG
e4b048e788 poc/create-alter-for-metrics:
### Commit Message

 Enhance Schema Handling and Add Telemetry Logging

 - **`access_layer.rs`**: Refactored to use `physical_schema` for schema creation and improved error handling with telemetry logging for batch writing.
 - **`batch_builder.rs`**: Introduced `physical_schema` function for schema creation and updated data structures to use `RegionId` for physical tables. Removed redundant schema function.
 - **`prom_store.rs`**: Added telemetry logging to track bulk mode usage and processing time.
 - **`prom_row_builder.rs`**: Implemented telemetry logging to measure elapsed time for key operations like table creation, metadata collection, and batch appending.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-28 09:58:33 +00:00
Lei, HUANG
ecbf372de3 poc/create-alter-for-metrics:
### Add Object Store Integration and Access Layer Factory

 - **Cargo.lock, Cargo.toml**: Added `object-store` as a dependency to integrate object storage capabilities.
 - **frontend.rs, instance.rs, builder.rs, server.rs**: Introduced `ObjectStoreConfig` and `AccessLayerFactory` to manage object storage configurations and access layers.
 - **access_layer.rs**: Made `AccessLayerFactory` clonable and its constructor public to facilitate object store access layer creation.
 - **prom_store.rs**: Updated `PromBulkState` to include `AccessLayerFactory` and modified `remote_write` to utilize the access layer for processing requests.
 - **lib.rs**: Made `access_layer` module public to expose access layer functionalities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-27 08:50:25 +00:00
Lei, HUANG
3d81a17360 poc/create-alter-for-metrics:
### Commit Message

 Enhance Schema and Table Handling in MetricsBatchBuilder

 - **`access_layer.rs`**: Made `create_sst_writer` function public within the crate to facilitate SST writing.
 - **`batch_builder.rs`**: Updated `MetricsBatchBuilder` to handle builders as a nested `HashMap` for schemas and logical table names. Modified `finish` method to return record batches grouped by schema and logical table name.
 - **`prom_row_builder.rs`**: Renamed `as_record_batch` to `as_record_batches` and implemented logic to write record batches using `AccessLayerFactory`.
 - **`proto.rs`**: Updated method call from `as_record_batch` to `as_record_batches` to reflect the new method name.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-26 13:00:54 +00:00
Lei, HUANG
025cae3679 poc/create-alter-for-metrics:
- **Refactor `AccessLayerFactory`**: Updated the method for joining paths in `access_layer.rs` by replacing `join_dir` with `join_path` for file path construction.
 - **Enhance Testing**: Added comprehensive tests in `access_layer.rs` to verify the functionality of the Parquet writer, including writing multiple batches and handling provided timestamp ranges.
 - **Optimize `MetricsBatchBuilder`**: Simplified the loop in `batch_builder.rs` by removing unnecessary mutability in the encoder variable.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-26 11:31:29 +00:00
Lei, HUANG
68409e28ea poc/create-alter-for-metrics:
### Add Parquet Writer and Access Layer

 - **`Cargo.lock`, `Cargo.toml`**: Updated dependencies to include `parquet`, `object-store`, and `common-datasource`.
 - **`parquet_writer.rs`**: Introduced a new module for writing Parquet files asynchronously using `AsyncWriter`.
 - **`access_layer.rs`**: Added a new access layer for managing Parquet file writing, including `AccessLayerFactory` and `ParquetWriter` for handling record batches and metadata.

 ### Enhance Batch Builder

 - **`batch_builder.rs`**: Enhanced `BatchEncoder` to handle timestamp ranges and return optional record batches with timestamp metadata.

 ### Update Error Handling

 - **`error.rs`**: Added new error variants for handling object store and Parquet operations.

 ### Modify Mito2 Parquet Constants

 - **`parquet.rs`**: Changed visibility of `DEFAULT_ROW_GROUP_SIZE` to public for broader access.

 ### Add Tests

 - **`access_layer.rs`**: Added basic tests for building data region directories.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-26 09:17:53 +00:00
evenyag
699406ae32 refactor: update MetricsBatchBuilder to use physical region metadata
- Update append_rows_to_batch to accept physical_region_metadata parameter
- Refactor metadata lookup to use HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>
- Fix collect_physical_region_metadata call in TablesBuilder to extract logical table info
- Remove unused TableName import from batch_builder.rs

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:41 +00:00
evenyag
344006deca feat: Implements collect_physical_region_metadata
Add partition_manager and node_manager to PromBulkState and PromBulkContext

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:41 +00:00
Lei, HUANG
63803f2b43 poc/create-alter-for-metrics:
### Commit Message

 Enhance `MetricsBatchBuilder` and `TablesBuilder` functionalities

 - **`batch_builder.rs`**:
   - Made `collect_physical_region_metadata` and `append_rows_to_batch` methods public to allow external access.
   - Implemented the `finish` method to complete record batch building and return batches grouped by physical table ID.

 - **`prom_row_builder.rs`**:
   - Updated `TablesBuilder::build` to utilize `MetricsBatchBuilder` for creating or altering physical tables and appending rows.
   - Integrated collection of physical region metadata and finalized record batch processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:40 +00:00
Lei, HUANG
cf62767b98 poc/create-alter-for-metrics:
### Commit Message

 Enhance error handling in `batch_builder.rs` and `error.rs`

 - **`batch_builder.rs`**:
   - Replaced `todo!` with specific error handling for invalid timestamp and field value types using `InvalidTimestampValueTypeSnafu` and `InvalidFieldValueTypeSnafu`.

 - **`error.rs`**:
   - Added new error variants `InvalidTimestampValueType` and `InvalidFieldValueType` with descriptive messages.
   - Updated `status_code` method to handle new error types with `StatusCode::Unexpected`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:40 +00:00
Lei, HUANG
4e53c1531d poc/create-alter-for-metrics:
### Commit Message

 Enhance batch sorting in `batch_builder.rs`

 - Implement sorting of batches by primary key using `compute::sort_to_indices`.
 - Update `op_type` and `sequence` to use `Arc` for consistency.
 - Apply sorting to `value`, `timestamp`, and primary key arrays using `compute::take`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:40 +00:00
Lei, HUANG
892cb66c53 poc/create-alter-for-metrics:
- **Add `ahash` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `ahash` as a dependency for the `metric-engine` project.
 - **Refactor `MetricsBatchBuilder`**: Modified `MetricsBatchBuilder` in `batch_builder.rs` to include a `builders` field and refactored methods to use `BatchEncoder` for appending rows.
 - **Update `BatchEncoder`**: Changed `BatchEncoder` in `batch_builder.rs` to use a `name_to_id` map and added `append_rows` and `finish` methods for handling row encoding.
 - **Modify `LogicalSchemas` Structure**: Updated `schema_helper.rs` to use `HashMap` instead of `ahash::HashMap` for the `schemas` field in `LogicalSchemas`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:39 +00:00
Lei, HUANG
8b392477c8 poc/create-alter-for-metrics:
### Commit Summary

 - **Add New Dependencies**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` and `mito-codec` dependencies.
 - **Enhance `MetricEngineInner`**: Modified `put.rs` to use `logical_table_id` instead of `table_id` and adjusted method calls accordingly.
 - **Expose Structs and Methods**: Made `RowModifier`, `RowsIter`, and `RowIter` structs and their methods public in `row_modifier.rs`.
 - **Implement Batch Processing**: Added batch processing logic in `batch_builder.rs` to handle row conversion to record batches with primary key encoding.
 - **Error Handling**: Introduced `EncodePrimaryKey` error variant in `error.rs` for handling primary key encoding errors.
 - **Clone Support for `TableBuilder`**: Added `Clone` trait to `TableBuilder` in `prom_row_builder.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:39 +00:00
evenyag
905593dc16 feat: add bulk mode flag for prom store with SchemaHelper integration
Add a new bulk_mode flag to PromStoreOptions that enables bulk processing
for prometheus metrics ingestion. When enabled, initializes PromBulkState
with a SchemaHelper instance for efficient schema management.

Changes:
- Add bulk_mode field to PromStoreOptions (default: false)
- Add create_schema_helper() method to Instance for SchemaHelper construction
- Add getter methods to StatementExecutor for procedure_executor and cache_invalidator
- Update server initialization to conditionally create PromBulkState when bulk_mode is enabled
- Fix clippy warnings in schema_helper.rs

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:38 +00:00
evenyag
6c04cb9b19 feat: use schema_helper to create/alter physical table
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:38 +00:00
Lei, HUANG
24da3367c1 poc/create-alter-for-metrics:
- **Add `operator` dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `operator` dependency.
 - **Expose structs and functions in `schema_helper.rs`**: Made `LogicalSchema`, `LogicalSchemas`, and `ensure_logical_tables_for_metrics` public in `src/operator/src/schema_helper.rs`.
 - **Refactor `batch_builder.rs`**:
   - Changed the logic for handling physical and logical tables, including the introduction of `tags_to_logical_schemas` function.
   - Modified `determine_physical_table_name` to return a string instead of a table reference.
   - Updated logic for managing tags and logical schemas in `MetricsBatchBuilder`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:38 +00:00
Lei, HUANG
80b14965a6 feat/frontend-stager:
### Commit Summary

 - **Enhancements in `MetricsBatchBuilder`:**
   - Added support for session catalog and schema in `create_or_alter_physical_tables`.
   - Introduced `physical_tables` mapping for logical to physical table references.
   - Implemented `rows_to_batch` to build `RecordBatch` from rows with primary key encoded.

 - **Function Modifications:**
   - Updated `determine_physical_table` to use session catalog and schema.
   - Modified `build_create_table_expr` and `build_alter_table_expr` to include additional parameters for schema compatibility.

 - **Error Handling:**
   - Added error handling for missing physical tables in `rows_to_batch`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:37 +00:00
Lei, HUANG
5da3f86d0c feat/frontend-stager:
### Add `MetricsBatchBuilder` for Physical Table Management

 - **New Feature**: Introduced `MetricsBatchBuilder` in `batch_builder.rs` to manage the creation and alteration of physical tables based on logical table tags.
 - **Error Handling**: Added `CommonMeta` error variant in `error.rs` to handle errors from `common_meta`.
 - **Enhancements**:
   - Added `tags` method in `prom_row_builder.rs` to retrieve tag names from `TableBuilder`.
   - Implemented `primary_key_names` method in `metadata.rs` to return primary key names from `TableMeta`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 16:44:37 +00:00
evenyag
151273d1df feat: get region metadata by ids
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:36 +00:00
evenyag
b0289dbdde refactor: extract logics for filling options
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:36 +00:00
evenyag
c51730a954 feat: create or alter logical tables for metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:36 +00:00
evenyag
207709c727 refactor: remove Inserter::create_physical_table_on_demand
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:35 +00:00
evenyag
deca8c44fa refactor: StatementExecutor calls SchemaHelper to change schema
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:35 +00:00
evenyag
2edd861ce9 refactor: use SchemaHelper in Inserter
Remove the dependency to StatementExecutor from Inserter

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:34 +00:00
evenyag
14f3a4ab05 refactor: copy alter logic to SchemaHelper
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:34 +00:00
evenyag
34875c0346 refactor: define schema_helper mod to handle schema creation
It has the same logic as Inserter and StatementExecutor

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-25 16:44:34 +00:00
Lei, HUANG
1d07864b29 refactor(object-store): move backends building functions back to object-store (#6400)
refactor/building-backend-in-object-store:
 ### Refactor Object Store Configuration

 - **Centralize Object Store Configurations**: Moved object store configurations (`FileConfig`, `S3Config`, `OssConfig`, `AzblobConfig`, `GcsConfig`) to `object-store/src/config.rs`.
 - **Error Handling Enhancements**: Introduced `object-store/src/error.rs` for improved error handling related to object store operations.
 - **Factory Pattern for Object Store**: Implemented `object-store/src/factory.rs` to create object store instances, consolidating logic from `datanode/src/store.rs`.
 - **Remove Redundant Store Implementations**: Deleted individual store files (`azblob.rs`, `fs.rs`, `gcs.rs`, `oss.rs`, `s3.rs`) from `datanode/src/store/`.
 - **Update Usage of Object Store Config**: Updated references to `ObjectStoreConfig` in `datanode.rs`, `standalone.rs`, `config.rs`, and `error.rs` to use the new centralized configuration.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-25 13:49:55 +00:00
ZonaHe
9be75361a4 feat: update dashboard to v0.10.1 (#6396)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-06-25 12:58:04 +00:00
Ning Sun
9c1df68a5f feat: introduce /v1/health for healthcheck from external (#6388)
Signed-off-by: Ning Sun <sunning@greptime.com>
2025-06-25 12:25:36 +00:00
fys
0209461155 chore: add components for standalone instance (#6383) 2025-06-25 12:17:34 +00:00
Ruihang Xia
e728cb33fb feat: implement count_hash aggr function (#6342)
* feat: implement count_hash aggr function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change copyright year

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* review changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-25 12:14:03 +00:00
fys
cde7e11983 refactor: avoid adding feature to parameter (#6391)
* refactor: avoid adding feature to parameter

* avoid `cfg(not(feature = ...))` block
2025-06-25 10:47:20 +00:00
dennis zhuang
944b4b3e49 feat: supports CsvWithNames and CsvWithNamesAndTypes formats (#6384)
* feat: supports CsvWithNames and CsvWithNamesAndTypes formats and object/array types

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: added and fixed tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: fix test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add json type csv tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-06-25 07:28:11 +00:00
fys
7953b090c0 feat: support syntax parsing of drop trigger (#6371)
* feat: trigger drop

* Update Cargo.toml

* Update Cargo.lock

---------

Co-authored-by: Ning Sun <classicning@gmail.com>
2025-06-24 18:48:39 +00:00
liyang
7aa9af5ba6 chore: clarify default OTLP endpoint value (#6381)
Signed-off-by: liyang <daviderli614@gmail.com>
2025-06-24 11:44:45 +00:00
Ruihang Xia
7a9444c85b refactor: remove staled manifest structures (#6382)
* refactor: remove staled manifest structures

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/store-api/src/lib.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-24 09:23:10 +00:00
LFC
bb12be3310 refactor: scan Batches directly (#6369)
* refactor: scan `Batch`es directly

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2025-06-24 07:55:49 +00:00
Weny Xu
24019334ee feat: implement automatic region failure detector registrations (#6370)
* feat: implement automatic region failure detector registrations

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remove unused error

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add more tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add `region_failure_detector_initialization_delay` option

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update config.md

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update config.md

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-24 06:12:12 +00:00
codephage
116d5cf82b feat: support mysql flavor show processlist shortcut (#6328) (#6379)
* feat: support mysql flavor show processlist shortcut (#6328)

Signed-off-by: codephage. <381510760@qq.com>

* Refactor SHOW PROCESSLIST handling and add tests

Signed-off-by: codephage. <381510760@qq.com>

* add sqlness test

Signed-off-by: codephage. <381510760@qq.com>

* add sqlness test result

Signed-off-by: codephage. <381510760@qq.com>

* fix sqlness test show_processList

Signed-off-by: codephage. <381510760@qq.com>

---------

Signed-off-by: codephage. <381510760@qq.com>
2025-06-24 03:50:16 +00:00
zyy17
90a3894564 refactor: always write parent_span_id for otlp traces ingestion (#6356)
* refactor: always write `parent_span_id` for otlp traces ingestion

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: support to write None value in row writer

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-06-23 11:36:57 +00:00
Yingwen
39d3e0651d feat: Support ListMetadataRequest to retrieve regions' metadata (#6348)
* feat: support list metadata in region server

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for list region metadata

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: return null if region not exists

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update greptime-proto

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-23 07:11:20 +00:00
zyy17
a49edc6ca6 refactor: add otlp_export_protocol config to support export trace data through gRPC and HTTP protocol (#6357)
* refactor: support http traces

* refactor: add `otlp_export_protocol` config to support export trace data through gRPC and HTTP protocol

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* Update src/common/telemetry/src/logging.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-23 02:56:12 +00:00
shuiyisong
7cd6be41ce feat(pipeline): introduce pipeline doc version 2 for combine-transform (#6360)
* chore: init commit of pipeline doc version v2

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove unused code

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove unused code

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: compatible with v1 to remain field in the map during transform

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: pipeline.exec_mut

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: typo

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change from v2 to 2 in version setting

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-06-22 00:58:36 +00:00
ZonaHe
15616d0c43 feat: update dashboard to v0.10.0 (#6368)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-06-20 23:48:35 +00:00
dennis zhuang
b43e315c67 fix: test test_tls_file_change_watch (#6366)
* fix: test test_tls_file_change_watch

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: cert_path

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: revert times

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: debug

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: times

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove assertions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: use inspect_err

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
2025-06-20 16:57:07 +00:00
Yingwen
36ab1ceef7 chore: prints a warning when skip_ssl_validation is true (#6367)
chore: warn when skip_ssl_validation is true

We already log all configs when a node starts.

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-20 13:14:50 +00:00
Weny Xu
3fb1b726c6 refactor(cli): simplify metadata command parameters (#6364)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-20 09:00:21 +00:00
Olexandr88
c423bb31fe docs: added YouTube link to documentation (#6362)
Update README.md
2025-06-20 08:16:54 +00:00
rgidda
e026f766d2 feat(storage): Add skip_ssl_validation option for object storage HTTP client (#6358)
* feat(storage): Add skip_ssl_validation option for object storage HTTP client

Signed-off-by: rgidda <rgidda@hitachivantara.com>

* fix(test): Broken test case for - Add skip_ssl_validation option for object storage HTTP client

Signed-off-by: rgidda <rgidda@hitachivantara.com>

* fix: test

* fix: test

---------

Signed-off-by: rgidda <rgidda@hitachivantara.com>
Co-authored-by: rgidda <rgidda@hitachivantara.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
2025-06-20 08:08:19 +00:00
discord9
9d08f2532a feat: dist auto step aggr pushdown (#6268)
* wip: steppable aggr fn

Signed-off-by: discord9 <discord9@163.com>

* poc: step aggr query

Signed-off-by: discord9 <discord9@163.com>

* feat: mvp poc stuff

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: import missing

Signed-off-by: discord9 <discord9@163.com>

* feat: support first/last_value

Signed-off-by: discord9 <discord9@163.com>

* fix: check also include first/last value

Signed-off-by: discord9 <discord9@163.com>

* chore: clean up after rebase

Signed-off-by: discord9 <discord9@163.com>

* feat: optimize yes!

Signed-off-by: discord9 <discord9@163.com>

* fix: alias qualifled

Signed-off-by: discord9 <discord9@163.com>

* test: more testcases

Signed-off-by: discord9 <discord9@163.com>

* chore: qualified column

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: case when can push down

Signed-off-by: discord9 <discord9@163.com>

* feat: udd/hll_merge is also the same

Signed-off-by: discord9 <discord9@163.com>

* fix: udd/hll_merge args

Signed-off-by: discord9 <discord9@163.com>

* tests: fix sqlness

Signed-off-by: discord9 <discord9@163.com>

* tests: fix sqlness

Signed-off-by: discord9 <discord9@163.com>

* fix: udd/hll merge steppable

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* test: REDACTED

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: more formal transform action

Signed-off-by: discord9 <discord9@163.com>

* feat: support modify child plan too

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-06-20 07:18:55 +00:00
LFC
e072726ea8 refactor: make scanner creation async (#6349)
* refactor: make scanner creation async

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2025-06-20 06:44:49 +00:00
Ning Sun
e78c3e1eaa refactor: make metadata region option opt-in (#6350)
* refactor: make metadata region option opt-in

Signed-off-by: Ning Sun <sunning@greptime.com>

* fix: preserve wal_options for metadata region

Signed-off-by: Ning Sun <sunning@greptime.com>

* Update src/metric-engine/src/engine/create.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-20 03:31:16 +00:00
Weny Xu
89e3c8edab fix(meta): enhance mysql election client with timeouts and reconnection (#6341)
* fix(meta): enhance mysql election client with timeouts and reconnection

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: improve MySQL election client lease management and error handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: adjust timeout configurations for election clients

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remove unused error

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-19 12:44:23 +00:00
fys
d4826b998d feat: support execute sql in frontend_client (#6355)
* feat: support execute sql in frontend_client

* chore: remove unnecessary clone

* add components for flownode instance

* add feature gate for component

* fix: enterprise feature
2025-06-19 09:47:16 +00:00
Weny Xu
d9faa5c801 feat(cli): add metadata del commands (#6339)
* feat: introduce cli for deleting metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): flatten metadata command structure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add alias

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): implement logical deletion for CLI tool

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-19 06:48:00 +00:00
Ning Sun
12c3a3205b chore: security updates (#6351) 2025-06-19 06:43:43 +00:00
Weny Xu
5231505021 fix(metric-engine): properly propagate errors during batch open operation (#6325)
* fix(metric-engine): properly propagate errors during batch open operation

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-19 06:37:54 +00:00
Lei, HUANG
6ece560f8c fix: reordered write cause incorrect kv (#6345)
* fix/reordered-write-cause-incorrect-kv:
 - **Enhance Testing in `partition_tree.rs`**: Added comprehensive test functions such as `kv_region_metadata`, `key_values`, and `collect_kvs` to improve the robustness of key-value operations and ensure correct behavior of the `PartitionTreeMemtable`.
 - **Improve Key Handling in `dict.rs`**: Modified `KeyDictBuilder` to handle both full and sparse keys, ensuring correct mapping and insertion. Added a new test `test_builder_finish_with_sparse_key` to validate the handling of sparse keys.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/reordered-write-cause-incorrect-kv:
 ### Refactor `partition_tree.rs` for Improved Key Handling

 - **Refactored Key Handling**: Simplified the `key_values` function to accept an iterator of keys, removing hardcoded key-value pairs. This change enhances flexibility and reduces redundancy in key management.
 - **Updated Test Cases**: Modified test cases to use the new `key_values` function signature, ensuring they iterate over keys dynamically rather than relying on predefined lists.

 Files affected:
 - `src/mito2/src/memtable/partition_tree.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/reordered-write-cause-incorrect-kv:
 Enhance Testing in `partition_tree.rs`

 - Added assertions to verify key-value collection after `memtable` and `forked` operations.
 - Refactored key-value writing logic for clarity in `forked` operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-19 06:32:40 +00:00
fys
2ab08a8f93 chore(deps): switch greptime-proto to official repository (#6347) 2025-06-18 12:52:46 +00:00
Lei, HUANG
086ae9cdcd chore: print series count after wal replay (#6344)
* chore/print-series-count-after-wal-replay:
 ### Add Series Count Functionality and Logging Enhancements

 - **`time_partition.rs`**: Introduced `series_count` method to calculate the total timeseries count across all time partitions.
 - **`opener.rs`**: Enhanced logging to include the total timeseries replayed during WAL replay.
 - **`version.rs`**: Added `series_count` method to `VersionControlData` for approximating timeseries count in the current version.
 - **`handler.rs`**: Added entry and exit logging for the `sql` function to trace execution flow.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/print-series-count-after-wal-replay:
 ### Remove Unused Import

 - **File Modified**: `src/servers/src/http/handler.rs`
 - **Change Summary**: Removed the unused `info` import from `common_telemetry`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-18 12:04:39 +00:00
LFC
6da8e00243 refactor: make finding leader in metasrv client dynamic (#6343)
Signed-off-by: luofucong <luofc@foxmail.com>
2025-06-18 11:29:23 +00:00
Arshdeep
4b04c402b6 fix: add path exist check in copy_table_from (#6182) (#6300)
Signed-off-by: Arshdeep54 <balarsh535@gmail.com>
2025-06-18 09:50:27 +00:00
Lei, HUANG
a59b6c36d2 chore: add metrics for active series and field builders (#6332)
* chore/series-metrics:
 ### Add Metrics for Active Series and Values in Memtable

 - **`simple_bulk_memtable.rs`**: Implemented `Drop` trait for `SimpleBulkMemtable` to decrement `MEMTABLE_ACTIVE_SERIES_COUNT` and `MEMTABLE_ACTIVE_VALUES_COUNT` upon dropping.
 - **`time_series.rs`**:
   - Introduced `SeriesMap` with `Drop` implementation to manage active series and values count.
   - Updated `SeriesSet` and `Iter` to use `SeriesMap`.
   - Added `num_values` method in `Series` to calculate the number of values.
 - **`metrics.rs`**: Added `MEMTABLE_ACTIVE_SERIES_COUNT` and `MEMTABLE_ACTIVE_VALUES_COUNT` metrics to track active series and values in `TimeSeriesMemtable`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/series-metrics:
- Add metrics for active series and field builders
- Update dashboard

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/series-metrics:
 **Add Series Count Tracking in Memtables**

 - **`flush.rs`**: Updated `RegionFlushTask` to track and log the series count during memtable flush operations.
 - **`memtable.rs`**: Introduced `series_count` in `MemtableStats` and added a method to retrieve it.
 - **`partition_tree.rs`, `partition.rs`, `tree.rs`**: Implemented series count calculation in `PartitionTreeMemtable` and its components.
 - **`simple_bulk_memtable.rs`, `time_series.rs`**: Integrated series count tracking in `SimpleBulkMemtable` and `TimeSeriesMemtable` implementations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Update src/mito2/src/memtable.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
2025-06-18 09:16:45 +00:00
zyy17
f6ce6fe385 fix(jaeger-api): incorrect find_traces() logic and multiple api compatible issues (#6293)
* fix: use `limit` params in jaeger http

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: only parse `max_duration` and `min_duration` when it's not empty

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: handle the input for empty `limit` string

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: missing the fileter for `service_name`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* test: fix ci errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: incorrect behavior of find_traces

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: the logic of `find_traces()`

The correct logic should be:

1. Get all trace ids that match the filters;

2. Get all traces that match the trace ids from the previous query;

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* fix: integration test errors

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: add `empty_string_as_none`

Signed-off-by: zyy17 <zyylsxm@gmail.com>

* refactor: refine naming

Signed-off-by: zyy17 <zyylsxm@gmail.com>

---------

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2025-06-18 08:01:36 +00:00
Weny Xu
4d4bfb7d8b fix(metric): prevent setting memtable type for metadata region (#6340)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-18 03:49:22 +00:00
Ruihang Xia
6e1e8f19e6 feat: support setting FORMAT in TQL ANALYZE/VERBOSE (#6327)
* feat: support setting FORMAT in TQL ANALYZE/VERBOSE

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-18 03:39:12 +00:00
Weny Xu
49cb4da6d2 feat: introduce CLI tool for repairing logical table metadata (#6322)
* feat: introduce logical table metadata repair cli tool

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: deps

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: flatten doctor module structure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-17 13:53:59 +00:00
Lei, HUANG
0d0236ddab fix: revert string builder initial capacity in TimeSeriesMemtable (#6330)
fix/revert-string-builder-initial-capacity:
 ### Update `time_series.rs` Memory Allocation

 - **Reduced StringBuilder Capacity**: Adjusted the initial capacity of `StringBuilder` in `ValueBuilder` from `(256, 4096)` to `(4, 8)` to optimize memory usage in `time_series.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-17 13:24:52 +00:00
Lei, HUANG
f8edb53b30 fix: carry process id in query ctx (#6335)
fix/carry-process-id-in-query-ctx:
 ### Commit Message

 Enhance query context handling in `instance.rs`

 - Updated `Instance` implementation to include `process_id` in query context initialization, improving traceability and debugging capabilities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-17 12:04:23 +00:00
Lin Yihai
438791b3e4 feat: Add DROP DEFAULT (#6290)
* feat: Add `DROP DEFAULT`

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>

* chore: use `next_token`

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>

---------

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>
2025-06-17 09:33:56 +00:00
discord9
50e4c916e7 chore: clean up unused impl &standalone use mark dirty (#6331)
Signed-off-by: discord9 <discord9@163.com>
2025-06-17 08:18:17 +00:00
Ruihang Xia
16e7f7b64b fix: override logical table's partition column with physical table's (#6326)
* fix: override logical table's partition column with physical table's

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* naming

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-17 08:00:54 +00:00
localhost
53c4fd478e chore: add skip error for pipeline skip error log (#6318)
* chore: wip

Signed-off-by: paomian <xpaomian@gmail.com>

* chore: add skip error for pipeline skip error log

Signed-off-by: paomian <xpaomian@gmail.com>

* chore: add test and check timestamp must be non null

Signed-off-by: paomian <xpaomian@gmail.com>

* chore: fix test

* chore: fix by pr comment

* fix: typo

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: paomian <xpaomian@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-17 06:44:11 +00:00
Lei, HUANG
ecbbd2fbdb feat: handle Ctrl-C command in MySQL client (#6320)
* feat/answer-ctrl-c-in-mysql:
 ## Implement Connection ID-based Query Killing

 ### Key Changes:
 - **Connection ID Management:**
   - Added `connection_id` to `Session` and `QueryContext` in `src/session/src/lib.rs` and `src/session/src/context.rs`.
   - Updated `MysqlInstanceShim` and `MysqlServer` to handle `connection_id` in `src/servers/src/mysql/handler.rs` and `src/servers/src/mysql/server.rs`.

 - **KILL Statement Enhancements:**
   - Introduced `Kill` enum to handle both `ProcessId` and `ConnectionId` in `src/sql/src/statements/kill.rs`.
   - Updated `ParserContext` to parse `KILL QUERY <connection_id>` in `src/sql/src/parser.rs`.
   - Modified `StatementExecutor` to support killing queries by `connection_id` in `src/operator/src/statement/kill.rs`.

 - **Process Management:**
   - Refactored `ProcessManager` to include `connection_id` in `src/catalog/src/process_manager.rs`.
   - Added `kill_local_process` method for local query termination.

 - **Testing:**
   - Added tests for `KILL` statement parsing and execution in `src/sql/src/parser.rs`.

 ### Affected Files:
 - `Cargo.lock`, `Cargo.toml`
 - `src/catalog/src/process_manager.rs`
 - `src/frontend/src/instance.rs`
 - `src/frontend/src/stream_wrapper.rs`
 - `src/operator/src/statement.rs`
 - `src/operator/src/statement/kill.rs`
 - `src/servers/src/mysql/federated.rs`
 - `src/servers/src/mysql/handler.rs`
 - `src/servers/src/mysql/server.rs`
 - `src/servers/src/postgres.rs`
 - `src/session/src/context.rs`
 - `src/session/src/lib.rs`
 - `src/sql/src/parser.rs`
 - `src/sql/src/statements.rs`
 - `src/sql/src/statements/kill.rs`
 - `src/sql/src/statements/statement.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

 Conflicts:
	Cargo.lock
	Cargo.toml

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/answer-ctrl-c-in-mysql:
 ### Enhance Process Management and Execution

 - **`process_manager.rs`**: Added a new method `find_processes_by_connection_id` to filter processes by connection ID, improving process management capabilities.
 - **`kill.rs`**: Refactored the process killing logic to utilize the new `find_processes_by_connection_id` method, streamlining the execution flow and reducing redundant checks.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/answer-ctrl-c-in-mysql:
 ## Commit Message

 ### Update Process ID Type and Refactor Code

 - **Change Process ID Type**: Updated the process ID type from `u64` to `u32` across multiple files to optimize memory usage. Affected files include `process_manager.rs`, `lib.rs`, `database.rs`, `instance.rs`, `server.rs`, `stream_wrapper.rs`, `kill.rs`, `federated.rs`, `handler.rs`, `server.rs`,
 `postgres.rs`, `mysql_server_test.rs`, `context.rs`, `lib.rs`, and `test_util.rs`.

 - **Remove Connection ID**: Removed the `connection_id` field and related logic from `process_manager.rs`, `lib.rs`, `instance.rs`, `server.rs`, `stream_wrapper.rs`, `kill.rs`, `federated.rs`, `handler.rs`, `server.rs`, `postgres.rs`, `mysql_server_test.rs`, `context.rs`, `lib.rs`, and `test_util.rs` to
 simplify the codebase.

 - **Refactor Process Management**: Refactored process management logic to improve clarity and maintainability in `process_manager.rs`, `kill.rs`, and `handler.rs`.

 - **Enhance MySQL Server Handling**: Improved MySQL server handling by integrating process management in `server.rs` and `mysql_server_test.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/answer-ctrl-c-in-mysql:
 ### Add Process Manager to Postgres Server

 - **`src/frontend/src/server.rs`**: Updated server initialization to include `process_manager`.
 - **`src/servers/src/postgres.rs`**: Modified `MakePostgresServerHandler` to accept `process_id` for session creation.
 - **`src/servers/src/postgres/server.rs`**: Integrated `process_manager` into `PostgresServer` for generating `process_id` during connection handling.
 - **`src/servers/tests/postgres/mod.rs`** and **`tests-integration/src/test_util.rs`**: Adjusted test server setup to accommodate optional `process_manager`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/answer-ctrl-c-in-mysql:
 Update `greptime-proto` Dependency

 - Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`.
   - `Cargo.lock`: Changed source revision from `d75a56e05a87594fe31ad5c48525e9b2124149ba` to `fdcbe5f1c7c467634c90a1fd1a00a784b92a4e80`.
   - `Cargo.toml`: Updated the `greptime-proto` git revision to match the new commit.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-17 06:36:23 +00:00
fys
3e3a12385c refactor: make flownode gRPC services able to be added dynamically (#6323)
chore: enhance the flownode gRPC servers extension
2025-06-17 06:27:41 +00:00
shuiyisong
079daf5db9 feat: support special labels parsing in prom remote write (#6302)
* feat: support special labels parsing in prom remote write

* chore: change __schema__ to __database__

* fix: test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: remove the missing type alias

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update cr issue

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-06-17 03:20:33 +00:00
liyang
56b9ab5279 ci: add pr label workflow (#6316)
* ci: add pr label workflow

Signed-off-by: liyang <daviderli614@gmail.com>

* move permissions to jobs

Signed-off-by: liyang <daviderli614@gmail.com>

* add checkout step

Signed-off-by: liyang <daviderli614@gmail.com>

* add job permissions

Signed-off-by: liyang <daviderli614@gmail.com>

* custom sizes

Signed-off-by: liyang <daviderli614@gmail.com>

* Update .github/workflows/pr-labeling.yaml

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

---------

Signed-off-by: liyang <daviderli614@gmail.com>
Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2025-06-16 17:26:16 +00:00
Ruihang Xia
be4e0d589e feat: support arbitrary constant expression in PromQL function (#6315)
* refactor holt_winters, predict_linear, quantile, round

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* support some functions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* make all sqlness cases pass

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix other sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some refactor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-06-16 15:12:27 +00:00
Yingwen
2a3445c72c fix: ignore missing columns and tables in PromQL (#6285)
* fix: handle table/column not found in or

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update result

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: drop table after test

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix test cases

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: do not return table not found error in series_query

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-06-16 12:15:38 +00:00
Lei, HUANG
9d997d593c feat: bulk support flow batch (#6291)
* feat/bulk-support-flow-batch:
 ### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert

 - **Refactor Table Handling**:
   - Updated `put_record_batch` method to use `TableRef` instead of `TableId` in `grpc.rs`, `greptime_handler.rs`, and `grpc.rs`.
   - Modified `handle_bulk_insert` to accept `TableRef` and extract `TableId` internally in `bulk_insert.rs`.

 - **Enhance Timestamp Processing**:
   - Added `compute_timestamp_range` function to calculate timestamp range in `bulk_insert.rs`.
   - Introduced error handling for invalid time index types in `error.rs`.

 - **Test Adjustments**:
   - Updated `DummyInstance` implementation in `tests/mod.rs` to align with new method signatures.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/bulk-support-flow-batch:
 ### Add Dirty Window Handling in Flow Module

 - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Flow Module Enhancements**:
   - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`.
   - Implemented `handle_mark_window_dirty` function to manage dirty time windows.
 - **Bulk Insert Enhancements**:
   - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`.
 - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat: mark dirty time window

* feat: metrics

* metrics: more useful metrics batching mode

* feat/bulk-support-flow-batch:
 **Refactor Timestamp Handling and Update Dependencies**

 - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Batching Engine Refactor**: Modified `src/flow/src/batching_mode/engine.rs` to replace `dirty_time_ranges` with `timestamps` for improved timestamp handling.
 - **Bulk Insert Refactor**: Updated `src/operator/src/bulk_insert.rs` to refactor timestamp extraction and handling. Replaced `compute_timestamp_range` with `extract_timestamps` and adjusted related logic to handle timestamps directly.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/bulk-support-flow-batch:
 ### Update Metrics in Batching Mode Engine

 - **Modified Metrics**: Replaced `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE` with `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW` to track the count of time windows instead of their range.
   - Files affected: `engine.rs`, `metrics.rs`
 - **New Method**: Added `len()` method to `DirtyTimeWindows` to return the number of dirty windows.
   - File affected: `state.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/bulk-support-flow-batch:
 **Refactor and Enhance Timestamp Handling in `bulk_insert.rs`**

 - **Refactored Timestamp Extraction**: Moved timestamp extraction logic to a new method `maybe_update_flow_dirty_window` to improve code readability and maintainability.
 - **Enhanced Flow Update Logic**: Updated the flow dirty window update mechanism to conditionally notify flownodes only if they are configured, using `table_info` and `record_batch`.
 - **Imports Adjusted**: Updated imports to reflect changes in table metadata handling, replacing `TableId` with `TableInfoRef`.

 Files affected:
 - `src/operator/src/bulk_insert.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/bulk-support-flow-batch:
 ## Update `handle_mark_window_dirty` Method in `flownode_impl.rs`

 - Replaced `unimplemented!()` with `unreachable!()` in the `handle_mark_window_dirty` method for both `FlowDualEngine` and `StreamingEngine` implementations in `flownode_impl.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/bulk-support-flow-batch:
 Update `greptime-proto` Dependency

 - Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`.
   - `Cargo.lock`: Changed the source revision from `f0913f179ee1d2ce428f8b85a9ea12b5f69ad636` to `17971523673f4fbc982510d3c9d6647ff642e16f`.
   - `Cargo.toml`: Updated the `greptime-proto` git revision to `17971523673f4fbc982510d3c9d6647ff642e16f`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
2025-06-16 08:19:14 +00:00
Weny Xu
10bf9b11f6 fix: handle corner case in catchup where compacted entry id exceeds region last entry id (#6312)
* fix(mito2): handle corner case in catchup where compacted entry id exceeds region last entry id

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-06-16 06:36:31 +00:00
330 changed files with 23841 additions and 11769 deletions

15
.github/labeler.yaml vendored Normal file
View File

@@ -0,0 +1,15 @@
ci:
- changed-files:
- any-glob-to-any-file: .github/**
docker:
- changed-files:
- any-glob-to-any-file: docker/**
documentation:
- changed-files:
- any-glob-to-any-file: docs/**
dashboard:
- changed-files:
- any-glob-to-any-file: grafana/**

42
.github/workflows/pr-labeling.yaml vendored Normal file
View File

@@ -0,0 +1,42 @@
name: 'PR Labeling'
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
permissions:
contents: read
pull-requests: write
issues: write
jobs:
labeler:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4
- uses: actions/labeler@v5
with:
configuration-path: ".github/labeler.yaml"
repo-token: "${{ secrets.GITHUB_TOKEN }}"
size-label:
runs-on: ubuntu-latest
steps:
- uses: pascalgn/size-label-action@v0.5.5
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
with:
sizes: >
{
"0": "XS",
"100": "S",
"300": "M",
"1000": "L",
"1500": "XL",
"2000": "XXL"
}

147
Cargo.lock generated
View File

@@ -1670,9 +1670,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.24"
version = "1.2.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938"
checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc"
dependencies = [
"jobserver",
"libc",
@@ -1950,6 +1950,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
name = "cli"
version = "0.15.0"
dependencies = [
"async-stream",
"async-trait",
"auth",
"base64 0.22.1",
@@ -1982,9 +1983,10 @@ dependencies = [
"meta-srv",
"nu-ansi-term",
"object-store",
"operator",
"query",
"rand 0.9.0",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"servers",
@@ -2116,13 +2118,14 @@ dependencies = [
"mito2",
"moka",
"nu-ansi-term",
"object-store",
"plugins",
"prometheus",
"prost 0.13.5",
"query",
"rand 0.9.0",
"regex",
"reqwest",
"reqwest 0.12.9",
"rexpect",
"serde",
"serde_json",
@@ -2218,6 +2221,7 @@ dependencies = [
"humantime-serde",
"meta-client",
"num_cpus",
"object-store",
"serde",
"serde_json",
"serde_with",
@@ -2330,6 +2334,7 @@ dependencies = [
"datafusion",
"datafusion-common",
"datafusion-expr",
"datafusion-functions-aggregate-common",
"datatypes",
"derive_more",
"geo",
@@ -2368,7 +2373,7 @@ dependencies = [
"common-test-util",
"common-version",
"hyper 0.14.30",
"reqwest",
"reqwest 0.12.9",
"serde",
"tempfile",
"tokio",
@@ -2669,7 +2674,6 @@ dependencies = [
name = "common-telemetry"
version = "0.15.0"
dependencies = [
"atty",
"backtrace",
"common-error",
"console-subscriber",
@@ -3070,9 +3074,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
@@ -3761,7 +3765,7 @@ dependencies = [
"prometheus",
"prost 0.13.5",
"query",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"servers",
@@ -4734,6 +4738,7 @@ dependencies = [
"log-store",
"meta-client",
"num_cpus",
"object-store",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
@@ -5143,7 +5148,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f0913f179ee1d2ce428f8b85a9ea12b5f69ad636#f0913f179ee1d2ce428f8b85a9ea12b5f69ad636"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=464226cf8a4a22696503536a123d0b9e318582f4#464226cf8a4a22696503536a123d0b9e318582f4"
dependencies = [
"prost 0.13.5",
"serde",
@@ -7230,6 +7235,7 @@ dependencies = [
name = "metric-engine"
version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
"aquamarine",
"async-stream",
@@ -8095,14 +8101,21 @@ version = "0.15.0"
dependencies = [
"anyhow",
"bytes",
"common-base",
"common-error",
"common-macro",
"common-telemetry",
"common-test-util",
"futures",
"humantime-serde",
"lazy_static",
"md5",
"moka",
"opendal",
"prometheus",
"reqwest 0.12.9",
"serde",
"snafu 0.8.5",
"tokio",
"uuid",
]
@@ -8237,7 +8250,7 @@ dependencies = [
"prometheus",
"quick-xml 0.36.2",
"reqsign",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"sha2",
@@ -8309,6 +8322,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-http"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e"
dependencies = [
"async-trait",
"bytes",
"http 0.2.12",
"opentelemetry 0.21.0",
"reqwest 0.11.27",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.14.0"
@@ -8319,10 +8345,12 @@ dependencies = [
"futures-core",
"http 0.2.12",
"opentelemetry 0.21.0",
"opentelemetry-http",
"opentelemetry-proto 0.4.0",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk 0.21.2",
"prost 0.11.9",
"reqwest 0.11.27",
"thiserror 1.0.64",
"tokio",
"tonic 0.9.2",
@@ -10309,7 +10337,7 @@ dependencies = [
"percent-encoding",
"quick-xml 0.35.0",
"rand 0.8.5",
"reqwest",
"reqwest 0.12.9",
"rsa",
"rust-ini 0.21.1",
"serde",
@@ -10318,6 +10346,42 @@ dependencies = [
"sha2",
]
[[package]]
name = "reqwest"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.30",
"ipnet",
"js-sys",
"log",
"mime",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper 0.1.2",
"system-configuration",
"tokio",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "reqwest"
version = "0.12.9"
@@ -10388,15 +10452,14 @@ dependencies = [
[[package]]
name = "ring"
version = "0.17.8"
version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"getrandom 0.2.15",
"libc",
"spin",
"untrusted",
"windows-sys 0.52.0",
]
@@ -11169,6 +11232,7 @@ dependencies = [
"common-base",
"common-catalog",
"common-config",
"common-datasource",
"common-error",
"common-frontend",
"common-grpc",
@@ -11211,16 +11275,23 @@ dependencies = [
"local-ip-address",
"log-query",
"loki-proto",
"metric-engine",
"mime_guess",
"mito-codec",
"mito2",
"mysql_async",
"notify",
"object-pool",
"object-store",
"once_cell",
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.3",
"parquet",
"partition",
"permutation",
"pgwire",
"pin-project",
@@ -11234,7 +11305,7 @@ dependencies = [
"quoted-string",
"rand 0.9.0",
"regex",
"reqwest",
"reqwest 0.12.9",
"rust-embed",
"rustls",
"rustls-pemfile",
@@ -11678,7 +11749,7 @@ dependencies = [
"local-ip-address",
"mysql",
"num_cpus",
"reqwest",
"reqwest 0.12.9",
"serde",
"serde_json",
"sha2",
@@ -12328,6 +12399,27 @@ dependencies = [
"nom",
]
[[package]]
name = "system-configuration"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"system-configuration-sys",
]
[[package]]
name = "system-configuration-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "table"
version = "0.15.0"
@@ -12618,7 +12710,7 @@ dependencies = [
"paste",
"rand 0.9.0",
"rand_chacha 0.9.0",
"reqwest",
"reqwest 0.12.9",
"schemars",
"serde",
"serde_json",
@@ -13774,12 +13866,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.10.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
dependencies = [
"getrandom 0.2.15",
"rand 0.8.5",
"getrandom 0.3.2",
"js-sys",
"rand 0.9.0",
"serde",
"wasm-bindgen",
]
@@ -14501,6 +14594,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"

View File

@@ -121,6 +121,7 @@ datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-functions-aggregate-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "12c0381babd52c681043957e9d6ee083a03f7646" }
@@ -134,7 +135,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f0913f179ee1d2ce428f8b85a9ea12b5f69ad636" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "464226cf8a4a22696503536a123d0b9e318582f4" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -189,7 +189,8 @@ We invite you to engage and contribute!
- [Official Website](https://greptime.com/)
- [Blog](https://greptime.com/blogs/)
- [LinkedIn](https://www.linkedin.com/company/greptime/)
- [Twitter](https://twitter.com/greptime)
- [X (Twitter)](https://X.com/greptime)
- [YouTube](https://www.youtube.com/@greptime)
## License

View File

@@ -123,6 +123,7 @@
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `storage.http_client.skip_ssl_validation` | Bool | `false` | To skip the ssl verification<br/>**Security Notice**: Setting `skip_ssl_validation = true` disables certificate verification, making connections vulnerable to man-in-the-middle attacks. Only use this in development or trusted private networks. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -184,10 +185,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
@@ -287,10 +289,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `slow_query` | -- | -- | The slow query log options. |
@@ -322,6 +325,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `region_failure_detector_initialization_delay` | String | `10m` | Delay before initializing region failure detectors.<br/>This delay helps prevent premature initialization of region failure detectors in cases where<br/>cluster maintenance mode is enabled right after metasrv starts, especially when the cluster<br/>is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration<br/>may trigger unnecessary region failovers during datanode startup. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
@@ -369,10 +373,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -471,6 +476,7 @@
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `storage.http_client.skip_ssl_validation` | Bool | `false` | To skip the ssl verification<br/>**Security Notice**: Setting `skip_ssl_validation = true` disables certificate verification, making connections vulnerable to man-in-the-middle attacks. Only use this in development or trusted private networks. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -532,10 +538,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
@@ -582,10 +589,11 @@
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
| `logging.enable_otlp_tracing` | Bool | `false` | Enable OTLP tracing. |
| `logging.otlp_endpoint` | String | `http://localhost:4317` | The OTLP tracing endpoint. |
| `logging.otlp_endpoint` | String | `http://localhost:4318` | The OTLP tracing endpoint. |
| `logging.append_stdout` | Bool | `true` | Whether to append logs to stdout. |
| `logging.log_format` | String | `text` | The log format. Can be `text`/`json`. |
| `logging.max_log_files` | Integer | `720` | The maximum amount of log files. |
| `logging.otlp_export_protocol` | String | `http` | The OTLP tracing export protocol. Can be `grpc`/`http`. |
| `logging.tracing_sample_ratio` | -- | -- | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |

View File

@@ -367,6 +367,10 @@ timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
## To skip the ssl verification
## **Security Notice**: Setting `skip_ssl_validation = true` disables certificate verification, making connections vulnerable to man-in-the-middle attacks. Only use this in development or trusted private networks.
skip_ssl_validation = false
# Custom storage options
# [[storage.providers]]
# name = "S3"
@@ -625,7 +629,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -636,6 +640,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -83,7 +83,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -94,6 +94,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -218,7 +218,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -229,6 +229,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -43,6 +43,13 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false
## Delay before initializing region failure detectors.
## This delay helps prevent premature initialization of region failure detectors in cases where
## cluster maintenance mode is enabled right after metasrv starts, especially when the cluster
## is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration
## may trigger unnecessary region failovers during datanode startup.
region_failure_detector_initialization_delay = '10m'
## Whether to allow region failover on local WAL.
## **This option is not recommended to be set to true, because it may lead to data loss during failover.**
allow_region_failover_on_local_wal = false
@@ -220,7 +227,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -231,6 +238,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

View File

@@ -458,6 +458,10 @@ timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
## To skip the ssl verification
## **Security Notice**: Setting `skip_ssl_validation = true` disables certificate verification, making connections vulnerable to man-in-the-middle attacks. Only use this in development or trusted private networks.
skip_ssl_validation = false
# Custom storage options
# [[storage.providers]]
# name = "S3"
@@ -716,7 +720,7 @@ level = "info"
enable_otlp_tracing = false
## The OTLP tracing endpoint.
otlp_endpoint = "http://localhost:4317"
otlp_endpoint = "http://localhost:4318"
## Whether to append logs to stdout.
append_stdout = true
@@ -727,6 +731,9 @@ log_format = "text"
## The maximum amount of log files.
max_log_files = 720
## The OTLP tracing export protocol. Can be `grpc`/`http`.
otlp_export_protocol = "http"
## The percentage of tracing will be sampled and exported.
## Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.
## ratio > 1 are treated as 1. Fractions < 0 are treated as 0

File diff suppressed because it is too large Load Diff

View File

@@ -70,6 +70,7 @@
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Active Series and Field Builders Count | `sum by(instance, pod) (greptime_mito_memtable_active_series_count)`<br/>`sum by(instance, pod) (greptime_mito_memtable_field_builder_count)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]-series` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |

View File

@@ -612,6 +612,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: Active Series and Field Builders Count
type: timeseries
description: Compaction oinput output bytes
unit: none
queries:
- expr: sum by(instance, pod) (greptime_mito_memtable_active_series_count)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-series'
- expr: sum by(instance, pod) (greptime_mito_memtable_field_builder_count)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-field_builders'
- title: Region Worker Convert Requests
type: timeseries
description: Per-stage elapsed time for region worker to decode requests.

File diff suppressed because it is too large Load Diff

View File

@@ -70,6 +70,7 @@
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
| Active Series and Field Builders Count | `sum by(instance, pod) (greptime_mito_memtable_active_series_count)`<br/>`sum by(instance, pod) (greptime_mito_memtable_field_builder_count)` | `timeseries` | Compaction oinput output bytes | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]-series` |
| Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |

View File

@@ -612,6 +612,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: Active Series and Field Builders Count
type: timeseries
description: Compaction oinput output bytes
unit: none
queries:
- expr: sum by(instance, pod) (greptime_mito_memtable_active_series_count)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-series'
- expr: sum by(instance, pod) (greptime_mito_memtable_field_builder_count)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-field_builders'
- title: Region Worker Convert Requests
type: timeseries
description: Per-stage elapsed time for region worker to decode requests.

View File

@@ -31,6 +31,7 @@ excludes = [
"src/operator/src/expr_helper/trigger.rs",
"src/sql/src/statements/create/trigger.rs",
"src/sql/src/statements/show/trigger.rs",
"src/sql/src/statements/drop/trigger.rs",
"src/sql/src/parsers/create_parser/trigger.rs",
"src/sql/src/parsers/show_parser/trigger.rs",
]

View File

@@ -22,6 +22,7 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1;
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extensions: HashMap<String, Vec<u8>>,
pub metadata: Vec<u8>,
}
impl RegionResponse {
@@ -29,6 +30,7 @@ impl RegionResponse {
Self {
affected_rows: region_response.affected_rows as _,
extensions: region_response.extensions,
metadata: region_response.metadata,
}
}
@@ -37,6 +39,16 @@ impl RegionResponse {
Self {
affected_rows,
extensions: Default::default(),
metadata: Vec::new(),
}
}
/// Creates one response with metadata.
pub fn from_metadata(metadata: Vec<u8>) -> Self {
Self {
affected_rows: 0,
extensions: Default::default(),
metadata,
}
}
}

View File

@@ -22,7 +22,9 @@ use common_catalog::consts::{
PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
use common_meta::cache::{
LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
@@ -266,16 +268,68 @@ impl CatalogManager for KvBackendCatalogManager {
let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_cache",
})?;
if let Some(table) = table_cache
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
let table = table_cache
.get_by_ref(&TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
})
.await
.context(GetTableCacheSnafu)?
.context(GetTableCacheSnafu)?;
// Override logical table's partition key indices with physical table's.
if let Some(table) = &table
&& let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
return Ok(Some(table));
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: std::collections::HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
if let Some(physical_column) = physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
{
logical_column_names.contains(&physical_column.name)
} else {
false
}
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(Some(new_table));
}
if channel == Channel::Postgres {
@@ -288,7 +342,7 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
return Ok(None);
Ok(table)
}
async fn tables_by_ids(

View File

@@ -14,6 +14,7 @@
#![feature(assert_matches)]
#![feature(try_blocks)]
#![feature(let_chains)]
use std::any::Any;
use std::fmt::{Debug, Formatter};

View File

@@ -15,7 +15,7 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
@@ -29,6 +29,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
pub type ProcessId = u32;
pub type ProcessManagerRef = Arc<ProcessManager>;
/// Query process manager.
@@ -36,9 +37,9 @@ pub struct ProcessManager {
/// Local frontend server address,
server_addr: String,
/// Next process id for local queries.
next_id: AtomicU64,
next_id: AtomicU32,
/// Running process per catalog.
catalogs: RwLock<HashMap<String, HashMap<u64, CancellableProcess>>>,
catalogs: RwLock<HashMap<String, HashMap<ProcessId, CancellableProcess>>>,
/// Frontend selector to locate frontend nodes.
frontend_selector: Option<MetaClientSelector>,
}
@@ -65,9 +66,9 @@ impl ProcessManager {
schemas: Vec<String>,
query: String,
client: String,
id: Option<u64>,
query_id: Option<ProcessId>,
) -> Ticket {
let id = id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let id = query_id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let process = ProcessInfo {
id,
catalog: catalog.clone(),
@@ -96,12 +97,12 @@ impl ProcessManager {
}
/// Generates the next process id.
pub fn next_id(&self) -> u64 {
pub fn next_id(&self) -> u32 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
/// De-register a query from process list.
pub fn deregister_query(&self, catalog: String, id: u64) {
pub fn deregister_query(&self, catalog: String, id: ProcessId) {
if let Entry::Occupied(mut o) = self.catalogs.write().unwrap().entry(catalog) {
let process = o.get_mut().remove(&id);
debug!("Deregister process: {:?}", process);
@@ -159,26 +160,10 @@ impl ProcessManager {
&self,
server_addr: String,
catalog: String,
id: u64,
id: ProcessId,
) -> error::Result<bool> {
if server_addr == self.server_addr {
if let Some(catalogs) = self.catalogs.write().unwrap().get_mut(&catalog) {
if let Some(process) = catalogs.remove(&id) {
process.handle.cancel();
info!(
"Killed process, catalog: {}, id: {:?}",
process.process.catalog, process.process.id
);
PROCESS_KILL_COUNT.with_label_values(&[&catalog]).inc();
Ok(true)
} else {
debug!("Failed to kill process, id not found: {}", id);
Ok(false)
}
} else {
debug!("Failed to kill process, catalog not found: {}", catalog);
Ok(false)
}
self.kill_local_process(catalog, id).await
} else {
let mut nodes = self
.frontend_selector
@@ -204,12 +189,33 @@ impl ProcessManager {
Ok(true)
}
}
/// Kills local query with provided catalog and id.
pub async fn kill_local_process(&self, catalog: String, id: ProcessId) -> error::Result<bool> {
if let Some(catalogs) = self.catalogs.write().unwrap().get_mut(&catalog) {
if let Some(process) = catalogs.remove(&id) {
process.handle.cancel();
info!(
"Killed process, catalog: {}, id: {:?}",
process.process.catalog, process.process.id
);
PROCESS_KILL_COUNT.with_label_values(&[&catalog]).inc();
Ok(true)
} else {
debug!("Failed to kill process, id not found: {}", id);
Ok(false)
}
} else {
debug!("Failed to kill process, catalog not found: {}", catalog);
Ok(false)
}
}
}
pub struct Ticket {
pub(crate) catalog: String,
pub(crate) manager: ProcessManagerRef,
pub(crate) id: u64,
pub(crate) id: ProcessId,
pub cancellation_handle: Arc<CancellationHandle>,
}
@@ -323,7 +329,7 @@ mod tests {
assert_eq!(running_processes.len(), 2);
// Verify both processes are present
let ids: Vec<u64> = running_processes.iter().map(|p| p.id).collect();
let ids: Vec<u32> = running_processes.iter().map(|p| p.id).collect();
assert!(ids.contains(&ticket1.id));
assert!(ids.contains(&ticket2.id));
}

View File

@@ -19,7 +19,7 @@ mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod process_list;
pub mod process_list;
pub mod region_peers;
mod region_statistics;
mod runtime_metrics;

View File

@@ -39,14 +39,14 @@ use crate::process_manager::ProcessManagerRef;
use crate::system_schema::information_schema::InformationTable;
/// Column names of `information_schema.process_list`
const ID: &str = "id";
const CATALOG: &str = "catalog";
const SCHEMAS: &str = "schemas";
const QUERY: &str = "query";
const CLIENT: &str = "client";
const FRONTEND: &str = "frontend";
const START_TIMESTAMP: &str = "start_timestamp";
const ELAPSED_TIME: &str = "elapsed_time";
pub const ID: &str = "id";
pub const CATALOG: &str = "catalog";
pub const SCHEMAS: &str = "schemas";
pub const QUERY: &str = "query";
pub const CLIENT: &str = "client";
pub const FRONTEND: &str = "frontend";
pub const START_TIMESTAMP: &str = "start_timestamp";
pub const ELAPSED_TIME: &str = "elapsed_time";
/// `information_schema.process_list` table implementation that tracks running
/// queries in current cluster.

View File

@@ -16,6 +16,7 @@ mysql_kvbackend = ["common-meta/mysql_kvbackend", "meta-srv/mysql_kvbackend"]
workspace = true
[dependencies]
async-stream.workspace = true
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
@@ -50,6 +51,7 @@ meta-client.workspace = true
meta-srv.workspace = true
nu-ansi-term = "0.46"
object-store.workspace = true
operator.workspace = true
query.workspace = true
rand.workspace = true
reqwest.workspace = true
@@ -65,6 +67,7 @@ tokio.workspace = true
tracing-appender.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-version.workspace = true
serde.workspace = true
tempfile.workspace = true

View File

@@ -17,8 +17,10 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::peer::Peer;
use object_store::Error as ObjectStoreError;
use snafu::{Location, Snafu};
use store_api::storage::TableId;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -73,6 +75,20 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to get table metadata"))]
TableMetadata {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Unexpected error: {}", msg))]
Unexpected {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Missing config, msg: {}", msg))]
MissingConfig {
msg: String,
@@ -222,6 +238,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table not found: {table_id}"))]
TableNotFound {
table_id: TableId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("OpenDAL operator failed"))]
OpenDal {
#[snafu(implicit)]
@@ -267,6 +290,29 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to init backend"))]
InitBackend {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ObjectStoreError,
},
#[snafu(display("Covert column schemas to defs failed"))]
CovertColumnSchemasToDefs {
#[snafu(implicit)]
location: Location,
source: operator::error::Error,
},
#[snafu(display("Failed to send request to datanode: {}", peer))]
SendRequestToDatanode {
peer: Peer,
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -274,9 +320,9 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::InitMetadata { source, .. }
| Error::InitDdlManager { source, .. }
| Error::TableMetadata { source, .. } => source.status_code(),
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
@@ -290,6 +336,9 @@ impl ErrorExt for Error {
| Error::InvalidArguments { .. }
| Error::ParseProxyOpts { .. } => StatusCode::InvalidArguments,
Error::CovertColumnSchemasToDefs { source, .. } => source.status_code(),
Error::SendRequestToDatanode { source, .. } => source.status_code(),
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
@@ -297,6 +346,7 @@ impl ErrorExt for Error {
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()
}
Error::Unexpected { .. } => StatusCode::Unexpected,
Error::SerdeJson { .. }
| Error::FileIo { .. }
@@ -305,7 +355,7 @@ impl ErrorExt for Error {
| Error::BuildClient { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
Error::OpenDal { .. } => StatusCode::Internal,
Error::OpenDal { .. } | Error::InitBackend { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. }
| Error::OutputDirNotSet { .. }
| Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
@@ -314,6 +364,7 @@ impl ErrorExt for Error {
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
}
}

View File

@@ -14,29 +14,39 @@
mod common;
mod control;
mod repair;
mod snapshot;
mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::metadata::control::ControlCommand;
use crate::metadata::control::{DelCommand, GetCommand};
use crate::metadata::repair::RepairLogicalTablesCommand;
use crate::metadata::snapshot::SnapshotCommand;
use crate::Tool;
/// Command for managing metadata operations, including saving metadata snapshots and restoring metadata from snapshots.
/// Command for managing metadata operations,
/// including saving and restoring metadata snapshots,
/// controlling metadata operations, and diagnosing and repairing metadata.
#[derive(Subcommand)]
pub enum MetadataCommand {
#[clap(subcommand)]
Snapshot(SnapshotCommand),
#[clap(subcommand)]
Control(ControlCommand),
Get(GetCommand),
#[clap(subcommand)]
Del(DelCommand),
RepairLogicalTables(RepairLogicalTablesCommand),
}
impl MetadataCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
MetadataCommand::Snapshot(cmd) => cmd.build().await,
MetadataCommand::Control(cmd) => cmd.build().await,
MetadataCommand::RepairLogicalTables(cmd) => cmd.build().await,
MetadataCommand::Get(cmd) => cmd.build().await,
MetadataCommand::Del(cmd) => cmd.build().await,
}
}
}

View File

@@ -12,27 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod del;
mod get;
#[cfg(test)]
mod test_utils;
mod utils;
use clap::Subcommand;
use common_error::ext::BoxedError;
use get::GetCommand;
use crate::Tool;
/// Subcommand for metadata control.
#[derive(Subcommand)]
pub enum ControlCommand {
/// Get the metadata from the metasrv.
#[clap(subcommand)]
Get(GetCommand),
}
impl ControlCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
ControlCommand::Get(cmd) => cmd.build().await,
}
}
}
pub(crate) use del::DelCommand;
pub(crate) use get::GetCommand;

View File

@@ -0,0 +1,42 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod key;
mod table;
use clap::Subcommand;
use common_error::ext::BoxedError;
use crate::metadata::control::del::key::DelKeyCommand;
use crate::metadata::control::del::table::DelTableCommand;
use crate::Tool;
/// The prefix of the tombstone keys.
pub(crate) const CLI_TOMBSTONE_PREFIX: &str = "__cli_tombstone/";
/// Subcommand for deleting metadata from the metadata store.
#[derive(Subcommand)]
pub enum DelCommand {
Key(DelKeyCommand),
Table(DelTableCommand),
}
impl DelCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
match self {
DelCommand::Key(cmd) => cmd.build().await,
DelCommand::Table(cmd) => cmd.build().await,
}
}
}

View File

@@ -0,0 +1,132 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::tombstone::TombstoneManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::rpc::store::RangeRequest;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::Tool;
/// Delete key-value pairs logically from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct DelKeyCommand {
/// The key to delete from the metadata store.
key: String,
/// Delete key-value pairs with the given prefix.
#[clap(long)]
prefix: bool,
#[clap(flatten)]
store: StoreConfig,
}
impl DelKeyCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
let kv_backend = self.store.build().await?;
Ok(Box::new(DelKeyTool {
key: self.key.to_string(),
prefix: self.prefix,
key_deleter: KeyDeleter::new(kv_backend),
}))
}
}
struct KeyDeleter {
kv_backend: KvBackendRef,
tombstone_manager: TombstoneManager,
}
impl KeyDeleter {
fn new(kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: kv_backend.clone(),
tombstone_manager: TombstoneManager::new_with_prefix(kv_backend, CLI_TOMBSTONE_PREFIX),
}
}
async fn delete(&self, key: &str, prefix: bool) -> Result<usize, BoxedError> {
let mut req = RangeRequest::default().with_keys_only();
if prefix {
req = req.with_prefix(key.as_bytes());
} else {
req = req.with_key(key.as_bytes());
}
let resp = self.kv_backend.range(req).await.map_err(BoxedError::new)?;
let keys = resp.kvs.iter().map(|kv| kv.key.clone()).collect::<Vec<_>>();
self.tombstone_manager
.create(keys)
.await
.map_err(BoxedError::new)
}
}
struct DelKeyTool {
key: String,
prefix: bool,
key_deleter: KeyDeleter,
}
#[async_trait]
impl Tool for DelKeyTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let deleted = self.key_deleter.delete(&self.key, self.prefix).await?;
// Print the number of deleted keys.
println!("{}", deleted);
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::key::KeyDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::put_key;
#[tokio::test]
async fn test_delete_keys() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let key_deleter = KeyDeleter::new(kv_backend.clone());
put_key(&kv_backend, "foo", "bar").await;
put_key(&kv_backend, "foo/bar", "baz").await;
put_key(&kv_backend, "foo/baz", "qux").await;
let deleted = key_deleter.delete("foo", true).await.unwrap();
assert_eq!(deleted, 3);
let deleted = key_deleter.delete("foo/bar", false).await.unwrap();
assert_eq!(deleted, 0);
let chroot = ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend);
let req = RangeRequest::default().with_prefix(b"foo");
let resp = chroot.range(req).await.unwrap();
assert_eq!(resp.kvs.len(), 3);
assert_eq!(resp.kvs[0].key, b"foo");
assert_eq!(resp.kvs[0].value, b"bar");
assert_eq!(resp.kvs[1].key, b"foo/bar");
assert_eq!(resp.kvs[1].value, b"baz");
assert_eq!(resp.kvs[2].key, b"foo/baz");
assert_eq!(resp.kvs[2].value, b"qux");
}
}

View File

@@ -0,0 +1,235 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use clap::Parser;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::utils::get_region_wal_options;
use common_meta::key::table_name::TableNameManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use store_api::storage::TableId;
use crate::error::{InvalidArgumentsSnafu, TableNotFoundSnafu};
use crate::metadata::common::StoreConfig;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::utils::get_table_id_by_name;
use crate::Tool;
/// Delete table metadata logically from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct DelTableCommand {
/// The table id to delete from the metadata store.
#[clap(long)]
table_id: Option<u32>,
/// The table name to delete from the metadata store.
#[clap(long)]
table_name: Option<String>,
/// The schema name of the table.
#[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
schema_name: String,
/// The catalog name of the table.
#[clap(long, default_value = DEFAULT_CATALOG_NAME)]
catalog_name: String,
#[clap(flatten)]
store: StoreConfig,
}
impl DelTableCommand {
fn validate(&self) -> Result<(), BoxedError> {
if matches!(
(&self.table_id, &self.table_name),
(Some(_), Some(_)) | (None, None)
) {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
}
.build(),
));
}
Ok(())
}
}
impl DelTableCommand {
pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
self.validate()?;
let kv_backend = self.store.build().await?;
Ok(Box::new(DelTableTool {
table_id: self.table_id,
table_name: self.table_name.clone(),
schema_name: self.schema_name.clone(),
catalog_name: self.catalog_name.clone(),
table_name_manager: TableNameManager::new(kv_backend.clone()),
table_metadata_deleter: TableMetadataDeleter::new(kv_backend),
}))
}
}
struct DelTableTool {
table_id: Option<u32>,
table_name: Option<String>,
schema_name: String,
catalog_name: String,
table_name_manager: TableNameManager,
table_metadata_deleter: TableMetadataDeleter,
}
#[async_trait]
impl Tool for DelTableTool {
async fn do_work(&self) -> Result<(), BoxedError> {
let table_id = if let Some(table_name) = &self.table_name {
let catalog_name = &self.catalog_name;
let schema_name = &self.schema_name;
let Some(table_id) = get_table_id_by_name(
&self.table_name_manager,
catalog_name,
schema_name,
table_name,
)
.await?
else {
println!(
"Table({}) not found",
format_full_table_name(catalog_name, schema_name, table_name)
);
return Ok(());
};
table_id
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
};
self.table_metadata_deleter.delete(table_id).await?;
println!("Table({}) deleted", table_id);
Ok(())
}
}
struct TableMetadataDeleter {
table_metadata_manager: TableMetadataManager,
}
impl TableMetadataDeleter {
fn new(kv_backend: KvBackendRef) -> Self {
Self {
table_metadata_manager: TableMetadataManager::new_with_custom_tombstone_prefix(
kv_backend,
CLI_TOMBSTONE_PREFIX,
),
}
}
async fn delete(&self, table_id: TableId) -> Result<(), BoxedError> {
let (table_info, table_route) = self
.table_metadata_manager
.get_full_table_info(table_id)
.await
.map_err(BoxedError::new)?;
let Some(table_info) = table_info else {
return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
};
let Some(table_route) = table_route else {
return Err(BoxedError::new(TableNotFoundSnafu { table_id }.build()));
};
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.map_err(BoxedError::new)?;
let table_name = table_info.table_name();
let region_wal_options = get_region_wal_options(
&self.table_metadata_manager,
&table_route,
physical_table_id,
)
.await
.map_err(BoxedError::new)?;
self.table_metadata_manager
.delete_table_metadata(table_id, &table_name, &table_route, &region_wal_options)
.await
.map_err(BoxedError::new)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackend, KvBackendRef};
use common_meta::rpc::store::RangeRequest;
use crate::metadata::control::del::table::TableMetadataDeleter;
use crate::metadata::control::del::CLI_TOMBSTONE_PREFIX;
use crate::metadata::control::test_utils::prepare_physical_table_metadata;
#[tokio::test]
async fn test_delete_table_not_found() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let table_metadata_deleter = TableMetadataDeleter::new(kv_backend);
let table_id = 1;
let err = table_metadata_deleter.delete(table_id).await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::TableNotFound);
}
#[tokio::test]
async fn test_delete_table_metadata() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = TableMetadataManager::new(kv_backend.clone());
let table_id = 1024;
let (table_info, table_route) = prepare_physical_table_metadata("my_table", table_id).await;
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::Physical(table_route),
HashMap::new(),
)
.await
.unwrap();
let total_keys = kv_backend.len();
assert!(total_keys > 0);
let table_metadata_deleter = TableMetadataDeleter::new(kv_backend.clone());
table_metadata_deleter.delete(table_id).await.unwrap();
// Check the tombstone keys are deleted
let chroot =
ChrootKvBackend::new(CLI_TOMBSTONE_PREFIX.as_bytes().to_vec(), kv_backend.clone());
let req = RangeRequest::default().with_range(vec![0], vec![0]);
let resp = chroot.range(req).await.unwrap();
assert_eq!(resp.kvs.len(), total_keys);
}
}

View File

@@ -20,7 +20,6 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
@@ -30,10 +29,10 @@ use futures::TryStreamExt;
use crate::error::InvalidArgumentsSnafu;
use crate::metadata::common::StoreConfig;
use crate::metadata::control::utils::{decode_key_value, json_fromatter};
use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
use crate::Tool;
/// Subcommand for get command.
/// Getting metadata from metadata store.
#[derive(Subcommand)]
pub enum GetCommand {
Key(GetKeyCommand),
@@ -52,7 +51,7 @@ impl GetCommand {
/// Get key-value pairs from the metadata store.
#[derive(Debug, Default, Parser)]
pub struct GetKeyCommand {
/// The key to get from the metadata store. If empty, returns all key-value pairs.
/// The key to get from the metadata store.
#[clap(default_value = "")]
key: String,
@@ -130,8 +129,12 @@ pub struct GetTableCommand {
table_name: Option<String>,
/// The schema name of the table.
#[clap(long)]
schema_name: Option<String>,
#[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
schema_name: String,
/// The catalog name of the table.
#[clap(long, default_value = DEFAULT_CATALOG_NAME)]
catalog_name: String,
/// Pretty print the output.
#[clap(long, default_value = "false")]
@@ -143,7 +146,10 @@ pub struct GetTableCommand {
impl GetTableCommand {
pub fn validate(&self) -> Result<(), BoxedError> {
if self.table_id.is_none() && self.table_name.is_none() {
if matches!(
(&self.table_id, &self.table_name),
(Some(_), Some(_)) | (None, None)
) {
return Err(BoxedError::new(
InvalidArgumentsSnafu {
msg: "You must specify either --table-id or --table-name.",
@@ -159,7 +165,8 @@ struct GetTableTool {
kvbackend: KvBackendRef,
table_id: Option<u32>,
table_name: Option<String>,
schema_name: Option<String>,
schema_name: String,
catalog_name: String,
pretty: bool,
}
@@ -172,23 +179,20 @@ impl Tool for GetTableTool {
let table_route_manager = table_metadata_manager.table_route_manager();
let table_id = if let Some(table_name) = &self.table_name {
let catalog = DEFAULT_CATALOG_NAME.to_string();
let schema_name = self
.schema_name
.clone()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let key = TableNameKey::new(&catalog, &schema_name, table_name);
let catalog_name = &self.catalog_name;
let schema_name = &self.schema_name;
let Some(table_name) = table_name_manager.get(key).await.map_err(BoxedError::new)?
let Some(table_id) =
get_table_id_by_name(table_name_manager, catalog_name, schema_name, table_name)
.await?
else {
println!(
"Table({}) not found",
format_full_table_name(&catalog, &schema_name, table_name)
format_full_table_name(catalog_name, schema_name, table_name)
);
return Ok(());
};
table_name.table_id()
table_id
} else {
// Safety: we have validated that table_id or table_name is not None
self.table_id.unwrap()
@@ -236,6 +240,7 @@ impl GetTableCommand {
table_id: self.table_id,
table_name: self.table_name.clone(),
schema_name: self.schema_name.clone(),
catalog_name: self.catalog_name.clone(),
pretty: self.pretty,
}))
}

View File

@@ -0,0 +1,51 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::ddl::test_util::test_create_physical_table_task;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::rpc::store::PutRequest;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
/// Puts a key-value pair into the kv backend.
pub async fn put_key(kv_backend: &KvBackendRef, key: &str, value: &str) {
let put_req = PutRequest::new()
.with_key(key.as_bytes())
.with_value(value.as_bytes());
kv_backend.put(put_req).await.unwrap();
}
/// Prepares the physical table metadata for testing.
///
/// Returns the table info and the table route.
pub async fn prepare_physical_table_metadata(
table_name: &str,
table_id: TableId,
) -> (RawTableInfo, PhysicalTableRouteValue) {
let mut create_physical_table_task = test_create_physical_table_task(table_name);
let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}]);
create_physical_table_task.set_table_id(table_id);
(create_physical_table_task.table_info, table_route)
}

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::error::Result as CommonMetaResult;
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::rpc::KeyValue;
use serde::Serialize;
use store_api::storage::TableId;
/// Decodes a key-value pair into a string.
pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
@@ -34,3 +37,21 @@ where
serde_json::to_string(value).unwrap()
}
}
/// Gets the table id by table name.
pub async fn get_table_id_by_name(
table_name_manager: &TableNameManager,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<Option<TableId>, BoxedError> {
let table_name_key = TableNameKey::new(catalog_name, schema_name, table_name);
let Some(table_name_value) = table_name_manager
.get(table_name_key)
.await
.map_err(BoxedError::new)?
else {
return Ok(None);
};
Ok(Some(table_name_value.table_id()))
}

View File

@@ -0,0 +1,369 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod alter_table;
mod create_table;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use client::api::v1::CreateTableExpr;
use client::client_manager::NodeClients;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::error::Error as CommonMetaError;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_telemetry::{error, info, warn};
use futures::TryStreamExt;
use snafu::{ensure, ResultExt};
use store_api::storage::TableId;
use crate::error::{
InvalidArgumentsSnafu, Result, SendRequestToDatanodeSnafu, TableMetadataSnafu, UnexpectedSnafu,
};
use crate::metadata::common::StoreConfig;
use crate::metadata::utils::{FullTableMetadata, IteratorInput, TableMetadataIterator};
use crate::Tool;
/// Repair metadata of logical tables.
#[derive(Debug, Default, Parser)]
pub struct RepairLogicalTablesCommand {
/// The names of the tables to repair.
#[clap(long, value_delimiter = ',', alias = "table-name")]
table_names: Vec<String>,
/// The id of the table to repair.
#[clap(long, value_delimiter = ',', alias = "table-id")]
table_ids: Vec<TableId>,
/// The schema of the tables to repair.
#[clap(long, default_value = DEFAULT_SCHEMA_NAME)]
schema_name: String,
/// The catalog of the tables to repair.
#[clap(long, default_value = DEFAULT_CATALOG_NAME)]
catalog_name: String,
/// Whether to fail fast if any repair operation fails.
#[clap(long)]
fail_fast: bool,
#[clap(flatten)]
store: StoreConfig,
/// The timeout for the client to operate the datanode.
#[clap(long, default_value_t = 30)]
client_timeout_secs: u64,
/// The timeout for the client to connect to the datanode.
#[clap(long, default_value_t = 3)]
client_connect_timeout_secs: u64,
}
impl RepairLogicalTablesCommand {
fn validate(&self) -> Result<()> {
ensure!(
!self.table_names.is_empty() || !self.table_ids.is_empty(),
InvalidArgumentsSnafu {
msg: "You must specify --table-names or --table-ids.",
}
);
Ok(())
}
}
impl RepairLogicalTablesCommand {
pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
self.validate().map_err(BoxedError::new)?;
let kv_backend = self.store.build().await?;
let node_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_secs(self.client_timeout_secs))
.connect_timeout(Duration::from_secs(self.client_connect_timeout_secs));
let node_manager = Arc::new(NodeClients::new(node_client_channel_config));
Ok(Box::new(RepairTool {
table_names: self.table_names.clone(),
table_ids: self.table_ids.clone(),
schema_name: self.schema_name.clone(),
catalog_name: self.catalog_name.clone(),
fail_fast: self.fail_fast,
kv_backend,
node_manager,
}))
}
}
struct RepairTool {
table_names: Vec<String>,
table_ids: Vec<TableId>,
schema_name: String,
catalog_name: String,
fail_fast: bool,
kv_backend: KvBackendRef,
node_manager: NodeManagerRef,
}
#[async_trait]
impl Tool for RepairTool {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
self.repair_tables().await.map_err(BoxedError::new)
}
}
impl RepairTool {
fn generate_iterator_input(&self) -> Result<IteratorInput> {
if !self.table_names.is_empty() {
let table_names = &self.table_names;
let catalog = &self.catalog_name;
let schema_name = &self.schema_name;
let table_names = table_names
.iter()
.map(|table_name| {
(
catalog.to_string(),
schema_name.to_string(),
table_name.to_string(),
)
})
.collect::<Vec<_>>();
return Ok(IteratorInput::new_table_names(table_names));
} else if !self.table_ids.is_empty() {
return Ok(IteratorInput::new_table_ids(self.table_ids.clone()));
};
InvalidArgumentsSnafu {
msg: "You must specify --table-names or --table-id.",
}
.fail()
}
async fn repair_tables(&self) -> Result<()> {
let input = self.generate_iterator_input()?;
let mut table_metadata_iterator =
Box::pin(TableMetadataIterator::new(self.kv_backend.clone(), input).into_stream());
let table_metadata_manager = TableMetadataManager::new(self.kv_backend.clone());
let mut skipped_table = 0;
let mut success_table = 0;
while let Some(full_table_metadata) = table_metadata_iterator.try_next().await? {
let full_table_name = full_table_metadata.full_table_name();
if !full_table_metadata.is_metric_engine() {
warn!(
"Skipping repair for non-metric engine table: {}",
full_table_name
);
skipped_table += 1;
continue;
}
if full_table_metadata.is_physical_table() {
warn!("Skipping repair for physical table: {}", full_table_name);
skipped_table += 1;
continue;
}
let (physical_table_id, physical_table_route) = table_metadata_manager
.table_route_manager()
.get_physical_table_route(full_table_metadata.table_id)
.await
.context(TableMetadataSnafu)?;
if let Err(err) = self
.repair_table(
&full_table_metadata,
physical_table_id,
&physical_table_route.region_routes,
)
.await
{
error!(
err;
"Failed to repair table: {}, skipped table: {}",
full_table_name,
skipped_table,
);
if self.fail_fast {
return Err(err);
}
} else {
success_table += 1;
}
}
info!(
"Repair logical tables result: {} tables repaired, {} tables skipped",
success_table, skipped_table
);
Ok(())
}
async fn alter_table_on_datanodes(
&self,
full_table_metadata: &FullTableMetadata,
physical_region_routes: &[RegionRoute],
) -> Result<Vec<(Peer, CommonMetaError)>> {
let logical_table_id = full_table_metadata.table_id;
let alter_table_expr = alter_table::generate_alter_table_expr_for_all_columns(
&full_table_metadata.table_info,
)?;
let node_manager = self.node_manager.clone();
let mut failed_peers = Vec::new();
info!(
"Sending alter table requests to all datanodes for table: {}, number of regions:{}.",
full_table_metadata.full_table_name(),
physical_region_routes.len()
);
let leaders = find_leaders(physical_region_routes);
for peer in &leaders {
let alter_table_request = alter_table::make_alter_region_request_for_peer(
logical_table_id,
&alter_table_expr,
full_table_metadata.table_info.ident.version,
peer,
physical_region_routes,
)?;
let datanode = node_manager.datanode(peer).await;
if let Err(err) = datanode.handle(alter_table_request).await {
failed_peers.push((peer.clone(), err));
}
}
Ok(failed_peers)
}
async fn create_table_on_datanode(
&self,
create_table_expr: &CreateTableExpr,
logical_table_id: TableId,
physical_table_id: TableId,
peer: &Peer,
physical_region_routes: &[RegionRoute],
) -> Result<()> {
let node_manager = self.node_manager.clone();
let datanode = node_manager.datanode(peer).await;
let create_table_request = create_table::make_create_region_request_for_peer(
logical_table_id,
physical_table_id,
create_table_expr,
peer,
physical_region_routes,
)?;
datanode
.handle(create_table_request)
.await
.with_context(|_| SendRequestToDatanodeSnafu { peer: peer.clone() })?;
Ok(())
}
async fn repair_table(
&self,
full_table_metadata: &FullTableMetadata,
physical_table_id: TableId,
physical_region_routes: &[RegionRoute],
) -> Result<()> {
let full_table_name = full_table_metadata.full_table_name();
// First we sends alter table requests to all datanodes with all columns.
let failed_peers = self
.alter_table_on_datanodes(full_table_metadata, physical_region_routes)
.await?;
if failed_peers.is_empty() {
info!(
"All alter table requests sent successfully for table: {}",
full_table_name
);
return Ok(());
}
warn!(
"Sending alter table requests to datanodes for table: {} failed for the datanodes: {:?}",
full_table_name,
failed_peers.iter().map(|(peer, _)| peer.id).collect::<Vec<_>>()
);
let create_table_expr =
create_table::generate_create_table_expr(&full_table_metadata.table_info)?;
let mut errors = Vec::new();
for (peer, err) in failed_peers {
if err.status_code() != StatusCode::RegionNotFound {
error!(
err;
"Sending alter table requests to datanode: {} for table: {} failed",
peer.id,
full_table_name,
);
continue;
}
info!(
"Region not found for table: {}, datanode: {}, trying to create the logical table on that datanode",
full_table_name,
peer.id
);
// If the alter table request fails for any datanode, we attempt to create the table on that datanode
// as a fallback mechanism to ensure table consistency across the cluster.
if let Err(err) = self
.create_table_on_datanode(
&create_table_expr,
full_table_metadata.table_id,
physical_table_id,
&peer,
physical_region_routes,
)
.await
{
error!(
err;
"Failed to create table on datanode: {} for table: {}",
peer.id, full_table_name
);
errors.push(err);
if self.fail_fast {
break;
}
} else {
info!(
"Created table on datanode: {} for table: {}",
peer.id, full_table_name
);
}
}
if !errors.is_empty() {
return UnexpectedSnafu {
msg: format!(
"Failed to create table on datanodes for table: {}",
full_table_name,
),
}
.fail();
}
Ok(())
}
}

View File

@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use client::api::v1::alter_table_expr::Kind;
use client::api::v1::region::{region_request, AlterRequests, RegionRequest, RegionRequestHeader};
use client::api::v1::{AddColumn, AddColumns, AlterTableExpr};
use common_meta::ddl::alter_logical_tables::make_alter_region_request;
use common_meta::peer::Peer;
use common_meta::rpc::router::{find_leader_regions, RegionRoute};
use operator::expr_helper::column_schemas_to_defs;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
/// Generates alter table expression for all columns.
pub fn generate_alter_table_expr_for_all_columns(
table_info: &RawTableInfo,
) -> Result<AlterTableExpr> {
let schema = &table_info.meta.schema;
let mut alter_table_expr = AlterTableExpr {
catalog_name: table_info.catalog_name.to_string(),
schema_name: table_info.schema_name.to_string(),
table_name: table_info.name.to_string(),
..Default::default()
};
let primary_keys = table_info
.meta
.primary_key_indices
.iter()
.map(|i| schema.column_schemas[*i].name.clone())
.collect::<Vec<_>>();
let add_columns = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys)
.context(CovertColumnSchemasToDefsSnafu)?;
alter_table_expr.kind = Some(Kind::AddColumns(AddColumns {
add_columns: add_columns
.into_iter()
.map(|col| AddColumn {
column_def: Some(col),
location: None,
add_if_not_exists: true,
})
.collect(),
}));
Ok(alter_table_expr)
}
/// Makes an alter region request for a peer.
pub fn make_alter_region_request_for_peer(
logical_table_id: TableId,
alter_table_expr: &AlterTableExpr,
schema_version: u64,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(regions_on_this_peer.len());
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(logical_table_id, *region_number);
let request = make_alter_region_request(region_id, alter_table_expr, schema_version);
requests.push(request);
}
Ok(RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::Alters(AlterRequests { requests })),
})
}

View File

@@ -0,0 +1,89 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use client::api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use client::api::v1::CreateTableExpr;
use common_meta::ddl::create_logical_tables::create_region_request_builder;
use common_meta::ddl::utils::region_storage_path;
use common_meta::peer::Peer;
use common_meta::rpc::router::{find_leader_regions, RegionRoute};
use operator::expr_helper::column_schemas_to_defs;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
/// Generates a `CreateTableExpr` from a `RawTableInfo`.
pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTableExpr> {
let schema = &table_info.meta.schema;
let primary_keys = table_info
.meta
.primary_key_indices
.iter()
.map(|i| schema.column_schemas[*i].name.clone())
.collect::<Vec<_>>();
let timestamp_index = schema.timestamp_index.as_ref().unwrap();
let time_index = schema.column_schemas[*timestamp_index].name.clone();
let column_defs = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys)
.context(CovertColumnSchemasToDefsSnafu)?;
let table_options = HashMap::from(&table_info.meta.options);
Ok(CreateTableExpr {
catalog_name: table_info.catalog_name.to_string(),
schema_name: table_info.schema_name.to_string(),
table_name: table_info.name.to_string(),
desc: String::default(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: true,
table_options,
table_id: None,
engine: table_info.meta.engine.to_string(),
})
}
/// Makes a create region request for a peer.
pub fn make_create_region_request_for_peer(
logical_table_id: TableId,
physical_table_id: TableId,
create_table_expr: &CreateTableExpr,
peer: &Peer,
region_routes: &[RegionRoute],
) -> Result<RegionRequest> {
let regions_on_this_peer = find_leader_regions(region_routes, peer);
let mut requests = Vec::with_capacity(regions_on_this_peer.len());
let request_builder =
create_region_request_builder(create_table_expr, physical_table_id).unwrap();
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(logical_table_id, *region_number);
let region_request =
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
requests.push(region_request);
}
Ok(RegionRequest {
header: Some(RegionRequestHeader::default()),
body: Some(region_request::Body::Creates(CreateRequests { requests })),
})
}

View File

@@ -0,0 +1,178 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use async_stream::try_stream;
use common_catalog::consts::METRIC_ENGINE;
use common_catalog::format_full_table_name;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use futures::Stream;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use crate::error::{Result, TableMetadataSnafu, UnexpectedSnafu};
/// The input for the iterator.
pub enum IteratorInput {
TableIds(VecDeque<TableId>),
TableNames(VecDeque<(String, String, String)>),
}
impl IteratorInput {
/// Creates a new iterator input from a list of table ids.
pub fn new_table_ids(table_ids: Vec<TableId>) -> Self {
Self::TableIds(table_ids.into())
}
/// Creates a new iterator input from a list of table names.
pub fn new_table_names(table_names: Vec<(String, String, String)>) -> Self {
Self::TableNames(table_names.into())
}
}
/// An iterator for retrieving table metadata from the metadata store.
///
/// This struct provides functionality to iterate over table metadata based on
/// either [`TableId`] and their associated regions or fully qualified table names.
pub struct TableMetadataIterator {
input: IteratorInput,
table_metadata_manager: TableMetadataManager,
}
/// The full table metadata.
pub struct FullTableMetadata {
pub table_id: TableId,
pub table_info: RawTableInfo,
pub table_route: TableRouteValue,
}
impl FullTableMetadata {
/// Returns true if it's [TableRouteValue::Physical].
pub fn is_physical_table(&self) -> bool {
self.table_route.is_physical()
}
/// Returns true if it's a metric engine table.
pub fn is_metric_engine(&self) -> bool {
self.table_info.meta.engine == METRIC_ENGINE
}
/// Returns the full table name.
pub fn full_table_name(&self) -> String {
format_full_table_name(
&self.table_info.catalog_name,
&self.table_info.schema_name,
&self.table_info.name,
)
}
}
impl TableMetadataIterator {
pub fn new(kvbackend: KvBackendRef, input: IteratorInput) -> Self {
let table_metadata_manager = TableMetadataManager::new(kvbackend);
Self {
input,
table_metadata_manager,
}
}
/// Returns the next table metadata.
///
/// This method handles two types of inputs:
/// - TableIds: Returns metadata for a specific [`TableId`].
/// - TableNames: Returns metadata for a table identified by its full name (catalog.schema.table).
///
/// Returns `None` when there are no more tables to process.
pub async fn next(&mut self) -> Result<Option<FullTableMetadata>> {
match &mut self.input {
IteratorInput::TableIds(table_ids) => {
if let Some(table_id) = table_ids.pop_front() {
let full_table_metadata = self.get_table_metadata(table_id).await?;
return Ok(Some(full_table_metadata));
}
}
IteratorInput::TableNames(table_names) => {
if let Some(full_table_name) = table_names.pop_front() {
let table_id = self.get_table_id_by_name(full_table_name).await?;
let full_table_metadata = self.get_table_metadata(table_id).await?;
return Ok(Some(full_table_metadata));
}
}
}
Ok(None)
}
/// Converts the iterator into a stream of table metadata.
pub fn into_stream(mut self) -> impl Stream<Item = Result<FullTableMetadata>> {
try_stream!({
while let Some(full_table_metadata) = self.next().await? {
yield full_table_metadata;
}
})
}
async fn get_table_id_by_name(
&mut self,
(catalog_name, schema_name, table_name): (String, String, String),
) -> Result<TableId> {
let key = TableNameKey::new(&catalog_name, &schema_name, &table_name);
let table_id = self
.table_metadata_manager
.table_name_manager()
.get(key)
.await
.context(TableMetadataSnafu)?
.with_context(|| UnexpectedSnafu {
msg: format!(
"Table not found: {}",
format_full_table_name(&catalog_name, &schema_name, &table_name)
),
})?
.table_id();
Ok(table_id)
}
async fn get_table_metadata(&mut self, table_id: TableId) -> Result<FullTableMetadata> {
let (table_info, table_route) = self
.table_metadata_manager
.get_full_table_info(table_id)
.await
.context(TableMetadataSnafu)?;
let table_info = table_info
.with_context(|| UnexpectedSnafu {
msg: format!("Table info not found for table id: {table_id}"),
})?
.into_inner()
.table_info;
let table_route = table_route
.with_context(|| UnexpectedSnafu {
msg: format!("Table route not found for table id: {table_id}"),
})?
.into_inner();
Ok(FullTableMetadata {
table_id,
table_info,
table_route,
})
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode;
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn handle_mark_window_dirty(
&self,
req: DirtyWindowRequest,
) -> common_meta::error::Result<FlowResponse> {
self.handle_mark_window_dirty(req)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
impl FlowRequester {
@@ -91,4 +101,20 @@ impl FlowRequester {
.into_inner();
Ok(response)
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.or_else(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
})?
.into_inner();
Ok(response)
}
}

View File

@@ -67,6 +67,7 @@ metric-engine.workspace = true
mito2.workspace = true
moka.workspace = true
nu-ansi-term = "0.46"
object-store.workspace = true
plugins.workspace = true
prometheus.workspace = true
prost.workspace = true

View File

@@ -280,7 +280,7 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use super::*;

View File

@@ -93,6 +93,7 @@ impl InstanceBuilder {
MetaClientType::Datanode { member_id },
meta_client_options,
Some(&plugins),
None,
)
.await
.context(MetaClientInitSnafu)?;

View File

@@ -55,14 +55,32 @@ type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
pub struct Instance {
flownode: FlownodeInstance,
// The components of flownode, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub catalog_manager: catalog::CatalogManagerRef,
pub fe_client: Arc<FrontendClient>,
pub kv_backend: common_meta::kv_backend::KvBackendRef,
}
impl Instance {
pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
pub fn new(
flownode: FlownodeInstance,
#[cfg(feature = "enterprise")] components: Components,
guard: Vec<WorkerGuard>,
) -> Self {
Self {
flownode,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
}
}
@@ -75,6 +93,11 @@ impl Instance {
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait::async_trait]
@@ -283,6 +306,7 @@ impl StartCommand {
MetaClientType::Flownode { member_id },
meta_config,
None,
None,
)
.await
.context(MetaClientInitSnafu)?;
@@ -349,19 +373,20 @@ impl StartCommand {
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client =
FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins,
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
Arc::new(frontend_client),
frontend_client.clone(),
)
.with_heartbeat_task(heartbeat_task);
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let services = FlownodeServiceBuilder::new(&opts)
.with_grpc_server(flownode.flownode_server().clone())
.with_default_grpc_server(flownode.flownode_server())
.enable_http_service()
.build()
.context(StartFlownodeSnafu)?;
@@ -393,6 +418,16 @@ impl StartCommand {
.set_frontend_invoker(invoker)
.await;
Ok(Instance::new(flownode, guard))
#[cfg(feature = "enterprise")]
let components = Components {
catalog_manager: catalog_manager.clone(),
fe_client: frontend_client,
kv_backend: cached_meta_backend,
};
#[cfg(not(feature = "enterprise"))]
return Ok(Instance::new(flownode, guard));
#[cfg(feature = "enterprise")]
Ok(Instance::new(flownode, components, guard))
}
}

View File

@@ -313,6 +313,7 @@ impl StartCommand {
MetaClientType::Frontend,
meta_client_options,
Some(&plugins),
None,
)
.await
.context(error::MetaClientInitSnafu)?;

View File

@@ -30,20 +30,16 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
#[cfg(feature = "enterprise")]
use common_meta::ddl_manager::TriggerDdlManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
@@ -261,15 +257,34 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// The components of standalone, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub plugins: Plugins,
pub kv_backend: KvBackendRef,
pub frontend_client: Arc<FrontendClient>,
pub catalog_manager: catalog::CatalogManagerRef,
}
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait]
@@ -550,13 +565,14 @@ impl StartCommand {
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
flow_metadata_manager.clone(),
Arc::new(frontend_client.clone()),
frontend_client.clone(),
);
let flownode = flow_builder
.build()
@@ -594,28 +610,36 @@ impl StartCommand {
.await
.context(error::BuildWalOptionsAllocatorSnafu)?;
let wal_options_allocator = Arc::new(wal_options_allocator);
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
));
let ddl_context = DdlContext {
node_manager: node_manager.clone(),
cache_invalidator: layered_cache_registry.clone(),
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
};
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let trigger_ddl_manager: Option<TriggerDdlManagerRef> = plugins.get();
let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(),
node_manager.clone(),
layered_cache_registry.clone(),
table_metadata_manager,
table_meta_allocator,
flow_metadata_manager,
flow_meta_allocator,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.await?;
let ddl_manager = {
let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
plugins.get();
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
};
let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager);
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
@@ -658,7 +682,7 @@ impl StartCommand {
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
let servers = Services::new(opts, fe_instance.clone(), plugins)
let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
.build()
.context(error::StartFrontendSnafu)?;
@@ -669,51 +693,26 @@ impl StartCommand {
export_metrics_task,
};
#[cfg(feature = "enterprise")]
let components = Components {
plugins,
kv_backend,
frontend_client,
catalog_manager,
};
Ok(Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_options_allocator,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn create_ddl_task_executor(
procedure_manager: ProcedureManagerRef,
node_manager: NodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
table_metadata_manager,
table_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager,
true,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
)
.context(error::InitDdlManagerSnafu)?,
);
Ok(procedure_executor)
}
pub async fn create_table_metadata_manager(
kv_backend: KvBackendRef,
) -> Result<TableMetadataManagerRef> {
@@ -849,7 +848,7 @@ mod tests {
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{FileConfig, GcsConfig};
use object_store::config::{FileConfig, GcsConfig};
use super::*;
use crate::options::GlobalOptions;
@@ -968,15 +967,15 @@ mod tests {
assert!(matches!(
&dn_opts.storage.store,
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
object_store::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(dn_opts.storage.providers.len(), 2);
assert!(matches!(
dn_opts.storage.providers[0],
datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
));
match &dn_opts.storage.providers[1] {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
object_store::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"SecretBox<alloc::string::String>([REDACTED])".to_string(),
format!("{:?}", s3_config.access_key_id)

View File

@@ -18,7 +18,7 @@ use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_ENDPOINT};
use common_telemetry::logging::{LoggingOptions, DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT};
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
@@ -81,7 +81,7 @@ fn test_load_datanode_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -124,7 +124,7 @@ fn test_load_frontend_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -172,7 +172,7 @@ fn test_load_metasrv_example_config() {
logging: LoggingOptions {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
@@ -229,7 +229,7 @@ fn test_load_standalone_example_config() {
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
otlp_endpoint: Some(DEFAULT_OTLP_ENDPOINT.to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},

View File

@@ -14,6 +14,7 @@ common-macro.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
object-store.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -106,7 +106,7 @@ mod tests {
use common_telemetry::logging::LoggingOptions;
use common_test_util::temp_dir::create_named_temp_file;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{ObjectStoreConfig, StorageConfig};
use datanode::config::StorageConfig;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
@@ -212,7 +212,7 @@ mod tests {
// Check the configs from environment variables.
match &opts.storage.store {
ObjectStoreConfig::S3(s3_config) => {
object_store::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
_ => panic!("unexpected store type"),

View File

@@ -21,6 +21,7 @@ pub mod error;
pub mod file_format;
pub mod lister;
pub mod object_store;
pub mod parquet_writer;
pub mod share_buffer;
#[cfg(test)]
pub mod test_util;

View File

@@ -0,0 +1,52 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::Writer;
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::errors::ParquetError;
/// Bridges opendal [Writer] with parquet [AsyncFileWriter].
pub struct AsyncWriter {
inner: Writer,
}
impl AsyncWriter {
/// Create a [`AsyncWriter`] by given [`Writer`].
pub fn new(writer: Writer) -> Self {
Self { inner: writer }
}
}
impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))
})
}
}

View File

@@ -23,7 +23,7 @@ pub mod selector;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DisplayProcessId {
pub server_addr: String,
pub id: u64,
pub id: u32,
}
impl Display for DisplayProcessId {
@@ -44,7 +44,7 @@ impl TryFrom<&str> for DisplayProcessId {
let id = split
.next()
.context(error::ParseProcessIdSnafu { s: value })?;
let id = u64::from_str(id)
let id = u32::from_str(id)
.ok()
.context(error::ParseProcessIdSnafu { s: value })?;
Ok(DisplayProcessId { server_addr, id })

View File

@@ -33,6 +33,7 @@ common-version.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-functions-aggregate-common.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
geo = { version = "0.29", optional = true }

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod approximate;
pub mod count_hash;
#[cfg(feature = "geo")]
pub mod geo;
pub mod vector;

View File

@@ -14,8 +14,8 @@
use crate::function_registry::FunctionRegistry;
pub(crate) mod hll;
mod uddsketch;
pub mod hll;
pub mod uddsketch;
pub(crate) struct ApproximateFunction;

View File

@@ -0,0 +1,647 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! `CountHash` / `count_hash` is a hash-based approximate distinct count function.
//!
//! It is a variant of `CountDistinct` that uses a hash function to approximate the
//! distinct count.
//! It is designed to be more efficient than `CountDistinct` for large datasets,
//! but it is not as accurate, as the hash value may be collision.
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use ahash::RandomState;
use datafusion_common::cast::as_list_array;
use datafusion_common::error::Result;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::SingleRowListArrayBuilder;
use datafusion_common::{internal_err, not_impl_err, ScalarValue};
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
use datafusion_expr::{
Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF,
SetMonotonicity, Signature, TypeSignature, Volatility,
};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask;
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, UInt64Array,
};
use datatypes::arrow::buffer::{OffsetBuffer, ScalarBuffer};
use datatypes::arrow::datatypes::{DataType, Field};
use crate::function_registry::FunctionRegistry;
type HashValueType = u64;
// read from /dev/urandom 4047821dc6144e4b2abddf23ad4171126a52eeecd26eff2191cf673b965a7875
const RANDOM_SEED_0: u64 = 0x4047821dc6144e4b;
const RANDOM_SEED_1: u64 = 0x2abddf23ad417112;
const RANDOM_SEED_2: u64 = 0x6a52eeecd26eff21;
const RANDOM_SEED_3: u64 = 0x91cf673b965a7875;
impl CountHash {
pub fn register(registry: &FunctionRegistry) {
registry.register_aggr(CountHash::udf_impl());
}
pub fn udf_impl() -> AggregateUDF {
AggregateUDF::new_from_impl(CountHash {
signature: Signature::one_of(
vec![TypeSignature::VariadicAny, TypeSignature::Nullary],
Volatility::Immutable,
),
})
}
}
#[derive(Debug, Clone)]
pub struct CountHash {
signature: Signature,
}
impl AggregateUDFImpl for CountHash {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn name(&self) -> &str {
"count_hash"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int64)
}
fn is_nullable(&self) -> bool {
false
}
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
Ok(vec![Field::new_list(
format_state_name(args.name, "count_hash"),
Field::new_list_field(DataType::UInt64, true),
// For count_hash accumulator, null list item stands for an
// empty value set (i.e., all NULL value so far for that group).
true,
)])
}
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
if acc_args.exprs.len() > 1 {
return not_impl_err!("count_hash with multiple arguments");
}
Ok(Box::new(CountHashAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}))
}
fn aliases(&self) -> &[String] {
&[]
}
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
true
}
fn create_groups_accumulator(
&self,
args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
if args.exprs.len() > 1 {
return not_impl_err!("count_hash with multiple arguments");
}
Ok(Box::new(CountHashGroupAccumulator::new()))
}
fn reverse_expr(&self) -> ReversedUDAF {
ReversedUDAF::Identical
}
fn order_sensitivity(&self) -> AggregateOrderSensitivity {
AggregateOrderSensitivity::Insensitive
}
fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(0)))
}
fn set_monotonicity(&self, _data_type: &DataType) -> SetMonotonicity {
SetMonotonicity::Increasing
}
}
/// GroupsAccumulator for `count_hash` aggregate function
#[derive(Debug)]
pub struct CountHashGroupAccumulator {
/// One HashSet per group to track distinct values
distinct_sets: Vec<HashSet<HashValueType, RandomState>>,
random_state: RandomState,
batch_hashes: Vec<HashValueType>,
}
impl Default for CountHashGroupAccumulator {
fn default() -> Self {
Self::new()
}
}
impl CountHashGroupAccumulator {
pub fn new() -> Self {
Self {
distinct_sets: vec![],
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}
}
fn ensure_sets(&mut self, total_num_groups: usize) {
if self.distinct_sets.len() < total_num_groups {
self.distinct_sets
.resize_with(total_num_groups, HashSet::default);
}
}
}
impl GroupsAccumulator for CountHashGroupAccumulator {
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "count_hash expects a single argument");
self.ensure_sets(total_num_groups);
let array = &values[0];
self.batch_hashes.clear();
self.batch_hashes.resize(array.len(), 0);
let hashes = create_hashes(
&[ArrayRef::clone(array)],
&self.random_state,
&mut self.batch_hashes,
)?;
// Use a pattern similar to accumulate_indices to process rows
// that are not null and pass the filter
let nulls = array.logical_nulls();
match (nulls.as_ref(), opt_filter) {
(None, None) => {
// No nulls, no filter - process all rows
for (row_idx, &group_idx) in group_indices.iter().enumerate() {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
(Some(nulls), None) => {
// Has nulls, no filter
for (row_idx, (&group_idx, is_valid)) in
group_indices.iter().zip(nulls.iter()).enumerate()
{
if is_valid {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
(None, Some(filter)) => {
// No nulls, has filter
for (row_idx, (&group_idx, filter_value)) in
group_indices.iter().zip(filter.iter()).enumerate()
{
if let Some(true) = filter_value {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
(Some(nulls), Some(filter)) => {
// Has nulls and filter
let iter = filter
.iter()
.zip(group_indices.iter())
.zip(nulls.iter())
.enumerate();
for (row_idx, ((filter_value, &group_idx), is_valid)) in iter {
if is_valid && filter_value == Some(true) {
self.distinct_sets[group_idx].insert(hashes[row_idx]);
}
}
}
}
Ok(())
}
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let distinct_sets: Vec<HashSet<u64, RandomState>> =
emit_to.take_needed(&mut self.distinct_sets);
let counts = distinct_sets
.iter()
.map(|set| set.len() as i64)
.collect::<Vec<_>>();
Ok(Arc::new(Int64Array::from(counts)))
}
fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
_opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(
values.len(),
1,
"count_hash merge expects a single state array"
);
self.ensure_sets(total_num_groups);
let list_array = as_list_array(&values[0])?;
// For each group in the incoming batch
for (i, &group_idx) in group_indices.iter().enumerate() {
if i < list_array.len() {
let inner_array = list_array.value(i);
let inner_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
// Add each value to our set for this group
for j in 0..inner_array.len() {
if !inner_array.is_null(j) {
self.distinct_sets[group_idx].insert(inner_array.value(j));
}
}
}
}
Ok(())
}
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let distinct_sets: Vec<HashSet<u64, RandomState>> =
emit_to.take_needed(&mut self.distinct_sets);
let mut offsets = Vec::with_capacity(distinct_sets.len() + 1);
offsets.push(0);
let mut curr_len = 0i32;
let mut value_iter = distinct_sets
.into_iter()
.flat_map(|set| {
// build offset
curr_len += set.len() as i32;
offsets.push(curr_len);
// convert into iter
set.into_iter()
})
.peekable();
let data_array: ArrayRef = if value_iter.peek().is_none() {
arrow::array::new_empty_array(&DataType::UInt64) as _
} else {
Arc::new(UInt64Array::from_iter_values(value_iter))
};
let offset_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
let list_array = ListArray::new(
Arc::new(Field::new_list_field(DataType::UInt64, true)),
offset_buffer,
data_array,
None,
);
Ok(vec![Arc::new(list_array) as _])
}
fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
// For a single hash value per row, create a list array with that value
assert_eq!(values.len(), 1, "count_hash expects a single argument");
let values = ArrayRef::clone(&values[0]);
let offsets = OffsetBuffer::new(ScalarBuffer::from_iter(0..values.len() as i32 + 1));
let nulls = filtered_null_mask(opt_filter, &values);
let list_array = ListArray::new(
Arc::new(Field::new_list_field(DataType::UInt64, true)),
offsets,
values,
nulls,
);
Ok(vec![Arc::new(list_array)])
}
fn supports_convert_to_state(&self) -> bool {
true
}
fn size(&self) -> usize {
// Base size of the struct
let mut size = size_of::<Self>();
// Size of the vector holding the HashSets
size += size_of::<Vec<HashSet<HashValueType, RandomState>>>()
+ self.distinct_sets.capacity() * size_of::<HashSet<HashValueType, RandomState>>();
// Estimate HashSet contents size more efficiently
// Instead of iterating through all values which is expensive, use an approximation
for set in &self.distinct_sets {
// Base size of the HashSet
size += set.capacity() * size_of::<HashValueType>();
}
size
}
}
#[derive(Debug)]
struct CountHashAccumulator {
values: HashSet<HashValueType, RandomState>,
random_state: RandomState,
batch_hashes: Vec<HashValueType>,
}
impl CountHashAccumulator {
// calculating the size for fixed length values, taking first batch size *
// number of batches.
fn fixed_size(&self) -> usize {
size_of_val(self) + (size_of::<HashValueType>() * self.values.capacity())
}
}
impl Accumulator for CountHashAccumulator {
/// Returns the distinct values seen so far as (one element) ListArray.
fn state(&mut self) -> Result<Vec<ScalarValue>> {
let values = self.values.iter().cloned().collect::<Vec<_>>();
let arr = Arc::new(UInt64Array::from(values)) as _;
let list_scalar = SingleRowListArrayBuilder::new(arr).build_list_scalar();
Ok(vec![list_scalar])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let arr = &values[0];
if arr.data_type() == &DataType::Null {
return Ok(());
}
self.batch_hashes.clear();
self.batch_hashes.resize(arr.len(), 0);
let hashes = create_hashes(
&[ArrayRef::clone(arr)],
&self.random_state,
&mut self.batch_hashes,
)?;
for hash in hashes.as_slice() {
self.values.insert(*hash);
}
Ok(())
}
/// Merges multiple sets of distinct values into the current set.
///
/// The input to this function is a `ListArray` with **multiple** rows,
/// where each row contains the values from a partial aggregate's phase (e.g.
/// the result of calling `Self::state` on multiple accumulators).
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
}
assert_eq!(states.len(), 1, "array_agg states must be singleton!");
let array = &states[0];
let list_array = array.as_list::<i32>();
for inner_array in list_array.iter() {
let Some(inner_array) = inner_array else {
return internal_err!(
"Intermediate results of count_hash should always be non null"
);
};
let hash_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..hash_array.len() {
self.values.insert(hash_array.value(i));
}
}
Ok(())
}
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
}
fn size(&self) -> usize {
self.fixed_size()
}
}
#[cfg(test)]
mod tests {
use datatypes::arrow::array::{Array, BooleanArray, Int32Array, Int64Array};
use super::*;
fn create_test_accumulator() -> CountHashAccumulator {
CountHashAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(
RANDOM_SEED_0,
RANDOM_SEED_1,
RANDOM_SEED_2,
RANDOM_SEED_3,
),
batch_hashes: vec![],
}
}
#[test]
fn test_count_hash_accumulator() -> Result<()> {
let mut acc = create_test_accumulator();
// Test with some data
let array = Arc::new(Int32Array::from(vec![
Some(1),
Some(2),
Some(3),
Some(1),
Some(2),
None,
])) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(4)));
// Test with empty data
let mut acc = create_test_accumulator();
let array = Arc::new(Int32Array::from(vec![] as Vec<Option<i32>>)) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(0)));
// Test with only nulls
let mut acc = create_test_accumulator();
let array = Arc::new(Int32Array::from(vec![None, None, None])) as ArrayRef;
acc.update_batch(&[array])?;
let result = acc.evaluate()?;
assert_eq!(result, ScalarValue::Int64(Some(1)));
Ok(())
}
#[test]
fn test_count_hash_accumulator_merge() -> Result<()> {
// Accumulator 1
let mut acc1 = create_test_accumulator();
let array1 = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef;
acc1.update_batch(&[array1])?;
let state1 = acc1.state()?;
// Accumulator 2
let mut acc2 = create_test_accumulator();
let array2 = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)])) as ArrayRef;
acc2.update_batch(&[array2])?;
let state2 = acc2.state()?;
// Merge state1 and state2 into a new accumulator
let mut acc_merged = create_test_accumulator();
let state_array1 = state1[0].to_array()?;
let state_array2 = state2[0].to_array()?;
acc_merged.merge_batch(&[state_array1])?;
acc_merged.merge_batch(&[state_array2])?;
let result = acc_merged.evaluate()?;
// Distinct values are {1, 2, 3, 4, 5}, so count is 5
assert_eq!(result, ScalarValue::Int64(Some(5)));
Ok(())
}
fn create_test_group_accumulator() -> CountHashGroupAccumulator {
CountHashGroupAccumulator::new()
}
#[test]
fn test_count_hash_group_accumulator() -> Result<()> {
let mut acc = create_test_group_accumulator();
let values = Arc::new(Int32Array::from(vec![1, 2, 1, 3, 2, 4, 5])) as ArrayRef;
let group_indices = vec![0, 1, 0, 0, 1, 2, 0];
let total_num_groups = 3;
acc.update_batch(&[values], &group_indices, None, total_num_groups)?;
let result_array = acc.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Group 0: {1, 3, 5} -> 3
// Group 1: {2} -> 1
// Group 2: {4} -> 1
assert_eq!(result.value(0), 3);
assert_eq!(result.value(1), 1);
assert_eq!(result.value(2), 1);
Ok(())
}
#[test]
fn test_count_hash_group_accumulator_with_filter() -> Result<()> {
let mut acc = create_test_group_accumulator();
let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef;
let group_indices = vec![0, 0, 1, 1, 2, 2];
let filter = BooleanArray::from(vec![true, false, true, true, false, true]);
let total_num_groups = 3;
acc.update_batch(&[values], &group_indices, Some(&filter), total_num_groups)?;
let result_array = acc.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Group 0: {1} (2 is filtered out) -> 1
// Group 1: {3, 4} -> 2
// Group 2: {6} (5 is filtered out) -> 1
assert_eq!(result.value(0), 1);
assert_eq!(result.value(1), 2);
assert_eq!(result.value(2), 1);
Ok(())
}
#[test]
fn test_count_hash_group_accumulator_merge() -> Result<()> {
// Accumulator 1
let mut acc1 = create_test_group_accumulator();
let values1 = Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as ArrayRef;
let group_indices1 = vec![0, 0, 1, 1];
acc1.update_batch(&[values1], &group_indices1, None, 2)?;
// acc1 state: group 0 -> {1, 2}, group 1 -> {3, 4}
let state1 = acc1.state(EmitTo::All)?;
// Accumulator 2
let mut acc2 = create_test_group_accumulator();
let values2 = Arc::new(Int32Array::from(vec![5, 6, 1, 3])) as ArrayRef;
// Merge into different group indices
let group_indices2 = vec![2, 2, 0, 1];
acc2.update_batch(&[values2], &group_indices2, None, 3)?;
// acc2 state: group 0 -> {1}, group 1 -> {3}, group 2 -> {5, 6}
// Merge state from acc1 into acc2
// We will merge acc1's group 0 into acc2's group 0
// and acc1's group 1 into acc2's group 2
let merge_group_indices = vec![0, 2];
acc2.merge_batch(&state1, &merge_group_indices, None, 3)?;
let result_array = acc2.evaluate(EmitTo::All)?;
let result = result_array.as_any().downcast_ref::<Int64Array>().unwrap();
// Final state of acc2:
// Group 0: {1} U {1, 2} -> {1, 2}, count = 2
// Group 1: {3}, count = 1
// Group 2: {5, 6} U {3, 4} -> {3, 4, 5, 6}, count = 4
assert_eq!(result.value(0), 2);
assert_eq!(result.value(1), 1);
assert_eq!(result.value(2), 4);
Ok(())
}
#[test]
fn test_size() {
let acc = create_test_group_accumulator();
// Just test it doesn't crash and returns a value.
assert!(acc.size() > 0);
}
}

View File

@@ -21,6 +21,7 @@ use once_cell::sync::Lazy;
use crate::admin::AdminFunction;
use crate::aggrs::approximate::ApproximateFunction;
use crate::aggrs::count_hash::CountHash;
use crate::aggrs::vector::VectorFunction as VectorAggrFunction;
use crate::function::{AsyncFunctionRef, Function, FunctionRef};
use crate::function_factory::ScalarFunctionFactory;
@@ -144,6 +145,9 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
// Approximate functions
ApproximateFunction::register(&function_registry);
// CountHash function
CountHash::register(&function_registry);
Arc::new(function_registry)
});

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
use derive_more::Display;
use crate::function::{Function, FunctionContext};
@@ -144,7 +144,7 @@ impl Function for PgBackendPidFunction {
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
}
}
@@ -164,7 +164,7 @@ impl Function for ConnectionIdFunction {
fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let pid = func_ctx.query_ctx.process_id();
Ok(Arc::new(UInt64Vector::from_slice([pid])) as _)
Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
}
}

View File

@@ -180,6 +180,22 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
},
None => return MissingAlterIndexOptionSnafu.fail(),
},
Kind::DropDefaults(o) => {
let names = o
.drop_defaults
.into_iter()
.map(|col| {
ensure!(
!col.column_name.is_empty(),
MissingFieldSnafu {
field: "column_name"
}
);
Ok(col.column_name)
})
.collect::<Result<Vec<_>>>()?;
AlterKind::DropDefaults { names }
}
};
let request = AlterTableRequest {

View File

@@ -201,8 +201,8 @@ impl ChannelManager {
"http"
};
let mut endpoint =
Endpoint::new(format!("{http_prefix}://{addr}")).context(CreateChannelSnafu)?;
let mut endpoint = Endpoint::new(format!("{http_prefix}://{addr}"))
.context(CreateChannelSnafu { addr })?;
if let Some(dur) = self.config().timeout {
endpoint = endpoint.timeout(dur);
@@ -237,7 +237,7 @@ impl ChannelManager {
if let Some(tls_config) = &self.inner.client_tls_config {
endpoint = endpoint
.tls_config(tls_config.clone())
.context(CreateChannelSnafu)?;
.context(CreateChannelSnafu { addr })?;
}
endpoint = endpoint

View File

@@ -52,8 +52,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create gRPC channel"))]
#[snafu(display("Failed to create gRPC channel from '{addr}'"))]
CreateChannel {
addr: String,
#[snafu(source)]
error: tonic::transport::Error,
#[snafu(implicit)]

View File

@@ -17,7 +17,7 @@ workspace = true
anymap2 = "0.13.0"
api.workspace = true
async-recursion = "1.0"
async-stream = "0.3"
async-stream.workspace = true
async-trait.workspace = true
backon = { workspace = true, optional = true }
base64.workspace = true

View File

@@ -25,6 +25,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{error, info, warn};
use futures_util::future;
pub use region_request::make_alter_region_request;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;

View File

@@ -12,20 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1;
use api::v1::alter_table_expr::Kind;
use api::v1::region::{
alter_request, region_request, AddColumn, AddColumns, AlterRequest, AlterRequests,
RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{self, AlterTableExpr};
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_info::TableInfoValue;
use crate::peer::Peer;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, RegionRoute};
impl AlterLogicalTablesProcedure {
@@ -62,34 +60,37 @@ impl AlterLogicalTablesProcedure {
{
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(table.table_info.ident.table_id, *region_number);
let request = self.make_alter_region_request(region_id, task, table)?;
let request = make_alter_region_request(
region_id,
&task.alter_table,
table.table_info.ident.version,
);
requests.push(request);
}
}
Ok(AlterRequests { requests })
}
}
fn make_alter_region_request(
&self,
region_id: RegionId,
task: &AlterTableTask,
table: &TableInfoValue,
) -> Result<AlterRequest> {
let region_id = region_id.as_u64();
let schema_version = table.table_info.ident.version;
let kind = match &task.alter_table.kind {
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
to_region_add_columns(add_columns),
)),
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
};
/// Makes an alter region request.
pub fn make_alter_region_request(
region_id: RegionId,
alter_table_expr: &AlterTableExpr,
schema_version: u64,
) -> AlterRequest {
let region_id = region_id.as_u64();
let kind = match &alter_table_expr.kind {
Some(Kind::AddColumns(add_columns)) => Some(alter_request::Kind::AddColumns(
to_region_add_columns(add_columns),
)),
_ => unreachable!(), // Safety: we have checked the kind in check_input_tasks
};
Ok(AlterRequest {
region_id,
schema_version,
kind,
})
AlterRequest {
region_id,
schema_version,
kind,
}
}

View File

@@ -135,6 +135,7 @@ fn create_proto_alter_kind(
Kind::UnsetTableOptions(v) => Ok(Some(alter_request::Kind::UnsetTableOptions(v.clone()))),
Kind::SetIndex(v) => Ok(Some(alter_request::Kind::SetIndex(v.clone()))),
Kind::UnsetIndex(v) => Ok(Some(alter_request::Kind::UnsetIndex(v.clone()))),
Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
}
}

View File

@@ -61,7 +61,8 @@ impl AlterTableProcedure {
| AlterKind::SetTableOptions { .. }
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetIndex { .. }
| AlterKind::UnsetIndex { .. } => {}
| AlterKind::UnsetIndex { .. }
| AlterKind::DropDefaults { .. } => {}
}
Ok(new_info)

View File

@@ -25,6 +25,7 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::{debug, error, warn};
use futures::future;
pub use region_request::create_region_request_builder;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;

View File

@@ -15,16 +15,16 @@
use std::collections::HashMap;
use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, TableId};
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, RegionRoute};
impl CreateLogicalTablesProcedure {
@@ -45,13 +45,15 @@ impl CreateLogicalTablesProcedure {
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let logical_table_id = task.table_info.ident.table_id;
let physical_table_id = self.data.physical_table_id;
let storage_path = region_storage_path(catalog, schema);
let request_builder = self.create_region_request_builder(task)?;
let request_builder =
create_region_request_builder(&task.create_table, physical_table_id)?;
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(logical_table_id, *region_number);
let one_region_request =
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?;
request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
requests.push(one_region_request);
}
}
@@ -69,16 +71,13 @@ impl CreateLogicalTablesProcedure {
body: Some(region_request::Body::Creates(CreateRequests { requests })),
}))
}
fn create_region_request_builder(
&self,
task: &CreateTableTask,
) -> Result<CreateRequestBuilder> {
let create_expr = &task.create_table;
let template = build_template(create_expr)?;
Ok(CreateRequestBuilder::new(
template,
Some(self.data.physical_table_id),
))
}
}
/// Creates a region request builder.
pub fn create_region_request_builder(
create_table_expr: &CreateTableExpr,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template(create_table_expr)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -218,11 +218,8 @@ impl CreateTableProcedure {
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
)?;
let create_region_request =
request_builder.build_one(region_id, storage_path.clone(), region_wal_options);
requests.push(PbRegionRequest::Create(create_region_request));
}

View File

@@ -105,12 +105,12 @@ impl CreateRequestBuilder {
&self.template
}
pub(crate) fn build_one(
pub fn build_one(
&self,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<CreateRequest> {
) -> CreateRequest {
let mut request = self.template.clone();
request.region_id = region_id.as_u64();
@@ -130,6 +130,6 @@ impl CreateRequestBuilder {
);
}
Ok(request)
request
}
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_procedure::Status;
use common_telemetry::info;
@@ -25,7 +24,7 @@ use table::table_name::TableName;
use crate::ddl::drop_database::cursor::DropDatabaseCursor;
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::utils::extract_region_wal_options;
use crate::ddl::utils::get_region_wal_options;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
@@ -109,17 +108,12 @@ impl State for DropDatabaseExecutor {
);
// Deletes topic-region mapping if dropping physical table
let region_wal_options =
if let TableRouteValue::Physical(table_route_value) = &table_route_value {
let datanode_table_values = ddl_ctx
.table_metadata_manager
.datanode_table_manager()
.regions(self.physical_table_id, table_route_value)
.await?;
extract_region_wal_options(&datanode_table_values)?
} else {
HashMap::new()
};
let region_wal_options = get_region_wal_options(
&ddl_ctx.table_metadata_manager,
&table_route_value,
self.physical_table_id,
)
.await?;
executor
.on_destroy_metadata(ddl_ctx, &table_route_value, &region_wal_options)

View File

@@ -32,6 +32,7 @@ impl MockDatanodeHandler for () {
Ok(RegionResponse {
affected_rows: 0,
extensions: Default::default(),
metadata: Vec::new(),
})
}

View File

@@ -42,7 +42,8 @@ use crate::error::{
};
use crate::key::datanode_table::DatanodeTableValue;
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::key::table_route::TableRouteValue;
use crate::key::{TableMetadataManager, TableMetadataManagerRef};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
@@ -187,6 +188,25 @@ pub fn parse_region_wal_options(
Ok(region_wal_options)
}
/// Gets the wal options for a table.
pub async fn get_region_wal_options(
table_metadata_manager: &TableMetadataManager,
table_route_value: &TableRouteValue,
physical_table_id: TableId,
) -> Result<HashMap<RegionNumber, WalOptions>> {
let region_wal_options =
if let TableRouteValue::Physical(table_route_value) = &table_route_value {
let datanode_table_values = table_metadata_manager
.datanode_table_manager()
.regions(physical_table_id, table_route_value)
.await?;
extract_region_wal_options(&datanode_table_values)?
} else {
HashMap::new()
};
Ok(region_wal_options)
}
/// Extracts region wal options from [DatanodeTableValue]s.
pub fn extract_region_wal_options(
datanode_table_values: &Vec<DatanodeTableValue>,

View File

@@ -50,7 +50,11 @@ use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::DropTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::DropTrigger;
use crate::rpc::ddl::DdlTask::{
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
@@ -91,6 +95,14 @@ pub trait TriggerDdlManager: Send + Sync {
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
async fn drop_trigger(
&self,
drop_trigger_task: DropTriggerTask,
procedure_manager: ProcedureManagerRef,
ddl_context: DdlContext,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse>;
fn as_any(&self) -> &dyn std::any::Any;
}
@@ -125,13 +137,12 @@ impl DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
register_loaders: bool,
#[cfg(feature = "enterprise")] trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
#[cfg(feature = "enterprise")]
trigger_ddl_manager,
trigger_ddl_manager: None,
};
if register_loaders {
manager.register_loaders()?;
@@ -139,6 +150,15 @@ impl DdlManager {
Ok(manager)
}
#[cfg(feature = "enterprise")]
pub fn with_trigger_ddl_manager(
mut self,
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Self {
self.trigger_ddl_manager = trigger_ddl_manager;
self
}
/// Returns the [TableMetadataManagerRef].
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.ddl_context.table_metadata_manager
@@ -640,6 +660,28 @@ async fn handle_drop_flow_task(
})
}
#[cfg(feature = "enterprise")]
async fn handle_drop_trigger_task(
ddl_manager: &DdlManager,
drop_trigger_task: DropTriggerTask,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
return UnsupportedSnafu {
operation: "drop trigger",
}
.fail();
};
m.drop_trigger(
drop_trigger_task,
ddl_manager.procedure_manager.clone(),
ddl_manager.ddl_context.clone(),
query_context,
)
.await
}
async fn handle_drop_view_task(
ddl_manager: &DdlManager,
drop_view_task: DropViewTask,
@@ -827,6 +869,11 @@ impl ProcedureExecutor for DdlManager {
handle_create_flow_task(self, create_flow_task, request.query_context.into())
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
@@ -836,11 +883,11 @@ impl ProcedureExecutor for DdlManager {
)
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
#[cfg(feature = "enterprise")]
DropTrigger(drop_trigger_task) => {
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
.await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
}
}
.trace(span)
@@ -964,8 +1011,6 @@ mod tests {
},
procedure_manager.clone(),
true,
#[cfg(feature = "enterprise")]
None,
);
let expected_loaders = vec![

View File

@@ -109,7 +109,7 @@ pub mod table_name;
pub mod table_route;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;
mod tombstone;
pub mod tombstone;
pub mod topic_name;
pub mod topic_region;
pub mod txn_helper;
@@ -535,6 +535,29 @@ impl TableMetadataManager {
}
}
/// Creates a new `TableMetadataManager` with a custom tombstone prefix.
pub fn new_with_custom_tombstone_prefix(
kv_backend: KvBackendRef,
tombstone_prefix: &str,
) -> Self {
Self {
table_name_manager: TableNameManager::new(kv_backend.clone()),
table_info_manager: TableInfoManager::new(kv_backend.clone()),
view_info_manager: ViewInfoManager::new(kv_backend.clone()),
datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
catalog_manager: CatalogManager::new(kv_backend.clone()),
schema_manager: SchemaManager::new(kv_backend.clone()),
table_route_manager: TableRouteManager::new(kv_backend.clone()),
tombstone_manager: TombstoneManager::new_with_prefix(
kv_backend.clone(),
tombstone_prefix,
),
topic_name_manager: TopicNameManager::new(kv_backend.clone()),
topic_region_manager: TopicRegionManager::new(kv_backend.clone()),
kv_backend,
}
}
pub async fn init(&self) -> Result<()> {
let catalog_name = CatalogNameKey::new(DEFAULT_CATALOG_NAME);
@@ -925,7 +948,7 @@ impl TableMetadataManager {
) -> Result<()> {
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.create(keys).await
self.tombstone_manager.create(keys).await.map(|_| ())
}
/// Deletes metadata tombstone for table **permanently**.
@@ -939,7 +962,10 @@ impl TableMetadataManager {
) -> Result<()> {
let table_metadata_keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.delete(table_metadata_keys).await
self.tombstone_manager
.delete(table_metadata_keys)
.await
.map(|_| ())
}
/// Restores metadata for table.
@@ -953,7 +979,7 @@ impl TableMetadataManager {
) -> Result<()> {
let keys =
self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?;
self.tombstone_manager.restore(keys).await
self.tombstone_manager.restore(keys).await.map(|_| ())
}
/// Deletes metadata for table **permanently**.

View File

@@ -48,6 +48,11 @@ impl TableRouteKey {
pub fn new(table_id: TableId) -> Self {
Self { table_id }
}
/// Returns the range prefix of the table route key.
pub fn range_prefix() -> Vec<u8> {
format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]

View File

@@ -25,20 +25,32 @@ use crate::rpc::store::BatchGetRequest;
/// [TombstoneManager] provides the ability to:
/// - logically delete values
/// - restore the deleted values
pub(crate) struct TombstoneManager {
pub struct TombstoneManager {
kv_backend: KvBackendRef,
tombstone_prefix: String,
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
fn to_tombstone(key: &[u8]) -> Vec<u8> {
[TOMBSTONE_PREFIX.as_bytes(), key].concat()
}
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
Self {
kv_backend,
tombstone_prefix: TOMBSTONE_PREFIX.to_string(),
}
}
/// Returns [TombstoneManager] with a custom tombstone prefix.
pub fn new_with_prefix(kv_backend: KvBackendRef, prefix: &str) -> Self {
Self {
kv_backend,
tombstone_prefix: prefix.to_string(),
}
}
pub fn to_tombstone(&self, key: &[u8]) -> Vec<u8> {
[self.tombstone_prefix.as_bytes(), key].concat()
}
/// Moves value to `dest_key`.
@@ -67,7 +79,7 @@ impl TombstoneManager {
(txn, TxnOpGetResponseSet::filter(src_key))
}
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
@@ -102,7 +114,7 @@ impl TombstoneManager {
.unzip();
let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
if resp.succeeded {
return Ok(());
return Ok(keys.len());
}
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Updates results.
@@ -125,7 +137,9 @@ impl TombstoneManager {
}
/// Moves values to `dest_key`.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were moved.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
let chunk_size = self.kv_backend.max_txn_ops() / 2;
if keys.len() > chunk_size {
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
@@ -134,7 +148,7 @@ impl TombstoneManager {
self.move_values_inner(keys, dest_keys).await?;
}
Ok(())
Ok(keys.len())
} else {
self.move_values_inner(&keys, &dest_keys).await
}
@@ -145,11 +159,13 @@ impl TombstoneManager {
/// Preforms to:
/// - deletes origin values.
/// - stores tombstone values.
pub(crate) async fn create(&self, keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were moved.
pub async fn create(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
.into_iter()
.map(|key| {
let tombstone_key = to_tombstone(&key);
let tombstone_key = self.to_tombstone(&key);
(key, tombstone_key)
})
.unzip();
@@ -162,11 +178,13 @@ impl TombstoneManager {
/// Preforms to:
/// - restore origin value.
/// - deletes tombstone values.
pub(crate) async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were restored.
pub async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
.into_iter()
.map(|key| {
let tombstone_key = to_tombstone(&key);
let tombstone_key = self.to_tombstone(&key);
(tombstone_key, key)
})
.unzip();
@@ -175,16 +193,18 @@ impl TombstoneManager {
}
/// Deletes tombstones values for the specified `keys`.
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
///
/// Returns the number of keys that were deleted.
pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(to_tombstone(key)))
.map(|key| TxnOp::Delete(self.to_tombstone(key)))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
// Always success.
let _ = self.kv_backend.txn(txn).await?;
Ok(())
Ok(keys.len())
}
}
@@ -194,7 +214,6 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::to_tombstone;
use crate::error::Error;
use crate::key::tombstone::TombstoneManager;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -246,7 +265,7 @@ mod tests {
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&to_tombstone(b"bar"))
.get(&tombstone_manager.to_tombstone(b"bar"))
.await
.unwrap()
.unwrap()
@@ -255,7 +274,7 @@ mod tests {
);
assert_eq!(
kv_backend
.get(&to_tombstone(b"foo"))
.get(&tombstone_manager.to_tombstone(b"foo"))
.await
.unwrap()
.unwrap()
@@ -287,7 +306,7 @@ mod tests {
kv_backend.clone(),
&[MoveValue {
key: b"bar".to_vec(),
dest_key: to_tombstone(b"bar"),
dest_key: tombstone_manager.to_tombstone(b"bar"),
value: b"baz".to_vec(),
}],
)
@@ -364,7 +383,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -409,7 +428,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -462,7 +481,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -502,7 +521,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
@@ -537,7 +556,7 @@ mod tests {
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
dest_key: tombstone_manager.to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();

View File

@@ -54,20 +54,20 @@ impl<T> MemoryKvBackend<T> {
kvs.clear();
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
/// Returns true if the `kvs` is empty.
pub fn is_empty(&self) -> bool {
self.kvs.read().unwrap().is_empty()
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
/// Returns the `kvs`.
pub fn dump(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
let kvs = self.kvs.read().unwrap();
kvs.clone()
}
#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
/// Returns the length of `kvs`
pub fn len(&self) -> usize {
self.kvs.read().unwrap().len()

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
/// Handles requests to mark time window as dirty.
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
}
pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -69,6 +69,8 @@ pub enum DdlTask {
AlterDatabase(AlterDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
#[cfg(feature = "enterprise")]
DropTrigger(trigger::DropTriggerTask),
CreateView(CreateViewTask),
DropView(DropViewTask),
#[cfg(feature = "enterprise")]
@@ -259,6 +261,18 @@ impl TryFrom<Task> for DdlTask {
.fail()
}
}
Task::DropTriggerTask(drop_trigger) => {
#[cfg(feature = "enterprise")]
return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
#[cfg(not(feature = "enterprise"))]
{
let _ = drop_trigger;
crate::error::UnsupportedSnafu {
operation: "drop trigger",
}
.fail()
}
}
}
}
}
@@ -311,6 +325,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::DropView(task) => Task::DropViewTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
#[cfg(feature = "enterprise")]
DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
};
Ok(Self {

View File

@@ -1,10 +1,13 @@
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::CreateTriggerTask as PbCreateTriggerTask;
use api::v1::meta::{
CreateTriggerTask as PbCreateTriggerTask, DropTriggerTask as PbDropTriggerTask,
};
use api::v1::notify_channel::ChannelType as PbChannelType;
use api::v1::{
CreateTriggerExpr, NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
CreateTriggerExpr as PbCreateTriggerExpr, DropTriggerExpr as PbDropTriggerExpr,
NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
@@ -56,7 +59,7 @@ impl From<CreateTriggerTask> for PbCreateTriggerTask {
.map(PbNotifyChannel::from)
.collect();
let expr = CreateTriggerExpr {
let expr = PbCreateTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
create_if_not_exists: task.if_not_exists,
@@ -139,17 +142,86 @@ impl TryFrom<PbNotifyChannel> for NotifyChannel {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropTriggerTask {
pub catalog_name: String,
pub trigger_name: String,
pub drop_if_exists: bool,
}
impl From<DropTriggerTask> for PbDropTriggerTask {
fn from(task: DropTriggerTask) -> Self {
let expr = PbDropTriggerExpr {
catalog_name: task.catalog_name,
trigger_name: task.trigger_name,
drop_if_exists: task.drop_if_exists,
};
PbDropTriggerTask {
drop_trigger: Some(expr),
}
}
}
impl TryFrom<PbDropTriggerTask> for DropTriggerTask {
type Error = error::Error;
fn try_from(task: PbDropTriggerTask) -> Result<Self> {
let expr = task.drop_trigger.context(error::InvalidProtoMsgSnafu {
err_msg: "expected drop_trigger",
})?;
Ok(DropTriggerTask {
catalog_name: expr.catalog_name,
trigger_name: expr.trigger_name,
drop_if_exists: expr.drop_if_exists,
})
}
}
impl DdlTask {
/// Creates a [`DdlTask`] to create a trigger.
pub fn new_create_trigger(expr: CreateTriggerTask) -> Self {
DdlTask::CreateTrigger(expr)
}
/// Creates a [`DdlTask`] to drop a trigger.
pub fn new_drop_trigger(expr: DropTriggerTask) -> Self {
DdlTask::DropTrigger(expr)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_convert_drop_trigger_task() {
let original = DropTriggerTask {
catalog_name: "test_catalog".to_string(),
trigger_name: "test_trigger".to_string(),
drop_if_exists: true,
};
let pb_task: PbDropTriggerTask = original.clone().into();
let expr = pb_task.drop_trigger.as_ref().unwrap();
assert_eq!(expr.catalog_name, "test_catalog");
assert_eq!(expr.trigger_name, "test_trigger");
assert!(expr.drop_if_exists);
let round_tripped = DropTriggerTask::try_from(pb_task).unwrap();
assert_eq!(original.catalog_name, round_tripped.catalog_name);
assert_eq!(original.trigger_name, round_tripped.trigger_name);
assert_eq!(original.drop_if_exists, round_tripped.drop_if_exists);
// Test invalid case where drop_trigger is None
let invalid_task = PbDropTriggerTask { drop_trigger: None };
let result = DropTriggerTask::try_from(invalid_task);
assert!(result.is_err());
}
#[test]
fn test_convert_create_trigger_task() {
let original = CreateTriggerTask {

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_query::request::QueryRequest;
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
) -> Result<FlowResponse> {
unimplemented!()
}
async fn handle_mark_window_dirty(
&self,
_peer: &Peer,
_req: DirtyWindowRequest,
) -> Result<FlowResponse> {
unimplemented!()
}
}
/// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
self.handler.handle_mark_window_dirty(&self.peer, req).await
}
}
#[async_trait::async_trait]

View File

@@ -14,7 +14,7 @@
pub mod columnar_value;
pub mod error;
mod function;
pub mod function;
pub mod logical_plan;
pub mod prelude;
pub mod request;

View File

@@ -12,7 +12,6 @@ deadlock_detection = ["parking_lot/deadlock_detection"]
workspace = true
[dependencies]
atty = "0.2"
backtrace = "0.3"
common-error.workspace = true
console-subscriber = { version = "0.1", optional = true }
@@ -23,7 +22,7 @@ once_cell.workspace = true
opentelemetry = { version = "0.21.0", default-features = false, features = [
"trace",
] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio", "http-proto", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.13.0"
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
parking_lot.workspace = true

View File

@@ -14,12 +14,13 @@
//! logging stuffs, inspired by databend
use std::env;
use std::io::IsTerminal;
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::{Protocol, SpanExporterBuilder, WithExportConfig};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_semantic_conventions::resource;
@@ -35,7 +36,11 @@ use tracing_subscriber::{filter, EnvFilter, Registry};
use crate::tracing_sampler::{create_sampler, TracingSampleOptions};
pub const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
/// The default endpoint when use gRPC exporter protocol.
pub const DEFAULT_OTLP_GRPC_ENDPOINT: &str = "http://localhost:4317";
/// The default endpoint when use HTTP exporter protocol.
pub const DEFAULT_OTLP_HTTP_ENDPOINT: &str = "http://localhost:4318";
/// The default logs directory.
pub const DEFAULT_LOGGING_DIR: &str = "logs";
@@ -66,11 +71,25 @@ pub struct LoggingOptions {
/// Whether to enable tracing with OTLP. Default is false.
pub enable_otlp_tracing: bool,
/// The endpoint of OTLP. Default is "http://localhost:4317".
/// The endpoint of OTLP. Default is "http://localhost:4318".
pub otlp_endpoint: Option<String>,
/// The tracing sample ratio.
pub tracing_sample_ratio: Option<TracingSampleOptions>,
/// The protocol of OTLP export.
pub otlp_export_protocol: Option<OtlpExportProtocol>,
}
/// The protocol of OTLP export.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum OtlpExportProtocol {
/// GRPC protocol.
Grpc,
/// HTTP protocol with binary protobuf.
Http,
}
/// The options of slow query.
@@ -146,6 +165,7 @@ impl Default for LoggingOptions {
append_stdout: true,
// Rotation hourly, 24 files per day, keeps info log files of 30 days
max_log_files: 720,
otlp_export_protocol: None,
}
}
}
@@ -221,14 +241,14 @@ pub fn init_global_logging(
Layer::new()
.json()
.with_writer(writer)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_ansi(std::io::stdout().is_terminal())
.boxed(),
)
} else {
Some(
Layer::new()
.with_writer(writer)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_ansi(std::io::stdout().is_terminal())
.boxed(),
)
}
@@ -387,22 +407,9 @@ pub fn init_global_logging(
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
]));
let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_exporter(build_otlp_exporter(opts))
.with_trace_config(trace_config)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("otlp tracer install failed");
@@ -420,6 +427,42 @@ pub fn init_global_logging(
guards
}
fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporterBuilder {
let protocol = opts
.otlp_export_protocol
.clone()
.unwrap_or(OtlpExportProtocol::Http);
let endpoint = opts
.otlp_endpoint
.as_ref()
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or_else(|| match protocol {
OtlpExportProtocol::Grpc => DEFAULT_OTLP_GRPC_ENDPOINT.to_string(),
OtlpExportProtocol::Http => DEFAULT_OTLP_HTTP_ENDPOINT.to_string(),
});
match protocol {
OtlpExportProtocol::Grpc => SpanExporterBuilder::Tonic(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
),
OtlpExportProtocol::Http => SpanExporterBuilder::Http(
opentelemetry_otlp::new_exporter()
.http()
.with_endpoint(endpoint)
.with_protocol(Protocol::HttpBinary),
),
}
}
fn build_slow_query_logger<S>(
opts: &LoggingOptions,
slow_query_opts: Option<&SlowQueryOptions>,

View File

@@ -14,10 +14,7 @@
//! Datanode configurations
use core::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::{Configurable, DEFAULT_DATA_HOME};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
@@ -27,6 +24,7 @@ use file_engine::config::EngineConfig as FileEngineConfig;
use meta_client::MetaClientOptions;
use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
pub(crate) use object_store::config::ObjectStoreConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
@@ -36,53 +34,6 @@ use servers::http::HttpOptions;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File(FileConfig),
S3(S3Config),
Oss(OssConfig),
Azblob(AzblobConfig),
Gcs(GcsConfig),
}
impl ObjectStoreConfig {
/// Returns the object storage type name, such as `S3`, `Oss` etc.
pub fn provider_name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Self::Oss(_) => "Oss",
Self::Azblob(_) => "Azblob",
Self::Gcs(_) => "Gcs",
}
}
/// Returns true when it's a remote object storage such as AWS s3 etc.
pub fn is_object_storage(&self) -> bool {
!matches!(self, Self::File(_))
}
/// Returns the object storage configuration name, return the provider name if it's empty.
pub fn config_name(&self) -> &str {
let name = match self {
// file storage doesn't support name
Self::File(_) => self.provider_name(),
Self::S3(s3) => &s3.name,
Self::Oss(oss) => &oss.name,
Self::Azblob(az) => &az.name,
Self::Gcs(gcs) => &gcs.name,
};
if name.trim().is_empty() {
return self.provider_name();
}
name
}
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
@@ -112,248 +63,6 @@ impl Default for StorageConfig {
}
}
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
/// The local file cache directory
pub cache_path: Option<String>,
/// The cache capacity in bytes
pub cache_capacity: Option<ReadableSize>,
}
/// The http client options to the storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
/// The maximum idle connection per host allowed in the pool.
pub(crate) pool_max_idle_per_host: u32,
/// The timeout for only the connect phase of a http client.
#[serde(with = "humantime_serde")]
pub(crate) connect_timeout: Duration,
/// The total request timeout, applied from when the request starts connecting until the response body has finished.
/// Also considered a total deadline.
#[serde(with = "humantime_serde")]
pub(crate) timeout: Duration,
/// The timeout for idle sockets being kept-alive.
#[serde(with = "humantime_serde")]
pub(crate) pool_idle_timeout: Duration,
}
impl Default for HttpClientConfig {
fn default() -> Self {
Self {
pool_max_idle_per_host: 1024,
connect_timeout: Duration::from_secs(30),
timeout: Duration::from_secs(30),
pool_idle_timeout: Duration::from_secs(90),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
/// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
/// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
/// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
pub enable_virtual_host_style: bool,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for S3Config {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.enable_virtual_host_style == other.enable_virtual_host_style
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
pub name: String,
pub bucket: String,
pub root: String,
#[serde(skip_serializing)]
pub access_key_id: SecretString,
#[serde(skip_serializing)]
pub access_key_secret: SecretString,
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for OssConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
pub name: String,
pub container: String,
pub root: String,
#[serde(skip_serializing)]
pub account_name: SecretString,
#[serde(skip_serializing)]
pub account_key: SecretString,
pub endpoint: String,
pub sas_token: Option<String>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for AzblobConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.container == other.container
&& self.root == other.root
&& self.account_name.expose_secret() == other.account_name.expose_secret()
&& self.account_key.expose_secret() == other.account_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
pub name: String,
pub root: String,
pub bucket: String,
pub scope: String,
#[serde(skip_serializing)]
pub credential_path: SecretString,
#[serde(skip_serializing)]
pub credential: SecretString,
pub endpoint: String,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
pub http_client: HttpClientConfig,
}
impl PartialEq for GcsConfig {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.root == other.root
&& self.bucket == other.bucket
&& self.scope == other.scope
&& self.credential_path.expose_secret() == other.credential_path.expose_secret()
&& self.credential.expose_secret() == other.credential.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
&& self.http_client == other.http_client
}
}
impl Default for S3Config {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
secret_access_key: SecretString::from(String::default()),
enable_virtual_host_style: false,
endpoint: Option::default(),
region: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for OssConfig {
fn default() -> Self {
Self {
name: String::default(),
bucket: String::default(),
root: String::default(),
access_key_id: SecretString::from(String::default()),
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for AzblobConfig {
fn default() -> Self {
Self {
name: String::default(),
container: String::default(),
root: String::default(),
account_name: SecretString::from(String::default()),
account_key: SecretString::from(String::default()),
endpoint: String::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for GcsConfig {
fn default() -> Self {
Self {
name: String::default(),
root: String::default(),
bucket: String::default(),
scope: String::default(),
credential_path: SecretString::from(String::default()),
credential: SecretString::from(String::default()),
endpoint: String::default(),
cache: ObjectStorageCacheConfig::default(),
http_client: HttpClientConfig::default(),
}
}
}
impl Default for ObjectStoreConfig {
fn default() -> Self {
ObjectStoreConfig::File(FileConfig {})
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
@@ -463,37 +172,6 @@ mod tests {
let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
}
#[test]
fn test_config_name() {
let object_store_config = ObjectStoreConfig::default();
assert_eq!("File", object_store_config.config_name());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert_eq!("S3", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
let s3_config = ObjectStoreConfig::S3(S3Config {
name: "test".to_string(),
..Default::default()
});
assert_eq!("test", s3_config.config_name());
assert_eq!("S3", s3_config.provider_name());
}
#[test]
fn test_is_object_storage() {
let store = ObjectStoreConfig::default();
assert!(!store.is_object_storage());
let s3_config = ObjectStoreConfig::S3(S3Config::default());
assert!(s3_config.is_object_storage());
let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
assert!(oss_config.is_object_storage());
let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
assert!(gcs_config.is_object_storage());
let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
assert!(azblob_config.is_object_storage());
}
#[test]
fn test_secstr() {
let toml_str = r#"
@@ -514,4 +192,48 @@ mod tests {
_ => unreachable!(),
}
}
#[test]
fn test_skip_ssl_validation_config() {
// Test with skip_ssl_validation = true
let toml_str_true = r#"
[storage]
type = "S3"
[storage.http_client]
skip_ssl_validation = true
"#;
let opts: DatanodeOptions = toml::from_str(toml_str_true).unwrap();
match &opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert!(cfg.http_client.skip_ssl_validation);
}
_ => panic!("Expected S3 config"),
}
// Test with skip_ssl_validation = false
let toml_str_false = r#"
[storage]
type = "S3"
[storage.http_client]
skip_ssl_validation = false
"#;
let opts: DatanodeOptions = toml::from_str(toml_str_false).unwrap();
match &opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert!(!cfg.http_client.skip_ssl_validation);
}
_ => panic!("Expected S3 config"),
}
// Test default value (should be false)
let toml_str_default = r#"
[storage]
type = "S3"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str_default).unwrap();
match &opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert!(!cfg.http_client.skip_ssl_validation);
}
_ => panic!("Expected S3 config"),
}
}
}

View File

@@ -142,14 +142,6 @@ pub enum Error {
source: Box<log_store::error::Error>,
},
#[snafu(display("Failed to init backend"))]
InitBackend {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid SQL, error: {}", msg))]
InvalidSql { msg: String },
@@ -387,6 +379,29 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize json"))]
SerializeJson {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed object store operation"))]
ObjectStore {
source: object_store::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build cache store"))]
BuildCacheStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -439,8 +454,6 @@ impl ErrorExt for Error {
StartServer { source, .. } | ShutdownServer { source, .. } => source.status_code(),
InitBackend { .. } => StatusCode::StorageUnavailable,
OpenLogStore { source, .. } => source.status_code(),
MetaClientInit { source, .. } => source.status_code(),
UnsupportedOutput { .. } => StatusCode::Unsupported,
@@ -457,6 +470,10 @@ impl ErrorExt for Error {
StatusCode::RegionBusy
}
MissingCache { .. } => StatusCode::Internal,
SerializeJson { .. } => StatusCode::Internal,
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
}
}

View File

@@ -20,12 +20,14 @@ use std::time::Duration;
use api::region::RegionResponse;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
use api::v1::region::{
region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_query::request::QueryRequest;
use common_query::OutputData;
@@ -47,6 +49,7 @@ pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::QueryEngineRef;
use serde_json;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
@@ -71,10 +74,10 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -138,12 +141,12 @@ impl RegionServer {
/// Finds the region's engine by its id. If the region is not ready, returns `None`.
pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
self.inner
.get_engine(region_id, &RegionChange::None)
.map(|x| match x {
CurrentEngine::Engine(engine) => Some(engine),
CurrentEngine::EarlyReturn(_) => None,
})
match self.inner.get_engine(region_id, &RegionChange::None) {
Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
Err(error::Error::RegionNotFound { .. }) => Ok(None),
Err(err) => Err(err),
}
}
#[tracing::instrument(skip_all)]
@@ -412,6 +415,7 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -441,6 +445,7 @@ impl RegionServer {
Ok(RegionResponse {
affected_rows,
extensions,
metadata: Vec::new(),
})
}
@@ -473,6 +478,48 @@ impl RegionServer {
.map(|_| RegionResponse::new(AffectedRows::default()))
}
/// Handles the ListMetadata request and retrieves metadata for specified regions.
///
/// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
/// non-existing regions as `null`.
#[tracing::instrument(skip_all)]
async fn handle_list_metadata_request(
&self,
request: &ListMetadataRequest,
) -> Result<RegionResponse> {
let mut region_metadatas = Vec::new();
// Collect metadata for each region
for region_id in &request.region_ids {
let region_id = RegionId::from_u64(*region_id);
// Get the engine.
let Some(engine) = self.find_engine(region_id)? else {
region_metadatas.push(None);
continue;
};
match engine.get_metadata(region_id).await {
Ok(metadata) => region_metadatas.push(Some(metadata)),
Err(err) => {
if err.status_code() == StatusCode::RegionNotFound {
region_metadatas.push(None);
} else {
Err(err).with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
}
}
}
}
// Serialize metadata to JSON
let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
let response = RegionResponse::from_metadata(json_result);
Ok(response)
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -504,6 +551,10 @@ impl RegionServerHandler for RegionServer {
region_request::Body::Sync(sync_request) => {
self.handle_sync_region_request(sync_request).await
}
region_request::Body::ListMetadata(list_metadata_request) => {
self.handle_list_metadata_request(list_metadata_request)
.await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -518,6 +569,7 @@ impl RegionServerHandler for RegionServer {
}),
affected_rows: response.affected_rows as _,
extensions: response.extensions,
metadata: response.metadata,
})
}
}
@@ -897,6 +949,7 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -967,6 +1020,7 @@ impl RegionServerInner {
Ok(RegionResponse {
affected_rows: result.affected_rows,
extensions: result.extensions,
metadata: Vec::new(),
})
}
Err(err) => {
@@ -1242,8 +1296,11 @@ mod tests {
use std::assert_matches::assert_matches;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use mito2::test_util::CreateRequestBuilder;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
use store_api::storage::RegionId;
@@ -1605,4 +1662,175 @@ mod tests {
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
let mut metadata_builder = RegionMetadataBuilder::new(region_id);
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"timestamp",
ConcreteDataType::timestamp_nanosecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"file",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 1,
});
metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"message",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 2,
});
metadata_builder.primary_key(vec![1]);
metadata_builder.build().unwrap()
}
#[tokio::test]
async fn test_handle_list_metadata_request() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadata_2 = mock_region_metadata(region_id_2);
let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
let metadata_1 = Arc::new(metadata_1);
let metadata_2 = Arc::new(metadata_2);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else if region_id == region_id_2 {
Ok(metadata_2.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
// All regions exist.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_not_found() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let region_id_2 = RegionId::new(2, 0);
let metadata_1 = mock_region_metadata(region_id_1);
let metadatas = vec![Some(metadata_1.clone()), None];
let metadata_1 = Arc::new(metadata_1);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
if region_id == region_id_1 {
Ok(metadata_1.clone())
} else {
error::RegionNotFoundSnafu { region_id }.fail()
}
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Not in region map.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
};
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
// Not in region engine.
mock_region_server
.inner
.region_map
.insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
let response = mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap();
let decoded_metadata: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&response.metadata).unwrap();
assert_eq!(metadatas, decoded_metadata);
}
#[tokio::test]
async fn test_handle_list_metadata_failed() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let region_id_1 = RegionId::new(1, 0);
let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
MITO_ENGINE_NAME,
Box::new(move |region_id| {
error::UnexpectedSnafu {
violated: format!("Failed to get region {region_id}"),
}
.fail()
}),
);
mock_region_server.register_engine(engine.clone());
mock_region_server
.inner
.region_map
.insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
// Failed to get.
let list_metadata_request = ListMetadataRequest {
region_ids: vec![region_id_1.as_u64()],
};
mock_region_server
.handle_list_metadata_request(&list_metadata_request)
.await
.unwrap_err();
}
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use common_config::Configurable;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::grpc::GrpcServer;
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
@@ -92,13 +92,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
opts: &DatanodeOptions,
region_server: &RegionServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
};
GrpcServerBuilder::new(config, region_server.runtime())
GrpcServerBuilder::new(opts.grpc.as_config(), region_server.runtime())
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
}

View File

@@ -14,45 +14,22 @@
//! object storage utilities
mod azblob;
pub mod fs;
mod gcs;
mod oss;
mod s3;
use std::path;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::factory::new_raw_object_store;
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers};
use object_store::{
Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
};
use snafu::prelude::*;
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
ObjectStoreConfig::Azblob(azblob_config) => {
azblob::new_azblob_object_store(azblob_config).await
}
ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await,
}?;
Ok(object_store)
}
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, CreateDirSnafu, Result};
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer(
@@ -66,7 +43,9 @@ pub(crate) async fn new_object_store_without_cache(
store: &ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(store, data_home).await?;
let object_store = new_raw_object_store(store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
// Adds retry layer
@@ -83,7 +62,9 @@ pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let object_store = new_raw_object_store(&store, data_home).await?;
let object_store = new_raw_object_store(&store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
@@ -170,20 +151,20 @@ async fn build_cache_layer(
&& !path.trim().is_empty()
{
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir)?;
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir)?;
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;
.context(error::BuildCacheStoreSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.context(error::InitBackendSnafu)?;
.context(error::BuildCacheStoreSnafu)?;
cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
@@ -196,26 +177,6 @@ async fn build_cache_layer(
}
}
pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
if path::Path::new(&dir).exists() {
info!("Begin to clean temp storage directory: {}", dir);
std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
info!("Cleaned temp storage directory: {}", dir);
}
Ok(())
}
pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient> {
let client = reqwest::ClientBuilder::new()
.pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
.connect_timeout(config.connect_timeout)
.pool_idle_timeout(config.pool_idle_timeout)
.timeout(config.timeout)
.build()
.context(BuildHttpClientSnafu)?;
Ok(HttpClient::with(client))
}
struct PrintDetailedError;
// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.

View File

@@ -1,50 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Azblob;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::AzblobConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&azblob_config.root);
info!(
"The azure storage container is: {}, root is: {}",
azblob_config.container, &root
);
let client = build_http_client(&azblob_config.http_client)?;
let mut builder = Azblob::default()
.root(&root)
.container(&azblob_config.container)
.endpoint(&azblob_config.endpoint)
.account_name(azblob_config.account_name.expose_secret())
.account_key(azblob_config.account_key.expose_secret())
.http_client(client);
if let Some(token) = &azblob_config.sas_token {
builder = builder.sas_token(token);
};
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -1,53 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs, path};
use common_telemetry::info;
use mito2::access_layer::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use snafu::prelude::*;
use crate::config::FileConfig;
use crate::error::{self, Result};
use crate::store;
/// A helper function to create a file system object store.
pub async fn new_fs_object_store(
data_home: &str,
_file_config: &FileConfig,
) -> Result<ObjectStore> {
fs::create_dir_all(path::Path::new(&data_home))
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
store::clean_temp_dir(&atomic_write_dir)?;
// Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
store::clean_temp_dir(&old_atomic_temp_dir)?;
let builder = Fs::default()
.root(data_home)
.atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish();
Ok(object_store)
}

View File

@@ -1,46 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Gcs;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::GcsConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&gcs_config.root);
info!(
"The gcs storage bucket is: {}, root is: {}",
gcs_config.bucket, &root
);
let client = build_http_client(&gcs_config.http_client);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs_config.bucket)
.scope(&gcs_config.scope)
.credential_path(gcs_config.credential_path.expose_secret())
.credential(gcs_config.credential.expose_secret())
.endpoint(&gcs_config.endpoint)
.http_client(client?);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -1,45 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::Oss;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::OssConfig;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
let root = util::normalize_dir(&oss_config.root);
info!(
"The oss storage bucket is: {}, root is: {}",
oss_config.bucket, &root
);
let client = build_http_client(&oss_config.http_client)?;
let builder = Oss::default()
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(oss_config.access_key_id.expose_secret())
.access_key_secret(oss_config.access_key_secret.expose_secret())
.http_client(client);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -1,55 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::ExposeSecret;
use common_telemetry::info;
use object_store::services::S3;
use object_store::{util, ObjectStore};
use snafu::prelude::*;
use crate::config::S3Config;
use crate::error::{self, Result};
use crate::store::build_http_client;
pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
let root = util::normalize_dir(&s3_config.root);
info!(
"The s3 storage bucket is: {}, root is: {}",
s3_config.bucket, &root
);
let client = build_http_client(&s3_config.http_client)?;
let mut builder = S3::default()
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(s3_config.access_key_id.expose_secret())
.secret_access_key(s3_config.secret_access_key.expose_secret())
.http_client(client);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
if s3_config.region.is_some() {
builder = builder.region(s3_config.region.as_ref().unwrap());
}
if s3_config.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu)?
.finish())
}

View File

@@ -108,11 +108,15 @@ pub type MockRequestHandler =
pub type MockSetReadonlyGracefullyHandler =
Box<dyn Fn(RegionId) -> Result<SetRegionRoleStateResponse, Error> + Send + Sync>;
pub type MockGetMetadataHandler =
Box<dyn Fn(RegionId) -> Result<RegionMetadataRef, Error> + Send + Sync>;
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
pub(crate) handle_request_delay: Option<Duration>,
pub(crate) handle_request_mock_fn: Option<MockRequestHandler>,
pub(crate) handle_set_readonly_gracefully_mock_fn: Option<MockSetReadonlyGracefullyHandler>,
pub(crate) handle_get_metadata_mock_fn: Option<MockGetMetadataHandler>,
pub(crate) mock_role: Option<Option<RegionRole>>,
engine: String,
}
@@ -127,6 +131,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
@@ -146,6 +151,27 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: Some(mock_fn),
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
}),
rx,
)
}
pub fn with_metadata_mock_fn(
engine: &str,
mock_fn: MockGetMetadataHandler,
) -> (Arc<Self>, Receiver<(RegionId, RegionRequest)>) {
let (tx, rx) = tokio::sync::mpsc::channel(8);
(
Arc::new(Self {
handle_request_delay: None,
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: Some(mock_fn),
mock_role: None,
engine: engine.to_string(),
}),
@@ -166,6 +192,7 @@ impl MockRegionEngine {
sender: tx,
handle_request_mock_fn: None,
handle_set_readonly_gracefully_mock_fn: None,
handle_get_metadata_mock_fn: None,
mock_role: None,
engine: engine.to_string(),
};
@@ -208,7 +235,11 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
async fn get_metadata(&self, _region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
if let Some(mock_fn) = &self.handle_get_metadata_mock_fn {
return mock_fn(region_id).map_err(BoxedError::new);
};
unimplemented!()
}

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
);
METRIC_FLOW_ROWS
.with_label_values(&["out"])
.with_label_values(&["out-streaming"])
.inc_by(total_rows as u64);
let now = self.tick_manager.tick();

View File

@@ -18,7 +18,8 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
flow_request, CreateRequest, DirtyWindowRequests, DropRequest, FlowRequest, FlowResponse,
FlushFlow,
};
use api::v1::region::InsertRequests;
use catalog::CatalogManager;
@@ -31,6 +32,7 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
use futures::TryStreamExt;
use greptime_proto::v1::flow::DirtyWindowRequest;
use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
@@ -46,7 +48,7 @@ use crate::error::{
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};
@@ -689,6 +691,9 @@ impl FlowEngine for FlowDualEngine {
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests;
let mut batching_row_cnt = 0;
let mut streaming_row_cnt = 0;
{
// not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +703,11 @@ impl FlowEngine for FlowDualEngine {
let is_in_stream = src_table2flow.in_stream(table_id);
let is_in_batch = src_table2flow.in_batch(table_id);
if is_in_stream {
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
to_stream_engine.push(req.clone());
}
if is_in_batch {
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
return true;
}
if !is_in_batch && !is_in_stream {
@@ -713,6 +720,14 @@ impl FlowEngine for FlowDualEngine {
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
}
METRIC_FLOW_ROWS
.with_label_values(&["in-streaming"])
.inc_by(streaming_row_cnt as u64);
METRIC_FLOW_ROWS
.with_label_values(&["in-batching"])
.inc_by(batching_row_cnt as u64);
let streaming_engine = self.streaming_engine.clone();
let stream_handler: JoinHandle<Result<(), Error>> =
common_runtime::spawn_global(async move {
@@ -819,6 +834,15 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
self.batching_engine()
.handle_mark_dirty_time_window(DirtyWindowRequests {
requests: vec![req],
})
.await
.map_err(to_meta_err(snafu::location!()))
}
}
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -841,93 +865,6 @@ fn to_meta_err(
}
}
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
}
impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await

View File

@@ -17,6 +17,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
use common_time::TimeToLive;
use query::QueryEngineRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,6 +42,7 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -77,6 +78,116 @@ impl BatchingEngine {
}
}
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
) -> Result<FlowResponse, Error> {
let table_info_mgr = self.table_meta.table_info_manager();
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.timestamps);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
table_info_mgr
.batch_get(&tids)
.await
.with_context(|_| TableNotFoundMetaSnafu {
msg: format!("Failed to get table info for table ids: {:?}", tids),
})?;
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, timestamps)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
return None;
};
let table_name = [
table_name.catalog_name,
table_name.schema_name,
table_name.table_name,
];
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
.data_type
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (timestamps, time_index_unit)))
})
.collect::<HashMap<_, _>>();
let group_by_table_name = Arc::new(group_by_table_name);
let mut handles = Vec::new();
let tasks = self.tasks.read().await;
for (_flow_id, task) in tasks.iter() {
let src_table_names = &task.config.source_table_names;
if src_table_names
.iter()
.all(|name| !group_by_table_name.contains_key(name))
{
continue;
}
let group_by_table_name = group_by_table_name.clone();
let task = task.clone();
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for timestamp in timestamps {
let align_start = expr
.eval(common_time::Timestamp::new(*timestamp, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
all_dirty_windows.push(align_start);
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None);
}
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
.with_label_values(&[&flow_id_label])
.set(state.dirty_time_windows.len() as f64);
Ok(())
});
handles.push(handle);
}
drop(tasks);
for handle in handles {
match handle.await {
Err(e) => {
warn!("Failed to handle inserts: {e}");
}
Ok(Ok(())) => (),
Ok(Err(e)) => {
warn!("Failed to handle inserts: {e}");
}
}
}
Ok(Default::default())
}
pub async fn handle_inserts_inner(
&self,
request: api::v1::region::InsertRequests,

View File

@@ -18,7 +18,8 @@ use std::sync::{Arc, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::CreateTableExpr;
use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database};
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -269,6 +270,55 @@ impl FrontendClient {
.await
}
/// Execute a SQL statement on the frontend.
pub async fn sql(&self, catalog: &str, schema: &str, sql: &str) -> Result<Output, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
db.database
.sql(sql)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
FrontendClient::Standalone { database_client } => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.build();
let ctx = Arc::new(ctx);
{
let database_client = {
database_client
.lock()
.map_err(|e| {
UnexpectedSnafu {
reason: format!("Failed to lock database client: {e}"),
}
.build()
})?
.as_ref()
.context(UnexpectedSnafu {
reason: "Standalone's frontend instance is not set",
})?
.upgrade()
.context(UnexpectedSnafu {
reason: "Failed to upgrade database client",
})?
};
let req = Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
});
database_client
.do_query(req, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
}
}
}
/// Handle a request to frontend
pub(crate) async fn handle(
&self,
@@ -318,7 +368,7 @@ impl FrontendClient {
})?
};
let resp: common_query::Output = database_client
.do_query(req.clone(), ctx)
.do_query(req, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

View File

@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
self.windows.clear();
}
/// Number of dirty windows.
pub fn len(&self) -> usize {
self.windows.len()
}
/// Generate all filter expressions consuming all time windows
///
/// there is two limits:

Some files were not shown because too many files have changed in this diff Show More