Compare commits

...

25 Commits

Author SHA1 Message Date
Lanqing Yang
5ce5e41296 Merge branch 'main' into chore/test_skip_auth 2025-12-12 07:48:43 +09:00
Weny Xu
ba4eda40e5 refactor: optimize heartbeat channel and etcd client keepalive settings (#7390)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-11 13:32:11 +00:00
discord9
f06a64ff90 feat: mark index outdated (#7383)
* feat: mark index outdated

Signed-off-by: discord9 <discord9@163.com>

* refactor: move IndexVersion to store-api

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* fix: condition for add files

Signed-off-by: discord9 <discord9@163.com>

* cleanup

Signed-off-by: discord9 <discord9@163.com>

* refactor(sst): extract index version check into method

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-12-11 12:08:45 +00:00
discord9
b8c362ec65 fix: disable config load in int test
Signed-off-by: discord9 <discord9@163.com>
2025-12-11 15:29:05 +08:00
fys
84b4777925 fix: parse "KEEP FIRING FOR" (#7386)
* fix: parse "KEEP FIRING FOR"

* fix: cargo fmt
2025-12-11 03:54:47 +00:00
discord9
a26dee0ca1 fix: gc listing op first (#7385)
Signed-off-by: discord9 <discord9@163.com>
2025-12-11 03:25:05 +00:00
Ning Sun
276f6bf026 feat: grafana postgresql data source query builder support (#7379)
* feat: grafana postgresql data source query builder support

* test: add sqlness test cases
2025-12-11 03:18:35 +00:00
Weny Xu
1d5291b06d fix(procedure): update procedure state correctly during execution and on failure (#7376)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-11 02:30:32 +00:00
Ruihang Xia
564cc0c750 feat: table/column/flow COMMENT (#7060)
* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* avoid unimplemented panic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* validate flow

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix table column comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* table level comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify table info serde

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't txn

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove empty trait

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: procedure

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update proto

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* grpc support

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* try from pb struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* doc comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* check unchanged fast case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tune errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix merge error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use try_as_raw_value

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
2025-12-10 15:08:47 +00:00
LFC
f1abe5d215 feat: suspend frontend and datanode (#7370)
Signed-off-by: luofucong <luofc@foxmail.com>
2025-12-10 12:18:24 +00:00
Ruihang Xia
ab426cbf89 refactor: remove duplication coverage and code from window sort tests (#7384)
* refactor: remove duplication coverage and code from window sort tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* allow clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-10 10:11:19 +00:00
Weny Xu
cb0f1afb01 fix: improve network failure detection (#7382)
* fix(meta): add default etcd client options with keep-alive settings (#7363)

* fix: improve network failure detection (#7367)

* Update src/meta-srv/src/handler.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-12-10 09:48:36 +00:00
Yingwen
a22d08f1b1 feat: collect merge and dedup metrics (#7375)
* feat: collect FlatMergeReader metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add MergeMetricsReporter, rename Metrics to MergeMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove num_input_rows from MergeMetrics

The merge reader won't dedup so there is no need to collect input rows

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: report merge metrics to PartitionMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add dedup cost to DedupMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect dedup metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove metrics from FlatMergeIterator

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove num_output_rows from MergeMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: implement merge() for merge and dedup metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: report metrics after observe metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-12-10 09:16:20 +00:00
Ruihang Xia
6817a376b5 fix: part sort behavior (#7374)
* fix: part sort behavior

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tune tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* debug assertion and remove produced count

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-10 07:44:44 +00:00
discord9
4d1a587079 chore: saturating duration since (#7380)
chore: sat duration since

Signed-off-by: discord9 <discord9@163.com>
2025-12-10 07:10:46 +00:00
Lei, HUANG
9f1aefe98f feat: allow one to many VRL pipeline (#7342)
* feat/allow-one-to-many-pipeline:
 ### Enhance Pipeline Processing for One-to-Many Transformations

 - **Support One-to-Many Transformations**:
   - Updated `processor.rs`, `etl.rs`, `vrl_processor.rs`, and `greptime.rs` to handle one-to-many transformations by allowing VRL processors to return arrays, expanding each element into separate rows.
   - Introduced `transform_array_elements` and `values_to_rows` functions to facilitate this transformation.

 - **Error Handling Enhancements**:
   - Added new error types in `error.rs` to handle cases where array elements are not objects and for transformation failures.

 - **Testing Enhancements**:
   - Added tests in `pipeline.rs` to verify one-to-many transformations, single object processing, and error handling for non-object array elements.

 - **Context Management**:
   - Modified `ctx_req.rs` to clone `ContextOpt` when adding rows, ensuring correct context management during transformations.

 - **Server Pipeline Adjustments**:
   - Updated `pipeline.rs` in `servers` to handle transformed outputs with one-to-many row expansions, ensuring correct row padding and request formation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 Add one-to-many VRL pipeline test in `http.rs`

 - Introduced `test_pipeline_one_to_many_vrl` to verify VRL processor's ability to expand a single input row into multiple output rows.
 - Updated `http_tests!` macro to include the new test.
 - Implemented test scenarios for single and multiple input rows, ensuring correct data transformation and row count validation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Add Tests for VRL Pipeline Transformations

 - **File:** `src/pipeline/src/etl.rs`
   - Added tests for one-to-many VRL pipeline expansion to ensure multiple output rows from a single input.
   - Introduced tests to verify backward compatibility for single object output.
   - Implemented tests to confirm zero rows are produced from empty arrays.
   - Added validation tests to ensure array elements must be objects.
   - Developed tests for one-to-many transformations with table suffix hints from VRL.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Enhance Pipeline Transformation with Per-Row Table Suffixes

 - **`src/pipeline/src/etl.rs`**: Updated `TransformedOutput` to include per-row table suffixes, allowing for more flexible routing of transformed data. Modified `PipelineExecOutput` and related methods to
 handle the new structure.
 - **`src/pipeline/src/etl/transform/transformer/greptime.rs`**: Enhanced `values_to_rows` to support per-row table suffix extraction and application.
 - **`src/pipeline/tests/common.rs`** and **`src/pipeline/tests/pipeline.rs`**: Adjusted tests to validate the new per-row table suffix functionality, ensuring backward compatibility and correct behavior in
 one-to-many transformations.
 - **`src/servers/src/pipeline.rs`**: Modified `run_custom_pipeline` to process transformed outputs with per-row table suffixes, grouping rows by `(opt, table_name)` for insertion.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Update VRL Processor Type Checks

 - **File:** `vrl_processor.rs`
 - **Changes:** Updated type checking logic to use `contains_object()` and `contains_array()` methods instead of `is_object()` and `is_array()`. This change ensures
 compatibility with VRL type inference that may return multiple possible types.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 - **Enhance Error Handling**: Added new error types `ArrayElementMustBeObjectSnafu` and `TransformArrayElementSnafu` to improve error handling in `etl.rs` and `greptime.rs`.
 - **Refactor Error Usage**: Moved error usage declarations in `transform_array_elements` and `values_to_rows` functions to the top of the file for better organization in `etl.rs` and `greptime.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Update `greptime.rs` to Enhance Error Handling

 - **Error Handling**: Modified the `values_to_rows` function to handle invalid array elements based on the `skip_error` parameter. If `skip_error` is true, invalid elements are skipped; otherwise, an error is returned.
 - **Testing**: Added unit tests in `greptime.rs` to verify the behavior of `values_to_rows` with different `skip_error` settings, ensuring correct processing of valid objects and appropriate error handling for invalid elements.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Commit Summary

 - **Enhance `TransformedOutput` Structure**: Refactored `TransformedOutput` to use a `HashMap` for grouping rows by `ContextOpt`, allowing for per-row configuration options. Updated methods in `PipelineExecOutput` to support the new structure (`src/pipeline/src/etl.rs`).

 - **Add New Transformation Method**: Introduced `transform_array_elements_to_hashmap` to handle array inputs with per-row `ContextOpt` in `HashMap` format (`src/pipeline/src/etl.rs`).

 - **Update Pipeline Execution**: Modified `run_custom_pipeline` to process `TransformedOutput` using the new `HashMap` structure, ensuring rows are grouped by `ContextOpt` and table name (`src/servers/src/pipeline.rs`).

 - **Add Tests for New Structure**: Implemented tests to verify the functionality of the new `HashMap` structure in `TransformedOutput`, including scenarios for one-to-many mapping, single object input, and empty arrays (`src/pipeline/src/etl.rs`).

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Refactor `values_to_rows` to Return `HashMap` Grouped by `ContextOpt`

 - **`etl.rs`**:
   - Updated `values_to_rows` to return a `HashMap` grouped by `ContextOpt` instead of a vector.
   - Adjusted logic to handle single object and array inputs, ensuring rows are grouped by their `ContextOpt`.
   - Modified functions to extract rows from default `ContextOpt` and apply table suffixes accordingly.

 - **`greptime.rs`**:
   - Enhanced `values_to_rows` to handle errors gracefully with `skip_error` logic.
   - Added logic to group rows by `ContextOpt` for array inputs.

 - **Tests**:
   - Updated existing tests to validate the new `HashMap` return structure.
   - Added a new test to verify correct grouping of rows by per-element `ContextOpt`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Refactor and Enhance Error Handling in ETL Pipeline

 - **Refactored Functionality**:
   - Replaced `transform_array_elements` with `transform_array_elements_by_ctx` in `etl.rs` to streamline transformation logic and improve error handling.
   - Updated `values_to_rows` in `greptime.rs` to use `or_default` for cleaner code.

 - **Enhanced Error Handling**:
   - Introduced `unwrap_or_continue_if_err` macro in `etl.rs` to allow skipping errors based on pipeline context, improving robustness in data processing.

 These changes enhance the maintainability and error resilience of the ETL pipeline.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/allow-one-to-many-pipeline:
 ### Update `Row` Handling in ETL Pipeline

 - **Refactor `Row` Type**: Introduced `RowWithTableSuffix` type alias to simplify handling of rows with optional table suffixes across the ETL pipeline.
 - **Modify Function Signatures**: Updated function signatures in `etl.rs` and `greptime.rs` to use `RowWithTableSuffix` for better clarity and consistency.
 - **Enhance Test Coverage**: Adjusted test logic in `greptime.rs` to align with the new `RowWithTableSuffix` type, ensuring correct grouping and processing of rows by TTL.

 Files affected: `etl.rs`, `greptime.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-12-10 06:38:44 +00:00
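The commit messages above describe the core mechanic: a VRL processor may now return an array instead of a single object, and each array element is expanded into its own output row, with non-object elements either skipped or rejected depending on `skip_error`. Purely as an illustration of that idea (plain Rust over `serde_json`; the name `values_to_rows` is borrowed from the messages, but this sketch is not the actual GreptimeDB implementation):

use serde_json::{Map, Value};

type Row = Map<String, Value>;

/// Expand a pipeline output value into rows: an object becomes one row,
/// an array becomes one row per (object) element.
fn values_to_rows(value: Value, skip_error: bool) -> Result<Vec<Row>, String> {
    match value {
        // Backward-compatible path: a single object maps to a single row.
        Value::Object(obj) => Ok(vec![obj]),
        // One-to-many path: each array element becomes its own row.
        Value::Array(elements) => {
            let mut rows = Vec::with_capacity(elements.len());
            for element in elements {
                match element {
                    Value::Object(obj) => rows.push(obj),
                    other if skip_error => eprintln!("skipping non-object element: {other}"),
                    other => return Err(format!("array element must be an object, got {other}")),
                }
            }
            Ok(rows)
        }
        other => Err(format!("pipeline output must be an object or an array, got {other}")),
    }
}

fn main() {
    let output = serde_json::json!([
        { "host": "a", "value": 1 },
        { "host": "b", "value": 2 }
    ]);
    // One input value expands into two rows.
    assert_eq!(values_to_rows(output, false).unwrap().len(), 2);
}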
Lei, HUANG
2f9130a2de chore(mito): expose some symbols (#7373)
chore/expose-symbols:
 ### Commit Summary

 - **Visibility Changes**: Updated visibility of functions in `bulk/part.rs`:
   - Made `record_batch_estimated_size` and `sort_primary_key_record_batch` functions public.
 - **Enhancements**: Enhanced functionality in `memtable.rs` by exposing additional components from `bulk::part`:
   - `BulkPartEncoder`, `BulkPartMeta`, `UnorderedPart`, `record_batch_estimated_size`, and `sort_primary_key_record_batch`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-12-09 14:33:14 +00:00
shuiyisong
fa2b4e5e63 refactor: extract file watcher to common-config (#7357)
* refactor: extract file watcher to common-config

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: add file check

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: watch dir instead of file

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: address CR issues

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-12-09 11:23:26 +00:00
discord9
9197e818ec refactor: use versioned index for index file (#7309)
* refactor: use versioned index for index file

Signed-off-by: discord9 <discord9@163.com>

* fix: sst entry table

Signed-off-by: discord9 <discord9@163.com>

* update sqlness

Signed-off-by: discord9 <discord9@163.com>

* chore: unit type

Signed-off-by: discord9 <discord9@163.com>

* fix: missing version

Signed-off-by: discord9 <discord9@163.com>

* more fix build index

Signed-off-by: discord9 <discord9@163.com>

* fix: use proper index id

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* test: update

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* test: test_list_ssts fixed

Signed-off-by: discord9 <discord9@163.com>

* test: fix test

Signed-off-by: discord9 <discord9@163.com>

* feat: stuff

Signed-off-by: discord9 <discord9@163.com>

* fix: clean temp index file on abort&delete all index version when delete file

Signed-off-by: discord9 <discord9@163.com>

* docs: explain

Signed-off-by: discord9 <discord9@163.com>

* fix: actually clean up tmp dir

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* clean tmp dir only when write cache enabled

Signed-off-by: discord9 <discord9@163.com>

* refactor: add version to index cache

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* test: update size

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-12-09 07:31:12 +00:00
discord9
36d89c3baf fix: use saturating in gc tracker (#7369)
chore: use saturating

Signed-off-by: discord9 <discord9@163.com>
2025-12-09 06:38:59 +00:00
Ruihang Xia
0ebfd161d8 feat: allow publishing new nightly release when some platforms are absent (#7354)
* feat: allow publishing new nightly release when some platforms are absent

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unify linux platforms

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* always evaluate conditions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-09 04:59:50 +00:00
ZonaHe
8b26a98c3b feat: update dashboard to v0.11.9 (#7364)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-12-09 02:37:44 +00:00
discord9
7199823be9 chore: rename to avoid git reserved name (#7359)
rename to avoid reserved name

Signed-off-by: discord9 <discord9@163.com>
2025-12-08 04:01:25 +00:00
Ruihang Xia
60f752d306 feat: run histogram quantile in safe mode for incomplete data (#7297)
* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test and fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* correct sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refine code and comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-05 09:19:21 +00:00
Ruihang Xia
edb1f6086f feat: decode pk eagerly (#7350)
* feat: decode pk eagerly

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* merge primary_key_codec and decode_primary_key_values

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-12-05 09:11:51 +00:00
180 changed files with 7457 additions and 2018 deletions

View File

@@ -49,14 +49,9 @@ on:
description: Do not run integration tests during the build
type: boolean
default: true
build_linux_amd64_artifacts:
build_linux_artifacts:
type: boolean
description: Build linux-amd64 artifacts
required: false
default: false
build_linux_arm64_artifacts:
type: boolean
description: Build linux-arm64 artifacts
description: Build linux artifacts (both amd64 and arm64)
required: false
default: false
build_macos_artifacts:
@@ -144,7 +139,7 @@ jobs:
./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"
- name: Allocate linux-amd64 runner
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
id: start-linux-amd64-runner
with:
@@ -158,7 +153,7 @@ jobs:
subnet-id: ${{ vars.EC2_RUNNER_SUBNET_ID }}
- name: Allocate linux-arm64 runner
if: ${{ inputs.build_linux_arm64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
id: start-linux-arm64-runner
with:
@@ -173,7 +168,7 @@ jobs:
build-linux-amd64-artifacts:
name: Build linux-amd64 artifacts
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
allocate-runners,
]
@@ -195,7 +190,7 @@ jobs:
build-linux-arm64-artifacts:
name: Build linux-arm64 artifacts
if: ${{ inputs.build_linux_arm64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
allocate-runners,
]
@@ -217,7 +212,7 @@ jobs:
run-multi-lang-tests:
name: Run Multi-language SDK Tests
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
if: ${{ inputs.build_linux_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [
allocate-runners,
build-linux-amd64-artifacts,
@@ -386,7 +381,18 @@ jobs:
publish-github-release:
name: Create GitHub release and upload artifacts
if: ${{ inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule' }}
# Use always() to run even when optional jobs (macos, windows) are skipped.
# Then check that required jobs succeeded and optional jobs didn't fail.
if: |
always() &&
(inputs.publish_github_release || github.event_name == 'push' || github.event_name == 'schedule') &&
needs.allocate-runners.result == 'success' &&
(needs.build-linux-amd64-artifacts.result == 'success' || needs.build-linux-amd64-artifacts.result == 'skipped') &&
(needs.build-linux-arm64-artifacts.result == 'success' || needs.build-linux-arm64-artifacts.result == 'skipped') &&
(needs.build-macos-artifacts.result == 'success' || needs.build-macos-artifacts.result == 'skipped') &&
(needs.build-windows-artifacts.result == 'success' || needs.build-windows-artifacts.result == 'skipped') &&
(needs.release-images-to-dockerhub.result == 'success' || needs.release-images-to-dockerhub.result == 'skipped') &&
(needs.run-multi-lang-tests.result == 'success' || needs.run-multi-lang-tests.result == 'skipped')
needs: [ # The job have to wait for all the artifacts are built.
allocate-runners,
build-linux-amd64-artifacts,

Cargo.lock (generated, 36 changes)
View File

@@ -738,12 +738,12 @@ dependencies = [
"api",
"async-trait",
"common-base",
"common-config",
"common-error",
"common-macro",
"common-telemetry",
"common-test-util",
"digest",
"notify",
"sha1",
"snafu 0.8.6",
"sql",
@@ -2055,6 +2055,7 @@ dependencies = [
"datanode",
"humantime-serde",
"meta-client",
"notify",
"object-store",
"serde",
"serde_json",
@@ -2253,6 +2254,7 @@ dependencies = [
"arrow-flight",
"bytes",
"common-base",
"common-config",
"common-error",
"common-macro",
"common-recordbatch",
@@ -2266,7 +2268,6 @@ dependencies = [
"hyper 1.6.0",
"hyper-util",
"lazy_static",
"notify",
"prost 0.13.5",
"rand 0.9.1",
"serde",
@@ -2845,6 +2846,15 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "convert_case"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@@ -3741,9 +3751,9 @@ dependencies = [
[[package]]
name = "datafusion-pg-catalog"
version = "0.12.2"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "755393864c0c2dd95575ceed4b25e348686028e1b83d06f8f39914209999f821"
checksum = "09bfd1feed7ed335227af0b65955ed825e467cf67fad6ecd089123202024cfd1"
dependencies = [
"async-trait",
"datafusion",
@@ -4184,21 +4194,23 @@ dependencies = [
[[package]]
name = "derive_more"
version = "1.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "1.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b"
dependencies = [
"convert_case 0.10.0",
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.106",
"unicode-xid",
]
@@ -4915,6 +4927,7 @@ dependencies = [
"async-stream",
"async-trait",
"auth",
"axum 0.8.4",
"bytes",
"cache",
"catalog",
@@ -4949,9 +4962,11 @@ dependencies = [
"hostname 0.4.1",
"humantime",
"humantime-serde",
"hyper-util",
"lazy_static",
"log-query",
"meta-client",
"meta-srv",
"num_cpus",
"opentelemetry-proto",
"operator",
@@ -4963,6 +4978,7 @@ dependencies = [
"prost 0.13.5",
"query",
"rand 0.9.1",
"reqwest",
"serde",
"serde_json",
"servers",
@@ -5351,7 +5367,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0df99f09f1d6785055b2d9da96fc4ecc2bdf6803#0df99f09f1d6785055b2d9da96fc4ecc2bdf6803"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0423fa30203187c75e2937a668df1da699c8b96c#0423fa30203187c75e2937a668df1da699c8b96c"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -10837,7 +10853,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/WenyXu/rskafka.git?rev=7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76#7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76"
source = "git+https://github.com/GreptimeTeam/rskafka.git?rev=f5688f83e7da591cda3f2674c2408b4c0ed4ed50#f5688f83e7da591cda3f2674c2408b4c0ed4ed50"
dependencies = [
"bytes",
"chrono",

View File

@@ -131,7 +131,7 @@ datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.2"
datafusion-pg-catalog = "0.12.3"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
@@ -139,6 +139,7 @@ datafusion-substrait = "50"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [
@@ -148,7 +149,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0df99f09f1d6785055b2d9da96fc4ecc2bdf6803" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0423fa30203187c75e2937a668df1da699c8b96c" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -200,7 +201,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [
# Branch: feat/request-timeout
rskafka = { git = "https://github.com/GreptimeTeam/rskafka.git", rev = "f5688f83e7da591cda3f2674c2408b4c0ed4ed50", features = [
"transport-tls",
] }
rstest = "0.25"

flake.lock (generated, 20 changes)
View File

@@ -8,11 +8,11 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1760078406,
"narHash": "sha256-JeJK0ZA845PtkCHkfo4KjeI1mYrsr2s3cxBYKhF4BoE=",
"lastModified": 1765252472,
"narHash": "sha256-byMt/uMi7DJ8tRniFopDFZMO3leSjGp6GS4zWOFT+uQ=",
"owner": "nix-community",
"repo": "fenix",
"rev": "351277c60d104944122ee389cdf581c5ce2c6732",
"rev": "8456b985f6652e3eef0632ee9992b439735c5544",
"type": "github"
},
"original": {
@@ -41,16 +41,16 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1759994382,
"narHash": "sha256-wSK+3UkalDZRVHGCRikZ//CyZUJWDJkBDTQX1+G77Ow=",
"lastModified": 1764983851,
"narHash": "sha256-y7RPKl/jJ/KAP/VKLMghMgXTlvNIJMHKskl8/Uuar7o=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "5da4a26309e796daa7ffca72df93dbe53b8164c7",
"rev": "d9bc5c7dceb30d8d6fafa10aeb6aa8a48c218454",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-25.05",
"ref": "nixos-25.11",
"repo": "nixpkgs",
"type": "github"
}
@@ -65,11 +65,11 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1760014945,
"narHash": "sha256-ySdl7F9+oeWNHVrg3QL/brazqmJvYFEdpGnF3pyoDH8=",
"lastModified": 1765120009,
"narHash": "sha256-nG76b87rkaDzibWbnB5bYDm6a52b78A+fpm+03pqYIw=",
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "90d2e1ce4dfe7dc49250a8b88a0f08ffdb9cb23f",
"rev": "5e3e9c4e61bba8a5e72134b9ffefbef8f531d008",
"type": "github"
},
"original": {

View File

@@ -2,7 +2,7 @@
description = "Development environment flake";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05";
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.11";
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
@@ -48,7 +48,7 @@
gnuplot ## for cargo bench
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;
buildInputs = buildInputs;
NIX_HARDENING_ENABLE = "";
};
});

View File

@@ -708,6 +708,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
Some(Expr::CommentOn(_)) => "ddl.comment_on",
None => "ddl.empty",
}
}

View File

@@ -15,11 +15,11 @@ workspace = true
api.workspace = true
async-trait.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
digest = "0.10"
notify.workspace = true
sha1 = "0.10"
snafu.workspace = true
sql.workspace = true

View File

@@ -75,11 +75,12 @@ pub enum Error {
username: String,
},
#[snafu(display("Failed to initialize a watcher for file {}", path))]
#[snafu(display("Failed to initialize a file watcher"))]
FileWatch {
path: String,
#[snafu(source)]
error: notify::Error,
source: common_config::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("User is not authorized to perform this action"))]

View File

@@ -12,16 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common_config::file_watcher::{FileWatcherBuilder, FileWatcherConfig};
use common_telemetry::{info, warn};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::{ResultExt, ensure};
use snafu::ResultExt;
use crate::error::{FileWatchSnafu, InvalidConfigSnafu, Result};
use crate::error::{FileWatchSnafu, Result};
use crate::user_provider::{UserInfoMap, authenticate_with_credential, load_credential_from_file};
use crate::{Identity, Password, UserInfoRef, UserProvider};
@@ -41,61 +39,36 @@ impl WatchFileUserProvider {
pub fn new(filepath: &str) -> Result<Self> {
let credential = load_credential_from_file(filepath)?;
let users = Arc::new(Mutex::new(credential));
let this = WatchFileUserProvider {
users: users.clone(),
};
let (tx, rx) = channel::<notify::Result<notify::Event>>();
let mut debouncer =
notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
let mut dir = Path::new(filepath).to_path_buf();
ensure!(
dir.pop(),
InvalidConfigSnafu {
value: filepath,
msg: "UserProvider path must be a file path",
}
);
debouncer
.watch(&dir, RecursiveMode::NonRecursive)
.context(FileWatchSnafu { path: filepath })?;
let users_clone = users.clone();
let filepath_owned = filepath.to_string();
let filepath = filepath.to_string();
std::thread::spawn(move || {
let filename = Path::new(&filepath).file_name();
let _hold = debouncer;
while let Ok(res) = rx.recv() {
if let Ok(event) = res {
let is_this_file = event.paths.iter().any(|p| p.file_name() == filename);
let is_relevant_event = matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
FileWatcherBuilder::new()
.watch_path(filepath)
.context(FileWatchSnafu)?
.config(FileWatcherConfig::new())
.spawn(move || match load_credential_from_file(&filepath_owned) {
Ok(credential) => {
let mut users = users_clone.lock().expect("users credential must be valid");
#[cfg(not(test))]
info!("User provider file {} reloaded", &filepath_owned);
#[cfg(test)]
info!(
"User provider file {} reloaded: {:?}",
&filepath_owned, credential
);
if is_this_file && is_relevant_event {
info!(?event.kind, "User provider file {} changed", &filepath);
match load_credential_from_file(&filepath) {
Ok(credential) => {
let mut users =
users.lock().expect("users credential must be valid");
#[cfg(not(test))]
info!("User provider file {filepath} reloaded");
#[cfg(test)]
info!("User provider file {filepath} reloaded: {credential:?}");
*users = credential;
}
Err(err) => {
warn!(
?err,
"Fail to load credential from file {filepath}; keep the old one",
)
}
}
}
*users = credential;
}
}
});
Err(err) => {
warn!(
?err,
"Fail to load credential from file {}; keep the old one", &filepath_owned
)
}
})
.context(FileWatchSnafu)?;
Ok(this)
Ok(WatchFileUserProvider { users })
}
}

View File

@@ -89,6 +89,10 @@ wrap_with_clap_prefix! {
region: Option<String>,
#[doc = "Enable virtual host style for the object store."]
enable_virtual_host_style: bool = Default::default(),
#[doc = "Allow anonymous access (disable credential signing) for testing."]
allow_anonymous: bool = Default::default(),
#[doc = "Disable config load from environment and files for testing."]
disable_config_load: bool = Default::default(),
}
}

View File

@@ -163,7 +163,7 @@ impl ObjbenchCommand {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows,
num_row_groups,
sequence: None,
@@ -564,7 +564,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
}
Arc::new(Noop)
}

View File

@@ -35,6 +35,7 @@ use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
@@ -45,7 +46,7 @@ use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
@@ -440,30 +441,13 @@ impl StartCommand {
};
let catalog_manager = builder.build();
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);
let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();
let heartbeat_task = HeartbeatTask::new(
&opts,
meta_client.clone(),
opts.heartbeat.clone(),
Arc::new(executor),
Arc::new(resource_stat),
);
let heartbeat_task = Some(heartbeat_task);
let instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
client,
meta_client,
meta_client.clone(),
process_manager,
)
.with_plugin(plugins.clone())
@@ -471,6 +455,9 @@ impl StartCommand {
.try_build()
.await
.context(error::StartFrontendSnafu)?;
let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
let instance = Arc::new(instance);
let servers = Services::new(opts, instance.clone(), plugins)
@@ -487,6 +474,28 @@ impl StartCommand {
}
}
pub fn create_heartbeat_task(
options: &frontend::frontend::FrontendOptions,
meta_client: MetaClientRef,
instance: &frontend::instance::Instance,
) -> HeartbeatTask {
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(instance.suspend_state())),
Arc::new(InvalidateCacheHandler::new(
instance.cache_invalidator().clone(),
)),
]));
let stat = {
let mut stat = ResourceStatImpl::default();
stat.start_collect_cpu_usage();
Arc::new(stat)
};
HeartbeatTask::new(options, meta_client, executor, stat)
}
#[cfg(test)]
mod tests {
use std::io::Write;

View File

@@ -11,8 +11,10 @@ workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
config.workspace = true
humantime-serde.workspace = true
notify.workspace = true
object-store.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -49,14 +49,41 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to watch file: {}", path))]
FileWatch {
path: String,
#[snafu(source)]
error: notify::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to canonicalize path: {}", path))]
CanonicalizePath {
path: String,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid path '{}': expected a file, not a directory", path))]
InvalidPath {
path: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::TomlFormat { .. } | Error::LoadLayeredConfig { .. } => {
StatusCode::InvalidArguments
}
Error::TomlFormat { .. }
| Error::LoadLayeredConfig { .. }
| Error::FileWatch { .. }
| Error::InvalidPath { .. }
| Error::CanonicalizePath { .. } => StatusCode::InvalidArguments,
Error::SerdeJson { .. } => StatusCode::Unexpected,
}
}

View File

@@ -0,0 +1,355 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Common file watching utilities for configuration hot-reloading.
//!
//! This module provides a generic file watcher that can be used to watch
//! files for changes and trigger callbacks when changes occur.
//!
//! The watcher monitors the parent directory of each file rather than the
//! file itself. This ensures that file deletions and recreations are properly
//! tracked, which is common with editors that use atomic saves or when
//! configuration files are replaced.
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use common_telemetry::{error, info, warn};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::ResultExt;
use crate::error::{CanonicalizePathSnafu, FileWatchSnafu, InvalidPathSnafu, Result};
/// Configuration for the file watcher behavior.
#[derive(Debug, Clone, Default)]
pub struct FileWatcherConfig {
/// Whether to include Remove events in addition to Modify and Create.
pub include_remove_events: bool,
}
impl FileWatcherConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_modify_and_create(mut self) -> Self {
self.include_remove_events = false;
self
}
pub fn with_remove_events(mut self) -> Self {
self.include_remove_events = true;
self
}
}
/// A builder for creating file watchers with flexible configuration.
///
/// The watcher monitors the parent directory of each file to handle file
/// deletion and recreation properly. Events are filtered to only trigger
/// callbacks for the specific files being watched.
pub struct FileWatcherBuilder {
config: FileWatcherConfig,
/// Canonicalized paths of files to watch.
file_paths: Vec<PathBuf>,
}
impl FileWatcherBuilder {
/// Create a new builder with default configuration.
pub fn new() -> Self {
Self {
config: FileWatcherConfig::default(),
file_paths: Vec::new(),
}
}
/// Set the watcher configuration.
pub fn config(mut self, config: FileWatcherConfig) -> Self {
self.config = config;
self
}
/// Add a file path to watch.
///
/// Returns an error if the path is a directory.
/// The path is canonicalized for reliable comparison with events.
pub fn watch_path<P: AsRef<Path>>(mut self, path: P) -> Result<Self> {
let path = path.as_ref();
snafu::ensure!(
path.is_file(),
InvalidPathSnafu {
path: path.display().to_string(),
}
);
// Canonicalize the path for reliable comparison with event paths
let canonical = path.canonicalize().context(CanonicalizePathSnafu {
path: path.display().to_string(),
})?;
self.file_paths.push(canonical);
Ok(self)
}
/// Add multiple file paths to watch.
///
/// Returns an error if any path is a directory.
pub fn watch_paths<P: AsRef<Path>, I: IntoIterator<Item = P>>(
mut self,
paths: I,
) -> Result<Self> {
for path in paths {
self = self.watch_path(path)?;
}
Ok(self)
}
/// Build and spawn the file watcher with the given callback.
///
/// The callback is invoked when relevant file events are detected for
/// the watched files. The watcher monitors the parent directories to
/// handle file deletion and recreation properly.
///
/// The spawned watcher thread runs for the lifetime of the process.
pub fn spawn<F>(self, callback: F) -> Result<()>
where
F: Fn() + Send + 'static,
{
let (tx, rx) = channel::<notify::Result<notify::Event>>();
let mut watcher =
notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
// Collect unique parent directories to watch
let mut watched_dirs: HashSet<PathBuf> = HashSet::new();
for file_path in &self.file_paths {
if let Some(parent) = file_path.parent()
&& watched_dirs.insert(parent.to_path_buf())
{
watcher
.watch(parent, RecursiveMode::NonRecursive)
.context(FileWatchSnafu {
path: parent.display().to_string(),
})?;
}
}
let config = self.config;
let watched_files: HashSet<PathBuf> = self.file_paths.iter().cloned().collect();
info!(
"Spawning file watcher for paths: {:?} (watching parent directories)",
self.file_paths
.iter()
.map(|p| p.display().to_string())
.collect::<Vec<_>>()
);
std::thread::spawn(move || {
// Keep watcher alive in the thread
let _watcher = watcher;
while let Ok(res) = rx.recv() {
match res {
Ok(event) => {
if !is_relevant_event(&event.kind, &config) {
continue;
}
// Check if any of the event paths match our watched files
let is_watched_file = event.paths.iter().any(|event_path| {
// Try to canonicalize the event path for comparison
// If the file was deleted, canonicalize will fail, so we also
// compare the raw path
if let Ok(canonical) = event_path.canonicalize()
&& watched_files.contains(&canonical)
{
return true;
}
// For deleted files, compare using the raw path
watched_files.contains(event_path)
});
if !is_watched_file {
continue;
}
info!(?event.kind, ?event.paths, "Detected file change");
callback();
}
Err(err) => {
warn!("File watcher error: {}", err);
}
}
}
error!("File watcher channel closed unexpectedly");
});
Ok(())
}
}
impl Default for FileWatcherBuilder {
fn default() -> Self {
Self::new()
}
}
/// Check if an event kind is relevant based on the configuration.
fn is_relevant_event(kind: &EventKind, config: &FileWatcherConfig) -> bool {
match kind {
EventKind::Modify(_) | EventKind::Create(_) => true,
EventKind::Remove(_) => config.include_remove_events,
_ => false,
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use common_test_util::temp_dir::create_temp_dir;
use super::*;
#[test]
fn test_file_watcher_detects_changes() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("test_file_watcher");
let file_path = dir.path().join("test_file.txt");
// Create initial file
std::fs::write(&file_path, "initial content").unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
FileWatcherBuilder::new()
.watch_path(&file_path)
.unwrap()
.config(FileWatcherConfig::new())
.spawn(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
// Give watcher time to start
std::thread::sleep(Duration::from_millis(100));
// Modify the file
std::fs::write(&file_path, "modified content").unwrap();
// Wait for the event to be processed
std::thread::sleep(Duration::from_millis(500));
assert!(
counter.load(Ordering::SeqCst) >= 1,
"Watcher should have detected at least one change"
);
}
#[test]
fn test_file_watcher_detects_delete_and_recreate() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("test_file_watcher_recreate");
let file_path = dir.path().join("test_file.txt");
// Create initial file
std::fs::write(&file_path, "initial content").unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
FileWatcherBuilder::new()
.watch_path(&file_path)
.unwrap()
.config(FileWatcherConfig::new())
.spawn(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
// Give watcher time to start
std::thread::sleep(Duration::from_millis(100));
// Delete the file
std::fs::remove_file(&file_path).unwrap();
std::thread::sleep(Duration::from_millis(100));
// Recreate the file - this should still be detected because we watch the directory
std::fs::write(&file_path, "recreated content").unwrap();
// Wait for the event to be processed
std::thread::sleep(Duration::from_millis(500));
assert!(
counter.load(Ordering::SeqCst) >= 1,
"Watcher should have detected file recreation"
);
}
#[test]
fn test_file_watcher_ignores_other_files() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("test_file_watcher_other");
let watched_file = dir.path().join("watched.txt");
let other_file = dir.path().join("other.txt");
// Create both files
std::fs::write(&watched_file, "watched content").unwrap();
std::fs::write(&other_file, "other content").unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
FileWatcherBuilder::new()
.watch_path(&watched_file)
.unwrap()
.config(FileWatcherConfig::new())
.spawn(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
// Give watcher time to start
std::thread::sleep(Duration::from_millis(100));
// Modify the other file - should NOT trigger callback
std::fs::write(&other_file, "modified other content").unwrap();
// Wait for potential event
std::thread::sleep(Duration::from_millis(500));
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Watcher should not have detected changes to other files"
);
// Now modify the watched file - SHOULD trigger callback
std::fs::write(&watched_file, "modified watched content").unwrap();
// Wait for the event to be processed
std::thread::sleep(Duration::from_millis(500));
assert!(
counter.load(Ordering::SeqCst) >= 1,
"Watcher should have detected change to watched file"
);
}
}
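For orientation, a minimal usage sketch of the `FileWatcherBuilder` API added above (the path and the callback body are illustrative, not taken from the repository):

use common_config::file_watcher::{FileWatcherBuilder, FileWatcherConfig};

// Watch a credentials file and log whenever it changes on disk.
// `watch_path` requires the path to exist and to be a regular file.
fn watch_users_file() -> common_config::error::Result<()> {
    FileWatcherBuilder::new()
        .watch_path("/etc/greptime/users")?
        // Opt in to Remove events so deletion followed by recreation is noticed.
        .config(FileWatcherConfig::new().with_remove_events())
        .spawn(|| {
            // Runs on the watcher thread for every relevant event on the file.
            println!("users file changed, reloading credentials");
        })
}

The callers shown in this compare (the auth user provider and the TLS reloader) follow the same shape: build, attach a config, then `spawn` a reload closure.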

View File

@@ -14,6 +14,7 @@
pub mod config;
pub mod error;
pub mod file_watcher;
use std::time::Duration;

View File

@@ -21,6 +21,8 @@ pub mod status_code;
use http::{HeaderMap, HeaderValue};
pub use snafu;
use crate::status_code::StatusCode;
// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
// please define in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
@@ -46,6 +48,29 @@ pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
header
}
/// Extract [StatusCode] and error message from [HeaderMap], if any.
///
/// Note that if the [StatusCode] is illegal, for example, a random number that is not pre-defined
/// as a [StatusCode], the result is still `None`.
pub fn from_header_to_err_code_msg(headers: &HeaderMap) -> Option<(StatusCode, &str)> {
let code = headers
.get(GREPTIME_DB_HEADER_ERROR_CODE)
.and_then(|value| {
value
.to_str()
.ok()
.and_then(|x| x.parse::<u32>().ok())
.and_then(StatusCode::from_u32)
});
let msg = headers
.get(GREPTIME_DB_HEADER_ERROR_MSG)
.and_then(|x| x.to_str().ok());
match (code, msg) {
(Some(code), Some(msg)) => Some((code, msg)),
_ => None,
}
}
/// Returns the external root cause of the source error (exclude the current error).
pub fn root_source(err: &dyn std::error::Error) -> Option<&dyn std::error::Error> {
// There are some divergence about the behavior of the `sources()` API
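A small example of the contract documented above for `from_header_to_err_code_msg`: it yields `Some` only when both headers are present and the code parses to a known `StatusCode` (the header values below are illustrative):

use common_error::status_code::StatusCode;
use common_error::{
    GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_MSG, from_header_to_err_code_msg,
};
use http::HeaderMap;

fn main() {
    let mut headers = HeaderMap::new();
    headers.insert(GREPTIME_DB_HEADER_ERROR_CODE, "1009".parse().unwrap());
    headers.insert(GREPTIME_DB_HEADER_ERROR_MSG, "service suspended".parse().unwrap());
    // 1009 maps to StatusCode::Suspended, so decoding succeeds.
    let decoded = from_header_to_err_code_msg(&headers);
    assert!(matches!(decoded, Some((StatusCode::Suspended, "service suspended"))));

    // An unknown numeric code (or a missing header) yields None.
    headers.insert(GREPTIME_DB_HEADER_ERROR_CODE, "999999".parse().unwrap());
    assert!(from_header_to_err_code_msg(&headers).is_none());
}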

View File

@@ -42,6 +42,8 @@ pub enum StatusCode {
External = 1007,
/// The request is deadline exceeded (typically server-side).
DeadlineExceeded = 1008,
/// Service got suspended for various reason. For example, resources exceed limit.
Suspended = 1009,
// ====== End of common status code ================
// ====== Begin of SQL related status code =========
@@ -175,7 +177,8 @@ impl StatusCode {
| StatusCode::AccessDenied
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated
| StatusCode::External => false,
| StatusCode::External
| StatusCode::Suspended => false,
}
}
@@ -223,7 +226,8 @@ impl StatusCode {
| StatusCode::InvalidAuthHeader
| StatusCode::AccessDenied
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated => false,
| StatusCode::RequestOutdated
| StatusCode::Suspended => false,
}
}
@@ -347,7 +351,8 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
| StatusCode::RegionNotReady => Code::Unavailable,
StatusCode::RuntimeResourcesExhausted
| StatusCode::RateLimited
| StatusCode::RegionBusy => Code::ResourceExhausted,
| StatusCode::RegionBusy
| StatusCode::Suspended => Code::ResourceExhausted,
StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::AuthHeaderNotFound

View File

@@ -39,7 +39,7 @@ datafusion-functions-aggregate-common.workspace = true
datafusion-pg-catalog.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
derive_more.workspace = true
geo = { version = "0.29", optional = true }
geo-types = { version = "0.7", optional = true }
geohash = { version = "0.13", optional = true }

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use datafusion_common::arrow::array::{Array, AsArray, BooleanBuilder};

View File

@@ -387,6 +387,8 @@ impl PGCatalogFunction {
registry.register(pg_catalog::create_pg_stat_get_numscans());
registry.register(pg_catalog::create_pg_get_constraintdef());
registry.register(pg_catalog::create_pg_get_partition_ancestors_udf());
registry.register(pg_catalog::quote_ident_udf::create_quote_ident_udf());
registry.register(pg_catalog::quote_ident_udf::create_parse_ident_udf());
registry.register_scalar(ObjDescriptionFunction::new());
registry.register_scalar(ColDescriptionFunction::new());
registry.register_scalar(ShobjDescriptionFunction::new());

View File

@@ -12,6 +12,7 @@ api.workspace = true
arrow-flight.workspace = true
bytes.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
@@ -23,7 +24,6 @@ datatypes.workspace = true
flatbuffers = "25.2"
hyper.workspace = true
lazy_static.workspace = true
notify.workspace = true
prost.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -38,11 +38,10 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to watch config file path: {}", path))]
#[snafu(display("Failed to watch config file"))]
FileWatch {
path: String,
#[snafu(source)]
error: notify::Error,
source: common_config::error::Error,
#[snafu(implicit)]
location: Location,
},

View File

@@ -15,11 +15,10 @@
use std::path::Path;
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use common_config::file_watcher::{FileWatcherBuilder, FileWatcherConfig};
use common_telemetry::{error, info};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::ResultExt;
use crate::error::{FileWatchSnafu, Result};
@@ -119,45 +118,28 @@ where
return Ok(());
}
let watch_paths: Vec<_> = tls_config
.get_tls_option()
.watch_paths()
.iter()
.map(|p| p.to_path_buf())
.collect();
let tls_config_for_watcher = tls_config.clone();
let (tx, rx) = channel::<notify::Result<notify::Event>>();
let mut watcher = notify::recommended_watcher(tx).context(FileWatchSnafu { path: "<none>" })?;
// Watch all paths returned by the TlsConfigLoader
for path in tls_config.get_tls_option().watch_paths() {
watcher
.watch(path, RecursiveMode::NonRecursive)
.with_context(|_| FileWatchSnafu {
path: path.display().to_string(),
})?;
}
info!("Spawning background task for watching TLS cert/key file changes");
std::thread::spawn(move || {
let _watcher = watcher;
loop {
match rx.recv() {
Ok(Ok(event)) => {
if let EventKind::Modify(_) | EventKind::Create(_) = event.kind {
info!("Detected TLS cert/key file change: {:?}", event);
if let Err(err) = tls_config_for_watcher.reload() {
error!("Failed to reload TLS config: {}", err);
} else {
info!("Reloaded TLS cert/key file successfully.");
on_reload();
}
}
}
Ok(Err(err)) => {
error!("Failed to watch TLS cert/key file: {}", err);
}
Err(err) => {
error!("TLS cert/key file watcher channel closed: {}", err);
}
FileWatcherBuilder::new()
.watch_paths(&watch_paths)
.context(FileWatchSnafu)?
.config(FileWatcherConfig::new())
.spawn(move || {
if let Err(err) = tls_config_for_watcher.reload() {
error!("Failed to reload TLS config: {}", err);
} else {
info!("Reloaded TLS cert/key file successfully.");
on_reload();
}
}
});
})
.context(FileWatchSnafu)?;
Ok(())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
@@ -60,7 +61,7 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-0-{role}-{node_id}`.
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub struct NodeInfoKey {
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
@@ -135,7 +136,7 @@ pub struct NodeInfo {
pub hostname: String,
}
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub enum Role {
Datanode,
Frontend,
@@ -241,6 +242,12 @@ impl From<&NodeInfoKey> for Vec<u8> {
}
}
impl Display for NodeInfoKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}-{}", self.role, self.node_id)
}
}
impl FromStr for NodeInfo {
type Err = Error;

View File

@@ -31,6 +31,7 @@ use crate::region_registry::LeaderRegionRegistryRef;
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod comment_on;
pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;

View File

@@ -301,8 +301,8 @@ fn build_new_table_info(
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetIndexes { .. }
| AlterKind::UnsetIndexes { .. }
| AlterKind::DropDefaults { .. } => {}
AlterKind::SetDefaults { .. } => {}
| AlterKind::DropDefaults { .. }
| AlterKind::SetDefaults { .. } => {}
}
info!(

View File

@@ -0,0 +1,509 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use chrono::Utc;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use strum::AsRefStr;
use table::metadata::RawTableInfo;
use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
use table::table_name::TableName;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::ddl::utils::map_to_procedure_error;
use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
use crate::key::table_info::{TableInfoKey, TableInfoValue};
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
use crate::rpc::store::PutRequest;
pub struct CommentOnProcedure {
pub context: DdlContext,
pub data: CommentOnData,
}
impl CommentOnProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn";
pub fn new(task: CommentOnTask, context: DdlContext) -> Self {
Self {
context,
data: CommentOnData::new(task),
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
pub async fn on_prepare(&mut self) -> Result<Status> {
match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
self.prepare_table_or_column().await?;
}
CommentObjectType::Flow => {
self.prepare_flow().await?;
}
}
// Fast path: if comment is unchanged, skip update
if self.data.is_unchanged {
let object_desc = match self.data.object_type {
CommentObjectType::Table => format!(
"table {}",
format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
)
),
CommentObjectType::Column => format!(
"column {}.{}",
format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
self.data.column_name.as_ref().unwrap()
),
CommentObjectType::Flow => {
format!("flow {}.{}", self.data.catalog_name, self.data.object_name)
}
};
info!("Comment unchanged for {}, skipping update", object_desc);
return Ok(Status::done());
}
self.data.state = CommentOnState::UpdateMetadata;
Ok(Status::executing(true))
}
async fn prepare_table_or_column(&mut self) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
);
let table_id = self
.context
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
})?
.table_id();
let table_info = self
.context
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
})?;
// For column comments, validate the column exists
if self.data.object_type == CommentObjectType::Column {
let column_name = self.data.column_name.as_ref().unwrap();
let column_exists = table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.any(|col| &col.name == column_name);
ensure!(
column_exists,
ColumnNotFoundSnafu {
column_name,
column_id: 0u32, // column_id is not known here
}
);
}
self.data.table_id = Some(table_id);
// Check if comment is unchanged for early exit optimization
match self.data.object_type {
CommentObjectType::Table => {
let current_comment = &table_info.table_info.desc;
if &self.data.comment == current_comment {
self.data.is_unchanged = true;
}
}
CommentObjectType::Column => {
let column_name = self.data.column_name.as_ref().unwrap();
let column_schema = table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.find(|col| &col.name == column_name)
.unwrap(); // Safe: validated above
let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY);
if self.data.comment.as_deref() == current_comment.map(String::as_str) {
self.data.is_unchanged = true;
}
}
CommentObjectType::Flow => {
// this branch is handled in `prepare_flow`
}
}
self.data.table_info = Some(table_info);
Ok(())
}
async fn prepare_flow(&mut self) -> Result<()> {
let flow_name_value = self
.context
.flow_metadata_manager
.flow_name_manager()
.get(&self.data.catalog_name, &self.data.object_name)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: &self.data.object_name,
})?;
let flow_id = flow_name_value.flow_id();
let flow_info = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: &self.data.object_name,
})?;
self.data.flow_id = Some(flow_id);
// Check if comment is unchanged for early exit optimization
let current_comment = &flow_info.get_inner_ref().comment;
let new_comment = self.data.comment.as_deref().unwrap_or("");
if new_comment == current_comment.as_str() {
self.data.is_unchanged = true;
}
self.data.flow_info = Some(flow_info);
Ok(())
}
pub async fn on_update_metadata(&mut self) -> Result<Status> {
match self.data.object_type {
CommentObjectType::Table => {
self.update_table_comment().await?;
}
CommentObjectType::Column => {
self.update_column_comment().await?;
}
CommentObjectType::Flow => {
self.update_flow_comment().await?;
}
}
self.data.state = CommentOnState::InvalidateCache;
Ok(Status::executing(true))
}
async fn update_table_comment(&mut self) -> Result<()> {
let table_info_value = self.data.table_info.as_ref().unwrap();
let mut new_table_info = table_info_value.table_info.clone();
new_table_info.desc = self.data.comment.clone();
// Sync comment to table options
sync_table_comment_option(
&mut new_table_info.meta.options,
new_table_info.desc.as_deref(),
);
self.update_table_info(table_info_value, new_table_info)
.await?;
info!(
"Updated comment for table {}.{}.{}",
self.data.catalog_name, self.data.schema_name, self.data.object_name
);
Ok(())
}
async fn update_column_comment(&mut self) -> Result<()> {
let table_info_value = self.data.table_info.as_ref().unwrap();
let mut new_table_info = table_info_value.table_info.clone();
let column_name = self.data.column_name.as_ref().unwrap();
let column_schema = new_table_info
.meta
.schema
.column_schemas
.iter_mut()
.find(|col| &col.name == column_name)
.unwrap(); // Safe: validated in prepare
update_column_comment_metadata(column_schema, self.data.comment.clone());
self.update_table_info(table_info_value, new_table_info)
.await?;
info!(
"Updated comment for column {}.{}.{}.{}",
self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name
);
Ok(())
}
async fn update_flow_comment(&mut self) -> Result<()> {
let flow_id = self.data.flow_id.unwrap();
let flow_info_value = self.data.flow_info.as_ref().unwrap();
let mut new_flow_info = flow_info_value.get_inner_ref().clone();
new_flow_info.comment = self.data.comment.clone().unwrap_or_default();
new_flow_info.updated_time = Utc::now();
let raw_value = new_flow_info.try_as_raw_value()?;
self.context
.table_metadata_manager
.kv_backend()
.put(
PutRequest::new()
.with_key(FlowInfoKey::new(flow_id).to_bytes())
.with_value(raw_value),
)
.await?;
info!(
"Updated comment for flow {}.{}",
self.data.catalog_name, self.data.object_name
);
Ok(())
}
async fn update_table_info(
&self,
current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
new_table_info: RawTableInfo,
) -> Result<()> {
let table_id = current_table_info.table_info.ident.table_id;
let new_table_info_value = current_table_info.update(new_table_info);
let raw_value = new_table_info_value.try_as_raw_value()?;
self.context
.table_metadata_manager
.kv_backend()
.put(
PutRequest::new()
.with_key(TableInfoKey::new(table_id).to_bytes())
.with_value(raw_value),
)
.await?;
Ok(())
}
pub async fn on_invalidate_cache(&mut self) -> Result<Status> {
let cache_invalidator = &self.context.cache_invalidator;
match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
let table_id = self.data.table_id.unwrap();
let table_name = TableName::new(
self.data.catalog_name.clone(),
self.data.schema_name.clone(),
self.data.object_name.clone(),
);
let cache_ident = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(table_name),
];
cache_invalidator
.invalidate(&Context::default(), &cache_ident)
.await?;
}
CommentObjectType::Flow => {
let flow_id = self.data.flow_id.unwrap();
let cache_ident = vec![CacheIdent::FlowId(flow_id)];
cache_invalidator
.invalidate(&Context::default(), &cache_ident)
.await?;
}
}
Ok(Status::done())
}
}
#[async_trait]
impl Procedure for CommentOnProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
CommentOnState::Prepare => self.on_prepare().await,
CommentOnState::UpdateMetadata => self.on_update_metadata().await,
CommentOnState::InvalidateCache => self.on_invalidate_cache().await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog = &self.data.catalog_name;
let schema = &self.data.schema_name;
let lock_key = match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
vec![
CatalogLock::Read(catalog).into(),
SchemaLock::read(catalog, schema).into(),
TableNameLock::new(catalog, schema, &self.data.object_name).into(),
]
}
CommentObjectType::Flow => {
vec![
CatalogLock::Read(catalog).into(),
FlowNameLock::new(catalog, &self.data.object_name).into(),
]
}
};
LockKey::new(lock_key)
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum CommentOnState {
Prepare,
UpdateMetadata,
InvalidateCache,
}
/// The data of the comment-on procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct CommentOnData {
state: CommentOnState,
catalog_name: String,
schema_name: String,
object_type: CommentObjectType,
object_name: String,
/// Column name (only for Column comments)
column_name: Option<String>,
comment: Option<String>,
/// Cached table ID (for Table/Column)
#[serde(skip_serializing_if = "Option::is_none")]
table_id: Option<TableId>,
/// Cached table info (for Table/Column)
#[serde(skip)]
table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
/// Cached flow ID (for Flow)
#[serde(skip_serializing_if = "Option::is_none")]
flow_id: Option<FlowId>,
/// Cached flow info (for Flow)
#[serde(skip)]
flow_info: Option<DeserializedValueWithBytes<FlowInfoValue>>,
/// Whether the comment is unchanged (optimization for early exit)
#[serde(skip)]
is_unchanged: bool,
}
impl CommentOnData {
pub fn new(task: CommentOnTask) -> Self {
Self {
state: CommentOnState::Prepare,
catalog_name: task.catalog_name,
schema_name: task.schema_name,
object_type: task.object_type,
object_name: task.object_name,
column_name: task.column_name,
comment: task.comment,
table_id: None,
table_info: None,
flow_id: None,
flow_info: None,
is_unchanged: false,
}
}
}
fn update_column_comment_metadata(
column_schema: &mut datatypes::schema::ColumnSchema,
comment: Option<String>,
) {
match comment {
Some(value) => {
column_schema
.mut_metadata()
.insert(COLUMN_COMMENT_KEY.to_string(), value);
}
None => {
column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY);
}
}
}
fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) {
match comment {
Some(value) => {
options
.extra_options
.insert(TABLE_COMMENT_KEY.to_string(), value.to_string());
}
None => {
options.extra_options.remove(TABLE_COMMENT_KEY);
}
}
}
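For orientation, a minimal hypothetical sketch of the task this procedure consumes, roughly what `COMMENT ON TABLE greptime.public.monitor IS 'host metrics'` might produce (field values are illustrative; `ddl_context` is assumed to exist):
let task = CommentOnTask {
    catalog_name: "greptime".to_string(),
    schema_name: "public".to_string(),
    object_type: CommentObjectType::Table,
    object_name: "monitor".to_string(),
    column_name: None,
    object_id: None,
    comment: Some("host metrics".to_string()),
};
// The procedure then steps through Prepare -> UpdateMetadata -> InvalidateCache,
// returning early with Status::done() when the comment is unchanged.
let procedure = CommentOnProcedure::new(task, ddl_context);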

View File

@@ -27,6 +27,7 @@ use store_api::storage::TableId;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::comment_on::CommentOnProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
@@ -52,18 +53,18 @@ use crate::rpc::ddl::DdlTask::CreateTrigger;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::DropTrigger;
use crate::rpc::ddl::DdlTask::{
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
TruncateTable,
AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
DropTable, DropView, TruncateTable,
};
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::DropTriggerTask;
use crate::rpc::ddl::{
AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
@@ -192,7 +193,8 @@ impl DdlManager {
TruncateTableProcedure,
CreateDatabaseProcedure,
DropDatabaseProcedure,
DropViewProcedure
DropViewProcedure,
CommentOnProcedure
);
for (type_name, loader_factory) in loaders {
@@ -408,6 +410,19 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a comment on task.
#[tracing::instrument(skip_all)]
pub async fn submit_comment_on_task(
&self,
comment_on_task: CommentOnTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CommentOnProcedure::new(comment_on_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
async fn submit_procedure(
&self,
procedure_with_id: ProcedureWithId,
@@ -476,6 +491,7 @@ impl DdlManager {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
@@ -907,6 +923,26 @@ async fn handle_create_view_task(
})
}
async fn handle_comment_on_task(
ddl_manager: &DdlManager,
comment_on_task: CommentOnTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_comment_on_task(comment_on_task.clone())
.await?;
let procedure_id = id.to_string();
info!(
"Comment on {}.{}.{} is updated via procedure_id {id:?}",
comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
);
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -14,6 +14,8 @@
use std::time::Duration;
use etcd_client::ConnectOptions;
/// Heartbeat interval time (the basic unit for the various other time settings).
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
@@ -45,12 +47,18 @@ pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The keep-alive interval of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = Duration::from_secs(15);
/// The keep-alive timeout of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(5);
/// The default options for the etcd client.
pub fn default_etcd_client_options() -> ConnectOptions {
ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(Duration::from_secs(15), Duration::from_secs(5))
.with_connect_timeout(Duration::from_secs(10))
}
/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
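A minimal sketch of how these defaults might be applied when building an etcd client (the endpoint is a placeholder; assumes the `etcd-client` crate's `Client::connect(endpoints, options)` API and an async, error-propagating context):
// Hypothetical: connect with the tuned keep-alive and connect-timeout defaults.
let client = etcd_client::Client::connect(
    ["127.0.0.1:2379"],
    Some(default_etcd_client_options()),
)
.await?;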

View File

@@ -272,13 +272,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to send message: {err_msg}"))]
SendMessage {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
@@ -1118,7 +1111,7 @@ impl ErrorExt for Error {
| DeserializeFlexbuffers { .. }
| ConvertTimeRanges { .. } => StatusCode::Unexpected,
SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,

View File

@@ -23,6 +23,7 @@ use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod invalidate_table_cache;
pub mod parse_mailbox_message;
pub mod suspend;
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use common_telemetry::{info, warn};
use crate::error::Result;
use crate::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use crate::instruction::Instruction;
/// A heartbeat response handler for the special "suspend" instruction.
/// It simply sets or clears (if previously set) the inner atomic suspend state.
pub struct SuspendHandler {
suspend: Arc<AtomicBool>,
}
impl SuspendHandler {
pub fn new(suspend: Arc<AtomicBool>) -> Self {
Self { suspend }
}
}
#[async_trait]
impl HeartbeatResponseHandler for SuspendHandler {
fn is_acceptable(&self, context: &HeartbeatResponseHandlerContext) -> bool {
matches!(
context.incoming_message,
Some((_, Instruction::Suspend)) | None
)
}
async fn handle(&self, context: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
let flip_state = |expect: bool| {
self.suspend
.compare_exchange(expect, !expect, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
};
if let Some((_, Instruction::Suspend)) = context.incoming_message.take() {
if flip_state(false) {
warn!("Suspend instruction received from meta, entering suspension state");
}
} else {
// Suspended components should always be trying to get out of this state on their own; we
// don't want to rely on an explicit "un-suspend" instruction to resume them, since that
// can be error-prone. So if the "suspend" instruction is not found in the heartbeat, just
// unset the state.
if flip_state(true) {
info!("clear suspend state");
}
}
Ok(HandleControl::Continue)
}
}
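For context, a minimal sketch of wiring this handler into a handler group, mirroring the datanode and frontend registrations that appear later in this diff (the surrounding setup is assumed):
// Assumed: the serving component owns a shared suspend flag.
let suspend = Arc::new(AtomicBool::new(false));
let executor = Arc::new(HandlerGroupExecutor::new(vec![
    Arc::new(ParseMailboxMessageHandler),
    Arc::new(SuspendHandler::new(suspend.clone())),
]));
// Heartbeat responses dispatched through `executor` will now set or clear `suspend`.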

View File

@@ -15,8 +15,8 @@
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::error::SendError;
use crate::error::{self, Result};
use crate::instruction::{Instruction, InstructionReply};
pub type IncomingMessage = (MessageMeta, Instruction);
@@ -51,13 +51,8 @@ impl HeartbeatMailbox {
Self { sender }
}
pub async fn send(&self, message: OutgoingMessage) -> Result<()> {
self.sender.send(message).await.map_err(|e| {
error::SendMessageSnafu {
err_msg: e.to_string(),
}
.build()
})
pub async fn send(&self, message: OutgoingMessage) -> Result<(), SendError<OutgoingMessage>> {
self.sender.send(message).await
}
}

View File

@@ -539,6 +539,8 @@ pub enum Instruction {
GetFileRefs(GetFileRefs),
/// Triggers garbage collection for a region.
GcRegions(GcRegions),
/// Temporarily suspends serving reads or writes.
Suspend,
}
impl Instruction {

View File

@@ -94,7 +94,7 @@ impl TableInfoValue {
}
}
pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self {
pub fn update(&self, new_table_info: RawTableInfo) -> Self {
Self {
table_info: new_table_info,
version: self.version + 1,

View File

@@ -23,19 +23,20 @@ use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
DropViewTask as PbDropViewTask, Partition, ProcedureId,
AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::Engine as _;
use base64::engine::general_purpose;
@@ -78,6 +79,7 @@ pub enum DdlTask {
DropView(DropViewTask),
#[cfg(feature = "enterprise")]
CreateTrigger(trigger::CreateTriggerTask),
CommentOn(CommentOnTask),
}
impl DdlTask {
@@ -200,6 +202,11 @@ impl DdlTask {
view_info,
})
}
/// Creates a [`DdlTask`] to comment on a table, column, or flow.
pub fn new_comment_on(task: CommentOnTask) -> Self {
DdlTask::CommentOn(task)
}
}
impl TryFrom<Task> for DdlTask {
@@ -278,6 +285,7 @@ impl TryFrom<Task> for DdlTask {
.fail()
}
}
Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
}
}
}
@@ -332,6 +340,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
#[cfg(feature = "enterprise")]
DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
};
Ok(Self {
@@ -1277,6 +1286,119 @@ impl From<DropFlowTask> for PbDropFlowTask {
}
}
/// Represents the ID of the object being commented on (Table or Flow).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
Table(TableId),
Flow(FlowId),
}
/// A task to comment on a table, column, or flow.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
pub catalog_name: String,
pub schema_name: String,
pub object_type: CommentObjectType,
pub object_name: String,
/// Column name (only for Column comments)
pub column_name: Option<String>,
/// Object ID (Table or Flow) for validation and cache invalidation
pub object_id: Option<CommentObjectId>,
pub comment: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
Table,
Column,
Flow,
}
impl CommentOnTask {
pub fn table_ref(&self) -> TableReference<'_> {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: &self.object_name,
}
}
}
// Proto conversions for CommentObjectType
impl From<CommentObjectType> for PbCommentObjectType {
fn from(object_type: CommentObjectType) -> Self {
match object_type {
CommentObjectType::Table => PbCommentObjectType::Table,
CommentObjectType::Column => PbCommentObjectType::Column,
CommentObjectType::Flow => PbCommentObjectType::Flow,
}
}
}
impl TryFrom<i32> for CommentObjectType {
type Error = error::Error;
fn try_from(value: i32) -> Result<Self> {
match value {
0 => Ok(CommentObjectType::Table),
1 => Ok(CommentObjectType::Column),
2 => Ok(CommentObjectType::Flow),
_ => error::InvalidProtoMsgSnafu {
err_msg: format!(
"Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
value
),
}
.fail(),
}
}
}
// Proto conversions for CommentOnTask
impl TryFrom<PbCommentOnTask> for CommentOnTask {
type Error = error::Error;
fn try_from(pb: PbCommentOnTask) -> Result<Self> {
let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
err_msg: "expected comment_on",
})?;
Ok(CommentOnTask {
catalog_name: comment_on.catalog_name,
schema_name: comment_on.schema_name,
object_type: comment_on.object_type.try_into()?,
object_name: comment_on.object_name,
column_name: if comment_on.column_name.is_empty() {
None
} else {
Some(comment_on.column_name)
},
comment: if comment_on.comment.is_empty() {
None
} else {
Some(comment_on.comment)
},
object_id: None,
})
}
}
impl From<CommentOnTask> for PbCommentOnTask {
fn from(task: CommentOnTask) -> Self {
let pb_object_type: PbCommentObjectType = task.object_type.into();
PbCommentOnTask {
comment_on: Some(CommentOnExpr {
catalog_name: task.catalog_name,
schema_name: task.schema_name,
object_type: pb_object_type as i32,
object_name: task.object_name,
column_name: task.column_name.unwrap_or_default(),
comment: task.comment.unwrap_or_default(),
}),
}
}
}
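A hedged sketch of a round trip through the conversions above, given some `task: CommentOnTask` with a non-empty comment (note that `object_id` is not carried over the wire and always comes back as `None`):
// Hypothetical round trip: Rust task -> protobuf task -> Rust task.
let pb: PbCommentOnTask = task.clone().into();
let restored = CommentOnTask::try_from(pb)?;
assert_eq!(restored.comment, task.comment); // holds for Some(non-empty) comments
assert_eq!(restored.object_id, None); // dropped by design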
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
pub(crate) current_catalog: String,

View File

@@ -14,7 +14,7 @@
use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig,
};
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -205,11 +205,13 @@ impl KafkaTopicCreator {
self.partition_client(topic).await.unwrap()
}
}
/// Builds a Kafka [Client](rskafka::client::Client).
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds a Kafka controller client for creating topics.
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG);
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT));
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -331,8 +331,29 @@ impl Runner {
}
match status {
Status::Executing { .. } => {}
Status::Executing { .. } => {
let prev_state = self.meta.state();
if !matches!(prev_state, ProcedureState::Running) {
info!(
"Set Procedure {}-{} state to running, prev_state: {:?}",
self.procedure.type_name(),
self.meta.id,
prev_state
);
self.meta.set_state(ProcedureState::Running);
}
}
Status::Suspended { subprocedures, .. } => {
let prev_state = self.meta.state();
if !matches!(prev_state, ProcedureState::Running) {
info!(
"Set Procedure {}-{} state to running, prev_state: {:?}",
self.procedure.type_name(),
self.meta.id,
prev_state
);
self.meta.set_state(ProcedureState::Running);
}
self.on_suspended(subprocedures).await;
}
Status::Done { output } => {
@@ -393,8 +414,12 @@ impl Runner {
return;
}
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
if self.procedure.rollback_supported() {
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
} else {
self.meta.set_state(ProcedureState::failed(Arc::new(e)));
}
}
}
}
@@ -1080,20 +1105,10 @@ mod tests {
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.rollback"],
)
.await;
check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
}
#[tokio::test]
@@ -1146,6 +1161,8 @@ mod tests {
async move {
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else if times == 2 {
Ok(Status::executing(false))
} else {
Ok(Status::done())
}
@@ -1172,6 +1189,10 @@ mod tests {
let state = runner.meta.state();
assert!(state.is_retrying(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_done(), "{state:?}");
@@ -1185,6 +1206,86 @@ mod tests {
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_on_retry_later_error_with_child() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let child_id = ProcedureId::random();
let exec_fn = move |_| {
times += 1;
async move {
debug!("times: {}", times);
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else if times == 2 {
let exec_fn = |_| {
async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
.boxed()
};
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: child_id,
procedure: Box::new(fail),
}],
persist: true,
})
} else {
Ok(Status::done())
}
}
.boxed()
};
let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("retry_later");
let meta = retry_later.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
runner.manager_ctx.start();
debug!("execute_once 1");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_retrying(), "{state:?}");
let moved_meta = meta.clone();
tokio::spawn(async move {
moved_meta.child_notify.notify_one();
});
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_done(), "{state:?}");
assert!(meta.state().is_done());
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.step", "0000000001.commit"],
)
.await;
}
#[tokio::test]
async fn test_execute_exceed_max_retry_later() {
let exec_fn =
@@ -1304,7 +1405,7 @@ mod tests {
async fn test_child_error() {
let mut times = 0;
let child_id = ProcedureId::random();
common_telemetry::init_default_ut_logging();
let exec_fn = move |ctx: Context| {
times += 1;
async move {
@@ -1529,7 +1630,7 @@ mod tests {
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
assert!(state.is_failed(), "{state:?}");
let procedure_id = runner
.manager_ctx
@@ -1596,11 +1697,6 @@ mod tests {
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
assert!(meta.state().is_prepare_rollback());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");

View File

@@ -46,6 +46,22 @@ pub enum OutputData {
Stream(SendableRecordBatchStream),
}
impl OutputData {
/// Consumes the data into a pretty-printed string.
pub async fn pretty_print(self) -> String {
match self {
OutputData::AffectedRows(x) => {
format!("Affected Rows: {x}")
}
OutputData::RecordBatches(x) => x.pretty_print().unwrap_or_else(|e| e.to_string()),
OutputData::Stream(x) => common_recordbatch::util::collect_batches(x)
.await
.and_then(|x| x.pretty_print())
.unwrap_or_else(|e| e.to_string()),
}
}
}
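The frontend suspension test later in this diff calls this helper on gRPC query results; a minimal hypothetical sketch of standalone usage (assuming some `data: OutputData` in an async context):
// Hypothetical: render whichever output variant was produced as plain text.
let text = data.pretty_print().await;
println!("{text}");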
/// OutputMeta stores meta information produced/generated during the execution
#[derive(Debug, Default)]
pub struct OutputMeta {

View File

@@ -36,6 +36,9 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
deadline: Some(Duration::from_secs(3)),
};
/// The default connect timeout for the Kafka client.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Default interval for auto WAL pruning.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
/// Default limit for concurrent auto pruning tasks.

View File

@@ -22,6 +22,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
@@ -281,21 +282,11 @@ impl DatanodeBuilder {
open_all_regions.await?;
}
let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();
let heartbeat_task = if let Some(meta_client) = meta_client {
Some(
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cache_registry,
self.plugins.clone(),
Arc::new(resource_stat),
)
.await?,
)
let task = self
.create_heartbeat_task(&region_server, meta_client, cache_registry)
.await?;
Some(task)
} else {
None
};
@@ -324,6 +315,29 @@ impl DatanodeBuilder {
})
}
async fn create_heartbeat_task(
&self,
region_server: &RegionServer,
meta_client: MetaClientRef,
cache_invalidator: CacheInvalidatorRef,
) -> Result<HeartbeatTask> {
let stat = {
let mut stat = ResourceStatImpl::default();
stat.start_collect_cpu_usage();
Arc::new(stat)
};
HeartbeatTask::try_new(
&self.opts,
region_server.clone(),
meta_client,
cache_invalidator,
self.plugins.clone(),
stat,
)
.await
}
/// Builds [ObjectStoreManager] from [StorageConfig].
pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;

View File

@@ -25,6 +25,7 @@ use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
@@ -91,6 +92,7 @@ impl HeartbeatTask {
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(region_server.suspend_state())),
Arc::new(
RegionHeartbeatResponseHandler::new(region_server.clone())
.with_open_region_parallelism(opts.init_regions_parallelism),

View File

@@ -99,26 +99,30 @@ impl RegionHeartbeatResponseHandler {
self
}
fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
fn build_handler(
&self,
instruction: &Instruction,
) -> MetaResult<Option<Box<InstructionHandlers>>> {
match instruction {
Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
Instruction::OpenRegions(_) => Ok(Box::new(
Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
Instruction::OpenRegions(_) => Ok(Some(Box::new(
OpenRegionsHandler {
open_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
Instruction::UpgradeRegions(_) => Ok(Box::new(
))),
Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
UpgradeRegionsHandler {
upgrade_region_parallelism: self.open_region_parallelism,
}
.into(),
)),
Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
))),
Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::Suspend => Ok(None),
}
}
}
@@ -216,30 +220,24 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
.context(InvalidHeartbeatResponseSnafu)?;
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let gc_tasks = self.gc_tasks.clone();
let handler = self.build_handler(&instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler
.handle(
&HandlerContext {
region_server,
downgrade_tasks,
flush_tasks,
gc_tasks,
},
instruction,
)
.await;
if let Some(reply) = reply
&& let Err(e) = mailbox.send((meta, reply)).await
{
error!(e; "Failed to send reply to mailbox");
}
});
if let Some(handler) = self.build_handler(&instruction)? {
let context = HandlerContext {
region_server: self.region_server.clone(),
downgrade_tasks: self.downgrade_tasks.clone(),
flush_tasks: self.flush_tasks.clone(),
gc_tasks: self.gc_tasks.clone(),
};
let _handle = common_runtime::spawn_global(async move {
let reply = handler.handle(&context, instruction).await;
if let Some(reply) = reply
&& let Err(e) = mailbox.send((meta, reply)).await
{
let error = e.to_string();
let (meta, reply) = e.0;
error!("Failed to send reply {reply} to {meta:?}: {error}");
}
});
}
Ok(HandleControl::Continue)
}

View File

@@ -17,6 +17,7 @@ mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
@@ -52,7 +53,9 @@ pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use serde_json;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::error::{
self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
};
use servers::grpc::FlightCompression;
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
@@ -89,6 +92,7 @@ use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInj
pub struct RegionServer {
inner: Arc<RegionServerInner>,
flight_compression: FlightCompression,
suspend: Arc<AtomicBool>,
}
pub struct RegionStat {
@@ -136,6 +140,7 @@ impl RegionServer {
),
)),
flight_compression,
suspend: Arc::new(AtomicBool::new(false)),
}
}
@@ -595,6 +600,14 @@ impl RegionServer {
.handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
.await
}
fn is_suspended(&self) -> bool {
self.suspend.load(Ordering::Relaxed)
}
pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
self.suspend.clone()
}
}
#[async_trait]
@@ -644,6 +657,8 @@ impl FlightCraft for RegionServer {
&self,
request: Request<Ticket>,
) -> TonicResult<Response<TonicStream<FlightData>>> {
ensure!(!self.is_suspended(), SuspendedSnafu);
let ticket = request.into_inner().ticket;
let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
.context(servers_error::InvalidFlightTicketSnafu)?;

View File

@@ -17,6 +17,7 @@ arc-swap = "1.0"
async-stream.workspace = true
async-trait.workspace = true
auth.workspace = true
axum.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
@@ -85,6 +86,9 @@ common-test-util.workspace = true
datanode.workspace = true
datatypes.workspace = true
futures.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
meta-srv.workspace = true
reqwest.workspace = true
serde_json.workspace = true
strfmt = "0.2"
tower.workspace = true

View File

@@ -364,6 +364,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Service suspended"))]
Suspended {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -444,6 +450,8 @@ impl ErrorExt for Error {
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::AcquireLimiter { .. } => StatusCode::Internal,
Error::Suspended { .. } => StatusCode::Suspended,
}
}

View File

@@ -141,7 +141,43 @@ impl Frontend {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{
AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, MailboxMessage,
Peer, ResponseHeader, Role, heartbeat_server,
};
use async_trait::async_trait;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::from_header_to_err_code_msg;
use common_error::status_code::StatusCode;
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler;
use common_meta::instruction::Instruction;
use common_stat::ResourceStatImpl;
use meta_client::MetaClientRef;
use meta_client::client::MetaClientBuilder;
use meta_srv::service::GrpcStream;
use servers::grpc::{FlightCompression, GRPC_SERVER};
use servers::http::HTTP_SERVER;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use tokio::sync::mpsc;
use tonic::codec::CompressionEncoding;
use tonic::codegen::tokio_stream::StreamExt;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use super::*;
use crate::instance::builder::FrontendBuilder;
use crate::server::Services;
#[test]
fn test_toml() {
@@ -149,4 +185,277 @@ mod tests {
let toml_string = toml::to_string(&opts).unwrap();
let _parsed: FrontendOptions = toml::from_str(&toml_string).unwrap();
}
struct SuspendableHeartbeatServer {
suspend: Arc<AtomicBool>,
}
#[async_trait]
impl heartbeat_server::Heartbeat for SuspendableHeartbeatServer {
type HeartbeatStream = GrpcStream<HeartbeatResponse>;
async fn heartbeat(
&self,
request: Request<Streaming<HeartbeatRequest>>,
) -> std::result::Result<Response<Self::HeartbeatStream>, Status> {
let (tx, rx) = mpsc::channel(4);
common_runtime::spawn_global({
let mut requests = request.into_inner();
let suspend = self.suspend.clone();
async move {
while let Some(request) = requests.next().await {
if let Err(e) = request {
let _ = tx.send(Err(e)).await;
return;
}
let mailbox_message =
suspend.load(Ordering::Relaxed).then(|| MailboxMessage {
payload: Some(Payload::Json(
serde_json::to_string(&Instruction::Suspend).unwrap(),
)),
..Default::default()
});
let response = HeartbeatResponse {
header: Some(ResponseHeader::success()),
mailbox_message,
..Default::default()
};
let _ = tx.send(Ok(response)).await;
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
async fn ask_leader(
&self,
_: Request<AskLeaderRequest>,
) -> std::result::Result<Response<AskLeaderResponse>, Status> {
Ok(Response::new(AskLeaderResponse {
header: Some(ResponseHeader::success()),
leader: Some(Peer {
addr: "localhost:0".to_string(),
..Default::default()
}),
}))
}
}
async fn create_meta_client(
options: &MetaClientOptions,
heartbeat_server: Arc<SuspendableHeartbeatServer>,
) -> MetaClientRef {
let (client, server) = tokio::io::duplex(1024);
// create the heartbeat server:
common_runtime::spawn_global(async move {
let mut router = tonic::transport::Server::builder();
let router = router.add_service(
HeartbeatServer::from_arc(heartbeat_server)
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd),
);
router
.serve_with_incoming(futures::stream::iter([Ok::<_, std::io::Error>(server)]))
.await
});
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
let connector = tower::service_fn(move |_| {
let client = client.take();
async move {
if let Some(client) = client {
Ok(hyper_util::rt::TokioIo::new(client))
} else {
Err(std::io::Error::other("client already taken"))
}
}
});
let manager = ChannelManager::new();
manager
.reset_with_connector("localhost:0", connector)
.unwrap();
// create the heartbeat client:
let mut client = MetaClientBuilder::new(0, Role::Frontend)
.enable_heartbeat()
.heartbeat_channel_manager(manager)
.build();
client.start(&options.metasrv_addrs).await.unwrap();
Arc::new(client)
}
async fn create_frontend(
options: &FrontendOptions,
meta_client: MetaClientRef,
) -> Result<Frontend> {
let instance = Arc::new(
FrontendBuilder::new_test(options, meta_client.clone())
.try_build()
.await?,
);
let servers =
Services::new(options.clone(), instance.clone(), Default::default()).build()?;
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(SuspendHandler::new(instance.suspend_state())),
]));
let heartbeat_task = Some(HeartbeatTask::new(
options,
meta_client,
executor,
Arc::new(ResourceStatImpl::default()),
));
let mut frontend = Frontend {
instance,
servers,
heartbeat_task,
};
frontend.start().await?;
Ok(frontend)
}
async fn verify_suspend_state_by_http(
frontend: &Frontend,
expected: std::result::Result<&str, (StatusCode, &str)>,
) {
let addr = frontend.server_handlers().addr(HTTP_SERVER).unwrap();
let response = reqwest::get(format!("http://{}/v1/sql?sql=SELECT 1", addr))
.await
.unwrap();
let headers = response.headers();
let response = if let Some((code, error)) = from_header_to_err_code_msg(headers) {
Err((code, error))
} else {
Ok(response.text().await.unwrap())
};
match (response, expected) {
(Ok(response), Ok(expected)) => {
let response: GreptimedbV1Response = serde_json::from_str(&response).unwrap();
let response = serde_json::to_string(response.output()).unwrap();
assert_eq!(&response, expected);
}
(Err(actual), Err(expected)) => assert_eq!(actual, expected),
_ => unreachable!(),
}
}
async fn verify_suspend_state_by_grpc(
frontend: &Frontend,
expected: std::result::Result<&str, (StatusCode, &str)>,
) {
let addr = frontend.server_handlers().addr(GRPC_SERVER).unwrap();
let client = Client::with_urls([addr.to_string()]);
let client = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
let response = client.sql("SELECT 1").await;
match (response, expected) {
(Ok(response), Ok(expected)) => {
let response = response.data.pretty_print().await;
assert_eq!(&response, expected.trim());
}
(Err(actual), Err(expected)) => {
assert_eq!(actual.status_code(), expected.0);
assert_eq!(actual.output_msg(), expected.1);
}
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_suspend_frontend() -> Result<()> {
common_telemetry::init_default_ut_logging();
let meta_client_options = MetaClientOptions {
metasrv_addrs: vec!["localhost:0".to_string()],
..Default::default()
};
let options = FrontendOptions {
http: HttpOptions {
addr: "127.0.0.1:0".to_string(),
..Default::default()
},
grpc: GrpcOptions {
bind_addr: "127.0.0.1:0".to_string(),
flight_compression: FlightCompression::None,
..Default::default()
},
mysql: MysqlOptions {
enable: false,
..Default::default()
},
postgres: PostgresOptions {
enable: false,
..Default::default()
},
meta_client: Some(meta_client_options.clone()),
..Default::default()
};
let server = Arc::new(SuspendableHeartbeatServer {
suspend: Arc::new(AtomicBool::new(false)),
});
let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
let frontend = create_frontend(&options, meta_client).await?;
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
// initial state: not suspend:
assert!(!frontend.instance.is_suspended());
verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
verify_suspend_state_by_grpc(
&frontend,
Ok(r#"
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+"#),
)
.await;
// make heartbeat server returned "suspend" instruction,
server.suspend.store(true, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
// ... then the frontend is suspended:
assert!(frontend.instance.is_suspended());
verify_suspend_state_by_http(
&frontend,
Err((
StatusCode::Suspended,
"error: Service suspended, execution_time_ms: 0",
)),
)
.await;
verify_suspend_state_by_grpc(&frontend, Err((StatusCode::Suspended, "Service suspended")))
.await;
// make heartbeat server NOT returned "suspend" instruction,
server.suspend.store(false, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await;
// ... then frontend's suspend state is cleared:
assert!(!frontend.instance.is_suspended());
verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
verify_suspend_state_by_grpc(
&frontend,
Ok(r#"
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+"#),
)
.await;
Ok(())
}
}

View File

@@ -27,7 +27,6 @@ use common_stat::ResourceStatRef;
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
@@ -54,7 +53,6 @@ impl HeartbeatTask {
pub fn new(
opts: &FrontendOptions,
meta_client: Arc<MetaClient>,
heartbeat_opts: HeartbeatOptions,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
resource_stat: ResourceStatRef,
) -> Self {
@@ -68,8 +66,8 @@ impl HeartbeatTask {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
},
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
report_interval: opts.heartbeat.interval,
retry_interval: opts.heartbeat.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
resource_stat,
@@ -196,7 +194,8 @@ impl HeartbeatTask {
let report_interval = self.report_interval;
let start_time_ms = self.start_time_ms;
let self_peer = Some(Peer {
// The peer id doesn't make sense for frontend, so we just set it 0.
// The node id is actually calculated from the frontend's address (by hashing the address
// string) in the metasrv, so it can be set to 0 here as a placeholder.
id: 0,
addr: self.peer_addr.clone(),
});

View File

@@ -26,7 +26,8 @@ mod region_query;
pub mod standalone;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, atomic};
use std::time::{Duration, SystemTime};
use async_stream::stream;
@@ -83,6 +84,7 @@ use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::comment::CommentObject;
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
@@ -119,6 +121,7 @@ pub struct Instance {
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
slow_query_options: SlowQueryOptions,
suspend: Arc<AtomicBool>,
// cache for otlp metrics
// first layer key: db-string
@@ -171,6 +174,14 @@ impl Instance {
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
self.statement_executor.procedure_executor()
}
pub fn suspend_state(&self) -> Arc<AtomicBool> {
self.suspend.clone()
}
pub(crate) fn is_suspended(&self) -> bool {
self.suspend.load(atomic::Ordering::Relaxed)
}
}
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
@@ -513,6 +524,10 @@ impl SqlQueryHandler for Instance {
#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if self.is_suspended() {
return vec![error::SuspendedSnafu {}.fail()];
}
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor_opt.as_ref();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
@@ -580,6 +595,8 @@ impl SqlQueryHandler for Instance {
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
ensure!(!self.is_suspended(), error::SuspendedSnafu);
if should_capture_statement(stmt.as_ref()) {
// It's safe to unwrap here because we've already checked the type.
let stmt = stmt.unwrap();
@@ -641,6 +658,10 @@ impl SqlQueryHandler for Instance {
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
if self.is_suspended() {
return vec![error::SuspendedSnafu {}.fail()];
}
// check will be done in prometheus handler's do_query
let result = PrometheusHandler::do_query(self, query, query_ctx)
.await
@@ -655,6 +676,8 @@ impl SqlQueryHandler for Instance {
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Option<DescribeResult>> {
ensure!(!self.is_suspended(), error::SuspendedSnafu);
if matches!(
stmt,
Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
@@ -875,7 +898,7 @@ pub fn check_permission(
validate_param(&stmt.table_name, query_ctx)?;
}
Statement::ShowCreateFlow(stmt) => {
validate_param(&stmt.flow_name, query_ctx)?;
validate_flow(&stmt.flow_name, query_ctx)?;
}
#[cfg(feature = "enterprise")]
Statement::ShowCreateTrigger(stmt) => {
@@ -908,6 +931,12 @@ pub fn check_permission(
// show charset and show collation won't be checked
Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
Statement::Comment(comment) => match &comment.object {
CommentObject::Table(table) => validate_param(table, query_ctx)?,
CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
},
Statement::Insert(insert) => {
let name = insert.table_name().context(ParseSqlSnafu)?;
validate_param(name, query_ctx)?;
@@ -993,6 +1022,27 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
.context(SqlExecInterceptedSnafu)
}
fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let catalog = match &name.0[..] {
[_flow] => query_ctx.current_catalog().to_string(),
[catalog, _flow] => catalog.to_string_unquoted(),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
),
}
.fail();
}
};
let schema = query_ctx.current_schema();
validate_catalog_and_schema(&catalog, &schema, query_ctx)
.map_err(BoxedError::new)
.context(SqlExecInterceptedSnafu)
}
fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let (catalog, schema) = match &name.0[..] {
[schema] => (
@@ -1251,6 +1301,28 @@ mod tests {
// test describe table
let sql = "DESC TABLE {catalog}{schema}demo;";
replace_test(sql, plugins, &query_ctx);
replace_test(sql, plugins.clone(), &query_ctx);
let comment_flow_cases = [
("COMMENT ON FLOW my_flow IS 'comment';", true),
("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
];
for (sql, is_ok) in comment_flow_cases {
let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
let result = check_permission(plugins.clone(), stmt, &query_ctx);
assert_eq!(result.is_ok(), is_ok);
}
let show_flow_cases = [
("SHOW CREATE FLOW my_flow;", true),
("SHOW CREATE FLOW greptime.my_flow;", true),
("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
];
for (sql, is_ok) in show_flow_cases {
let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
let result = check_permission(plugins.clone(), stmt, &query_ctx);
assert_eq!(result.is_ok(), is_ok);
}
}
}
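
Aside (illustration, not part of this patch): the suspension gate added in this file boils down to a shared AtomicBool that request handlers read with Relaxed ordering. A minimal standalone sketch of that pattern, assuming only the Rust standard library:

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    // The frontend owns the flag; each handler keeps a cloned Arc to it.
    let suspend = Arc::new(AtomicBool::new(false));
    let handler_view = Arc::clone(&suspend);

    assert!(!handler_view.load(Ordering::Relaxed)); // not suspended: requests pass
    suspend.store(true, Ordering::Relaxed);         // operator suspends the frontend
    assert!(handler_view.load(Ordering::Relaxed));  // handlers now short-circuit
}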

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
use catalog::CatalogManagerRef;
@@ -87,6 +88,33 @@ impl FrontendBuilder {
}
}
/// Creates a `FrontendBuilder` for tests, backed by an in-memory kv backend,
/// cache registry, and catalog manager.
#[cfg(test)]
pub(crate) fn new_test(
options: &FrontendOptions,
meta_client: meta_client::MetaClientRef,
) -> Self {
let kv_backend = Arc::new(common_meta::kv_backend::memory::MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
common_meta::cache::LayeredCacheRegistryBuilder::default()
.add_cache_registry(cache::build_fundamental_cache_registry(kv_backend.clone()))
.build(),
);
Self::new(
options.clone(),
kv_backend,
layered_cache_registry,
catalog::memory::MemoryCatalogManager::with_default_setup(),
Arc::new(client::client_manager::NodeClients::default()),
meta_client,
Arc::new(catalog::process_manager::ProcessManager::new(
"".to_string(),
None,
)),
)
}
pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self {
Self {
local_cache_invalidator: Some(cache_invalidator),
@@ -242,6 +270,7 @@ impl FrontendBuilder {
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
slow_query_options: self.options.slow_query.clone(),
suspend: Arc::new(AtomicBool::new(false)),
})
}
}

View File

@@ -234,6 +234,11 @@ impl GrpcQueryHandler for Instance {
DdlExpr::DropView(_) => {
todo!("implemented in the following PR")
}
DdlExpr::CommentOn(expr) => {
self.statement_executor
.comment_by_expr(expr, ctx.clone())
.await?
}
}
}
};
@@ -399,6 +404,9 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
Expr::DropView(expr) => {
check_and_fill!(expr);
}
Expr::CommentOn(expr) => {
check_and_fill!(expr);
}
}
}

View File

@@ -65,8 +65,7 @@ impl JaegerQueryHandler for Instance {
// It's equivalent to `SELECT DISTINCT(service_name) FROM {db}.{trace_table}`.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
vec![SelectExpr::from(col(SERVICE_NAME_COLUMN))],
vec![],
vec![],
@@ -107,8 +106,7 @@ impl JaegerQueryHandler for Instance {
// ```.
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
vec![
SelectExpr::from(col(SPAN_NAME_COLUMN)),
SelectExpr::from(col(SPAN_KIND_COLUMN)),
@@ -160,8 +158,7 @@ impl JaegerQueryHandler for Instance {
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
selects,
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
@@ -220,8 +217,7 @@ impl JaegerQueryHandler for Instance {
// ```.
let output = query_trace_table(
ctx.clone(),
self.catalog_manager(),
self.query_engine(),
self,
vec![wildcard()],
filters,
vec![],
@@ -285,8 +281,7 @@ impl JaegerQueryHandler for Instance {
// query all spans
Ok(query_trace_table(
ctx,
self.catalog_manager(),
self.query_engine(),
self,
vec![wildcard()],
filters,
vec![col(TIMESTAMP_COLUMN).sort(false, false)], // Sort by timestamp in descending order.
@@ -303,8 +298,7 @@ impl JaegerQueryHandler for Instance {
#[allow(clippy::too_many_arguments)]
async fn query_trace_table(
ctx: QueryContextRef,
catalog_manager: &CatalogManagerRef,
query_engine: &QueryEngineRef,
instance: &Instance,
selects: Vec<SelectExpr>,
filters: Vec<Expr>,
sorts: Vec<SortExpr>,
@@ -334,7 +328,8 @@ async fn query_trace_table(
}
};
let table = catalog_manager
let table = instance
.catalog_manager()
.table(
ctx.current_catalog(),
&ctx.current_schema(),
@@ -367,7 +362,7 @@ async fn query_trace_table(
.map(|s| format!("\"{}\"", s))
.collect::<HashSet<String>>();
let df_context = create_df_context(query_engine)?;
let df_context = create_df_context(instance.query_engine())?;
let dataframe = df_context
.read_table(Arc::new(DfTableProviderAdapter::new(table)))

View File

@@ -16,6 +16,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use auth::UserProviderRef;
use axum::extract::{Request, State};
use axum::middleware::Next;
use axum::response::IntoResponse;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
@@ -27,6 +30,7 @@ use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef;
use servers::http::result::error_result::ErrorResponse;
use servers::http::utils::router::RouterConfigurator;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
@@ -39,6 +43,7 @@ use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
use servers::server::{Server, ServerHandlers};
use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
use snafu::ResultExt;
use tonic::Status;
use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
use crate::frontend::FrontendOptions;
@@ -125,7 +130,16 @@ where
builder = builder.with_extra_router(configurator.router());
}
builder
builder.add_layer(axum::middleware::from_fn_with_state(
self.instance.clone(),
async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
if state.is_suspended() {
return ErrorResponse::from_error(servers::error::SuspendedSnafu.build())
.into_response();
}
next.run(request).await
},
))
}
pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self {
@@ -197,7 +211,17 @@ where
self.instance.clone(),
user_provider.clone(),
))
.flight_handler(flight_handler);
.flight_handler(flight_handler)
.add_layer(axum::middleware::from_fn_with_state(
self.instance.clone(),
async move |State(state): State<Arc<Instance>>, request: Request, next: Next| {
if state.is_suspended() {
let status = Status::from(servers::error::SuspendedSnafu.build());
return status.into_http();
}
next.run(request).await
},
));
let grpc_server = if !external {
let frontend_grpc_handler =

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::common::{DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT};
use dashmap::DashMap;
use rskafka::client::ClientBuilder;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
@@ -78,7 +78,8 @@ impl ClientManager {
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG);
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT));
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -14,6 +14,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
@@ -49,16 +50,21 @@ use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{
BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
};
use crate::selector::SelectorType;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
use crate::selector::{Selector, SelectorType};
use crate::service::admin;
use crate::service::admin::admin_axum_router;
use crate::utils::etcd::create_etcd_client_with_tls;
use crate::{Result, error};
/// The default keep-alive interval for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
/// The default keep-alive timeout for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
pub struct MetasrvInstance {
metasrv: Arc<Metasrv>,
@@ -245,7 +251,12 @@ macro_rules! add_compressed_service {
}
pub fn router(metasrv: Arc<Metasrv>) -> Router {
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let mut router = tonic::transport::Server::builder()
// for admin services
.accept_http1(true)
// For quick detection of network failures.
.http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
@@ -393,7 +404,12 @@ pub async fn metasrv_builder(
info!("Using selector from plugins");
selector
} else {
let selector = match opts.selector {
let selector: Arc<
dyn Selector<
Context = crate::metasrv::SelectorContext,
Output = Vec<common_meta::peer::Peer>,
>,
> = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
RegionNumsBasedWeightCompute,
meta_peer_client.clone(),

View File

@@ -63,22 +63,6 @@ pub struct EtcdElection {
}
impl EtcdElection {
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: String,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
{
let client = Client::connect(endpoints, None)
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(leader_value, client, store_key_prefix).await
}
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,

View File

@@ -88,7 +88,8 @@ impl GcScheduler {
// Skip regions that are in cooldown period
if let Some(gc_info) = tracker.get(&region_stat.id)
&& now.duration_since(gc_info.last_gc_time) < self.config.gc_cooldown_period
&& now.saturating_duration_since(gc_info.last_gc_time)
< self.config.gc_cooldown_period
{
debug!("Skipping region {} due to cooldown", region_stat.id);
continue;
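
Aside (illustration, not part of this patch): saturating_duration_since is guaranteed to clamp to a zero duration when the "earlier" instant is actually later, whereas plain duration_since used to panic in that case on older Rust versions; the cooldown and interval checks here therefore stay conservative instead of failing. A minimal sketch using only std:

use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let later = now + Duration::from_secs(5); // an instant that lies after `now`

    // `later` is after `now`, so the elapsed time clamps to zero.
    assert_eq!(now.saturating_duration_since(later), Duration::ZERO);
}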

View File

@@ -434,7 +434,7 @@ impl GcScheduler {
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
// Check whether the cooldown interval has passed since the last full listing.
let elapsed = now.duration_since(last_full_listing);
let elapsed = now.saturating_duration_since(last_full_listing);
elapsed >= self.config.full_file_listing_interval
} else {
// Never did full listing for this region, do it now

View File

@@ -14,7 +14,7 @@
mod basic;
mod candidate_select;
mod con;
mod concurrent;
mod config;
mod err_handle;
mod full_list;

View File

@@ -50,7 +50,7 @@ impl GcScheduler {
let now = Instant::now();
// Check if enough time has passed since last cleanup
if now.duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
if now.saturating_duration_since(last_cleanup) < self.config.tracker_cleanup_interval {
return Ok(());
}
@@ -92,7 +92,7 @@ impl GcScheduler {
if let Some(gc_info) = gc_tracker.get(&region_id) {
if let Some(last_full_listing) = gc_info.last_full_listing_time {
let elapsed = now.duration_since(last_full_listing);
let elapsed = now.saturating_duration_since(last_full_listing);
elapsed >= self.config.full_file_listing_interval
} else {
// Never did full listing for this region, do it now

View File

@@ -32,7 +32,7 @@ use collect_leader_region_handler::CollectLeaderRegionHandler;
use collect_stats_handler::CollectStatsHandler;
use common_base::Plugins;
use common_meta::datanode::Stat;
use common_meta::instruction::{Instruction, InstructionReply};
use common_meta::instruction::InstructionReply;
use common_meta::sequence::Sequence;
use common_telemetry::{debug, info, warn};
use dashmap::DashMap;
@@ -114,16 +114,19 @@ pub enum HandleControl {
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
mailbox_message: Option<MailboxMessage>,
pub stat: Option<Stat>,
pub inactive_region_ids: HashSet<RegionId>,
pub region_lease: Option<RegionLease>,
}
impl HeartbeatAccumulator {
pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
// TODO(jiachun): to HeartbeatResponse payload
None
pub(crate) fn take_mailbox_message(&mut self) -> Option<MailboxMessage> {
self.mailbox_message.take()
}
pub fn set_mailbox_message(&mut self, message: MailboxMessage) {
let _ = self.mailbox_message.insert(message);
}
}
@@ -275,6 +278,15 @@ impl Pushers {
async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
self.0.write().await.remove(pusher_id)
}
/// Removes all pushers from the group and returns their keys.
pub(crate) async fn clear(&self) -> Vec<String> {
let mut pushers = self.0.write().await;
let keys = pushers.keys().cloned().collect::<Vec<_>>();
if !keys.is_empty() {
pushers.clear();
}
keys
}
}
#[derive(Clone)]
@@ -309,12 +321,11 @@ impl HeartbeatHandlerGroup {
}
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
///
/// Returns the [`Pusher`] if it exists.
pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
pub async fn deregister_push(&self, pusher_id: PusherId) {
info!("Pusher unregister: {}", pusher_id);
self.pushers.remove(&pusher_id.string_key()).await
if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
}
}
/// Returns the [`Pushers`] of the group.
@@ -351,10 +362,11 @@ impl HeartbeatHandlerGroup {
}
}
let header = std::mem::take(&mut acc.header);
let mailbox_message = acc.take_mailbox_message();
let res = HeartbeatResponse {
header,
region_lease: acc.region_lease,
..Default::default()
mailbox_message,
};
Ok(res)
}
@@ -382,7 +394,9 @@ impl HeartbeatMailbox {
/// Parses the [Instruction] from [MailboxMessage].
#[cfg(test)]
pub fn json_instruction(msg: &MailboxMessage) -> Result<Instruction> {
pub(crate) fn json_instruction(
msg: &MailboxMessage,
) -> Result<common_meta::instruction::Instruction> {
let Payload::Json(payload) =
msg.payload
.as_ref()
@@ -519,6 +533,14 @@ impl Mailbox for HeartbeatMailbox {
Ok(())
}
async fn reset(&self) {
let keys = self.pushers.clear().await;
if !keys.is_empty() {
info!("Reset mailbox, deregister pushers: {:?}", keys);
METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
}
}
}
/// The builder to build the group of heartbeat handlers.

View File

@@ -452,6 +452,7 @@ pub struct MetaStateHandler {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
leadership_change_notifier: LeadershipChangeNotifier,
mailbox: MailboxRef,
state: StateRef,
}
@@ -475,6 +476,9 @@ impl MetaStateHandler {
pub async fn on_leader_stop(&self) {
self.state.write().unwrap().next_state(become_follower());
// Force the mailbox to clear all its pushers.
// The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
self.mailbox.reset().await;
self.leadership_change_notifier
.notify_on_leader_stop()
.await;
@@ -602,6 +606,7 @@ impl Metasrv {
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
leadership_change_notifier,
mailbox: self.mailbox.clone(),
};
let _handle = common_runtime::spawn_global(async move {
loop {

View File

@@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync {
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
/// Resets all pushers of the mailbox.
async fn reset(&self);
}
#[cfg(test)]

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::distributed_time_constants::default_etcd_client_options;
use common_meta::kv_backend::etcd::create_etcd_tls_options;
use etcd_client::{Client, ConnectOptions};
use etcd_client::Client;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
@@ -30,14 +31,15 @@ pub async fn create_etcd_client_with_tls(
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
let connect_options = tls_config
.map(|c| create_etcd_tls_options(&convert_tls_option(c)))
.transpose()
.context(BuildTlsOptionsSnafu)?
.flatten()
.map(|tls_options| ConnectOptions::new().with_tls(tls_options));
let mut connect_options = default_etcd_client_options();
if let Some(tls_config) = tls_config
&& let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config))
.context(BuildTlsOptionsSnafu)?
{
connect_options = connect_options.with_tls(tls_options);
}
Client::connect(&etcd_endpoints, connect_options)
Client::connect(&etcd_endpoints, Some(connect_options))
.await
.context(error::ConnectEtcdSnafu)
}

View File

@@ -119,7 +119,7 @@ mod tests {
.index_file_path
.map(|path| path.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
e.index_version = 0;
format!("\n{:?}", e)
})
.sorted()
@@ -128,12 +128,12 @@ mod tests {
assert_eq!(
debug_format,
r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_file_id: None, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
);
// list from storage
let storage_entries = mito

View File

@@ -144,6 +144,7 @@ async fn flush(mem: &SimpleBulkMemtable) {
let reader = Box::new(DedupReader::new(
merge_reader,
read::dedup::LastRow::new(true),
None,
));
Source::Reader(reader)
};

View File

@@ -37,7 +37,7 @@ use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
@@ -216,7 +216,7 @@ impl AccessLayer {
pub(crate) async fn delete_sst(
&self,
region_file_id: &RegionFileId,
index_file_id: &RegionFileId,
index_file_id: &RegionIndexId,
) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
self.object_store
@@ -226,14 +226,30 @@ impl AccessLayer {
file_id: region_file_id.file_id(),
})?;
let path = location::index_file_path(&self.table_dir, *index_file_id, self.path_type);
// Delete all versions of the index file.
for version in 0..=index_file_id.version {
let index_id = RegionIndexId::new(*region_file_id, version);
self.delete_index(index_id).await?;
}
Ok(())
}
/// Deletes a single index file identified by the given index id (file id plus version).
pub(crate) async fn delete_index(
&self,
index_file_id: RegionIndexId,
) -> Result<(), crate::error::Error> {
let path = location::index_file_path(
&self.table_dir,
RegionIndexId::new(index_file_id.file_id, index_file_id.version),
self.path_type,
);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: region_file_id.file_id(),
file_id: index_file_id.file_id(),
})?;
Ok(())
}
@@ -291,6 +307,7 @@ impl AccessLayer {
puffin_manager: self
.puffin_manager_factory
.build(store, path_provider.clone()),
write_cache_enabled: false,
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
@@ -468,9 +485,10 @@ impl TempFileCleaner {
}
/// Removes the SST and index file from the local atomic dir by the file id.
/// This only removes the initial index; since the index version is always 0 for a newly written SST, it is safe to pass version 0 here.
pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();
Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
}
@@ -553,9 +571,12 @@ async fn clean_dir(dir: &str) -> Result<()> {
/// Path provider for SST file and index file.
pub trait FilePathProvider: Send + Sync {
/// Creates index file path of given file id.
/// Creates the index file path of the given file id. The version defaults to 0 and is not shown in the path.
fn build_index_file_path(&self, file_id: RegionFileId) -> String;
/// Creates the index file path of the given index id (with version support).
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;
/// Creates SST file path of given file id.
fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
@@ -575,7 +596,16 @@ impl WriteCachePathProvider {
impl FilePathProvider for WriteCachePathProvider {
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
self.file_cache.cache_file_path(puffin_key)
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
let puffin_key = IndexKey::new(
index_id.region_id(),
index_id.file_id(),
FileType::Puffin(index_id.version),
);
self.file_cache.cache_file_path(puffin_key)
}
@@ -605,7 +635,11 @@ impl RegionFilePathFactory {
impl FilePathProvider for RegionFilePathFactory {
fn build_index_file_path(&self, file_id: RegionFileId) -> String {
location::index_file_path(&self.table_dir, file_id, self.path_type)
location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
}
fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
location::index_file_path(&self.table_dir, index_id, self.path_type)
}
fn build_sst_file_path(&self, file_id: RegionFileId) -> String {

View File

@@ -44,7 +44,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
@@ -180,7 +180,7 @@ impl CacheStrategy {
}
/// Calls [CacheManager::evict_puffin_cache()].
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.evict_puffin_cache(file_id).await
@@ -400,7 +400,7 @@ impl CacheManager {
}
/// Evicts every puffin-related cache entry for the given file.
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
if let Some(cache) = &self.bloom_filter_index_cache {
cache.invalidate_file(file_id.file_id());
}
@@ -422,7 +422,7 @@ impl CacheManager {
.remove(IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin,
FileType::Puffin(file_id.version),
))
.await;
}
@@ -949,7 +949,7 @@ mod tests {
let cache = Arc::new(cache);
let region_id = RegionId::new(1, 1);
let region_file_id = RegionFileId::new(region_id, FileId::random());
let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
let column_id: ColumnId = 1;
let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
@@ -957,16 +957,21 @@ mod tests {
let result_cache = cache.index_result_cache().unwrap();
let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
let bloom_key = (
index_id.file_id(),
index_id.version,
column_id,
Tag::Skipping,
);
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
(index_id.file_id(), index_id.version),
Arc::new(InvertedIndexMetas::default()),
);
let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
let selection = Arc::new(RowGroupSelection::default());
result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
let file_id_str = region_file_id.to_string();
result_cache.put(predicate.clone(), index_id.file_id(), selection);
let file_id_str = index_id.to_string();
let metadata = Arc::new(FileMetadata {
blobs: Vec::new(),
properties: HashMap::new(),
@@ -976,40 +981,32 @@ mod tests {
assert!(bloom_cache.get_metadata(bloom_key).is_some());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_some()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_some()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
cache.evict_puffin_cache(region_file_id).await;
cache.evict_puffin_cache(index_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_none()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
// Refill caches and evict via CacheStrategy to ensure delegation works.
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
(index_id.file_id(), index_id.version),
Arc::new(InvertedIndexMetas::default()),
);
result_cache.put(
predicate.clone(),
region_file_id.file_id(),
index_id.file_id(),
Arc::new(RowGroupSelection::default()),
);
puffin_metadata_cache.put_metadata(
@@ -1021,19 +1018,15 @@ mod tests {
);
let strategy = CacheStrategy::EnableAll(cache.clone());
strategy.evict_puffin_cache(region_file_id).await;
strategy.evict_puffin_cache(index_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.get_metadata((index_id.file_id(), index_id.version))
.is_none()
);
assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
}
}

View File

@@ -71,7 +71,7 @@ impl FileCacheInner {
fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
match file_type {
FileType::Parquet => &self.parquet_index,
FileType::Puffin => &self.puffin_index,
FileType::Puffin { .. } => &self.puffin_index,
}
}
@@ -130,7 +130,7 @@ impl FileCacheInner {
// Track sizes separately for each file type
match key.file_type {
FileType::Parquet => parquet_size += size,
FileType::Puffin => puffin_size += size,
FileType::Puffin { .. } => puffin_size += size,
}
}
// The metric is a signed int gauge, so we can update it once at the end.
@@ -178,7 +178,7 @@ impl FileCacheInner {
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
FileType::Puffin { .. } => "download_puffin",
}])
.start_timer();
@@ -607,7 +607,7 @@ impl fmt::Display for IndexKey {
"{}.{}.{}",
self.region_id.as_u64(),
self.file_id,
self.file_type.as_str()
self.file_type
)
}
}
@@ -618,7 +618,16 @@ pub enum FileType {
/// Parquet file.
Parquet,
/// Puffin file.
Puffin,
Puffin(u64),
}
impl fmt::Display for FileType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FileType::Parquet => write!(f, "parquet"),
FileType::Puffin(version) => write!(f, "{}.puffin", version),
}
}
}
impl FileType {
@@ -626,16 +635,16 @@ impl FileType {
fn parse(s: &str) -> Option<FileType> {
match s {
"parquet" => Some(FileType::Parquet),
"puffin" => Some(FileType::Puffin),
_ => None,
}
}
/// Converts the file type to string.
fn as_str(&self) -> &'static str {
match self {
FileType::Parquet => "parquet",
FileType::Puffin => "puffin",
"puffin" => Some(FileType::Puffin(0)),
_ => {
// If suffixed with `.puffin`, try to parse the leading version.
if let Some(version_str) = s.strip_suffix(".puffin") {
let version = version_str.parse::<u64>().ok()?;
Some(FileType::Puffin(version))
} else {
None
}
}
}
}
@@ -643,7 +652,7 @@ impl FileType {
fn metric_label(&self) -> &'static str {
match self {
FileType::Parquet => FILE_TYPE,
FileType::Puffin => INDEX_TYPE,
FileType::Puffin(_) => INDEX_TYPE,
}
}
}
@@ -921,6 +930,15 @@ mod tests {
IndexKey::new(region_id, file_id, FileType::Parquet),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
);
assert_eq!(
IndexKey::new(region_id, file_id, FileType::Puffin(0)),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
);
assert_eq!(
IndexKey::new(region_id, file_id, FileType::Puffin(42)),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
.unwrap()
);
assert!(parse_index_key("").is_none());
assert!(parse_index_key(".").is_none());
assert!(parse_index_key("5299989643269").is_none());
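
Aside (illustration, not part of this patch): with the FileType change above, cache keys still accept the legacy "puffin" suffix (mapped to version 0) and additionally parse versioned "<version>.puffin" suffixes. A minimal standalone sketch of that suffix handling, using a plain Option<u64> in place of the FileType enum:

fn parse_puffin_version(s: &str) -> Option<u64> {
    match s {
        // Legacy, un-versioned index files map to version 0.
        "puffin" => Some(0),
        // Otherwise expect "<version>.puffin".
        other => other.strip_suffix(".puffin")?.parse::<u64>().ok(),
    }
}

fn main() {
    assert_eq!(parse_puffin_version("puffin"), Some(0));
    assert_eq!(parse_puffin_version("42.puffin"), Some(42));
    assert_eq!(parse_puffin_version("not-an-index"), None);
}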

View File

@@ -21,7 +21,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use store_api::storage::{ColumnId, FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
@@ -35,8 +35,10 @@ pub enum Tag {
Fulltext,
}
pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag);
/// Cache for bloom filter index.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
pub type BloomFilterIndexCache = IndexCache<BloomFilterIndexKey, BloomFilterMeta>;
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
impl BloomFilterIndexCache {
@@ -59,11 +61,9 @@ impl BloomFilterIndexCache {
}
/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(
k: &(FileId, ColumnId, Tag),
meta: &Arc<BloomFilterMeta>,
) -> u32 {
fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc<BloomFilterMeta>) -> u32 {
let base = k.0.as_bytes().len()
+ std::mem::size_of::<IndexVersion>()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();
@@ -75,16 +75,14 @@ fn bloom_filter_index_metadata_weight(
}
/// Calculates weight for bloom filter index content.
fn bloom_filter_index_content_weight(
(k, _): &((FileId, ColumnId, Tag), PageKey),
v: &Bytes,
) -> u32 {
fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 {
(k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}
/// Bloom filter index blob reader with cache.
pub struct CachedBloomFilterIndexBlobReader<R> {
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
tag: Tag,
blob_size: u64,
@@ -96,6 +94,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
/// Creates a new bloom filter index blob reader with cache.
pub fn new(
file_id: FileId,
index_version: IndexVersion,
column_id: ColumnId,
tag: Tag,
blob_size: u64,
@@ -104,6 +103,7 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
) -> Self {
Self {
file_id,
index_version,
column_id,
tag,
blob_size,
@@ -126,7 +126,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let (result, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
self.blob_size,
offset,
size,
@@ -161,7 +161,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let (page, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
self.blob_size,
range.start,
(range.end - range.start) as u32,
@@ -191,9 +191,9 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
if let Some(cached) = self
.cache
.get_metadata((self.file_id, self.column_id, self.tag))
if let Some(cached) =
self.cache
.get_metadata((self.file_id, self.index_version, self.column_id, self.tag))
{
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
@@ -203,7 +203,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
} else {
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(
(self.file_id, self.column_id, self.tag),
(self.file_id, self.index_version, self.column_id, self.tag),
Arc::new(meta.clone()),
);
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
@@ -223,6 +223,7 @@ mod test {
#[test]
fn bloom_filter_metadata_weight_counts_vec_contents() {
let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
let version = 0;
let column_id: ColumnId = 42;
let tag = Tag::Skipping;
@@ -246,10 +247,13 @@ mod test {
],
};
let weight =
bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));
let weight = bloom_filter_index_metadata_weight(
&(file_id, version, column_id, tag),
&Arc::new(meta.clone()),
);
let base = file_id.as_bytes().len()
+ std::mem::size_of::<IndexVersion>()
+ std::mem::size_of::<ColumnId>()
+ std::mem::size_of::<Tag>()
+ std::mem::size_of::<BloomFilterMeta>();

View File

@@ -22,7 +22,7 @@ use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
use store_api::storage::{FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
@@ -30,7 +30,7 @@ use crate::metrics::{CACHE_HIT, CACHE_MISS};
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Cache for inverted index.
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>;
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
impl InvertedIndexCache {
@@ -48,23 +48,24 @@ impl InvertedIndexCache {
/// Removes all cached entries for the given `file_id`.
pub fn invalidate_file(&self, file_id: FileId) {
self.invalidate_if(move |key| *key == file_id);
self.invalidate_if(move |key| key.0 == file_id);
}
}
/// Calculates weight for inverted index metadata.
fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
(k.as_bytes().len() + v.encoded_len()) as u32
fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc<InvertedIndexMetas>) -> u32 {
(k.0.as_bytes().len() + size_of::<IndexVersion>() + v.encoded_len()) as u32
}
/// Calculates weight for inverted index content.
fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
(k.as_bytes().len() + v.len()) as u32
fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 {
(k.0.as_bytes().len() + size_of::<IndexVersion>() + v.len()) as u32
}
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
index_version: IndexVersion,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
@@ -72,9 +73,16 @@ pub struct CachedInvertedIndexBlobReader<R> {
impl<R> CachedInvertedIndexBlobReader<R> {
/// Creates a new inverted index blob reader with cache.
pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
pub fn new(
file_id: FileId,
index_version: IndexVersion,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
) -> Self {
Self {
file_id,
index_version,
blob_size,
inner,
cache,
@@ -96,7 +104,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let (result, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
(self.file_id, self.index_version),
self.blob_size,
offset,
size,
@@ -129,7 +137,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let (page, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
(self.file_id, self.index_version),
self.blob_size,
range.start,
(range.end - range.start) as u32,
@@ -156,7 +164,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) {
if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
@@ -164,7 +172,8 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
Ok(cached)
} else {
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(self.file_id, meta.clone());
self.cache
.put_metadata((self.file_id, self.index_version), meta.clone());
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
Ok(meta)
}
@@ -299,6 +308,7 @@ mod test {
// Init a test range reader in local fs.
let mut env = TestEnv::new().await;
let file_size = blob.len() as u64;
let index_version = 0;
let store = env.init_object_store_manager();
let temp_path = "data";
store.write(temp_path, blob).await.unwrap();
@@ -314,6 +324,7 @@ mod test {
let reader = InvertedIndexBlobReader::new(range_reader);
let cached_reader = CachedInvertedIndexBlobReader::new(
FileId::random(),
index_version,
file_size,
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
@@ -450,7 +461,7 @@ mod test {
let (read, _cache_metrics) = cached_reader
.cache
.get_or_load(
cached_reader.file_id,
(cached_reader.file_id, cached_reader.index_version),
file_size,
offset,
size,

View File

@@ -215,6 +215,7 @@ impl WriteCache {
puffin_manager: self
.puffin_manager_factory
.build(store.clone(), path_provider.clone()),
write_cache_enabled: true,
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
@@ -266,7 +267,7 @@ impl WriteCache {
upload_tracker.push_uploaded_file(parquet_path);
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
let puffin_path = upload_request
.dest_path_provider
.build_index_file_path(RegionFileId::new(region_id, sst.file_id));
@@ -439,7 +440,11 @@ impl UploadTracker {
file_cache.remove(parquet_key).await;
if sst.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
let puffin_key = IndexKey::new(
self.region_id,
sst.file_id,
FileType::Puffin(sst.index_metadata.version),
);
file_cache.remove(puffin_key).await;
}
}
@@ -548,7 +553,7 @@ mod tests {
assert_eq!(remote_data.to_vec(), cache_data.to_vec());
// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
assert!(write_cache.file_cache.contains_key(&index_key));
let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();

View File

@@ -399,7 +399,7 @@ impl DefaultCompactor {
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: max_sequence,

View File

@@ -77,7 +77,7 @@ pub fn new_file_handle_with_size_and_sequence(
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
num_series: 0,

View File

@@ -135,7 +135,7 @@ use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
@@ -541,22 +541,23 @@ impl MitoEngine {
return Vec::new();
};
let Some(index_file_id) = entry.index_file_id.as_ref() else {
return Vec::new();
};
let file_id = match FileId::parse_str(index_file_id) {
let index_version = entry.index_version;
let file_id = match FileId::parse_str(&entry.file_id) {
Ok(file_id) => file_id,
Err(err) => {
warn!(
err;
"Failed to parse puffin index file id, table_dir: {}, file_id: {}",
entry.table_dir,
index_file_id
entry.file_id
);
return Vec::new();
}
};
let region_file_id = RegionFileId::new(entry.region_id, file_id);
let region_index_id = RegionIndexId::new(
RegionFileId::new(entry.region_id, file_id),
index_version,
);
let context = IndexEntryContext {
table_dir: &entry.table_dir,
index_file_path: index_file_path.as_str(),
@@ -565,7 +566,7 @@ impl MitoEngine {
region_number: entry.region_number,
region_group: entry.region_group,
region_sequence: entry.region_sequence,
file_id: index_file_id,
file_id: &entry.file_id,
index_file_size: entry.index_file_size,
node_id,
};
@@ -576,7 +577,7 @@ impl MitoEngine {
collect_index_entries_from_puffin(
manager,
region_file_id,
region_index_id,
context,
bloom_filter_cache,
inverted_index_cache,

View File

@@ -861,9 +861,10 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
#[tokio::test]
async fn test_list_ssts() {
test_list_ssts_with_format(false, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2513, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -871,9 +872,10 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
test_list_ssts_with_format(true, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_file_id: Some("<index_file_id>"), level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
@@ -945,13 +947,13 @@ async fn test_list_ssts_with_format(
.index_file_path
.map(|p| p.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_file_id = e.index_file_id.map(|_| "<index_file_id>".to_string());
e.index_version = 0;
format!("\n{:?}", e)
})
.sorted()
.collect::<Vec<_>>()
.join("");
assert_eq!(debug_format, expected_manifest_ssts,);
assert_eq!(debug_format, expected_manifest_ssts, "{}", debug_format);
// list from storage
let storage_entries = engine
@@ -969,7 +971,7 @@ async fn test_list_ssts_with_format(
.sorted()
.collect::<Vec<_>>()
.join("");
assert_eq!(debug_format, expected_storage_ssts,);
assert_eq!(debug_format, expected_storage_ssts, "{}", debug_format);
}
#[tokio::test]

View File

@@ -55,10 +55,10 @@ async fn num_of_index_files(engine: &MitoEngine, scanner: &Scanner, region_id: R
return 0;
}
let mut index_files_count: usize = 0;
for region_file_id in scanner.file_ids() {
for region_index_id in scanner.index_ids() {
let index_path = location::index_file_path(
access_layer.table_dir(),
region_file_id,
region_index_id,
access_layer.path_type(),
);
if access_layer

View File

@@ -32,7 +32,7 @@ use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::sst::file::RegionFileId;
use crate::sst::file::RegionIndexId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
use crate::sst::index::fulltext_index::{
INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
@@ -66,14 +66,14 @@ pub(crate) struct IndexEntryContext<'a> {
/// Collect index metadata entries present in the SST puffin file.
pub(crate) async fn collect_index_entries_from_puffin(
manager: SstPuffinManager,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
context: IndexEntryContext<'_>,
bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
inverted_index_cache: Option<InvertedIndexCacheRef>,
) -> Vec<PuffinIndexMetaEntry> {
let mut entries = Vec::new();
let reader = match manager.reader(&region_file_id).await {
let reader = match manager.reader(&region_index_id).await {
Ok(reader) => reader,
Err(err) => {
warn!(
@@ -104,7 +104,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
region_index_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
@@ -130,7 +130,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
let bloom_meta = try_read_bloom_meta(
&reader,
region_file_id,
region_index_id,
blob.blob_type.as_str(),
target_key,
bloom_filter_cache.as_ref(),
@@ -172,7 +172,7 @@ pub(crate) async fn collect_index_entries_from_puffin(
Some(BlobIndexTypeTargetKey::Inverted) => {
let mut inverted_entries = collect_inverted_entries(
&reader,
region_file_id,
region_index_id,
inverted_index_cache.as_ref(),
&context,
)
@@ -188,12 +188,12 @@ pub(crate) async fn collect_index_entries_from_puffin(
async fn collect_inverted_entries(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
cache: Option<&InvertedIndexCacheRef>,
context: &IndexEntryContext<'_>,
) -> Vec<PuffinIndexMetaEntry> {
// Read the inverted index blob and surface its per-column metadata entries.
let file_id = region_file_id.file_id();
let file_id = region_index_id.file_id();
let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
Ok(guard) => guard,
@@ -229,6 +229,7 @@ async fn collect_inverted_entries(
let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
let reader = CachedInvertedIndexBlobReader::new(
file_id,
region_index_id.version,
blob_size,
InvertedIndexBlobReader::new(blob_reader),
cache.clone(),
@@ -289,7 +290,7 @@ fn build_inverted_entries(
async fn try_read_bloom_meta(
reader: &SstPuffinReader,
region_file_id: RegionFileId,
region_index_id: RegionIndexId,
blob_type: &str,
target_key: &str,
cache: Option<&BloomFilterIndexCacheRef>,
@@ -311,7 +312,8 @@ async fn try_read_bloom_meta(
let result = match (cache, column_id, blob_size) {
(Some(cache), Some(column_id), Some(blob_size)) => {
CachedBloomFilterIndexBlobReader::new(
region_file_id.file_id(),
region_index_id.file_id(),
region_index_id.version,
column_id,
tag,
blob_size,

View File

@@ -643,7 +643,7 @@ impl RegionFlushTask {
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
index_file_id: None,
index_version: 0,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
sequence: NonZeroU64::new(max_sequence),
@@ -730,11 +730,13 @@ async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) ->
// dedup according to merge mode
match options.merge_mode.unwrap_or(MergeMode::LastRow) {
MergeMode::LastRow => {
Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
}
MergeMode::LastNonNull => {
Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _
}
MergeMode::LastNonNull => Box::new(DedupReader::new(
merge_reader,
LastNonNull::new(false),
None,
)) as _,
}
};
Source::Reader(maybe_dedup)

View File

@@ -287,6 +287,14 @@ impl LocalGcWorker {
let region_id = region.region_id();
debug!("Doing gc for region {}", region_id);
// do the time-consuming listing only when full_file_listing is true
// and do it first to make sure we have the latest manifest, etc.
let all_entries = if self.full_file_listing {
self.list_from_object_store(&region).await?
} else {
vec![]
};
let manifest = region.manifest_ctx.manifest().await;
let region_id = manifest.metadata.region_id;
let current_files = &manifest.files;
@@ -303,10 +311,6 @@ impl LocalGcWorker {
.map(|s| s.len())
.sum::<usize>();
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
.max(1)
.min(self.opt.max_concurrent_lister_per_gc_job);
let in_used: HashSet<FileId> = current_files
.keys()
.cloned()
@@ -314,7 +318,7 @@ impl LocalGcWorker {
.collect();
let unused_files = self
.list_to_be_deleted_files(region_id, &in_used, recently_removed_files, concurrency)
.list_to_be_deleted_files(region_id, &in_used, recently_removed_files, all_entries)
.await?;
let unused_file_cnt = unused_files.len();
@@ -330,10 +334,9 @@ impl LocalGcWorker {
// TODO(discord9): for now, ignore the async index file as its design is not stable; needs to be improved once
// the index file design is stable
let file_pairs: Vec<(FileId, FileId)> = unused_files
.iter()
.map(|file_id| (*file_id, *file_id))
.collect();
let file_pairs: Vec<(FileId, u64)> =
unused_files.iter().map(|file_id| (*file_id, 0)).collect();
// TODO(discord9): gc worker need another major refactor to support versioned index files
debug!(
"Found {} unused index files to delete for region {}",
@@ -354,7 +357,7 @@ impl LocalGcWorker {
Ok(unused_files)
}
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, FileId)]) -> Result<()> {
async fn delete_files(&self, region_id: RegionId, file_ids: &[(FileId, u64)]) -> Result<()> {
delete_files(
region_id,
file_ids,
@@ -443,6 +446,32 @@ impl LocalGcWorker {
Ok(listers)
}
/// List all files in the region directory.
/// Returns a vector of all file entries found.
/// This might take a long time if there are many files in the region directory.
async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result<Vec<Entry>> {
let start = tokio::time::Instant::now();
let region_id = region.region_id();
let manifest = region.manifest_ctx.manifest().await;
let current_files = &manifest.files;
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
.max(1)
.min(self.opt.max_concurrent_lister_per_gc_job);
let listers = self.partition_region_files(region_id, concurrency).await?;
let lister_cnt = listers.len();
// Step 2: Concurrently list all files in the region directory
let all_entries = self.list_region_files_concurrent(listers).await?;
let cnt = all_entries.len();
info!(
"gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}.",
start.elapsed().as_secs_f64(),
region_id
);
Ok(all_entries)
}
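
The lister count above is derived from the number of files currently tracked in the manifest, floored at one and capped by the `max_concurrent_lister_per_gc_job` option. A standalone sketch of that arithmetic; the real value of `CONCURRENCY_LIST_PER_FILES` is not visible in this hunk, so the numbers below are purely illustrative.

// Mirrors `(current_files.len() / CONCURRENCY_LIST_PER_FILES).max(1).min(max_listers)`.
fn lister_count(current_files: usize, files_per_lister: usize, max_listers: usize) -> usize {
    (current_files / files_per_lister).max(1).min(max_listers)
}

fn main() {
    // Hypothetical values: one lister per 512 files, at most 4 listers per gc job.
    assert_eq!(lister_count(2_500, 512, 4), 4); // large region hits the cap
    assert_eq!(lister_count(10, 512, 4), 1);    // small region still gets one lister
    assert_eq!(lister_count(1_100, 512, 4), 2); // otherwise scales with file count
}
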
/// Concurrently list all files in the region directory using the provided listers.
/// Returns a vector of all file entries found across all partitions.
async fn list_region_files_concurrent(
@@ -573,9 +602,8 @@ impl LocalGcWorker {
region_id: RegionId,
in_used: &HashSet<FileId>,
recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
concurrency: usize,
all_entries: Vec<Entry>,
) -> Result<Vec<FileId>> {
let start = tokio::time::Instant::now();
let now = chrono::Utc::now();
let may_linger_until = self
.opt
@@ -630,8 +658,7 @@ impl LocalGcWorker {
.collect();
info!(
"gc: fast mode (no full listing) cost {} secs for region {}, found {} files to delete from manifest",
start.elapsed().as_secs_f64(),
"gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
region_id,
files_to_delete.len()
);
@@ -639,15 +666,7 @@ impl LocalGcWorker {
return Ok(files_to_delete);
}
// Full file listing mode: perform expensive list operations to find orphan files
// Step 1: Create partitioned listers for concurrent processing
let listers = self.partition_region_files(region_id, concurrency).await?;
let lister_cnt = listers.len();
// Step 2: Concurrently list all files in the region directory
let all_entries = self.list_region_files_concurrent(listers).await?;
let cnt = all_entries.len();
// Full file listing mode: get the full list of files from object store
// Step 3: Filter files to determine which ones can be deleted
let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
@@ -659,12 +678,6 @@ impl LocalGcWorker {
unknown_file_may_linger_until,
);
info!(
"gc: full listing mode cost {} secs using {lister_cnt} lister for {cnt} files in region {}, found {} unused files to delete",
start.elapsed().as_secs_f64(),
region_id,
all_unused_files_ready_for_delete.len()
);
debug!("All in exist linger files: {:?}", all_in_exist_linger_files);
Ok(all_unused_files_ready_for_delete)

View File

@@ -247,7 +247,7 @@ async fn checkpoint_with_different_compression_types() {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -312,7 +312,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,

View File

@@ -57,6 +57,10 @@ pub(crate) mod version;
#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
pub use bulk::part::{
BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
sort_primary_key_record_batch,
};
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

View File

@@ -464,7 +464,7 @@ impl UnorderedPart {
}
/// More accurate estimation of the size of a record batch.
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
batch
.columns()
.iter()
@@ -715,7 +715,7 @@ fn new_primary_key_column_builders(
}
/// Sorts the record batch with primary key format.
fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
let total_columns = batch.num_columns();
let sort_columns = vec![
// Primary key column (ascending)

View File

@@ -627,7 +627,7 @@ mod tests {
.await
.unwrap();
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
let mut num_rows = 0;
while let Some(b) = reader.next_batch().await.unwrap() {
num_rows += b.num_rows();
@@ -659,7 +659,7 @@ mod tests {
.await
.unwrap();
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false));
let mut reader = DedupReader::new(reader, read::dedup::LastRow::new(false), None);
let mut num_rows = 0;
while let Some(b) = reader.next_batch().await.unwrap() {
num_rows += b.num_rows();

View File

@@ -14,6 +14,10 @@
//! Utilities to remove duplicate rows from a sorted batch.
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::debug;
@@ -27,21 +31,34 @@ use crate::error::Result;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::{Batch, BatchColumn, BatchReader};
/// Trait for reporting dedup metrics.
pub trait DedupMetricsReport: Send + Sync {
/// Reports and resets the metrics.
fn report(&self, metrics: &mut DedupMetrics);
}
/// A reader that dedups sorted batches from a source based on the
/// dedup strategy.
pub struct DedupReader<R, S> {
source: R,
strategy: S,
metrics: DedupMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
}
impl<R, S> DedupReader<R, S> {
/// Creates a new dedup reader.
pub fn new(source: R, strategy: S) -> Self {
pub fn new(
source: R,
strategy: S,
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
) -> Self {
Self {
source,
strategy,
metrics: DedupMetrics::default(),
metrics_reporter,
}
}
}
@@ -51,11 +68,14 @@ impl<R: BatchReader, S: DedupStrategy> DedupReader<R, S> {
async fn fetch_next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.source.next_batch().await? {
if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
self.metrics.maybe_report(&self.metrics_reporter);
return Ok(Some(batch));
}
}
self.strategy.finish(&mut self.metrics)
let result = self.strategy.finish(&mut self.metrics)?;
self.metrics.maybe_report(&self.metrics_reporter);
Ok(result)
}
}
@@ -76,6 +96,11 @@ impl<R, S> Drop for DedupReader<R, S> {
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_unselected_rows as u64);
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
@@ -138,6 +163,8 @@ impl DedupStrategy for LastRow {
mut batch: Batch,
metrics: &mut DedupMetrics,
) -> Result<Option<Batch>> {
let start = Instant::now();
if batch.is_empty() {
return Ok(None);
}
@@ -160,6 +187,7 @@ impl DedupStrategy for LastRow {
if batch.num_rows() == 1 {
// We don't need to update `prev_batch` because they have the same
// key and timestamp.
metrics.dedup_cost += start.elapsed();
return Ok(None);
}
// Skips the first row.
@@ -189,6 +217,8 @@ impl DedupStrategy for LastRow {
filter_deleted_from_batch(&mut batch, metrics)?;
}
metrics.dedup_cost += start.elapsed();
// The batch can become empty if all rows are deleted.
if batch.is_empty() {
Ok(None)
@@ -215,12 +245,58 @@ fn filter_deleted_from_batch(batch: &mut Batch, metrics: &mut DedupMetrics) -> R
}
/// Metrics for deduplication.
#[derive(Debug, Default)]
#[derive(Default)]
pub struct DedupMetrics {
/// Number of rows removed during deduplication.
pub(crate) num_unselected_rows: usize,
/// Number of deleted rows.
pub(crate) num_deleted_rows: usize,
/// Time spent on deduplication.
pub(crate) dedup_cost: Duration,
}
impl fmt::Debug for DedupMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Skip output if dedup_cost is zero
if self.dedup_cost.is_zero() {
return write!(f, "{{}}");
}
write!(f, r#"{{"dedup_cost":"{:?}""#, self.dedup_cost)?;
if self.num_unselected_rows > 0 {
write!(f, r#", "num_unselected_rows":{}"#, self.num_unselected_rows)?;
}
if self.num_deleted_rows > 0 {
write!(f, r#", "num_deleted_rows":{}"#, self.num_deleted_rows)?;
}
write!(f, "}}")
}
}
impl DedupMetrics {
/// Merges metrics from another DedupMetrics instance.
pub(crate) fn merge(&mut self, other: &DedupMetrics) {
let DedupMetrics {
num_unselected_rows,
num_deleted_rows,
dedup_cost,
} = other;
self.num_unselected_rows += *num_unselected_rows;
self.num_deleted_rows += *num_deleted_rows;
self.dedup_cost += *dedup_cost;
}
/// Reports the metrics if dedup_cost exceeds 10ms and resets them.
pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn DedupMetricsReport>>) {
if self.dedup_cost.as_millis() > 10
&& let Some(r) = reporter
{
r.report(self);
}
}
}
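
`maybe_report` hands the accumulated metrics to the reporter only once `dedup_cost` crosses 10ms, and `DedupReader` calls it after every batch it yields plus once more on drop; the reporter is expected to merge what it receives and then reset it. A standalone sketch of a reporter that follows that contract, using simplified local mirrors of the two types rather than the crate's actual definitions.

use std::sync::{Arc, Mutex};
use std::time::Duration;

// Simplified local mirror of the metrics struct above (fewer fields, same idea).
#[derive(Default)]
struct DedupMetrics {
    num_unselected_rows: usize,
    num_deleted_rows: usize,
    dedup_cost: Duration,
}

// Simplified local mirror of the reporting trait above.
trait DedupMetricsReport: Send + Sync {
    /// Reports and resets the metrics.
    fn report(&self, metrics: &mut DedupMetrics);
}

// Aggregates partial metrics into a shared total, then resets the input so the
// reader starts accumulating from zero again.
#[derive(Default)]
struct AggregatingReporter {
    total: Mutex<DedupMetrics>,
}

impl DedupMetricsReport for AggregatingReporter {
    fn report(&self, metrics: &mut DedupMetrics) {
        let mut total = self.total.lock().unwrap();
        total.num_unselected_rows += metrics.num_unselected_rows;
        total.num_deleted_rows += metrics.num_deleted_rows;
        total.dedup_cost += metrics.dedup_cost;
        *metrics = DedupMetrics::default();
    }
}

fn main() {
    let reporter: Arc<dyn DedupMetricsReport> = Arc::new(AggregatingReporter::default());
    let mut local = DedupMetrics {
        num_unselected_rows: 3,
        num_deleted_rows: 1,
        dedup_cost: Duration::from_millis(12),
    };
    // In the reader this call happens through `maybe_report` once dedup_cost > 10ms.
    reporter.report(&mut local);
    assert_eq!(local.num_unselected_rows, 0); // the local metrics were reset
}
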
/// Buffer to store fields in the last row to merge.
@@ -427,6 +503,8 @@ impl LastNonNull {
impl DedupStrategy for LastNonNull {
fn push_batch(&mut self, batch: Batch, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
let start = Instant::now();
if batch.is_empty() {
return Ok(None);
}
@@ -444,6 +522,7 @@ impl DedupStrategy for LastNonNull {
// Next key is different.
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
return Ok(merged);
}
@@ -451,6 +530,7 @@ impl DedupStrategy for LastNonNull {
// The next batch has a different timestamp.
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
return Ok(merged);
}
@@ -460,6 +540,7 @@ impl DedupStrategy for LastNonNull {
// We assume each batch doesn't contain duplicate rows, so we only need to check the first row.
if batch.num_rows() == 1 {
self.last_fields.push_first_row(&batch);
metrics.dedup_cost += start.elapsed();
return Ok(None);
}
@@ -472,10 +553,14 @@ impl DedupStrategy for LastNonNull {
let buffer = std::mem::replace(buffer, batch);
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
Ok(merged)
}
fn finish(&mut self, metrics: &mut DedupMetrics) -> Result<Option<Batch>> {
let start = Instant::now();
let Some(buffer) = self.buffer.take() else {
return Ok(None);
};
@@ -485,6 +570,8 @@ impl DedupStrategy for LastNonNull {
let merged = self.last_fields.merge_last_non_null(buffer, metrics)?;
metrics.dedup_cost += start.elapsed();
Ok(merged)
}
}
@@ -614,14 +701,14 @@ mod tests {
// Test last row.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
let mut reader = DedupReader::new(reader, LastRow::new(true), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
// Test last non-null.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
@@ -662,7 +749,7 @@ mod tests {
];
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(true));
let mut reader = DedupReader::new(reader, LastRow::new(true), None);
check_reader_result(
&mut reader,
&[
@@ -684,7 +771,7 @@ mod tests {
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastRow::new(false));
let mut reader = DedupReader::new(reader, LastRow::new(false), None);
check_reader_result(
&mut reader,
&[
@@ -801,7 +888,7 @@ mod tests {
// Filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
@@ -835,7 +922,7 @@ mod tests {
// Does not filter deleted.
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false));
let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
check_reader_result(
&mut reader,
&[
@@ -885,7 +972,7 @@ mod tests {
)];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[new_batch_multi_fields(
@@ -901,7 +988,7 @@ mod tests {
assert_eq!(1, reader.metrics().num_deleted_rows);
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(false));
let mut reader = DedupReader::new(reader, LastNonNull::new(false), None);
check_reader_result(&mut reader, &input).await;
assert_eq!(0, reader.metrics().num_unselected_rows);
assert_eq!(0, reader.metrics().num_deleted_rows);
@@ -928,7 +1015,7 @@ mod tests {
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[
@@ -962,7 +1049,7 @@ mod tests {
];
let reader = VecBatchReader::new(&input);
let mut reader = DedupReader::new(reader, LastNonNull::new(true));
let mut reader = DedupReader::new(reader, LastNonNull::new(true), None);
check_reader_result(
&mut reader,
&[

View File

@@ -15,9 +15,12 @@
//! Dedup implementation for flat format.
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::OpType;
use async_stream::try_stream;
use common_telemetry::debug;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder, UInt8Array, UInt64Array,
make_comparator,
@@ -36,7 +39,8 @@ use snafu::ResultExt;
use crate::error::{ComputeArrowSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
use crate::read::dedup::DedupMetrics;
use crate::metrics::MERGE_FILTER_ROWS_TOTAL;
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::sst::parquet::flat_format::{
op_type_column_index, primary_key_column_index, time_index_column_index,
};
@@ -88,15 +92,22 @@ pub struct FlatDedupReader<I, S> {
stream: I,
strategy: S,
metrics: DedupMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
}
impl<I, S> FlatDedupReader<I, S> {
/// Creates a new dedup iterator.
pub fn new(stream: I, strategy: S) -> Self {
/// Creates a new dedup reader.
pub fn new(
stream: I,
strategy: S,
metrics_reporter: Option<Arc<dyn DedupMetricsReport>>,
) -> Self {
Self {
stream,
strategy,
metrics: DedupMetrics::default(),
metrics_reporter,
}
}
}
@@ -108,11 +119,14 @@ impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
while let Some(batch) = self.stream.try_next().await? {
if let Some(batch) = self.strategy.push_batch(batch, &mut self.metrics)? {
self.metrics.maybe_report(&self.metrics_reporter);
return Ok(Some(batch));
}
}
self.strategy.finish(&mut self.metrics)
let result = self.strategy.finish(&mut self.metrics)?;
self.metrics.maybe_report(&self.metrics_reporter);
Ok(result)
}
/// Converts the reader into a stream.
@@ -125,6 +139,24 @@ impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
}
}
impl<I, S> Drop for FlatDedupReader<I, S> {
fn drop(&mut self) {
debug!("Flat dedup reader finished, metrics: {:?}", self.metrics);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["dedup"])
.inc_by(self.metrics.num_unselected_rows as u64);
MERGE_FILTER_ROWS_TOTAL
.with_label_values(&["delete"])
.inc_by(self.metrics.num_deleted_rows as u64);
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
/// Strategy to remove duplicate rows from sorted record batches.
pub trait RecordBatchDedupStrategy: Send {
/// Pushes a batch to the dedup strategy.
@@ -214,6 +246,8 @@ impl RecordBatchDedupStrategy for FlatLastRow {
batch: RecordBatch,
metrics: &mut DedupMetrics,
) -> Result<Option<RecordBatch>> {
let start = Instant::now();
if batch.num_rows() == 0 {
return Ok(None);
}
@@ -235,6 +269,7 @@ impl RecordBatchDedupStrategy for FlatLastRow {
// The batch after dedup is empty.
// We don't need to update `prev_batch` because they have the same
// key and timestamp.
metrics.dedup_cost += start.elapsed();
return Ok(None);
};
@@ -246,7 +281,11 @@ impl RecordBatchDedupStrategy for FlatLastRow {
self.prev_batch = Some(batch_last_row);
// Filters deleted rows at last.
maybe_filter_deleted(batch, self.filter_deleted, metrics)
let result = maybe_filter_deleted(batch, self.filter_deleted, metrics);
metrics.dedup_cost += start.elapsed();
result
}
fn finish(&mut self, _metrics: &mut DedupMetrics) -> Result<Option<RecordBatch>> {
@@ -275,6 +314,8 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
batch: RecordBatch,
metrics: &mut DedupMetrics,
) -> Result<Option<RecordBatch>> {
let start = Instant::now();
if batch.num_rows() == 0 {
return Ok(None);
}
@@ -290,6 +331,7 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
self.buffer = BatchLastRow::try_new(record_batch);
self.contains_delete = contains_delete;
metrics.dedup_cost += start.elapsed();
return Ok(None);
};
@@ -305,7 +347,9 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
self.buffer = BatchLastRow::try_new(record_batch);
self.contains_delete = contains_delete;
return maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
metrics.dedup_cost += start.elapsed();
return result;
}
// The next batch has duplicated rows.
@@ -332,6 +376,8 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
self.buffer = BatchLastRow::try_new(record_batch);
self.contains_delete = contains_delete;
metrics.dedup_cost += start.elapsed();
Ok(output)
}
@@ -340,7 +386,13 @@ impl RecordBatchDedupStrategy for FlatLastNonNull {
return Ok(None);
};
maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics)
let start = Instant::now();
let result = maybe_filter_deleted(buffer.last_batch, self.filter_deleted, metrics);
metrics.dedup_cost += start.elapsed();
result
}
}

View File

@@ -15,8 +15,10 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Instant;
use async_stream::try_stream;
use common_telemetry::debug;
use datatypes::arrow::array::{Int64Array, UInt64Array};
use datatypes::arrow::compute::interleave;
use datatypes::arrow::datatypes::SchemaRef;
@@ -29,7 +31,9 @@ use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::sst::parquet::flat_format::{
primary_key_column_index, sequence_column_index, time_index_column_index,
};
@@ -462,12 +466,14 @@ impl FlatMergeIterator {
let algo = MergeAlgo::new(nodes);
Ok(Self {
let iter = Self {
algo,
in_progress,
output_batch: None,
batch_size,
})
};
Ok(iter)
}
/// Fetches next sorted batch.
@@ -484,12 +490,7 @@ impl FlatMergeIterator {
}
}
if let Some(batch) = self.output_batch.take() {
Ok(Some(batch))
} else {
// No more batches.
Ok(None)
}
Ok(self.output_batch.take())
}
/// Fetches a batch from the hottest node.
@@ -562,6 +563,10 @@ pub struct FlatMergeReader {
/// This is not a hard limit, the iterator may return smaller batches to avoid concatenating
/// rows.
batch_size: usize,
/// Local metrics.
metrics: MergeMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
impl FlatMergeReader {
@@ -570,7 +575,10 @@ impl FlatMergeReader {
schema: SchemaRef,
iters: Vec<BoxedRecordBatchStream>,
batch_size: usize,
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> Result<Self> {
let start = Instant::now();
let metrics = MergeMetrics::default();
let mut in_progress = BatchBuilder::new(schema, iters.len(), batch_size);
let mut nodes = Vec::with_capacity(iters.len());
// Initialize nodes and the buffer.
@@ -588,16 +596,24 @@ impl FlatMergeReader {
let algo = MergeAlgo::new(nodes);
Ok(Self {
let mut reader = Self {
algo,
in_progress,
output_batch: None,
batch_size,
})
metrics,
metrics_reporter,
};
let elapsed = start.elapsed();
reader.metrics.init_cost += elapsed;
reader.metrics.scan_cost += elapsed;
Ok(reader)
}
/// Fetches next sorted batch.
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
let start = Instant::now();
while self.algo.has_rows() && self.output_batch.is_none() {
if self.algo.can_fetch_batch() && !self.in_progress.is_empty() {
// Only one batch in the hot heap, but we have pending rows, output the pending rows first.
@@ -605,15 +621,21 @@ impl FlatMergeReader {
debug_assert!(self.output_batch.is_some());
} else if self.algo.can_fetch_batch() {
self.fetch_batch_from_hottest().await?;
self.metrics.num_fetch_by_batches += 1;
} else {
self.fetch_row_from_hottest().await?;
self.metrics.num_fetch_by_rows += 1;
}
}
if let Some(batch) = self.output_batch.take() {
self.metrics.scan_cost += start.elapsed();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(Some(batch))
} else {
// No more batches.
self.metrics.scan_cost += start.elapsed();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(None)
}
}
@@ -634,7 +656,9 @@ impl FlatMergeReader {
// Safety: next_batch() ensures the heap is not empty.
let mut hottest = self.algo.pop_hot().unwrap();
debug_assert!(!hottest.current_cursor().is_finished());
let start = Instant::now();
let next = hottest.advance_batch().await?;
self.metrics.fetch_cost += start.elapsed();
// The node in the heap is not empty, so it must have existing rows in the builder.
let batch = self
.in_progress
@@ -658,8 +682,12 @@ impl FlatMergeReader {
}
}
let start = Instant::now();
if let Some(next) = hottest.advance_row().await? {
self.metrics.fetch_cost += start.elapsed();
self.in_progress.push_batch(hottest.node_index, next);
} else {
self.metrics.fetch_cost += start.elapsed();
}
self.algo.reheap(hottest);
@@ -675,6 +703,24 @@ impl FlatMergeReader {
}
}
impl Drop for FlatMergeReader {
fn drop(&mut self) {
debug!("Flat merge reader finished, metrics: {:?}", self.metrics);
READ_STAGE_ELAPSED
.with_label_values(&["flat_merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["flat_merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
/// A sync node in the merge iterator.
struct GenericNode<T> {
/// Index of the node.

View File

@@ -16,8 +16,9 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fmt, mem};
use async_trait::async_trait;
use common_telemetry::debug;
@@ -27,6 +28,12 @@ use crate::memtable::BoxedBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
/// Trait for reporting merge metrics.
pub trait MergeMetricsReport: Send + Sync {
/// Reports and resets the metrics.
fn report(&self, metrics: &mut MergeMetrics);
}
/// Reader to merge sorted batches.
///
/// The merge reader merges [Batch]es from multiple sources that yield sorted batches.
@@ -51,7 +58,9 @@ pub struct MergeReader {
/// Batch to output.
output_batch: Option<Batch>,
/// Local metrics.
metrics: Metrics,
metrics: MergeMetrics,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
#[async_trait]
@@ -72,11 +81,12 @@ impl BatchReader for MergeReader {
if let Some(batch) = self.output_batch.take() {
self.metrics.scan_cost += start.elapsed();
self.metrics.num_output_rows += batch.num_rows();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(Some(batch))
} else {
// Nothing fetched.
self.metrics.scan_cost += start.elapsed();
self.metrics.maybe_report(&self.metrics_reporter);
Ok(None)
}
}
@@ -92,14 +102,22 @@ impl Drop for MergeReader {
READ_STAGE_ELAPSED
.with_label_values(&["merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
// Report any remaining metrics.
if let Some(reporter) = &self.metrics_reporter {
reporter.report(&mut self.metrics);
}
}
}
impl MergeReader {
/// Creates and initializes a new [MergeReader].
pub async fn new(sources: Vec<Source>) -> Result<MergeReader> {
pub async fn new(
sources: Vec<Source>,
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> Result<MergeReader> {
let start = Instant::now();
let mut metrics = Metrics::default();
let mut metrics = MergeMetrics::default();
let mut cold = BinaryHeap::with_capacity(sources.len());
let hot = BinaryHeap::with_capacity(sources.len());
@@ -116,11 +134,14 @@ impl MergeReader {
cold,
output_batch: None,
metrics,
metrics_reporter,
};
// Initializes the reader.
reader.refill_hot();
reader.metrics.scan_cost += start.elapsed();
let elapsed = start.elapsed();
reader.metrics.init_cost += elapsed;
reader.metrics.scan_cost += elapsed;
Ok(reader)
}
@@ -250,6 +271,8 @@ pub struct MergeReaderBuilder {
///
/// All sources must yield batches with the same schema.
sources: Vec<Source>,
/// Optional metrics reporter.
metrics_reporter: Option<Arc<dyn MergeMetricsReport>>,
}
impl MergeReaderBuilder {
@@ -260,7 +283,10 @@ impl MergeReaderBuilder {
/// Creates a builder from sources.
pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
MergeReaderBuilder { sources }
MergeReaderBuilder {
sources,
metrics_reporter: None,
}
}
/// Pushes a batch reader to sources.
@@ -275,28 +301,94 @@ impl MergeReaderBuilder {
self
}
/// Sets the metrics reporter.
pub fn with_metrics_reporter(
&mut self,
reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> &mut Self {
self.metrics_reporter = reporter;
self
}
/// Builds and initializes the reader, then resets the builder.
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
MergeReader::new(sources).await
let metrics_reporter = self.metrics_reporter.take();
MergeReader::new(sources, metrics_reporter).await
}
}
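
A minimal usage sketch of the builder change above. It assumes it sits inside this crate, where `Source`, `MergeReader`, `MergeMetricsReport`, and the crate's `Result` are in scope; passing `None` keeps the previous behavior of no reporting.

use std::sync::Arc;

async fn merge_with_optional_reporter(
    sources: Vec<Source>,
    reporter: Option<Arc<dyn MergeMetricsReport>>,
) -> Result<MergeReader> {
    let mut builder = MergeReaderBuilder::from_sources(sources);
    // The reporter is handed to the reader by `build()`; metrics are flushed to it
    // whenever `scan_cost` exceeds 10ms and once more when the reader is dropped.
    builder.with_metrics_reporter(reporter);
    builder.build().await
}
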
/// Metrics for the merge reader.
#[derive(Debug, Default)]
struct Metrics {
#[derive(Default)]
pub struct MergeMetrics {
/// Cost to initialize the reader.
pub(crate) init_cost: Duration,
/// Total scan cost of the reader.
scan_cost: Duration,
pub(crate) scan_cost: Duration,
/// Number of times to fetch batches.
num_fetch_by_batches: usize,
pub(crate) num_fetch_by_batches: usize,
/// Number of times to fetch rows.
num_fetch_by_rows: usize,
/// Number of input rows.
num_input_rows: usize,
/// Number of output rows.
num_output_rows: usize,
pub(crate) num_fetch_by_rows: usize,
/// Cost to fetch batches from sources.
fetch_cost: Duration,
pub(crate) fetch_cost: Duration,
}
impl fmt::Debug for MergeMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Skip output if scan_cost is zero
if self.scan_cost.is_zero() {
return write!(f, "{{}}");
}
write!(f, r#"{{"scan_cost":"{:?}""#, self.scan_cost)?;
if !self.init_cost.is_zero() {
write!(f, r#", "init_cost":"{:?}""#, self.init_cost)?;
}
if self.num_fetch_by_batches > 0 {
write!(
f,
r#", "num_fetch_by_batches":{}"#,
self.num_fetch_by_batches
)?;
}
if self.num_fetch_by_rows > 0 {
write!(f, r#", "num_fetch_by_rows":{}"#, self.num_fetch_by_rows)?;
}
if !self.fetch_cost.is_zero() {
write!(f, r#", "fetch_cost":"{:?}""#, self.fetch_cost)?;
}
write!(f, "}}")
}
}
impl MergeMetrics {
/// Merges metrics from another MergeMetrics instance.
pub(crate) fn merge(&mut self, other: &MergeMetrics) {
let MergeMetrics {
init_cost,
scan_cost,
num_fetch_by_batches,
num_fetch_by_rows,
fetch_cost,
} = other;
self.init_cost += *init_cost;
self.scan_cost += *scan_cost;
self.num_fetch_by_batches += *num_fetch_by_batches;
self.num_fetch_by_rows += *num_fetch_by_rows;
self.fetch_cost += *fetch_cost;
}
/// Reports the metrics if scan_cost exceeds 10ms and resets them.
pub(crate) fn maybe_report(&mut self, reporter: &Option<Arc<dyn MergeMetricsReport>>) {
if self.scan_cost.as_millis() > 10
&& let Some(r) = reporter
{
r.report(self);
}
}
}
/// A `Node` represents an individual input data source to be merged.
@@ -313,12 +405,11 @@ impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
async fn new(mut source: Source, metrics: &mut MergeMetrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
Ok(Node {
source,
@@ -352,17 +443,12 @@ impl Node {
///
/// # Panics
/// Panics if the node has reached EOF.
async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
async fn fetch_batch(&mut self, metrics: &mut MergeMetrics) -> Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += self
.current_batch
.as_ref()
.map(|b| b.0.num_rows())
.unwrap_or(0);
Ok(current.0)
}
@@ -390,7 +476,7 @@ impl Node {
///
/// # Panics
/// Panics if the node is EOF.
async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> {
async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut MergeMetrics) -> Result<()> {
let batch = self.current_batch();
debug_assert!(batch.num_rows() >= num_to_skip);
@@ -547,9 +633,6 @@ mod tests {
],
)
.await;
assert_eq!(8, reader.metrics.num_input_rows);
assert_eq!(8, reader.metrics.num_output_rows);
}
#[tokio::test]
@@ -666,9 +749,6 @@ mod tests {
],
)
.await;
assert_eq!(11, reader.metrics.num_input_rows);
assert_eq!(11, reader.metrics.num_output_rows);
}
#[tokio::test]

View File

@@ -84,6 +84,14 @@ impl ProjectionMapper {
}
}
/// Returns true if the projection includes any tag columns.
pub(crate) fn has_tags(&self) -> bool {
match self {
ProjectionMapper::PrimaryKey(m) => m.has_tags(),
ProjectionMapper::Flat(_) => false,
}
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
@@ -257,6 +265,11 @@ impl PrimaryKeyProjectionMapper {
&self.metadata
}
/// Returns true if the projection includes any tag columns.
pub(crate) fn has_tags(&self) -> bool {
self.has_tags
}
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {

View File

@@ -135,6 +135,14 @@ impl Scanner {
}
}
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
match self {
Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
Scanner::Series(series_scan) => series_scan.input().index_ids(),
}
}
/// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
@@ -958,6 +966,7 @@ impl ScanInput {
) -> Result<FileRangeBuilder> {
let predicate = self.predicate_for_file(file);
let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
let decode_pk_values = !self.compaction && self.mapper.has_tags();
let res = self
.access_layer
.read_sst(file.clone())
@@ -971,6 +980,7 @@ impl ScanInput {
.flat_format(self.flat_format)
.compaction(self.compaction)
.pre_filter_mode(filter_mode)
.decode_primary_key_values(decode_pk_values)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {
@@ -1160,6 +1170,10 @@ impl ScanInput {
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
self.files.iter().map(|file| file.file_id()).collect()
}
pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
self.files.iter().map(|file| file.index_id()).collect()
}
}
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {

View File

@@ -37,6 +37,8 @@ use crate::metrics::{
IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::dedup::{DedupMetrics, DedupMetricsReport};
use crate::read::merge::{MergeMetrics, MergeMetricsReport};
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
@@ -130,6 +132,11 @@ pub(crate) struct ScanMetricsSet {
/// Duration of the series distributor to yield.
distributor_yield_cost: Duration,
/// Merge metrics.
merge_metrics: MergeMetrics,
/// Dedup metrics.
dedup_metrics: DedupMetrics,
/// The stream reached EOF
stream_eof: bool,
@@ -180,6 +187,8 @@ impl fmt::Debug for ScanMetricsSet {
num_distributor_batches,
distributor_scan_cost,
distributor_yield_cost,
merge_metrics,
dedup_metrics,
stream_eof,
mem_scan_cost,
mem_rows,
@@ -307,6 +316,16 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
}
// Write merge metrics if not empty
if !merge_metrics.scan_cost.is_zero() {
write!(f, ", \"merge_metrics\":{:?}", merge_metrics)?;
}
// Write dedup metrics if not empty
if !dedup_metrics.dedup_cost.is_zero() {
write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
}
}
@@ -531,6 +550,28 @@ impl PartitionMetricsInner {
}
}
impl MergeMetricsReport for PartitionMetricsInner {
fn report(&self, metrics: &mut MergeMetrics) {
let mut scan_metrics = self.metrics.lock().unwrap();
// Merge the metrics into scan_metrics
scan_metrics.merge_metrics.merge(metrics);
// Reset the input metrics
*metrics = MergeMetrics::default();
}
}
impl DedupMetricsReport for PartitionMetricsInner {
fn report(&self, metrics: &mut DedupMetrics) {
let mut scan_metrics = self.metrics.lock().unwrap();
// Merge the metrics into scan_metrics
scan_metrics.dedup_metrics.merge(metrics);
// Reset the input metrics
*metrics = DedupMetrics::default();
}
}
impl Drop for PartitionMetricsInner {
fn drop(&mut self) {
self.on_finish(false);
@@ -703,6 +744,16 @@ impl PartitionMetrics {
pub(crate) fn explain_verbose(&self) -> bool {
self.0.explain_verbose
}
/// Returns a MergeMetricsReport trait object for reporting merge metrics.
pub(crate) fn merge_metrics_reporter(&self) -> Arc<dyn MergeMetricsReport> {
self.0.clone()
}
/// Returns a DedupMetricsReport trait object for reporting dedup metrics.
pub(crate) fn dedup_metrics_reporter(&self) -> Arc<dyn DedupMetricsReport> {
self.0.clone()
}
}
impl fmt::Debug for PartitionMetrics {

View File

@@ -189,7 +189,7 @@ impl SeqScan {
partition_ranges.len(),
sources.len()
);
Self::build_reader_from_sources(stream_ctx, sources, None).await
Self::build_reader_from_sources(stream_ctx, sources, None, None).await
}
/// Builds a merge reader that reads all flat ranges.
@@ -223,7 +223,7 @@ impl SeqScan {
partition_ranges.len(),
sources.len()
);
Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
@@ -233,6 +233,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
) -> Result<BoxedBatchReader> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
@@ -244,18 +245,24 @@ impl SeqScan {
}
let mut builder = MergeReaderBuilder::from_sources(sources);
if let Some(metrics) = part_metrics {
builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
}
let reader = builder.build().await?;
let dedup = !stream_ctx.input.append_mode;
let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)) as _,
MergeMode::LastNonNull => Box::new(DedupReader::new(
reader,
LastNonNull::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)) as _,
}
} else {
@@ -277,6 +284,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
mut sources: Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
@@ -290,15 +298,20 @@ impl SeqScan {
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
let reader =
FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
.await?;
let dedup = !stream_ctx.input.append_mode;
let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::pin(
FlatDedupReader::new(
reader.into_stream().boxed(),
FlatLastRow::new(stream_ctx.input.filter_deleted),
dedup_metrics_reporter,
)
.into_stream(),
) as _,
@@ -309,6 +322,7 @@ impl SeqScan {
mapper.field_column_start(),
stream_ctx.input.filter_deleted,
),
dedup_metrics_reporter,
)
.into_stream(),
) as _,
@@ -409,7 +423,7 @@ impl SeqScan {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
.await?;
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
@@ -505,7 +519,7 @@ impl SeqScan {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mut reader =
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
.await?;
while let Some(record_batch) = reader.try_next().await? {

View File

@@ -438,6 +438,7 @@ impl SeriesDistributor {
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();
@@ -519,9 +520,13 @@ impl SeriesDistributor {
}
// Builds a reader that merges sources from all parts.
let mut reader =
SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
.await?;
let mut reader = SeqScan::build_reader_from_sources(
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();
let mut fetch_start = Instant::now();

View File

@@ -617,17 +617,16 @@ impl MitoRegion {
.map(|meta| {
let region_id = self.region_id;
let origin_region_id = meta.region_id;
let (index_file_id, index_file_path, index_file_size) = if meta.index_file_size > 0
let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
{
let index_file_path =
index_file_path(table_dir, meta.index_file_id(), path_type);
let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
(
Some(meta.index_file_id().file_id().to_string()),
meta.index_version,
Some(index_file_path),
Some(meta.index_file_size),
)
} else {
(None, None, None)
(0, None, None)
};
let visible = visible_ssts.contains(&meta.file_id);
ManifestSstEntry {
@@ -638,7 +637,7 @@ impl MitoRegion {
region_group: region_id.region_group(),
region_sequence: region_id.region_sequence(),
file_id: meta.file_id.to_string(),
index_file_id,
index_version,
level: meta.level,
file_path: sst_file_path(table_dir, meta.file_id(), path_type),
file_size: meta.file_size,

View File

@@ -63,7 +63,7 @@ use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
@@ -867,8 +867,8 @@ impl RegionLoadCacheTask {
if file_meta.exists_index() {
let puffin_key = IndexKey::new(
file_meta.region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
file_meta.file_id,
FileType::Puffin(file_meta.index_version),
);
if !file_cache.contains_key(&puffin_key) {
@@ -925,12 +925,18 @@ impl RegionLoadCacheTask {
break;
}
let index_remote_path = location::index_file_path(
table_dir,
let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
version
} else {
unreachable!("`files_to_download` should only contain Puffin files");
};
let index_id = RegionIndexId::new(
RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
path_type,
index_version,
);
let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
match file_cache
.download(puffin_key, &index_remote_path, object_store, file_size)
.await

View File

@@ -428,7 +428,7 @@ mod tests {
available_indexes: SmallVec::new(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 100,
num_row_groups: 1,
sequence: NonZeroU64::new(1),

View File

@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, RegionId};
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -117,6 +117,41 @@ impl fmt::Display for RegionFileId {
}
}
/// Unique identifier for an index file, combining the SST file ID and the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
pub file_id: RegionFileId,
pub version: IndexVersion,
}
impl RegionIndexId {
pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
Self { file_id, version }
}
pub fn region_id(&self) -> RegionId {
self.file_id.region_id
}
pub fn file_id(&self) -> FileId {
self.file_id.file_id
}
}
impl fmt::Display for RegionIndexId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.version == 0 {
write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
} else {
write!(
f,
"{}/{}.{}",
self.file_id.region_id, self.file_id.file_id, self.version
)
}
}
}
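
The `Display` impl above drops the `.{version}` suffix when the version is 0; the on-disk index file name documented on `index_version` further down follows the same compatibility rule. A standalone sketch of that naming; the helper below is hypothetical, since the crate's actual path construction lives in `location::index_file_path` and is not shown in full here.

// Hypothetical helper illustrating the "{file_id}.{index_version}.puffin" scheme,
// where version 0 keeps the legacy "{file_id}.puffin" name for compatibility.
fn index_file_name(file_id: &str, index_version: u64) -> String {
    if index_version == 0 {
        format!("{file_id}.puffin")
    } else {
        format!("{file_id}.{index_version}.puffin")
    }
}

fn main() {
    assert_eq!(index_file_name("0000aaaa-1111-2222-3333-444455556666", 0),
               "0000aaaa-1111-2222-3333-444455556666.puffin");
    assert_eq!(index_file_name("0000aaaa-1111-2222-3333-444455556666", 2),
               "0000aaaa-1111-2222-3333-444455556666.2.puffin");
}
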
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
@@ -159,12 +194,10 @@ pub struct FileMeta {
pub indexes: Vec<ColumnIndexMetadata>,
/// Size of the index file.
pub index_file_size: u64,
/// File ID of the index file.
///
/// When this field is None, it means the index file id is the same as the file id.
/// Only meaningful when index_file_size > 0.
/// Used for rebuilding index files.
pub index_file_id: Option<FileId>,
/// Version of the index file.
/// Used to generate the index file name: "{file_id}.{index_version}.puffin".
/// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
pub index_version: u64,
/// Number of rows in the file.
///
/// For historical reasons, this field might be missing in old files. Thus
@@ -273,6 +306,11 @@ impl FileMeta {
!self.available_indexes.is_empty()
}
/// Whether the index file is up-to-date compared with another file meta.
pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
self.exists_index() && other.exists_index() && self.index_version >= other.index_version
}
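
A small standalone restatement of the predicate above, with a simplified stand-in for the two `FileMeta` fields it actually consults: both sides must have an index, and the left-hand side must carry an index version at least as new.

// `has_index` stands in for `exists_index()` (non-empty available indexes).
struct MetaLite {
    has_index: bool,
    index_version: u64,
}

fn is_index_up_to_date(this: &MetaLite, other: &MetaLite) -> bool {
    this.has_index && other.has_index && this.index_version >= other.index_version
}

fn main() {
    let v0 = MetaLite { has_index: true, index_version: 0 };
    let v1 = MetaLite { has_index: true, index_version: 1 };
    let none = MetaLite { has_index: false, index_version: 3 };
    assert!(is_index_up_to_date(&v1, &v0));    // same or newer index version
    assert!(!is_index_up_to_date(&v0, &v1));   // older index version is outdated
    assert!(!is_index_up_to_date(&none, &v0)); // no index at all is never up to date
}
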
/// Returns true if the file has an inverted index
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
@@ -332,14 +370,9 @@ impl FileMeta {
RegionFileId::new(self.region_id, self.file_id)
}
/// Returns the cross-region index file id.
/// If the index file id is not set, returns the file id.
pub fn index_file_id(&self) -> RegionFileId {
if let Some(index_file_id) = self.index_file_id {
RegionFileId::new(self.region_id, index_file_id)
} else {
self.file_id()
}
/// Returns the RegionIndexId for this file.
pub fn index_id(&self) -> RegionIndexId {
RegionIndexId::new(self.file_id(), self.index_version)
}
}
@@ -376,14 +409,9 @@ impl FileHandle {
RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
}
/// Returns the cross-region index file id.
/// If the index file id is not set, returns the file id.
pub fn index_file_id(&self) -> RegionFileId {
if let Some(index_file_id) = self.inner.meta.index_file_id {
RegionFileId::new(self.inner.meta.region_id, index_file_id)
} else {
self.file_id()
}
/// Returns the RegionIndexId for this file.
pub fn index_id(&self) -> RegionIndexId {
RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
}
/// Returns the complete file path of the file.
@@ -409,6 +437,16 @@ impl FileHandle {
self.inner.compacting.store(compacting, Ordering::Relaxed);
}
pub fn index_outdated(&self) -> bool {
self.inner.index_outdated.load(Ordering::Relaxed)
}
pub fn set_index_outdated(&self, index_outdated: bool) {
self.inner
.index_outdated
.store(index_outdated, Ordering::Relaxed);
}
/// Returns a reference to the [FileMeta].
pub fn meta_ref(&self) -> &FileMeta {
&self.inner.meta
@@ -446,32 +484,43 @@ struct FileHandleInner {
meta: FileMeta,
compacting: AtomicBool,
deleted: AtomicBool,
index_outdated: AtomicBool,
file_purger: FilePurgerRef,
}
impl Drop for FileHandleInner {
fn drop(&mut self) {
self.file_purger
.remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
self.file_purger.remove_file(
self.meta.clone(),
self.deleted.load(Ordering::Acquire),
self.index_outdated.load(Ordering::Acquire),
);
}
}
impl FileHandleInner {
/// There should only be one `FileHandleInner` for each file on a datanode
fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
file_purger.new_file(&meta);
FileHandleInner {
meta,
compacting: AtomicBool::new(false),
deleted: AtomicBool::new(false),
index_outdated: AtomicBool::new(false),
file_purger,
}
}
}
/// Delete
/// Delete files for a region.
/// - `region_id`: Region id.
/// - `file_ids`: List of (file id, index version) tuples to delete.
/// - `delete_index`: Whether to delete the index file from the cache.
/// - `access_layer`: Access layer to delete files.
/// - `cache_manager`: Cache manager to remove files from cache.
pub async fn delete_files(
region_id: RegionId,
file_ids: &[(FileId, FileId)],
file_ids: &[(FileId, u64)],
delete_index: bool,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
@@ -484,12 +533,12 @@ pub async fn delete_files(
}
let mut deleted_files = Vec::with_capacity(file_ids.len());
for (file_id, index_file_id) in file_ids {
for (file_id, index_version) in file_ids {
let region_file_id = RegionFileId::new(region_id, *file_id);
match access_layer
.delete_sst(
&RegionFileId::new(region_id, *file_id),
&RegionFileId::new(region_id, *index_file_id),
&region_file_id,
&RegionIndexId::new(region_file_id, *index_version),
)
.await
{
@@ -509,32 +558,78 @@ pub async fn delete_files(
deleted_files
);
for (file_id, index_file_id) in file_ids {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
.await;
}
for (file_id, index_version) in file_ids {
purge_index_cache_stager(
region_id,
delete_index,
access_layer,
cache_manager,
*file_id,
*index_version,
)
.await;
}
Ok(())
}
// Remove the SST file from the cache.
pub async fn delete_index(
region_index_id: RegionIndexId,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
access_layer.delete_index(region_index_id).await?;
purge_index_cache_stager(
region_index_id.region_id(),
true,
access_layer,
cache_manager,
region_index_id.file_id(),
region_index_id.version,
)
.await;
Ok(())
}
async fn purge_index_cache_stager(
region_id: RegionId,
delete_index: bool,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
file_id: FileId,
index_version: u64,
) {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.remove(IndexKey::new(
region_id,
file_id,
FileType::Puffin(index_version),
))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionFileId::new(region_id, *index_file_id))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
index_file_id, region_id);
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(region_id, file_id, FileType::Parquet))
.await;
}
// Purges index content in the stager.
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, file_id),
index_version,
))
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
file_id, index_version, region_id);
}
Ok(())
}
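A standalone sketch of the purge ordering that purge_index_cache_stager centralizes: drop the versioned index entry from the cache when asked, always drop the data-file entry, then discard staged index content. The Cache and CacheKey types below are illustrative stand-ins, not the mito2 write-cache API:
use std::collections::HashSet;
// Stand-in cache keys; the real IndexKey/FileType now carry the index version
// in the Puffin (index) variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum CacheKey {
    Parquet { file_id: u64 },
    Puffin { file_id: u64, index_version: u64 },
}
#[derive(Default)]
struct Cache(HashSet<CacheKey>);
impl Cache {
    fn remove(&mut self, key: &CacheKey) {
        self.0.remove(key);
    }
}
// Mirrors the ordering above: index entry first (if requested), then the
// data-file entry; the real function additionally purges the puffin stager
// and only logs on failure, so cleanup is best-effort.
fn purge(cache: &mut Cache, file_id: u64, index_version: u64, delete_index: bool) {
    if delete_index {
        cache.remove(&CacheKey::Puffin { file_id, index_version });
    }
    cache.remove(&CacheKey::Parquet { file_id });
}
fn main() {
    let mut cache = Cache::default();
    cache.0.insert(CacheKey::Parquet { file_id: 7 });
    cache.0.insert(CacheKey::Puffin { file_id: 7, index_version: 0 });
    purge(&mut cache, 7, 0, true);
    assert!(cache.0.is_empty());
}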
#[cfg(test)]
@@ -563,7 +658,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -614,7 +709,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,


@@ -21,7 +21,7 @@ use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileMeta, delete_files};
use crate::sst::file::{FileMeta, delete_files, delete_index};
use crate::sst::file_ref::FileReferenceManagerRef;
/// A worker to delete files in background.
@@ -29,7 +29,8 @@ pub trait FilePurger: Send + Sync + fmt::Debug {
/// Send a request to remove the file.
/// If `is_delete` is true, the file will be deleted from the storage.
/// Otherwise, only the reference will be removed.
fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
/// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`.
fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
/// Notify the purger of a new file created.
/// This is useful for object store based storage, where we need to track the file references
@@ -46,7 +47,7 @@ pub type FilePurgerRef = Arc<dyn FilePurger>;
pub struct NoopFilePurger;
impl FilePurger for NoopFilePurger {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
// noop
}
}

@@ -129,7 +130,7 @@ impl LocalFilePurger {
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = delete_files(
file_meta.region_id,
&[(file_meta.file_id, file_meta.index_file_id().file_id())],
&[(file_meta.file_id, file_meta.index_id().version)],
file_meta.exists_index(),
&sst_layer,
&cache_manager,
@@ -142,12 +143,27 @@ impl LocalFilePurger {
error!(e; "Failed to schedule the file purge request");
}
}
fn delete_index(&self, file_meta: FileMeta) {
let sst_layer = self.sst_layer.clone();
let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
let index_id = file_meta.index_id();
if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
error!(e; "Failed to delete index for file {:?} from storage", file_meta);
}
})) {
error!(e; "Failed to schedule the index purge request");
}
}
}
impl FilePurger for LocalFilePurger {
fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
if is_delete {
self.delete_file(file_meta);
} else if index_outdated {
self.delete_index(file_meta);
}
}
}
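The dispatch above reads as a small decision table: a full delete takes precedence, an outdated index alone triggers an index-only purge, and otherwise only the reference is released. A standalone sketch of that rule, with placeholder names rather than the real purger types:
#[derive(Debug, PartialEq, Eq)]
enum PurgeAction {
    DeleteFileAndIndex, // is_delete == true
    DeleteIndexOnly,    // is_delete == false, index_outdated == true
    Nothing,            // neither flag set: only the reference is released
}
fn action(is_delete: bool, index_outdated: bool) -> PurgeAction {
    if is_delete {
        PurgeAction::DeleteFileAndIndex
    } else if index_outdated {
        PurgeAction::DeleteIndexOnly
    } else {
        PurgeAction::Nothing
    }
}
fn main() {
    assert_eq!(action(true, true), PurgeAction::DeleteFileAndIndex);
    assert_eq!(action(false, true), PurgeAction::DeleteIndexOnly);
    assert_eq!(action(false, false), PurgeAction::Nothing);
}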
@@ -158,7 +174,7 @@ pub struct ObjectStoreFilePurger {
}
impl FilePurger for ObjectStoreFilePurger {
fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
// If not on a local file system, inform the global file purger to remove the file reference instead.
// Note that whether or not the file is deleted, we still need to remove the reference,
// because the file is no longer in use either way.
@@ -187,6 +203,7 @@ mod tests {
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{
ColumnIndexMetadata, FileHandle, FileMeta, FileTimeRange, IndexType, RegionFileId,
RegionIndexId,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -237,7 +254,7 @@ mod tests {
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
index_file_id: None,
index_version: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
@@ -263,6 +280,7 @@ mod tests {
let dir_path = dir.path().display().to_string();
let builder = Fs::default().root(&dir_path);
let sst_file_id = RegionFileId::new(RegionId::new(0, 0), FileId::random());
let index_file_id = RegionIndexId::new(sst_file_id, 0);
let sst_dir = "table1";
let index_aux_path = dir.path().join("index_aux");
@@ -285,7 +303,7 @@ mod tests {
let path = location::sst_file_path(sst_dir, sst_file_id, layer.path_type());
object_store.write(&path, vec![0; 4096]).await.unwrap();
let index_path = location::index_file_path(sst_dir, sst_file_id, layer.path_type());
let index_path = location::index_file_path(sst_dir, index_file_id, layer.path_type());
object_store
.write(&index_path, vec![0; 4096])
.await
@@ -309,7 +327,7 @@ mod tests {
created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
}],
index_file_size: 4096,
index_file_id: None,
index_version: 0,
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),

Some files were not shown because too many files have changed in this diff.