Compare commits

...

23 Commits

Author SHA1 Message Date
Lei, HUANG
3d942f6763 fix: bulk insert case sensitive (#6165)
* fix/bulk-insert-case-sensitive:
 Add error inspection for gRPC bulk insert in `greptime_handler.rs`

 - Enhanced error handling by adding `inspect_err` to log errors during the `put_record_batch` operation in `greptime_handler.rs`.

* fix: silent error during bulk ingest with uppercase columns
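
A minimal sketch of the `inspect_err` logging pattern described above; the handler and error types here are stand-ins, not the actual `greptime_handler.rs` signatures.

```rust
// Hypothetical stand-in types; the real handler lives in greptime_handler.rs.
#[derive(Debug)]
struct BulkInsertError(String);

async fn put_record_batch(payload: &[u8]) -> Result<usize, BulkInsertError> {
    // ... decode and write the record batch ...
    Ok(payload.len())
}

async fn handle_bulk_insert(payload: &[u8]) -> Result<usize, BulkInsertError> {
    // `inspect_err` logs the failure without consuming or remapping the error,
    // so the caller still receives the original error value.
    put_record_batch(payload)
        .await
        .inspect_err(|e| eprintln!("failed to handle bulk insert: {e:?}"))
}
```
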
2025-05-24 07:02:42 +00:00
discord9
3901863432 chore: metasrv starting not blocking (#6158)
* chore: metasrv starting not blocking

* chore: fmt

* chore: expose actual bind_addr
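
A hedged illustration of the "expose actual bind_addr" idea: bind first, then report the address the OS actually assigned (which matters when port 0 is requested), while serving continues on a spawned task so startup does not block. This uses plain tokio and is not the metasrv code itself.

```rust
use std::net::SocketAddr;
use tokio::net::TcpListener;

// Bind without blocking the rest of startup, and return the real address.
async fn bind_and_report(addr: &str) -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind(addr).await?;
    let actual = listener.local_addr()?;
    // Hand the listener to the serving task; startup continues immediately.
    tokio::spawn(async move {
        loop {
            if let Ok((_socket, peer)) = listener.accept().await {
                println!("accepted connection from {peer}");
            }
        }
    });
    Ok(actual)
}
```
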
2025-05-23 09:53:42 +00:00
Lei, HUANG
27e339f628 perf: optimize bulk encode decode (#6161)
* main:
 **Enhancements to Flight Data Handling and Error Management**

 - **Flight Data Handling:**
   - Added `bytes` dependency in `Cargo.lock` and `Cargo.toml`.
   - Introduced `try_from_schema_bytes` and `try_decode_record_batch` methods in `FlightDecoder` to handle schema and record batch decoding more efficiently in `src/common/grpc/src/flight.rs`.
   - Updated `Inserter` in `src/operator/src/bulk_insert.rs` to utilize schema bytes directly, improving bulk insert operations.

 - **Error Management:**
   - Added `ArrowError` handling in `src/common/grpc/src/error.rs` to manage errors related to Arrow operations.

 - **Region Request Processing:**
   - Modified `make_region_bulk_inserts` in `src/store-api/src/region_request.rs` to use the new `FlightDecoder` methods for decoding Arrow IPC data.

* perf/optimize-bulk-encode-decode:
 Update `greptime-proto` dependency and refactor error handling

 - **Dependency Update**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Error Handling Refactor**: Removed the `Prost` error variant from `MetadataError` in `src/store-api/src/metadata.rs`.
 - **Error Handling Improvement**: Replaced `unwrap` with `context(FlightCodecSnafu)` for error handling in `make_region_bulk_inserts` function in `src/store-api/src/region_request.rs`.

* fix: clippy

* fix: toml

* perf/optimize-bulk-encode-decode:
 ### Update `Cargo.toml` Dependencies

 - Updated the `bytes` dependency to use the workspace version in `Cargo.toml`.

* perf/optimize-bulk-encode-decode:
 **Fix payload assignment in `bulk_insert.rs`**

 - Corrected the assignment of the `payload` field in the `ArrowIpc` struct within the `Inserter` implementation in `bulk_insert.rs`.

* use main branch proto
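
Based on the `FlightDecoder` diff further down, a sketch of how the new entry points might be used on the receiving side: build a decoder from raw Arrow IPC schema bytes once, then decode each batch from its header and body without round-tripping through protobuf `FlightData`. Crate paths are taken from the diff; error handling is simplified.

```rust
use bytes::Bytes;
use common_grpc::flight::FlightDecoder;
use common_recordbatch::DfRecordBatch;

// Assumes the sender shipped the Arrow IPC schema bytes once, followed by
// (data_header, data_body) pairs for each record batch, as in the bulk path.
fn decode_batches(
    schema_bytes: &Bytes,
    payloads: &[(Bytes, Bytes)],
) -> Result<Vec<DfRecordBatch>, Box<dyn std::error::Error>> {
    let mut decoder = FlightDecoder::try_from_schema_bytes(schema_bytes)?;
    payloads
        .iter()
        .map(|(header, body)| {
            decoder
                .try_decode_record_batch(header, body)
                .map_err(Into::into)
        })
        .collect()
}
```
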
2025-05-23 07:22:10 +00:00
discord9
cf2712e6f4 chore: invalid table flow mapping cache (#6135)
* chore: invalid table flow mapping

* chore: exists

* fix: invalid all related keys in kv cache when drop flow&refactor: per review

* fix: flow not found status code

* chore: rm unused error code

* chore: stuff

* chore: unused
2025-05-23 03:40:10 +00:00
Lei, HUANG
4b71e493f7 feat!: revise compaction picker (#6121)
* - **Refactor `RegionFilePathFactory` to `RegionFilePathProvider`:** Updated references and implementations in `access_layer.rs`, `write_cache.rs`, and related test files to use the new struct name.
 - **Add `max_file_size` support in compaction:** Introduced `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`.
 - **Enhance Parquet writing logic:** Modified `parquet.rs` and `parquet/writer.rs` to support optional `max_file_size` and added a test case `test_write_multiple_files` to verify writing multiple files based on size constraints.

 **Refactor Parquet Writer Initialization and File Handling**
 - Updated `ParquetWriter` in `writer.rs` to handle `current_indexer` as an `Option`, allowing for more flexible initialization and management.
 - Introduced `finish_current_file` method to encapsulate logic for completing and transitioning between SST files, improving code clarity and maintainability.
 - Enhanced error handling and logging with `debug` statements for better traceability during file operations.

 - **Removed Output Size Enforcement in `twcs.rs`:**
   - Deleted the `enforce_max_output_size` function and related logic to simplify compaction input handling.

 - **Added Max File Size Option in `parquet.rs`:**
   - Introduced `max_file_size` in `WriteOptions` to control the maximum size of output files.

 - **Refactored Indexer Management in `parquet/writer.rs`:**
   - Changed `current_indexer` from an `Option` to a direct `Indexer` type.
   - Implemented `roll_to_next_file` to handle file transitions when exceeding `max_file_size`.
   - Simplified indexer initialization and management logic.

 - **Refactored SST File Handling**:
   - Introduced `FilePathProvider` trait and its implementations (`WriteCachePathProvider`, `RegionFilePathFactory`) to manage SST and index file paths.
   - Updated `AccessLayer`, `WriteCache`, and `ParquetWriter` to use `FilePathProvider` for path management.
   - Modified `SstWriteRequest` and `SstUploadRequest` to use path providers instead of direct paths.
   - Files affected: `access_layer.rs`, `write_cache.rs`, `parquet.rs`, `writer.rs`.

 - **Enhanced Indexer Management**:
   - Replaced `IndexerBuilder` with `IndexerBuilderImpl` and made it async to support dynamic indexer creation.
   - Updated `ParquetWriter` to handle multiple indexers and file IDs.
   - Files affected: `index.rs`, `parquet.rs`, `writer.rs`.

 - **Removed Redundant File ID Handling**:
   - Removed `file_id` from `SstWriteRequest` and `CompactionOutput`.
   - Updated related logic to dynamically generate file IDs where necessary.
   - Files affected: `compaction.rs`, `flush.rs`, `picker.rs`, `twcs.rs`, `window.rs`.

 - **Test Adjustments**:
   - Updated tests to align with new path and indexer management.
   - Introduced `FixedPathProvider` and `NoopIndexBuilder` for testing purposes.
   - Files affected: `sst_util.rs`, `version_util.rs`, `parquet.rs`.

* chore: rebase main

* feat/multiple-compaction-output:
 ### Add Benchmarking and Refactor Compaction Logic

 - **Benchmarking**: Added a new benchmark `run_bench` in `Cargo.toml` and implemented benchmarks in `benches/run_bench.rs` using Criterion for `find_sorted_runs` and `reduce_runs` functions.
 - **Compaction Module Enhancements**:
   - Made `run.rs` public and refactored the `Ranged` and `Item` traits to be public.
   - Simplified the logic in `find_sorted_runs` and `reduce_runs` by removing `MergeItems` and related functions.
   - Introduced `find_overlapping_items` for identifying overlapping items.
 - **Code Cleanup**: Removed redundant code and tests related to `MergeItems` in `run.rs`.
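
The benchmark wiring described above follows the usual Criterion pattern; a minimal, self-contained example benchmarking a placeholder function (not the real `find_sorted_runs` or `reduce_runs`):

```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion};

// Placeholder for the function under test; the real benchmarks target
// `find_sorted_runs` and `reduce_runs` in the compaction module.
fn sum_of_squares(n: u64) -> u64 {
    (0..n).map(|i| i * i).sum()
}

fn bench_sum_of_squares(c: &mut Criterion) {
    c.bench_function("sum_of_squares_10k", |b| {
        b.iter(|| sum_of_squares(black_box(10_000)))
    });
}

criterion_group!(benches, bench_sum_of_squares);
criterion_main!(benches);
```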

* feat/multiple-compaction-output:
 ### Enhance Compaction Logic and Add Benchmarks

 - **Compaction Logic Improvements**:
   - Updated `reduce_runs` function in `src/mito2/src/compaction/run.rs` to remove the target parameter and improve the logic for selecting files to merge based on minimum penalty.
   - Enhanced `find_overlapping_items` to handle unsorted inputs and improve overlap detection efficiency.

 - **Benchmark Enhancements**:
   - Added `bench_find_overlapping_items` in `src/mito2/benches/run_bench.rs` to benchmark the new `find_overlapping_items` function.
   - Extended existing benchmarks to include larger data sizes.

 - **Testing Enhancements**:
   - Updated tests in `src/mito2/src/compaction/run.rs` to reflect changes in `reduce_runs` and added new tests for `find_overlapping_items`.

 - **Logging and Debugging**:
   - Improved logging in `src/mito2/src/compaction/twcs.rs` to provide more detailed information about compaction decisions.

* feat/multiple-compaction-output:
 ### Refactor and Enhance Compaction Logic

 - **Refactor `find_overlapping_items` Function**: Changed the function signature to accept slices instead of mutable vectors in `run.rs`.
 - **Rename and Update Struct Fields**: Renamed `penalty` to `size` in `SortedRun` struct and updated related logic in `run.rs`.
 - **Enhance `reduce_runs` Function**: Improved logic to sort runs by size and limit probe runs to 100 in `run.rs`.
 - **Add `merge_seq_files` Function**: Introduced a new function `merge_seq_files` in `run.rs` for merging sequential files.
 - **Modify `TwcsPicker` Logic**: Updated the compaction logic to use `merge_seq_files` when only one run is found in `twcs.rs`.
 - **Remove `enforce_file_num` Function**: Deleted the `enforce_file_num` function and its related test cases in `twcs.rs`.

* feat/multiple-compaction-output:
 ### Enhance Compaction Logic and Testing

 - **Add `merge_seq_files` Functionality**: Implemented the `merge_seq_files` function in `run.rs` to optimize file merging based on scoring systems. Updated benchmarks in `run_bench.rs` to include `bench_merge_seq_files`.
 - **Improve Compaction Strategy in `twcs.rs`**: Modified the compaction logic to handle file merging more effectively, considering file size and overlap.
 - **Update Tests**: Enhanced test coverage in `compaction_test.rs` and `append_mode_test.rs` to validate new compaction logic and file merging strategies.
 - **Remove Unused Function**: Deleted `new_file_handles` from `test_util.rs` as it was no longer needed.

* feat/multiple-compaction-output:
 ### Refactor TWCS Compaction Options

 - **Refactor Compaction Logic**: Simplified the TWCS compaction logic by replacing multiple parameters (`max_active_window_runs`, `max_active_window_files`, `max_inactive_window_runs`, `max_inactive_window_files`) with a single `trigger_file_num` parameter in `picker.rs`, `twcs.rs`, and `options.rs`.
 - **Update Tests**: Adjusted test cases to reflect the new compaction logic in `append_mode_test.rs`, `compaction_test.rs`, `filter_deleted_test.rs`, `merge_mode_test.rs`, and various test files under `tests/cases`.
 - **Modify Engine Options**: Updated engine option keys to use `trigger_file_num` in `mito_engine_options.rs` and `region_request.rs`.
 - **Fuzz Testing**: Updated fuzz test generators and translators to accommodate the new compaction parameter in `alter_expr.rs` and related files.

 This refactor aims to streamline the compaction configuration by reducing the number of parameters and simplifying the codebase.
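
A hedged sketch of the shape of this options change: the four window-specific knobs collapse into one `trigger_file_num`. Field types and documentation here are illustrative, not the actual `TwcsOptions` definition.

```rust
// Before (illustrative): separate knobs for active and inactive windows.
struct TwcsOptionsOld {
    max_active_window_runs: usize,
    max_active_window_files: usize,
    max_inactive_window_runs: usize,
    max_inactive_window_files: usize,
}

// After (illustrative): a single threshold that triggers compaction for a window.
struct TwcsOptionsNew {
    /// Compaction is picked for a window once its file count reaches this number.
    trigger_file_num: usize,
}
```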

* chore: add trailing space

* fix license header

* feat/revise-compaction-picker:
 **Limit File Processing and Optimize Merge Logic in `run.rs`**

 - Introduced a limit to process a maximum of 100 files in `merge_seq_files` to control time complexity.
 - Adjusted logic to calculate `target_size` and iterate over files using the limited set of files.
 - Updated scoring calculations to use the limited file set, ensuring efficient file merging.
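
An illustrative sketch (not the actual `merge_seq_files` implementation) of the idea in the bullets above: cap the candidate set at 100 files and pick a contiguous run of already sorted files whose total size stays under a target.

```rust
/// Illustrative only: choose the longest prefix of (already sorted) file sizes,
/// considering at most `max_files` of them, whose total stays under `target_size`.
fn pick_merge_window(file_sizes: &[u64], target_size: u64, max_files: usize) -> &[u64] {
    let candidates = &file_sizes[..file_sizes.len().min(max_files)];
    let mut total = 0u64;
    let mut end = 0;
    for (i, size) in candidates.iter().enumerate() {
        if total + size > target_size {
            break;
        }
        total += size;
        end = i + 1;
    }
    &candidates[..end]
}

// pick_merge_window(&[100, 200, 300, 400], 600, 100) returns &[100, 200, 300].
```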

* feat/revise-compaction-picker:
 ### Add Compaction Metrics and Remove Debug Logging

 - **Compaction Metrics**: Introduced new histograms `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` to track compaction input and output file sizes in `metrics.rs`. Updated `compactor.rs` to observe these metrics during the compaction process.
 - **Logging Cleanup**: Removed debug logging of file ranges during the merge process in `twcs.rs`.
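
Defining the two metrics described above typically looks like the following with the `prometheus` crate and `lazy_static`; the metric names match the commit message and the dashboard diff below, but the exact registration code in `metrics.rs` may differ.

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_counter, IntCounter};

lazy_static! {
    /// Total bytes read as compaction input.
    pub static ref COMPACTION_INPUT_BYTES: IntCounter = register_int_counter!(
        "greptime_mito_compaction_input_bytes",
        "mito compaction input file size"
    )
    .unwrap();

    /// Total bytes written as compaction output.
    pub static ref COMPACTION_OUTPUT_BYTES: IntCounter = register_int_counter!(
        "greptime_mito_compaction_output_bytes",
        "mito compaction output file size"
    )
    .unwrap();
}

// At the end of a compaction task (illustrative call sites):
// COMPACTION_INPUT_BYTES.inc_by(input_file_size);
// COMPACTION_OUTPUT_BYTES.inc_by(output_file_size);
```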

* feat/revise-compaction-picker:
 ## Enhance Compaction Logic and Metrics

 - **Compaction Logic Improvements**:
   - Added methods `input_file_size` and `output_file_size` to `MergeOutput` in `compactor.rs` to streamline file size calculations.
   - Updated `Compactor` implementation to use these methods for metrics tracking.
   - Modified `Ranged` trait logic in `run.rs` to improve range comparison.
   - Enhanced test cases in `run.rs` to reflect changes in compaction logic.

 - **Metrics Enhancements**:
   - Changed `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` from histograms to counters in `metrics.rs` for better performance tracking.

 - **Debugging and Logging**:
   - Added detailed logging for compaction pick results in `twcs.rs`.
   - Implemented custom `Debug` trait for `FileMeta` in `file.rs` to improve debugging output.

 - **Testing Enhancements**:
   - Added new test `test_compaction_overlapping_files` in `compaction_test.rs` to verify compaction behavior with overlapping files.
   - Updated `merge_mode_test.rs` to reflect changes in file handling during scans.

* feat/revise-compaction-picker:
 ### Update `FileHandle` Debug Implementation

 - **Refactor Debug Output**: Simplified the `fmt::Debug` implementation for `FileHandle` in `src/mito2/src/sst/file.rs` by consolidating multiple fields into a single `meta` field using `meta_ref()`.
 - **Atomic Operations**: Updated the `deleted` field to use atomic loading with `Ordering::Relaxed`.

* Trigger CI

* feat/revise-compaction-picker:
 **Update compaction logic and default options**

 - **`twcs.rs`**: Enhanced logging for compaction pick results by improving the formatting for better readability.
 - **`options.rs`**: Modified the default `max_output_file_size` in `TwcsOptions` from 2GB to 512MB to optimize file handling and performance.

* feat/revise-compaction-picker:
 Refactor `find_overlapping_items` to use an external result vector

 - Updated `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to accept a mutable result vector instead of returning a new vector, improving memory efficiency.
 - Modified benchmarks in `src/mito2/benches/bench_compaction_picker.rs` to accommodate the new function signature.
 - Adjusted tests in `src/mito2/src/compaction/run.rs` to use the updated function signature, ensuring correct functionality with the new approach.

* feat/revise-compaction-picker:
 Improve file merging logic in `run.rs`

 - Refactor the loop logic in `merge_seq_files` to simplify the iteration over file groups.
 - Adjust the range for `end_idx` to include the endpoint, allowing for more flexible group selection.
 - Remove the condition that skips groups with only one file, enabling more comprehensive processing of file sequences.

* feat/revise-compaction-picker:
 Enhance `find_overlapping_items` with `SortedRun` and Update Tests

 - Refactor `find_overlapping_items` in `src/mito2/src/compaction/run.rs` to utilize the `SortedRun` struct for improved efficiency and clarity.
 - Introduce a `sorted` flag in `SortedRun` to optimize sorting operations.
 - Update test cases in `src/mito2/benches/bench_compaction_picker.rs` to accommodate changes in `find_overlapping_items` by using `SortedRun`.
 - Add `From<Vec<T>>` implementation for `SortedRun` to facilitate easy conversion from vectors.

* feat/revise-compaction-picker:
 **Enhancements in `compaction/run.rs`:**

 - Added `ReadableSize` import to handle size calculations.
 - Modified the logic in `merge_seq_files` to clamp the calculated target size to a maximum of 2GB when `max_file_size` is not provided.

* feat/revise-compaction-picker: Add Default Max Output Size Constant for Compaction

Introduce DEFAULT_MAX_OUTPUT_SIZE constant to define the default maximum compaction output file size as 2GB. Refactor the merge_seq_files function to utilize this constant, ensuring consistent and maintainable code for handling file size limits during compaction.
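
A minimal sketch of the clamping rule described in the last two commits: when `max_file_size` is not provided, fall back to a 2 GB default cap on the computed target size. The constant value comes from the commit message; the real code expresses it with `ReadableSize`.

```rust
/// Default cap on compaction output file size: 2 GiB (illustrative constant).
const DEFAULT_MAX_OUTPUT_SIZE: u64 = 2 * 1024 * 1024 * 1024;

/// Clamp the computed target size to `max_file_size` if provided,
/// otherwise to the 2 GiB default.
fn clamp_target_size(computed: u64, max_file_size: Option<u64>) -> u64 {
    computed.min(max_file_size.unwrap_or(DEFAULT_MAX_OUTPUT_SIZE))
}
```
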
2025-05-23 03:29:08 +00:00
Ruihang Xia
bf496e05cc ci: turn off fail fast strategy (#6157)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-23 02:38:25 +00:00
zyy17
513ca951ee chore: add the missing v prefix for NEXT_RELEASE_VERSION variable (#6160)
chore: add 'v' prefix for NEXT_RELEASE_VERSION variable
2025-05-22 10:33:14 +00:00
Ruihang Xia
791f530a78 fix: require input ordering in series divide plan (#6148)
* require input ordering in series divide plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finalise

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-22 07:04:25 +00:00
Ning Sun
1de6d8c619 fix: ident value in set search_path (#6153)
* fix: ident value in set search_path

* refactor: remove unneeded clone
2025-05-22 03:58:18 +00:00
discord9
a4d0420727 fix(flow): flow task run interval (#6100)
* fix: always check for shutdown signal in flow
chore: correct log msg for flows that shouldn't exist
feat: use time window size/2 as sleep interval

* chore: better slower query refresh time

* chore

* refactor: per review
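
The refresh-interval rule from this commit (half the time window, bounded by the last query duration and a max timeout, as documented in the `state.rs` hunk below) can be sketched as plain `Duration` arithmetic; `MIN_REFRESH_DURATION` here is a placeholder value, and the real code differs in detail.

```rust
use std::time::Duration;

const MIN_REFRESH_DURATION: Duration = Duration::from_secs(5); // placeholder value

/// Next delay before re-running a flow query:
/// - with a known time window: half the window, but at least the last query
///   duration (so slow queries are not re-issued back to back);
/// - otherwise: the last query duration, clamped to a sane minimum;
/// - in both cases never longer than `max_timeout` when one is given.
fn next_query_delay(
    last_query_duration: Duration,
    time_window_size: Option<Duration>,
    max_timeout: Option<Duration>,
) -> Duration {
    let last = last_query_duration.max(MIN_REFRESH_DURATION);
    let delay = match time_window_size {
        Some(window) => (window / 2).max(last),
        None => last,
    };
    match max_timeout {
        Some(max) => delay.min(max),
        None => delay,
    }
}
```
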
2025-05-22 03:27:26 +00:00
discord9
fc6300a2ba feat(flow): support prom ql(in tql) in flow (#6063)
* feat: support parse prom ql in create flow

* refactor

* fix: just run tql unmodified

* refactor: determine type faster

* fix: pass original query

* tests: sqlness

* test: fix format&chore

* fix: get raw query

* test: fix sqlness randomness

* chore: what's the box for?

* test: location_to_index

* test: make sqlness more deterministic

* fix: tmp add sleep 1s after flush_flow

* undo test sleep 1s&rm done todo

* chore: more tests
2025-05-22 03:06:09 +00:00
liyang
f55af5838c ci: add issues write permission (#6145)
fixed to: https://github.com/GreptimeTeam/greptimedb/actions/runs/15155518237/job/42610589439
2025-05-21 15:53:01 +00:00
Lei, HUANG
5a0da5b6bb fix: region worker stall metrics (#6149)
fix/stall-metrics:
 Improve stalled request handling in `handle_write.rs`

 - Updated logic to account for both `write_requests` and `bulk_requests` when adjusting `stalled_count`.
 - Modified `reject_region_stalled_requests` and `handle_region_stalled_requests` to correctly subtract the combined length of `requests` and `bulk` from `stalled_count`.
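
A hedged sketch of the gauge adjustment described above: when stalled requests are rejected or handled, the gauge is decremented by the combined number of plain write requests and bulk requests. The request types are placeholders, not the worker's real structs.

```rust
use prometheus::IntGauge;

struct WriteRequest;
struct BulkRequest;

/// Decrement the stalled-request gauge by the combined count of both queues.
fn on_stalled_requests_drained(
    stalled_count: &IntGauge,
    requests: &[WriteRequest],
    bulk: &[BulkRequest],
) {
    stalled_count.sub((requests.len() + bulk.len()) as i64);
}
```
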
2025-05-21 13:21:50 +00:00
Lei, HUANG
d5f0006864 fix: flaky prom gateway test (#6146)
fix/flaky-prom-gateway-test:
 **Refactor gRPC Test Assertions in `grpc.rs`**

 - Updated test assertions for `test_prom_gateway_query` to improve clarity and maintainability.
 - Replaced direct comparison with expected `PrometheusJsonResponse` objects with individual field assertions.
 - Added sorting for `vector` and `matrix` results to ensure consistent test outcomes.
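
The sorting trick for flaky assertions is generic; a small illustration of sorting result series by label before comparing, so the assertion no longer depends on response ordering. The tuple type is a stand-in for the Prometheus response structs.

```rust
/// Stand-in for one instant-vector sample: (metric label, value).
type Sample = (String, f64);

fn assert_same_samples(mut actual: Vec<Sample>, mut expected: Vec<Sample>) {
    // Sort both sides by label so the comparison is order-independent.
    actual.sort_by(|a, b| a.0.cmp(&b.0));
    expected.sort_by(|a, b| a.0.cmp(&b.0));
    assert_eq!(actual, expected);
}
```
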
2025-05-21 09:31:58 +00:00
liyang
ede82331b2 docs: change docker run mount directory (#6142) 2025-05-21 07:05:21 +00:00
Ruihang Xia
56e696bd55 chore: remove stale wal config entries (#6134)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-20 19:42:09 +00:00
ZonaHe
bc0cdf62ba feat: update dashboard to v0.9.2 (#6140)
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
2025-05-20 19:41:29 +00:00
Lei, HUANG
eaf7b4b9dd chore: update flush failure metric name and update grafana dashboard (#6138)
* 1. rename the flush failure metric to `greptime_mito_flush_errors_total` for consistency
2. update grafana dashboard to add following panel:
  - compaction input/output bytes
  - bulk insert handle elapsed time in frontend and region worker
2025-05-20 12:05:54 +00:00
Ruihang Xia
7ae0e150e5 feat: support altering multiple logical table in one remote write request (#6137)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-20 11:22:38 +00:00
ZonaHe
43c30b55ae feat: update dashboard to v0.9.1 (#6132)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2025-05-20 09:58:44 +00:00
liyang
153e80450a fix: update dev-build image tag (#6136) 2025-05-20 09:08:28 +00:00
jeremyhi
1624dc41c5 chore: reduce unnecessary txns in alter operations (#6133) 2025-05-20 08:29:49 +00:00
Ruihang Xia
300262562b feat: accommodate default column name with pre-created table schema (#6126)
* refactor: prepare_mocked_backend

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify request in place

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to influx line protocol

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* return on empty alter expr list

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* expose to other write paths

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-05-20 07:22:13 +00:00
124 changed files with 11625 additions and 9135 deletions

View File

@@ -22,7 +22,6 @@ datanode:
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
linger = "2ms"
overwrite_entry_start_id = true
frontend:
configData: |-

View File

@@ -16,7 +16,8 @@ function create_version() {
if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
# NOTE: Need a `v` prefix for the version string.
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi
if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then

View File

@@ -4,7 +4,7 @@ DEV_BUILDER_IMAGE_TAG=$1
update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
echo "Error: Should specify the dev-builder image tag"
exit 1
fi
@@ -17,7 +17,7 @@ update_dev_builder_version() {
git checkout -b $BRANCH_NAME
# Update the dev-builder image tag in the Makefile.
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
# Commit the changes.
git add Makefile

View File

@@ -195,6 +195,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "unstable_fuzz_create_table_standalone" ]
steps:
@@ -299,6 +300,7 @@ jobs:
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
mode:
@@ -431,6 +433,7 @@ jobs:
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
@@ -578,6 +581,7 @@ jobs:
needs: build
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
mode:

View File

@@ -16,6 +16,7 @@ jobs:
runs-on: ubuntu-latest
permissions:
pull-requests: write # Add permissions to modify PRs
issues: write
timeout-minutes: 10
steps:
- uses: actions/checkout@v4

Cargo.lock (generated)
View File

@@ -2223,6 +2223,7 @@ version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
"bytes",
"common-base",
"common-error",
"common-macro",
@@ -4352,6 +4353,7 @@ dependencies = [
"session",
"smallvec",
"snafu 0.8.5",
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
@@ -4855,7 +4857,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7668a882d57ca6a2333146e0574b8f0c9d5008ae#7668a882d57ca6a2333146e0574b8f0c9d5008ae"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=67ee5f94e5da72314cda7d0eb90106eb1c16a1ae#67ee5f94e5da72314cda7d0eb90106eb1c16a1ae"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67ee5f94e5da72314cda7d0eb90106eb1c16a1ae" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-b2377d4b-20250520045554
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu

View File

@@ -121,7 +121,7 @@ docker pull greptime/greptimedb
```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/greptimedb_data" \
-v "$(pwd)/greptimedb_data:/greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
@@ -129,7 +129,7 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)
**Troubleshooting:**
@@ -167,7 +167,7 @@ cargo run -- standalone start
## Project Status
> **Status:** Beta.
> **Status:** Beta.
> **GA (v1.0):** Targeted for mid 2025.
- Being used in production by early adopters
@@ -197,8 +197,8 @@ GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/license
## Commercial Support
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
[Contact us](https://greptime.com/contactus) for details.
## Contributing

File diff suppressed because it is too large.

View File

@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{instance=~"$frontend"}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -67,6 +68,8 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -562,6 +577,36 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction input/output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance

File diff suppressed because it is too large.

View File

@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -67,6 +68,8 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |

View File

@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -562,6 +577,36 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction input/output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
arrow-flight.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -0,0 +1,143 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_flight::FlightData;
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use criterion::{criterion_group, criterion_main, Criterion};
use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
fn schema() -> SchemaRef {
let schema = Schema::new(vec![
ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false),
]);
Arc::new(schema)
}
/// Generate record batch according to provided schema and num rows.
fn prepare_random_record_batch(schema: SchemaRef, num_rows: usize) -> RecordBatch {
let tag_candidates = (0..10000).map(|i| i.to_string()).collect::<Vec<_>>();
let columns: Vec<VectorRef> = schema
.column_schemas()
.iter()
.map(|col| match &col.data_type {
ConcreteDataType::String(_) => {
let array = StringArray::from(
(0..num_rows)
.map(|_| {
let idx: usize = rand::random_range(0..10000);
format!("tag-{}", tag_candidates[idx])
})
.collect::<Vec<_>>(),
);
Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
}
ConcreteDataType::Timestamp(_) => {
let now = common_time::util::current_time_millis();
let array = TimestampMillisecondArray::from(
(0..num_rows).map(|i| now + i as i64).collect::<Vec<_>>(),
);
Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
}
ConcreteDataType::Int64(_) => {
let array = Int64Array::from((0..num_rows).map(|i| i as i64).collect::<Vec<_>>());
Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
}
_ => unreachable!(),
})
.collect();
RecordBatch::new(schema, columns).unwrap()
}
fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) {
let schema = schema();
let mut encoder = FlightEncoder::default();
let schema_data = encoder.encode(FlightMessage::Schema(schema.clone()));
let rb = prepare_random_record_batch(schema, num_rows);
let rb_data = encoder.encode(FlightMessage::Recordbatch(rb));
(schema_data, rb_data)
}
fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecordBatch {
let schema = FlightData::decode(&schema[..]).unwrap();
let payload = FlightData::decode(&payload[..]).unwrap();
let mut decoder = FlightDecoder::default();
let _schema = decoder.try_decode(&schema).unwrap();
let message = decoder.try_decode(&payload).unwrap();
let FlightMessage::Recordbatch(batch) = message else {
unreachable!("unexpected message");
};
batch.into_df_record_batch()
}
fn decode_flight_data_from_header_and_body(
schema: &Bytes,
data_header: &Bytes,
data_body: &Bytes,
) -> DfRecordBatch {
let mut decoder = FlightDecoder::try_from_schema_bytes(schema).unwrap();
decoder
.try_decode_record_batch(data_header, data_body)
.unwrap()
}
fn bench_decode_flight_data(c: &mut Criterion) {
let row_counts = [100000, 200000, 1000000];
for row_count in row_counts {
let (schema, payload) = prepare_flight_data(row_count);
// arguments for decode_flight_data_from_protobuf
let schema_bytes = Bytes::from(schema.encode_to_vec());
let payload_bytes = Bytes::from(payload.encode_to_vec());
let mut group = c.benchmark_group(format!("flight_decoder_{}_rows", row_count));
group.bench_function("decode_from_protobuf", |b| {
b.iter(|| decode_flight_data_from_protobuf(&schema_bytes, &payload_bytes));
});
group.bench_function("decode_from_header_and_body", |b| {
b.iter(|| {
decode_flight_data_from_header_and_body(
&schema.data_header,
&payload.data_header,
&payload.data_body,
)
});
});
group.finish();
}
}
criterion_group!(benches, bench_decode_flight_data);
criterion_main!(benches);

View File

@@ -14,8 +14,10 @@
use criterion::criterion_main;
mod bench_flight_decoder;
mod channel_manager;
criterion_main! {
channel_manager::benches
channel_manager::benches,
bench_flight_decoder::benches
}

View File

@@ -18,6 +18,7 @@ use std::io;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
@@ -105,6 +106,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed arrow operation"))]
Arrow {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ArrowError,
},
}
impl ErrorExt for Error {
@@ -123,6 +132,7 @@ impl ErrorExt for Error {
Error::CreateRecordBatch { source, .. } => source.status_code(),
Error::ConvertArrowSchema { source, .. } => source.status_code(),
Error::Arrow { .. } => StatusCode::Internal,
}
}

View File

@@ -21,16 +21,19 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_recordbatch::{DfRecordBatch, RecordBatch, RecordBatches};
use datatypes::arrow;
use datatypes::arrow::buffer::Buffer;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
use datatypes::schema::{Schema, SchemaRef};
use flatbuffers::FlatBufferBuilder;
use prost::bytes::Bytes as ProstBytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use crate::error;
use crate::error::{
ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu,
Result,
@@ -124,9 +127,60 @@ impl FlightEncoder {
#[derive(Default)]
pub struct FlightDecoder {
schema: Option<SchemaRef>,
schema_bytes: Option<bytes::Bytes>,
}
impl FlightDecoder {
/// Build a [FlightDecoder] instance from provided schema bytes.
pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result<Self> {
let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..])
.context(error::ArrowSnafu)?;
let schema = Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
Ok(Self {
schema: Some(schema),
schema_bytes: Some(schema_bytes.clone()),
})
}
pub fn try_decode_record_batch(
&mut self,
data_header: &bytes::Bytes,
data_body: &bytes::Bytes,
) -> Result<DfRecordBatch> {
let schema = self
.schema
.as_ref()
.context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!",
})?
.clone();
let arrow_schema = schema.arrow_schema().clone();
let message = root_as_message(&data_header[..])
.map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})
.context(error::ArrowSnafu)?;
let result = message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.and_then(|batch| {
reader::read_record_batch(
&Buffer::from(data_body.as_ref()),
batch,
arrow_schema,
&HashMap::new(),
None,
&message.version(),
)
})
.context(error::ArrowSnafu)?;
Ok(result)
}
pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
let message = root_as_message(&flight_data.data_header).map_err(|e| {
InvalidFlightDataSnafu {
@@ -162,7 +216,7 @@ impl FlightDecoder {
Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
self.schema = Some(schema.clone());
self.schema_bytes = Some(flight_data.data_header.clone());
Ok(FlightMessage::Schema(schema))
}
MessageHeader::RecordBatch => {
@@ -196,6 +250,10 @@ impl FlightDecoder {
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}
pub fn schema_bytes(&self) -> Option<bytes::Bytes> {
self.schema_bytes.clone()
}
}
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {

View File

@@ -16,9 +16,12 @@ use std::sync::Arc;
use crate::error::Result;
use crate::flow_name::FlowName;
use crate::instruction::CacheIdent;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -89,9 +92,40 @@ where
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
CacheIdent::CreateFlow(_) => {
// Do nothing
}
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
}) => {
// invalidate flow route/flownode flow/table flow
let mut keys = Vec::with_capacity(
source_table_ids.len() * flow_part2node_id.len()
+ flow_part2node_id.len() * 2,
);
for table_id in source_table_ids {
for (partition_id, node_id) in flow_part2node_id {
let key =
TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
.to_bytes();
keys.push(key);
}
}
for (partition_id, node_id) in flow_part2node_id {
let key =
FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
keys.push(key);
let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
keys.push(key);
}
for key in keys {
self.invalidate_key(&key).await;
}
}
CacheIdent::FlowName(FlowName {
catalog_name,
flow_name,

View File

@@ -256,6 +256,11 @@ impl DatanodeTableManager {
})?
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;
// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;

View File

@@ -45,7 +45,7 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
/// The key of `__flow/` scope.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
inner: T,
}

View File

@@ -153,6 +153,15 @@ impl FlowInfoValue {
&self.flownode_ids
}
/// Insert a new flownode id for a partition.
pub fn insert_flownode_id(
&mut self,
partition: FlowPartitionId,
node: FlownodeId,
) -> Option<FlownodeId> {
self.flownode_ids.insert(partition, node)
}
/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids

View File

@@ -42,7 +42,7 @@ lazy_static! {
/// The key stores the route info of the flow.
///
/// The layout: `__flow/route/{flow_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
impl FlowRouteKey {
@@ -145,6 +145,12 @@ pub struct FlowRouteValue {
pub(crate) peer: Peer,
}
impl From<Peer> for FlowRouteValue {
fn from(peer: Peer) -> Self {
Self { peer }
}
}
impl FlowRouteValue {
/// Returns the `peer`.
pub fn peer(&self) -> &Peer {

View File

@@ -166,6 +166,17 @@ impl FlownodeFlowManager {
Self { kv_backend }
}
/// Whether given flow exist on this flownode.
pub async fn exists(
&self,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> Result<bool> {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
Ok(self.kv_backend.get(&key).await?.is_some())
}
/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,

View File

@@ -158,12 +158,7 @@ mod tests {
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
num_topics = 32
num_partitions = 1
selector_type = "round_robin"

View File

@@ -65,6 +65,7 @@ servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
sql.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true

View File

@@ -359,7 +359,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
"Flows do not exist in flownode for node {:?}, flow_ids={:?}",
nodeid, to_be_created
);
}
@@ -379,7 +379,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
"Flows do not exist in metadata for node {:?}, flow_ids={:?}",
nodeid, to_be_dropped
);
}
@@ -826,9 +826,17 @@ fn to_meta_err(
location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
move |err: crate::error::Error| -> common_meta::error::Error {
common_meta::error::Error::External {
location,
source: BoxedError::new(err),
match err {
crate::error::Error::FlowNotFound { id, .. } => {
common_meta::error::Error::FlowNotFound {
flow_name: format!("flow_id={id}"),
location,
}
}
_ => common_meta::error::Error::External {
location,
source: BoxedError::new(err),
},
}
}
}

View File

@@ -39,7 +39,8 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -312,7 +313,7 @@ impl BatchingEngine {
.unwrap_or("None".to_string())
);
let task = BatchingTask::new(
let task = BatchingTask::try_new(
flow_id,
&sql,
plan,
@@ -323,7 +324,7 @@ impl BatchingEngine {
query_ctx,
self.catalog_manager.clone(),
rx,
);
)?;
let task_inner = task.clone();
let engine = self.query_engine.clone();
@@ -349,7 +350,8 @@ impl BatchingEngine {
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
warn!("Flow {flow_id} not found in tasks");
FlowNotFoundSnafu { id: flow_id }.fail()?;
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
@@ -366,9 +368,7 @@ impl BatchingEngine {
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
debug!("Try flush flow {flow_id}");
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"),
})?;
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
task.mark_all_windows_as_dirty()?;

View File

@@ -71,18 +71,33 @@ impl TaskState {
self.last_update_time = Instant::now();
}
/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
/// Compute the next query delay based on the time window size or the last query duration.
/// Aiming to avoid too frequent queries. But also not too long delay.
/// The delay is computed as follows:
/// - If `time_window_size` is set, the delay is half the time window size, constrained to be
/// at least `last_query_duration` and at most `max_timeout`.
/// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
/// to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
///
/// if have more dirty time window, exec next query immediately
/// If there are dirty time windows, the function returns an immediate execution time to clean them.
/// TODO: Make this behavior configurable.
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let next_duration = max_timeout
let last_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -29,6 +28,7 @@ use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DFSchemaRef;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -37,6 +37,8 @@ use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
@@ -68,13 +70,42 @@ use crate::{Error, FlowId};
pub struct TaskConfig {
pub flow_id: FlowId,
pub query: String,
plan: Arc<LogicalPlan>,
/// output schema of the query
pub output_schema: DFSchemaRef,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
pub source_table_names: HashSet<[String; 3]>,
catalog_manager: CatalogManagerRef,
query_type: QueryType,
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
ensure!(
stmts.len() == 1,
InvalidQuerySnafu {
reason: format!("Expect only one statement, found {}", stmts.len())
}
);
let stmt = &stmts[0];
match stmt {
Statement::Tql(_) => Ok(QueryType::Tql),
_ => Ok(QueryType::Sql),
}
}
#[derive(Debug, Clone)]
enum QueryType {
/// query is a tql query
Tql,
/// query is a sql query
Sql,
}
#[derive(Clone)]
@@ -85,7 +116,7 @@ pub struct BatchingTask {
impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
pub fn try_new(
flow_id: FlowId,
query: &str,
plan: LogicalPlan,
@@ -96,20 +127,21 @@ impl BatchingTask {
query_ctx: QueryContextRef,
catalog_manager: CatalogManagerRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
) -> Result<Self, Error> {
Ok(Self {
config: Arc::new(TaskConfig {
flow_id,
query: query.to_string(),
plan: Arc::new(plan),
time_window_expr,
expire_after,
sink_table_name,
source_table_names: source_table_names.into_iter().collect(),
catalog_manager,
output_schema: plan.schema().clone(),
query_type: determine_query_type(query, &query_ctx)?,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
}
})
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
@@ -380,6 +412,23 @@ impl BatchingTask {
frontend_client: Arc<FrontendClient>,
) {
loop {
// first check if shutdown signal is received
// if so, break the loop
{
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
}
let mut new_query = None;
let mut gen_and_exec = async || {
new_query = self.gen_insert_plan(&engine).await?;
@@ -393,20 +442,15 @@ impl BatchingTask {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
let sleep_until = {
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
let state = self.state.write().unwrap();
state.get_next_start_query_time(
self.config.flow_id,
&self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size()),
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
)
};
@@ -472,7 +516,7 @@ impl BatchingTask {
.unwrap_or(u64::MIN);
let low_bound = Timestamp::new_second(low_bound as i64);
let schema_len = self.config.plan.schema().fields().len();
let schema_len = self.config.output_schema.fields().len();
let expire_time_window_bound = self
.config
@@ -481,104 +525,101 @@ impl BatchingTask {
.map(|expr| expr.eval(low_bound))
.transpose()?;
let new_plan = {
let expr = {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.config
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.config.flow_id
),
})?;
self.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
&col_name,
Some(l),
window_size,
self.config.flow_id,
Some(self),
)?
}
_ => {
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();
let mut add_auto_column =
AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan = self
.config
.plan
.deref()
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to rewrite plan:\n {}\n",
self.config.plan
),
})?
.data;
let schema_len = plan.schema().fields().len();
// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some((plan, schema_len)));
}
}
};
let (Some((Some(l), Some(u))), QueryType::Sql) =
(expire_time_window_bound, &self.config.query_type)
else {
// either no time window or not a sql query, then just use the original query
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
);
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();
let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
};
// TODO(discord9): add auto column or not? This might break compatibility for auto created sink table before this, but that's ok right?
let mut add_filter = AddFilterRewriter::new(expr);
// TODO(discord9): not add auto column for tql query?
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
let rewrite = plan
let plan = plan
.clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
// only apply optimize after complex rewrite is done
apply_df_optimizer(rewrite).await?
let schema_len = plan.schema().fields().len();
// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some((plan, schema_len)));
};
debug!(
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
self.config.flow_id, l, u
);
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.config
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.config.flow_id
),
})?;
let expr = self
.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
&col_name,
Some(l),
window_size,
self.config.flow_id,
Some(self),
)?;
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
);
let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
};
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
let rewrite = plan
.clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
// only apply optimize after complex rewrite is done
let new_plan = apply_df_optimizer(rewrite).await?;
Ok(Some((new_plan, schema_len)))
}
}
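
The run loop above now checks the shutdown channel non-blockingly at the top of every iteration instead of only after a successful query. A small sketch of that pattern using only `tokio::sync::oneshot`; the flow engine types and the query execution are omitted and replaced by a sleep.

    use tokio::sync::oneshot;
    use tokio::sync::oneshot::error::TryRecvError;

    #[tokio::main]
    async fn main() {
        let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

        // Simulate the engine asking the task to stop after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = shutdown_tx.send(());
        });

        loop {
            // Non-blocking check: break on an explicit signal or a closed channel.
            match shutdown_rx.try_recv() {
                Ok(()) => break,
                Err(TryRecvError::Closed) => {
                    eprintln!("sender dropped, shutting down anyway");
                    break;
                }
                Err(TryRecvError::Empty) => {}
            }
            // ... generate and execute the next insert plan here ...
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }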


@@ -55,6 +55,9 @@ use crate::error::{
use crate::expr::error::DataTypeSnafu;
use crate::Error;
/// Represents a test timestamp in seconds since the Unix epoch.
const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);
/// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`, this type help with
/// evaluating the expr using given timestamp
///
@@ -70,6 +73,7 @@ pub struct TimeWindowExpr {
pub column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
eval_time_window_size: Option<std::time::Duration>,
}
impl std::fmt::Display for TimeWindowExpr {
@@ -84,6 +88,11 @@ impl std::fmt::Display for TimeWindowExpr {
}
impl TimeWindowExpr {
/// The time window size of the expr, get from calling `eval` with a test timestamp
pub fn time_window_size(&self) -> &Option<std::time::Duration> {
&self.eval_time_window_size
}
pub fn from_expr(
expr: &Expr,
column_name: &str,
@@ -91,12 +100,28 @@ impl TimeWindowExpr {
session: &SessionState,
) -> Result<Self, Error> {
let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
Ok(Self {
let mut zelf = Self {
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
})
eval_time_window_size: None,
};
let test_ts = DEFAULT_TEST_TIMESTAMP;
let (l, u) = zelf.eval(test_ts)?;
let time_window_size = match (l, u) {
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
UnexpectedSnafu {
reason: format!(
"Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
),
}
.build()
})?,
_ => None,
};
zelf.eval_time_window_size = time_window_size;
Ok(zelf)
}
pub fn eval(
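
A rough illustration of how the cached window size can be derived: evaluate the window expression once at a fixed probe timestamp and keep `upper - lower`. The `eval_minute_window` helper below is a hypothetical stand-in for `TimeWindowExpr::eval` with a one-minute `date_bin`, used only to make the sketch runnable.

    use std::time::Duration;

    // Probe timestamp in seconds since the epoch, used only to measure the width.
    const DEFAULT_TEST_TIMESTAMP_SECS: i64 = 17_0000_0000;

    // Hypothetical stand-in for evaluating `date_bin(INTERVAL '1' MINUTE, ts)`:
    // returns the bounds of the window containing `ts`.
    fn eval_minute_window(ts_secs: i64) -> (Option<i64>, Option<i64>) {
        let lower = ts_secs - ts_secs.rem_euclid(60);
        (Some(lower), Some(lower + 60))
    }

    // Mirrors the idea in `from_expr`: evaluate once and cache `upper - lower`.
    fn window_size() -> Option<Duration> {
        match eval_minute_window(DEFAULT_TEST_TIMESTAMP_SECS) {
            (Some(l), Some(u)) if u >= l => Some(Duration::from_secs((u - l) as u64)),
            _ => None,
        }
    }

    fn main() {
        assert_eq!(window_size(), Some(Duration::from_secs(60)));
    }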


@@ -29,15 +29,18 @@ use datafusion_common::tree_node::{
use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
use datafusion_expr::{Distinct, LogicalPlan, Projection};
use datatypes::schema::SchemaRef;
use query::parser::QueryLanguageParser;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use table::metadata::TableInfo;
use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{DatafusionSnafu, ExternalSnafu, TableNotFoundSnafu};
use crate::error::{DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, TableNotFoundSnafu};
use crate::{Error, TableName};
pub async fn get_table_info_df_schema(
@@ -73,21 +76,57 @@ pub async fn get_table_info_df_schema(
}
/// Convert sql to datafusion logical plan
/// Also support TQL (but only Eval not Explain or Analyze)
pub async fn sql_to_df_plan(
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sql: &str,
optimize: bool,
) -> Result<LogicalPlan, Error> {
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let stmts =
ParserContext::create_with_dialect(sql, query_ctx.sql_dialect(), ParseOptions::default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
ensure!(
stmts.len() == 1,
InvalidQuerySnafu {
reason: format!("Expect only one statement, found {}", stmts.len())
}
);
let stmt = &stmts[0];
let query_stmt = match stmt {
Statement::Tql(tql) => match tql {
Tql::Eval(eval) => {
let eval = eval.clone();
let promql = PromQuery {
start: eval.start,
end: eval.end,
step: eval.step,
query: eval.query,
lookback: eval
.lookback
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};
QueryLanguageParser::parse_promql(&promql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
}
_ => InvalidQuerySnafu {
reason: format!("TQL statement {tql:?} is not supported, expect only TQL EVAL"),
}
.fail()?,
},
_ => QueryStatement::Sql(stmt.clone()),
};
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.plan(&query_stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = if optimize {
apply_df_optimizer(plan).await?
} else {
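
A simplified sketch of the TQL EVAL lowering added above, with stand-in structs instead of the real `sql`/`query` crate types. The `"5m"` default lookback is an assumption for illustration; the real value comes from `DEFAULT_LOOKBACK_STRING`.

    // Hypothetical stand-ins for the TQL EVAL arguments and the PromQL request.
    #[derive(Debug)]
    struct TqlEval {
        start: String,
        end: String,
        step: String,
        query: String,
        lookback: Option<String>,
    }

    #[derive(Debug)]
    struct PromQuery {
        start: String,
        end: String,
        step: String,
        query: String,
        lookback: String,
    }

    const DEFAULT_LOOKBACK: &str = "5m"; // assumed default

    // Mirrors the new branch: TQL EVAL is lowered to a PromQL query, filling in
    // the lookback when it was omitted; every other TQL form is rejected.
    fn eval_to_prom(eval: TqlEval) -> PromQuery {
        PromQuery {
            start: eval.start,
            end: eval.end,
            step: eval.step,
            query: eval.query,
            lookback: eval.lookback.unwrap_or_else(|| DEFAULT_LOOKBACK.to_string()),
        }
    }

    fn main() {
        let q = eval_to_prom(TqlEval {
            start: "0".into(),
            end: "100".into(),
            step: "15s".into(),
            query: "up".into(),
            lookback: None,
        });
        println!("{q:?}");
    }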


@@ -321,8 +321,8 @@ impl ErrorExt for Error {
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. }
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery


@@ -596,7 +596,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor)
.handle_row_inserts(requests, ctx, &self.statement_executor, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)


@@ -75,7 +75,10 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false)
.await?
}
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
@@ -416,9 +419,15 @@ impl Instance {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
accommodate_existing_schema: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_row_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
)
.await
.context(TableOperationSnafu)
}
@@ -430,7 +439,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
.await
.context(TableOperationSnafu)
}


@@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance {
};
let output = self
.handle_row_inserts(requests, ctx)
.handle_row_inserts(requests, ctx, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;


@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
self.handle_row_inserts(requests, ctx)
self.handle_row_inserts(requests, ctx, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)


@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(request, ctx.clone())
self.handle_row_inserts(request, ctx.clone(), true)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?


@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into();
self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor)
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false)
.await
.context(TableOperationSnafu)?;


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
@@ -36,7 +37,6 @@ use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use etcd_client::Client;
use futures::future;
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -53,6 +53,7 @@ use sqlx::mysql::{MySqlConnection, MySqlPool};
use sqlx::Connection;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
#[cfg(feature = "pg_kvbackend")]
use tokio_postgres::NoTls;
use tonic::codec::CompressionEncoding;
@@ -88,6 +89,12 @@ pub struct MetasrvInstance {
plugins: Plugins,
export_metrics_task: Option<ExportMetricsTask>,
/// gRPC serving state receiver. Only present if the gRPC server is started.
serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
/// gRPC bind addr
bind_addr: Option<SocketAddr>,
}
impl MetasrvInstance {
@@ -113,6 +120,8 @@ impl MetasrvInstance {
signal_sender: None,
plugins,
export_metrics_task,
serve_state: Default::default(),
bind_addr: None,
})
}
@@ -132,21 +141,30 @@ impl MetasrvInstance {
router = configurator.config_grpc(router);
}
let metasrv = bootstrap_metasrv_with_router(&self.opts.bind_addr, router, rx);
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let socket_addr =
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let http_srv = async {
self.http_server
.start(addr)
.await
.context(error::StartHttpSnafu)
};
future::try_join(metasrv, http_srv).await?;
self.http_server
.start(addr)
.await
.context(error::StartHttpSnafu)?;
*self.serve_state.lock().await = Some(serve_state_rx);
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
if let Some(mut rx) = self.serve_state.lock().await.take() {
if let Ok(Err(err)) = rx.try_recv() {
common_telemetry::error!(err; "Metasrv start failed")
}
}
if let Some(signal) = &self.signal_sender {
signal
.send(())
@@ -170,30 +188,42 @@ impl MetasrvInstance {
pub fn get_inner(&self) -> &Metasrv {
&self.metasrv
}
pub fn bind_addr(&self) -> &Option<SocketAddr> {
&self.bind_addr
}
}
pub async fn bootstrap_metasrv_with_router(
bind_addr: &str,
router: Router,
mut signal: Receiver<()>,
) -> Result<()> {
serve_state_tx: oneshot::Sender<Result<()>>,
mut shutdown_rx: Receiver<()>,
) -> Result<SocketAddr> {
let listener = TcpListener::bind(bind_addr)
.await
.context(error::TcpBindSnafu { addr: bind_addr })?;
info!("gRPC server is bound to: {bind_addr}");
let real_bind_addr = listener
.local_addr()
.context(error::TcpBindSnafu { addr: bind_addr })?;
info!("gRPC server is bound to: {}", real_bind_addr);
let incoming =
TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;
router
.serve_with_incoming_shutdown(incoming, async {
let _ = signal.recv().await;
})
.await
.context(error::StartGrpcSnafu)?;
let _handle = common_runtime::spawn_global(async move {
let result = router
.serve_with_incoming_shutdown(incoming, async {
let _ = shutdown_rx.recv().await;
})
.await
.inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
.context(error::StartGrpcSnafu);
let _ = serve_state_tx.send(result);
});
Ok(())
Ok(real_bind_addr)
}
#[macro_export]
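
A condensed sketch of the non-blocking startup pattern shown above, assuming a tokio runtime: bind first (port 0 works for tests), return the real local address, and let the serve loop report its outcome through a oneshot channel that `shutdown()` can inspect later. The `accept()` call is a stand-in for `serve_with_incoming_shutdown`.

    use std::net::SocketAddr;
    use tokio::net::TcpListener;
    use tokio::sync::oneshot;

    async fn bootstrap(
        bind_addr: &str,
    ) -> std::io::Result<(SocketAddr, oneshot::Receiver<std::io::Result<()>>)> {
        let listener = TcpListener::bind(bind_addr).await?;
        // Resolve the actual bound address (important when binding to port 0).
        let real_bind_addr = listener.local_addr()?;
        let (serve_state_tx, serve_state_rx) = oneshot::channel();
        tokio::spawn(async move {
            // Stand-in for router.serve_with_incoming_shutdown(...).
            let result = async {
                let _ = listener.accept().await?;
                Ok(())
            }
            .await;
            let _ = serve_state_tx.send(result);
        });
        Ok((real_bind_addr, serve_state_rx))
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let (addr, mut serve_state) = bootstrap("127.0.0.1:0").await?;
        println!("gRPC server is bound to: {addr}");
        // Later (e.g. during shutdown) a startup failure can still be surfaced.
        if let Ok(Err(err)) = serve_state.try_recv() {
            eprintln!("server start failed: {err}");
        }
        Ok(())
    }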


@@ -97,3 +97,8 @@ required-features = ["test"]
name = "bench_filter_time_partition"
harness = false
required-features = ["test"]
[[bench]]
name = "bench_compaction_picker"
harness = false
required-features = ["test"]


@@ -0,0 +1,157 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use mito2::compaction::run::{
find_overlapping_items, find_sorted_runs, merge_seq_files, reduce_runs, Item, Ranged, SortedRun,
};
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct MockFile {
start: i64,
end: i64,
size: usize,
}
impl Ranged for MockFile {
type BoundType = i64;
fn range(&self) -> (Self::BoundType, Self::BoundType) {
(self.start, self.end)
}
}
impl Item for MockFile {
fn size(&self) -> usize {
self.size
}
}
fn generate_test_files(n: usize) -> Vec<MockFile> {
let mut files = Vec::with_capacity(n);
for _ in 0..n {
// Create slightly overlapping ranges to force multiple sorted runs
files.push(MockFile {
start: 0,
end: 10,
size: 10,
});
}
files
}
fn bench_find_sorted_runs(c: &mut Criterion) {
let mut group = c.benchmark_group("find_sorted_runs");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
let mut files = generate_test_files(*size);
b.iter(|| {
find_sorted_runs(black_box(&mut files));
});
});
}
group.finish();
}
fn bench_reduce_runs(c: &mut Criterion) {
let mut group = c.benchmark_group("reduce_runs");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
let mut files = generate_test_files(*size);
let runs = find_sorted_runs(&mut files);
b.iter(|| {
reduce_runs(black_box(runs.clone()));
});
});
}
group.finish();
}
fn bench_find_overlapping_items(c: &mut Criterion) {
let mut group = c.benchmark_group("find_overlapping_items");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
// Create two sets of files with some overlapping ranges
let mut files1 = Vec::with_capacity(*size);
let mut files2 = Vec::with_capacity(*size);
for i in 0..*size {
files1.push(MockFile {
start: i as i64,
end: (i + 5) as i64,
size: 10,
});
files2.push(MockFile {
start: (i + 3) as i64,
end: (i + 8) as i64,
size: 10,
});
}
let mut r1 = SortedRun::from(files1);
let mut r2 = SortedRun::from(files2);
b.iter(|| {
let mut result = vec![];
find_overlapping_items(black_box(&mut r1), black_box(&mut r2), &mut result);
});
});
}
group.finish();
}
fn bench_merge_seq_files(c: &mut Criterion) {
let mut group = c.benchmark_group("merge_seq_files");
for size in [10, 100, 1000].iter() {
group.bench_function(format!("size_{}", size), |b| {
// Create a set of files with varying sizes
let mut files = Vec::with_capacity(*size);
for i in 0..*size {
// Create files with different sizes to test the scoring algorithm
let file_size = if i % 3 == 0 {
5
} else if i % 3 == 1 {
10
} else {
15
};
files.push(MockFile {
start: i as i64,
end: (i + 1) as i64,
size: file_size,
});
}
b.iter(|| {
merge_seq_files(black_box(&files), black_box(Some(50)));
});
});
}
group.finish();
}
criterion_group!(
benches,
bench_find_sorted_runs,
bench_reduce_runs,
bench_find_overlapping_items,
bench_merge_seq_files
);
criterion_main!(benches);


@@ -362,7 +362,7 @@ impl FilePathProvider for WriteCachePathProvider {
/// Path provider that builds paths in region storage path.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
region_dir: String,
pub(crate) region_dir: String,
}
impl RegionFilePathFactory {


@@ -15,7 +15,7 @@
mod buckets;
pub mod compactor;
pub mod picker;
mod run;
pub mod run;
mod task;
#[cfg(test)]
mod test_util;


@@ -36,6 +36,7 @@ use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Res
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics;
use crate::read::Source;
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
@@ -240,6 +241,14 @@ impl MergeOutput {
pub fn is_empty(&self) -> bool {
self.files_to_add.is_empty() && self.files_to_remove.is_empty()
}
pub fn input_file_size(&self) -> u64 {
self.files_to_remove.iter().map(|f| f.file_size).sum()
}
pub fn output_file_size(&self) -> u64 {
self.files_to_add.iter().map(|f| f.file_size).sum()
}
}
/// Compactor is the trait that defines the compaction logic.
@@ -286,6 +295,7 @@ impl Compactor for DefaultCompactor {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
let write_opts = WriteOptions {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
max_file_size: picker_output.max_file_size,
..Default::default()
};
@@ -460,6 +470,9 @@ impl Compactor for DefaultCompactor {
);
return Ok(());
}
metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
self.update_manifest(compaction_region, merge_output)
.await?;
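
A sketch of how the new throughput counters are fed, assuming the `prometheus` and `lazy_static` crates that the metrics module already uses; the metric names below are demo placeholders rather than the registered `greptime_mito_compaction_input_bytes` / `greptime_mito_compaction_output_bytes` names.

    use lazy_static::lazy_static;
    use prometheus::{register_counter, Counter};

    lazy_static! {
        static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(
            "demo_compaction_input_bytes",
            "compaction input file size"
        )
        .unwrap();
        static ref COMPACTION_OUTPUT_BYTES: Counter = register_counter!(
            "demo_compaction_output_bytes",
            "compaction output file size"
        )
        .unwrap();
    }

    fn main() {
        // After a merge finishes, record the summed input and output file sizes,
        // mirroring input_file_size()/output_file_size() on the merge output.
        let input_file_sizes = [4_u64 * 1024 * 1024, 8 * 1024 * 1024];
        let output_file_sizes = [10_u64 * 1024 * 1024];
        COMPACTION_INPUT_BYTES.inc_by(input_file_sizes.iter().sum::<u64>() as f64);
        COMPACTION_OUTPUT_BYTES.inc_by(output_file_sizes.iter().sum::<u64>() as f64);
        println!(
            "input={} output={}",
            COMPACTION_INPUT_BYTES.get(),
            COMPACTION_OUTPUT_BYTES.get()
        );
    }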


@@ -45,6 +45,8 @@ pub struct PickerOutput {
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub time_window_size: i64,
/// Max single output file size in bytes.
pub max_file_size: Option<usize>,
}
/// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput {
pub outputs: Vec<SerializedCompactionOutput>,
pub expired_ssts: Vec<FileMeta>,
pub time_window_size: i64,
pub max_file_size: Option<usize>,
}
impl From<&PickerOutput> for SerializedPickerOutput {
@@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
max_file_size: input.max_file_size,
}
}
}
@@ -111,6 +115,7 @@ impl PickerOutput {
outputs,
expired_ssts,
time_window_size: input.time_window_size,
max_file_size: input.max_file_size,
}
}
}
@@ -131,10 +136,7 @@ pub fn new_picker(
} else {
match compaction_options {
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
max_active_window_runs: twcs_opts.max_active_window_runs,
max_active_window_files: twcs_opts.max_active_window_files,
max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
max_inactive_window_files: twcs_opts.max_inactive_window_files,
trigger_file_num: twcs_opts.trigger_file_num,
time_window_seconds: twcs_opts.time_window_seconds(),
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
append_mode,
@@ -179,6 +181,7 @@ mod tests {
],
expired_ssts: expired_ssts_file_handle.clone(),
time_window_size: 1000,
max_file_size: None,
};
let picker_output_str =

File diff suppressed because it is too large.


@@ -22,7 +22,6 @@ use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error;
use crate::error::CompactRegionSnafu;
use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
@@ -30,6 +29,7 @@ use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
use crate::worker::WorkerListener;
use crate::{error, metrics};
/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 1;
@@ -98,6 +98,8 @@ impl CompactionTaskImpl {
};
let merge_time = merge_timer.stop_and_record();
metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
self.compaction_region.region_id,


@@ -44,30 +44,3 @@ pub fn new_file_handle(
file_purger,
)
}
pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
let file_purger = new_noop_file_purger();
file_specs
.iter()
.map(|(start, end, size)| {
FileHandle::new(
FileMeta {
region_id: 0.into(),
file_id: FileId::random(),
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(*end),
),
level: 0,
file_size: *size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger.clone(),
)
})
.collect()
}


@@ -16,15 +16,17 @@ use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use common_telemetry::{info, trace};
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
use common_time::Timestamp;
use store_api::storage::RegionId;
use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::{Picker, PickerOutput};
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
use crate::compaction::{get_expired_ssts, CompactionOutput};
use crate::sst::file::{overlaps, FileHandle, Level};
use crate::sst::version::LevelMeta;
@@ -35,14 +37,8 @@ const LEVEL_COMPACTED: Level = 1;
/// candidates.
#[derive(Debug)]
pub struct TwcsPicker {
/// Max allowed sorted runs in active window.
pub max_active_window_runs: usize,
/// Max allowed files in active window.
pub max_active_window_files: usize,
/// Max allowed sorted runs in inactive windows.
pub max_inactive_window_runs: usize,
/// Max allowed files in inactive windows.
pub max_inactive_window_files: usize,
/// Minimum file num to trigger a compaction.
pub trigger_file_num: usize,
/// Compaction time window in seconds.
pub time_window_seconds: Option<i64>,
/// Max allowed compaction output file size.
@@ -53,89 +49,48 @@ pub struct TwcsPicker {
impl TwcsPicker {
/// Builds compaction output from files.
/// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
/// fragmentation. For other windows, we allow at most 1 file at each window.
fn build_output(
&self,
region_id: RegionId,
time_windows: &mut BTreeMap<i64, Window>,
active_window: Option<i64>,
) -> Vec<CompactionOutput> {
let mut output = vec![];
for (window, files) in time_windows {
if files.files.is_empty() {
continue;
}
let sorted_runs = find_sorted_runs(&mut files.files);
let found_runs = sorted_runs.len();
// We only remove deletion markers if we found less than 2 runs and not in append mode.
// because after compaction there will be no overlapping files.
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
let (max_runs, max_files) = if let Some(active_window) = active_window
&& *window == active_window
{
(self.max_active_window_runs, self.max_active_window_files)
} else {
(
self.max_inactive_window_runs,
self.max_inactive_window_files,
)
};
let inputs = if found_runs > 1 {
reduce_runs(sorted_runs)
} else {
let run = sorted_runs.last().unwrap();
if run.items().len() < self.trigger_file_num {
continue;
}
// no overlapping files, try merge small files
merge_seq_files(run.items(), self.max_output_file_size)
};
let found_runs = sorted_runs.len();
// We only remove deletion markers once no file in current window overlaps with any other window
// and region is not in append mode.
let filter_deleted =
!files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;
let inputs = if found_runs > max_runs {
let files_to_compact = reduce_runs(sorted_runs, max_runs);
let files_to_compact_len = files_to_compact.len();
info!(
"Building compaction output, active window: {:?}, \
current window: {}, \
max runs: {}, \
found runs: {}, \
output size: {}, \
max output size: {:?}, \
remove deletion markers: {}",
active_window,
if !inputs.is_empty() {
log_pick_result(
region_id,
*window,
max_runs,
active_window,
found_runs,
files_to_compact_len,
self.max_output_file_size,
filter_deleted
);
files_to_compact
} else if files.files.len() > max_files {
info!(
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
*window,
active_window,
max_files,
files.files.len(),
self.max_output_file_size,
filter_deleted,
&inputs,
);
// Files in window exceeds file num limit
vec![enforce_file_num(&files.files, max_files)]
} else {
trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
continue;
};
let split_inputs = if !filter_deleted
&& let Some(max_output_file_size) = self.max_output_file_size
{
let len_before_split = inputs.len();
let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
if maybe_split.len() != len_before_split {
info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
}
maybe_split
} else {
inputs
};
for input in split_inputs {
debug_assert!(input.len() > 1);
output.push(CompactionOutput {
output_level: LEVEL_COMPACTED, // always compact to l1
inputs: input,
inputs,
filter_deleted,
output_time_range: None, // we do not enforce output time range in twcs compactions.
});
@@ -145,66 +100,50 @@ impl TwcsPicker {
}
}
/// Limits the size of compaction output in a naive manner.
/// todo(hl): we can find the output file size more precisely by checking the time range
/// of each row group and adding the sizes of those non-overlapping row groups. But now
/// we'd better not to expose the SST details in this level.
fn enforce_max_output_size(
inputs: Vec<Vec<FileHandle>>,
max_output_file_size: u64,
) -> Vec<Vec<FileHandle>> {
inputs
.into_iter()
.flat_map(|input| {
debug_assert!(input.len() > 1);
let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
if estimated_output_size < max_output_file_size {
// total file size does not exceed the threshold, just return the original input.
return vec![input];
}
let mut splits = vec![];
let mut new_input = vec![];
let mut new_input_size = 0;
for f in input {
if new_input_size + f.size() > max_output_file_size {
splits.push(std::mem::take(&mut new_input));
new_input_size = 0;
}
new_input_size += f.size();
new_input.push(f);
}
if !new_input.is_empty() {
splits.push(new_input);
}
splits
#[allow(clippy::too_many_arguments)]
fn log_pick_result(
region_id: RegionId,
window: i64,
active_window: Option<i64>,
found_runs: usize,
file_num: usize,
max_output_file_size: Option<u64>,
filter_deleted: bool,
inputs: &[FileHandle],
) {
let input_file_str: Vec<String> = inputs
.iter()
.map(|f| {
let range = f.time_range();
let start = range.0.to_iso8601_string();
let end = range.1.to_iso8601_string();
let num_rows = f.num_rows();
format!(
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
f.file_id(),
start,
end,
ReadableSize(f.size()),
num_rows
)
})
.filter(|p| p.len() > 1)
.collect()
}
/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
/// the solution with minimum overhead according to files sizes to be merged.
/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
/// `runs` must be sorted according to time ranges.
fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
debug_assert!(files.len() > max_file_num);
let to_merge = files.len() - max_file_num + 1;
let mut min_penalty = usize::MAX;
let mut min_idx = 0;
for idx in 0..=(files.len() - to_merge) {
let current_penalty: usize = files
.iter()
.skip(idx)
.take(to_merge)
.map(|f| f.size())
.sum();
if current_penalty < min_penalty {
min_penalty = current_penalty;
min_idx = idx;
}
}
files.iter().skip(min_idx).take(to_merge).cloned().collect()
.collect();
let window_str = Timestamp::new_second(window).to_iso8601_string();
let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
info!(
"Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
input files: {:?}",
region_id,
window_str,
active_window_str,
found_runs,
file_num,
max_output_file_size,
filter_deleted,
input_file_str
);
}
impl Picker for TwcsPicker {
@@ -240,16 +179,18 @@ impl Picker for TwcsPicker {
// Assign files to windows
let mut windows =
assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
let outputs = self.build_output(&mut windows, active_window);
let outputs = self.build_output(region_id, &mut windows, active_window);
if outputs.is_empty() && expired_ssts.is_empty() {
return None;
}
let max_file_size = self.max_output_file_size.map(|v| v as usize);
Some(PickerOutput {
outputs,
expired_ssts,
time_window_size,
max_file_size,
})
}
}
@@ -368,12 +309,10 @@ fn find_latest_window_in_seconds<'a>(
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use super::*;
use crate::compaction::test_util::{new_file_handle, new_file_handles};
use crate::sst::file::{FileId, FileMeta, Level};
use crate::test_util::NoopFilePurger;
use crate::compaction::test_util::new_file_handle;
use crate::sst::file::{FileId, Level};
#[test]
fn test_get_latest_window_in_seconds() {
@@ -614,25 +553,31 @@ mod tests {
impl CompactionPickerTestCase {
fn check(&self) {
let file_id_to_idx = self
.input_files
.iter()
.enumerate()
.map(|(idx, file)| (file.file_id(), idx))
.collect::<HashMap<_, _>>();
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
let active_window =
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
let output = TwcsPicker {
max_active_window_runs: 4,
max_active_window_files: usize::MAX,
max_inactive_window_runs: 1,
max_inactive_window_files: usize::MAX,
trigger_file_num: 4,
time_window_seconds: None,
max_output_file_size: None,
append_mode: false,
}
.build_output(&mut windows, active_window);
.build_output(RegionId::from_u64(0), &mut windows, active_window);
let output = output
.iter()
.map(|o| {
let input_file_ids =
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
let input_file_ids = o
.inputs
.iter()
.map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
.collect::<HashSet<_>>();
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();
@@ -641,11 +586,7 @@ mod tests {
.expected_outputs
.iter()
.map(|o| {
let input_file_ids = o
.input_files
.iter()
.map(|idx| self.input_files[*idx].file_id())
.collect::<HashSet<_>>();
let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
(input_file_ids, o.output_level)
})
.collect::<Vec<_>>();
@@ -658,47 +599,11 @@ mod tests {
output_level: Level,
}
fn check_enforce_file_num(
input_files: &[(i64, i64, u64)],
max_file_num: usize,
files_to_merge: &[(i64, i64)],
) {
let mut files = new_file_handles(input_files);
// ensure sorted
find_sorted_runs(&mut files);
let mut to_merge = enforce_file_num(&files, max_file_num);
to_merge.sort_unstable_by_key(|f| f.time_range().0);
assert_eq!(
files_to_merge.to_vec(),
to_merge
.iter()
.map(|f| {
let (start, end) = f.time_range();
(start.value(), end.value())
})
.collect::<Vec<_>>()
);
}
#[test]
fn test_enforce_file_num() {
check_enforce_file_num(
&[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
2,
&[(100, 200), (200, 400)],
);
check_enforce_file_num(
&[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
1,
&[(0, 300), (100, 200), (200, 400)],
);
}
#[test]
fn test_build_twcs_output() {
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
// Case 1: 2 runs found in each time window.
CompactionPickerTestCase {
window_size: 3,
input_files: [
@@ -708,13 +613,25 @@ mod tests {
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
]
.to_vec(),
expected_outputs: vec![ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
}],
expected_outputs: vec![
ExpectedOutput {
input_files: vec![0, 1],
output_level: 1,
},
ExpectedOutput {
input_files: vec![2, 3],
output_level: 1,
},
],
}
.check();
// Case 2:
// -2000........-3
// -3000.....-100
// 0..............2999
// 50..........2998
// 11.........2990
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
CompactionPickerTestCase {
window_size: 3,
@@ -724,7 +641,6 @@ mod tests {
new_file_handle(file_ids[2], 0, 2999, 0),
new_file_handle(file_ids[3], 50, 2998, 0),
new_file_handle(file_ids[4], 11, 2990, 0),
new_file_handle(file_ids[5], 50, 4998, 0),
]
.to_vec(),
expected_outputs: vec![
@@ -733,7 +649,7 @@ mod tests {
output_level: 1,
},
ExpectedOutput {
input_files: vec![2, 3, 4],
input_files: vec![2, 4],
output_level: 1,
},
],
@@ -741,44 +657,5 @@ mod tests {
.check();
}
fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
inputs
.iter()
.map(|(start, end, size)| {
FileHandle::new(
FileMeta {
region_id: Default::default(),
file_id: Default::default(),
time_range: (
Timestamp::new_millisecond(*start),
Timestamp::new_millisecond(*end),
),
level: 0,
file_size: *size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
Arc::new(NoopFilePurger),
)
})
.collect()
}
#[test]
fn test_limit_output_size() {
let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
let runs = find_sorted_runs(&mut files);
assert_eq!(6, runs.len());
let files_to_merge = reduce_runs(runs, 2);
let enforced = enforce_max_output_size(files_to_merge, 2);
assert_eq!(2, enforced.len());
assert_eq!(2, enforced[0].len());
assert_eq!(2, enforced[1].len());
}
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
}
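
A self-contained sketch of the revised per-window decision, with plain `Vec<u64>` file sizes standing in for `FileHandle`s and inlined stand-ins for `reduce_runs` and `merge_seq_files`: overlapping runs are always reduced, while a single run is only merged once it reaches `trigger_file_num` files, respecting the optional output size cap.

    // Each inner Vec is one sorted run; values are file sizes in time order.
    fn pick_inputs(
        sorted_runs: Vec<Vec<u64>>,
        trigger_file_num: usize,
        max_output_file_size: Option<u64>,
    ) -> Option<Vec<u64>> {
        if sorted_runs.len() > 1 {
            // Overlapping runs: merge them down (stand-in for reduce_runs).
            Some(sorted_runs.into_iter().flatten().collect())
        } else {
            let run = sorted_runs.last()?;
            if run.len() < trigger_file_num {
                // Not enough files in this window to bother compacting.
                return None;
            }
            // Single run: merge small sequential files (stand-in for merge_seq_files).
            let limit = max_output_file_size.unwrap_or(u64::MAX);
            let mut picked = Vec::new();
            let mut total = 0u64;
            for size in run {
                if total + size > limit && !picked.is_empty() {
                    break;
                }
                total += size;
                picked.push(*size);
            }
            (picked.len() > 1).then_some(picked)
        }
    }

    fn main() {
        // Four files in one run with trigger_file_num = 4 get merged.
        assert_eq!(
            pick_inputs(vec![vec![10, 10, 10, 10]], 4, Some(512)),
            Some(vec![10, 10, 10, 10])
        );
        // A single short run below the trigger is left alone.
        assert_eq!(pick_inputs(vec![vec![10, 10]], 4, None), None);
    }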


@@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker {
outputs,
expired_ssts,
time_window_size: time_window,
max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction.
})
}
}


@@ -110,8 +110,6 @@ async fn test_append_mode_compaction() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("append_mode", "true")
.build();
let region_dir = request.region_dir.clone();
@@ -177,7 +175,7 @@ async fn test_append_mode_compaction() {
+-------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();


@@ -129,8 +129,6 @@ async fn test_compaction_region() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.build();
let column_schemas = request
@@ -163,12 +161,12 @@ async fn test_compaction_region() {
// [0..9]
// [10...19]
// [20....29]
// -[15.........29]-
// -[15.........29]- (delete)
// [15.....24]
// Output:
// [0..9]
// [10..14]
// [15..24]
// [10............29] (contains delete)
// [15....24]
assert_eq!(
3,
scanner.num_files(),
@@ -181,6 +179,71 @@ async fn test_compaction_region() {
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_compaction_overlapping_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.build();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 5 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
delete_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 30..40).await;
let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
1,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
);
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!(
vec,
(0..=9)
.map(|v| v * 1000)
.chain((20..=29).map(|v| v * 1000))
.collect::<Vec<_>>()
);
}
#[tokio::test]
async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
@@ -201,8 +264,6 @@ async fn test_compaction_region_with_overlapping() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
@@ -257,10 +318,6 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.time_window", "1h")
.build();
@@ -290,7 +347,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(
4,
2,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
@@ -332,7 +389,6 @@ async fn test_readonly_during_compaction() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.build();
let column_schemas = request
@@ -404,10 +460,6 @@ async fn test_compaction_update_time_window() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "2")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.build();
let column_schemas = request
@@ -420,9 +472,10 @@ async fn test_compaction_update_time_window() {
.await
.unwrap();
// Flush 3 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..900).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 900..1800).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1800..2700).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 2700..3600).await; // window 3600
let result = engine
.handle_request(
@@ -433,11 +486,21 @@ async fn test_compaction_update_time_window() {
.unwrap();
assert_eq!(result.affected_rows, 0);
assert_eq!(
engine
.get_region(region_id)
.unwrap()
.version_control
.current()
.version
.compaction_time_window,
Some(Duration::from_secs(3600))
);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(0, scanner.num_memtables());
// We keep at most two files.
// We keep all 3 files because no enough file to merge
assert_eq!(
2,
1,
scanner.num_files(),
"unexpected files: {:?}",
scanner.file_ids()
@@ -492,10 +555,6 @@ async fn test_change_region_compaction_window() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
@@ -508,8 +567,10 @@ async fn test_change_region_compaction_window() {
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
engine
.handle_request(
@@ -520,7 +581,7 @@ async fn test_change_region_compaction_window() {
.unwrap();
// Put window 7200
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await;
// Check compaction window.
let region = engine.get_region(region_id).unwrap();
@@ -543,6 +604,22 @@ async fn test_change_region_compaction_window() {
},
});
engine.handle_request(region_id, request).await.unwrap();
assert_eq!(
engine
.get_region(region_id)
.unwrap()
.version_control
.current()
.version
.options
.compaction
.time_window(),
Some(Duration::from_secs(7200))
);
put_and_flush(&engine, region_id, &column_schemas, 5000..5100).await;
put_and_flush(&engine, region_id, &column_schemas, 5100..5200).await;
put_and_flush(&engine, region_id, &column_schemas, 5200..5300).await;
// Compaction again. It should compacts window 3600 and 7200
// into 7200.
@@ -585,12 +662,12 @@ async fn test_change_region_compaction_window() {
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
}
}
@@ -615,10 +692,6 @@ async fn test_open_overwrite_compaction_window() {
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
@@ -631,8 +704,10 @@ async fn test_open_overwrite_compaction_window() {
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
engine
.handle_request(


@@ -45,7 +45,6 @@ async fn test_scan_without_filtering_deleted() {
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "10")
.build();
let column_schemas = rows_schema(&request);


@@ -111,8 +111,6 @@ async fn test_merge_mode_compaction() {
let request = CreateRequestBuilder::new()
.field_num(2)
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("merge_mode", "last_non_null")
.build();
let region_dir = request.region_dir.clone();
@@ -191,7 +189,7 @@ async fn test_merge_mode_compaction() {
+-------+---------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();


@@ -33,7 +33,7 @@ use crate::error::{
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL,
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
@@ -601,7 +601,7 @@ impl FlushScheduler {
pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
FLUSH_ERRORS_TOTAL.inc();
FLUSH_FAILURE_TOTAL.inc();
// Remove this region.
let Some(flush_status) = self.region_status.remove(&region_id) else {


@@ -20,6 +20,7 @@
#![feature(assert_matches)]
#![feature(result_flattening)]
#![feature(int_roundings)]
#![feature(debug_closure_helpers)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]


@@ -70,8 +70,8 @@ lazy_static! {
)
.unwrap();
/// Counter of scheduled failed flush jobs.
pub static ref FLUSH_ERRORS_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_errors_total", "mito flush errors total").unwrap();
pub static ref FLUSH_FAILURE_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_failure_total", "mito flush failure total").unwrap();
/// Elapsed time of a flush job.
pub static ref FLUSH_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_flush_elapsed",
@@ -84,7 +84,7 @@ lazy_static! {
/// Histogram of flushed bytes.
pub static ref FLUSH_BYTES_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
/// Gauge for inflight compaction tasks.
/// Gauge for inflight flush tasks.
pub static ref INFLIGHT_FLUSH_COUNT: IntGauge =
register_int_gauge!(
"greptime_mito_inflight_flush_count",
@@ -153,7 +153,6 @@ lazy_static! {
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
// ------- End of compaction metrics.
// Query metrics.
/// Timer of different stages in query.
@@ -403,6 +402,20 @@ lazy_static! {
}
lazy_static! {
/// Counter for compaction input file size.
pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(
"greptime_mito_compaction_input_bytes",
"mito compaction input file size",
).unwrap();
/// Counter for compaction output file size.
pub static ref COMPACTION_OUTPUT_BYTES: Counter = register_counter!(
"greptime_mito_compaction_output_bytes",
"mito compaction output file size",
).unwrap();
}
/// Stager notifier to collect metrics.
pub struct StagerMetrics {
cache_hit: IntCounter,


@@ -199,18 +199,9 @@ impl Default for CompactionOptions {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
/// Max num of sorted runs that can be kept in active writing time window.
/// Minimum file num in every time window to trigger a compaction.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_runs: usize,
/// Max num of files in the active window.
#[serde_as(as = "DisplayFromStr")]
pub max_active_window_files: usize,
/// Max num of sorted runs that can be kept in inactive time windows.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_runs: usize,
/// Max num of files in inactive time windows.
#[serde_as(as = "DisplayFromStr")]
pub max_inactive_window_files: usize,
pub trigger_file_num: usize,
/// Compaction time window defined when creating tables.
#[serde(with = "humantime_serde")]
pub time_window: Option<Duration>,
@@ -243,12 +234,9 @@ impl TwcsOptions {
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_runs: 4,
max_active_window_files: 4,
max_inactive_window_runs: 1,
max_inactive_window_files: 1,
trigger_file_num: 4,
time_window: None,
max_output_file_size: Some(ReadableSize::gb(2)),
max_output_file_size: Some(ReadableSize::mb(512)),
remote_compaction: false,
fallback_to_local: true,
}
@@ -500,7 +488,7 @@ mod tests {
#[test]
fn test_without_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.trigger_file_num", "8"),
("compaction.twcs.time_window", "2h"),
]);
let err = RegionOptions::try_from(&map).unwrap_err();
@@ -510,14 +498,14 @@ mod tests {
#[test]
fn test_with_compaction_type() {
let map = make_map(&[
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.trigger_file_num", "8"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
..Default::default()
}),
@@ -618,10 +606,7 @@ mod tests {
});
let map = make_map(&[
("ttl", "7d"),
("compaction.twcs.max_active_window_runs", "8"),
("compaction.twcs.max_active_window_files", "11"),
("compaction.twcs.max_inactive_window_runs", "2"),
("compaction.twcs.max_inactive_window_files", "3"),
("compaction.twcs.trigger_file_num", "8"),
("compaction.twcs.max_output_file_size", "1GB"),
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
@@ -645,10 +630,7 @@ mod tests {
let expect = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
max_inactive_window_runs: 2,
max_inactive_window_files: 3,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
max_output_file_size: Some(ReadableSize::gb(1)),
remote_compaction: false,
@@ -679,10 +661,7 @@ mod tests {
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: usize::MAX,
max_inactive_window_runs: 2,
max_inactive_window_files: usize::MAX,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
max_output_file_size: None,
remote_compaction: false,
@@ -719,10 +698,7 @@ mod tests {
"ttl": "7days",
"compaction": {
"compaction.type": "twcs",
"compaction.twcs.max_active_window_runs": "8",
"compaction.twcs.max_active_window_files": "11",
"compaction.twcs.max_inactive_window_runs": "2",
"compaction.twcs.max_inactive_window_files": "7",
"compaction.twcs.trigger_file_num": "8",
"compaction.twcs.max_output_file_size": "7MB",
"compaction.twcs.time_window": "2h"
},
@@ -748,10 +724,7 @@ mod tests {
let options = RegionOptions {
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
compaction: CompactionOptions::Twcs(TwcsOptions {
max_active_window_runs: 8,
max_active_window_files: 11,
max_inactive_window_runs: 2,
max_inactive_window_files: 7,
trigger_file_num: 8,
time_window: Some(Duration::from_secs(3600 * 2)),
max_output_file_size: Some(ReadableSize::mb(7)),
remote_compaction: false,

View File

@@ -15,11 +15,13 @@
//! Structures to describe metadata of files.
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
@@ -105,7 +107,7 @@ pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
}
/// Metadata of a SST file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
/// Region of file.
@@ -142,6 +144,42 @@ pub struct FileMeta {
pub sequence: Option<NonZeroU64>,
}
impl Debug for FileMeta {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut debug_struct = f.debug_struct("FileMeta");
debug_struct
.field("region_id", &self.region_id)
.field_with("file_id", |f| write!(f, "{} ", self.file_id))
.field_with("time_range", |f| {
write!(
f,
"({}, {}) ",
self.time_range.0.to_iso8601_string(),
self.time_range.1.to_iso8601_string()
)
})
.field("level", &self.level)
.field("file_size", &ReadableSize(self.file_size));
if !self.available_indexes.is_empty() {
debug_struct
.field("available_indexes", &self.available_indexes)
.field("index_file_size", &ReadableSize(self.index_file_size));
}
debug_struct
.field("num_rows", &self.num_rows)
.field("num_row_groups", &self.num_row_groups)
.field_with("sequence", |f| match self.sequence {
None => {
write!(f, "None")
}
Some(seq) => {
write!(f, "{}", seq)
}
})
.finish()
}
}
/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
@@ -188,13 +226,9 @@ pub struct FileHandle {
impl fmt::Debug for FileHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileHandle")
.field("region_id", &self.inner.meta.region_id)
.field("file_id", &self.inner.meta.file_id)
.field("time_range", &self.inner.meta.time_range)
.field("size", &self.inner.meta.file_size)
.field("level", &self.inner.meta.level)
.field("compacting", &self.inner.compacting)
.field("deleted", &self.inner.deleted)
.field("meta", self.meta_ref())
.field("compacting", &self.compacting())
.field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
.finish()
}
}

View File

@@ -50,6 +50,10 @@ pub struct WriteOptions {
pub write_buffer_size: ReadableSize,
/// Row group size.
pub row_group_size: usize,
/// Max single output file size.
/// Note: This is not a hard limit as we can only observe the file size when
/// the ArrowWriter writes to the underlying writer.
pub max_file_size: Option<usize>,
}
impl Default for WriteOptions {
@@ -57,6 +61,7 @@ impl Default for WriteOptions {
WriteOptions {
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
row_group_size: DEFAULT_ROW_GROUP_SIZE,
max_file_size: None,
}
}
}
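As the doc comment notes, `max_file_size` is a soft cap: the size is only observed after the writer flushes, so a file may overshoot the limit slightly. A sketch of opting into rolling output files, with field names as in this diff:

```rust
// Illustrative only: ask the SST writer to roll to a new file once roughly
// 256 MiB has been written; `None` keeps the previous single-file behaviour.
fn rolling_write_opts() -> WriteOptions {
    WriteOptions {
        max_file_size: Some(256 * 1024 * 1024),
        ..Default::default()
    }
}
```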
@@ -99,8 +104,9 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::read::BatchReader;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
@@ -108,7 +114,8 @@ mod tests {
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};
@@ -532,4 +539,58 @@ mod tests {
)
.await;
}
#[tokio::test]
async fn test_write_multiple_files() {
common_telemetry::init_default_ut_logging();
// create test env
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let metadata = Arc::new(sst_region_metadata());
let batches = &[
new_batch_by_range(&["a", "d"], 0, 1000),
new_batch_by_range(&["b", "f"], 0, 1000),
new_batch_by_range(&["b", "h"], 100, 200),
new_batch_by_range(&["b", "h"], 200, 300),
new_batch_by_range(&["b", "h"], 300, 1000),
];
let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
let source = new_source(batches);
let write_opts = WriteOptions {
row_group_size: 50,
max_file_size: Some(1024 * 16),
..Default::default()
};
let path_provider = RegionFilePathFactory {
region_dir: "test".to_string(),
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
path_provider,
)
.await;
let files = writer.write_all(source, None, &write_opts).await.unwrap();
assert_eq!(2, files.len());
let mut rows_read = 0;
for f in &files {
let file_handle = sst_file_handle_with_file_id(
f.file_id,
f.time_range.0.value(),
f.time_range.1.value(),
);
let builder =
ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
let mut reader = builder.build().await.unwrap();
while let Some(batch) = reader.next_batch().await.unwrap() {
rows_read += batch.num_rows();
}
}
assert_eq!(total_rows, rows_read);
}
}

View File

@@ -15,11 +15,13 @@
//! Parquet writer.
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
@@ -143,17 +145,52 @@ where
}
}
async fn get_or_create_indexer(&mut self) -> &mut Indexer {
match self.current_indexer {
None => {
self.current_file = FileId::random();
let indexer = self.indexer_builder.build(self.current_file).await;
self.current_indexer = Some(indexer);
// safety: self.current_indexer already set above.
self.current_indexer.as_mut().unwrap()
}
Some(ref mut indexer) => indexer,
}
/// Finishes current SST file and index file.
async fn finish_current_file(
&mut self,
ssts: &mut SstInfoArray,
stats: &mut SourceStats,
) -> Result<()> {
// maybe_init_writer will re-create a new file.
if let Some(mut current_writer) = mem::take(&mut self.writer) {
let stats = mem::take(stats);
// At least one row has been written.
assert!(stats.num_rows > 0);
debug!(
"Finishing current file {}, file size: {}, num rows: {}",
self.current_file,
self.bytes_written.load(Ordering::Relaxed),
stats.num_rows
);
// Finish indexer and writer.
// safety: the writer and the indexer are either both present or both absent.
let index_output = self.current_indexer.as_mut().unwrap().finish().await;
current_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
ssts.push(SstInfo {
file_id: self.current_file,
time_range,
file_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
index_metadata: index_output,
});
self.current_file = FileId::random();
self.bytes_written.store(0, Ordering::Relaxed)
};
Ok(())
}
/// Iterates source and writes all rows to Parquet file.
@@ -184,6 +221,7 @@ where
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut results = smallvec![];
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
@@ -196,49 +234,31 @@ where
match res {
Ok(mut batch) => {
stats.update(&batch);
self.get_or_create_indexer().await.update(&mut batch).await;
// safety: self.current_indexer must be set when first batch has been written.
self.current_indexer
.as_mut()
.unwrap()
.update(&mut batch)
.await;
if let Some(max_file_size) = opts.max_file_size
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
{
self.finish_current_file(&mut results, &mut stats).await?;
}
}
Err(e) => {
self.get_or_create_indexer().await.abort().await;
if let Some(indexer) = &mut self.current_indexer {
indexer.abort().await;
}
return Err(e);
}
}
}
let index_output = self.get_or_create_indexer().await.finish().await;
if stats.num_rows == 0 {
return Ok(smallvec![]);
}
let Some(mut arrow_writer) = self.writer.take() else {
// No batch actually written.
return Ok(smallvec![]);
};
arrow_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let file_id = self.current_file;
self.finish_current_file(&mut results, &mut stats).await?;
// object_store.write will make sure all bytes are written or an error is raised.
Ok(smallvec![SstInfo {
file_id,
time_range,
file_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
index_metadata: index_output,
}])
Ok(results)
}
/// Customizes per-column config according to schema and maybe column cardinality.
@@ -309,6 +329,10 @@ where
AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
self.writer = Some(arrow_writer);
let indexer = self.indexer_builder.build(self.current_file).await;
self.current_indexer = Some(indexer);
// safety: self.writer is assigned above
Ok(self.writer.as_mut().unwrap())
}
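Taken together, `finish_current_file` and the size check in the write loop implement a size-triggered rollover: batches are appended until the running byte count exceeds `max_file_size`, at which point the current writer/indexer pair is closed and a fresh file id is drawn. A stripped-down sketch of that pattern, where every type is an illustrative stand-in rather than the mito one:

```rust
/// Illustrative rollover state machine; `Sink` stands in for the paired
/// AsyncArrowWriter and Indexer.
struct RollingWriter<Sink> {
    current: Option<Sink>,
    bytes_written: usize,
    max_file_size: Option<usize>,
    finished: Vec<Sink>,
}

impl<Sink> RollingWriter<Sink> {
    fn write_batch(&mut self, open_sink: impl FnOnce() -> Sink, batch_bytes: usize) {
        // Lazily open a sink for the first batch (the `maybe_init_writer` analogue).
        if self.current.is_none() {
            self.current = Some(open_sink());
        }
        self.bytes_written += batch_bytes;
        // Soft limit: only checked after the batch has been written, so a file
        // can overshoot `max_file_size` by up to one batch.
        if let Some(max) = self.max_file_size {
            if self.bytes_written > max {
                // The `finish_current_file` analogue: seal the sink and reset.
                if let Some(sink) = self.current.take() {
                    self.finished.push(sink);
                }
                self.bytes_written = 0;
            }
        }
    }
}
```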

View File

@@ -214,28 +214,10 @@ fn set_twcs_options(
region_id: RegionId,
) -> std::result::Result<(), MetadataError> {
match key {
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
log_option_update(region_id, key, options.max_active_window_runs, runs);
options.max_active_window_runs = runs;
}
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_active_window_files)?;
log_option_update(region_id, key, options.max_active_window_files, files);
options.max_active_window_files = files;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs =
parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
log_option_update(region_id, key, options.max_inactive_window_runs, runs);
options.max_inactive_window_runs = runs;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
log_option_update(region_id, key, options.max_inactive_window_files, files);
options.max_inactive_window_files = files;
mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
log_option_update(region_id, key, options.trigger_file_num, files);
options.trigger_file_num = files;
}
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
let size = if value.is_empty() {

View File

@@ -57,7 +57,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
if self.write_buffer_manager.should_stall() && allow_stall {
self.stalled_count.add(write_requests.len() as i64);
self.stalled_count
.add((write_requests.len() + bulk_requests.len()) as i64);
self.stalled_requests.append(write_requests, bulk_requests);
self.listener.on_write_stall();
return;
@@ -181,7 +182,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Rejects stalled requests for region {}", region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
self.stalled_count.sub((requests.len() + bulk.len()) as i64);
reject_write_requests(&mut requests, &mut bulk);
}
@@ -189,7 +190,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
self.stalled_count.sub((requests.len() + bulk.len()) as i64);
self.handle_write_requests(&mut requests, &mut bulk, true)
.await;
}

View File

@@ -68,5 +68,6 @@ tokio-util.workspace = true
tonic.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
path-slash = "0.2"

View File

@@ -19,14 +19,12 @@ use api::v1::region::{
bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
RegionRequestHeader,
};
use bytes::Bytes;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::RecordBatch;
use common_telemetry::tracing_context::TracingContext;
use datatypes::schema::Schema;
use prost::Message;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::metadata::TableId;
@@ -60,13 +58,8 @@ impl Inserter {
.with_label_values(&["raw"])
.observe(record_batch.num_rows() as f64);
// todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
// safety: when reach here schema must be present.
let schema_message = FlightEncoder::default()
.encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
let schema_bytes = Bytes::from(schema_message.encode_to_vec());
let schema_bytes = decoder.schema_bytes().unwrap();
let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["partition"])
.start_timer();
@@ -96,12 +89,6 @@ impl Inserter {
.find_region_leader(region_id)
.await
.context(error::FindRegionLeaderSnafu)?;
let payload = {
let _encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["encode"])
.start_timer();
Bytes::from(data.encode_to_vec())
};
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
@@ -111,7 +98,8 @@ impl Inserter {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
payload,
data_header: data.data_header,
payload: data.data_body,
})),
})),
};
@@ -149,6 +137,7 @@ impl Inserter {
let record_batch_schema =
Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
// Raw data header and payload bytes.
let mut raw_data_bytes = None;
for (peer, masks) in mask_per_datanode {
for (region_id, mask) in masks {
@@ -157,10 +146,12 @@ impl Inserter {
let record_batch_schema = record_batch_schema.clone();
let node_manager = self.node_manager.clone();
let peer = peer.clone();
let raw_data = if mask.select_all() {
let raw_header_and_data = if mask.select_all() {
Some(
raw_data_bytes
.get_or_insert_with(|| Bytes::from(data.encode_to_vec()))
.get_or_insert_with(|| {
(data.data_header.clone(), data.data_body.clone())
})
.clone(),
)
} else {
@@ -168,9 +159,9 @@ impl Inserter {
};
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
common_runtime::spawn_global(async move {
let payload = if mask.select_all() {
let (header, payload) = if mask.select_all() {
// SAFETY: raw data must be present, we can avoid re-encoding.
raw_data.unwrap()
raw_header_and_data.unwrap()
} else {
let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["filter"])
@@ -188,13 +179,10 @@ impl Inserter {
let batch =
RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
.context(error::BuildRecordBatchSnafu)?;
let payload = Bytes::from(
FlightEncoder::default()
.encode(FlightMessage::Recordbatch(batch))
.encode_to_vec(),
);
let flight_data =
FlightEncoder::default().encode(FlightMessage::Recordbatch(batch));
encode_timer.observe_duration();
payload
(flight_data.data_header, flight_data.data_body)
};
let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["datanode_handle"])
@@ -208,6 +196,7 @@ impl Inserter {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
data_header: header,
payload,
})),
})),
@@ -231,6 +220,7 @@ impl Inserter {
for res in region_responses {
rows_inserted += res?.affected_rows;
}
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
Ok(rows_inserted)
}
}
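The reworked `Inserter` forwards the Arrow IPC `data_header`/`data_body` produced by `FlightEncoder` directly, instead of wrapping each batch into a `FlightData` message and prost-encoding it again per region. A minimal sketch of the encode-once idea, limited to the types already used in this diff:

```rust
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::RecordBatch;

/// Encode a record batch into Arrow Flight form exactly once; callers that
/// fan the same data out to several regions can then clone the returned
/// `data_header` and `data_body` instead of re-encoding the batch.
fn encode_once(batch: RecordBatch) -> FlightData {
    FlightEncoder::default().encode(FlightMessage::Recordbatch(batch))
}
```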

View File

@@ -780,8 +780,77 @@ mod tests {
use super::*;
#[test]
fn test_create_flow_tql_expr() {
let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!("calc_reqs", expr.flow_name);
assert_eq!("greptime", expr.catalog_name);
assert_eq!(
"greptime.public.cnt_reqs",
expr.sink_table_name.map(to_dot_sep).unwrap()
);
assert!(expr.source_table_names.is_empty());
assert_eq!(
r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
expr.sql
);
}
#[test]
fn test_create_flow_expr() {
let sql = r"
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!("test_distinct_basic", expr.flow_name);
assert_eq!("greptime", expr.catalog_name);
assert_eq!(
"greptime.public.out_distinct_basic",
expr.sink_table_name.map(to_dot_sep).unwrap()
);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.distinct_basic",
to_dot_sep(expr.source_table_names[0].clone())
);
assert_eq!(
r"SELECT
DISTINCT number as dis
FROM
distinct_basic",
expr.sql
);
let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1

View File

@@ -147,7 +147,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor)
self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
.await
}
@@ -157,6 +157,7 @@ impl Inserter {
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
@@ -164,6 +165,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
)
.await
}
@@ -180,6 +182,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Log,
false,
)
.await
}
@@ -195,6 +198,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Trace,
false,
)
.await
}
@@ -205,12 +209,14 @@ impl Inserter {
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
)
.await
}
@@ -222,6 +228,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -236,7 +243,13 @@ impl Inserter {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.create_or_alter_tables_on_demand(
&mut requests,
&ctx,
create_type,
statement_executor,
accommodate_existing_schema,
)
.await?;
let name_to_info = table_infos
@@ -281,10 +294,11 @@ impl Inserter {
table_infos,
} = self
.create_or_alter_tables_on_demand(
&requests,
&mut requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
)
.await?;
let name_to_info = table_infos
@@ -448,12 +462,18 @@ impl Inserter {
///
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
///
/// `accommodate_existing_schema` determines whether the existing table schema should override the schema in the request.
/// It only applies to TIME_INDEX and VALUE columns. This covers the case where the user creates a table with a
/// custom schema and then ingests data through endpoints that assume a default schema, such as Prometheus
/// remote write. This will modify the `RowInsertRequests` in place.
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
@@ -504,7 +524,7 @@ impl Inserter {
let mut alter_tables = vec![];
let mut instant_table_ids = HashSet::new();
for req in &requests.inserts {
for req in &mut requests.inserts {
match self.get_table(catalog, &schema, &req.table_name).await? {
Some(table) => {
let table_info = table.table_info();
@@ -512,9 +532,12 @@ impl Inserter {
instant_table_ids.insert(table_info.table_id());
}
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
req,
&table,
ctx,
accommodate_existing_schema,
)? {
alter_tables.push(alter_expr);
}
}
@@ -788,12 +811,16 @@ impl Inserter {
}
/// Returns an alter table expression if it finds new columns in the request.
/// It always adds columns if not exist.
/// When `accommodate_existing_schema` is false, it always adds columns that do not exist.
/// When `accommodate_existing_schema` is true, it may modify the input `req` to
/// align it with the existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
/// for more details.
fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
accommodate_existing_schema: bool,
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -802,10 +829,64 @@ impl Inserter {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
let Some(add_columns) = add_columns else {
let Some(mut add_columns) = add_columns else {
return Ok(None);
};
// If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
if accommodate_existing_schema {
let table_schema = table.schema();
// Find timestamp column name
let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
// Find field column name if there is only one
let mut field_col_name = None;
let mut multiple_field_cols = false;
table.field_columns().for_each(|col| {
if field_col_name.is_none() {
field_col_name = Some(col.name.clone());
} else {
multiple_field_cols = true;
}
});
if multiple_field_cols {
field_col_name = None;
}
// Update column name in request schema for Timestamp/Field columns
if let Some(rows) = req.rows.as_mut() {
for col in &mut rows.schema {
match col.semantic_type {
x if x == SemanticType::Timestamp as i32 => {
if let Some(ref ts_name) = ts_col_name {
if col.column_name != *ts_name {
col.column_name = ts_name.clone();
}
}
}
x if x == SemanticType::Field as i32 => {
if let Some(ref field_name) = field_col_name {
if col.column_name != *field_name {
col.column_name = field_name.clone();
}
}
}
_ => {}
}
}
}
// Remove from add_columns any column that is timestamp or field (if there is only one field column)
add_columns.add_columns.retain(|col| {
let def = col.column_def.as_ref().unwrap();
def.semantic_type != SemanticType::Timestamp as i32
&& (def.semantic_type != SemanticType::Field as i32 && field_col_name.is_some())
});
if add_columns.add_columns.is_empty() {
return Ok(None);
}
}
Ok(Some(AlterTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
@@ -1039,3 +1120,124 @@ impl FlowMirrorTask {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::cache::new_table_flownode_set_cache;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::test_util::MockDatanodeManager;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use moka::future::Cache;
use session::context::QueryContext;
use table::dist_table::DummyDataSource;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::TableRef;
use super::*;
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
ColumnSchema::new(
ts_name,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
])
.unwrap()
.build()
.unwrap();
let meta = TableMetaBuilder::empty()
.schema(Arc::new(schema))
.primary_key_indices(vec![])
.value_indices(vec![1])
.engine("mito")
.next_column_id(0)
.options(Default::default())
.created_on(Default::default())
.region_numbers(vec![0])
.build()
.unwrap();
let info = Arc::new(
TableInfoBuilder::default()
.table_id(1)
.table_version(0)
.name("test_table")
.schema_name(DEFAULT_SCHEMA_NAME)
.catalog_name(DEFAULT_CATALOG_NAME)
.desc(None)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap(),
);
Arc::new(table::Table::new(
info,
table::metadata::FilterPushDownType::Unsupported,
Arc::new(DummyDataSource),
))
}
#[tokio::test]
async fn test_accommodate_existing_schema_logic() {
let ts_name = "my_ts";
let field_name = "my_field";
let table = make_table_ref_with_schema(ts_name, field_name);
// The request uses different names for timestamp and field columns
let mut req = RowInsertRequest {
table_name: "test_table".to_string(),
rows: Some(Rows {
schema: vec![
GrpcColumnSchema {
column_name: "ts_wrong".to_string(),
datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
GrpcColumnSchema {
column_name: "field_wrong".to_string(),
datatype: api::v1::ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as i32,
..Default::default()
},
],
rows: vec![api::v1::Row {
values: vec![Value::default(), Value::default()],
}],
}),
};
let ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
));
let kv_backend = prepare_mocked_backend().await;
let inserter = Inserter::new(
catalog::memory::MemoryCatalogManager::new(),
create_partition_rule_manager(kv_backend.clone()).await,
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
Arc::new(new_table_flownode_set_cache(
String::new(),
Cache::new(100),
kv_backend.clone(),
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
.unwrap();
assert!(alter_expr.is_none());
// The request's schema should have updated names for timestamp and field columns
let req_schema = req.rows.as_ref().unwrap().schema.clone();
assert_eq!(req_schema[0].column_name, ts_name);
assert_eq!(req_schema[1].column_name, field_name);
}
}

View File

@@ -14,6 +14,7 @@
#![feature(assert_matches)]
#![feature(if_let_guard)]
#![feature(let_chains)]
mod bulk_insert;
pub mod delete;

View File

@@ -57,33 +57,13 @@ mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
use super::*;
use crate::tests::{create_partition_rule_manager, new_test_table_info};
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();
backend
}
use crate::tests::{
create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
};
#[tokio::test]
async fn test_delete_request_table_to_region() {

View File

@@ -73,33 +73,13 @@ mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
use super::*;
use crate::tests::{create_partition_rule_manager, new_test_table_info};
async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();
backend
}
use crate::tests::{
create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
};
#[tokio::test]
async fn test_insert_request_table_to_region() {

View File

@@ -48,7 +48,7 @@ use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::{QueryLanguageParser, QueryStatement};
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
@@ -56,6 +56,7 @@ use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::alter::{AlterDatabase, AlterTable};
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
@@ -440,15 +441,33 @@ impl StatementExecutor {
}
let engine = &self.query_engine;
let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let stmts = ParserContext::create_with_dialect(
&expr.sql,
query_ctx.sql_dialect(),
ParseOptions::default(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
ensure!(
stmts.len() == 1,
InvalidSqlSnafu {
err_msg: format!("Expect only one statement, found {}", stmts.len())
}
);
let stmt = &stmts[0];
// support tql parse too
let plan = match stmt {
// prom ql is only supported in batching mode
Statement::Tql(_) => return Ok(FlowType::Batching),
_ => engine
.planner()
.plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
};
/// Visitor to find aggregation or distinct
struct FindAggr {
@@ -843,8 +862,46 @@ impl StatementExecutor {
}
);
self.alter_logical_tables_procedure(alter_table_exprs, query_context)
.await?;
// group by physical table id
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
for expr in alter_table_exprs {
// Get table_id from catalog_manager
let catalog = if expr.catalog_name.is_empty() {
query_context.current_catalog()
} else {
&expr.catalog_name
};
let schema = if expr.schema_name.is_empty() {
query_context.current_schema()
} else {
expr.schema_name.to_string()
};
let table_name = &expr.table_name;
let table = self
.catalog_manager
.table(catalog, &schema, table_name, Some(&query_context))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog, &schema, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
groups.entry(physical_table_id).or_default().push(expr);
}
// Submit procedure for each physical table
let mut handles = Vec::with_capacity(groups.len());
for (_physical_table_id, exprs) in groups {
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
handles.push(fut);
}
let _results = futures::future::try_join_all(handles).await?;
Ok(Output::new_with_affected_rows(0))
}

View File

@@ -122,7 +122,11 @@ pub fn set_search_path(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
match search_expr {
Expr::Value(Value::SingleQuotedString(search_path))
| Expr::Value(Value::DoubleQuotedString(search_path)) => {
ctx.set_current_schema(&search_path.clone());
ctx.set_current_schema(search_path);
Ok(())
}
Expr::Identifier(Ident { value, .. }) => {
ctx.set_current_schema(value);
Ok(())
}
expr => NotSupportedSnafu {

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod kv_backend;
mod partition_manager;
pub(crate) use kv_backend::prepare_mocked_backend;
pub(crate) use partition_manager::{create_partition_rule_manager, new_test_table_info};

View File

@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
pub async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();
backend
}

View File

@@ -61,7 +61,7 @@ impl From<Value> for Operand {
impl Operand {
pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
match self {
Self::Column(c) => Ok(datafusion_expr::col(c)),
Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
Self::Value(v) => {
let scalar_value = match v {
Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
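The double quotes added around the column name are the interesting part: assuming DataFusion's usual identifier handling, `col` parses its argument as a possibly qualified name and folds unquoted identifiers to lowercase, so quoting is what keeps case-sensitive partition column names intact. A small illustrative snippet, not taken from this change:

```rust
use datafusion_expr::col;

fn main() {
    // Unquoted: parsed as an identifier and (presumably) folded to lowercase.
    let folded = col("MyTag");
    // Quoted: the original, case-sensitive column name is preserved.
    let preserved = col(format!(r#""{}""#, "MyTag"));
    println!("{folded} vs {preserved}");
}
```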

View File

@@ -249,6 +249,7 @@ impl PipelineTable {
requests,
Self::query_ctx(&table_info),
&self.statement_executor,
false,
)
.await
.context(InsertPipelineSnafu)?;

View File

@@ -47,6 +47,11 @@ use crate::metrics::PROMQL_SERIES_COUNT;
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesDivide {
tag_columns: Vec<String>,
/// `SeriesDivide` needs the `time_index` column's name to generate an ordering requirement
/// for its input data. The plan itself doesn't depend on the ordering of the time index
/// column; the requirement is for follow-on plans like `RangeManipulate`, since requiring
/// the ordering here avoids an unnecessary sort in those plans.
time_index_column: String,
input: LogicalPlan,
}
@@ -84,14 +89,19 @@ impl UserDefinedLogicalNodeCore for SeriesDivide {
Ok(Self {
tag_columns: self.tag_columns.clone(),
time_index_column: self.time_index_column.clone(),
input: inputs[0].clone(),
})
}
}
impl SeriesDivide {
pub fn new(tag_columns: Vec<String>, input: LogicalPlan) -> Self {
Self { tag_columns, input }
pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
Self {
tag_columns,
time_index_column,
input,
}
}
pub const fn name() -> &'static str {
@@ -101,6 +111,7 @@ impl SeriesDivide {
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(SeriesDivideExec {
tag_columns: self.tag_columns.clone(),
time_index_column: self.time_index_column.clone(),
input: exec_input,
metric: ExecutionPlanMetricsSet::new(),
})
@@ -113,6 +124,7 @@ impl SeriesDivide {
pub fn serialize(&self) -> Vec<u8> {
pb::SeriesDivide {
tag_columns: self.tag_columns.clone(),
time_index_column: self.time_index_column.clone(),
}
.encode_to_vec()
}
@@ -125,6 +137,7 @@ impl SeriesDivide {
});
Ok(Self {
tag_columns: pb_series_divide.tag_columns,
time_index_column: pb_series_divide.time_index_column,
input: placeholder_plan,
})
}
@@ -133,6 +146,7 @@ impl SeriesDivide {
#[derive(Debug)]
pub struct SeriesDivideExec {
tag_columns: Vec<String>,
time_index_column: String,
input: Arc<dyn ExecutionPlan>,
metric: ExecutionPlanMetricsSet,
}
@@ -163,7 +177,7 @@ impl ExecutionPlan for SeriesDivideExec {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
let input_schema = self.input.schema();
let exprs: Vec<PhysicalSortRequirement> = self
let mut exprs: Vec<PhysicalSortRequirement> = self
.tag_columns
.iter()
.map(|tag| PhysicalSortRequirement {
@@ -175,11 +189,17 @@ impl ExecutionPlan for SeriesDivideExec {
}),
})
.collect();
if !exprs.is_empty() {
vec![Some(LexRequirement::new(exprs))]
} else {
vec![None]
}
exprs.push(PhysicalSortRequirement {
expr: Arc::new(
ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
),
options: Some(SortOptions {
descending: false,
nulls_first: true,
}),
});
vec![Some(LexRequirement::new(exprs))]
}
fn maintains_input_order(&self) -> Vec<bool> {
@@ -197,6 +217,7 @@ impl ExecutionPlan for SeriesDivideExec {
assert!(!children.is_empty());
Ok(Arc::new(Self {
tag_columns: self.tag_columns.clone(),
time_index_column: self.time_index_column.clone(),
input: children[0].clone(),
metric: self.metric.clone(),
}))
@@ -474,6 +495,11 @@ mod test {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new("path", DataType::Utf8, true),
Field::new(
"time_index",
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
let path_column_1 = Arc::new(StringArray::from(vec![
@@ -482,9 +508,17 @@ mod test {
let host_column_1 = Arc::new(StringArray::from(vec![
"000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
])) as _;
let time_index_column_1 = Arc::new(
datafusion::arrow::array::TimestampMillisecondArray::from(vec![
1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
]),
) as _;
let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
let time_index_column_2 = Arc::new(
datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
) as _;
let path_column_3 = Arc::new(StringArray::from(vec![
"bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
@@ -492,13 +526,26 @@ mod test {
let host_column_3 = Arc::new(StringArray::from(vec![
"005", "001", "001", "001", "001", "001", "001", "001",
])) as _;
let time_index_column_3 =
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
)) as _;
let data_1 =
RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap();
let data_2 =
RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap();
let data_3 =
RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap();
let data_1 = RecordBatch::try_new(
schema.clone(),
vec![path_column_1, host_column_1, time_index_column_1],
)
.unwrap();
let data_2 = RecordBatch::try_new(
schema.clone(),
vec![path_column_2, host_column_2, time_index_column_2],
)
.unwrap();
let data_3 = RecordBatch::try_new(
schema.clone(),
vec![path_column_3, host_column_3, time_index_column_3],
)
.unwrap();
MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
}
@@ -508,6 +555,7 @@ mod test {
let memory_exec = Arc::new(prepare_test_data());
let divide_exec = Arc::new(SeriesDivideExec {
tag_columns: vec!["host".to_string(), "path".to_string()],
time_index_column: "time_index".to_string(),
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -520,33 +568,33 @@ mod test {
.to_string();
let expected = String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| foo | 000 |\
\n| foo | 000 |\
\n| foo | 001 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 003 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🫠 | 001 |\
\n| 🫠 | 001 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| foo | 000 | 1970-01-01T00:00:01 |\
\n| foo | 000 | 1970-01-01T00:00:02 |\
\n| foo | 001 | 1970-01-01T00:00:03 |\
\n| bar | 002 | 1970-01-01T00:00:04 |\
\n| bar | 002 | 1970-01-01T00:00:05 |\
\n| bar | 002 | 1970-01-01T00:00:06 |\
\n| bar | 002 | 1970-01-01T00:00:07 |\
\n| bar | 002 | 1970-01-01T00:00:08 |\
\n| bar | 003 | 1970-01-01T00:00:09 |\
\n| bla | 005 | 1970-01-01T00:00:10 |\
\n| bla | 005 | 1970-01-01T00:00:11 |\
\n| bla | 005 | 1970-01-01T00:00:12 |\
\n| bla | 005 | 1970-01-01T00:00:13 |\
\n| bla | 005 | 1970-01-01T00:00:14 |\
\n| bla | 005 | 1970-01-01T00:00:15 |\
\n| bla | 005 | 1970-01-01T00:00:16 |\
\n| 🥺 | 001 | 1970-01-01T00:00:17 |\
\n| 🥺 | 001 | 1970-01-01T00:00:18 |\
\n| 🥺 | 001 | 1970-01-01T00:00:19 |\
\n| 🥺 | 001 | 1970-01-01T00:00:20 |\
\n| 🥺 | 001 | 1970-01-01T00:00:21 |\
\n| 🫠 | 001 | 1970-01-01T00:00:22 |\
\n| 🫠 | 001 | 1970-01-01T00:00:23 |\
\n+------+------+---------------------+",
);
assert_eq!(result_literal, expected);
}
@@ -556,6 +604,7 @@ mod test {
let memory_exec = Arc::new(prepare_test_data());
let divide_exec = Arc::new(SeriesDivideExec {
tag_columns: vec!["host".to_string(), "path".to_string()],
time_index_column: "time_index".to_string(),
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -565,69 +614,69 @@ mod test {
let mut expectations = vec![
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| foo | 000 |\
\n| foo | 000 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| foo | 000 | 1970-01-01T00:00:01 |\
\n| foo | 000 | 1970-01-01T00:00:02 |\
\n+------+------+---------------------+",
),
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| foo | 001 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| foo | 001 | 1970-01-01T00:00:03 |\
\n+------+------+---------------------+",
),
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n| bar | 002 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| bar | 002 | 1970-01-01T00:00:04 |\
\n| bar | 002 | 1970-01-01T00:00:05 |\
\n| bar | 002 | 1970-01-01T00:00:06 |\
\n| bar | 002 | 1970-01-01T00:00:07 |\
\n| bar | 002 | 1970-01-01T00:00:08 |\
\n+------+------+---------------------+",
),
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| bar | 003 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| bar | 003 | 1970-01-01T00:00:09 |\
\n+------+------+---------------------+",
),
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n| bla | 005 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| bla | 005 | 1970-01-01T00:00:10 |\
\n| bla | 005 | 1970-01-01T00:00:11 |\
\n| bla | 005 | 1970-01-01T00:00:12 |\
\n| bla | 005 | 1970-01-01T00:00:13 |\
\n| bla | 005 | 1970-01-01T00:00:14 |\
\n| bla | 005 | 1970-01-01T00:00:15 |\
\n| bla | 005 | 1970-01-01T00:00:16 |\
\n+------+------+---------------------+",
),
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n| 🥺 | 001 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| 🥺 | 001 | 1970-01-01T00:00:17 |\
\n| 🥺 | 001 | 1970-01-01T00:00:18 |\
\n| 🥺 | 001 | 1970-01-01T00:00:19 |\
\n| 🥺 | 001 | 1970-01-01T00:00:20 |\
\n| 🥺 | 001 | 1970-01-01T00:00:21 |\
\n+------+------+---------------------+",
),
String::from(
"+------+------+\
\n| host | path |\
\n+------+------+\
\n| 🫠 | 001 |\
\n| 🫠 | 001 |\
\n+------+------+",
"+------+------+---------------------+\
\n| host | path | time_index |\
\n+------+------+---------------------+\
\n| 🫠 | 001 | 1970-01-01T00:00:22 |\
\n| 🫠 | 001 | 1970-01-01T00:00:23 |\
\n+------+------+---------------------+",
),
];
expectations.reverse();
@@ -648,6 +697,11 @@ mod test {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
Field::new("path", DataType::Utf8, true),
Field::new(
"time_index",
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
false,
),
]));
// Create batches with three different combinations
@@ -660,6 +714,9 @@ mod test {
vec![
Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![1000, 2000, 3000],
)) as _,
],
)
.unwrap();
@@ -669,6 +726,9 @@ mod test {
vec![
Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![4000, 5000],
)) as _,
],
)
.unwrap();
@@ -683,6 +743,9 @@ mod test {
"/var/data",
"/var/data",
])) as _,
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![6000, 7000, 8000],
)) as _,
],
)
.unwrap();
@@ -692,6 +755,9 @@ mod test {
vec![
Arc::new(StringArray::from(vec!["server2"])) as _,
Arc::new(StringArray::from(vec!["/var/data"])) as _,
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![9000],
)) as _,
],
)
.unwrap();
@@ -702,6 +768,9 @@ mod test {
vec![
Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![10000, 11000],
)) as _,
],
)
.unwrap();
@@ -715,6 +784,9 @@ mod test {
"/opt/logs",
"/opt/logs",
])) as _,
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
vec![12000, 13000, 14000],
)) as _,
],
)
.unwrap();
@@ -732,6 +804,7 @@ mod test {
// Create SeriesDivideExec
let divide_exec = Arc::new(SeriesDivideExec {
tag_columns: vec!["host".to_string(), "path".to_string()],
time_index_column: "time_index".to_string(),
input: memory_exec,
metric: ExecutionPlanMetricsSet::new(),
});
@@ -766,10 +839,16 @@ mod test {
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let time_index_array1 = result[0]
.column(2)
.as_any()
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
.unwrap();
for i in 0..5 {
assert_eq!(host_array1.value(i), "server1");
assert_eq!(path_array1.value(i), "/var/log");
assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
}
// Verify values in second batch (server2, /var/data)
@@ -783,10 +862,16 @@ mod test {
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let time_index_array2 = result[1]
.column(2)
.as_any()
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
.unwrap();
for i in 0..4 {
assert_eq!(host_array2.value(i), "server2");
assert_eq!(path_array2.value(i), "/var/data");
assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
}
// Verify values in third batch (server3, /opt/logs)
@@ -800,10 +885,16 @@ mod test {
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let time_index_array3 = result[2]
.column(2)
.as_any()
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
.unwrap();
for i in 0..5 {
assert_eq!(host_array3.value(i), "server3");
assert_eq!(path_array3.value(i), "/opt/logs");
assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
}
// Also verify streaming behavior

View File

@@ -1035,8 +1035,19 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)?;
// make divide plan
let time_index_column =
self.ctx
.time_index_column
.clone()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_string(),
})?;
let divide_plan = LogicalPlan::Extension(Extension {
node: Arc::new(SeriesDivide::new(self.ctx.tag_columns.clone(), sort_plan)),
node: Arc::new(SeriesDivide::new(
self.ctx.tag_columns.clone(),
time_index_column,
sort_plan,
)),
});
// make series_normalize plan

View File

@@ -55,7 +55,7 @@ pub use show_create_table::create_table_stmt;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions};
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowRegion, ShowTableStatus,
ShowTables, ShowVariables, ShowViews,
@@ -958,7 +958,15 @@ pub fn show_create_flow(
let mut parser_ctx =
ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
let query = parser_ctx.parser_query().context(error::SqlSnafu)?;
let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
// Since PromQL parses `now()` into a fixed time, don't use the parsed statement to generate the raw query.
let raw_query = match &query {
Statement::Tql(_) => flow_val.raw_sql().clone(),
_ => query.to_string(),
};
let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
let comment = if flow_val.comment().is_empty() {
None

View File

@@ -1 +1 @@
v0.9.0
v0.9.2

View File

@@ -167,7 +167,8 @@ impl GreptimeRequestHandler {
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler
.put_record_batch(&table_name, &mut table_id, &mut decoder, data)
.await;
.await
.inspect_err(|e| error!(e; "Failed to handle flight record batches"));
timer.observe_duration();
let result = result
.map(|x| DoPutResponse::new(request_id, x))

View File

@@ -475,6 +475,38 @@ pub fn mock_timeseries() -> Vec<TimeSeries> {
]
}
/// Add new labels to the mock timeseries.
pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
let ts_demo_metrics = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
new_label("idc".to_string(), "idc3".to_string()),
new_label("new_label1".to_string(), "foo".to_string()),
],
samples: vec![Sample {
value: 42.0,
timestamp: 3000,
}],
..Default::default()
};
let ts_multi_labels = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
new_label("idc".to_string(), "idc4".to_string()),
new_label("env".to_string(), "prod".to_string()),
new_label("host".to_string(), "host9".to_string()),
new_label("new_label2".to_string(), "bar".to_string()),
],
samples: vec![Sample {
value: 99.0,
timestamp: 4000,
}],
..Default::default()
};
vec![ts_demo_metrics, ts_multi_labels]
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -195,6 +195,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid flow query: {}", reason))]
InvalidFlowQuery {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid default constraint, column: {}", column))]
InvalidDefault {
column: String,
@@ -390,6 +397,7 @@ impl ErrorExt for Error {
| ColumnTypeMismatch { .. }
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidFlowQuery { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }

View File

@@ -40,12 +40,12 @@ use crate::parsers::utils::{
};
use crate::statements::create::{
Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
CreateTableLike, CreateView, Partitions, TableConstraint, VECTOR_OPT_DIM,
CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
};
use crate::statements::statement::Statement;
use crate::statements::transform::type_alias::get_data_type_by_alias_name;
use crate::statements::{sql_data_type_to_concrete_data_type, OptionMap};
use crate::util::parse_option_string;
use crate::util::{location_to_index, parse_option_string};
pub const ENGINE: &str = "ENGINE";
pub const MAXVALUE: &str = "MAXVALUE";
@@ -282,12 +282,13 @@ impl<'a> ParserContext<'a> {
.consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
{
let expire_after_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
let expire_after_lit = utils::parser_expr_to_scalar_value(expire_after_expr.clone())?
.cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
.ok()
.with_context(|| InvalidIntervalSnafu {
reason: format!("cannot cast {} to interval type", expire_after_expr),
})?;
let expire_after_lit =
utils::parser_expr_to_scalar_value_literal(expire_after_expr.clone())?
.cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
.ok()
.with_context(|| InvalidIntervalSnafu {
reason: format!("cannot cast {} to interval type", expire_after_expr),
})?;
if let ScalarValue::IntervalMonthDayNano(Some(interval)) = expire_after_lit {
Some(
interval.nanoseconds / 1_000_000_000
@@ -324,7 +325,22 @@ impl<'a> ParserContext<'a> {
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;
let query = self.parser.parse_query().context(error::SyntaxSnafu)?;
let start_loc = self.parser.peek_token().span.start;
let start_index = location_to_index(self.sql, &start_loc);
let query = self.parse_statement()?;
let end_token = self.parser.peek_token();
let raw_query = if end_token == Token::EOF {
&self.sql[start_index..]
} else {
let end_loc = end_token.span.end;
let end_index = location_to_index(self.sql, &end_loc);
&self.sql[start_index..end_index.min(self.sql.len())]
};
let raw_query = raw_query.trim_end_matches(";");
let query = Box::new(SqlOrTql::try_from_statement(query, raw_query)?);
Ok(Statement::CreateFlow(CreateFlow {
flow_name,


@@ -23,6 +23,7 @@ use crate::parser::ParserContext;
use crate::parsers::utils;
use crate::statements::statement::Statement;
use crate::statements::tql::{Tql, TqlAnalyze, TqlEval, TqlExplain, TqlParameters};
use crate::util::location_to_index;
pub const TQL: &str = "TQL";
const EVAL: &str = "EVAL";
@@ -159,7 +160,7 @@ impl ParserContext<'_> {
let value = match tokens[0].clone() {
Token::Number(n, _) => n,
Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s,
Token::Word(_) => Self::parse_tokens(tokens)?,
Token::Word(_) => Self::parse_tokens_to_ts(tokens)?,
unexpected => {
return Err(ParserError::ParserError(format!(
"Expected number, string or word, but have {unexpected:?}"
@@ -169,7 +170,7 @@ impl ParserContext<'_> {
};
Ok(value)
}
_ => Self::parse_tokens(tokens),
_ => Self::parse_tokens_to_ts(tokens),
};
for token in delimiter_tokens {
if parser.consume_token(token) {
@@ -182,9 +183,10 @@ impl ParserContext<'_> {
.context(ParserSnafu)
}
fn parse_tokens(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
/// Parse the tokens to seconds and convert to string.
fn parse_tokens_to_ts(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
let parser_expr = Self::parse_to_expr(tokens)?;
let lit = utils::parser_expr_to_scalar_value(parser_expr)
let lit = utils::parser_expr_to_scalar_value_literal(parser_expr)
.map_err(Box::new)
.context(ConvertToLogicalExpressionSnafu)?;
@@ -217,11 +219,15 @@ impl ParserContext<'_> {
while matches!(parser.peek_token().token, Token::Comma) {
let _skip_token = parser.next_token();
}
let index = parser.next_token().span.start.column as usize;
if index == 0 {
let start_tql = parser.next_token();
if start_tql == Token::EOF {
return Err(ParserError::ParserError("empty TQL query".to_string()));
}
let start_location = start_tql.span.start;
// translate the start location to the index in the sql string
let index = location_to_index(sql, &start_location);
let query = &sql[index - 1..];
while parser.next_token() != Token::EOF {
// consume all tokens


@@ -41,7 +41,7 @@ use crate::error::{
/// Convert a parser expression to a scalar value. This function will try the
/// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be
/// handled properly.
pub fn parser_expr_to_scalar_value(expr: sqlparser::ast::Expr) -> Result<ScalarValue> {
pub fn parser_expr_to_scalar_value_literal(expr: sqlparser::ast::Expr) -> Result<ScalarValue> {
// 1. convert parser expr to logical expr
let empty_df_schema = DFSchema::empty();
let logical_expr = SqlToRel::new(&StubContextProvider::default())


@@ -24,8 +24,11 @@ use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
use crate::error::{Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu};
use crate::error::{
InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
};
use crate::statements::statement::Statement;
use crate::statements::tql::Tql;
use crate::statements::OptionMap;
const LINE_SEP: &str = ",\n";
@@ -374,7 +377,41 @@ pub struct CreateFlow {
/// Comment string
pub comment: Option<String>,
/// SQL statement
pub query: Box<Query>,
pub query: Box<SqlOrTql>,
}
/// Either a sql query or a tql query
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
Sql(Query, String),
Tql(Tql, String),
}
impl std::fmt::Display for SqlOrTql {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Sql(_, s) => write!(f, "{}", s),
Self::Tql(_, s) => write!(f, "{}", s),
}
}
}
impl SqlOrTql {
pub fn try_from_statement(
value: Statement,
original_query: &str,
) -> std::result::Result<Self, crate::error::Error> {
match value {
Statement::Query(query) => {
Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
}
Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
_ => InvalidFlowQuerySnafu {
reason: format!("Expect either sql query or promql query, found {:?}", value),
}
.fail(),
}
}
}
impl Display for CreateFlow {
@@ -741,7 +778,7 @@ WITH(
r#"
CREATE FLOW filter_numbers
SINK TO out_num_cnt
AS SELECT number FROM numbers_input WHERE number > 10"#,
AS SELECT number FROM numbers_input where number > 10"#,
&new_sql
);
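
As a loose illustration (hypothetical types, not the actual `SqlOrTql` definition above), the pattern here is to carry the original query text alongside the parsed form so that `Display` reproduces exactly what the user wrote, casing and all:

    use std::fmt::{self, Display, Formatter};

    // Hypothetical stand-in for SqlOrTql: parsed form elided, raw text kept.
    #[allow(dead_code)]
    enum ParsedQuery {
        Sql(String /* parsed repr */, String /* raw text */),
        Tql(String /* parsed repr */, String /* raw text */),
    }

    impl Display for ParsedQuery {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            // Always print the stored raw text, never a re-rendered AST.
            match self {
                ParsedQuery::Sql(_, raw) | ParsedQuery::Tql(_, raw) => write!(f, "{raw}"),
            }
        }
    }

    fn main() {
        let q = ParsedQuery::Sql(
            "<ast>".to_string(),
            "SELECT number FROM numbers_input where number > 10".to_string(),
        );
        // Round-trips the original text, including the lowercase `where`.
        assert_eq!(q.to_string(), "SELECT number FROM numbers_input where number > 10");
    }

This appears to be why the expected test output above now keeps the lowercase `where`: the flow definition stores the raw statement text instead of re-rendering the parsed query.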


@@ -42,7 +42,7 @@ fn format_tql(
lookback: Option<&str>,
query: &str,
) -> std::fmt::Result {
write!(f, "({start}, {end}, {step}")?;
write!(f, "({start}, {end}, '{step}'")?;
if let Some(lookback) = lookback {
write!(f, ", {lookback}")?;
}


@@ -15,9 +15,10 @@
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use sqlparser::ast::{Expr, ObjectName, Query, SetExpr, SqlOption, TableFactor, Value};
use sqlparser::ast::{Expr, ObjectName, SetExpr, SqlOption, TableFactor, Value};
use crate::error::{InvalidSqlSnafu, InvalidTableOptionValueSnafu, Result};
use crate::statements::create::SqlOrTql;
/// Format an [ObjectName] without any quote of its idents.
pub fn format_raw_object_name(name: &ObjectName) -> String {
@@ -58,14 +59,36 @@ pub fn parse_option_string(option: SqlOption) -> Result<(String, String)> {
}
/// Walk through a [Query] and extract all the tables referenced in it.
pub fn extract_tables_from_query(query: &Query) -> impl Iterator<Item = ObjectName> {
pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator<Item = ObjectName> {
let mut names = HashSet::new();
extract_tables_from_set_expr(&query.body, &mut names);
match query {
SqlOrTql::Sql(query, _) => extract_tables_from_set_expr(&query.body, &mut names),
SqlOrTql::Tql(_tql, _) => {
// since TQL has a sliding time window, we don't need to extract tables from it
// (because we are going to eval it fully anyway)
}
}
names.into_iter()
}
/// translate the start location to the index in the sql string
pub fn location_to_index(sql: &str, location: &sqlparser::tokenizer::Location) -> usize {
let mut index = 0;
for (lno, line) in sql.lines().enumerate() {
if lno + 1 == location.line as usize {
index += location.column as usize;
break;
} else {
index += line.len() + 1; // +1 for the newline
}
}
// -1 because the index is 0-based
// and the location is 1-based
index - 1
}
/// Helper function for [extract_tables_from_query].
///
/// Handle [SetExpr].
@@ -98,3 +121,53 @@ fn table_factor_to_object_name(table_factor: &TableFactor, names: &mut HashSet<O
names.insert(name.to_owned());
}
}
#[cfg(test)]
mod tests {
use sqlparser::tokenizer::Token;
use super::*;
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParserContext;
#[test]
fn test_location_to_index() {
let testcases = vec![
"SELECT * FROM t WHERE a = 1",
// start or end with newline
r"
SELECT *
FROM
t
WHERE a =
1
",
r"SELECT *
FROM
t
WHERE a =
1
",
r"
SELECT *
FROM
t
WHERE a =
1",
];
for sql in testcases {
let mut parser = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
loop {
let token = parser.parser.next_token();
if token == Token::EOF {
break;
}
let span = token.span;
let subslice =
&sql[location_to_index(sql, &span.start)..location_to_index(sql, &span.end)];
assert_eq!(token.to_string(), subslice);
}
}
}
}
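
For reference, a self-contained sketch (using a hypothetical `Loc` in place of sqlparser's `Location`) of the same line/column-to-byte-index translation that `location_to_index` performs, assuming 1-based line/column values and single-byte `\n` line endings:

    struct Loc {
        line: usize,   // 1-based
        column: usize, // 1-based
    }

    fn location_to_index(sql: &str, loc: &Loc) -> usize {
        let mut index = 0;
        for (lno, line) in sql.lines().enumerate() {
            if lno + 1 == loc.line {
                index += loc.column;
                break;
            }
            index += line.len() + 1; // +1 for the '\n' separator
        }
        index - 1 // Location is 1-based, the returned byte index is 0-based
    }

    fn main() {
        let sql = "SELECT *\nFROM t";
        // "FROM" starts at line 2, column 1, i.e. byte index 9.
        let idx = location_to_index(sql, &Loc { line: 2, column: 1 });
        assert_eq!(idx, 9);
        assert_eq!(&sql[idx..idx + 4], "FROM");
    }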


@@ -1027,14 +1027,6 @@ pub enum MetadataError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode prost message"))]
Prost {
#[snafu(source)]
error: prost::DecodeError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for MetadataError {


@@ -29,14 +29,8 @@ pub const SNAPSHOT_READ: &str = "snapshot_read";
pub const COMPACTION_TYPE: &str = "compaction.type";
/// TWCS compaction strategy.
pub const COMPACTION_TYPE_TWCS: &str = "twcs";
/// Option key for twcs max active window runs.
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
/// Option key for twcs max active window files.
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
/// Option key for twcs max inactive window runs.
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
/// Option key for twcs max inactive window files.
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
/// Option key for twcs min file num to trigger a compaction.
pub const TWCS_TRIGGER_FILE_NUM: &str = "compaction.twcs.trigger_file_num";
/// Option key for twcs max output file size.
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
/// Option key for twcs time window.
@@ -68,10 +62,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
COMPACTION_TYPE,
TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES,
TWCS_TRIGGER_FILE_NUM,
TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
REMOTE_COMPACTION,
@@ -100,10 +91,7 @@ mod tests {
assert!(is_mito_engine_option_key("ttl"));
assert!(is_mito_engine_option_key("compaction.type"));
assert!(is_mito_engine_option_key(
"compaction.twcs.max_active_window_runs"
));
assert!(is_mito_engine_option_key(
"compaction.twcs.max_inactive_window_runs"
"compaction.twcs.trigger_file_num"
));
assert!(is_mito_engine_option_key("compaction.twcs.time_window"));
assert!(is_mito_engine_option_key("storage"));
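
A minimal sketch (the constants mirror the keys in this hunk; the helpers are hypothetical, not the crate's API) of how the consolidated `compaction.twcs.trigger_file_num` key can be validated and parsed once the per-window runs/files options are gone:

    const TWCS_TRIGGER_FILE_NUM: &str = "compaction.twcs.trigger_file_num";
    const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
    const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";

    // Accept only the TWCS keys that survive this change.
    fn is_twcs_option_key(key: &str) -> bool {
        [TWCS_TRIGGER_FILE_NUM, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW].contains(&key)
    }

    fn parse_trigger_file_num(value: &str) -> Result<u64, std::num::ParseIntError> {
        value.parse::<u64>()
    }

    fn main() {
        assert!(is_twcs_option_key("compaction.twcs.trigger_file_num"));
        // The old per-window keys are no longer recognized.
        assert!(!is_twcs_option_key("compaction.twcs.max_active_window_runs"));
        assert_eq!(parse_trigger_file_num("8").unwrap(), 8);
    }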


@@ -22,22 +22,20 @@ use api::v1::column_def::{
};
use api::v1::region::bulk_insert_request::Body;
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
alter_request, compact_request, region_request, AlterRequest, AlterRequests, ArrowIpc,
BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
};
pub use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_grpc::FlightData;
use common_grpc::flight::FlightDecoder;
use common_recordbatch::DfRecordBatch;
use common_time::TimeToLive;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{AsRefStr, IntoStaticStr};
@@ -46,15 +44,12 @@ use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
UnexpectedSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
@@ -334,22 +329,21 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
return Ok(vec![]);
};
let ArrowIpc {
region_id,
schema,
payload,
data_header,
} = request;
let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
.with_label_values(&["decode"])
.start_timer();
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
let mut decoder = FlightDecoder::default();
let _ = decoder.try_decode(&schema_data).context(FlightCodecSnafu)?;
let FlightMessage::Recordbatch(rb) = decoder
.try_decode(&payload_data)
.context(FlightCodecSnafu)?
else {
unreachable!("Always expect record batch message after schema");
};
let mut decoder = FlightDecoder::try_from_schema_bytes(&schema).context(FlightCodecSnafu)?;
let payload = decoder
.try_decode_record_batch(&data_header, &payload)
.context(FlightCodecSnafu)?;
decoder_timer.observe_duration();
let payload = rb.into_df_record_batch();
let region_id: RegionId = request.region_id.into();
let region_id: RegionId = region_id.into();
Ok(vec![(
region_id,
RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
@@ -1027,12 +1021,9 @@ impl TryFrom<&PbOption> for SetRegionOption {
Ok(Self::Ttl(Some(ttl)))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS
| TWCS_MAX_ACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_RUNS
| TWCS_MAX_OUTPUT_FILE_SIZE
| TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
Ok(Self::Twsc(key.to_string(), value.to_string()))
}
_ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
}
}
@@ -1041,16 +1032,7 @@ impl TryFrom<&PbOption> for SetRegionOption {
impl From<&UnsetRegionOption> for SetRegionOption {
fn from(unset_option: &UnsetRegionOption) -> Self {
match unset_option {
UnsetRegionOption::TwcsMaxActiveWindowFiles => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxActiveWindowRuns => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
UnsetRegionOption::TwcsTriggerFileNum => {
SetRegionOption::Twsc(unset_option.to_string(), String::new())
}
UnsetRegionOption::TwcsMaxOutputFileSize => {
@@ -1070,10 +1052,7 @@ impl TryFrom<&str> for UnsetRegionOption {
fn try_from(key: &str) -> Result<Self> {
match key.to_ascii_lowercase().as_str() {
TTL_KEY => Ok(Self::Ttl),
TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
_ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
@@ -1083,10 +1062,7 @@ impl TryFrom<&str> for UnsetRegionOption {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
TwcsMaxActiveWindowFiles,
TwcsMaxInactiveWindowFiles,
TwcsMaxActiveWindowRuns,
TwcsMaxInactiveWindowRuns,
TwcsTriggerFileNum,
TwcsMaxOutputFileSize,
TwcsTimeWindow,
Ttl,
@@ -1096,10 +1072,7 @@ impl UnsetRegionOption {
pub fn as_str(&self) -> &str {
match self {
Self::Ttl => TTL_KEY,
Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
}


@@ -238,21 +238,9 @@ impl<R: Rng> Generator<AlterTableExpr, R> for AlterExprSetTableOptionsGenerator<
let max_output_file_size: u64 = rng.random();
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize(max_output_file_size))
}
AlterTableOption::TwcsMaxInactiveWindowRuns(_) => {
let max_inactive_window_runs: u64 = rng.random();
AlterTableOption::TwcsMaxInactiveWindowRuns(max_inactive_window_runs)
}
AlterTableOption::TwcsMaxActiveWindowFiles(_) => {
let max_active_window_files: u64 = rng.random();
AlterTableOption::TwcsMaxActiveWindowFiles(max_active_window_files)
}
AlterTableOption::TwcsMaxActiveWindowRuns(_) => {
let max_active_window_runs: u64 = rng.random();
AlterTableOption::TwcsMaxActiveWindowRuns(max_active_window_runs)
}
AlterTableOption::TwcsMaxInactiveWindowFiles(_) => {
let max_inactive_window_files: u64 = rng.random();
AlterTableOption::TwcsMaxInactiveWindowFiles(max_inactive_window_files)
AlterTableOption::TwcsTriggerFileNum(_) => {
let trigger_file_num: u64 = rng.random();
AlterTableOption::TwcsTriggerFileNum(trigger_file_num)
}
})
.collect();
@@ -365,7 +353,7 @@ mod tests {
.generate(&mut rng)
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsMaxOutputFileSize":16770910638250818741}]}}}"#;
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsTimeWindow":{"value":2428665013,"unit":"Millisecond"}}]}}}"#;
assert_eq!(expected, serialized);
let expr = AlterExprUnsetTableOptionsGeneratorBuilder::default()
@@ -375,7 +363,7 @@ mod tests {
.generate(&mut rng)
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.max_active_window_runs","compaction.twcs.max_output_file_size","compaction.twcs.time_window","compaction.twcs.max_inactive_window_files","compaction.twcs.max_active_window_files"]}}}"#;
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.trigger_file_num","compaction.twcs.time_window"]}}}"#;
assert_eq!(expected, serialized);
}
}


@@ -21,9 +21,8 @@ use common_time::{Duration, FOREVER, INSTANT};
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use store_api::mito_engine_options::{
APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_ACTIVE_WINDOW_RUNS, TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
TWCS_TRIGGER_FILE_NUM,
};
use strum::EnumIter;
@@ -78,10 +77,7 @@ pub enum AlterTableOption {
Ttl(Ttl),
TwcsTimeWindow(Duration),
TwcsMaxOutputFileSize(ReadableSize),
TwcsMaxInactiveWindowFiles(u64),
TwcsMaxActiveWindowFiles(u64),
TwcsMaxInactiveWindowRuns(u64),
TwcsMaxActiveWindowRuns(u64),
TwcsTriggerFileNum(u64),
}
impl AlterTableOption {
@@ -90,10 +86,7 @@ impl AlterTableOption {
AlterTableOption::Ttl(_) => TTL_KEY,
AlterTableOption::TwcsTimeWindow(_) => TWCS_TIME_WINDOW,
AlterTableOption::TwcsMaxOutputFileSize(_) => TWCS_MAX_OUTPUT_FILE_SIZE,
AlterTableOption::TwcsMaxInactiveWindowFiles(_) => TWCS_MAX_INACTIVE_WINDOW_FILES,
AlterTableOption::TwcsMaxActiveWindowFiles(_) => TWCS_MAX_ACTIVE_WINDOW_FILES,
AlterTableOption::TwcsMaxInactiveWindowRuns(_) => TWCS_MAX_INACTIVE_WINDOW_RUNS,
AlterTableOption::TwcsMaxActiveWindowRuns(_) => TWCS_MAX_ACTIVE_WINDOW_RUNS,
AlterTableOption::TwcsTriggerFileNum(_) => TWCS_TRIGGER_FILE_NUM,
}
}
@@ -111,21 +104,9 @@ impl AlterTableOption {
};
Ok(AlterTableOption::Ttl(ttl))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxActiveWindowRuns(runs))
}
TWCS_MAX_ACTIVE_WINDOW_FILES => {
TWCS_TRIGGER_FILE_NUM => {
let files = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxActiveWindowFiles(files))
}
TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxInactiveWindowRuns(runs))
}
TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files = value.parse().unwrap();
Ok(AlterTableOption::TwcsMaxInactiveWindowFiles(files))
Ok(AlterTableOption::TwcsTriggerFileNum(files))
}
TWCS_MAX_OUTPUT_FILE_SIZE => {
// may be "1M" instead of "1 MiB"
@@ -178,17 +159,8 @@ impl Display for AlterTableOption {
// Caution: to_string loses precision for ReadableSize
write!(f, "'{}' = '{}'", TWCS_MAX_OUTPUT_FILE_SIZE, s)
}
AlterTableOption::TwcsMaxInactiveWindowFiles(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_FILES, u)
}
AlterTableOption::TwcsMaxActiveWindowFiles(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_FILES, u)
}
AlterTableOption::TwcsMaxInactiveWindowRuns(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_RUNS, u)
}
AlterTableOption::TwcsMaxActiveWindowRuns(u) => {
write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_RUNS, u)
AlterTableOption::TwcsTriggerFileNum(u) => {
write!(f, "'{}' = '{}'", TWCS_TRIGGER_FILE_NUM, u)
}
}
}
@@ -212,21 +184,15 @@ mod tests {
]
);
let option_string = "compaction.twcs.max_active_window_files = '5030469694939972912',
compaction.twcs.max_active_window_runs = '8361168990283879099',
compaction.twcs.max_inactive_window_files = '6028716566907830876',
compaction.twcs.max_inactive_window_runs = '10622283085591494074',
let option_string = "compaction.twcs.trigger_file_num = '5030469694939972912',
compaction.twcs.max_output_file_size = '15686.4PiB',
compaction.twcs.time_window = '2061999256ms',
compaction.type = 'twcs',
ttl = '1month 3days 15h 49m 8s 279ms'";
let options = AlterTableOption::parse_kv_pairs(option_string).unwrap();
assert_eq!(options.len(), 7);
assert_eq!(options.len(), 4);
let expected = vec![
AlterTableOption::TwcsMaxActiveWindowFiles(5030469694939972912),
AlterTableOption::TwcsMaxActiveWindowRuns(8361168990283879099),
AlterTableOption::TwcsMaxInactiveWindowFiles(6028716566907830876),
AlterTableOption::TwcsMaxInactiveWindowRuns(10622283085591494074),
AlterTableOption::TwcsTriggerFileNum(5030469694939972912),
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("15686.4PiB").unwrap()),
AlterTableOption::TwcsTimeWindow(Duration::new_nanosecond(2_061_999_256_000_000)),
AlterTableOption::Ttl(Ttl::Duration(Duration::new_millisecond(


@@ -191,10 +191,7 @@ mod tests {
AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))),
AlterTableOption::TwcsTimeWindow(Duration::new_second(60)),
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()),
AlterTableOption::TwcsMaxActiveWindowFiles(10),
AlterTableOption::TwcsMaxActiveWindowRuns(10),
AlterTableOption::TwcsMaxInactiveWindowFiles(5),
AlterTableOption::TwcsMaxInactiveWindowRuns(5),
AlterTableOption::TwcsTriggerFileNum(5),
],
},
};
@@ -204,10 +201,7 @@ mod tests {
"ALTER TABLE test SET 'ttl' = '60s', ",
"'compaction.twcs.time_window' = '60s', ",
"'compaction.twcs.max_output_file_size' = '1.0GiB', ",
"'compaction.twcs.max_active_window_files' = '10', ",
"'compaction.twcs.max_active_window_runs' = '10', ",
"'compaction.twcs.max_inactive_window_files' = '5', ",
"'compaction.twcs.max_inactive_window_runs' = '5';"
"'compaction.twcs.trigger_file_num' = '5';"
);
assert_eq!(expected, output);
}


@@ -187,10 +187,7 @@ mod tests {
AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))),
AlterTableOption::TwcsTimeWindow(Duration::new_second(60)),
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()),
AlterTableOption::TwcsMaxActiveWindowFiles(10),
AlterTableOption::TwcsMaxActiveWindowRuns(10),
AlterTableOption::TwcsMaxInactiveWindowFiles(5),
AlterTableOption::TwcsMaxInactiveWindowRuns(5),
AlterTableOption::TwcsTriggerFileNum(10),
],
},
};
@@ -200,10 +197,7 @@ mod tests {
"ALTER TABLE test SET 'ttl' = '60s', ",
"'compaction.twcs.time_window' = '60s', ",
"'compaction.twcs.max_output_file_size' = '1.0GiB', ",
"'compaction.twcs.max_active_window_files' = '10', ",
"'compaction.twcs.max_active_window_runs' = '10', ",
"'compaction.twcs.max_inactive_window_files' = '5', ",
"'compaction.twcs.max_inactive_window_runs' = '5';"
"'compaction.twcs.trigger_file_num' = '10';",
);
assert_eq!(expected, output);
}


@@ -59,10 +59,10 @@ mod test {
let record_batches = create_record_batches(1);
test_put_record_batches(&client, record_batches).await;
let sql = "select ts, a, b from foo order by ts";
let sql = "select ts, a, `B` from foo order by ts";
let expected = "\
+-------------------------+----+----+
| ts | a | b |
| ts | a | B |
+-------------------------+----+----+
| 1970-01-01T00:00:00.001 | -1 | s1 |
| 1970-01-01T00:00:00.002 | -2 | s2 |
@@ -116,10 +116,10 @@ mod test {
let record_batches = create_record_batches(1);
test_put_record_batches(&client, record_batches).await;
let sql = "select ts, a, b from foo order by ts";
let sql = "select ts, a, `B` from foo order by ts";
let expected = "\
+-------------------------+----+----+
| ts | a | b |
| ts | a | B |
+-------------------------+----+----+
| 1970-01-01T00:00:00.001 | -1 | s1 |
| 1970-01-01T00:00:00.002 | -2 | s2 |
@@ -192,7 +192,7 @@ mod test {
)
.with_time_index(true),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("B", ConcreteDataType::string_datatype(), true),
]));
let mut record_batches = Vec::with_capacity(3);
@@ -250,7 +250,7 @@ mod test {
..Default::default()
},
ColumnDef {
name: "b".to_string(),
name: "B".to_string(),
data_type: ColumnDataType::String as i32,
semantic_type: SemanticType::Field as i32,
is_nullable: true,

Some files were not shown because too many files have changed in this diff.