Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-06 13:22:57 +00:00)
Compare commits — 23 Commits
ci/update-
...
v0.15.0-ni
| Author | SHA1 | Date |
|---|---|---|
|  | 3d942f6763 |  |
|  | 3901863432 |  |
|  | 27e339f628 |  |
|  | cf2712e6f4 |  |
|  | 4b71e493f7 |  |
|  | bf496e05cc |  |
|  | 513ca951ee |  |
|  | 791f530a78 |  |
|  | 1de6d8c619 |  |
|  | a4d0420727 |  |
|  | fc6300a2ba |  |
|  | f55af5838c |  |
|  | 5a0da5b6bb |  |
|  | d5f0006864 |  |
|  | ede82331b2 |  |
|  | 56e696bd55 |  |
|  | bc0cdf62ba |  |
|  | eaf7b4b9dd |  |
|  | 7ae0e150e5 |  |
|  | 43c30b55ae |  |
|  | 153e80450a |  |
|  | 1624dc41c5 |  |
|  | 300262562b |  |
@@ -22,7 +22,6 @@ datanode:
[wal]
provider = "kafka"
broker_endpoints = ["kafka.kafka-cluster.svc.cluster.local:9092"]
linger = "2ms"
overwrite_entry_start_id = true
frontend:
configData: |-
3 .github/scripts/create-version.sh (vendored)
@@ -16,7 +16,8 @@ function create_version() {

if [ -z "$NEXT_RELEASE_VERSION" ]; then
echo "NEXT_RELEASE_VERSION is empty, use version from Cargo.toml" >&2
export NEXT_RELEASE_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
# NOTE: Need a `v` prefix for the version string.
export NEXT_RELEASE_VERSION=v$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
fi

if [ -z "$NIGHTLY_RELEASE_PREFIX" ]; then
@@ -4,7 +4,7 @@ DEV_BUILDER_IMAGE_TAG=$1

update_dev_builder_version() {
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: Should specify the dev-builder image tag"
echo "Error: Should specify the dev-builder image tag"
exit 1
fi

@@ -17,7 +17,7 @@ update_dev_builder_version() {
git checkout -b $BRANCH_NAME

# Update the dev-builder image tag in the Makefile.
gsed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile
sed -i "s/DEV_BUILDER_IMAGE_TAG ?=.*/DEV_BUILDER_IMAGE_TAG ?= ${DEV_BUILDER_IMAGE_TAG}/g" Makefile

# Commit the changes.
git add Makefile
4 .github/workflows/develop.yml (vendored)
@@ -195,6 +195,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "unstable_fuzz_create_table_standalone" ]
steps:
@@ -299,6 +300,7 @@ jobs:
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
mode:
@@ -431,6 +433,7 @@ jobs:
needs: build-greptime-ci
timeout-minutes: 60
strategy:
fail-fast: false
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_migrate_metric_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
@@ -578,6 +581,7 @@ jobs:
needs: build
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
mode:
1 .github/workflows/semantic-pull-request.yml (vendored)
@@ -16,6 +16,7 @@ jobs:
runs-on: ubuntu-latest
permissions:
pull-requests: write # Add permissions to modify PRs
issues: write
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
4 Cargo.lock (generated)
@@ -2223,6 +2223,7 @@ version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
"bytes",
"common-base",
"common-error",
"common-macro",
@@ -4352,6 +4353,7 @@ dependencies = [
"session",
"smallvec",
"snafu 0.8.5",
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.0",
@@ -4855,7 +4857,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7668a882d57ca6a2333146e0574b8f0c9d5008ae#7668a882d57ca6a2333146e0574b8f0c9d5008ae"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=67ee5f94e5da72314cda7d0eb90106eb1c16a1ae#67ee5f94e5da72314cda7d0eb90106eb1c16a1ae"
dependencies = [
"prost 0.13.5",
"serde",

@@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67ee5f94e5da72314cda7d0eb90106eb1c16a1ae" }
hex = "0.4"
http = "1"
humantime = "2.1"
2 Makefile
@@ -8,7 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2025-04-15-1a517ec8-20250428023155
DEV_BUILDER_IMAGE_TAG ?= 2025-05-19-b2377d4b-20250520045554
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
10 README.md
@@ -121,7 +121,7 @@ docker pull greptime/greptimedb

```shell
docker run -p 127.0.0.1:4000-4003:4000-4003 \
-v "$(pwd)/greptimedb:/greptimedb_data" \
-v "$(pwd)/greptimedb_data:/greptimedb_data" \
--name greptime --rm \
greptime/greptimedb:latest standalone start \
--http-addr 0.0.0.0:4000 \
@@ -129,7 +129,7 @@ docker run -p 127.0.0.1:4000-4003:4000-4003 \
--mysql-addr 0.0.0.0:4002 \
--postgres-addr 0.0.0.0:4003
```
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
Dashboard: [http://localhost:4000/dashboard](http://localhost:4000/dashboard)
[Full Install Guide](https://docs.greptime.com/getting-started/installation/overview)

**Troubleshooting:**
@@ -167,7 +167,7 @@ cargo run -- standalone start

## Project Status

> **Status:** Beta.
> **Status:** Beta.
> **GA (v1.0):** Targeted for mid 2025.

- Being used in production by early adopters
@@ -197,8 +197,8 @@ GreptimeDB is licensed under the [Apache License 2.0](https://apache.org/license

## Commercial Support

Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
Running GreptimeDB in your organization?
We offer enterprise add-ons, services, training, and consulting.
[Contact us](https://greptime.com/contactus) for details.

## Contributing
(File diff suppressed because it is too large.)
@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{instance=~"$frontend"}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{instance=~"$frontend"}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -67,6 +68,8 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
|
||||
@@ -371,6 +371,21 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
|
||||
- title: 'Frontend Handle Bulk Insert Elapsed Time '
|
||||
type: timeseries
|
||||
description: Per-stage time for frontend to handle bulk insert requests
|
||||
unit: s
|
||||
queries:
|
||||
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- title: Mito Engine
|
||||
panels:
|
||||
- title: Request OPS per Instance
|
||||
@@ -562,6 +577,36 @@ groups:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]'
|
||||
- title: Compaction Input/Output Bytes
|
||||
type: timeseries
|
||||
description: Compaction oinput output bytes
|
||||
unit: bytes
|
||||
queries:
|
||||
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-input'
|
||||
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-output'
|
||||
- title: Region Worker Handle Bulk Insert Requests
|
||||
type: timeseries
|
||||
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
|
||||
unit: s
|
||||
queries:
|
||||
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
|
||||
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
|
||||
datasource:
|
||||
type: prometheus
|
||||
uid: ${metrics}
|
||||
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
|
||||
- title: OpenDAL
|
||||
panels:
|
||||
- title: QPS per Instance
|
||||
|
||||
(File diff suppressed because it is too large.)
@@ -46,6 +46,7 @@
| Ingest Rows per Instance | `sum by(instance, pod)(rate(greptime_table_operator_ingest_rows{}[$__rate_interval]))` | `timeseries` | Ingestion rate by row as in each frontend | `prometheus` | `rowsps` | `[{{instance}}]-[{{pod}}]` |
| Region Call QPS per Instance | `sum by(instance, pod, request_type) (rate(greptime_grpc_region_request_count{}[$__rate_interval]))` | `timeseries` | Region Call QPS per Instance. | `prometheus` | `ops` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Region Call P99 per Instance | `histogram_quantile(0.99, sum by(instance, pod, le, request_type) (rate(greptime_grpc_region_request_bucket{}[$__rate_interval])))` | `timeseries` | Region Call P99 per Instance. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{request_type}}]` |
| Frontend Handle Bulk Insert Elapsed Time | `sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))`<br/>`histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))` | `timeseries` | Per-stage time for frontend to handle bulk insert requests | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG` |
# Mito Engine
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -67,6 +68,8 @@
| WAL sync duration seconds | `histogram_quantile(0.99, sum by(le, type, node, instance, pod) (rate(raft_engine_sync_log_duration_seconds_bucket[$__rate_interval])))` | `timeseries` | Raft engine (local disk) log store sync latency, p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-p99` |
| Log Store op duration seconds | `histogram_quantile(0.99, sum by(le,logstore,optype,instance, pod) (rate(greptime_logstore_op_elapsed_bucket[$__rate_interval])))` | `timeseries` | Write-ahead log operations latency at p99 | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{logstore}}]-[{{optype}}]-p99` |
| Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
| Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
| Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
# OpenDAL
| Title | Query | Type | Description | Datasource | Unit | Legend Format |
| --- | --- | --- | --- | --- | --- | --- |
@@ -371,6 +371,21 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{request_type}}]'
- title: 'Frontend Handle Bulk Insert Elapsed Time '
type: timeseries
description: Per-stage time for frontend to handle bulk insert requests
unit: s
queries:
- expr: sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_sum[$__rate_interval]))/sum by(instance, pod, stage) (rate(greptime_table_operator_handle_bulk_insert_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- expr: histogram_quantile(0.99, sum by(instance, pod, stage, le) (rate(greptime_table_operator_handle_bulk_insert_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- title: Mito Engine
panels:
- title: Request OPS per Instance
@@ -562,6 +577,36 @@ groups:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]'
- title: Compaction Input/Output Bytes
type: timeseries
description: Compaction input/output bytes
unit: bytes
queries:
- expr: sum by(instance, pod) (greptime_mito_compaction_input_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-input'
- expr: sum by(instance, pod) (greptime_mito_compaction_output_bytes)
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-output'
- title: Region Worker Handle Bulk Insert Requests
type: timeseries
description: Per-stage elapsed time for region worker to handle bulk insert region requests.
unit: s
queries:
- expr: histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-P95'
- expr: sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))
datasource:
type: prometheus
uid: ${metrics}
legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
- title: OpenDAL
panels:
- title: QPS per Instance
@@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
arrow-flight.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
143 src/common/grpc/benches/bench_flight_decoder.rs (Normal file)
@@ -0,0 +1,143 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow_flight::FlightData;
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use criterion::{criterion_group, criterion_main, Criterion};
use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;

fn schema() -> SchemaRef {
let schema = Schema::new(vec![
ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false),
]);
Arc::new(schema)
}

/// Generate record batch according to provided schema and num rows.
fn prepare_random_record_batch(schema: SchemaRef, num_rows: usize) -> RecordBatch {
let tag_candidates = (0..10000).map(|i| i.to_string()).collect::<Vec<_>>();

let columns: Vec<VectorRef> = schema
.column_schemas()
.iter()
.map(|col| match &col.data_type {
ConcreteDataType::String(_) => {
let array = StringArray::from(
(0..num_rows)
.map(|_| {
let idx: usize = rand::random_range(0..10000);
format!("tag-{}", tag_candidates[idx])
})
.collect::<Vec<_>>(),
);
Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
}
ConcreteDataType::Timestamp(_) => {
let now = common_time::util::current_time_millis();
let array = TimestampMillisecondArray::from(
(0..num_rows).map(|i| now + i as i64).collect::<Vec<_>>(),
);
Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
}
ConcreteDataType::Int64(_) => {
let array = Int64Array::from((0..num_rows).map(|i| i as i64).collect::<Vec<_>>());
Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
}
_ => unreachable!(),
})
.collect();

RecordBatch::new(schema, columns).unwrap()
}

fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) {
let schema = schema();
let mut encoder = FlightEncoder::default();
let schema_data = encoder.encode(FlightMessage::Schema(schema.clone()));
let rb = prepare_random_record_batch(schema, num_rows);
let rb_data = encoder.encode(FlightMessage::Recordbatch(rb));
(schema_data, rb_data)
}

fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecordBatch {
let schema = FlightData::decode(&schema[..]).unwrap();
let payload = FlightData::decode(&payload[..]).unwrap();
let mut decoder = FlightDecoder::default();
let _schema = decoder.try_decode(&schema).unwrap();
let message = decoder.try_decode(&payload).unwrap();
let FlightMessage::Recordbatch(batch) = message else {
unreachable!("unexpected message");
};
batch.into_df_record_batch()
}

fn decode_flight_data_from_header_and_body(
schema: &Bytes,
data_header: &Bytes,
data_body: &Bytes,
) -> DfRecordBatch {
let mut decoder = FlightDecoder::try_from_schema_bytes(schema).unwrap();
decoder
.try_decode_record_batch(data_header, data_body)
.unwrap()
}

fn bench_decode_flight_data(c: &mut Criterion) {
let row_counts = [100000, 200000, 1000000];

for row_count in row_counts {
let (schema, payload) = prepare_flight_data(row_count);

// arguments for decode_flight_data_from_protobuf
let schema_bytes = Bytes::from(schema.encode_to_vec());
let payload_bytes = Bytes::from(payload.encode_to_vec());

let mut group = c.benchmark_group(format!("flight_decoder_{}_rows", row_count));
group.bench_function("decode_from_protobuf", |b| {
b.iter(|| decode_flight_data_from_protobuf(&schema_bytes, &payload_bytes));
});

group.bench_function("decode_from_header_and_body", |b| {
b.iter(|| {
decode_flight_data_from_header_and_body(
&schema.data_header,
&payload.data_header,
&payload.data_body,
)
});
});

group.finish();
}
}

criterion_group!(benches, bench_decode_flight_data);
criterion_main!(benches);
@@ -14,8 +14,10 @@

use criterion::criterion_main;

mod bench_flight_decoder;
mod channel_manager;

criterion_main! {
channel_manager::benches
channel_manager::benches,
bench_flight_decoder::benches
}
@@ -18,6 +18,7 @@ use std::io;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};

pub type Result<T> = std::result::Result<T, Error>;
@@ -105,6 +106,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed arrow operation"))]
Arrow {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ArrowError,
},
}

impl ErrorExt for Error {
@@ -123,6 +132,7 @@ impl ErrorExt for Error {

Error::CreateRecordBatch { source, .. } => source.status_code(),
Error::ConvertArrowSchema { source, .. } => source.status_code(),
Error::Arrow { .. } => StatusCode::Internal,
}
}

@@ -21,16 +21,19 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_recordbatch::{DfRecordBatch, RecordBatch, RecordBatches};
use datatypes::arrow;
use datatypes::arrow::buffer::Buffer;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
use datatypes::schema::{Schema, SchemaRef};
use flatbuffers::FlatBufferBuilder;
use prost::bytes::Bytes as ProstBytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};

use crate::error;
use crate::error::{
ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu,
Result,
@@ -124,9 +127,60 @@ impl FlightEncoder {
#[derive(Default)]
pub struct FlightDecoder {
schema: Option<SchemaRef>,
schema_bytes: Option<bytes::Bytes>,
}

impl FlightDecoder {
/// Build a [FlightDecoder] instance from provided schema bytes.
pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result<Self> {
let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..])
.context(error::ArrowSnafu)?;
let schema = Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
Ok(Self {
schema: Some(schema),
schema_bytes: Some(schema_bytes.clone()),
})
}

pub fn try_decode_record_batch(
&mut self,
data_header: &bytes::Bytes,
data_body: &bytes::Bytes,
) -> Result<DfRecordBatch> {
let schema = self
.schema
.as_ref()
.context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!",
})?
.clone();
let arrow_schema = schema.arrow_schema().clone();
let message = root_as_message(&data_header[..])
.map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})
.context(error::ArrowSnafu)?;
let result = message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.and_then(|batch| {
reader::read_record_batch(
&Buffer::from(data_body.as_ref()),
batch,
arrow_schema,
&HashMap::new(),
None,
&message.version(),
)
})
.context(error::ArrowSnafu)?;
Ok(result)
}

pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
let message = root_as_message(&flight_data.data_header).map_err(|e| {
InvalidFlightDataSnafu {
@@ -162,7 +216,7 @@ impl FlightDecoder {
Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);

self.schema = Some(schema.clone());

self.schema_bytes = Some(flight_data.data_header.clone());
Ok(FlightMessage::Schema(schema))
}
MessageHeader::RecordBatch => {
@@ -196,6 +250,10 @@ impl FlightDecoder {
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}

pub fn schema_bytes(&self) -> Option<bytes::Bytes> {
self.schema_bytes.clone()
}
}

pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {
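The new decode path can be driven directly from the raw schema bytes, skipping the FlightData protobuf round trip that `try_decode` performs. A minimal sketch of how a caller might use it, modelled on the benchmark above (the single-column schema here is illustrative, and the record-batch decode call is shown as a comment because it needs an encoded batch from the same encoder):

```rust
use std::sync::Arc;

use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};

fn main() {
    // Encode a schema-only Flight message; its `data_header` carries the
    // flatbuffer-encoded Arrow schema that `try_from_schema_bytes` accepts.
    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    )]));
    let mut encoder = FlightEncoder::default();
    let schema_data = encoder.encode(FlightMessage::Schema(schema));

    // Rebuild a decoder from the raw schema bytes, without decoding a
    // protobuf FlightData message first.
    let decoder = FlightDecoder::try_from_schema_bytes(&schema_data.data_header)
        .expect("schema bytes should be decodable");

    // With a record batch `rb_data` produced by the same encoder, the batch
    // can be decoded straight from its header/body parts:
    // let df_batch = decoder
    //     .try_decode_record_batch(&rb_data.data_header, &rb_data.data_body)
    //     .unwrap();

    // The decoder also keeps the raw schema bytes around for reuse.
    let _ = decoder.schema_bytes();
}
```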
@@ -16,9 +16,12 @@ use std::sync::Arc;

use crate::error::Result;
use crate::flow_name::FlowName;
use crate::instruction::CacheIdent;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -89,9 +92,40 @@ where
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
CacheIdent::CreateFlow(_) => {
// Do nothing
}
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids,
flow_part2node_id,
}) => {
// invalidate flow route/flownode flow/table flow
let mut keys = Vec::with_capacity(
source_table_ids.len() * flow_part2node_id.len()
+ flow_part2node_id.len() * 2,
);
for table_id in source_table_ids {
for (partition_id, node_id) in flow_part2node_id {
let key =
TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
.to_bytes();
keys.push(key);
}
}

for (partition_id, node_id) in flow_part2node_id {
let key =
FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
keys.push(key);
let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
keys.push(key);
}

for key in keys {
self.invalidate_key(&key).await;
}
}
CacheIdent::FlowName(FlowName {
catalog_name,
flow_name,
@@ -256,6 +256,11 @@ impl DatanodeTableManager {
})?
.and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
.region_info;

// If the region options are the same, we don't need to update it.
if region_info.region_options == new_region_options {
return Ok(Txn::new());
}
// substitute region options only.
region_info.region_options = new_region_options;
@@ -45,7 +45,7 @@ use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;

/// The key of `__flow/` scope.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
inner: T,
}
@@ -153,6 +153,15 @@ impl FlowInfoValue {
&self.flownode_ids
}

/// Insert a new flownode id for a partition.
pub fn insert_flownode_id(
&mut self,
partition: FlowPartitionId,
node: FlownodeId,
) -> Option<FlownodeId> {
self.flownode_ids.insert(partition, node)
}

/// Returns the `source_table`.
pub fn source_table_ids(&self) -> &[TableId] {
&self.source_table_ids
@@ -42,7 +42,7 @@ lazy_static! {
/// The key stores the route info of the flow.
///
/// The layout: `__flow/route/{flow_id}/{partition_id}`.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);

impl FlowRouteKey {
@@ -145,6 +145,12 @@ pub struct FlowRouteValue {
pub(crate) peer: Peer,
}

impl From<Peer> for FlowRouteValue {
fn from(peer: Peer) -> Self {
Self { peer }
}
}

impl FlowRouteValue {
/// Returns the `peer`.
pub fn peer(&self) -> &Peer {
@@ -166,6 +166,17 @@ impl FlownodeFlowManager {
Self { kv_backend }
}

/// Whether given flow exist on this flownode.
pub async fn exists(
&self,
flownode_id: FlownodeId,
flow_id: FlowId,
partition_id: FlowPartitionId,
) -> Result<bool> {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
Ok(self.kv_backend.get(&key).await?.is_some())
}

/// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
pub fn flows(
&self,
@@ -158,12 +158,7 @@ mod tests {
provider = "kafka"
broker_endpoints = ["127.0.0.1:9092"]
max_batch_bytes = "1MB"
linger = "200ms"
consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
num_topics = 32
num_partitions = 1
selector_type = "round_robin"
@@ -65,6 +65,7 @@ servers.workspace = true
session.workspace = true
smallvec.workspace = true
snafu.workspace = true
sql.workspace = true
store-api.workspace = true
strum.workspace = true
substrait.workspace = true
@@ -359,7 +359,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
"Flows do not exist in flownode for node {:?}, flow_ids={:?}",
nodeid, to_be_created
);
}
@@ -379,7 +379,7 @@ impl FlowDualEngine {
}
} else {
warn!(
"Flownode {:?} found flows not exist in flownode, flow_ids={:?}",
"Flows do not exist in metadata for node {:?}, flow_ids={:?}",
nodeid, to_be_dropped
);
}
@@ -826,9 +826,17 @@ fn to_meta_err(
location: snafu::Location,
) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
move |err: crate::error::Error| -> common_meta::error::Error {
common_meta::error::Error::External {
location,
source: BoxedError::new(err),
match err {
crate::error::Error::FlowNotFound { id, .. } => {
common_meta::error::Error::FlowNotFound {
flow_name: format!("flow_id={id}"),
location,
}
}
_ => common_meta::error::Error::External {
location,
source: BoxedError::new(err),
},
}
}
}
@@ -39,7 +39,8 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::{CreateFlowArgs, Error, FlowId, TableName};

@@ -312,7 +313,7 @@ impl BatchingEngine {
.unwrap_or("None".to_string())
);

let task = BatchingTask::new(
let task = BatchingTask::try_new(
flow_id,
&sql,
plan,
@@ -323,7 +324,7 @@ impl BatchingEngine {
query_ctx,
self.catalog_manager.clone(),
rx,
);
)?;

let task_inner = task.clone();
let engine = self.query_engine.clone();
@@ -349,7 +350,8 @@ impl BatchingEngine {

pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks")
warn!("Flow {flow_id} not found in tasks");
FlowNotFoundSnafu { id: flow_id }.fail()?;
}
let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
UnexpectedSnafu {
@@ -366,9 +368,7 @@ impl BatchingEngine {
pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
debug!("Try flush flow {flow_id}");
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| UnexpectedSnafu {
reason: format!("Can't found task for flow {flow_id}"),
})?;
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;

task.mark_all_windows_as_dirty()?;
@@ -71,18 +71,33 @@ impl TaskState {
self.last_update_time = Instant::now();
}

/// wait for at least `last_query_duration`, at most `max_timeout` to start next query
/// Compute the next query delay based on the time window size or the last query duration.
/// Aiming to avoid too frequent queries. But also not too long delay.
/// The delay is computed as follows:
/// - If `time_window_size` is set, the delay is half the time window size, constrained to be
///   at least `last_query_duration` and at most `max_timeout`.
/// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
///   to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
///
/// if have more dirty time window, exec next query immediately
/// If there are dirty time windows, the function returns an immediate execution time to clean them.
/// TODO: Make this behavior configurable.
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let next_duration = max_timeout
let last_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration);
let next_duration = next_duration.max(MIN_REFRESH_DURATION);
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);

let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);

// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
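The scheduling rule documented above can be restated as a standalone helper. A minimal sketch under stated assumptions: the constant value of `MIN_REFRESH_DURATION` is assumed to be 5 seconds here, and the real method additionally consults the dirty-window set and returns an `Instant` rather than a `Duration`.

```rust
use std::time::Duration;

/// Mirrors `MIN_REFRESH_DURATION` in the engine (value assumed here).
const MIN_REFRESH_DURATION: Duration = Duration::from_secs(5);

/// Sketch of the delay rule: prefer half the time window, but never less than
/// the last query duration clamped into [MIN_REFRESH_DURATION, max_timeout].
fn next_query_delay(
    last_query_duration: Duration,
    time_window_size: Option<Duration>,
    max_timeout: Option<Duration>,
) -> Duration {
    let last = max_timeout
        .unwrap_or(last_query_duration)
        .min(last_query_duration)
        .max(MIN_REFRESH_DURATION);
    time_window_size.map(|w| (w / 2).max(last)).unwrap_or(last)
}

fn main() {
    // A 1-minute window with fast queries waits about half a window (30s).
    assert_eq!(
        next_query_delay(Duration::from_secs(1), Some(Duration::from_secs(60)), None),
        Duration::from_secs(30)
    );
    // Without a known window, fall back to the clamped last query duration.
    assert_eq!(
        next_query_delay(Duration::from_secs(1), None, Some(Duration::from_secs(600))),
        MIN_REFRESH_DURATION
    );
}
```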
@@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::{BTreeSet, HashSet};
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

@@ -29,6 +28,7 @@ use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::AnalyzerRule;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::DFSchemaRef;
use datafusion_expr::{DmlStatement, LogicalPlan, WriteOp};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -37,6 +37,8 @@ use query::query_engine::DefaultSerializer;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
@@ -68,13 +70,42 @@ use crate::{Error, FlowId};
pub struct TaskConfig {
pub flow_id: FlowId,
pub query: String,
plan: Arc<LogicalPlan>,
/// output schema of the query
pub output_schema: DFSchemaRef,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
pub source_table_names: HashSet<[String; 3]>,
catalog_manager: CatalogManagerRef,
query_type: QueryType,
}

fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

ensure!(
stmts.len() == 1,
InvalidQuerySnafu {
reason: format!("Expect only one statement, found {}", stmts.len())
}
);
let stmt = &stmts[0];
match stmt {
Statement::Tql(_) => Ok(QueryType::Tql),
_ => Ok(QueryType::Sql),
}
}

#[derive(Debug, Clone)]
enum QueryType {
/// query is a tql query
Tql,
/// query is a sql query
Sql,
}

#[derive(Clone)]
@@ -85,7 +116,7 @@ pub struct BatchingTask {

impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn new(
pub fn try_new(
flow_id: FlowId,
query: &str,
plan: LogicalPlan,
@@ -96,20 +127,21 @@ impl BatchingTask {
query_ctx: QueryContextRef,
catalog_manager: CatalogManagerRef,
shutdown_rx: oneshot::Receiver<()>,
) -> Self {
Self {
) -> Result<Self, Error> {
Ok(Self {
config: Arc::new(TaskConfig {
flow_id,
query: query.to_string(),
plan: Arc::new(plan),
time_window_expr,
expire_after,
sink_table_name,
source_table_names: source_table_names.into_iter().collect(),
catalog_manager,
output_schema: plan.schema().clone(),
query_type: determine_query_type(query, &query_ctx)?,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
}
})
}

/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
@@ -380,6 +412,23 @@ impl BatchingTask {
frontend_client: Arc<FrontendClient>,
) {
loop {
// first check if shutdown signal is received
// if so, break the loop
{
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
}

let mut new_query = None;
let mut gen_and_exec = async || {
new_query = self.gen_insert_plan(&engine).await?;
@@ -393,20 +442,15 @@
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
let sleep_until = {
let mut state = self.state.write().unwrap();
match state.shutdown_rx.try_recv() {
Ok(()) => break,
Err(TryRecvError::Closed) => {
warn!(
"Unexpected shutdown flow {}, shutdown anyway",
self.config.flow_id
);
break;
}
Err(TryRecvError::Empty) => (),
}
let state = self.state.write().unwrap();

state.get_next_start_query_time(
self.config.flow_id,
&self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size()),
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
)
};
@@ -472,7 +516,7 @@ impl BatchingTask {
.unwrap_or(u64::MIN);

let low_bound = Timestamp::new_second(low_bound as i64);
let schema_len = self.config.plan.schema().fields().len();
let schema_len = self.config.output_schema.fields().len();

let expire_time_window_bound = self
.config
@@ -481,104 +525,101 @@ impl BatchingTask {
.map(|expr| expr.eval(low_bound))
.transpose()?;

let new_plan = {
let expr = {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.config
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.config.flow_id
),
})?;

self.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
&col_name,
Some(l),
window_size,
self.config.flow_id,
Some(self),
)?
}
_ => {
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();

let mut add_auto_column =
AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan = self
.config
.plan
.deref()
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to rewrite plan:\n {}\n",
self.config.plan
),
})?
.data;
let schema_len = plan.schema().fields().len();

// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some((plan, schema_len)));
}
}
};

let (Some((Some(l), Some(u))), QueryType::Sql) =
(expire_time_window_bound, &self.config.query_type)
else {
// either no time window or not a sql query, then just use the original query
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();

let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
};

// TODO(discord9): add auto column or not? This might break compatibility for auto created sink table before this, but that's ok right?

let mut add_filter = AddFilterRewriter::new(expr);
// TODO(discord9): not add auto column for tql query?
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
let rewrite = plan

let plan = plan
.clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
// only apply optimize after complex rewrite is done
apply_df_optimizer(rewrite).await?
let schema_len = plan.schema().fields().len();

// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some((plan, schema_len)));
};

debug!(
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?}",
self.config.flow_id, l, u
);
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.config
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.config.flow_id
),
})?;

let expr = self
.state
.write()
.unwrap()
.dirty_time_windows
.gen_filter_exprs(
&col_name,
Some(l),
window_size,
self.config.flow_id,
Some(self),
)?;

debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.transpose()?
.map(|s| s.to_string())
);

let Some(expr) = expr else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
};

let mut add_filter = AddFilterRewriter::new(expr);
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());

let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
let rewrite = plan
.clone()
.rewrite(&mut add_filter)
.and_then(|p| p.data.rewrite(&mut add_auto_column))
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
// only apply optimize after complex rewrite is done
let new_plan = apply_df_optimizer(rewrite).await?;

Ok(Some((new_plan, schema_len)))
}
}
@@ -55,6 +55,9 @@ use crate::error::{
use crate::expr::error::DataTypeSnafu;
use crate::Error;

/// Represents a test timestamp in seconds since the Unix epoch.
const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000);

/// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`, this type help with
/// evaluating the expr using given timestamp
///
@@ -70,6 +73,7 @@ pub struct TimeWindowExpr {
pub column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
eval_time_window_size: Option<std::time::Duration>,
}

impl std::fmt::Display for TimeWindowExpr {
@@ -84,6 +88,11 @@ impl std::fmt::Display for TimeWindowExpr {
}

impl TimeWindowExpr {
/// The time window size of the expr, get from calling `eval` with a test timestamp
pub fn time_window_size(&self) -> &Option<std::time::Duration> {
&self.eval_time_window_size
}

pub fn from_expr(
expr: &Expr,
column_name: &str,
@@ -91,12 +100,28 @@ impl TimeWindowExpr {
session: &SessionState,
) -> Result<Self, Error> {
let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?;
Ok(Self {
let mut zelf = Self {
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
})
eval_time_window_size: None,
};
let test_ts = DEFAULT_TEST_TIMESTAMP;
let (l, u) = zelf.eval(test_ts)?;
let time_window_size = match (l, u) {
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
UnexpectedSnafu {
reason: format!(
"Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}"
),
}
.build()
})?,
_ => None,
};
zelf.eval_time_window_size = time_window_size;
Ok(zelf)
}

pub fn eval(
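The window-size probe above amounts to evaluating the expression once at a fixed timestamp and taking the difference of the two bounds it returns. A simplified sketch of that arithmetic with plain second-precision integers (the real code works on `Timestamp` values and converts the difference to `std::time::Duration`; the hard-coded modular window here stands in for an arbitrary `date_bin`-style expression):

```rust
use std::time::Duration;

/// Evaluate a fixed-size window at a probe timestamp and derive the window
/// size from its lower/upper bounds, as `TimeWindowExpr::from_expr` does with
/// `DEFAULT_TEST_TIMESTAMP`.
fn probe_window_size(probe_secs: i64, window_secs: i64) -> Option<Duration> {
    // lower bound = start of the window containing the probe; upper = next start
    let lower = probe_secs - probe_secs.rem_euclid(window_secs);
    let upper = lower + window_secs;
    (upper > lower).then(|| Duration::from_secs((upper - lower) as u64))
}

fn main() {
    // 17_0000_0000 is the probe value used by DEFAULT_TEST_TIMESTAMP above.
    assert_eq!(
        probe_window_size(17_0000_0000, 60),
        Some(Duration::from_secs(60))
    );
}
```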
@@ -29,15 +29,18 @@ use datafusion_common::tree_node::{
|
||||
use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
|
||||
use datafusion_expr::{Distinct, LogicalPlan, Projection};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement, DEFAULT_LOOKBACK_STRING};
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::tql::Tql;
|
||||
use table::metadata::TableInfo;
|
||||
|
||||
use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
|
||||
use crate::df_optimizer::apply_df_optimizer;
|
||||
use crate::error::{DatafusionSnafu, ExternalSnafu, TableNotFoundSnafu};
|
||||
use crate::error::{DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, TableNotFoundSnafu};
|
||||
use crate::{Error, TableName};
|
||||
|
||||
pub async fn get_table_info_df_schema(
|
||||
@@ -73,21 +76,57 @@ pub async fn get_table_info_df_schema(
|
||||
}
|
||||
|
||||
/// Convert sql to datafusion logical plan
|
||||
/// Also support TQL (but only Eval not Explain or Analyze)
|
||||
pub async fn sql_to_df_plan(
|
||||
query_ctx: QueryContextRef,
|
||||
engine: QueryEngineRef,
|
||||
sql: &str,
|
||||
optimize: bool,
|
||||
) -> Result<LogicalPlan, Error> {
|
||||
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let stmts =
|
||||
ParserContext::create_with_dialect(sql, query_ctx.sql_dialect(), ParseOptions::default())
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
ensure!(
|
||||
stmts.len() == 1,
|
||||
InvalidQuerySnafu {
|
||||
reason: format!("Expect only one statement, found {}", stmts.len())
|
||||
}
|
||||
);
|
||||
let stmt = &stmts[0];
|
||||
let query_stmt = match stmt {
|
||||
Statement::Tql(tql) => match tql {
|
||||
Tql::Eval(eval) => {
|
||||
let eval = eval.clone();
|
||||
let promql = PromQuery {
|
||||
start: eval.start,
|
||||
end: eval.end,
|
||||
step: eval.step,
|
||||
query: eval.query,
|
||||
lookback: eval
|
||||
.lookback
|
||||
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
};
|
||||
|
||||
QueryLanguageParser::parse_promql(&promql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
}
|
||||
_ => InvalidQuerySnafu {
|
||||
reason: format!("TQL statement {tql:?} is not supported, expect only TQL EVAL"),
|
||||
}
|
||||
.fail()?,
|
||||
},
|
||||
_ => QueryStatement::Sql(stmt.clone()),
|
||||
};
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(&stmt, query_ctx)
|
||||
.plan(&query_stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let plan = if optimize {
|
||||
apply_df_optimizer(plan).await?
|
||||
} else {
|
||||
|
||||
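In the TQL branch above, the EVAL arguments are repackaged into the PromQL query form before planning. A simplified sketch of that mapping with local stand-in types (the real `Tql::Eval` and `PromQuery` live in the sql and query crates; the `"5m"` default is an assumption mirroring `DEFAULT_LOOKBACK_STRING`):

```rust
// Stand-in shapes, not the real GreptimeDB types.
struct TqlEval {
    start: String,
    end: String,
    step: String,
    lookback: Option<String>,
    query: String,
}

struct PromQuery {
    start: String,
    end: String,
    step: String,
    lookback: String,
    query: String,
}

const DEFAULT_LOOKBACK: &str = "5m"; // assumed default lookback

fn tql_eval_to_prom_query(eval: TqlEval) -> PromQuery {
    PromQuery {
        start: eval.start,
        end: eval.end,
        step: eval.step,
        // Fall back to the engine default when the statement omits LOOKBACK.
        lookback: eval.lookback.unwrap_or_else(|| DEFAULT_LOOKBACK.to_string()),
        query: eval.query,
    }
}
```

The single-statement `ensure!` guard above exists because the function plans exactly one statement; multi-statement input is rejected up front with `InvalidQuerySnafu`.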
@@ -321,8 +321,8 @@ impl ErrorExt for Error {
|
||||
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
|
||||
Self::TableNotFound { .. }
|
||||
| Self::TableNotFoundMeta { .. }
|
||||
| Self::FlowNotFound { .. }
|
||||
| Self::ListFlows { .. } => StatusCode::TableNotFound,
|
||||
Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
|
||||
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
|
||||
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
|
||||
StatusCode::EngineExecuteQuery
|
||||
|
||||
@@ -596,7 +596,7 @@ impl FrontendInvoker {
|
||||
.start_timer();
|
||||
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, ctx, &self.statement_executor)
|
||||
.handle_row_inserts(requests, ctx, &self.statement_executor, false)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_frontend::error::ExternalSnafu)
|
||||
|
||||
@@ -75,7 +75,10 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
let output = match request {
|
||||
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
||||
Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
|
||||
Request::RowInserts(requests) => {
|
||||
self.handle_row_inserts(requests, ctx.clone(), false)
|
||||
.await?
|
||||
}
|
||||
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
|
||||
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
|
||||
Request::Query(query_request) => {
|
||||
@@ -416,9 +419,15 @@ impl Instance {
|
||||
&self,
|
||||
requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
accommodate_existing_schema: bool,
|
||||
) -> Result<Output> {
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||
.handle_row_inserts(
|
||||
requests,
|
||||
ctx,
|
||||
self.statement_executor.as_ref(),
|
||||
accommodate_existing_schema,
|
||||
)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
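The new `accommodate_existing_schema` flag is only threaded through to the inserter here. Judging by the call sites updated in this change, it is enabled for write paths whose schema is fixed and well known (OpenTSDB, Prometheus remote write, and the `last_non_null` insert path) and disabled for generic gRPC row inserts, OTLP, slow-query events, and the flow `FrontendInvoker`.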
|
||||
@@ -430,7 +439,7 @@ impl Instance {
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
self.inserter
|
||||
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||
.handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance {
|
||||
};
|
||||
|
||||
let output = self
|
||||
.handle_row_inserts(requests, ctx)
|
||||
.handle_row_inserts(requests, ctx, true)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(servers::error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
None
|
||||
};
|
||||
|
||||
self.handle_row_inserts(requests, ctx)
|
||||
self.handle_row_inserts(requests, ctx, false)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
|
||||
@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?
|
||||
} else {
|
||||
self.handle_row_inserts(request, ctx.clone())
|
||||
self.handle_row_inserts(request, ctx.clone(), true)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?
|
||||
|
||||
@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
|
||||
.into();
|
||||
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, query_ctx, &self.statement_executor)
|
||||
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false)
|
||||
.await
|
||||
.context(TableOperationSnafu)?;
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::cluster_server::ClusterServer;
|
||||
@@ -36,7 +37,6 @@ use common_telemetry::info;
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use deadpool_postgres::{Config, Runtime};
|
||||
use etcd_client::Client;
|
||||
use futures::future;
|
||||
use servers::configurator::ConfiguratorRef;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::http::{HttpServer, HttpServerBuilder};
|
||||
@@ -53,6 +53,7 @@ use sqlx::mysql::{MySqlConnection, MySqlPool};
|
||||
use sqlx::Connection;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::sync::{oneshot, Mutex};
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
use tokio_postgres::NoTls;
|
||||
use tonic::codec::CompressionEncoding;
|
||||
@@ -88,6 +89,12 @@ pub struct MetasrvInstance {
|
||||
plugins: Plugins,
|
||||
|
||||
export_metrics_task: Option<ExportMetricsTask>,
|
||||
|
||||
/// gRPC serving state receiver. Only present if the gRPC server is started.
|
||||
serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
|
||||
|
||||
/// gRPC bind addr
|
||||
bind_addr: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
impl MetasrvInstance {
|
||||
@@ -113,6 +120,8 @@ impl MetasrvInstance {
|
||||
signal_sender: None,
|
||||
plugins,
|
||||
export_metrics_task,
|
||||
serve_state: Default::default(),
|
||||
bind_addr: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -132,21 +141,30 @@ impl MetasrvInstance {
|
||||
router = configurator.config_grpc(router);
|
||||
}
|
||||
|
||||
let metasrv = bootstrap_metasrv_with_router(&self.opts.bind_addr, router, rx);
|
||||
let (serve_state_tx, serve_state_rx) = oneshot::channel();
|
||||
|
||||
let socket_addr =
|
||||
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
|
||||
self.bind_addr = Some(socket_addr);
|
||||
|
||||
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
|
||||
addr: &self.opts.http.addr,
|
||||
})?;
|
||||
let http_srv = async {
|
||||
self.http_server
|
||||
.start(addr)
|
||||
.await
|
||||
.context(error::StartHttpSnafu)
|
||||
};
|
||||
future::try_join(metasrv, http_srv).await?;
|
||||
self.http_server
|
||||
.start(addr)
|
||||
.await
|
||||
.context(error::StartHttpSnafu)?;
|
||||
|
||||
*self.serve_state.lock().await = Some(serve_state_rx);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> Result<()> {
|
||||
if let Some(mut rx) = self.serve_state.lock().await.take() {
|
||||
if let Ok(Err(err)) = rx.try_recv() {
|
||||
common_telemetry::error!(err; "Metasrv start failed")
|
||||
}
|
||||
}
|
||||
if let Some(signal) = &self.signal_sender {
|
||||
signal
|
||||
.send(())
|
||||
@@ -170,30 +188,42 @@ impl MetasrvInstance {
|
||||
pub fn get_inner(&self) -> &Metasrv {
|
||||
&self.metasrv
|
||||
}
|
||||
pub fn bind_addr(&self) -> &Option<SocketAddr> {
|
||||
&self.bind_addr
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bootstrap_metasrv_with_router(
|
||||
bind_addr: &str,
|
||||
router: Router,
|
||||
mut signal: Receiver<()>,
|
||||
) -> Result<()> {
|
||||
serve_state_tx: oneshot::Sender<Result<()>>,
|
||||
mut shutdown_rx: Receiver<()>,
|
||||
) -> Result<SocketAddr> {
|
||||
let listener = TcpListener::bind(bind_addr)
|
||||
.await
|
||||
.context(error::TcpBindSnafu { addr: bind_addr })?;
|
||||
|
||||
info!("gRPC server is bound to: {bind_addr}");
|
||||
let real_bind_addr = listener
|
||||
.local_addr()
|
||||
.context(error::TcpBindSnafu { addr: bind_addr })?;
|
||||
|
||||
info!("gRPC server is bound to: {}", real_bind_addr);
|
||||
|
||||
let incoming =
|
||||
TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;
|
||||
|
||||
router
|
||||
.serve_with_incoming_shutdown(incoming, async {
|
||||
let _ = signal.recv().await;
|
||||
})
|
||||
.await
|
||||
.context(error::StartGrpcSnafu)?;
|
||||
let _handle = common_runtime::spawn_global(async move {
|
||||
let result = router
|
||||
.serve_with_incoming_shutdown(incoming, async {
|
||||
let _ = shutdown_rx.recv().await;
|
||||
})
|
||||
.await
|
||||
.inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
|
||||
.context(error::StartGrpcSnafu);
|
||||
let _ = serve_state_tx.send(result);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
Ok(real_bind_addr)
|
||||
}
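The rewritten bootstrap spawns the gRPC server in the background and reports its eventual exit status over a oneshot channel, so a later `shutdown` can still surface a startup failure. A stripped-down sketch of that pattern (tokio only, with a dummy task standing in for `router.serve_with_incoming_shutdown`):

```rust
use tokio::sync::{mpsc, oneshot};

// Spawn a long-running task and hand back a receiver for its final result.
fn spawn_with_exit_report(
    mut shutdown_rx: mpsc::Receiver<()>,
) -> oneshot::Receiver<Result<(), String>> {
    let (exit_tx, exit_rx) = oneshot::channel();
    tokio::spawn(async move {
        // A real server would serve connections here until the shutdown signal arrives.
        let _ = shutdown_rx.recv().await;
        let result: Result<(), String> = Ok(());
        // The receiver may already have been dropped; ignore the send error in that case.
        let _ = exit_tx.send(result);
    });
    exit_rx
}
```

On shutdown, a `try_recv` on the stored receiver is enough to learn whether the server had already failed, which is what the new `MetasrvInstance::shutdown` above does before sending the stop signal.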
|
||||
|
||||
#[macro_export]
|
||||
|
||||
@@ -97,3 +97,8 @@ required-features = ["test"]
|
||||
name = "bench_filter_time_partition"
|
||||
harness = false
|
||||
required-features = ["test"]
|
||||
|
||||
[[bench]]
|
||||
name = "bench_compaction_picker"
|
||||
harness = false
|
||||
required-features = ["test"]
|
||||
|
||||
157 src/mito2/benches/bench_compaction_picker.rs (new file)
@@ -0,0 +1,157 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use mito2::compaction::run::{
|
||||
find_overlapping_items, find_sorted_runs, merge_seq_files, reduce_runs, Item, Ranged, SortedRun,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
struct MockFile {
|
||||
start: i64,
|
||||
end: i64,
|
||||
size: usize,
|
||||
}
|
||||
|
||||
impl Ranged for MockFile {
|
||||
type BoundType = i64;
|
||||
|
||||
fn range(&self) -> (Self::BoundType, Self::BoundType) {
|
||||
(self.start, self.end)
|
||||
}
|
||||
}
|
||||
|
||||
impl Item for MockFile {
|
||||
fn size(&self) -> usize {
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_test_files(n: usize) -> Vec<MockFile> {
|
||||
let mut files = Vec::with_capacity(n);
|
||||
for _ in 0..n {
|
||||
// Use identical, fully overlapping ranges so that every file ends up in its own sorted run
|
||||
files.push(MockFile {
|
||||
start: 0,
|
||||
end: 10,
|
||||
size: 10,
|
||||
});
|
||||
}
|
||||
files
|
||||
}
|
||||
|
||||
fn bench_find_sorted_runs(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("find_sorted_runs");
|
||||
|
||||
for size in [10, 100, 1000].iter() {
|
||||
group.bench_function(format!("size_{}", size), |b| {
|
||||
let mut files = generate_test_files(*size);
|
||||
b.iter(|| {
|
||||
find_sorted_runs(black_box(&mut files));
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_reduce_runs(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("reduce_runs");
|
||||
|
||||
for size in [10, 100, 1000].iter() {
|
||||
group.bench_function(format!("size_{}", size), |b| {
|
||||
let mut files = generate_test_files(*size);
|
||||
let runs = find_sorted_runs(&mut files);
|
||||
b.iter(|| {
|
||||
reduce_runs(black_box(runs.clone()));
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_find_overlapping_items(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("find_overlapping_items");
|
||||
|
||||
for size in [10, 100, 1000].iter() {
|
||||
group.bench_function(format!("size_{}", size), |b| {
|
||||
// Create two sets of files with some overlapping ranges
|
||||
let mut files1 = Vec::with_capacity(*size);
|
||||
let mut files2 = Vec::with_capacity(*size);
|
||||
|
||||
for i in 0..*size {
|
||||
files1.push(MockFile {
|
||||
start: i as i64,
|
||||
end: (i + 5) as i64,
|
||||
size: 10,
|
||||
});
|
||||
|
||||
files2.push(MockFile {
|
||||
start: (i + 3) as i64,
|
||||
end: (i + 8) as i64,
|
||||
size: 10,
|
||||
});
|
||||
}
|
||||
|
||||
let mut r1 = SortedRun::from(files1);
|
||||
let mut r2 = SortedRun::from(files2);
|
||||
b.iter(|| {
|
||||
let mut result = vec![];
|
||||
find_overlapping_items(black_box(&mut r1), black_box(&mut r2), &mut result);
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench_merge_seq_files(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("merge_seq_files");
|
||||
|
||||
for size in [10, 100, 1000].iter() {
|
||||
group.bench_function(format!("size_{}", size), |b| {
|
||||
// Create a set of files with varying sizes
|
||||
let mut files = Vec::with_capacity(*size);
|
||||
|
||||
for i in 0..*size {
|
||||
// Create files with different sizes to test the scoring algorithm
|
||||
let file_size = if i % 3 == 0 {
|
||||
5
|
||||
} else if i % 3 == 1 {
|
||||
10
|
||||
} else {
|
||||
15
|
||||
};
|
||||
|
||||
files.push(MockFile {
|
||||
start: i as i64,
|
||||
end: (i + 1) as i64,
|
||||
size: file_size,
|
||||
});
|
||||
}
|
||||
|
||||
b.iter(|| {
|
||||
merge_seq_files(black_box(&files), black_box(Some(50)));
|
||||
});
|
||||
});
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_find_sorted_runs,
|
||||
bench_reduce_runs,
|
||||
bench_find_overlapping_items,
|
||||
bench_merge_seq_files
|
||||
);
|
||||
criterion_main!(benches);
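`generate_test_files` produces fully overlapping files, so `find_sorted_runs` returns one run per file; that is the worst case the first two benchmarks measure. A disjoint-range variant like the following would instead collapse into a single run, which could be useful for benchmarking the opposite extreme (a possible extension, not part of the committed file):

```rust
// Hypothetical helper: files cover disjoint 10-unit slots, so no two ranges overlap.
fn generate_disjoint_files(n: usize) -> Vec<MockFile> {
    (0..n)
        .map(|i| MockFile {
            start: (i * 10) as i64,
            end: (i * 10 + 9) as i64,
            size: 10,
        })
        .collect()
}
```

Note that the `[[bench]]` entry added to Cargo.toml above declares `required-features = ["test"]`, so the benchmark presumably needs to be invoked with that feature enabled.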
|
||||
@@ -362,7 +362,7 @@ impl FilePathProvider for WriteCachePathProvider {
|
||||
/// Path provider that builds paths in region storage path.
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct RegionFilePathFactory {
|
||||
region_dir: String,
|
||||
pub(crate) region_dir: String,
|
||||
}
|
||||
|
||||
impl RegionFilePathFactory {
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
mod buckets;
|
||||
pub mod compactor;
|
||||
pub mod picker;
|
||||
mod run;
|
||||
pub mod run;
|
||||
mod task;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Res
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::manifest::storage::manifest_compress_type;
|
||||
use crate::metrics;
|
||||
use crate::read::Source;
|
||||
use crate::region::opener::new_manifest_dir;
|
||||
use crate::region::options::RegionOptions;
|
||||
@@ -240,6 +241,14 @@ impl MergeOutput {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.files_to_add.is_empty() && self.files_to_remove.is_empty()
|
||||
}
|
||||
|
||||
pub fn input_file_size(&self) -> u64 {
|
||||
self.files_to_remove.iter().map(|f| f.file_size).sum()
|
||||
}
|
||||
|
||||
pub fn output_file_size(&self) -> u64 {
|
||||
self.files_to_add.iter().map(|f| f.file_size).sum()
|
||||
}
|
||||
}
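These two helpers simply sum the file sizes on each side of the merge; they are what feed the new `COMPACTION_INPUT_BYTES` and `COMPACTION_OUTPUT_BYTES` counters further down in this change. The same sums also give a straightforward size-reduction ratio, sketched here for illustration:

```rust
// Returns output/input as a ratio, or None when nothing was compacted.
fn compaction_size_ratio(input_bytes: u64, output_bytes: u64) -> Option<f64> {
    (input_bytes > 0).then(|| output_bytes as f64 / input_bytes as f64)
}
```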
|
||||
|
||||
/// Compactor is the trait that defines the compaction logic.
|
||||
@@ -286,6 +295,7 @@ impl Compactor for DefaultCompactor {
|
||||
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
|
||||
let write_opts = WriteOptions {
|
||||
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
|
||||
max_file_size: picker_output.max_file_size,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -460,6 +470,9 @@ impl Compactor for DefaultCompactor {
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
|
||||
metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
|
||||
self.update_manifest(compaction_region, merge_output)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -45,6 +45,8 @@ pub struct PickerOutput {
|
||||
pub outputs: Vec<CompactionOutput>,
|
||||
pub expired_ssts: Vec<FileHandle>,
|
||||
pub time_window_size: i64,
|
||||
/// Max single output file size in bytes.
|
||||
pub max_file_size: Option<usize>,
|
||||
}
|
||||
|
||||
/// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
|
||||
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput {
|
||||
pub outputs: Vec<SerializedCompactionOutput>,
|
||||
pub expired_ssts: Vec<FileMeta>,
|
||||
pub time_window_size: i64,
|
||||
pub max_file_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl From<&PickerOutput> for SerializedPickerOutput {
|
||||
@@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput {
|
||||
outputs,
|
||||
expired_ssts,
|
||||
time_window_size: input.time_window_size,
|
||||
max_file_size: input.max_file_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -111,6 +115,7 @@ impl PickerOutput {
|
||||
outputs,
|
||||
expired_ssts,
|
||||
time_window_size: input.time_window_size,
|
||||
max_file_size: input.max_file_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -131,10 +136,7 @@ pub fn new_picker(
|
||||
} else {
|
||||
match compaction_options {
|
||||
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker {
|
||||
max_active_window_runs: twcs_opts.max_active_window_runs,
|
||||
max_active_window_files: twcs_opts.max_active_window_files,
|
||||
max_inactive_window_runs: twcs_opts.max_inactive_window_runs,
|
||||
max_inactive_window_files: twcs_opts.max_inactive_window_files,
|
||||
trigger_file_num: twcs_opts.trigger_file_num,
|
||||
time_window_seconds: twcs_opts.time_window_seconds(),
|
||||
max_output_file_size: twcs_opts.max_output_file_size.map(|r| r.as_bytes()),
|
||||
append_mode,
|
||||
@@ -179,6 +181,7 @@ mod tests {
|
||||
],
|
||||
expired_ssts: expired_ssts_file_handle.clone(),
|
||||
time_window_size: 1000,
|
||||
max_file_size: None,
|
||||
};
|
||||
|
||||
let picker_output_str =
|
||||
|
||||
File diff suppressed because it is too large
@@ -22,7 +22,6 @@ use tokio::sync::mpsc;
|
||||
|
||||
use crate::compaction::compactor::{CompactionRegion, Compactor};
|
||||
use crate::compaction::picker::{CompactionTask, PickerOutput};
|
||||
use crate::error;
|
||||
use crate::error::CompactRegionSnafu;
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
|
||||
@@ -30,6 +29,7 @@ use crate::request::{
|
||||
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
|
||||
};
|
||||
use crate::worker::WorkerListener;
|
||||
use crate::{error, metrics};
|
||||
|
||||
/// Maximum number of compaction tasks in parallel.
|
||||
pub const MAX_PARALLEL_COMPACTION: usize = 1;
|
||||
@@ -98,6 +98,8 @@ impl CompactionTaskImpl {
|
||||
};
|
||||
let merge_time = merge_timer.stop_and_record();
|
||||
|
||||
metrics::COMPACTION_INPUT_BYTES.inc_by(compaction_result.input_file_size() as f64);
|
||||
metrics::COMPACTION_OUTPUT_BYTES.inc_by(compaction_result.output_file_size() as f64);
|
||||
info!(
|
||||
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
|
||||
self.compaction_region.region_id,
|
||||
|
||||
@@ -44,30 +44,3 @@ pub fn new_file_handle(
|
||||
file_purger,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn new_file_handles(file_specs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
|
||||
let file_purger = new_noop_file_purger();
|
||||
file_specs
|
||||
.iter()
|
||||
.map(|(start, end, size)| {
|
||||
FileHandle::new(
|
||||
FileMeta {
|
||||
region_id: 0.into(),
|
||||
file_id: FileId::random(),
|
||||
time_range: (
|
||||
Timestamp::new_millisecond(*start),
|
||||
Timestamp::new_millisecond(*end),
|
||||
),
|
||||
level: 0,
|
||||
file_size: *size,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
},
|
||||
file_purger.clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -16,15 +16,17 @@ use std::collections::hash_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use common_telemetry::{info, trace};
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::info;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::timestamp_millis::BucketAligned;
|
||||
use common_time::Timestamp;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::compaction::buckets::infer_time_bucket;
|
||||
use crate::compaction::compactor::CompactionRegion;
|
||||
use crate::compaction::picker::{Picker, PickerOutput};
|
||||
use crate::compaction::run::{find_sorted_runs, reduce_runs, Item};
|
||||
use crate::compaction::run::{find_sorted_runs, merge_seq_files, reduce_runs};
|
||||
use crate::compaction::{get_expired_ssts, CompactionOutput};
|
||||
use crate::sst::file::{overlaps, FileHandle, Level};
|
||||
use crate::sst::version::LevelMeta;
|
||||
@@ -35,14 +37,8 @@ const LEVEL_COMPACTED: Level = 1;
|
||||
/// candidates.
|
||||
#[derive(Debug)]
|
||||
pub struct TwcsPicker {
|
||||
/// Max allowed sorted runs in active window.
|
||||
pub max_active_window_runs: usize,
|
||||
/// Max allowed files in active window.
|
||||
pub max_active_window_files: usize,
|
||||
/// Max allowed sorted runs in inactive windows.
|
||||
pub max_inactive_window_runs: usize,
|
||||
/// Max allowed files in inactive windows.
|
||||
pub max_inactive_window_files: usize,
|
||||
/// Minimum file num to trigger a compaction.
|
||||
pub trigger_file_num: usize,
|
||||
/// Compaction time window in seconds.
|
||||
pub time_window_seconds: Option<i64>,
|
||||
/// Max allowed compaction output file size.
|
||||
@@ -53,89 +49,48 @@ pub struct TwcsPicker {
|
||||
|
||||
impl TwcsPicker {
|
||||
/// Builds compaction output from files.
|
||||
/// For active writing window, we allow for at most `max_active_window_runs` files to alleviate
|
||||
/// fragmentation. For other windows, we allow at most 1 file at each window.
|
||||
fn build_output(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
time_windows: &mut BTreeMap<i64, Window>,
|
||||
active_window: Option<i64>,
|
||||
) -> Vec<CompactionOutput> {
|
||||
let mut output = vec![];
|
||||
for (window, files) in time_windows {
|
||||
if files.files.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let sorted_runs = find_sorted_runs(&mut files.files);
|
||||
let found_runs = sorted_runs.len();
|
||||
// We only remove deletion markers if we found at most 2 runs and the region is not in append mode,
// because after compaction there will be no overlapping files.
|
||||
let filter_deleted = !files.overlapping && found_runs <= 2 && !self.append_mode;
|
||||
|
||||
let (max_runs, max_files) = if let Some(active_window) = active_window
|
||||
&& *window == active_window
|
||||
{
|
||||
(self.max_active_window_runs, self.max_active_window_files)
|
||||
let inputs = if found_runs > 1 {
|
||||
reduce_runs(sorted_runs)
|
||||
} else {
|
||||
(
|
||||
self.max_inactive_window_runs,
|
||||
self.max_inactive_window_files,
|
||||
)
|
||||
let run = sorted_runs.last().unwrap();
|
||||
if run.items().len() < self.trigger_file_num {
|
||||
continue;
|
||||
}
|
||||
// No overlapping files; try to merge small sequential files instead.
|
||||
merge_seq_files(run.items(), self.max_output_file_size)
|
||||
};
|
||||
|
||||
let found_runs = sorted_runs.len();
|
||||
// We only remove deletion markers once no file in current window overlaps with any other window
|
||||
// and region is not in append mode.
|
||||
let filter_deleted =
|
||||
!files.overlapping && (found_runs == 1 || max_runs == 1) && !self.append_mode;
|
||||
|
||||
let inputs = if found_runs > max_runs {
|
||||
let files_to_compact = reduce_runs(sorted_runs, max_runs);
|
||||
let files_to_compact_len = files_to_compact.len();
|
||||
info!(
|
||||
"Building compaction output, active window: {:?}, \
|
||||
current window: {}, \
|
||||
max runs: {}, \
|
||||
found runs: {}, \
|
||||
output size: {}, \
|
||||
max output size: {:?}, \
|
||||
remove deletion markers: {}",
|
||||
active_window,
|
||||
if !inputs.is_empty() {
|
||||
log_pick_result(
|
||||
region_id,
|
||||
*window,
|
||||
max_runs,
|
||||
active_window,
|
||||
found_runs,
|
||||
files_to_compact_len,
|
||||
self.max_output_file_size,
|
||||
filter_deleted
|
||||
);
|
||||
files_to_compact
|
||||
} else if files.files.len() > max_files {
|
||||
info!(
|
||||
"Enforcing max file num in window: {}, active: {:?}, max: {}, current: {}, max output size: {:?}, filter delete: {}",
|
||||
*window,
|
||||
active_window,
|
||||
max_files,
|
||||
files.files.len(),
|
||||
self.max_output_file_size,
|
||||
filter_deleted,
|
||||
&inputs,
|
||||
);
|
||||
// Files in window exceeds file num limit
|
||||
vec![enforce_file_num(&files.files, max_files)]
|
||||
} else {
|
||||
trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs);
|
||||
continue;
|
||||
};
|
||||
|
||||
let split_inputs = if !filter_deleted
|
||||
&& let Some(max_output_file_size) = self.max_output_file_size
|
||||
{
|
||||
let len_before_split = inputs.len();
|
||||
let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
|
||||
if maybe_split.len() != len_before_split {
|
||||
info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
|
||||
}
|
||||
maybe_split
|
||||
} else {
|
||||
inputs
|
||||
};
|
||||
|
||||
for input in split_inputs {
|
||||
debug_assert!(input.len() > 1);
|
||||
output.push(CompactionOutput {
|
||||
output_level: LEVEL_COMPACTED, // always compact to l1
|
||||
inputs: input,
|
||||
inputs,
|
||||
filter_deleted,
|
||||
output_time_range: None, // we do not enforce output time range in twcs compactions.
|
||||
});
|
||||
@@ -145,66 +100,50 @@ impl TwcsPicker {
|
||||
}
|
||||
}
|
||||
|
||||
/// Limits the size of compaction output in a naive manner.
|
||||
/// todo(hl): we can find the output file size more precisely by checking the time range
|
||||
/// of each row group and adding the sizes of those non-overlapping row groups. But now
|
||||
/// we'd better not to expose the SST details in this level.
|
||||
fn enforce_max_output_size(
|
||||
inputs: Vec<Vec<FileHandle>>,
|
||||
max_output_file_size: u64,
|
||||
) -> Vec<Vec<FileHandle>> {
|
||||
inputs
|
||||
.into_iter()
|
||||
.flat_map(|input| {
|
||||
debug_assert!(input.len() > 1);
|
||||
let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
|
||||
if estimated_output_size < max_output_file_size {
|
||||
// total file size does not exceed the threshold, just return the original input.
|
||||
return vec![input];
|
||||
}
|
||||
let mut splits = vec![];
|
||||
let mut new_input = vec![];
|
||||
let mut new_input_size = 0;
|
||||
for f in input {
|
||||
if new_input_size + f.size() > max_output_file_size {
|
||||
splits.push(std::mem::take(&mut new_input));
|
||||
new_input_size = 0;
|
||||
}
|
||||
new_input_size += f.size();
|
||||
new_input.push(f);
|
||||
}
|
||||
if !new_input.is_empty() {
|
||||
splits.push(new_input);
|
||||
}
|
||||
splits
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn log_pick_result(
|
||||
region_id: RegionId,
|
||||
window: i64,
|
||||
active_window: Option<i64>,
|
||||
found_runs: usize,
|
||||
file_num: usize,
|
||||
max_output_file_size: Option<u64>,
|
||||
filter_deleted: bool,
|
||||
inputs: &[FileHandle],
|
||||
) {
|
||||
let input_file_str: Vec<String> = inputs
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let range = f.time_range();
|
||||
let start = range.0.to_iso8601_string();
|
||||
let end = range.1.to_iso8601_string();
|
||||
let num_rows = f.num_rows();
|
||||
format!(
|
||||
"SST{{id: {}, range: ({}, {}), size: {}, num rows: {} }}",
|
||||
f.file_id(),
|
||||
start,
|
||||
end,
|
||||
ReadableSize(f.size()),
|
||||
num_rows
|
||||
)
|
||||
})
|
||||
.filter(|p| p.len() > 1)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
|
||||
/// the solution with minimum overhead according to files sizes to be merged.
|
||||
/// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
|
||||
/// `runs` must be sorted according to time ranges.
|
||||
fn enforce_file_num<T: Item>(files: &[T], max_file_num: usize) -> Vec<T> {
|
||||
debug_assert!(files.len() > max_file_num);
|
||||
let to_merge = files.len() - max_file_num + 1;
|
||||
let mut min_penalty = usize::MAX;
|
||||
let mut min_idx = 0;
|
||||
|
||||
for idx in 0..=(files.len() - to_merge) {
|
||||
let current_penalty: usize = files
|
||||
.iter()
|
||||
.skip(idx)
|
||||
.take(to_merge)
|
||||
.map(|f| f.size())
|
||||
.sum();
|
||||
if current_penalty < min_penalty {
|
||||
min_penalty = current_penalty;
|
||||
min_idx = idx;
|
||||
}
|
||||
}
|
||||
files.iter().skip(min_idx).take(to_merge).cloned().collect()
|
||||
.collect();
|
||||
let window_str = Timestamp::new_second(window).to_iso8601_string();
|
||||
let active_window_str = active_window.map(|s| Timestamp::new_second(s).to_iso8601_string());
|
||||
let max_output_file_size = max_output_file_size.map(|size| ReadableSize(size).to_string());
|
||||
info!(
|
||||
"Region ({:?}) compaction pick result: current window: {}, active window: {:?}, \
|
||||
found runs: {}, file num: {}, max output file size: {:?}, filter deleted: {}, \
|
||||
input files: {:?}",
|
||||
region_id,
|
||||
window_str,
|
||||
active_window_str,
|
||||
found_runs,
|
||||
file_num,
|
||||
max_output_file_size,
|
||||
filter_deleted,
|
||||
input_file_str
|
||||
);
|
||||
}
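Pulling the new per-window decision out of the interleaved old and new code above: when more than one sorted run is found, the picker reduces runs; otherwise it acts only once the single run has accumulated at least `trigger_file_num` files, and then merges small sequential files up to the output size cap. A schematic sketch of that decision (names from the diff, signatures simplified):

```rust
// Illustrative decision table only; the real picker returns file handles, not labels.
fn pick_window_action(
    sorted_run_count: usize,
    files_in_run: usize,
    trigger_file_num: usize,
) -> &'static str {
    if sorted_run_count > 1 {
        // Overlapping files exist: merge runs down toward a single run.
        "reduce_runs"
    } else if files_in_run >= trigger_file_num {
        // One run, but enough small files accumulated: merge sequential files.
        "merge_seq_files"
    } else {
        // Not enough files in this window to be worth compacting.
        "skip"
    }
}
```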
|
||||
|
||||
impl Picker for TwcsPicker {
|
||||
@@ -240,16 +179,18 @@ impl Picker for TwcsPicker {
|
||||
// Assign files to windows
|
||||
let mut windows =
|
||||
assign_to_windows(levels.iter().flat_map(LevelMeta::files), time_window_size);
|
||||
let outputs = self.build_output(&mut windows, active_window);
|
||||
let outputs = self.build_output(region_id, &mut windows, active_window);
|
||||
|
||||
if outputs.is_empty() && expired_ssts.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let max_file_size = self.max_output_file_size.map(|v| v as usize);
|
||||
Some(PickerOutput {
|
||||
outputs,
|
||||
expired_ssts,
|
||||
time_window_size,
|
||||
max_file_size,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -368,12 +309,10 @@ fn find_latest_window_in_seconds<'a>(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::test_util::{new_file_handle, new_file_handles};
|
||||
use crate::sst::file::{FileId, FileMeta, Level};
|
||||
use crate::test_util::NoopFilePurger;
|
||||
use crate::compaction::test_util::new_file_handle;
|
||||
use crate::sst::file::{FileId, Level};
|
||||
|
||||
#[test]
|
||||
fn test_get_latest_window_in_seconds() {
|
||||
@@ -614,25 +553,31 @@ mod tests {
|
||||
|
||||
impl CompactionPickerTestCase {
|
||||
fn check(&self) {
|
||||
let file_id_to_idx = self
|
||||
.input_files
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, file)| (file.file_id(), idx))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let mut windows = assign_to_windows(self.input_files.iter(), self.window_size);
|
||||
let active_window =
|
||||
find_latest_window_in_seconds(self.input_files.iter(), self.window_size);
|
||||
let output = TwcsPicker {
|
||||
max_active_window_runs: 4,
|
||||
max_active_window_files: usize::MAX,
|
||||
max_inactive_window_runs: 1,
|
||||
max_inactive_window_files: usize::MAX,
|
||||
trigger_file_num: 4,
|
||||
time_window_seconds: None,
|
||||
max_output_file_size: None,
|
||||
append_mode: false,
|
||||
}
|
||||
.build_output(&mut windows, active_window);
|
||||
.build_output(RegionId::from_u64(0), &mut windows, active_window);
|
||||
|
||||
let output = output
|
||||
.iter()
|
||||
.map(|o| {
|
||||
let input_file_ids =
|
||||
o.inputs.iter().map(|f| f.file_id()).collect::<HashSet<_>>();
|
||||
let input_file_ids = o
|
||||
.inputs
|
||||
.iter()
|
||||
.map(|f| file_id_to_idx.get(&f.file_id()).copied().unwrap())
|
||||
.collect::<HashSet<_>>();
|
||||
(input_file_ids, o.output_level)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
@@ -641,11 +586,7 @@ mod tests {
|
||||
.expected_outputs
|
||||
.iter()
|
||||
.map(|o| {
|
||||
let input_file_ids = o
|
||||
.input_files
|
||||
.iter()
|
||||
.map(|idx| self.input_files[*idx].file_id())
|
||||
.collect::<HashSet<_>>();
|
||||
let input_file_ids = o.input_files.iter().copied().collect::<HashSet<_>>();
|
||||
(input_file_ids, o.output_level)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
@@ -658,47 +599,11 @@ mod tests {
|
||||
output_level: Level,
|
||||
}
|
||||
|
||||
fn check_enforce_file_num(
|
||||
input_files: &[(i64, i64, u64)],
|
||||
max_file_num: usize,
|
||||
files_to_merge: &[(i64, i64)],
|
||||
) {
|
||||
let mut files = new_file_handles(input_files);
|
||||
// ensure sorted
|
||||
find_sorted_runs(&mut files);
|
||||
let mut to_merge = enforce_file_num(&files, max_file_num);
|
||||
to_merge.sort_unstable_by_key(|f| f.time_range().0);
|
||||
assert_eq!(
|
||||
files_to_merge.to_vec(),
|
||||
to_merge
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let (start, end) = f.time_range();
|
||||
(start.value(), end.value())
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_enforce_file_num() {
|
||||
check_enforce_file_num(
|
||||
&[(0, 300, 2), (100, 200, 1), (200, 400, 1)],
|
||||
2,
|
||||
&[(100, 200), (200, 400)],
|
||||
);
|
||||
|
||||
check_enforce_file_num(
|
||||
&[(0, 300, 200), (100, 200, 100), (200, 400, 100)],
|
||||
1,
|
||||
&[(0, 300), (100, 200), (200, 400)],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_twcs_output() {
|
||||
let file_ids = (0..4).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
|
||||
// Case 1: 2 runs found in each time window.
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
input_files: [
|
||||
@@ -708,13 +613,25 @@ mod tests {
|
||||
new_file_handle(file_ids[3], 50, 2998, 0), //active windows
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![ExpectedOutput {
|
||||
input_files: vec![0, 1],
|
||||
output_level: 1,
|
||||
}],
|
||||
expected_outputs: vec![
|
||||
ExpectedOutput {
|
||||
input_files: vec![0, 1],
|
||||
output_level: 1,
|
||||
},
|
||||
ExpectedOutput {
|
||||
input_files: vec![2, 3],
|
||||
output_level: 1,
|
||||
},
|
||||
],
|
||||
}
|
||||
.check();
|
||||
|
||||
// Case 2:
|
||||
// -2000........-3
|
||||
// -3000.....-100
|
||||
// 0..............2999
|
||||
// 50..........2998
|
||||
// 11.........2990
|
||||
let file_ids = (0..6).map(|_| FileId::random()).collect::<Vec<_>>();
|
||||
CompactionPickerTestCase {
|
||||
window_size: 3,
|
||||
@@ -724,7 +641,6 @@ mod tests {
|
||||
new_file_handle(file_ids[2], 0, 2999, 0),
|
||||
new_file_handle(file_ids[3], 50, 2998, 0),
|
||||
new_file_handle(file_ids[4], 11, 2990, 0),
|
||||
new_file_handle(file_ids[5], 50, 4998, 0),
|
||||
]
|
||||
.to_vec(),
|
||||
expected_outputs: vec![
|
||||
@@ -733,7 +649,7 @@ mod tests {
|
||||
output_level: 1,
|
||||
},
|
||||
ExpectedOutput {
|
||||
input_files: vec![2, 3, 4],
|
||||
input_files: vec![2, 4],
|
||||
output_level: 1,
|
||||
},
|
||||
],
|
||||
@@ -741,44 +657,5 @@ mod tests {
|
||||
.check();
|
||||
}
|
||||
|
||||
fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
|
||||
inputs
|
||||
.iter()
|
||||
.map(|(start, end, size)| {
|
||||
FileHandle::new(
|
||||
FileMeta {
|
||||
region_id: Default::default(),
|
||||
file_id: Default::default(),
|
||||
time_range: (
|
||||
Timestamp::new_millisecond(*start),
|
||||
Timestamp::new_millisecond(*end),
|
||||
),
|
||||
level: 0,
|
||||
file_size: *size,
|
||||
available_indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
num_rows: 0,
|
||||
num_row_groups: 0,
|
||||
sequence: None,
|
||||
},
|
||||
Arc::new(NoopFilePurger),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_limit_output_size() {
|
||||
let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
|
||||
let runs = find_sorted_runs(&mut files);
|
||||
assert_eq!(6, runs.len());
|
||||
let files_to_merge = reduce_runs(runs, 2);
|
||||
|
||||
let enforced = enforce_max_output_size(files_to_merge, 2);
|
||||
assert_eq!(2, enforced.len());
|
||||
assert_eq!(2, enforced[0].len());
|
||||
assert_eq!(2, enforced[1].len());
|
||||
}
|
||||
|
||||
// TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
|
||||
}
|
||||
|
||||
@@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker {
|
||||
outputs,
|
||||
expired_ssts,
|
||||
time_window_size: time_window,
|
||||
max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction.
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,8 +110,6 @@ async fn test_append_mode_compaction() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
|
||||
.insert_option("append_mode", "true")
|
||||
.build();
|
||||
let region_dir = request.region_dir.clone();
|
||||
@@ -177,7 +175,7 @@ async fn test_append_mode_compaction() {
|
||||
+-------+---------+---------------------+";
|
||||
// Scans in parallel.
|
||||
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(2, scanner.num_files());
|
||||
assert_eq!(1, scanner.num_files());
|
||||
assert_eq!(1, scanner.num_memtables());
|
||||
scanner.set_target_partitions(2);
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
|
||||
@@ -129,8 +129,6 @@ async fn test_compaction_region() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "1")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
@@ -163,12 +161,12 @@ async fn test_compaction_region() {
|
||||
// [0..9]
|
||||
// [10...19]
|
||||
// [20....29]
|
||||
// -[15.........29]-
|
||||
// -[15.........29]- (delete)
|
||||
// [15.....24]
|
||||
// Output:
|
||||
// [0..9]
|
||||
// [10..14]
|
||||
// [15..24]
|
||||
// [10............29] (contains delete)
|
||||
// [15....24]
|
||||
assert_eq!(
|
||||
3,
|
||||
scanner.num_files(),
|
||||
@@ -181,6 +179,71 @@ async fn test_compaction_region() {
|
||||
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_overlapping_files() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
.column_metadatas
|
||||
.iter()
|
||||
.map(column_metadata_to_column_schema)
|
||||
.collect::<Vec<_>>();
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
// Flush 4 SSTs for compaction.
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 10..20).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
|
||||
delete_and_flush(&engine, region_id, &column_schemas, 30..40).await;
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
RegionRequest::Compact(RegionCompactRequest::default()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
1,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
);
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
|
||||
let vec = collect_stream_ts(stream).await;
|
||||
assert_eq!(
|
||||
vec,
|
||||
(0..=9)
|
||||
.map(|v| v * 1000)
|
||||
.chain((20..=29).map(|v| v * 1000))
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
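The expected scan result of this test follows directly from the writes: rows 10..20 and 30..40 were deleted, and compaction collapses everything into a single output file, leaving only the two surviving put ranges at one-second (1000 ms) spacing:

```rust
// Reproduces the timestamp list asserted above.
fn expected_timestamps() -> Vec<i64> {
    (0i64..=9)
        .map(|v| v * 1000)
        .chain((20i64..=29).map(|v| v * 1000))
        .collect()
}
```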
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_region_with_overlapping() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -201,8 +264,6 @@ async fn test_compaction_region_with_overlapping() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
|
||||
.insert_option("compaction.twcs.time_window", "1h")
|
||||
.build();
|
||||
|
||||
@@ -257,10 +318,6 @@ async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_active_window_files", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_files", "2")
|
||||
.insert_option("compaction.twcs.time_window", "1h")
|
||||
.build();
|
||||
|
||||
@@ -290,7 +347,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(
|
||||
4,
|
||||
2,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
@@ -332,7 +389,6 @@ async fn test_readonly_during_compaction() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "1")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
@@ -404,10 +460,6 @@ async fn test_compaction_update_time_window() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_active_window_files", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "2")
|
||||
.insert_option("compaction.twcs.max_inactive_window_files", "2")
|
||||
.build();
|
||||
|
||||
let column_schemas = request
|
||||
@@ -420,9 +472,10 @@ async fn test_compaction_update_time_window() {
|
||||
.await
|
||||
.unwrap();
|
||||
// Flush 4 SSTs for compaction.
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2400..3600).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..900).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 900..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2700).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 2700..3600).await; // window 3600
|
||||
|
||||
let result = engine
|
||||
.handle_request(
|
||||
@@ -433,11 +486,21 @@ async fn test_compaction_update_time_window() {
|
||||
.unwrap();
|
||||
assert_eq!(result.affected_rows, 0);
|
||||
|
||||
assert_eq!(
|
||||
engine
|
||||
.get_region(region_id)
|
||||
.unwrap()
|
||||
.version_control
|
||||
.current()
|
||||
.version
|
||||
.compaction_time_window,
|
||||
Some(Duration::from_secs(3600))
|
||||
);
|
||||
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(0, scanner.num_memtables());
|
||||
// We keep at most two files.
|
||||
// We keep all 3 files because no enough file to merge
|
||||
assert_eq!(
|
||||
2,
|
||||
1,
|
||||
scanner.num_files(),
|
||||
"unexpected files: {:?}",
|
||||
scanner.file_ids()
|
||||
@@ -492,10 +555,6 @@ async fn test_change_region_compaction_window() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "1")
|
||||
.insert_option("compaction.twcs.max_active_window_files", "1")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
|
||||
.insert_option("compaction.twcs.max_inactive_window_files", "1")
|
||||
.build();
|
||||
let region_dir = request.region_dir.clone();
|
||||
let column_schemas = request
|
||||
@@ -508,8 +567,10 @@ async fn test_change_region_compaction_window() {
|
||||
.await
|
||||
.unwrap();
|
||||
// Flush 4 SSTs for compaction.
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
@@ -520,7 +581,7 @@ async fn test_change_region_compaction_window() {
|
||||
.unwrap();
|
||||
|
||||
// Put window 7200
|
||||
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await;
|
||||
|
||||
// Check compaction window.
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
@@ -543,6 +604,22 @@ async fn test_change_region_compaction_window() {
|
||||
},
|
||||
});
|
||||
engine.handle_request(region_id, request).await.unwrap();
|
||||
assert_eq!(
|
||||
engine
|
||||
.get_region(region_id)
|
||||
.unwrap()
|
||||
.version_control
|
||||
.current()
|
||||
.version
|
||||
.options
|
||||
.compaction
|
||||
.time_window(),
|
||||
Some(Duration::from_secs(7200))
|
||||
);
|
||||
|
||||
put_and_flush(&engine, region_id, &column_schemas, 5000..5100).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 5100..5200).await;
|
||||
put_and_flush(&engine, region_id, &column_schemas, 5200..5300).await;
|
||||
|
||||
// Compact again. It should compact windows 3600 and 7200
|
||||
// into 7200.
|
||||
@@ -585,12 +662,12 @@ async fn test_change_region_compaction_window() {
|
||||
{
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let version = region.version();
|
||||
// We open the region without options, so the time window should be None.
|
||||
assert!(version.options.compaction.time_window().is_none());
|
||||
assert_eq!(
|
||||
Some(Duration::from_secs(7200)),
|
||||
version.compaction_time_window,
|
||||
);
|
||||
// We open the region without options, so the time window should be None.
|
||||
assert!(version.options.compaction.time_window().is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -615,10 +692,6 @@ async fn test_open_overwrite_compaction_window() {
|
||||
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "1")
|
||||
.insert_option("compaction.twcs.max_active_window_files", "1")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
|
||||
.insert_option("compaction.twcs.max_inactive_window_files", "1")
|
||||
.build();
|
||||
let region_dir = request.region_dir.clone();
|
||||
let column_schemas = request
|
||||
@@ -631,8 +704,10 @@ async fn test_open_overwrite_compaction_window() {
|
||||
.await
|
||||
.unwrap();
|
||||
// Flush 4 SSTs for compaction.
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 0..600).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 600..1200).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1200..1800).await; // window 3600
|
||||
put_and_flush(&engine, region_id, &column_schemas, 1800..2400).await; // window 3600
|
||||
|
||||
engine
|
||||
.handle_request(
|
||||
|
||||
@@ -45,7 +45,6 @@ async fn test_scan_without_filtering_deleted() {
|
||||
.await;
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "10")
|
||||
.build();
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
|
||||
@@ -111,8 +111,6 @@ async fn test_merge_mode_compaction() {
|
||||
let request = CreateRequestBuilder::new()
|
||||
.field_num(2)
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.max_active_window_runs", "1")
|
||||
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
|
||||
.insert_option("merge_mode", "last_non_null")
|
||||
.build();
|
||||
let region_dir = request.region_dir.clone();
|
||||
@@ -191,7 +189,7 @@ async fn test_merge_mode_compaction() {
|
||||
+-------+---------+---------+---------------------+";
|
||||
// Scans in parallel.
|
||||
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
|
||||
assert_eq!(1, scanner.num_files());
|
||||
assert_eq!(2, scanner.num_files());
|
||||
assert_eq!(1, scanner.num_memtables());
|
||||
scanner.set_target_partitions(2);
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::error::{
|
||||
};
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::metrics::{
|
||||
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL,
|
||||
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
|
||||
INFLIGHT_FLUSH_COUNT,
|
||||
};
|
||||
use crate::read::Source;
|
||||
@@ -601,7 +601,7 @@ impl FlushScheduler {
|
||||
pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
|
||||
error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
|
||||
|
||||
FLUSH_ERRORS_TOTAL.inc();
|
||||
FLUSH_FAILURE_TOTAL.inc();
|
||||
|
||||
// Remove this region.
|
||||
let Some(flush_status) = self.region_status.remove(®ion_id) else {
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#![feature(assert_matches)]
|
||||
#![feature(result_flattening)]
|
||||
#![feature(int_roundings)]
|
||||
#![feature(debug_closure_helpers)]
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
#[cfg_attr(feature = "test", allow(unused))]
|
||||
|
||||
@@ -70,8 +70,8 @@ lazy_static! {
|
||||
)
|
||||
.unwrap();
|
||||
/// Counter of scheduled failed flush jobs.
|
||||
pub static ref FLUSH_ERRORS_TOTAL: IntCounter =
|
||||
register_int_counter!("greptime_mito_flush_errors_total", "mito flush errors total").unwrap();
|
||||
pub static ref FLUSH_FAILURE_TOTAL: IntCounter =
|
||||
register_int_counter!("greptime_mito_flush_failure_total", "mito flush failure total").unwrap();
|
||||
/// Elapsed time of a flush job.
|
||||
pub static ref FLUSH_ELAPSED: HistogramVec = register_histogram_vec!(
|
||||
"greptime_mito_flush_elapsed",
|
||||
@@ -84,7 +84,7 @@ lazy_static! {
|
||||
/// Counter of flushed bytes.
|
||||
pub static ref FLUSH_BYTES_TOTAL: IntCounter =
|
||||
register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
|
||||
/// Gauge for inflight compaction tasks.
|
||||
/// Gauge for inflight flush tasks.
|
||||
pub static ref INFLIGHT_FLUSH_COUNT: IntGauge =
|
||||
register_int_gauge!(
|
||||
"greptime_mito_inflight_flush_count",
|
||||
@@ -153,7 +153,6 @@ lazy_static! {
|
||||
"greptime_mito_inflight_compaction_count",
|
||||
"inflight compaction count",
|
||||
).unwrap();
|
||||
// ------- End of compaction metrics.
|
||||
|
||||
// Query metrics.
|
||||
/// Timer of different stages in query.
|
||||
@@ -403,6 +402,20 @@ lazy_static! {
|
||||
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
/// Counter for compaction input file size.
|
||||
pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(
|
||||
"greptime_mito_compaction_input_bytes",
|
||||
"mito compaction input file size",
|
||||
).unwrap();
|
||||
|
||||
/// Counter for compaction output file size.
|
||||
pub static ref COMPACTION_OUTPUT_BYTES: Counter = register_counter!(
|
||||
"greptime_mito_compaction_output_bytes",
|
||||
"mito compaction output file size",
|
||||
).unwrap();
|
||||
}
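The two counters registered above follow the standard `prometheus` crate pattern; outside of `lazy_static!` the equivalent registration looks roughly like this (example metric names, not the real ones):

```rust
use prometheus::{register_counter, Counter};

// Registers a pair of byte counters in the default registry.
fn example_compaction_counters() -> prometheus::Result<(Counter, Counter)> {
    let input = register_counter!(
        "example_compaction_input_bytes",
        "compaction input file size"
    )?;
    let output = register_counter!(
        "example_compaction_output_bytes",
        "compaction output file size"
    )?;
    Ok((input, output))
}
```

Call sites then just do `COMPACTION_INPUT_BYTES.inc_by(bytes as f64)`, as the compactor and compaction-task changes earlier in this diff show.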
|
||||
|
||||
/// Stager notifier to collect metrics.
|
||||
pub struct StagerMetrics {
|
||||
cache_hit: IntCounter,
|
||||
|
||||
@@ -199,18 +199,9 @@ impl Default for CompactionOptions {
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct TwcsOptions {
|
||||
/// Max num of sorted runs that can be kept in active writing time window.
|
||||
/// Minimum file num in every time window to trigger a compaction.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_active_window_runs: usize,
|
||||
/// Max num of files in the active window.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_active_window_files: usize,
|
||||
/// Max num of sorted runs that can be kept in inactive time windows.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_inactive_window_runs: usize,
|
||||
/// Max num of files in inactive time windows.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub max_inactive_window_files: usize,
|
||||
pub trigger_file_num: usize,
|
||||
/// Compaction time window defined when creating tables.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub time_window: Option<Duration>,
|
||||
@@ -243,12 +234,9 @@ impl TwcsOptions {
|
||||
impl Default for TwcsOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_active_window_runs: 4,
|
||||
max_active_window_files: 4,
|
||||
max_inactive_window_runs: 1,
|
||||
max_inactive_window_files: 1,
|
||||
trigger_file_num: 4,
|
||||
time_window: None,
|
||||
max_output_file_size: Some(ReadableSize::gb(2)),
|
||||
max_output_file_size: Some(ReadableSize::mb(512)),
|
||||
remote_compaction: false,
|
||||
fallback_to_local: true,
|
||||
}
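The defaults shrink accordingly: the four run/file limits collapse into the single `trigger_file_num: 4`, and the default `max_output_file_size` drops from 2 GiB to 512 MiB. A region-level override now only needs the new key, sketched here with the option strings used in the tests below (the real code parses such a map via `RegionOptions::try_from`):

```rust
use std::collections::HashMap;

// Example option map; keys mirror the region-options tests in this diff.
fn example_twcs_overrides() -> HashMap<String, String> {
    HashMap::from([
        ("compaction.type".to_string(), "twcs".to_string()),
        ("compaction.twcs.trigger_file_num".to_string(), "8".to_string()),
        ("compaction.twcs.max_output_file_size".to_string(), "1GB".to_string()),
    ])
}
```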
|
||||
@@ -500,7 +488,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_without_compaction_type() {
|
||||
let map = make_map(&[
|
||||
("compaction.twcs.max_active_window_runs", "8"),
|
||||
("compaction.twcs.trigger_file_num", "8"),
|
||||
("compaction.twcs.time_window", "2h"),
|
||||
]);
|
||||
let err = RegionOptions::try_from(&map).unwrap_err();
|
||||
@@ -510,14 +498,14 @@ mod tests {
|
||||
#[test]
|
||||
fn test_with_compaction_type() {
|
||||
let map = make_map(&[
|
||||
("compaction.twcs.max_active_window_runs", "8"),
|
||||
("compaction.twcs.trigger_file_num", "8"),
|
||||
("compaction.twcs.time_window", "2h"),
|
||||
("compaction.type", "twcs"),
|
||||
]);
|
||||
let options = RegionOptions::try_from(&map).unwrap();
|
||||
let expect = RegionOptions {
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
trigger_file_num: 8,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
..Default::default()
|
||||
}),
|
||||
@@ -618,10 +606,7 @@ mod tests {
|
||||
});
|
||||
let map = make_map(&[
|
||||
("ttl", "7d"),
|
||||
("compaction.twcs.max_active_window_runs", "8"),
|
||||
("compaction.twcs.max_active_window_files", "11"),
|
||||
("compaction.twcs.max_inactive_window_runs", "2"),
|
||||
("compaction.twcs.max_inactive_window_files", "3"),
|
||||
("compaction.twcs.trigger_file_num", "8"),
|
||||
("compaction.twcs.max_output_file_size", "1GB"),
|
||||
("compaction.twcs.time_window", "2h"),
|
||||
("compaction.type", "twcs"),
|
||||
@@ -645,10 +630,7 @@ mod tests {
|
||||
let expect = RegionOptions {
|
||||
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
max_active_window_files: 11,
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: 3,
|
||||
trigger_file_num: 8,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
max_output_file_size: Some(ReadableSize::gb(1)),
|
||||
remote_compaction: false,
|
||||
@@ -679,10 +661,7 @@ mod tests {
|
||||
let options = RegionOptions {
|
||||
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
max_active_window_files: usize::MAX,
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: usize::MAX,
|
||||
trigger_file_num: 8,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
max_output_file_size: None,
|
||||
remote_compaction: false,
|
||||
@@ -719,10 +698,7 @@ mod tests {
|
||||
"ttl": "7days",
|
||||
"compaction": {
|
||||
"compaction.type": "twcs",
|
||||
"compaction.twcs.max_active_window_runs": "8",
|
||||
"compaction.twcs.max_active_window_files": "11",
|
||||
"compaction.twcs.max_inactive_window_runs": "2",
|
||||
"compaction.twcs.max_inactive_window_files": "7",
|
||||
"compaction.twcs.trigger_file_num": "8",
|
||||
"compaction.twcs.max_output_file_size": "7MB",
|
||||
"compaction.twcs.time_window": "2h"
|
||||
},
|
||||
@@ -748,10 +724,7 @@ mod tests {
|
||||
let options = RegionOptions {
|
||||
ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
|
||||
compaction: CompactionOptions::Twcs(TwcsOptions {
|
||||
max_active_window_runs: 8,
|
||||
max_active_window_files: 11,
|
||||
max_inactive_window_runs: 2,
|
||||
max_inactive_window_files: 7,
|
||||
trigger_file_num: 8,
|
||||
time_window: Some(Duration::from_secs(3600 * 2)),
|
||||
max_output_file_size: Some(ReadableSize::mb(7)),
|
||||
remote_compaction: false,
|
||||
|
||||
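
The hunks above appear to fold the four per-window `max_*_window_runs` / `max_*_window_files` knobs into a single `compaction.twcs.trigger_file_num` option and to lower the default `max_output_file_size` from 2 GB to 512 MB. As a rough, non-authoritative sketch of how the new option is consumed, the snippet below mirrors the option-map pattern used by the tests above; the keys are the ones visible in this diff, everything else is illustrative.

use std::collections::HashMap;

// Sketch only: an option map in the shape the RegionOptions tests above parse.
fn twcs_option_map() -> HashMap<String, String> {
    HashMap::from([
        ("compaction.type".to_string(), "twcs".to_string()),
        // Replaces the old max_active/inactive_window_runs/files knobs.
        ("compaction.twcs.trigger_file_num".to_string(), "8".to_string()),
        ("compaction.twcs.time_window".to_string(), "2h".to_string()),
        ("compaction.twcs.max_output_file_size".to_string(), "512MB".to_string()),
    ])
}

// Feeding this map to `RegionOptions::try_from(&map)`, as `test_with_compaction_type`
// above does, would yield a TwcsOptions with trigger_file_num == 8 and a 2h time window.
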
@@ -15,11 +15,13 @@
|
||||
//! Structures to describe metadata of files.
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::num::NonZeroU64;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_time::Timestamp;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use smallvec::SmallVec;
|
||||
@@ -105,7 +107,7 @@ pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
|
||||
}
|
||||
|
||||
/// Metadata of a SST file.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
|
||||
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
|
||||
#[serde(default)]
|
||||
pub struct FileMeta {
|
||||
/// Region of file.
|
||||
@@ -142,6 +144,42 @@ pub struct FileMeta {
|
||||
pub sequence: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
impl Debug for FileMeta {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
let mut debug_struct = f.debug_struct("FileMeta");
|
||||
debug_struct
|
||||
.field("region_id", &self.region_id)
|
||||
.field_with("file_id", |f| write!(f, "{} ", self.file_id))
|
||||
.field_with("time_range", |f| {
|
||||
write!(
|
||||
f,
|
||||
"({}, {}) ",
|
||||
self.time_range.0.to_iso8601_string(),
|
||||
self.time_range.1.to_iso8601_string()
|
||||
)
|
||||
})
|
||||
.field("level", &self.level)
|
||||
.field("file_size", &ReadableSize(self.file_size));
|
||||
if !self.available_indexes.is_empty() {
|
||||
debug_struct
|
||||
.field("available_indexes", &self.available_indexes)
|
||||
.field("index_file_size", &ReadableSize(self.index_file_size));
|
||||
}
|
||||
debug_struct
|
||||
.field("num_rows", &self.num_rows)
|
||||
.field("num_row_groups", &self.num_row_groups)
|
||||
.field_with("sequence", |f| match self.sequence {
|
||||
None => {
|
||||
write!(f, "None")
|
||||
}
|
||||
Some(seq) => {
|
||||
write!(f, "{}", seq)
|
||||
}
|
||||
})
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Type of index.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum IndexType {
|
||||
@@ -188,13 +226,9 @@ pub struct FileHandle {
|
||||
impl fmt::Debug for FileHandle {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("FileHandle")
|
||||
.field("region_id", &self.inner.meta.region_id)
|
||||
.field("file_id", &self.inner.meta.file_id)
|
||||
.field("time_range", &self.inner.meta.time_range)
|
||||
.field("size", &self.inner.meta.file_size)
|
||||
.field("level", &self.inner.meta.level)
|
||||
.field("compacting", &self.inner.compacting)
|
||||
.field("deleted", &self.inner.deleted)
|
||||
.field("meta", self.meta_ref())
|
||||
.field("compacting", &self.compacting())
|
||||
.field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,10 @@ pub struct WriteOptions {
|
||||
pub write_buffer_size: ReadableSize,
|
||||
/// Row group size.
|
||||
pub row_group_size: usize,
|
||||
/// Max single output file size.
|
||||
/// Note: This is not a hard limit as we can only observe the file size when
|
||||
/// the arrow writer flushes data to the underlying writer.
|
||||
pub max_file_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for WriteOptions {
|
||||
@@ -57,6 +61,7 @@ impl Default for WriteOptions {
|
||||
WriteOptions {
|
||||
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
|
||||
row_group_size: DEFAULT_ROW_GROUP_SIZE,
|
||||
max_file_size: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
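
The `max_file_size` field added above is a soft cap rather than a hard limit, since the size is only observed when the arrow writer flushes to the underlying writer. A minimal sketch of opting into it, assuming the `WriteOptions` struct from this hunk; the 16 KiB figure is the one used by `test_write_multiple_files` further down.

// Sketch only: small row groups plus a ~16 KiB soft size cap so the writer
// rolls over to a new SST file while writing.
fn small_file_write_opts() -> WriteOptions {
    WriteOptions {
        row_group_size: 50,
        // Checked after each batch is written, so files can overshoot slightly.
        max_file_size: Some(1024 * 16),
        ..Default::default()
    }
}
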
@@ -99,8 +104,9 @@ mod tests {
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
|
||||
use super::*;
|
||||
use crate::access_layer::FilePathProvider;
|
||||
use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
|
||||
use crate::cache::{CacheManager, CacheStrategy, PageKey};
|
||||
use crate::read::BatchReader;
|
||||
use crate::sst::index::{Indexer, IndexerBuilder};
|
||||
use crate::sst::parquet::format::WriteFormat;
|
||||
use crate::sst::parquet::reader::ParquetReaderBuilder;
|
||||
@@ -108,7 +114,8 @@ mod tests {
|
||||
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
|
||||
use crate::test_util::sst_util::{
|
||||
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
|
||||
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
|
||||
new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
|
||||
sst_region_metadata,
|
||||
};
|
||||
use crate::test_util::{check_reader_result, TestEnv};
|
||||
|
||||
@@ -532,4 +539,58 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_multiple_files() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
// create test env
|
||||
let mut env = TestEnv::new();
|
||||
let object_store = env.init_object_store_manager();
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
let batches = &[
|
||||
new_batch_by_range(&["a", "d"], 0, 1000),
|
||||
new_batch_by_range(&["b", "f"], 0, 1000),
|
||||
new_batch_by_range(&["b", "h"], 100, 200),
|
||||
new_batch_by_range(&["b", "h"], 200, 300),
|
||||
new_batch_by_range(&["b", "h"], 300, 1000),
|
||||
];
|
||||
let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
|
||||
|
||||
let source = new_source(batches);
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size: 50,
|
||||
max_file_size: Some(1024 * 16),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let path_provider = RegionFilePathFactory {
|
||||
region_dir: "test".to_string(),
|
||||
};
|
||||
let mut writer = ParquetWriter::new_with_object_store(
|
||||
object_store.clone(),
|
||||
metadata.clone(),
|
||||
NoopIndexBuilder,
|
||||
path_provider,
|
||||
)
|
||||
.await;
|
||||
|
||||
let files = writer.write_all(source, None, &write_opts).await.unwrap();
|
||||
assert_eq!(2, files.len());
|
||||
|
||||
let mut rows_read = 0;
|
||||
for f in &files {
|
||||
let file_handle = sst_file_handle_with_file_id(
|
||||
f.file_id,
|
||||
f.time_range.0.value(),
|
||||
f.time_range.1.value(),
|
||||
);
|
||||
let builder =
|
||||
ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
|
||||
let mut reader = builder.build().await.unwrap();
|
||||
while let Some(batch) = reader.next_batch().await.unwrap() {
|
||||
rows_read += batch.num_rows();
|
||||
}
|
||||
}
|
||||
assert_eq!(total_rows, rows_read);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,11 +15,13 @@
|
||||
//! Parquet writer.
|
||||
|
||||
use std::future::Future;
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use common_telemetry::debug;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::datatypes::SchemaRef;
|
||||
use object_store::{FuturesAsyncWriter, ObjectStore};
|
||||
@@ -143,17 +145,52 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_or_create_indexer(&mut self) -> &mut Indexer {
|
||||
match self.current_indexer {
|
||||
None => {
|
||||
self.current_file = FileId::random();
|
||||
let indexer = self.indexer_builder.build(self.current_file).await;
|
||||
self.current_indexer = Some(indexer);
|
||||
// safety: self.current_indexer already set above.
|
||||
self.current_indexer.as_mut().unwrap()
|
||||
}
|
||||
Some(ref mut indexer) => indexer,
|
||||
}
|
||||
/// Finishes current SST file and index file.
|
||||
async fn finish_current_file(
|
||||
&mut self,
|
||||
ssts: &mut SstInfoArray,
|
||||
stats: &mut SourceStats,
|
||||
) -> Result<()> {
|
||||
// `maybe_init_writer` will create a new file for the next batch.
|
||||
if let Some(mut current_writer) = mem::take(&mut self.writer) {
|
||||
let stats = mem::take(stats);
|
||||
// At least one row has been written.
|
||||
assert!(stats.num_rows > 0);
|
||||
|
||||
debug!(
|
||||
"Finishing current file {}, file size: {}, num rows: {}",
|
||||
self.current_file,
|
||||
self.bytes_written.load(Ordering::Relaxed),
|
||||
stats.num_rows
|
||||
);
|
||||
|
||||
// Finish indexer and writer.
|
||||
// safety: writer and index can only be both present or not.
|
||||
let index_output = self.current_indexer.as_mut().unwrap().finish().await;
|
||||
current_writer.flush().await.context(WriteParquetSnafu)?;
|
||||
|
||||
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
|
||||
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
|
||||
|
||||
// Safety: num rows > 0 so we must have min/max.
|
||||
let time_range = stats.time_range.unwrap();
|
||||
|
||||
// convert FileMetaData to ParquetMetaData
|
||||
let parquet_metadata = parse_parquet_metadata(file_meta)?;
|
||||
ssts.push(SstInfo {
|
||||
file_id: self.current_file,
|
||||
time_range,
|
||||
file_size,
|
||||
num_rows: stats.num_rows,
|
||||
num_row_groups: parquet_metadata.num_row_groups() as u64,
|
||||
file_metadata: Some(Arc::new(parquet_metadata)),
|
||||
index_metadata: index_output,
|
||||
});
|
||||
self.current_file = FileId::random();
|
||||
self.bytes_written.store(0, Ordering::Relaxed)
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Iterates source and writes all rows to Parquet file.
|
||||
@@ -184,6 +221,7 @@ where
|
||||
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
|
||||
opts: &WriteOptions,
|
||||
) -> Result<SstInfoArray> {
|
||||
let mut results = smallvec![];
|
||||
let write_format =
|
||||
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
|
||||
let mut stats = SourceStats::default();
|
||||
@@ -196,49 +234,31 @@ where
|
||||
match res {
|
||||
Ok(mut batch) => {
|
||||
stats.update(&batch);
|
||||
self.get_or_create_indexer().await.update(&mut batch).await;
|
||||
// safety: self.current_indexer must be set when first batch has been written.
|
||||
self.current_indexer
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.update(&mut batch)
|
||||
.await;
|
||||
if let Some(max_file_size) = opts.max_file_size
|
||||
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
|
||||
{
|
||||
self.finish_current_file(&mut results, &mut stats).await?;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
self.get_or_create_indexer().await.abort().await;
|
||||
if let Some(indexer) = &mut self.current_indexer {
|
||||
indexer.abort().await;
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let index_output = self.get_or_create_indexer().await.finish().await;
|
||||
|
||||
if stats.num_rows == 0 {
|
||||
return Ok(smallvec![]);
|
||||
}
|
||||
|
||||
let Some(mut arrow_writer) = self.writer.take() else {
|
||||
// No batch actually written.
|
||||
return Ok(smallvec![]);
|
||||
};
|
||||
|
||||
arrow_writer.flush().await.context(WriteParquetSnafu)?;
|
||||
|
||||
let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
|
||||
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
|
||||
|
||||
// Safety: num rows > 0 so we must have min/max.
|
||||
let time_range = stats.time_range.unwrap();
|
||||
|
||||
// convert FileMetaData to ParquetMetaData
|
||||
let parquet_metadata = parse_parquet_metadata(file_meta)?;
|
||||
|
||||
let file_id = self.current_file;
|
||||
self.finish_current_file(&mut results, &mut stats).await?;
|
||||
|
||||
// object_store.write will make sure all bytes are written or an error is raised.
|
||||
Ok(smallvec![SstInfo {
|
||||
file_id,
|
||||
time_range,
|
||||
file_size,
|
||||
num_rows: stats.num_rows,
|
||||
num_row_groups: parquet_metadata.num_row_groups() as u64,
|
||||
file_metadata: Some(Arc::new(parquet_metadata)),
|
||||
index_metadata: index_output,
|
||||
}])
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Customizes per-column config according to schema and maybe column cardinality.
|
||||
@@ -309,6 +329,10 @@ where
|
||||
AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
|
||||
.context(WriteParquetSnafu)?;
|
||||
self.writer = Some(arrow_writer);
|
||||
|
||||
let indexer = self.indexer_builder.build(self.current_file).await;
|
||||
self.current_indexer = Some(indexer);
|
||||
|
||||
// safety: self.writer is assigned above
|
||||
Ok(self.writer.as_mut().unwrap())
|
||||
}
|
||||
|
||||
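
Put together, the writer changes above implement a simple rollover scheme: after each batch, the observed byte count is compared against `max_file_size`, and if it is exceeded the current SST and its index are finished via `finish_current_file`, letting the next batch lazily open a fresh file and indexer. The decision itself reduces to a small predicate; a self-contained sketch:

/// Sketch only: whether the parquet writer above should finish the current
/// file after a batch, given the optional soft size limit.
fn should_roll_over(bytes_written: usize, max_file_size: Option<usize>) -> bool {
    match max_file_size {
        // Mirrors `if let Some(max) = opts.max_file_size && bytes_written > max`.
        Some(limit) => bytes_written > limit,
        None => false,
    }
}
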
@@ -214,28 +214,10 @@ fn set_twcs_options(
|
||||
region_id: RegionId,
|
||||
) -> std::result::Result<(), MetadataError> {
|
||||
match key {
|
||||
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
|
||||
let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
|
||||
log_option_update(region_id, key, options.max_active_window_runs, runs);
|
||||
options.max_active_window_runs = runs;
|
||||
}
|
||||
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
|
||||
let files =
|
||||
parse_usize_with_default(key, value, default_option.max_active_window_files)?;
|
||||
log_option_update(region_id, key, options.max_active_window_files, files);
|
||||
options.max_active_window_files = files;
|
||||
}
|
||||
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
|
||||
let runs =
|
||||
parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
|
||||
log_option_update(region_id, key, options.max_inactive_window_runs, runs);
|
||||
options.max_inactive_window_runs = runs;
|
||||
}
|
||||
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
|
||||
let files =
|
||||
parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
|
||||
log_option_update(region_id, key, options.max_inactive_window_files, files);
|
||||
options.max_inactive_window_files = files;
|
||||
mito_engine_options::TWCS_TRIGGER_FILE_NUM => {
|
||||
let files = parse_usize_with_default(key, value, default_option.trigger_file_num)?;
|
||||
log_option_update(region_id, key, options.trigger_file_num, files);
|
||||
options.trigger_file_num = files;
|
||||
}
|
||||
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
|
||||
let size = if value.is_empty() {
|
||||
|
||||
@@ -57,7 +57,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
}
|
||||
|
||||
if self.write_buffer_manager.should_stall() && allow_stall {
|
||||
self.stalled_count.add(write_requests.len() as i64);
|
||||
self.stalled_count
|
||||
.add((write_requests.len() + bulk_requests.len()) as i64);
|
||||
self.stalled_requests.append(write_requests, bulk_requests);
|
||||
self.listener.on_write_stall();
|
||||
return;
|
||||
@@ -181,7 +182,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
|
||||
debug!("Rejects stalled requests for region {}", region_id);
|
||||
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
|
||||
self.stalled_count.sub(requests.len() as i64);
|
||||
self.stalled_count.sub((requests.len() + bulk.len()) as i64);
|
||||
reject_write_requests(&mut requests, &mut bulk);
|
||||
}
|
||||
|
||||
@@ -189,7 +190,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
|
||||
debug!("Handles stalled requests for region {}", region_id);
|
||||
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
|
||||
self.stalled_count.sub(requests.len() as i64);
|
||||
self.stalled_count.sub((requests.len() + bulk.len()) as i64);
|
||||
self.handle_write_requests(&mut requests, &mut bulk, true)
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -68,5 +68,6 @@ tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
common-meta = { workspace = true, features = ["testing"] }
|
||||
common-test-util.workspace = true
|
||||
path-slash = "0.2"
|
||||
|
||||
@@ -19,14 +19,12 @@ use api::v1::region::{
|
||||
bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
|
||||
RegionRequestHeader,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use common_base::AffectedRows;
|
||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||
use common_grpc::FlightData;
|
||||
use common_recordbatch::RecordBatch;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use datatypes::schema::Schema;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::TableId;
|
||||
@@ -60,13 +58,8 @@ impl Inserter {
|
||||
.with_label_values(&["raw"])
|
||||
.observe(record_batch.num_rows() as f64);
|
||||
|
||||
// todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
|
||||
|
||||
// safety: when reach here schema must be present.
|
||||
let schema_message = FlightEncoder::default()
|
||||
.encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
|
||||
let schema_bytes = Bytes::from(schema_message.encode_to_vec());
|
||||
|
||||
let schema_bytes = decoder.schema_bytes().unwrap();
|
||||
let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["partition"])
|
||||
.start_timer();
|
||||
@@ -96,12 +89,6 @@ impl Inserter {
|
||||
.find_region_leader(region_id)
|
||||
.await
|
||||
.context(error::FindRegionLeaderSnafu)?;
|
||||
let payload = {
|
||||
let _encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["encode"])
|
||||
.start_timer();
|
||||
Bytes::from(data.encode_to_vec())
|
||||
};
|
||||
let request = RegionRequest {
|
||||
header: Some(RegionRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
@@ -111,7 +98,8 @@ impl Inserter {
|
||||
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
|
||||
region_id: region_id.as_u64(),
|
||||
schema: schema_bytes,
|
||||
payload,
|
||||
data_header: data.data_header,
|
||||
payload: data.data_body,
|
||||
})),
|
||||
})),
|
||||
};
|
||||
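
The hunk above stops re-encoding a full FlightData message for the region request: the schema bytes come straight from the decoder's cache, and the IPC `data_header` and `data_body` of the incoming message are forwarded as-is. A rough sketch of assembling that body, assuming the prost-generated `ArrowIpc` message imported at the top of this file:

use api::v1::region::ArrowIpc;
use bytes::Bytes;

// Sketch only: forward raw Arrow IPC bytes instead of re-encoding FlightData.
// Field names follow the hunk above; `..Default::default()` hedges against
// any fields not shown in this diff.
fn arrow_ipc_body(
    region_id: u64,
    schema_bytes: Bytes,
    data_header: Bytes,
    data_body: Bytes,
) -> ArrowIpc {
    ArrowIpc {
        region_id,
        schema: schema_bytes,
        data_header,
        payload: data_body,
        ..Default::default()
    }
}
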
@@ -149,6 +137,7 @@ impl Inserter {
|
||||
let record_batch_schema =
|
||||
Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
|
||||
|
||||
// Raw data header and payload bytes.
|
||||
let mut raw_data_bytes = None;
|
||||
for (peer, masks) in mask_per_datanode {
|
||||
for (region_id, mask) in masks {
|
||||
@@ -157,10 +146,12 @@ impl Inserter {
|
||||
let record_batch_schema = record_batch_schema.clone();
|
||||
let node_manager = self.node_manager.clone();
|
||||
let peer = peer.clone();
|
||||
let raw_data = if mask.select_all() {
|
||||
let raw_header_and_data = if mask.select_all() {
|
||||
Some(
|
||||
raw_data_bytes
|
||||
.get_or_insert_with(|| Bytes::from(data.encode_to_vec()))
|
||||
.get_or_insert_with(|| {
|
||||
(data.data_header.clone(), data.data_body.clone())
|
||||
})
|
||||
.clone(),
|
||||
)
|
||||
} else {
|
||||
@@ -168,9 +159,9 @@ impl Inserter {
|
||||
};
|
||||
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
let payload = if mask.select_all() {
|
||||
let (header, payload) = if mask.select_all() {
|
||||
// SAFETY: raw data must be present, so we can avoid re-encoding.
|
||||
raw_data.unwrap()
|
||||
raw_header_and_data.unwrap()
|
||||
} else {
|
||||
let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["filter"])
|
||||
@@ -188,13 +179,10 @@ impl Inserter {
|
||||
let batch =
|
||||
RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
|
||||
.context(error::BuildRecordBatchSnafu)?;
|
||||
let payload = Bytes::from(
|
||||
FlightEncoder::default()
|
||||
.encode(FlightMessage::Recordbatch(batch))
|
||||
.encode_to_vec(),
|
||||
);
|
||||
let flight_data =
|
||||
FlightEncoder::default().encode(FlightMessage::Recordbatch(batch));
|
||||
encode_timer.observe_duration();
|
||||
payload
|
||||
(flight_data.data_header, flight_data.data_body)
|
||||
};
|
||||
let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["datanode_handle"])
|
||||
@@ -208,6 +196,7 @@ impl Inserter {
|
||||
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
|
||||
region_id: region_id.as_u64(),
|
||||
schema: schema_bytes,
|
||||
data_header: header,
|
||||
payload,
|
||||
})),
|
||||
})),
|
||||
@@ -231,6 +220,7 @@ impl Inserter {
|
||||
for res in region_responses {
|
||||
rows_inserted += res?.affected_rows;
|
||||
}
|
||||
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
|
||||
Ok(rows_inserted)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -780,8 +780,77 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_create_flow_tql_expr() {
|
||||
let sql = r#"
|
||||
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
|
||||
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);"#;
|
||||
let stmt =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap()
|
||||
.pop()
|
||||
.unwrap();
|
||||
|
||||
let Statement::CreateFlow(create_flow) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
|
||||
|
||||
let to_dot_sep =
|
||||
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
|
||||
assert_eq!("calc_reqs", expr.flow_name);
|
||||
assert_eq!("greptime", expr.catalog_name);
|
||||
assert_eq!(
|
||||
"greptime.public.cnt_reqs",
|
||||
expr.sink_table_name.map(to_dot_sep).unwrap()
|
||||
);
|
||||
assert!(expr.source_table_names.is_empty());
|
||||
assert_eq!(
|
||||
r#"TQL EVAL (0, 15, '5s') count_values("status_code", http_requests)"#,
|
||||
expr.sql
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_flow_expr() {
|
||||
let sql = r"
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
DISTINCT number as dis
|
||||
FROM
|
||||
distinct_basic;";
|
||||
let stmt =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap()
|
||||
.pop()
|
||||
.unwrap();
|
||||
|
||||
let Statement::CreateFlow(create_flow) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
|
||||
|
||||
let to_dot_sep =
|
||||
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
|
||||
assert_eq!("test_distinct_basic", expr.flow_name);
|
||||
assert_eq!("greptime", expr.catalog_name);
|
||||
assert_eq!(
|
||||
"greptime.public.out_distinct_basic",
|
||||
expr.sink_table_name.map(to_dot_sep).unwrap()
|
||||
);
|
||||
assert_eq!(1, expr.source_table_names.len());
|
||||
assert_eq!(
|
||||
"greptime.public.distinct_basic",
|
||||
to_dot_sep(expr.source_table_names[0].clone())
|
||||
);
|
||||
assert_eq!(
|
||||
r"SELECT
|
||||
DISTINCT number as dis
|
||||
FROM
|
||||
distinct_basic",
|
||||
expr.sql
|
||||
);
|
||||
|
||||
let sql = r"
|
||||
CREATE FLOW `task_2`
|
||||
SINK TO schema_1.table_1
|
||||
|
||||
@@ -147,7 +147,7 @@ impl Inserter {
|
||||
statement_executor: &StatementExecutor,
|
||||
) -> Result<Output> {
|
||||
let row_inserts = ColumnToRow::convert(requests)?;
|
||||
self.handle_row_inserts(row_inserts, ctx, statement_executor)
|
||||
self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -157,6 +157,7 @@ impl Inserter {
|
||||
mut requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
statement_executor: &StatementExecutor,
|
||||
accommodate_existing_schema: bool,
|
||||
) -> Result<Output> {
|
||||
preprocess_row_insert_requests(&mut requests.inserts)?;
|
||||
self.handle_row_inserts_with_create_type(
|
||||
@@ -164,6 +165,7 @@ impl Inserter {
|
||||
ctx,
|
||||
statement_executor,
|
||||
AutoCreateTableType::Physical,
|
||||
accommodate_existing_schema,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -180,6 +182,7 @@ impl Inserter {
|
||||
ctx,
|
||||
statement_executor,
|
||||
AutoCreateTableType::Log,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -195,6 +198,7 @@ impl Inserter {
|
||||
ctx,
|
||||
statement_executor,
|
||||
AutoCreateTableType::Trace,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -205,12 +209,14 @@ impl Inserter {
|
||||
requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
statement_executor: &StatementExecutor,
|
||||
accommodate_existing_schema: bool,
|
||||
) -> Result<Output> {
|
||||
self.handle_row_inserts_with_create_type(
|
||||
requests,
|
||||
ctx,
|
||||
statement_executor,
|
||||
AutoCreateTableType::LastNonNull,
|
||||
accommodate_existing_schema,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -222,6 +228,7 @@ impl Inserter {
|
||||
ctx: QueryContextRef,
|
||||
statement_executor: &StatementExecutor,
|
||||
create_type: AutoCreateTableType,
|
||||
accommodate_existing_schema: bool,
|
||||
) -> Result<Output> {
|
||||
// remove empty requests
|
||||
requests.inserts.retain(|req| {
|
||||
@@ -236,7 +243,13 @@ impl Inserter {
|
||||
instant_table_ids,
|
||||
table_infos,
|
||||
} = self
|
||||
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
|
||||
.create_or_alter_tables_on_demand(
|
||||
&mut requests,
|
||||
&ctx,
|
||||
create_type,
|
||||
statement_executor,
|
||||
accommodate_existing_schema,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let name_to_info = table_infos
|
||||
@@ -281,10 +294,11 @@ impl Inserter {
|
||||
table_infos,
|
||||
} = self
|
||||
.create_or_alter_tables_on_demand(
|
||||
&requests,
|
||||
&mut requests,
|
||||
&ctx,
|
||||
AutoCreateTableType::Logical(physical_table.to_string()),
|
||||
statement_executor,
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
let name_to_info = table_infos
|
||||
@@ -448,12 +462,18 @@ impl Inserter {
|
||||
///
|
||||
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
|
||||
/// This mapping is used in the conversion of RowToRegion.
|
||||
///
|
||||
/// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
|
||||
/// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with
|
||||
/// a custom schema and then inserts data through endpoints that use a default schema, such as Prometheus
|
||||
/// remote write. This will modify the `RowInsertRequests` in place.
|
||||
async fn create_or_alter_tables_on_demand(
|
||||
&self,
|
||||
requests: &RowInsertRequests,
|
||||
requests: &mut RowInsertRequests,
|
||||
ctx: &QueryContextRef,
|
||||
auto_create_table_type: AutoCreateTableType,
|
||||
statement_executor: &StatementExecutor,
|
||||
accommodate_existing_schema: bool,
|
||||
) -> Result<CreateAlterTableResult> {
|
||||
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
|
||||
.with_label_values(&[auto_create_table_type.as_str()])
|
||||
@@ -504,7 +524,7 @@ impl Inserter {
|
||||
let mut alter_tables = vec![];
|
||||
let mut instant_table_ids = HashSet::new();
|
||||
|
||||
for req in &requests.inserts {
|
||||
for req in &mut requests.inserts {
|
||||
match self.get_table(catalog, &schema, &req.table_name).await? {
|
||||
Some(table) => {
|
||||
let table_info = table.table_info();
|
||||
@@ -512,9 +532,12 @@ impl Inserter {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_infos.insert(table_info.table_id(), table.table_info());
|
||||
if let Some(alter_expr) =
|
||||
self.get_alter_table_expr_on_demand(req, &table, ctx)?
|
||||
{
|
||||
if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
|
||||
req,
|
||||
&table,
|
||||
ctx,
|
||||
accommodate_existing_schema,
|
||||
)? {
|
||||
alter_tables.push(alter_expr);
|
||||
}
|
||||
}
|
||||
@@ -788,12 +811,16 @@ impl Inserter {
|
||||
}
|
||||
|
||||
/// Returns an alter table expression if it finds new columns in the request.
|
||||
/// It always adds columns if not exist.
|
||||
/// When `accommodate_existing_schema` is false, it always adds columns that do not already exist.
|
||||
/// When `accommodate_existing_schema` is true, it may modify the input `req` to
|
||||
/// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
|
||||
/// for more details.
|
||||
fn get_alter_table_expr_on_demand(
|
||||
&self,
|
||||
req: &RowInsertRequest,
|
||||
req: &mut RowInsertRequest,
|
||||
table: &TableRef,
|
||||
ctx: &QueryContextRef,
|
||||
accommodate_existing_schema: bool,
|
||||
) -> Result<Option<AlterTableExpr>> {
|
||||
let catalog_name = ctx.current_catalog();
|
||||
let schema_name = ctx.current_schema();
|
||||
@@ -802,10 +829,64 @@ impl Inserter {
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
|
||||
let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
|
||||
let Some(add_columns) = add_columns else {
|
||||
let Some(mut add_columns) = add_columns else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
|
||||
if accommodate_existing_schema {
|
||||
let table_schema = table.schema();
|
||||
// Find timestamp column name
|
||||
let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
|
||||
// Find field column name if there is only one
|
||||
let mut field_col_name = None;
|
||||
let mut multiple_field_cols = false;
|
||||
table.field_columns().for_each(|col| {
|
||||
if field_col_name.is_none() {
|
||||
field_col_name = Some(col.name.clone());
|
||||
} else {
|
||||
multiple_field_cols = true;
|
||||
}
|
||||
});
|
||||
if multiple_field_cols {
|
||||
field_col_name = None;
|
||||
}
|
||||
|
||||
// Update column name in request schema for Timestamp/Field columns
|
||||
if let Some(rows) = req.rows.as_mut() {
|
||||
for col in &mut rows.schema {
|
||||
match col.semantic_type {
|
||||
x if x == SemanticType::Timestamp as i32 => {
|
||||
if let Some(ref ts_name) = ts_col_name {
|
||||
if col.column_name != *ts_name {
|
||||
col.column_name = ts_name.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
x if x == SemanticType::Field as i32 => {
|
||||
if let Some(ref field_name) = field_col_name {
|
||||
if col.column_name != *field_name {
|
||||
col.column_name = field_name.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from add_columns any column that is timestamp or field (if there is only one field column)
|
||||
add_columns.add_columns.retain(|col| {
|
||||
let def = col.column_def.as_ref().unwrap();
|
||||
def.semantic_type != SemanticType::Timestamp as i32
|
||||
&& !(def.semantic_type == SemanticType::Field as i32 && field_col_name.is_some())
|
||||
});
|
||||
|
||||
if add_columns.add_columns.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(AlterTableExpr {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
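
In short, the accommodation path above rewrites the incoming request before deciding on an ALTER: the request's timestamp column is renamed to the table's time index, its value column is renamed to the table's single field column (only when exactly one exists), and those columns are then dropped from `add_columns` so no schema change is issued for them. A stripped-down sketch of the renaming step, using a simplified column type in place of the gRPC schema:

/// Simplified stand-in for the gRPC column schema used above (sketch only).
struct ReqColumn {
    name: String,
    is_timestamp: bool,
    is_field: bool,
}

/// Rename request columns to the names the existing table already uses,
/// mirroring the `accommodate_existing_schema` branch above.
fn accommodate(columns: &mut [ReqColumn], table_ts_name: &str, single_field_name: Option<&str>) {
    for col in columns.iter_mut() {
        if col.is_timestamp {
            col.name = table_ts_name.to_string();
        } else if col.is_field {
            // Only unambiguous when the table has exactly one field column.
            if let Some(field) = single_field_name {
                col.name = field.to_string();
            }
        }
    }
}
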
@@ -1039,3 +1120,124 @@ impl FlowMirrorTask {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::cache::new_table_flownode_set_cache;
|
||||
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
|
||||
use common_meta::test_util::MockDatanodeManager;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use moka::future::Cache;
|
||||
use session::context::QueryContext;
|
||||
use table::dist_table::DummyDataSource;
|
||||
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::TableRef;
|
||||
|
||||
use super::*;
|
||||
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
|
||||
|
||||
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
|
||||
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
|
||||
ColumnSchema::new(
|
||||
ts_name,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_time_index(true),
|
||||
ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
|
||||
])
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap();
|
||||
let meta = TableMetaBuilder::empty()
|
||||
.schema(Arc::new(schema))
|
||||
.primary_key_indices(vec![])
|
||||
.value_indices(vec![1])
|
||||
.engine("mito")
|
||||
.next_column_id(0)
|
||||
.options(Default::default())
|
||||
.created_on(Default::default())
|
||||
.region_numbers(vec![0])
|
||||
.build()
|
||||
.unwrap();
|
||||
let info = Arc::new(
|
||||
TableInfoBuilder::default()
|
||||
.table_id(1)
|
||||
.table_version(0)
|
||||
.name("test_table")
|
||||
.schema_name(DEFAULT_SCHEMA_NAME)
|
||||
.catalog_name(DEFAULT_CATALOG_NAME)
|
||||
.desc(None)
|
||||
.table_type(TableType::Base)
|
||||
.meta(meta)
|
||||
.build()
|
||||
.unwrap(),
|
||||
);
|
||||
Arc::new(table::Table::new(
|
||||
info,
|
||||
table::metadata::FilterPushDownType::Unsupported,
|
||||
Arc::new(DummyDataSource),
|
||||
))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_accommodate_existing_schema_logic() {
|
||||
let ts_name = "my_ts";
|
||||
let field_name = "my_field";
|
||||
let table = make_table_ref_with_schema(ts_name, field_name);
|
||||
|
||||
// The request uses different names for timestamp and field columns
|
||||
let mut req = RowInsertRequest {
|
||||
table_name: "test_table".to_string(),
|
||||
rows: Some(Rows {
|
||||
schema: vec![
|
||||
GrpcColumnSchema {
|
||||
column_name: "ts_wrong".to_string(),
|
||||
datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
..Default::default()
|
||||
},
|
||||
GrpcColumnSchema {
|
||||
column_name: "field_wrong".to_string(),
|
||||
datatype: api::v1::ColumnDataType::Float64 as i32,
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
rows: vec![api::v1::Row {
|
||||
values: vec![Value::default(), Value::default()],
|
||||
}],
|
||||
}),
|
||||
};
|
||||
let ctx = Arc::new(QueryContext::with(
|
||||
DEFAULT_CATALOG_NAME,
|
||||
DEFAULT_SCHEMA_NAME,
|
||||
));
|
||||
|
||||
let kv_backend = prepare_mocked_backend().await;
|
||||
let inserter = Inserter::new(
|
||||
catalog::memory::MemoryCatalogManager::new(),
|
||||
create_partition_rule_manager(kv_backend.clone()).await,
|
||||
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
|
||||
Arc::new(new_table_flownode_set_cache(
|
||||
String::new(),
|
||||
Cache::new(100),
|
||||
kv_backend.clone(),
|
||||
)),
|
||||
);
|
||||
let alter_expr = inserter
|
||||
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
|
||||
.unwrap();
|
||||
assert!(alter_expr.is_none());
|
||||
|
||||
// The request's schema should have updated names for timestamp and field columns
|
||||
let req_schema = req.rows.as_ref().unwrap().schema.clone();
|
||||
assert_eq!(req_schema[0].column_name, ts_name);
|
||||
assert_eq!(req_schema[1].column_name, field_name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
#![feature(assert_matches)]
|
||||
#![feature(if_let_guard)]
|
||||
#![feature(let_chains)]
|
||||
|
||||
mod bulk_insert;
|
||||
pub mod delete;
|
||||
|
||||
@@ -57,33 +57,13 @@ mod tests {
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
|
||||
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use datatypes::vectors::{Int32Vector, VectorRef};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::tests::{create_partition_rule_manager, new_test_table_info};
|
||||
|
||||
async fn prepare_mocked_backend() -> KvBackendRef {
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
let catalog_manager = CatalogManager::new(backend.clone());
|
||||
let schema_manager = SchemaManager::new(backend.clone());
|
||||
|
||||
catalog_manager
|
||||
.create(CatalogNameKey::default(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
schema_manager
|
||||
.create(SchemaNameKey::default(), None, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
backend
|
||||
}
|
||||
use crate::tests::{
|
||||
create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_request_table_to_region() {
|
||||
|
||||
@@ -73,33 +73,13 @@ mod tests {
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
|
||||
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use datatypes::vectors::{Int32Vector, VectorRef};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::tests::{create_partition_rule_manager, new_test_table_info};
|
||||
|
||||
async fn prepare_mocked_backend() -> KvBackendRef {
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
let catalog_manager = CatalogManager::new(backend.clone());
|
||||
let schema_manager = SchemaManager::new(backend.clone());
|
||||
|
||||
catalog_manager
|
||||
.create(CatalogNameKey::default(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
schema_manager
|
||||
.create(SchemaNameKey::default(), None, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
backend
|
||||
}
|
||||
use crate::tests::{
|
||||
create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_request_table_to_region() {
|
||||
|
||||
@@ -48,7 +48,7 @@ use lazy_static::lazy_static;
|
||||
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
|
||||
use partition::multi_dim::MultiDimPartitionRule;
|
||||
use partition::partition::{PartitionBound, PartitionDef};
|
||||
use query::parser::{QueryLanguageParser, QueryStatement};
|
||||
use query::parser::QueryStatement;
|
||||
use query::plan::extract_and_rewrite_full_table_names;
|
||||
use query::query_engine::DefaultSerializer;
|
||||
use query::sql::create_table_stmt;
|
||||
@@ -56,6 +56,7 @@ use regex::Regex;
|
||||
use session::context::QueryContextRef;
|
||||
use session::table_name::table_idents_to_full_name;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
use sql::statements::alter::{AlterDatabase, AlterTable};
|
||||
use sql::statements::create::{
|
||||
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
|
||||
@@ -440,15 +441,33 @@ impl StatementExecutor {
|
||||
}
|
||||
|
||||
let engine = &self.query_engine;
|
||||
let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let plan = engine
|
||||
.planner()
|
||||
.plan(&stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let stmts = ParserContext::create_with_dialect(
|
||||
&expr.sql,
|
||||
query_ctx.sql_dialect(),
|
||||
ParseOptions::default(),
|
||||
)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
ensure!(
|
||||
stmts.len() == 1,
|
||||
InvalidSqlSnafu {
|
||||
err_msg: format!("Expect only one statement, found {}", stmts.len())
|
||||
}
|
||||
);
|
||||
let stmt = &stmts[0];
|
||||
|
||||
// Support TQL statements here as well.
|
||||
let plan = match stmt {
|
||||
// PromQL is only supported in batching mode.
|
||||
Statement::Tql(_) => return Ok(FlowType::Batching),
|
||||
_ => engine
|
||||
.planner()
|
||||
.plan(&QueryStatement::Sql(stmt.clone()), query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?,
|
||||
};
|
||||
|
||||
/// Visitor to find aggregation or distinct
|
||||
struct FindAggr {
|
||||
@@ -843,8 +862,46 @@ impl StatementExecutor {
|
||||
}
|
||||
);
|
||||
|
||||
self.alter_logical_tables_procedure(alter_table_exprs, query_context)
|
||||
.await?;
|
||||
// group by physical table id
|
||||
let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
|
||||
for expr in alter_table_exprs {
|
||||
// Get table_id from catalog_manager
|
||||
let catalog = if expr.catalog_name.is_empty() {
|
||||
query_context.current_catalog()
|
||||
} else {
|
||||
&expr.catalog_name
|
||||
};
|
||||
let schema = if expr.schema_name.is_empty() {
|
||||
query_context.current_schema()
|
||||
} else {
|
||||
expr.schema_name.to_string()
|
||||
};
|
||||
let table_name = &expr.table_name;
|
||||
let table = self
|
||||
.catalog_manager
|
||||
.table(catalog, &schema, table_name, Some(&query_context))
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: format_full_table_name(catalog, &schema, table_name),
|
||||
})?;
|
||||
let table_id = table.table_info().ident.table_id;
|
||||
let physical_table_id = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get_physical_table_id(table_id)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
groups.entry(physical_table_id).or_default().push(expr);
|
||||
}
|
||||
|
||||
// Submit procedure for each physical table
|
||||
let mut handles = Vec::with_capacity(groups.len());
|
||||
for (_physical_table_id, exprs) in groups {
|
||||
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
|
||||
handles.push(fut);
|
||||
}
|
||||
let _results = futures::future::try_join_all(handles).await?;
|
||||
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
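
The change above also splits the single `alter_logical_tables_procedure` call into one procedure per physical table: each logical table's physical table id is resolved through the table route manager, the expressions are bucketed by that id, and the resulting futures are awaited with `try_join_all`. The bucketing is the usual entry-or-default pattern; a tiny generic sketch (the types here are placeholders, not the real `AlterTableExpr`):

use std::collections::HashMap;

/// Sketch only: group (physical_table_id, expr) pairs so one alter procedure
/// can be submitted per physical table, as the code above does.
fn group_by_physical_table<E>(pairs: Vec<(u32, E)>) -> HashMap<u32, Vec<E>> {
    let mut groups: HashMap<u32, Vec<E>> = HashMap::new();
    for (physical_table_id, expr) in pairs {
        groups.entry(physical_table_id).or_default().push(expr);
    }
    groups
}
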
@@ -122,7 +122,11 @@ pub fn set_search_path(exprs: Vec<Expr>, ctx: QueryContextRef) -> Result<()> {
|
||||
match search_expr {
|
||||
Expr::Value(Value::SingleQuotedString(search_path))
|
||||
| Expr::Value(Value::DoubleQuotedString(search_path)) => {
|
||||
ctx.set_current_schema(&search_path.clone());
|
||||
ctx.set_current_schema(search_path);
|
||||
Ok(())
|
||||
}
|
||||
Expr::Identifier(Ident { value, .. }) => {
|
||||
ctx.set_current_schema(value);
|
||||
Ok(())
|
||||
}
|
||||
expr => NotSupportedSnafu {
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod kv_backend;
|
||||
mod partition_manager;
|
||||
|
||||
pub(crate) use kv_backend::prepare_mocked_backend;
|
||||
pub(crate) use partition_manager::{create_partition_rule_manager, new_test_table_info};
|
||||
|
||||
38
src/operator/src/tests/kv_backend.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
|
||||
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
|
||||
pub async fn prepare_mocked_backend() -> KvBackendRef {
|
||||
let backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
let catalog_manager = CatalogManager::new(backend.clone());
|
||||
let schema_manager = SchemaManager::new(backend.clone());
|
||||
|
||||
catalog_manager
|
||||
.create(CatalogNameKey::default(), false)
|
||||
.await
|
||||
.unwrap();
|
||||
schema_manager
|
||||
.create(SchemaNameKey::default(), None, false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
backend
|
||||
}
|
||||
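
The helper above replaces two copy-pasted versions of the same setup in the delete and insert test modules, which now import it from `crate::tests`. A minimal usage sketch matching how those tests call it:

// Sketch only: how a test obtains the pre-seeded in-memory backend.
#[tokio::test]
async fn uses_mocked_backend() {
    let kv_backend = prepare_mocked_backend().await;
    // ... build a partition rule manager or an Inserter on top of `kv_backend`.
    let _ = kv_backend;
}
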
@@ -61,7 +61,7 @@ impl From<Value> for Operand {
|
||||
impl Operand {
|
||||
pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
|
||||
match self {
|
||||
Self::Column(c) => Ok(datafusion_expr::col(c)),
|
||||
Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
|
||||
Self::Value(v) => {
|
||||
let scalar_value = match v {
|
||||
Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
|
||||
|
||||
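
The one-line change above quotes the column name before passing it to `datafusion_expr::col`. As far as I can tell, `col` parses its argument as a (possibly qualified) SQL identifier, so an unquoted name can be lower-cased or split on dots, whereas a double-quoted one is taken verbatim; quoting therefore keeps case-sensitive or dotted partition column names intact. A small illustration under that assumption:

use datafusion_expr::{col, Expr};

// Illustration only, assuming DataFusion's identifier handling in `col`:
// quoting makes the name refer to exactly one column, verbatim.
fn partition_column_expr(name: &str) -> Expr {
    // e.g. a column literally named `DeviceId` stays `DeviceId` rather than `deviceid`.
    col(format!(r#""{}""#, name))
}
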
@@ -249,6 +249,7 @@ impl PipelineTable {
|
||||
requests,
|
||||
Self::query_ctx(&table_info),
|
||||
&self.statement_executor,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.context(InsertPipelineSnafu)?;
|
||||
|
||||
@@ -47,6 +47,11 @@ use crate::metrics::PROMQL_SERIES_COUNT;
|
||||
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
|
||||
pub struct SeriesDivide {
|
||||
tag_columns: Vec<String>,
|
||||
/// `SeriesDivide` requires `time_index` column's name to generate ordering requirement
|
||||
/// for input data. But this plan itself doesn't depend on the ordering of time index
|
||||
/// column. This is for follow-on plans like `RangeManipulate`: requiring the ordering
|
||||
/// here avoids an unnecessary sort in those follow-on plans.
|
||||
time_index_column: String,
|
||||
input: LogicalPlan,
|
||||
}
|
||||
|
||||
@@ -84,14 +89,19 @@ impl UserDefinedLogicalNodeCore for SeriesDivide {
|
||||
|
||||
Ok(Self {
|
||||
tag_columns: self.tag_columns.clone(),
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
input: inputs[0].clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SeriesDivide {
|
||||
pub fn new(tag_columns: Vec<String>, input: LogicalPlan) -> Self {
|
||||
Self { tag_columns, input }
|
||||
pub fn new(tag_columns: Vec<String>, time_index_column: String, input: LogicalPlan) -> Self {
|
||||
Self {
|
||||
tag_columns,
|
||||
time_index_column,
|
||||
input,
|
||||
}
|
||||
}
|
||||
|
||||
pub const fn name() -> &'static str {
|
||||
@@ -101,6 +111,7 @@ impl SeriesDivide {
|
||||
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
|
||||
Arc::new(SeriesDivideExec {
|
||||
tag_columns: self.tag_columns.clone(),
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
input: exec_input,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
})
|
||||
@@ -113,6 +124,7 @@ impl SeriesDivide {
|
||||
pub fn serialize(&self) -> Vec<u8> {
|
||||
pb::SeriesDivide {
|
||||
tag_columns: self.tag_columns.clone(),
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
}
|
||||
.encode_to_vec()
|
||||
}
|
||||
@@ -125,6 +137,7 @@ impl SeriesDivide {
|
||||
});
|
||||
Ok(Self {
|
||||
tag_columns: pb_series_divide.tag_columns,
|
||||
time_index_column: pb_series_divide.time_index_column,
|
||||
input: placeholder_plan,
|
||||
})
|
||||
}
|
||||
@@ -133,6 +146,7 @@ impl SeriesDivide {
|
||||
#[derive(Debug)]
|
||||
pub struct SeriesDivideExec {
|
||||
tag_columns: Vec<String>,
|
||||
time_index_column: String,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
metric: ExecutionPlanMetricsSet,
|
||||
}
|
||||
@@ -163,7 +177,7 @@ impl ExecutionPlan for SeriesDivideExec {
|
||||
|
||||
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
|
||||
let input_schema = self.input.schema();
|
||||
let exprs: Vec<PhysicalSortRequirement> = self
|
||||
let mut exprs: Vec<PhysicalSortRequirement> = self
|
||||
.tag_columns
|
||||
.iter()
|
||||
.map(|tag| PhysicalSortRequirement {
|
||||
@@ -175,11 +189,17 @@ impl ExecutionPlan for SeriesDivideExec {
|
||||
}),
|
||||
})
|
||||
.collect();
|
||||
if !exprs.is_empty() {
|
||||
vec![Some(LexRequirement::new(exprs))]
|
||||
} else {
|
||||
vec![None]
|
||||
}
|
||||
|
||||
exprs.push(PhysicalSortRequirement {
|
||||
expr: Arc::new(
|
||||
ColumnExpr::new_with_schema(&self.time_index_column, &input_schema).unwrap(),
|
||||
),
|
||||
options: Some(SortOptions {
|
||||
descending: false,
|
||||
nulls_first: true,
|
||||
}),
|
||||
});
|
||||
vec![Some(LexRequirement::new(exprs))]
|
||||
}
|
||||
|
||||
fn maintains_input_order(&self) -> Vec<bool> {
|
||||
@@ -197,6 +217,7 @@ impl ExecutionPlan for SeriesDivideExec {
|
||||
assert!(!children.is_empty());
|
||||
Ok(Arc::new(Self {
|
||||
tag_columns: self.tag_columns.clone(),
|
||||
time_index_column: self.time_index_column.clone(),
|
||||
input: children[0].clone(),
|
||||
metric: self.metric.clone(),
|
||||
}))
|
||||
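
With the new field wired through above, `SeriesDivide` carries the time index column name solely so that `required_input_ordering` can request `(tag columns..., time index)` ordering on behalf of follow-on plans; the divide step itself does not depend on row order. Construction gains one argument; a sketch based on the new signature (the column names are placeholders taken from the tests below):

// Sketch only: mirrors the new `SeriesDivide::new(tags, time_index, input)`
// signature above. `LogicalPlan` is the type this file already imports.
fn build_series_divide(input: LogicalPlan) -> SeriesDivide {
    SeriesDivide::new(
        vec!["host".to_string(), "path".to_string()], // tag columns
        "time_index".to_string(), // only used to emit the ordering requirement
        input,
    )
}
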
@@ -474,6 +495,11 @@ mod test {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
Field::new("path", DataType::Utf8, true),
|
||||
Field::new(
|
||||
"time_index",
|
||||
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
]));
|
||||
|
||||
let path_column_1 = Arc::new(StringArray::from(vec![
|
||||
@@ -482,9 +508,17 @@ mod test {
|
||||
let host_column_1 = Arc::new(StringArray::from(vec![
|
||||
"000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
|
||||
])) as _;
|
||||
let time_index_column_1 = Arc::new(
|
||||
datafusion::arrow::array::TimestampMillisecondArray::from(vec![
|
||||
1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
|
||||
]),
|
||||
) as _;
|
||||
|
||||
let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
|
||||
let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
|
||||
let time_index_column_2 = Arc::new(
|
||||
datafusion::arrow::array::TimestampMillisecondArray::from(vec![13000, 14000, 15000]),
|
||||
) as _;
|
||||
|
||||
let path_column_3 = Arc::new(StringArray::from(vec![
|
||||
"bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
|
||||
@@ -492,13 +526,26 @@ mod test {
|
||||
let host_column_3 = Arc::new(StringArray::from(vec![
|
||||
"005", "001", "001", "001", "001", "001", "001", "001",
|
||||
])) as _;
|
||||
let time_index_column_3 =
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000],
|
||||
)) as _;
|
||||
|
||||
let data_1 =
|
||||
RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap();
|
||||
let data_2 =
|
||||
RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap();
|
||||
let data_3 =
|
||||
RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap();
|
||||
let data_1 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![path_column_1, host_column_1, time_index_column_1],
|
||||
)
|
||||
.unwrap();
|
||||
let data_2 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![path_column_2, host_column_2, time_index_column_2],
|
||||
)
|
||||
.unwrap();
|
||||
let data_3 = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![path_column_3, host_column_3, time_index_column_3],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap()
|
||||
}
|
||||
@@ -508,6 +555,7 @@ mod test {
|
||||
let memory_exec = Arc::new(prepare_test_data());
|
||||
let divide_exec = Arc::new(SeriesDivideExec {
|
||||
tag_columns: vec!["host".to_string(), "path".to_string()],
|
||||
time_index_column: "time_index".to_string(),
|
||||
input: memory_exec,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
});
|
||||
@@ -520,33 +568,33 @@ mod test {
|
||||
.to_string();
|
||||
|
||||
let expected = String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| foo | 000 |\
|
||||
\n| foo | 000 |\
|
||||
\n| foo | 001 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 003 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🫠 | 001 |\
|
||||
\n| 🫠 | 001 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| foo | 000 | 1970-01-01T00:00:01 |\
|
||||
\n| foo | 000 | 1970-01-01T00:00:02 |\
|
||||
\n| foo | 001 | 1970-01-01T00:00:03 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:04 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:05 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:06 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:07 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:08 |\
|
||||
\n| bar | 003 | 1970-01-01T00:00:09 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:10 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:11 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:12 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:13 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:14 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:15 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:16 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:17 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:18 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:19 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:20 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:21 |\
|
||||
\n| 🫠 | 001 | 1970-01-01T00:00:22 |\
|
||||
\n| 🫠 | 001 | 1970-01-01T00:00:23 |\
|
||||
\n+------+------+---------------------+",
|
||||
);
|
||||
assert_eq!(result_literal, expected);
|
||||
}
|
||||
@@ -556,6 +604,7 @@ mod test {
|
||||
let memory_exec = Arc::new(prepare_test_data());
|
||||
let divide_exec = Arc::new(SeriesDivideExec {
|
||||
tag_columns: vec!["host".to_string(), "path".to_string()],
|
||||
time_index_column: "time_index".to_string(),
|
||||
input: memory_exec,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
});
|
||||
@@ -565,69 +614,69 @@ mod test {
|
||||
|
||||
let mut expectations = vec![
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| foo | 000 |\
|
||||
\n| foo | 000 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| foo | 000 | 1970-01-01T00:00:01 |\
|
||||
\n| foo | 000 | 1970-01-01T00:00:02 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| foo | 001 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| foo | 001 | 1970-01-01T00:00:03 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n| bar | 002 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| bar | 002 | 1970-01-01T00:00:04 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:05 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:06 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:07 |\
|
||||
\n| bar | 002 | 1970-01-01T00:00:08 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| bar | 003 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| bar | 003 | 1970-01-01T00:00:09 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n| bla | 005 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| bla | 005 | 1970-01-01T00:00:10 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:11 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:12 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:13 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:14 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:15 |\
|
||||
\n| bla | 005 | 1970-01-01T00:00:16 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n| 🥺 | 001 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:17 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:18 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:19 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:20 |\
|
||||
\n| 🥺 | 001 | 1970-01-01T00:00:21 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
String::from(
|
||||
"+------+------+\
|
||||
\n| host | path |\
|
||||
\n+------+------+\
|
||||
\n| 🫠 | 001 |\
|
||||
\n| 🫠 | 001 |\
|
||||
\n+------+------+",
|
||||
"+------+------+---------------------+\
|
||||
\n| host | path | time_index |\
|
||||
\n+------+------+---------------------+\
|
||||
\n| 🫠 | 001 | 1970-01-01T00:00:22 |\
|
||||
\n| 🫠 | 001 | 1970-01-01T00:00:23 |\
|
||||
\n+------+------+---------------------+",
|
||||
),
|
||||
];
|
||||
expectations.reverse();
|
||||
@@ -648,6 +697,11 @@ mod test {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("host", DataType::Utf8, true),
|
||||
Field::new("path", DataType::Utf8, true),
|
||||
Field::new(
|
||||
"time_index",
|
||||
DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
|
||||
false,
|
||||
),
|
||||
]));
|
||||
|
||||
// Create batches with three different combinations
|
||||
@@ -660,6 +714,9 @@ mod test {
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["server1", "server1", "server1"])) as _,
|
||||
Arc::new(StringArray::from(vec!["/var/log", "/var/log", "/var/log"])) as _,
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![1000, 2000, 3000],
|
||||
)) as _,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
@@ -669,6 +726,9 @@ mod test {
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["server1", "server1"])) as _,
|
||||
Arc::new(StringArray::from(vec!["/var/log", "/var/log"])) as _,
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![4000, 5000],
|
||||
)) as _,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
@@ -683,6 +743,9 @@ mod test {
|
||||
"/var/data",
|
||||
"/var/data",
|
||||
])) as _,
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![6000, 7000, 8000],
|
||||
)) as _,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
@@ -692,6 +755,9 @@ mod test {
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["server2"])) as _,
|
||||
Arc::new(StringArray::from(vec!["/var/data"])) as _,
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![9000],
|
||||
)) as _,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
@@ -702,6 +768,9 @@ mod test {
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["server3", "server3"])) as _,
|
||||
Arc::new(StringArray::from(vec!["/opt/logs", "/opt/logs"])) as _,
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![10000, 11000],
|
||||
)) as _,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
@@ -715,6 +784,9 @@ mod test {
|
||||
"/opt/logs",
|
||||
"/opt/logs",
|
||||
])) as _,
|
||||
Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
|
||||
vec![12000, 13000, 14000],
|
||||
)) as _,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
@@ -732,6 +804,7 @@ mod test {
|
||||
// Create SeriesDivideExec
|
||||
let divide_exec = Arc::new(SeriesDivideExec {
|
||||
tag_columns: vec!["host".to_string(), "path".to_string()],
|
||||
time_index_column: "time_index".to_string(),
|
||||
input: memory_exec,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
});
|
||||
@@ -766,10 +839,16 @@ mod test {
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
let time_index_array1 = result[0]
|
||||
.column(2)
|
||||
.as_any()
|
||||
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
|
||||
for i in 0..5 {
|
||||
assert_eq!(host_array1.value(i), "server1");
|
||||
assert_eq!(path_array1.value(i), "/var/log");
|
||||
assert_eq!(time_index_array1.value(i), 1000 + (i as i64) * 1000);
|
||||
}
|
||||
|
||||
// Verify values in second batch (server2, /var/data)
|
||||
@@ -783,10 +862,16 @@ mod test {
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
let time_index_array2 = result[1]
|
||||
.column(2)
|
||||
.as_any()
|
||||
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
|
||||
for i in 0..4 {
|
||||
assert_eq!(host_array2.value(i), "server2");
|
||||
assert_eq!(path_array2.value(i), "/var/data");
|
||||
assert_eq!(time_index_array2.value(i), 6000 + (i as i64) * 1000);
|
||||
}
|
||||
|
||||
// Verify values in third batch (server3, /opt/logs)
|
||||
@@ -800,10 +885,16 @@ mod test {
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
let time_index_array3 = result[2]
|
||||
.column(2)
|
||||
.as_any()
|
||||
.downcast_ref::<datafusion::arrow::array::TimestampMillisecondArray>()
|
||||
.unwrap();
|
||||
|
||||
for i in 0..5 {
|
||||
assert_eq!(host_array3.value(i), "server3");
|
||||
assert_eq!(path_array3.value(i), "/opt/logs");
|
||||
assert_eq!(time_index_array3.value(i), 10000 + (i as i64) * 1000);
|
||||
}
|
||||
|
||||
// Also verify streaming behavior
|
||||
|
||||
@@ -1035,8 +1035,19 @@ impl PromPlanner {
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
|
||||
// make divide plan
|
||||
let time_index_column =
|
||||
self.ctx
|
||||
.time_index_column
|
||||
.clone()
|
||||
.with_context(|| TimeIndexNotFoundSnafu {
|
||||
table: table_ref.to_string(),
|
||||
})?;
|
||||
let divide_plan = LogicalPlan::Extension(Extension {
|
||||
node: Arc::new(SeriesDivide::new(self.ctx.tag_columns.clone(), sort_plan)),
|
||||
node: Arc::new(SeriesDivide::new(
|
||||
self.ctx.tag_columns.clone(),
|
||||
time_index_column,
|
||||
sort_plan,
|
||||
)),
|
||||
});
|
||||
|
||||
// make series_normalize plan
|
||||
|
||||
@@ -55,7 +55,7 @@ pub use show_create_table::create_table_stmt;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::ast::Ident;
|
||||
use sql::parser::ParserContext;
|
||||
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions};
|
||||
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
|
||||
use sql::statements::show::{
|
||||
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowRegion, ShowTableStatus,
|
||||
ShowTables, ShowVariables, ShowViews,
|
||||
@@ -958,7 +958,15 @@ pub fn show_create_flow(
|
||||
let mut parser_ctx =
|
||||
ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
|
||||
|
||||
let query = parser_ctx.parser_query().context(error::SqlSnafu)?;
let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;

// since PromQL resolves `now()` to a fixed timestamp at parse time, don't use the parsed statement to generate the raw query
let raw_query = match &query {
Statement::Tql(_) => flow_val.raw_sql().clone(),
_ => query.to_string(),
};

let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);

let comment = if flow_val.comment().is_empty() {
None

@@ -1 +1 @@
v0.9.0
v0.9.2

@@ -167,7 +167,8 @@ impl GreptimeRequestHandler {
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler
.put_record_batch(&table_name, &mut table_id, &mut decoder, data)
.await;
.await
.inspect_err(|e| error!(e; "Failed to handle flight record batches"));
timer.observe_duration();
let result = result
.map(|x| DoPutResponse::new(request_id, x))

@@ -475,6 +475,38 @@ pub fn mock_timeseries() -> Vec<TimeSeries> {
]
}

/// Add new labels to the mock timeseries.
pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
let ts_demo_metrics = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
new_label("idc".to_string(), "idc3".to_string()),
new_label("new_label1".to_string(), "foo".to_string()),
],
samples: vec![Sample {
value: 42.0,
timestamp: 3000,
}],
..Default::default()
};
let ts_multi_labels = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
new_label("idc".to_string(), "idc4".to_string()),
new_label("env".to_string(), "prod".to_string()),
new_label("host".to_string(), "host9".to_string()),
new_label("new_label2".to_string(), "bar".to_string()),
],
samples: vec![Sample {
value: 99.0,
timestamp: 4000,
}],
..Default::default()
};

vec![ts_demo_metrics, ts_multi_labels]
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

@@ -195,6 +195,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid flow query: {}", reason))]
InvalidFlowQuery {
reason: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid default constraint, column: {}", column))]
InvalidDefault {
column: String,
@@ -390,6 +397,7 @@ impl ErrorExt for Error {
| ColumnTypeMismatch { .. }
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidFlowQuery { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }

@@ -40,12 +40,12 @@ use crate::parsers::utils::{
|
||||
};
|
||||
use crate::statements::create::{
|
||||
Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
|
||||
CreateTableLike, CreateView, Partitions, TableConstraint, VECTOR_OPT_DIM,
|
||||
CreateTableLike, CreateView, Partitions, SqlOrTql, TableConstraint, VECTOR_OPT_DIM,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::statements::transform::type_alias::get_data_type_by_alias_name;
|
||||
use crate::statements::{sql_data_type_to_concrete_data_type, OptionMap};
|
||||
use crate::util::parse_option_string;
|
||||
use crate::util::{location_to_index, parse_option_string};
|
||||
|
||||
pub const ENGINE: &str = "ENGINE";
|
||||
pub const MAXVALUE: &str = "MAXVALUE";
|
||||
@@ -282,12 +282,13 @@ impl<'a> ParserContext<'a> {
|
||||
.consume_tokens(&[Token::make_keyword(EXPIRE), Token::make_keyword(AFTER)])
|
||||
{
|
||||
let expire_after_expr = self.parser.parse_expr().context(error::SyntaxSnafu)?;
|
||||
let expire_after_lit = utils::parser_expr_to_scalar_value(expire_after_expr.clone())?
|
||||
.cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
|
||||
.ok()
|
||||
.with_context(|| InvalidIntervalSnafu {
|
||||
reason: format!("cannot cast {} to interval type", expire_after_expr),
|
||||
})?;
|
||||
let expire_after_lit =
|
||||
utils::parser_expr_to_scalar_value_literal(expire_after_expr.clone())?
|
||||
.cast_to(&ArrowDataType::Interval(IntervalUnit::MonthDayNano))
|
||||
.ok()
|
||||
.with_context(|| InvalidIntervalSnafu {
|
||||
reason: format!("cannot cast {} to interval type", expire_after_expr),
|
||||
})?;
|
||||
if let ScalarValue::IntervalMonthDayNano(Some(interval)) = expire_after_lit {
|
||||
Some(
|
||||
interval.nanoseconds / 1_000_000_000
|
||||
@@ -324,7 +325,22 @@ impl<'a> ParserContext<'a> {
|
||||
.expect_keyword(Keyword::AS)
|
||||
.context(SyntaxSnafu)?;
|
||||
|
||||
let query = self.parser.parse_query().context(error::SyntaxSnafu)?;
|
||||
let start_loc = self.parser.peek_token().span.start;
|
||||
let start_index = location_to_index(self.sql, &start_loc);
|
||||
|
||||
let query = self.parse_statement()?;
|
||||
let end_token = self.parser.peek_token();
|
||||
|
||||
let raw_query = if end_token == Token::EOF {
|
||||
&self.sql[start_index..]
|
||||
} else {
|
||||
let end_loc = end_token.span.end;
|
||||
let end_index = location_to_index(self.sql, &end_loc);
|
||||
&self.sql[start_index..end_index.min(self.sql.len())]
|
||||
};
|
||||
let raw_query = raw_query.trim_end_matches(";");
|
||||
|
||||
let query = Box::new(SqlOrTql::try_from_statement(query, raw_query)?);
|
||||
|
||||
Ok(Statement::CreateFlow(CreateFlow {
|
||||
flow_name,
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::parser::ParserContext;
|
||||
use crate::parsers::utils;
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::statements::tql::{Tql, TqlAnalyze, TqlEval, TqlExplain, TqlParameters};
|
||||
use crate::util::location_to_index;
|
||||
|
||||
pub const TQL: &str = "TQL";
|
||||
const EVAL: &str = "EVAL";
|
||||
@@ -159,7 +160,7 @@ impl ParserContext<'_> {
|
||||
let value = match tokens[0].clone() {
|
||||
Token::Number(n, _) => n,
|
||||
Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s,
|
||||
Token::Word(_) => Self::parse_tokens(tokens)?,
|
||||
Token::Word(_) => Self::parse_tokens_to_ts(tokens)?,
|
||||
unexpected => {
|
||||
return Err(ParserError::ParserError(format!(
|
||||
"Expected number, string or word, but have {unexpected:?}"
|
||||
@@ -169,7 +170,7 @@ impl ParserContext<'_> {
|
||||
};
|
||||
Ok(value)
|
||||
}
|
||||
_ => Self::parse_tokens(tokens),
|
||||
_ => Self::parse_tokens_to_ts(tokens),
|
||||
};
|
||||
for token in delimiter_tokens {
|
||||
if parser.consume_token(token) {
|
||||
@@ -182,9 +183,10 @@ impl ParserContext<'_> {
|
||||
.context(ParserSnafu)
|
||||
}
|
||||
|
||||
fn parse_tokens(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
|
||||
/// Parse the tokens to seconds and convert to string.
|
||||
fn parse_tokens_to_ts(tokens: Vec<Token>) -> std::result::Result<String, TQLError> {
|
||||
let parser_expr = Self::parse_to_expr(tokens)?;
|
||||
let lit = utils::parser_expr_to_scalar_value(parser_expr)
|
||||
let lit = utils::parser_expr_to_scalar_value_literal(parser_expr)
|
||||
.map_err(Box::new)
|
||||
.context(ConvertToLogicalExpressionSnafu)?;
|
||||
|
||||
@@ -217,11 +219,15 @@ impl ParserContext<'_> {
|
||||
while matches!(parser.peek_token().token, Token::Comma) {
|
||||
let _skip_token = parser.next_token();
|
||||
}
|
||||
let index = parser.next_token().span.start.column as usize;
|
||||
if index == 0 {
|
||||
let start_tql = parser.next_token();
|
||||
if start_tql == Token::EOF {
|
||||
return Err(ParserError::ParserError("empty TQL query".to_string()));
|
||||
}
|
||||
|
||||
let start_location = start_tql.span.start;
|
||||
// translate the start location to the index in the sql string
|
||||
let index = location_to_index(sql, &start_location);
|
||||
|
||||
let query = &sql[index - 1..];
|
||||
while parser.next_token() != Token::EOF {
|
||||
// consume all tokens
|
||||
|
||||
@@ -41,7 +41,7 @@ use crate::error::{
|
||||
/// Convert a parser expression to a scalar value. This function will try the
|
||||
/// best to resolve and reduce constants. Exprs like `1 + 1` or `now()` can be
|
||||
/// handled properly.
|
||||
pub fn parser_expr_to_scalar_value(expr: sqlparser::ast::Expr) -> Result<ScalarValue> {
|
||||
pub fn parser_expr_to_scalar_value_literal(expr: sqlparser::ast::Expr) -> Result<ScalarValue> {
|
||||
// 1. convert parser expr to logical expr
|
||||
let empty_df_schema = DFSchema::empty();
|
||||
let logical_expr = SqlToRel::new(&StubContextProvider::default())
|
||||
|
||||
@@ -24,8 +24,11 @@ use sqlparser::ast::{ColumnOptionDef, DataType, Expr, Query};
|
||||
use sqlparser_derive::{Visit, VisitMut};
|
||||
|
||||
use crate::ast::{ColumnDef, Ident, ObjectName, Value as SqlValue};
|
||||
use crate::error::{Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu};
|
||||
use crate::error::{
|
||||
InvalidFlowQuerySnafu, Result, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::statements::tql::Tql;
|
||||
use crate::statements::OptionMap;
|
||||
|
||||
const LINE_SEP: &str = ",\n";
|
||||
@@ -374,7 +377,41 @@ pub struct CreateFlow {
/// Comment string
pub comment: Option<String>,
/// SQL statement
pub query: Box<Query>,
pub query: Box<SqlOrTql>,
}

/// Either a sql query or a tql query
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut, Serialize)]
pub enum SqlOrTql {
Sql(Query, String),
Tql(Tql, String),
}

impl std::fmt::Display for SqlOrTql {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Sql(_, s) => write!(f, "{}", s),
Self::Tql(_, s) => write!(f, "{}", s),
}
}
}

impl SqlOrTql {
pub fn try_from_statement(
value: Statement,
original_query: &str,
) -> std::result::Result<Self, crate::error::Error> {
match value {
Statement::Query(query) => {
Ok(Self::Sql((*query).try_into()?, original_query.to_string()))
}
Statement::Tql(tql) => Ok(Self::Tql(tql, original_query.to_string())),
_ => InvalidFlowQuerySnafu {
reason: format!("Expect either sql query or promql query, found {:?}", value),
}
.fail(),
}
}
}
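For orientation, a minimal sketch of how a caller might dispatch on the two variants; the `plan_flow_query` helper below is hypothetical and not part of this change, it only illustrates that each variant carries both the parsed form and the raw query text:

// Hypothetical helper, not part of this diff: dispatch on the parsed flow query.
fn plan_flow_query(query: &SqlOrTql) -> String {
    match query {
        // SQL flows use the parsed Query; the raw text is kept for display (e.g. SHOW CREATE FLOW).
        SqlOrTql::Sql(_parsed, raw) => format!("SQL flow: {raw}"),
        // TQL flows keep the raw text verbatim so that time functions like now() are
        // re-evaluated on each run instead of being frozen at parse time.
        SqlOrTql::Tql(_tql, raw) => format!("TQL flow: {raw}"),
    }
}
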
impl Display for CreateFlow {
|
||||
@@ -741,7 +778,7 @@ WITH(
|
||||
r#"
|
||||
CREATE FLOW filter_numbers
|
||||
SINK TO out_num_cnt
|
||||
AS SELECT number FROM numbers_input WHERE number > 10"#,
|
||||
AS SELECT number FROM numbers_input where number > 10"#,
|
||||
&new_sql
|
||||
);
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ fn format_tql(
|
||||
lookback: Option<&str>,
|
||||
query: &str,
|
||||
) -> std::fmt::Result {
|
||||
write!(f, "({start}, {end}, {step}")?;
|
||||
write!(f, "({start}, {end}, '{step}'")?;
|
||||
if let Some(lookback) = lookback {
|
||||
write!(f, ", {lookback}")?;
|
||||
}
|
||||
|
||||
@@ -15,9 +15,10 @@
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::{Display, Formatter};
|
||||
|
||||
use sqlparser::ast::{Expr, ObjectName, Query, SetExpr, SqlOption, TableFactor, Value};
|
||||
use sqlparser::ast::{Expr, ObjectName, SetExpr, SqlOption, TableFactor, Value};
|
||||
|
||||
use crate::error::{InvalidSqlSnafu, InvalidTableOptionValueSnafu, Result};
|
||||
use crate::statements::create::SqlOrTql;
|
||||
|
||||
/// Format an [ObjectName] without any quote of its idents.
|
||||
pub fn format_raw_object_name(name: &ObjectName) -> String {
|
||||
@@ -58,14 +59,36 @@ pub fn parse_option_string(option: SqlOption) -> Result<(String, String)> {
|
||||
}
|
||||
|
||||
/// Walk through a [Query] and extract all the tables referenced in it.
pub fn extract_tables_from_query(query: &Query) -> impl Iterator<Item = ObjectName> {
pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator<Item = ObjectName> {
let mut names = HashSet::new();

extract_tables_from_set_expr(&query.body, &mut names);
match query {
SqlOrTql::Sql(query, _) => extract_tables_from_set_expr(&query.body, &mut names),
SqlOrTql::Tql(_tql, _) => {
// since TQL has a sliding time window, we don't need to extract tables from it
// (we are going to evaluate it fully anyway)
}
}

names.into_iter()
}

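A small usage sketch under the same assumptions as above; `referenced_tables` is a hypothetical wrapper, while `format_raw_object_name` is the helper defined earlier in this file:

// Hypothetical wrapper: list the tables a SQL flow reads from.
// For a TQL flow the iterator is empty by design, per the comment above.
fn referenced_tables(query: &SqlOrTql) -> Vec<String> {
    extract_tables_from_query(query)
        .map(|name| format_raw_object_name(&name))
        .collect()
}
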
/// translate the start location to the index in the sql string
pub fn location_to_index(sql: &str, location: &sqlparser::tokenizer::Location) -> usize {
let mut index = 0;
for (lno, line) in sql.lines().enumerate() {
if lno + 1 == location.line as usize {
index += location.column as usize;
break;
} else {
index += line.len() + 1; // +1 for the newline
}
}
// -1 because the index is 0-based
// and the location is 1-based
index - 1
}

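The same 1-based line/column to 0-based byte-index mapping, sketched with plain integers standing in for sqlparser's Location fields (illustrative only, not part of the change):

// Sum the lengths of all preceding lines (plus one per newline),
// then convert the 1-based column to a 0-based index.
fn to_index(sql: &str, line: usize, column: usize) -> usize {
    let offset: usize = sql.lines().take(line - 1).map(|l| l.len() + 1).sum();
    offset + column - 1
}

#[test]
fn to_index_points_at_the_token() {
    assert_eq!(to_index("SELECT *\nFROM t", 2, 6), 14); // byte index of `t`
}
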
/// Helper function for [extract_tables_from_query].
|
||||
///
|
||||
/// Handle [SetExpr].
|
||||
@@ -98,3 +121,53 @@ fn table_factor_to_object_name(table_factor: &TableFactor, names: &mut HashSet<O
|
||||
names.insert(name.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use sqlparser::tokenizer::Token;
|
||||
|
||||
use super::*;
|
||||
use crate::dialect::GreptimeDbDialect;
|
||||
use crate::parser::ParserContext;
|
||||
|
||||
#[test]
|
||||
fn test_location_to_index() {
|
||||
let testcases = vec![
|
||||
"SELECT * FROM t WHERE a = 1",
|
||||
// start or end with newline
|
||||
r"
|
||||
SELECT *
|
||||
FROM
|
||||
t
|
||||
WHERE a =
|
||||
1
|
||||
",
|
||||
r"SELECT *
|
||||
FROM
|
||||
t
|
||||
WHERE a =
|
||||
1
|
||||
",
|
||||
r"
|
||||
SELECT *
|
||||
FROM
|
||||
t
|
||||
WHERE a =
|
||||
1",
|
||||
];
|
||||
|
||||
for sql in testcases {
|
||||
let mut parser = ParserContext::new(&GreptimeDbDialect {}, sql).unwrap();
|
||||
loop {
|
||||
let token = parser.parser.next_token();
|
||||
if token == Token::EOF {
|
||||
break;
|
||||
}
|
||||
let span = token.span;
|
||||
let subslice =
|
||||
&sql[location_to_index(sql, &span.start)..location_to_index(sql, &span.end)];
|
||||
assert_eq!(token.to_string(), subslice);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1027,14 +1027,6 @@ pub enum MetadataError {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to decode prost message"))]
|
||||
Prost {
|
||||
#[snafu(source)]
|
||||
error: prost::DecodeError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for MetadataError {
|
||||
|
||||
@@ -29,14 +29,8 @@ pub const SNAPSHOT_READ: &str = "snapshot_read";
|
||||
pub const COMPACTION_TYPE: &str = "compaction.type";
|
||||
/// TWCS compaction strategy.
|
||||
pub const COMPACTION_TYPE_TWCS: &str = "twcs";
|
||||
/// Option key for twcs max active window runs.
|
||||
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
|
||||
/// Option key for twcs max active window files.
|
||||
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
|
||||
/// Option key for twcs max inactive window runs.
|
||||
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
|
||||
/// Option key for twcs max inactive window files.
|
||||
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
|
||||
/// Option key for twcs min file num to trigger a compaction.
|
||||
pub const TWCS_TRIGGER_FILE_NUM: &str = "compaction.twcs.trigger_file_num";
|
||||
/// Option key for twcs max output file size.
|
||||
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
|
||||
/// Option key for twcs time window.
|
||||
@@ -68,10 +62,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
|
||||
[
|
||||
"ttl",
|
||||
COMPACTION_TYPE,
|
||||
TWCS_MAX_ACTIVE_WINDOW_RUNS,
|
||||
TWCS_MAX_ACTIVE_WINDOW_FILES,
|
||||
TWCS_MAX_INACTIVE_WINDOW_RUNS,
|
||||
TWCS_MAX_INACTIVE_WINDOW_FILES,
|
||||
TWCS_TRIGGER_FILE_NUM,
|
||||
TWCS_MAX_OUTPUT_FILE_SIZE,
|
||||
TWCS_TIME_WINDOW,
|
||||
REMOTE_COMPACTION,
|
||||
@@ -100,10 +91,7 @@ mod tests {
|
||||
assert!(is_mito_engine_option_key("ttl"));
|
||||
assert!(is_mito_engine_option_key("compaction.type"));
|
||||
assert!(is_mito_engine_option_key(
|
||||
"compaction.twcs.max_active_window_runs"
|
||||
));
|
||||
assert!(is_mito_engine_option_key(
|
||||
"compaction.twcs.max_inactive_window_runs"
|
||||
"compaction.twcs.trigger_file_num"
|
||||
));
|
||||
assert!(is_mito_engine_option_key("compaction.twcs.time_window"));
|
||||
assert!(is_mito_engine_option_key("storage"));
|
||||
|
||||
@@ -22,22 +22,20 @@ use api::v1::column_def::{
|
||||
};
|
||||
use api::v1::region::bulk_insert_request::Body;
|
||||
use api::v1::region::{
|
||||
alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
|
||||
CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
|
||||
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
|
||||
alter_request, compact_request, region_request, AlterRequest, AlterRequests, ArrowIpc,
|
||||
BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
|
||||
DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
|
||||
};
|
||||
use api::v1::{
|
||||
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
|
||||
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
|
||||
};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_grpc::flight::{FlightDecoder, FlightMessage};
|
||||
use common_grpc::FlightData;
|
||||
use common_grpc::flight::FlightDecoder;
|
||||
use common_recordbatch::DfRecordBatch;
|
||||
use common_time::TimeToLive;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
|
||||
use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use strum::{AsRefStr, IntoStaticStr};
|
||||
@@ -46,15 +44,12 @@ use crate::logstore::entry;
|
||||
use crate::metadata::{
|
||||
ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
|
||||
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
|
||||
InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
|
||||
UnexpectedSnafu,
|
||||
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
|
||||
};
|
||||
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
|
||||
use crate::metrics;
|
||||
use crate::mito_engine_options::{
|
||||
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
|
||||
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
|
||||
TWCS_TIME_WINDOW,
|
||||
TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
|
||||
};
|
||||
use crate::path_utils::region_dir;
|
||||
use crate::storage::{ColumnId, RegionId, ScanRequest};
|
||||
@@ -334,22 +329,21 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
|
||||
return Ok(vec![]);
|
||||
};
|
||||
|
||||
let ArrowIpc {
|
||||
region_id,
|
||||
schema,
|
||||
payload,
|
||||
data_header,
|
||||
} = request;
|
||||
let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
|
||||
.with_label_values(&["decode"])
|
||||
.start_timer();
|
||||
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
|
||||
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
|
||||
let mut decoder = FlightDecoder::default();
|
||||
let _ = decoder.try_decode(&schema_data).context(FlightCodecSnafu)?;
|
||||
let FlightMessage::Recordbatch(rb) = decoder
|
||||
.try_decode(&payload_data)
|
||||
.context(FlightCodecSnafu)?
|
||||
else {
|
||||
unreachable!("Always expect record batch message after schema");
|
||||
};
|
||||
let mut decoder = FlightDecoder::try_from_schema_bytes(&schema).context(FlightCodecSnafu)?;
|
||||
let payload = decoder
|
||||
.try_decode_record_batch(&data_header, &payload)
|
||||
.context(FlightCodecSnafu)?;
|
||||
decoder_timer.observe_duration();
|
||||
let payload = rb.into_df_record_batch();
|
||||
let region_id: RegionId = request.region_id.into();
|
||||
let region_id: RegionId = region_id.into();
|
||||
Ok(vec![(
|
||||
region_id,
|
||||
RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
|
||||
@@ -1027,12 +1021,9 @@ impl TryFrom<&PbOption> for SetRegionOption {
|
||||
|
||||
Ok(Self::Ttl(Some(ttl)))
|
||||
}
|
||||
TWCS_MAX_ACTIVE_WINDOW_RUNS
|
||||
| TWCS_MAX_ACTIVE_WINDOW_FILES
|
||||
| TWCS_MAX_INACTIVE_WINDOW_FILES
|
||||
| TWCS_MAX_INACTIVE_WINDOW_RUNS
|
||||
| TWCS_MAX_OUTPUT_FILE_SIZE
|
||||
| TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
|
||||
TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
|
||||
Ok(Self::Twsc(key.to_string(), value.to_string()))
|
||||
}
|
||||
_ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
|
||||
}
|
||||
}
|
||||
@@ -1041,16 +1032,7 @@ impl TryFrom<&PbOption> for SetRegionOption {
|
||||
impl From<&UnsetRegionOption> for SetRegionOption {
|
||||
fn from(unset_option: &UnsetRegionOption) -> Self {
|
||||
match unset_option {
|
||||
UnsetRegionOption::TwcsMaxActiveWindowFiles => {
|
||||
SetRegionOption::Twsc(unset_option.to_string(), String::new())
|
||||
}
|
||||
UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
|
||||
SetRegionOption::Twsc(unset_option.to_string(), String::new())
|
||||
}
|
||||
UnsetRegionOption::TwcsMaxActiveWindowRuns => {
|
||||
SetRegionOption::Twsc(unset_option.to_string(), String::new())
|
||||
}
|
||||
UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
|
||||
UnsetRegionOption::TwcsTriggerFileNum => {
|
||||
SetRegionOption::Twsc(unset_option.to_string(), String::new())
|
||||
}
|
||||
UnsetRegionOption::TwcsMaxOutputFileSize => {
|
||||
@@ -1070,10 +1052,7 @@ impl TryFrom<&str> for UnsetRegionOption {
|
||||
fn try_from(key: &str) -> Result<Self> {
|
||||
match key.to_ascii_lowercase().as_str() {
|
||||
TTL_KEY => Ok(Self::Ttl),
|
||||
TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
|
||||
TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
|
||||
TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
|
||||
TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
|
||||
TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
|
||||
TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
|
||||
TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
|
||||
_ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
|
||||
@@ -1083,10 +1062,7 @@ impl TryFrom<&str> for UnsetRegionOption {
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub enum UnsetRegionOption {
|
||||
TwcsMaxActiveWindowFiles,
|
||||
TwcsMaxInactiveWindowFiles,
|
||||
TwcsMaxActiveWindowRuns,
|
||||
TwcsMaxInactiveWindowRuns,
|
||||
TwcsTriggerFileNum,
|
||||
TwcsMaxOutputFileSize,
|
||||
TwcsTimeWindow,
|
||||
Ttl,
|
||||
@@ -1096,10 +1072,7 @@ impl UnsetRegionOption {
|
||||
pub fn as_str(&self) -> &str {
|
||||
match self {
|
||||
Self::Ttl => TTL_KEY,
|
||||
Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
|
||||
Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
|
||||
Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
|
||||
Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
|
||||
Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
|
||||
Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
|
||||
Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
|
||||
}
|
||||
|
||||
@@ -238,21 +238,9 @@ impl<R: Rng> Generator<AlterTableExpr, R> for AlterExprSetTableOptionsGenerator<
|
||||
let max_output_file_size: u64 = rng.random();
|
||||
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize(max_output_file_size))
|
||||
}
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(_) => {
|
||||
let max_inactive_window_runs: u64 = rng.random();
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(max_inactive_window_runs)
|
||||
}
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(_) => {
|
||||
let max_active_window_files: u64 = rng.random();
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(max_active_window_files)
|
||||
}
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(_) => {
|
||||
let max_active_window_runs: u64 = rng.random();
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(max_active_window_runs)
|
||||
}
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(_) => {
|
||||
let max_inactive_window_files: u64 = rng.random();
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(max_inactive_window_files)
|
||||
AlterTableOption::TwcsTriggerFileNum(_) => {
|
||||
let trigger_file_num: u64 = rng.random();
|
||||
AlterTableOption::TwcsTriggerFileNum(trigger_file_num)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@@ -365,7 +353,7 @@ mod tests {
|
||||
.generate(&mut rng)
|
||||
.unwrap();
|
||||
let serialized = serde_json::to_string(&expr).unwrap();
|
||||
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsMaxOutputFileSize":16770910638250818741}]}}}"#;
|
||||
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"SetTableOptions":{"options":[{"TwcsTimeWindow":{"value":2428665013,"unit":"Millisecond"}}]}}}"#;
|
||||
assert_eq!(expected, serialized);
|
||||
|
||||
let expr = AlterExprUnsetTableOptionsGeneratorBuilder::default()
|
||||
@@ -375,7 +363,7 @@ mod tests {
|
||||
.generate(&mut rng)
|
||||
.unwrap();
|
||||
let serialized = serde_json::to_string(&expr).unwrap();
|
||||
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.max_active_window_runs","compaction.twcs.max_output_file_size","compaction.twcs.time_window","compaction.twcs.max_inactive_window_files","compaction.twcs.max_active_window_files"]}}}"#;
|
||||
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"alter_kinds":{"UnsetTableOptions":{"keys":["compaction.twcs.trigger_file_num","compaction.twcs.time_window"]}}}"#;
|
||||
assert_eq!(expected, serialized);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,9 +21,8 @@ use common_time::{Duration, FOREVER, INSTANT};
|
||||
use derive_builder::Builder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::mito_engine_options::{
|
||||
APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES,
|
||||
TWCS_MAX_ACTIVE_WINDOW_RUNS, TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS,
|
||||
TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
|
||||
APPEND_MODE_KEY, COMPACTION_TYPE, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
|
||||
TWCS_TRIGGER_FILE_NUM,
|
||||
};
|
||||
use strum::EnumIter;
|
||||
|
||||
@@ -78,10 +77,7 @@ pub enum AlterTableOption {
|
||||
Ttl(Ttl),
|
||||
TwcsTimeWindow(Duration),
|
||||
TwcsMaxOutputFileSize(ReadableSize),
|
||||
TwcsMaxInactiveWindowFiles(u64),
|
||||
TwcsMaxActiveWindowFiles(u64),
|
||||
TwcsMaxInactiveWindowRuns(u64),
|
||||
TwcsMaxActiveWindowRuns(u64),
|
||||
TwcsTriggerFileNum(u64),
|
||||
}
|
||||
|
||||
impl AlterTableOption {
|
||||
@@ -90,10 +86,7 @@ impl AlterTableOption {
|
||||
AlterTableOption::Ttl(_) => TTL_KEY,
|
||||
AlterTableOption::TwcsTimeWindow(_) => TWCS_TIME_WINDOW,
|
||||
AlterTableOption::TwcsMaxOutputFileSize(_) => TWCS_MAX_OUTPUT_FILE_SIZE,
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(_) => TWCS_MAX_INACTIVE_WINDOW_FILES,
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(_) => TWCS_MAX_ACTIVE_WINDOW_FILES,
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(_) => TWCS_MAX_INACTIVE_WINDOW_RUNS,
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(_) => TWCS_MAX_ACTIVE_WINDOW_RUNS,
|
||||
AlterTableOption::TwcsTriggerFileNum(_) => TWCS_TRIGGER_FILE_NUM,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,21 +104,9 @@ impl AlterTableOption {
|
||||
};
|
||||
Ok(AlterTableOption::Ttl(ttl))
|
||||
}
|
||||
TWCS_MAX_ACTIVE_WINDOW_RUNS => {
|
||||
let runs = value.parse().unwrap();
|
||||
Ok(AlterTableOption::TwcsMaxActiveWindowRuns(runs))
|
||||
}
|
||||
TWCS_MAX_ACTIVE_WINDOW_FILES => {
|
||||
TWCS_TRIGGER_FILE_NUM => {
|
||||
let files = value.parse().unwrap();
|
||||
Ok(AlterTableOption::TwcsMaxActiveWindowFiles(files))
|
||||
}
|
||||
TWCS_MAX_INACTIVE_WINDOW_RUNS => {
|
||||
let runs = value.parse().unwrap();
|
||||
Ok(AlterTableOption::TwcsMaxInactiveWindowRuns(runs))
|
||||
}
|
||||
TWCS_MAX_INACTIVE_WINDOW_FILES => {
|
||||
let files = value.parse().unwrap();
|
||||
Ok(AlterTableOption::TwcsMaxInactiveWindowFiles(files))
|
||||
Ok(AlterTableOption::TwcsTriggerFileNum(files))
|
||||
}
|
||||
TWCS_MAX_OUTPUT_FILE_SIZE => {
|
||||
// may be "1M" instead of "1 MiB"
|
||||
@@ -178,17 +159,8 @@ impl Display for AlterTableOption {
|
||||
// Caution: to_string loses precision for ReadableSize
|
||||
write!(f, "'{}' = '{}'", TWCS_MAX_OUTPUT_FILE_SIZE, s)
|
||||
}
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(u) => {
|
||||
write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_FILES, u)
|
||||
}
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(u) => {
|
||||
write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_FILES, u)
|
||||
}
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(u) => {
|
||||
write!(f, "'{}' = '{}'", TWCS_MAX_INACTIVE_WINDOW_RUNS, u)
|
||||
}
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(u) => {
|
||||
write!(f, "'{}' = '{}'", TWCS_MAX_ACTIVE_WINDOW_RUNS, u)
|
||||
AlterTableOption::TwcsTriggerFileNum(u) => {
|
||||
write!(f, "'{}' = '{}'", TWCS_TRIGGER_FILE_NUM, u)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -212,21 +184,15 @@ mod tests {
|
||||
]
|
||||
);
|
||||
|
||||
let option_string = "compaction.twcs.max_active_window_files = '5030469694939972912',
|
||||
compaction.twcs.max_active_window_runs = '8361168990283879099',
|
||||
compaction.twcs.max_inactive_window_files = '6028716566907830876',
|
||||
compaction.twcs.max_inactive_window_runs = '10622283085591494074',
|
||||
let option_string = "compaction.twcs.trigger_file_num = '5030469694939972912',
|
||||
compaction.twcs.max_output_file_size = '15686.4PiB',
|
||||
compaction.twcs.time_window = '2061999256ms',
|
||||
compaction.type = 'twcs',
|
||||
ttl = '1month 3days 15h 49m 8s 279ms'";
|
||||
let options = AlterTableOption::parse_kv_pairs(option_string).unwrap();
|
||||
assert_eq!(options.len(), 7);
|
||||
assert_eq!(options.len(), 4);
|
||||
let expected = vec![
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(5030469694939972912),
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(8361168990283879099),
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(6028716566907830876),
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(10622283085591494074),
|
||||
AlterTableOption::TwcsTriggerFileNum(5030469694939972912),
|
||||
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("15686.4PiB").unwrap()),
|
||||
AlterTableOption::TwcsTimeWindow(Duration::new_nanosecond(2_061_999_256_000_000)),
|
||||
AlterTableOption::Ttl(Ttl::Duration(Duration::new_millisecond(
|
||||
|
||||
@@ -191,10 +191,7 @@ mod tests {
|
||||
AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))),
|
||||
AlterTableOption::TwcsTimeWindow(Duration::new_second(60)),
|
||||
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()),
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(10),
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(10),
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(5),
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(5),
|
||||
AlterTableOption::TwcsTriggerFileNum(5),
|
||||
],
|
||||
},
|
||||
};
|
||||
@@ -204,10 +201,7 @@ mod tests {
|
||||
"ALTER TABLE test SET 'ttl' = '60s', ",
|
||||
"'compaction.twcs.time_window' = '60s', ",
|
||||
"'compaction.twcs.max_output_file_size' = '1.0GiB', ",
|
||||
"'compaction.twcs.max_active_window_files' = '10', ",
|
||||
"'compaction.twcs.max_active_window_runs' = '10', ",
|
||||
"'compaction.twcs.max_inactive_window_files' = '5', ",
|
||||
"'compaction.twcs.max_inactive_window_runs' = '5';"
|
||||
"'compaction.twcs.trigger_file_num' = '5';"
|
||||
);
|
||||
assert_eq!(expected, output);
|
||||
}
|
||||
|
||||
@@ -187,10 +187,7 @@ mod tests {
|
||||
AlterTableOption::Ttl(Ttl::Duration(Duration::new_second(60))),
|
||||
AlterTableOption::TwcsTimeWindow(Duration::new_second(60)),
|
||||
AlterTableOption::TwcsMaxOutputFileSize(ReadableSize::from_str("1GB").unwrap()),
|
||||
AlterTableOption::TwcsMaxActiveWindowFiles(10),
|
||||
AlterTableOption::TwcsMaxActiveWindowRuns(10),
|
||||
AlterTableOption::TwcsMaxInactiveWindowFiles(5),
|
||||
AlterTableOption::TwcsMaxInactiveWindowRuns(5),
|
||||
AlterTableOption::TwcsTriggerFileNum(10),
|
||||
],
|
||||
},
|
||||
};
|
||||
@@ -200,10 +197,7 @@ mod tests {
|
||||
"ALTER TABLE test SET 'ttl' = '60s', ",
|
||||
"'compaction.twcs.time_window' = '60s', ",
|
||||
"'compaction.twcs.max_output_file_size' = '1.0GiB', ",
|
||||
"'compaction.twcs.max_active_window_files' = '10', ",
|
||||
"'compaction.twcs.max_active_window_runs' = '10', ",
|
||||
"'compaction.twcs.max_inactive_window_files' = '5', ",
|
||||
"'compaction.twcs.max_inactive_window_runs' = '5';"
|
||||
"'compaction.twcs.trigger_file_num' = '10';",
|
||||
);
|
||||
assert_eq!(expected, output);
|
||||
}
|
||||
|
||||
@@ -59,10 +59,10 @@ mod test {
|
||||
let record_batches = create_record_batches(1);
|
||||
test_put_record_batches(&client, record_batches).await;
|
||||
|
||||
let sql = "select ts, a, b from foo order by ts";
|
||||
let sql = "select ts, a, `B` from foo order by ts";
|
||||
let expected = "\
|
||||
+-------------------------+----+----+
|
||||
| ts | a | b |
|
||||
| ts | a | B |
|
||||
+-------------------------+----+----+
|
||||
| 1970-01-01T00:00:00.001 | -1 | s1 |
|
||||
| 1970-01-01T00:00:00.002 | -2 | s2 |
|
||||
@@ -116,10 +116,10 @@ mod test {
|
||||
let record_batches = create_record_batches(1);
|
||||
test_put_record_batches(&client, record_batches).await;
|
||||
|
||||
let sql = "select ts, a, b from foo order by ts";
|
||||
let sql = "select ts, a, `B` from foo order by ts";
|
||||
let expected = "\
|
||||
+-------------------------+----+----+
|
||||
| ts | a | b |
|
||||
| ts | a | B |
|
||||
+-------------------------+----+----+
|
||||
| 1970-01-01T00:00:00.001 | -1 | s1 |
|
||||
| 1970-01-01T00:00:00.002 | -2 | s2 |
|
||||
@@ -192,7 +192,7 @@ mod test {
|
||||
)
|
||||
.with_time_index(true),
|
||||
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
|
||||
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new("B", ConcreteDataType::string_datatype(), true),
|
||||
]));
|
||||
|
||||
let mut record_batches = Vec::with_capacity(3);
|
||||
@@ -250,7 +250,7 @@ mod test {
|
||||
..Default::default()
|
||||
},
|
||||
ColumnDef {
|
||||
name: "b".to_string(),
|
||||
name: "B".to_string(),
|
||||
data_type: ColumnDataType::String as i32,
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
is_nullable: true,