Compare commits


18 Commits

Author SHA1 Message Date
Ruihang Xia
ea0a347edc fix(log-query): panic on prometheus (#5429)
* fix(log-query): panic on prometheus

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test environment setup

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-23 23:03:20 +08:00
Lei, HUANG
4d70589488 fix: avoid suppressing manual compaction (#5399)
* fix/avoid-suppress-manual-compaction:
 **Refactor Compaction Logic**

 - Removed `PendingCompaction` struct and integrated its functionality directly into `CompactionStatus` in `compaction.rs`.
 - Simplified waiter management by consolidating waiter handling logic into `CompactionStatus`.
 - Updated `CompactionRequest` creation to directly handle waiters without intermediate structures.
 - Adjusted test cases in `compaction.rs` to align with the new waiter management approach.

(cherry picked from commit 87e2d1c2cc9bd82c02991d22e429bef25c5ee348)

* fix/avoid-suppress-manual-compaction:
 ### Add Support for Manual Compaction Requests

 - **Compaction Logic Enhancements**:
   - Updated `CompactionScheduler` in `compaction.rs` to handle manual compaction requests using `Options::StrictWindow`.
   - Introduced `PendingCompaction` struct to manage pending manual compaction requests.
   - Added logic to reschedule manual compaction requests once the current compaction task is completed.

 - **Testing**:
   - Added `test_manual_compaction_when_compaction_in_progress` to verify the handling of manual compaction requests during ongoing compaction processes.

 These changes enhance the compaction scheduling mechanism by allowing manual compaction requests to be queued and processed efficiently.

(cherry picked from commit bc38ed0f2f8ba2c4690e0d0e251aeb2acce308ca)

* chore: fix conflicts

* fix/avoid-suppress-manual-compaction:
 ### Add Error Handling for Manual Compaction Override

 - **`compaction.rs`**: Enhanced the `set_pending_request` method to handle manual compaction overrides by sending an error to the waiter if a previous request exists.
 - **`error.rs`**: Introduced a new error variant `ManualCompactionOverride` to represent manual compaction being overridden, and mapped it to the `Cancelled` status code.

* fix: format

* fix/avoid-suppress-manual-compaction:
 **Add Error Handling for Pending Compaction Requests**

 - Enhanced error handling in `compaction.rs` by adding logic to handle errors for pending compaction requests.
 - Introduced a mechanism to send errors using `waiter.send` when a pending compaction request fails, ensuring proper error propagation and context with `CompactRegionSnafu`.

* fix/avoid-suppress-manual-compaction:
 **Fix Typo and Simplify Code Logic in `compaction.rs`**

 - Corrected a typo in the license comment from "langucage" to "language".
 - Simplified the logic for handling `pending_compaction` in `CompactionStatus` by removing unnecessary pattern matching and directly accessing `waiter`.

* fix: typo
2025-01-23 19:23:06 +08:00
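The commit above keeps at most one queued manual (StrictWindow) compaction per region, fails the waiter of an overridden request, and reschedules the queued request once the running task finishes. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the actual mito2 API:

#[derive(Debug, Clone, PartialEq)]
enum CompactOptions {
    Regular,
    StrictWindow { window_seconds: i64 },
}

#[derive(Debug)]
struct PendingCompaction {
    options: CompactOptions,
}

#[derive(Debug, Default)]
struct CompactionStatus {
    // Replaced (not appended) when a new manual request arrives; in the real code
    // the previous waiter is notified with a ManualCompactionOverride error.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    // Returns the overridden request, if any, so the caller can fail its waiter.
    fn set_pending_request(&mut self, pending: PendingCompaction) -> Option<PendingCompaction> {
        self.pending_request.replace(pending)
    }

    // Called when the current compaction task finishes: take the pending manual
    // request (if any) and reschedule it before considering regular compactions.
    fn take_pending(&mut self) -> Option<PendingCompaction> {
        self.pending_request.take()
    }
}

fn main() {
    let mut status = CompactionStatus::default();
    // A manual (StrictWindow) request arrives while a compaction is running.
    let overridden = status.set_pending_request(PendingCompaction {
        options: CompactOptions::StrictWindow { window_seconds: 3600 },
    });
    assert!(overridden.is_none());
    // When the running task completes, the pending manual request is rescheduled.
    let next = status.take_pending().expect("pending manual compaction");
    assert_eq!(next.options, CompactOptions::StrictWindow { window_seconds: 3600 });
}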
Yingwen
428f646fa3 feat: overwrites inferred compaction window by region options (#5396)
* feat: use time window in compaction options for compaction window

* test: add tests for overwriting options

* chore: typo

* chore: fix a grammar issue in log
2025-01-23 19:23:06 +08:00
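A minimal sketch of the precedence rule described in the commit above, assuming an explicitly configured time window in the region's compaction options simply takes priority over the window inferred from data; the function name and units are illustrative, not the mito2 API:

fn effective_compaction_window(
    inferred_window_secs: Option<i64>,
    option_window_secs: Option<i64>,
) -> Option<i64> {
    // An explicitly configured window overwrites the inferred one.
    option_window_secs.or(inferred_window_secs)
}

fn main() {
    assert_eq!(effective_compaction_window(Some(3600), Some(7200)), Some(7200));
    assert_eq!(effective_compaction_window(Some(3600), None), Some(3600));
}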
Ruihang Xia
1d1bb83a9f feat: set default compaction parallelism (#5371)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-23 19:23:06 +08:00
discord9
27918686d6 fix: handle insert default value (#5307)
* fix: handle flow inserts with default values

* test: sqlness

* chore: typo

* chore: newline

* feat(WIP): impure default filler

* feat: fill impure default values

* test: add test for default fill impure

* feat: check for impure

* fix: also handle stmt to region

* refactor: per review

* refactor: per review

* chore: rebase fix

* chore: clippy

* chore: per review
2025-01-23 19:23:06 +08:00
LFC
0f55afd167 refactor: optimize out partition split insert requests (#5298)
* test: optimize out partition split insert requests if there is only one region

* Now that the optimization for single-region inserts has been lifted up, the original "fast path" can be removed as obsolete.

* resolve PR comments
2025-01-23 19:23:06 +08:00
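A rough sketch of the single-region shortcut described in the commit above, assuming simplified row and region types; names are illustrative and not the actual partition module API:

use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct RegionId(u64);

#[derive(Debug, Clone)]
struct Row(Vec<String>);

fn split_inserts(
    regions: &[RegionId],
    rows: Vec<Row>,
    partition_rule: impl Fn(&Row) -> RegionId,
) -> Vec<(RegionId, Vec<Row>)> {
    if let [only_region] = regions {
        // Fast path: one region, no need to evaluate the partition rule per row.
        return vec![(*only_region, rows)];
    }
    // Slow path: group rows by the region the partition rule assigns them to.
    let mut grouped: HashMap<RegionId, Vec<Row>> = HashMap::new();
    for row in rows {
        grouped.entry(partition_rule(&row)).or_default().push(row);
    }
    grouped.into_iter().collect()
}

fn main() {
    let regions = [RegionId(1)];
    let rows = vec![Row(vec!["a".into()]), Row(vec!["b".into()])];
    let split = split_inserts(&regions, rows, |_row| RegionId(1));
    assert_eq!(split.len(), 1);
    assert_eq!(split[0].1.len(), 2);
}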
Yiran
ea02ddcde1 ci: automatically bump doc version when releasing GreptimeDB (#5343)
* ci: automatically bump doc version when releasing GreptimeDB

* add license header
2025-01-23 15:15:52 +08:00
evenyag
0404e2a132 ci: disable pg kvbackend sqlness test 2025-01-23 15:15:52 +08:00
Ning Sun
7deb559a81 ci: revert coverage runner (#5403) 2025-01-23 15:15:52 +08:00
Ning Sun
c470c6a172 ci: use arm builders for tests (#5395) 2025-01-23 15:15:52 +08:00
Ning Sun
efee2480d2 ci: do not collect coverage from pull request any more (#5364)
* ci: do not collect coverage from pull request any more

* fix: disable toolchain cache

ci: update develop ci

update ci to the version in 121ec7936f on main branch
2025-01-23 15:15:52 +08:00
Ning Sun
42aaf86c26 ci: disable cache for some tasks, create cache in nightly build (#5324)
* ci: disable cache for some tasks

* ci: add a nightly test to create rust cache on main
2025-01-23 15:15:52 +08:00
Ning Sun
a952ebb2ff ci: use mold for tests (#5319)
* ci: use mold for tests

* ci: enable rust cache saving for merge group
2025-01-23 15:15:52 +08:00
Ning Sun
9a5b904db3 ci: do not trigger tests when there is a merge conflict (#5318)
* ci: do not trigger tests when there is a merge conflict

* Update .github/workflows/develop.yml

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* ci: disable cache from rust toolchain action

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-01-23 15:15:52 +08:00
Ning Sun
2e1a5d811a ci: disable docker/rust cache temporarily and merge docker compose files (#5293)
* ci: disable docker cache temporarily and merge docker compose files

* ci: fix compose file name and options

* ci: try to disable rust cache
2025-01-23 15:15:52 +08:00
evenyag
2d5824b3a5 chore: bump version to v0.11.3 2025-01-23 15:15:52 +08:00
Zhenchi
5f67f2b58e fix: matches incorrectly uses byte len as char len (#5411)
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2025-01-23 15:15:52 +08:00
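A small, runnable illustration of why using the byte length as a character-index bound breaks on non-ASCII input, matching the fix in the commit above that switches the loop bound to pattern.chars().count():

fn main() {
    let pattern = "中文 测试";
    // 4 CJK characters (3 UTF-8 bytes each) plus 1 space: 13 bytes but only 5 chars.
    assert_eq!(pattern.len(), 13);
    assert_eq!(pattern.chars().count(), 5);

    // Iterating character positions with a bound of pattern.len() would run past the
    // last character, and chars().nth(cursor).unwrap() is what panics. Bounding the
    // cursor by the character count stays in range.
    let mut cursor = 0;
    while cursor < pattern.chars().count() {
        let c = pattern.chars().nth(cursor).unwrap(); // safe with the char-count bound
        let _ = c;
        cursor += 1;
    }
}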
Ruihang Xia
c12fbcda9f fix: panic when received invalid query string (#5366)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2025-01-23 15:15:52 +08:00
47 changed files with 1979 additions and 436 deletions

View File

@@ -1,9 +1,6 @@
name: Check Dependencies
on:
push:
branches:
- main
pull_request:
branches:
- main

View File

@@ -43,7 +43,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ windows-2022, ubuntu-20.04 ]
os: [ ubuntu-20.04 ]
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
@@ -57,6 +57,8 @@ jobs:
# Shares across multiple jobs
# Shares with `Clippy` job
shared-key: "check-lint"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Run cargo check
run: cargo check --locked --workspace --all-targets
@@ -67,11 +69,6 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
- name: Run taplo
@@ -94,13 +91,15 @@ jobs:
with:
# Shares across multiple jobs
shared-key: "build-binaries"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc -- --bin greptime --bin sqlness-runner
run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend
- name: Pack greptime binaries
shell: bash
run: |
@@ -142,11 +141,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -200,11 +194,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -255,13 +244,15 @@ jobs:
with:
# Shares across multiple jobs
shared-key: "build-greptime-ci"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime
run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend
- name: Pack greptime binary
shell: bash
run: |
@@ -317,11 +308,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -466,11 +452,6 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "fuzz-test-targets"
- name: Set Rust Fuzz
shell: bash
run: |
@@ -578,8 +559,8 @@ jobs:
- uses: actions/checkout@v4
- if: matrix.mode.kafka
name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
working-directory: tests-integration/fixtures
run: docker compose up -d --wait kafka
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
@@ -609,11 +590,6 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
shared-key: "check-rust-fmt"
- name: Check format
run: make fmt-check
@@ -635,55 +611,99 @@ jobs:
# Shares across multiple jobs
# Shares with `Check` job
shared-key: "check-lint"
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Run cargo clippy
run: make clippy
coverage:
if: github.event.pull_request.draft == false
runs-on: ubuntu-20.04-8-cores
conflict-check:
name: Check for conflict
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Merge Conflict Finder
uses: olivernybroe/action-conflict-finder@v4.0
test:
if: github.event_name != 'merge_group'
runs-on: ubuntu-24.04-arm
timeout-minutes: 60
needs: [clippy, fmt]
needs: [conflict-check, clippy, fmt]
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: KyleMayes/install-llvm-action@v1
with:
version: "14.0"
- uses: rui314/setup-mold@v1
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: llvm-tools-preview
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
- name: Docker Cache
uses: ScribeMD/docker-cache@0.3.7
cache-all-crates: "true"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
GT_MINIO_BUCKET: greptime
GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
coverage:
if: github.event_name == 'merge_group'
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
key: docker-${{ runner.os }}-coverage
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: rui314/setup-mold@v1
- name: Install toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: llvm-tools
cache: false
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
save-if: ${{ github.ref == 'refs/heads/main' }}
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Setup etcd server
working-directory: tests-integration/fixtures/etcd
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup kafka server
working-directory: tests-integration/fixtures/kafka
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup minio
working-directory: tests-integration/fixtures/minio
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup postgres server
working-directory: tests-integration/fixtures/postgres
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}

View File

@@ -108,7 +108,53 @@ jobs:
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
UNITTEST_LOG_DIR: "__unittest_logs"
## this is designed for generating cache that usable for pull requests
test-on-linux:
name: Run tests on Linux
if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: rui314/setup-mold@v1
- name: Install Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
# Shares cross multiple jobs
shared-key: "coverage-test"
- name: Install Cargo Nextest
uses: taiki-e/install-action@nextest
- name: Setup external services
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Running tests
run: cargo nextest run -F dashboard -F pg_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
CARGO_INCREMENTAL: 0
GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }}
GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
GT_MINIO_BUCKET: greptime
GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
GT_MINIO_ACCESS_KEY: superpower_password
GT_MINIO_REGION: us-west-2
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
cleanbuild-linux-nix:
name: Run clean build on Linux
runs-on: ubuntu-latest-8-cores
timeout-minutes: 60
steps:

View File

@@ -436,6 +436,22 @@ jobs:
aws-region: ${{ vars.EC2_RUNNER_REGION }}
github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
bump-doc-version:
name: Bump doc version
if: ${{ github.event_name == 'push' || github.event_name == 'schedule' }}
needs: [allocate-runners]
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup-cyborg
- name: Bump doc version
working-directory: cyborg
run: pnpm tsx bin/bump-doc-version.ts
env:
VERSION: ${{ needs.allocate-runners.outputs.version }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCS_REPO_TOKEN: ${{ secrets.DOCS_REPO_TOKEN }}
notification:
if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'push' || github.event_name == 'schedule') && always() }}
name: Send notification to Greptime team

Cargo.lock (generated)
View File

@@ -188,7 +188,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"common-base",
"common-decimal",
@@ -773,7 +773,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -1314,7 +1314,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arrow",
@@ -1684,7 +1684,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"auth",
@@ -1727,7 +1727,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"tempfile",
"tokio",
@@ -1736,7 +1736,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -1763,7 +1763,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.11.2",
"substrait 0.11.3",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1804,7 +1804,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"auth",
@@ -1864,7 +1864,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"temp-env",
"tempfile",
@@ -1916,7 +1916,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"anymap2",
"async-trait",
@@ -1938,11 +1938,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.11.2"
version = "0.11.3"
[[package]]
name = "common-config"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"common-base",
"common-error",
@@ -1965,7 +1965,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"arrow",
"arrow-schema",
@@ -2001,7 +2001,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -2014,7 +2014,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"http 0.2.12",
"snafu 0.8.5",
@@ -2024,7 +2024,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"common-error",
@@ -2034,7 +2034,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"approx 0.5.1",
@@ -2078,7 +2078,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"common-runtime",
@@ -2095,7 +2095,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arrow-flight",
@@ -2121,7 +2121,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"common-base",
@@ -2140,7 +2140,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"arc-swap",
"common-query",
@@ -2154,7 +2154,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"common-error",
"common-macro",
@@ -2167,7 +2167,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"anymap2",
"api",
@@ -2224,7 +2224,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2233,11 +2233,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.11.2"
version = "0.11.3"
[[package]]
name = "common-pprof"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"common-error",
"common-macro",
@@ -2249,7 +2249,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-stream",
"async-trait",
@@ -2276,7 +2276,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"common-procedure",
@@ -2284,7 +2284,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -2310,7 +2310,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"arc-swap",
"common-error",
@@ -2329,7 +2329,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2359,7 +2359,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"atty",
"backtrace",
@@ -2387,7 +2387,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"client",
"common-query",
@@ -2399,7 +2399,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"arrow",
"chrono",
@@ -2417,7 +2417,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"build-data",
"const_format",
@@ -2427,7 +2427,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"common-base",
"common-error",
@@ -3226,7 +3226,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arrow-flight",
@@ -3277,7 +3277,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"tokio",
"toml 0.8.19",
@@ -3286,7 +3286,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"arrow",
"arrow-array",
@@ -3910,7 +3910,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -4026,7 +4026,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arrow",
@@ -4085,7 +4085,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"tokio",
"tonic 0.11.0",
@@ -4123,7 +4123,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -5273,7 +5273,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6123,7 +6123,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"chrono",
"common-error",
@@ -6135,7 +6135,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-stream",
"async-trait",
@@ -6479,7 +6479,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -6506,7 +6506,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -6585,7 +6585,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"aquamarine",
@@ -6679,7 +6679,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"aquamarine",
@@ -7416,7 +7416,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"anyhow",
"bytes",
@@ -7669,7 +7669,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7717,7 +7717,7 @@ dependencies = [
"sql",
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"store-api",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"tokio",
"tokio-util",
@@ -7967,7 +7967,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -8253,7 +8253,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8415,7 +8415,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8703,7 +8703,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8938,7 +8938,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9063,7 +9063,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9128,7 +9128,7 @@ dependencies = [
"sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
"statrs",
"store-api",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"tokio",
"tokio-stream",
@@ -10612,7 +10612,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "script"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -10904,7 +10904,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11016,7 +11016,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arc-swap",
@@ -11370,7 +11370,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"chrono",
@@ -11434,7 +11434,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11652,7 +11652,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"aquamarine",
@@ -11814,7 +11814,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"async-trait",
"bytes",
@@ -12013,7 +12013,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"async-trait",
@@ -12290,7 +12290,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"arbitrary",
"async-trait",
@@ -12333,7 +12333,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.11.2"
version = "0.11.3"
dependencies = [
"api",
"arrow-flight",
@@ -12373,6 +12373,7 @@ dependencies = [
"futures-util",
"hex",
"itertools 0.10.5",
"log-query",
"loki-api",
"meta-client",
"meta-srv",
@@ -12397,7 +12398,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.11.2",
"substrait 0.11.3",
"table",
"tempfile",
"time",

View File

@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.11.2"
version = "0.11.3"
edition = "2021"
license = "Apache-2.0"

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2023 Greptime Team
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import * as core from "@actions/core";
import {obtainClient} from "@/common";
async function triggerWorkflow(workflowId: string, version: string) {
const docsClient = obtainClient("DOCS_REPO_TOKEN")
try {
await docsClient.rest.actions.createWorkflowDispatch({
owner: "GreptimeTeam",
repo: "docs",
workflow_id: workflowId,
ref: "main",
inputs: {
version,
},
});
console.log(`Successfully triggered ${workflowId} workflow with version ${version}`);
} catch (error) {
core.setFailed(`Failed to trigger workflow: ${error.message}`);
}
}
function determineWorkflow(version: string): [string, string] {
// Check if it's a nightly version
if (version.includes('nightly')) {
return ['bump-nightly-version.yml', version];
}
const parts = version.split('.');
if (parts.length !== 3) {
throw new Error('Invalid version format');
}
// If patch version (last number) is 0, it's a major version
// Return only major.minor version
if (parts[2] === '0') {
return ['bump-version.yml', `${parts[0]}.${parts[1]}`];
}
// Otherwise it's a patch version, use full version
return ['bump-patch-version.yml', version];
}
const version = process.env.VERSION;
if (!version) {
core.setFailed("VERSION environment variable is required");
process.exit(1);
}
// Remove 'v' prefix if exists
const cleanVersion = version.startsWith('v') ? version.slice(1) : version;
try {
const [workflowId, apiVersion] = determineWorkflow(cleanVersion);
triggerWorkflow(workflowId, apiVersion);
} catch (error) {
core.setFailed(`Error processing version: ${error.message}`);
process.exit(1);
}

View File

@@ -1,3 +1,3 @@
[toolchain]
channel = "nightly-2024-10-19"
components = ["rust-analyzer"]
components = ["rust-analyzer", "llvm-tools"]

View File

@@ -11,11 +11,13 @@ pkgs.mkShell rec {
clang
gcc
protobuf
gnumake
mold
(fenix.fromToolchainFile {
dir = ./.;
})
cargo-nextest
cargo-llvm-cov
taplo
curl
];

View File

@@ -725,7 +725,8 @@ struct Tokenizer {
impl Tokenizer {
pub fn tokenize(mut self, pattern: &str) -> Result<Vec<Token>> {
let mut tokens = vec![];
while self.cursor < pattern.len() {
let char_len = pattern.chars().count();
while self.cursor < char_len {
// TODO: collect pattern into Vec<char> if this tokenizer is bottleneck in the future
let c = pattern.chars().nth(self.cursor).unwrap();
match c {
@@ -794,7 +795,8 @@ impl Tokenizer {
let mut phase = String::new();
let mut is_quote_present = false;
while self.cursor < pattern.len() {
let char_len = pattern.chars().count();
while self.cursor < char_len {
let mut c = pattern.chars().nth(self.cursor).unwrap();
match c {
@@ -899,6 +901,26 @@ mod test {
Phase("c".to_string()),
],
),
(
r#"中文 测试"#,
vec![Phase("中文".to_string()), Phase("测试".to_string())],
),
(
r#"中文 AND 测试"#,
vec![Phase("中文".to_string()), And, Phase("测试".to_string())],
),
(
r#"中文 +测试"#,
vec![Phase("中文".to_string()), Must, Phase("测试".to_string())],
),
(
r#"中文 -测试"#,
vec![
Phase("中文".to_string()),
Negative,
Phase("测试".to_string()),
],
),
];
for (query, expected) in cases {
@@ -1030,6 +1052,61 @@ mod test {
],
},
),
(
r#"中文 测试"#,
PatternAst::Binary {
op: BinaryOp::Or,
children: vec![
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "中文".to_string(),
},
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "测试".to_string(),
},
],
},
),
(
r#"中文 AND 测试"#,
PatternAst::Binary {
op: BinaryOp::And,
children: vec![
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "中文".to_string(),
},
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "测试".to_string(),
},
],
},
),
(
r#"中文 +测试"#,
PatternAst::Literal {
op: UnaryOp::Must,
pattern: "测试".to_string(),
},
),
(
r#"中文 -测试"#,
PatternAst::Binary {
op: BinaryOp::And,
children: vec![
PatternAst::Literal {
op: UnaryOp::Negative,
pattern: "测试".to_string(),
},
PatternAst::Literal {
op: UnaryOp::Optional,
pattern: "中文".to_string(),
},
],
},
),
];
for (query, expected) in cases {

View File

@@ -123,6 +123,14 @@ impl ColumnSchema {
self.default_constraint.as_ref()
}
/// Check if the default constraint is a impure function.
pub fn is_default_impure(&self) -> bool {
self.default_constraint
.as_ref()
.map(|c| c.is_function())
.unwrap_or(false)
}
#[inline]
pub fn metadata(&self) -> &Metadata {
&self.metadata
@@ -283,6 +291,15 @@ impl ColumnSchema {
}
}
/// Creates an impure default value for this column, only if it have a impure default constraint.
/// Otherwise, returns `Ok(None)`.
pub fn create_impure_default(&self) -> Result<Option<Value>> {
match &self.default_constraint {
Some(c) => c.create_impure_default(&self.data_type),
None => Ok(None),
}
}
/// Retrieves the fulltext options for the column.
pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
match self.metadata.get(FULLTEXT_KEY) {
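A self-contained sketch of the pure/impure default distinction introduced above: constant defaults can be filled later, while function defaults such as now() are evaluated up front so every row in a batch gets a consistent value. The types below are simplified stand-ins, not the datatypes crate API:

use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Debug, Clone, PartialEq)]
enum DefaultConstraint {
    Value(i64),       // pure constant default
    Function(String), // impure default such as "now()"
}

impl DefaultConstraint {
    fn is_function(&self) -> bool {
        matches!(self, DefaultConstraint::Function(_))
    }

    // Only materialize a value for impure defaults; pure constants return None and
    // can be filled by a later stage without losing consistency.
    fn create_impure_default(&self) -> Option<i64> {
        match self {
            DefaultConstraint::Function(expr) if expr.as_str() == "now()" => Some(
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_millis() as i64,
            ),
            _ => None,
        }
    }
}

fn main() {
    let constant = DefaultConstraint::Value(0);
    let impure = DefaultConstraint::Function("now()".to_string());
    assert!(!constant.is_function());
    assert!(impure.is_function());
    assert!(constant.create_impure_default().is_none());
    assert!(impure.create_impure_default().is_some());
}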

View File

@@ -178,12 +178,63 @@ impl ColumnDefaultConstraint {
}
}
/// Only create default vector if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to mito engine while also keeps impure default have consistent values
pub fn create_impure_default_vector(
&self,
data_type: &ConcreteDataType,
num_rows: usize,
) -> Result<Option<VectorRef>> {
assert!(num_rows > 0);
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure its return value is not null when
// is_nullable is true.
match &expr[..] {
// TODO(dennis): we only supports current_timestamp right now,
// it's better to use a expression framework in future.
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp_vector(data_type, num_rows).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}
/// Only create default value if it's impure, i.e., it's a function.
///
/// This helps to delay creating constant default values to mito engine while also keeps impure default have consistent values
pub fn create_impure_default(&self, data_type: &ConcreteDataType) -> Result<Option<Value>> {
match self {
ColumnDefaultConstraint::Function(expr) => {
// Functions should also ensure its return value is not null when
// is_nullable is true.
match &expr[..] {
CURRENT_TIMESTAMP | CURRENT_TIMESTAMP_FN | NOW_FN => {
create_current_timestamp(data_type).map(Some)
}
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
}
}
ColumnDefaultConstraint::Value(_) => Ok(None),
}
}
/// Returns true if this constraint might creates NULL.
fn maybe_null(&self) -> bool {
// Once we support more functions, we may return true if given function
// could return null.
matches!(self, ColumnDefaultConstraint::Value(Value::Null))
}
/// Returns true if this constraint is a function.
pub fn is_function(&self) -> bool {
matches!(self, ColumnDefaultConstraint::Function(_))
}
}
fn create_current_timestamp(data_type: &ConcreteDataType) -> Result<Value> {

View File

@@ -24,6 +24,7 @@ use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use datatypes::value::Value;
use itertools::Itertools;
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -178,14 +179,32 @@ impl Flownode for FlowWorkerManager {
.table_from_id(&table_id)
.await
.map_err(to_meta_err(snafu::location!()))?;
let default_vals = table_schema
.default_values
.iter()
.zip(table_schema.relation_desc.typ().column_types.iter())
.map(|(v, ty)| {
v.as_ref().and_then(|v| {
match v.create_default(ty.scalar_type(), ty.nullable()) {
Ok(v) => Some(v),
Err(err) => {
common_telemetry::error!(err; "Failed to create default value");
None
}
}
})
})
.collect_vec();
let table_types = table_schema
.relation_desc
.typ()
.column_types
.clone()
.into_iter()
.map(|t| t.scalar_type)
.collect_vec();
let table_col_names = table_schema.names;
let table_col_names = table_schema.relation_desc.names;
let table_col_names = table_col_names
.iter().enumerate()
.map(|(idx,name)| match name {
@@ -202,31 +221,35 @@ impl Flownode for FlowWorkerManager {
.enumerate()
.map(|(i, name)| (&name.column_name, i)),
);
let fetch_order: Vec<usize> = table_col_names
let fetch_order: Vec<FetchFromRow> = table_col_names
.iter()
.map(|col_name| {
.zip(default_vals.into_iter())
.map(|(col_name, col_default_val)| {
name_to_col
.get(col_name)
.copied()
.map(FetchFromRow::Idx)
.or_else(|| col_default_val.clone().map(FetchFromRow::Default))
.with_context(|| UnexpectedSnafu {
err_msg: format!("Column not found: {}", col_name),
err_msg: format!(
"Column not found: {}, default_value: {:?}",
col_name, col_default_val
),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
trace!("Reordering columns: {:?}", fetch_order)
}
trace!("Reordering columns: {:?}", fetch_order);
(table_types, fetch_order)
};
// TODO(discord9): use column instead of row
let rows: Vec<DiffRow> = rows_proto
.into_iter()
.map(|r| {
let r = repr::Row::from(r);
let reordered = fetch_order
.iter()
.map(|&i| r.inner[i].clone())
.collect_vec();
let reordered = fetch_order.iter().map(|i| i.fetch(&r)).collect_vec();
repr::Row::new(reordered)
})
.map(|r| (r, now, 1))
@@ -258,3 +281,20 @@ impl Flownode for FlowWorkerManager {
Ok(Default::default())
}
}
/// Simple helper enum for fetching value from row with default value
#[derive(Debug, Clone)]
enum FetchFromRow {
Idx(usize),
Default(Value),
}
impl FetchFromRow {
/// Panic if idx is out of bound
fn fetch(&self, row: &repr::Row) -> Value {
match self {
FetchFromRow::Idx(idx) => row.get(*idx).unwrap().clone(),
FetchFromRow::Default(v) => v.clone(),
}
}
}
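A small usage sketch of the FetchFromRow idea shown above: each destination column either copies a value from the incoming row by index or falls back to a precomputed default. The value type below is a simplified stand-in for datatypes::value::Value, and the row is a plain slice instead of repr::Row:

#[derive(Debug, Clone, PartialEq)]
enum Val {
    Int(i64),
    Str(String),
}

#[derive(Debug, Clone)]
enum FetchFromRow {
    Idx(usize),
    Default(Val),
}

impl FetchFromRow {
    fn fetch(&self, row: &[Val]) -> Val {
        match self {
            FetchFromRow::Idx(idx) => row[*idx].clone(),
            FetchFromRow::Default(v) => v.clone(),
        }
    }
}

fn main() {
    // The incoming row carries only two of the three destination columns;
    // the third is filled from its default value.
    let incoming = vec![Val::Str("host-1".into()), Val::Int(42)];
    let fetch_order = vec![
        FetchFromRow::Idx(0),
        FetchFromRow::Idx(1),
        FetchFromRow::Default(Val::Int(0)),
    ];
    let reordered: Vec<Val> = fetch_order.iter().map(|f| f.fetch(&incoming)).collect();
    assert_eq!(reordered[2], Val::Int(0));
}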

View File

@@ -313,7 +313,7 @@ impl FlownodeContext {
name: name.join("."),
})?;
let schema = self.table_source.table(name).await?;
Ok((id, schema))
Ok((id, schema.relation_desc))
}
/// Assign a global id to a table, if already assigned, return the existing global id

View File

@@ -17,6 +17,8 @@
use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use datatypes::schema::ColumnDefaultConstraint;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
@@ -27,6 +29,32 @@ use crate::error::{
};
use crate::repr::RelationDesc;
/// Table description, include relation desc and default values, which is the minimal information flow needed for table
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableDesc {
pub relation_desc: RelationDesc,
pub default_values: Vec<Option<ColumnDefaultConstraint>>,
}
impl TableDesc {
pub fn new(
relation_desc: RelationDesc,
default_values: Vec<Option<ColumnDefaultConstraint>>,
) -> Self {
Self {
relation_desc,
default_values,
}
}
pub fn new_no_default(relation_desc: RelationDesc) -> Self {
Self {
relation_desc,
default_values: vec![],
}
}
}
/// Table source but for flow, provide table schema by table name/id
#[async_trait::async_trait]
pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
@@ -34,11 +62,11 @@ pub trait FlowTableSource: Send + Sync + std::fmt::Debug {
async fn table_id_from_name(&self, name: &TableName) -> Result<TableId, Error>;
/// Get the table schema by table name
async fn table(&self, name: &TableName) -> Result<RelationDesc, Error> {
async fn table(&self, name: &TableName) -> Result<TableDesc, Error> {
let id = self.table_id_from_name(name).await?;
self.table_from_id(&id).await
}
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error>;
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error>;
}
/// managed table source information, query from table info manager and table name manager
@@ -51,7 +79,7 @@ pub struct ManagedTableSource {
#[async_trait::async_trait]
impl FlowTableSource for ManagedTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
@@ -150,7 +178,7 @@ impl ManagedTableSource {
pub async fn get_table_name_schema(
&self,
table_id: &TableId,
) -> Result<(TableName, RelationDesc), Error> {
) -> Result<(TableName, TableDesc), Error> {
let table_info_value = self
.get_table_info_value(table_id)
.await?
@@ -186,7 +214,7 @@ pub(crate) mod test {
use crate::repr::{ColumnType, RelationType};
pub struct FlowDummyTableSource {
pub id_names_to_desc: Vec<(TableId, TableName, RelationDesc)>,
pub id_names_to_desc: Vec<(TableId, TableName, TableDesc)>,
id_to_idx: HashMap<TableId, usize>,
name_to_idx: HashMap<TableName, usize>,
}
@@ -201,8 +229,10 @@ pub(crate) mod test {
"public".to_string(),
"numbers".to_string(),
],
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
TableDesc::new_no_default(
RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
),
),
(
1025,
@@ -211,11 +241,13 @@ pub(crate) mod test {
"public".to_string(),
"numbers_with_ts".to_string(),
],
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
TableDesc::new_no_default(
RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::timestamp_millisecond_datatype(), false),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
),
),
];
let id_to_idx = id_names_to_desc
@@ -238,7 +270,7 @@ pub(crate) mod test {
#[async_trait::async_trait]
impl FlowTableSource for FlowDummyTableSource {
async fn table_from_id(&self, table_id: &TableId) -> Result<RelationDesc, Error> {
async fn table_from_id(&self, table_id: &TableId) -> Result<TableDesc, Error> {
let idx = self.id_to_idx.get(table_id).context(TableNotFoundSnafu {
name: format!("Table id = {:?}, couldn't found table desc", table_id),
})?;

View File

@@ -27,6 +27,7 @@ use session::context::QueryContextBuilder;
use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;
use crate::adapter::table_source::TableDesc;
use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
@@ -126,7 +127,7 @@ impl FlowWorkerManager {
pub fn table_info_value_to_relation_desc(
table_info_value: TableInfoValue,
) -> Result<RelationDesc, Error> {
) -> Result<TableDesc, Error> {
let raw_schema = table_info_value.table_info.meta.schema;
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
.column_schemas
@@ -147,8 +148,7 @@ pub fn table_info_value_to_relation_desc(
let keys = vec![crate::repr::Key::from(key)];
let time_index = raw_schema.timestamp_index;
Ok(RelationDesc {
let relation_desc = RelationDesc {
typ: RelationType {
column_types,
keys,
@@ -157,7 +157,14 @@ pub fn table_info_value_to_relation_desc(
auto_columns: vec![],
},
names: col_names,
})
};
let default_values = raw_schema
.column_schemas
.iter()
.map(|c| c.default_constraint().cloned())
.collect_vec();
Ok(TableDesc::new(relation_desc, default_values))
}
pub fn from_proto_to_data_type(

View File

@@ -27,6 +27,7 @@ use std::sync::Arc;
use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
@@ -40,6 +41,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{RegionId, TableId};
use table::predicate::Predicate;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
@@ -49,9 +51,9 @@ use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, Error, GetSchemaMetadataSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
TimeoutSnafu,
CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
@@ -85,19 +87,13 @@ pub struct CompactionRequest {
pub(crate) manifest_ctx: ManifestContextRef,
pub(crate) listener: WorkerListener,
pub(crate) schema_metadata_manager: SchemaMetadataManagerRef,
pub(crate) max_parallelism: usize,
}
impl CompactionRequest {
pub(crate) fn region_id(&self) -> RegionId {
self.current_version.metadata.region_id
}
/// Push waiter to the request.
pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
}
/// Compaction scheduler tracks and manages compaction tasks.
@@ -145,10 +141,27 @@ impl CompactionScheduler {
waiter: OptionOutputTx,
manifest_ctx: &ManifestContextRef,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
match compact_options {
Options::Regular(_) => {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
}
options @ Options::StrictWindow(_) => {
// Incoming compaction request is manually triggered.
status.set_pending_request(PendingCompaction {
options,
waiter,
max_parallelism,
});
info!(
"Region {} is compacting, manually compaction will be re-scheduled.",
region_id
);
}
}
return Ok(());
}
@@ -163,6 +176,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
self.region_status.insert(region_id, status);
let result = self
@@ -184,6 +198,35 @@ impl CompactionScheduler {
return;
};
if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
let PendingCompaction {
options,
waiter,
max_parallelism,
} = pending_request;
let request = status.new_compaction_request(
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
max_parallelism,
);
if let Err(e) = self.schedule_compaction_request(request, options).await {
error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
} else {
debug!(
"Successfully scheduled manual compaction for region id: {}",
region_id
);
}
return;
}
// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
self.request_sender.clone(),
@@ -193,6 +236,7 @@ impl CompactionScheduler {
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
MAX_PARALLEL_COMPACTION,
);
// Try to schedule next compaction task for this region.
if let Err(e) = self
@@ -264,6 +308,7 @@ impl CompactionScheduler {
manifest_ctx,
listener,
schema_metadata_manager,
max_parallelism,
} = request;
let ttl = find_ttl(
@@ -294,6 +339,7 @@ impl CompactionScheduler {
manifest_ctx: manifest_ctx.clone(),
file_purger: None,
ttl: Some(ttl),
max_parallelism,
};
let picker_output = {
@@ -417,27 +463,6 @@ impl Drop for CompactionScheduler {
}
}
/// Pending compaction tasks.
struct PendingCompaction {
waiters: Vec<OutputTx>,
}
impl PendingCompaction {
/// Push waiter to the request.
fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
/// Send compaction error to waiter.
fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
}
}
}
/// Finds TTL of table by first examine table options then database options.
async fn find_ttl(
table_id: TableId,
@@ -471,10 +496,10 @@ struct CompactionStatus {
version_control: VersionControlRef,
/// Access layer of the region.
access_layer: AccessLayerRef,
/// Compaction pending to schedule.
///
/// For simplicity, we merge all pending compaction requests into one.
pending_compaction: Option<PendingCompaction>,
/// Pending waiters for compaction.
waiters: Vec<OutputTx>,
/// Pending compactions that are supposed to run as soon as current compaction task finished.
pending_request: Option<PendingCompaction>,
}
impl CompactionStatus {
@@ -488,23 +513,44 @@ impl CompactionStatus {
region_id,
version_control,
access_layer,
pending_compaction: None,
waiters: Vec::new(),
pending_request: None,
}
}
/// Merge the watier to the pending compaction.
fn merge_waiter(&mut self, waiter: OptionOutputTx) {
let pending = self
.pending_compaction
.get_or_insert_with(|| PendingCompaction {
waiters: Vec::new(),
});
pending.push_waiter(waiter);
/// Merge the waiter to the pending compaction.
fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
if let Some(waiter) = waiter.take_inner() {
self.waiters.push(waiter);
}
}
fn on_failure(self, err: Arc<Error>) {
if let Some(mut pending) = self.pending_compaction {
pending.on_failure(self.region_id, err.clone());
/// Set pending compaction request or replace current value if already exist.
fn set_pending_request(&mut self, pending: PendingCompaction) {
if let Some(mut prev) = self.pending_request.replace(pending) {
debug!(
"Replace pending compaction options with new request {:?} for region: {}",
prev.options, self.region_id
);
if let Some(waiter) = prev.waiter.take_inner() {
waiter.send(ManualCompactionOverrideSnafu.fail());
}
}
}
fn on_failure(mut self, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
if let Some(pending_compaction) = self.pending_request {
pending_compaction
.waiter
.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}
@@ -515,34 +561,36 @@ impl CompactionStatus {
fn new_compaction_request(
&mut self,
request_sender: Sender<WorkerRequest>,
waiter: OptionOutputTx,
mut waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
cache_manager: CacheManagerRef,
manifest_ctx: &ManifestContextRef,
listener: WorkerListener,
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> CompactionRequest {
let current_version = CompactionVersion::from(self.version_control.current().version);
let start_time = Instant::now();
let mut req = CompactionRequest {
let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
waiters.extend(std::mem::take(&mut self.waiters));
if let Some(waiter) = waiter.take_inner() {
waiters.push(waiter);
}
CompactionRequest {
engine_config,
current_version,
access_layer: self.access_layer.clone(),
request_sender: request_sender.clone(),
waiters: Vec::new(),
waiters,
start_time,
cache_manager,
manifest_ctx: manifest_ctx.clone(),
listener,
schema_metadata_manager,
};
if let Some(pending) = self.pending_compaction.take() {
req.waiters = pending.waiters;
max_parallelism,
}
req.push_waiter(waiter);
req
}
}
@@ -680,8 +728,20 @@ fn get_expired_ssts(
.collect()
}
/// Pending compaction request that is supposed to run after current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
/// Compaction options. Currently, it can only be [StrictWindow].
pub(crate) options: compact_request::Options,
/// Waiters of pending requests.
pub(crate) waiter: OptionOutputTx,
/// Max parallelism for pending compaction.
pub(crate) max_parallelism: usize,
}
#[cfg(test)]
mod tests {
use api::v1::region::StrictWindow;
use tokio::sync::oneshot;
use super::*;
@@ -722,6 +782,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -742,6 +803,7 @@ mod tests {
waiter,
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
@@ -752,6 +814,7 @@ mod tests {
#[tokio::test]
async fn test_schedule_on_finished() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
@@ -795,6 +858,7 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
@@ -816,6 +880,119 @@ mod tests {
purger.clone(),
);
// The task is pending.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(!scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.waiters
.is_empty());
// On compaction finished and schedule next compaction.
scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&[],
purger.clone(),
);
let (tx, _rx) = oneshot::channel();
// The task is pending.
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
assert!(!scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.waiters
.is_empty());
}
#[tokio::test]
async fn test_manual_compaction_when_compaction_in_progress() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;
// 5 files to compact.
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta_ref().clone())
.collect();
// 5 files for next compaction and removes old files.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&file_metas,
purger.clone(),
);
scheduler
.schedule_compaction(
region_id,
@@ -825,17 +1002,40 @@ mod tests {
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
// Should schedule 1 compaction.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(scheduler
.region_status
.get(&builder.region_id())
.get(&region_id)
.unwrap()
.pending_compaction
.is_some());
.pending_request
.is_none());
// Schedule another manual compaction.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
1,
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
// Current job num should be 1 since compaction is in progress.
assert_eq!(1, job_scheduler.num_jobs());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_some());
// When compaction finishes, schedule the next compaction.
scheduler
@@ -843,32 +1043,8 @@ mod tests {
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());
// 5 files for next compaction.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&[],
purger.clone(),
);
// The task is pending.
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager,
)
.await
.unwrap();
assert_eq!(2, job_scheduler.num_jobs());
assert!(scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.pending_compaction
.is_some());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_none());
}
}
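The two tests above exercise a single pending-request slot: while a compaction is running, at most one manual request is parked per region, and a newer manual request replaces the parked one and cancels its waiter (surfacing the `ManualCompactionOverride` error added later in this diff). Below is a minimal sketch of that pattern; `PendingSlot` is a hypothetical type, not the actual scheduler state, and it assumes the `tokio` crate.

```rust
use tokio::sync::oneshot;

/// Hypothetical stand-in for the scheduler's per-region pending manual request.
struct PendingSlot {
    waiter: Option<oneshot::Sender<Result<(), String>>>,
}

impl PendingSlot {
    /// Parks a new manual request, cancelling the previously parked one, if any.
    fn set(&mut self, new_waiter: oneshot::Sender<Result<(), String>>) {
        if let Some(prev) = self.waiter.replace(new_waiter) {
            let _ = prev.send(Err("manual compaction is overridden".to_string()));
        }
    }

    /// Called when the running compaction finishes: hands the parked waiter
    /// back so the manual request can be scheduled next.
    fn take(&mut self) -> Option<oneshot::Sender<Result<(), String>>> {
        self.waiter.take()
    }
}

#[tokio::main]
async fn main() {
    let mut slot = PendingSlot { waiter: None };
    let (tx1, rx1) = oneshot::channel();
    let (tx2, _rx2) = oneshot::channel();
    slot.set(tx1);
    slot.set(tx2); // the second manual request overrides the first
    assert!(rx1.await.unwrap().is_err());
    assert!(slot.take().is_some());
}
```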

View File

@@ -91,6 +91,12 @@ pub struct CompactionRegion {
pub(crate) current_version: CompactionVersion,
pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
pub(crate) ttl: Option<TimeToLive>,
/// Controls the parallelism of this compaction task. Default is 1.
///
/// The parallelism applies within this compaction task, not across different compaction tasks.
/// For example, different time windows of the same compaction task may be compacted in parallel.
pub max_parallelism: usize,
}
/// OpenCompactionRegionRequest represents the request to open a compaction region.
@@ -99,6 +105,7 @@ pub struct OpenCompactionRegionRequest {
pub region_id: RegionId,
pub region_dir: String,
pub region_options: RegionOptions,
pub max_parallelism: usize,
}
/// Open a compaction region from a compaction request.
@@ -205,6 +212,7 @@ pub async fn open_compaction_region(
current_version,
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
})
}
@@ -266,6 +274,7 @@ impl Compactor for DefaultCompactor {
let mut futs = Vec::with_capacity(picker_output.outputs.len());
let mut compacted_inputs =
Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
let internal_parallelism = compaction_region.max_parallelism.max(1);
for output in picker_output.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
@@ -358,9 +367,8 @@ impl Compactor for DefaultCompactor {
}
let mut output_files = Vec::with_capacity(futs.len());
while !futs.is_empty() {
let mut task_chunk =
Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
let mut task_chunk = Vec::with_capacity(internal_parallelism);
for _ in 0..internal_parallelism {
if let Some(task) = futs.pop() {
task_chunk.push(common_runtime::spawn_compact(task));
}
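The loop above drains the merge futures in chunks of `internal_parallelism`, so the parallelism limit applies inside a single compaction task. A minimal sketch of the same chunked-draining pattern, using plain `tokio`/`futures` primitives instead of `common_runtime::spawn_compact`; the `run_chunked` helper is hypothetical:

```rust
use futures::future::join_all;

/// Runs `tasks` at most `max_parallelism` at a time, awaiting each chunk
/// before spawning the next one.
async fn run_chunked<F>(mut tasks: Vec<F>, max_parallelism: usize)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    let parallelism = max_parallelism.max(1);
    while !tasks.is_empty() {
        // Pop up to `parallelism` tasks and spawn them concurrently.
        let chunk: Vec<_> = (0..parallelism)
            .filter_map(|_| tasks.pop())
            .map(tokio::spawn)
            .collect();
        // Wait for the whole chunk; a panicked task surfaces here.
        for handle in join_all(chunk).await {
            handle.expect("compaction task panicked");
        }
    }
}
```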

View File

@@ -32,7 +32,7 @@ use crate::request::{
use crate::worker::WorkerListener;
/// Maximum number of compaction tasks in parallel.
pub const MAX_PARALLEL_COMPACTION: usize = 8;
pub const MAX_PARALLEL_COMPACTION: usize = 1;
pub(crate) struct CompactionTaskImpl {
pub compaction_region: CompactionRegion,

View File

@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::AlterKind::SetRegionOptions;
use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
RegionAlterRequest, RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest,
RegionOpenRequest, RegionRequest, SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::Notify;
@@ -466,3 +470,219 @@ async fn test_compaction_update_time_window() {
let vec = collect_stream_ts(stream).await;
assert_eq!((0..4000).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
#[tokio::test]
async fn test_change_region_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Put window 7200
put_and_flush(&engine, region_id, &column_schemas, 4000..5000).await; // window 7200
// Check compaction window.
let region = engine.get_region(region_id).unwrap();
{
let version = region.version();
assert_eq!(
Some(Duration::from_secs(3600)),
version.compaction_time_window,
);
assert!(version.options.compaction.time_window().is_none());
}
// Change compaction window.
let request = RegionRequest::Alter(RegionAlterRequest {
schema_version: region.metadata().schema_version,
kind: SetRegionOptions {
options: vec![SetRegionOption::Twsc(
"compaction.twcs.time_window".to_string(),
"2h".to_string(),
)],
},
});
engine.handle_request(region_id, request).await.unwrap();
// Compact again. It should compact windows 3600 and 7200
// into 7200.
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
assert_eq!(
Some(Duration::from_secs(7200)),
version.options.compaction.time_window()
);
}
// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
// We open the region without options, so the time window should be None.
assert!(version.options.compaction.time_window().is_none());
}
}
#[tokio::test]
async fn test_open_overwrite_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_active_window_files", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_files", "1")
.build();
let region_dir = request.region_dir.clone();
let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 2 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..1200).await; // window 3600
put_and_flush(&engine, region_id, &column_schemas, 1200..2400).await; // window 3600
engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(3600)),
version.compaction_time_window,
);
assert!(version.options.compaction.time_window().is_none());
}
// Reopen region.
let options = HashMap::from([
("compaction.type".to_string(), "twcs".to_string()),
("compaction.twcs.time_window".to_string(), "2h".to_string()),
]);
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await
.unwrap();
// Check compaction window.
{
let region = engine.get_region(region_id).unwrap();
let version = region.version();
assert_eq!(
Some(Duration::from_secs(7200)),
version.compaction_time_window,
);
assert_eq!(
Some(Duration::from_secs(7200)),
version.options.compaction.time_window()
);
}
}

View File

@@ -464,6 +464,7 @@ async fn test_open_compaction_region() {
region_id,
region_dir: region_dir.clone(),
region_options: RegionOptions::default(),
max_parallelism: 1,
};
let compaction_region = open_compaction_region(

View File

@@ -925,6 +925,23 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Unexpected impure default value with region_id: {}, column: {}, default_value: {}",
region_id,
column,
default_value
))]
UnexpectedImpureDefault {
#[snafu(implicit)]
location: Location,
region_id: RegionId,
column: String,
default_value: String,
},
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -964,7 +981,8 @@ impl ErrorExt for Error {
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. } => StatusCode::Unexpected,
| IndexEncodeNull { .. }
| UnexpectedImpureDefault { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }
@@ -1067,6 +1085,8 @@ impl ErrorExt for Error {
PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => {
source.status_code()
}
ManualCompactionOverride {} => StatusCode::Cancelled,
}
}

View File

@@ -26,6 +26,7 @@
use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_telemetry::info;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
@@ -253,7 +254,10 @@ pub(crate) struct Version {
///
/// Used to check if it is a flush task during the truncating table.
pub(crate) truncated_entry_id: Option<EntryId>,
/// Inferred compaction time window.
/// Inferred compaction time window from flush.
///
/// If compaction options contain a time window, it will overwrite this value
/// when creating a new version from the [VersionBuilder].
pub(crate) compaction_time_window: Option<Duration>,
/// Options of the region.
pub(crate) options: RegionOptions,
@@ -389,7 +393,24 @@ impl VersionBuilder {
}
/// Builds a new [Version] from the builder.
/// It overwrites the window size with the one in the compaction options.
pub(crate) fn build(self) -> Version {
let compaction_time_window = self
.options
.compaction
.time_window()
.or(self.compaction_time_window);
if self.compaction_time_window.is_some()
&& compaction_time_window != self.compaction_time_window
{
info!(
"VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
self.compaction_time_window,
compaction_time_window,
self.metadata.region_id
);
}
Version {
metadata: self.metadata,
memtables: self.memtables,
@@ -397,7 +418,7 @@ impl VersionBuilder {
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncated_entry_id: self.truncated_entry_id,
compaction_time_window: self.compaction_time_window,
compaction_time_window,
options: self.options,
}
}
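In short, a time window configured in the region's compaction options always wins over the window inferred from flushes. A minimal sketch of the precedence rule with plain `Option<Duration>` values; the `effective_window` helper is hypothetical, not the real `VersionBuilder` API:

```rust
use std::time::Duration;

/// Returns the window to use: the explicitly configured one if present,
/// otherwise the one inferred from flushes.
fn effective_window(
    option_window: Option<Duration>,
    inferred_window: Option<Duration>,
) -> Option<Duration> {
    // `Option::or` keeps the first `Some`, so the configured option takes priority.
    option_window.or(inferred_window)
}

fn main() {
    let inferred = Some(Duration::from_secs(3600));
    let configured = Some(Duration::from_secs(7200));
    assert_eq!(effective_window(configured, inferred), configured);
    assert_eq!(effective_window(None, inferred), inferred);
}
```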

View File

@@ -42,7 +42,7 @@ use tokio::sync::oneshot::{self, Receiver, Sender};
use crate::error::{
CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
FlushRegionSnafu, InvalidRequestSnafu, Result,
FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableId;
@@ -333,6 +333,14 @@ impl WriteRequest {
}
OpType::Put => {
// For put requests, we use the default value from column schema.
if column.column_schema.is_default_impure() {
UnexpectedImpureDefaultSnafu {
region_id: self.region_id,
column: &column.column_schema.name,
default_value: format!("{:?}", column.column_schema.default_constraint()),
}
.fail()?
}
column
.column_schema
.create_default()
@@ -1039,6 +1047,57 @@ mod tests {
check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
#[test]
fn test_fill_impure_columns_err() {
let rows = Rows {
schema: vec![new_column_schema(
"k0",
ColumnDataType::Int64,
SemanticType::Tag,
)],
rows: vec![Row {
values: vec![i64_value(1)],
}],
};
let metadata = {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"k0",
ConcreteDataType::int64_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.primary_key(vec![2]);
builder.build().unwrap()
};
let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows).unwrap();
let err = request.check_schema(&metadata).unwrap_err();
assert!(err.is_fill_default());
assert!(request
.fill_missing_columns(&metadata)
.unwrap_err()
.to_string()
.contains("Unexpected impure default value with region_id"));
}
#[test]
fn test_fill_missing_columns() {
let rows = Rows {

View File

@@ -45,6 +45,8 @@ impl<S> RegionWorkerLoop<S> {
sender,
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
// TODO(yingwen): expose this to frontend
1,
)
.await
{
@@ -113,6 +115,7 @@ impl<S> RegionWorkerLoop<S> {
OptionOutputTx::none(),
&region.manifest_ctx,
self.schema_metadata_manager.clone(),
1,
)
.await
{

View File

@@ -47,6 +47,7 @@ use store_api::metric_engine_consts::{
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
use store_api::storage::{RegionId, TableId};
use table::metadata::TableInfo;
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
use table::table_reference::TableReference;
use table::TableRef;
@@ -58,7 +59,9 @@ use crate::error::{
use crate::expr_factory::CreateExprFactory;
use crate::region_req_factory::RegionRequestFactory;
use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion};
use crate::req_convert::insert::{
fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;
pub struct Inserter {
@@ -200,18 +203,26 @@ impl Inserter {
});
validate_column_count_match(&requests)?;
let (table_name_to_ids, instant_table_ids) = self
let CreateAlterTableResult {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
.await?;
let name_to_info = table_infos
.values()
.map(|info| (info.name.clone(), info.clone()))
.collect::<HashMap<_, _>>();
let inserts = RowToRegion::new(
table_name_to_ids,
name_to_info,
instant_table_ids,
self.partition_manager.as_ref(),
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}
/// Handles row inserts request with metric engine.
@@ -236,7 +247,10 @@ impl Inserter {
.await?;
// check and create logical tables
let (table_name_to_ids, instant_table_ids) = self
let CreateAlterTableResult {
instant_table_ids,
table_infos,
} = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
@@ -244,15 +258,15 @@ impl Inserter {
statement_executor,
)
.await?;
let inserts = RowToRegion::new(
table_name_to_ids,
instant_table_ids,
&self.partition_manager,
)
.convert(requests)
.await?;
let name_to_info = table_infos
.values()
.map(|info| (info.name.clone(), info.clone()))
.collect::<HashMap<_, _>>();
let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
self.do_request(inserts, &table_infos, &ctx).await
}
pub async fn handle_table_insert(
@@ -273,7 +287,10 @@ impl Inserter {
.convert(request)
.await?;
self.do_request(inserts, &ctx).await
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
self.do_request(inserts, &table_infos, &ctx).await
}
pub async fn handle_statement_insert(
@@ -281,12 +298,15 @@ impl Inserter {
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let inserts =
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.await?;
self.do_request(inserts, ctx).await
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
self.do_request(inserts, &table_infos, ctx).await
}
}
@@ -294,8 +314,12 @@ impl Inserter {
async fn do_request(
&self,
requests: InstantAndNormalInsertRequests,
table_infos: &HashMap<TableId, Arc<TableInfo>>,
ctx: &QueryContextRef,
) -> Result<Output> {
// Fill impure default values in the request
let requests = fill_reqs_with_impure_default(table_infos, requests)?;
let write_cost = write_meter!(
ctx.current_catalog(),
ctx.current_schema(),
@@ -499,14 +523,15 @@ impl Inserter {
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
) -> Result<(HashMap<String, TableId>, HashSet<TableId>)> {
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
.start_timer();
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut table_infos = HashMap::new();
// If `auto_create_table` hint is disabled, skip creating/altering tables.
let auto_create_table_hint = ctx
.extension(AUTO_CREATE_TABLE_KEY)
@@ -535,9 +560,13 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
}
return Ok((table_name_to_ids, instant_table_ids));
let ret = CreateAlterTableResult {
instant_table_ids,
table_infos,
};
return Ok(ret);
}
let mut create_tables = vec![];
@@ -551,7 +580,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
@@ -579,7 +608,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
}
}
if !alter_tables.is_empty() {
@@ -602,7 +631,7 @@ impl Inserter {
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
}
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
table_infos.insert(table_info.table_id(), table.table_info());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
@@ -612,7 +641,10 @@ impl Inserter {
}
}
Ok((table_name_to_ids, instant_table_ids))
Ok(CreateAlterTableResult {
instant_table_ids,
table_infos,
})
}
async fn create_physical_table_on_demand(
@@ -874,3 +906,11 @@ fn build_create_table_expr(
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
/// Table ids of ttl=instant tables.
instant_table_ids: HashSet<TableId>,
/// Table info of the created tables.
table_infos: HashMap<TableId, Arc<TableInfo>>,
}

View File

@@ -13,12 +13,14 @@
// limitations under the License.
mod column_to_row;
mod fill_impure_default;
mod row_to_region;
mod stmt_to_region;
mod table_to_region;
use api::v1::SemanticType;
pub use column_to_row::ColumnToRow;
pub use fill_impure_default::fill_reqs_with_impure_default;
pub use row_to_region::RowToRegion;
use snafu::{OptionExt, ResultExt};
pub use stmt_to_region::StatementToRegion;

View File

@@ -0,0 +1,242 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Util functions to help fill columns with impure default values in requests.
use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet};
use datatypes::schema::ColumnSchema;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use table::metadata::{TableInfo, TableInfoRef};
use crate::error::{ConvertColumnDefaultConstraintSnafu, Result, UnexpectedSnafu};
use crate::expr_factory::column_schemas_to_defs;
use crate::insert::InstantAndNormalInsertRequests;
/// Find all columns that have impure default values
pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
let columns = table_info.meta.schema.column_schemas();
columns
.iter()
.filter(|column| column.is_default_impure())
.cloned()
.collect()
}
/// Fill impure default values in the request
pub struct ImpureDefaultFiller {
impure_columns: HashMap<String, (api::v1::ColumnSchema, Option<api::v1::Value>)>,
}
impl ImpureDefaultFiller {
pub fn new(table_info: TableInfoRef) -> Result<Self> {
let impure_column_list = find_all_impure_columns(&table_info);
let pks = &table_info.meta.primary_key_indices;
let pk_names = pks
.iter()
.map(|&i| table_info.meta.schema.column_name_by_index(i).to_string())
.collect::<Vec<_>>();
let mut impure_columns = HashMap::new();
for column in impure_column_list {
let default_value = column
.create_impure_default()
.with_context(|_| ConvertColumnDefaultConstraintSnafu {
column_name: column.name.clone(),
})?
.with_context(|| UnexpectedSnafu {
violated: format!(
"Expect default value to be impure, found {:?}",
column.default_constraint()
),
})?;
let grpc_default_value = api::helper::to_proto_value(default_value);
let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
let grpc_column_schema = api::v1::ColumnSchema {
column_name: def.name,
datatype: def.data_type,
semantic_type: def.semantic_type,
datatype_extension: def.datatype_extension,
options: def.options,
};
impure_columns.insert(
grpc_column_schema.column_name.clone(),
(grpc_column_schema, grpc_default_value),
);
}
Ok(Self { impure_columns })
}
/// Fill impure default values in the request
pub fn fill_rows(&self, rows: &mut api::v1::Rows) {
let impure_columns_in_reqs: HashSet<_> = rows
.schema
.iter()
.filter_map(|schema| {
if self.impure_columns.contains_key(&schema.column_name) {
Some(&schema.column_name)
} else {
None
}
})
.collect();
if self.impure_columns.len() == impure_columns_in_reqs.len() {
return;
}
let (schema_append, row_append): (Vec<_>, Vec<_>) = self
.impure_columns
.iter()
.filter_map(|(name, (schema, val))| {
if !impure_columns_in_reqs.contains(name) {
Some((schema.clone(), val.clone().unwrap_or_default()))
} else {
None
}
})
.unzip();
rows.schema.extend(schema_append);
for row in rows.rows.iter_mut() {
row.values.extend_from_slice(row_append.as_slice());
}
}
}
/// Fill impure default values in the request (only for normal insert requests, since instant inserts can be filled in the flownode directly as a single source of truth)
pub fn fill_reqs_with_impure_default(
table_infos: &HashMap<TableId, Arc<TableInfo>>,
mut inserts: InstantAndNormalInsertRequests,
) -> Result<InstantAndNormalInsertRequests> {
let fillers = table_infos
.iter()
.map(|(table_id, table_info)| {
let table_id = *table_id;
ImpureDefaultFiller::new(table_info.clone()).map(|filler| (table_id, filler))
})
.collect::<Result<HashMap<TableId, ImpureDefaultFiller>>>()?;
let normal_inserts = &mut inserts.normal_requests;
for request in normal_inserts.requests.iter_mut() {
let region_id = RegionId::from(request.region_id);
let table_id = region_id.table_id();
let filler = fillers.get(&table_id).with_context(|| UnexpectedSnafu {
violated: format!("impure default filler for table_id: {} not found", table_id),
})?;
if let Some(rows) = &mut request.rows {
filler.fill_rows(rows);
}
}
Ok(inserts)
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use datatypes::value::Value;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use super::*;
/// Create a test schema with 3 columns: `[col1 int32, ts timestamp_millisecond DEFAULT now(), col2 int32]`.
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Function(
"now()".to_string(),
)))
.unwrap(),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
Value::from(1i32),
)))
.unwrap(),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap()
}
pub fn new_table_info() -> TableInfo {
let schema = Arc::new(new_test_schema());
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.build()
.unwrap();
TableInfoBuilder::default()
.table_id(10)
.table_version(5)
.name("mytable")
.meta(meta)
.build()
.unwrap()
}
fn column_schema_to_proto(
column_schema: &[ColumnSchema],
pk_names: &[String],
) -> Vec<api::v1::ColumnSchema> {
column_schemas_to_defs(column_schema.to_vec(), pk_names)
.unwrap()
.into_iter()
.map(|def| api::v1::ColumnSchema {
column_name: def.name,
datatype: def.data_type,
semantic_type: def.semantic_type,
datatype_extension: def.datatype_extension,
options: def.options,
})
.collect()
}
#[test]
fn test_impure_append() {
let row = api::v1::Row {
values: vec![api::v1::Value {
value_data: Some(ValueData::I32Value(42)),
}],
};
let schema = new_test_schema().column_schemas()[0].clone();
let col_schemas = column_schema_to_proto(&[schema], &["col1".to_string()]);
let mut rows = api::v1::Rows {
schema: col_schemas,
rows: vec![row],
};
let info = new_table_info();
let filler = ImpureDefaultFiller::new(Arc::new(info)).unwrap();
filler.fill_rows(&mut rows);
assert_eq!(rows.schema[1].column_name, "ts");
assert!(rows.schema.len() == 2 && rows.rows[0].values.len() == 2);
}
}

View File

@@ -13,30 +13,31 @@
// limitations under the License.
use ahash::{HashMap, HashSet};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
use api::v1::RowInsertRequests;
use partition::manager::PartitionRuleManager;
use snafu::OptionExt;
use table::metadata::TableId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{TableId, TableInfoRef};
use crate::error::{Result, TableNotFoundSnafu};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
pub struct RowToRegion<'a> {
table_name_to_ids: HashMap<String, TableId>,
tables_info: HashMap<String, TableInfoRef>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
}
impl<'a> RowToRegion<'a> {
pub fn new(
table_name_to_ids: HashMap<String, TableId>,
tables_info: HashMap<String, TableInfoRef>,
instant_table_ids: HashSet<TableId>,
partition_manager: &'a PartitionRuleManager,
) -> Self {
Self {
table_name_to_ids,
tables_info,
instant_table_ids,
partition_manager,
}
@@ -49,10 +50,24 @@ impl<'a> RowToRegion<'a> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
let mut instant_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let Some(rows) = request.rows else { continue };
let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
let region_numbers = self.region_numbers(&request.table_name)?;
let requests = if let Some(region_id) = match region_numbers[..] {
[singular] => Some(RegionId::new(table_id, singular)),
_ => None,
} {
vec![InsertRequest {
region_id: region_id.as_u64(),
rows: Some(rows),
}]
} else {
Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, rows)
.await?
};
if self.instant_table_ids.contains(&table_id) {
instant_request.extend(requests);
} else {
@@ -71,9 +86,16 @@ impl<'a> RowToRegion<'a> {
}
fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.table_name_to_ids
self.tables_info
.get(table_name)
.cloned()
.map(|x| x.table_id())
.context(TableNotFoundSnafu { table_name })
}
fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
self.tables_info
.get(table_name)
.map(|x| &x.meta.region_numbers)
.context(TableNotFoundSnafu { table_name })
}
}

View File

@@ -25,6 +25,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::metadata::TableInfoRef;
use table::TableRef;
use crate::error::{
@@ -61,7 +62,7 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<InstantAndNormalInsertRequests> {
) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
@@ -137,15 +138,21 @@ impl<'a> StatementToRegion<'a> {
.await?;
let requests = RegionInsertRequests { requests };
if table_info.is_ttl_instant_table() {
Ok(InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
})
Ok((
InstantAndNormalInsertRequests {
normal_requests: Default::default(),
instant_requests: requests,
},
table_info,
))
} else {
Ok(InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
})
Ok((
InstantAndNormalInsertRequests {
normal_requests: requests,
instant_requests: Default::default(),
},
table_info,
))
}
}

View File

@@ -80,35 +80,20 @@ impl<'a> SplitReadRowHelper<'a> {
fn split_rows(mut self) -> Result<HashMap<RegionNumber, Rows>> {
let regions = self.split_to_regions()?;
let request_splits = if regions.len() == 1 {
// fast path, zero copy
regions
.into_keys()
.map(|region_number| {
let rows = std::mem::take(&mut self.rows);
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>()
} else {
regions
.into_iter()
.map(|(region_number, row_indexes)| {
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>()
};
let request_splits = regions
.into_iter()
.map(|(region_number, row_indexes)| {
let rows = row_indexes
.into_iter()
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
.collect();
let rows = Rows {
schema: self.schema.clone(),
rows,
};
(region_number, rows)
})
.collect::<HashMap<_, _>>();
Ok(request_splits)
}

View File

@@ -38,7 +38,7 @@ pub async fn logs(
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_LOGS_INGESTION_ELAPSED
let _timer = crate::metrics::METRIC_HTTP_LOGS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

View File

@@ -273,8 +273,11 @@ pub(crate) fn check(
) -> Option<Output> {
// INSERT doesn't need the MySQL federated check. We assume the query doesn't contain a
// federated or driver setup command if it starts with an 'INSERT' statement.
if query.len() > 6 && query[..6].eq_ignore_ascii_case("INSERT") {
return None;
let the_6th_index = query.char_indices().nth(6).map(|(i, _)| i);
if let Some(index) = the_6th_index {
if query[..index].eq_ignore_ascii_case("INSERT") {
return None;
}
}
// First, check whether the query is like "select @@variables".
@@ -295,6 +298,15 @@ mod test {
use super::*;
#[test]
fn test_check_abnormal() {
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));
let query = "🫣一点不正常的东西🫣";
let output = check(query, QueryContext::arc(), session.clone());
assert!(output.is_none());
}
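The fix above replaces the byte-indexed slice `query[..6]`, which panics when byte offset 6 falls inside a multi-byte UTF-8 character, with a `char_indices`-based boundary lookup. A small self-contained illustration of the difference; `starts_with_insert` is a hypothetical helper, not the servers crate API:

```rust
fn starts_with_insert(query: &str) -> bool {
    // `char_indices().nth(6)` yields the byte offset of the 7th character,
    // which is always a valid char boundary, unlike a raw byte index.
    match query.char_indices().nth(6).map(|(i, _)| i) {
        Some(idx) => query[..idx].eq_ignore_ascii_case("INSERT"),
        None => false, // fewer than 7 characters: cannot be an INSERT statement
    }
}

fn main() {
    assert!(starts_with_insert("insert into t values (1)"));
    // Non-ASCII input no longer panics; it simply fails the prefix check.
    assert!(!starts_with_insert("🫣一点不正常的东西🫣"));
}
```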
#[test]
fn test_check() {
let session = Arc::new(Session::new(None, Channel::Mysql, Default::default()));

View File

@@ -45,6 +45,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
log-query = { workspace = true }
loki-api = "0.1"
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }

View File

@@ -47,13 +47,10 @@ To run the integration test, please copy `.env.example` to `.env` in the project
GT_KAFKA_ENDPOINTS = localhost:9092
```
### Setup kafka standalone
```
cd tests-integration/fixtures/kafka
cd tests-integration/fixtures
docker compose -f docker-compose-standalone.yml up
docker compose -f docker-compose-standalone.yml up kafka
```

View File

@@ -0,0 +1,72 @@
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.7
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:3.6.0
container_name: kafka
ports:
- 9092:9092
- 9093:9093
environment:
# KRaft settings
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
# Listeners
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,SECURE://localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SECURE:SASL_PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181,SECURE://:9093
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_BROKER_ID: "1"
KAFKA_CLIENT_USERS: "user_kafka"
KAFKA_CLIENT_PASSWORDS: "secret"
depends_on:
zookeeper:
condition: service_started
etcd:
image: docker.io/bitnami/etcd:3.5
ports:
- "2379:2379"
- "2380:2380"
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_NAME: etcd
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ETCD_ADVERTISE_CLIENT_URLS: http://etcd:2379
ETCD_MAX_REQUEST_BYTES: 10485760
minio:
image: docker.io/bitnami/minio:2024
ports:
- '9000:9000'
- '9001:9001'
environment:
- MINIO_ROOT_USER=superpower_ci_user
- MINIO_ROOT_PASSWORD=superpower_password
- MINIO_DEFAULT_BUCKETS=greptime
- BITNAMI_DEBUG=true
volumes:
- 'minio_data:/bitnami/minio/data'
postgres:
image: docker.io/postgres:14-alpine
ports:
- 5432:5432
volumes:
- ~/apps/postgres:/var/lib/postgresql/data
environment:
- POSTGRES_USER=greptimedb
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin
volumes:
minio_data:
driver: local

View File

@@ -1,13 +0,0 @@
version: '3.8'
services:
etcd:
image: ghcr.io/zcube/bitnami-compat/etcd:3.5
ports:
- "2379:2379"
- "2380:2380"
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_NAME: etcd
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ETCD_ADVERTISE_CLIENT_URLS: http://etcd:2379
ETCD_MAX_REQUEST_BYTES: 10485760

View File

@@ -1,19 +0,0 @@
## Starts a standalone kafka
```bash
docker compose -f docker-compose-standalone.yml up kafka -d
```
## Lists running services
```bash
docker compose -f docker-compose-standalone.yml ps
```
## Stops the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml stop kafka
```
## Stops and removes the standalone kafka
```bash
docker compose -f docker-compose-standalone.yml down kafka
```

View File

@@ -1,28 +0,0 @@
version: '3.8'
services:
zookeeper:
image: bitnami/zookeeper:3.7
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:3.6.0
container_name: kafka
ports:
- 9092:9092
- 9093:9093
environment:
# KRaft settings
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
# Listeners
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,SECURE://localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SECURE:SASL_PLAINTEXT
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181,SECURE://:9093
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_BROKER_ID: "1"
KAFKA_CLIENT_USERS: "user_kafka"
KAFKA_CLIENT_PASSWORDS: "secret"

View File

@@ -1,18 +0,0 @@
version: '3.8'
services:
minio:
image: bitnami/minio:2024
ports:
- '9000:9000'
- '9001:9001'
environment:
- MINIO_ROOT_USER=superpower_ci_user
- MINIO_ROOT_PASSWORD=superpower_password
- MINIO_DEFAULT_BUCKETS=greptime
- BITNAMI_DEBUG=true
volumes:
- 'minio_data:/bitnami/minio/data'
volumes:
minio_data:
driver: local

View File

@@ -1,12 +0,0 @@
version: '3.9'
services:
postgres:
image: postgres:14-alpine
ports:
- 5432:5432
volumes:
- ~/apps/postgres:/var/lib/postgresql/data
environment:
- POSTGRES_USER=greptimedb
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin

View File

@@ -394,6 +394,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
None,
)
.with_logs_handler(instance.instance.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
.build();
@@ -429,6 +430,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
Some(instance.instance.clone()),
)
.with_log_ingest_handler(instance.instance.clone(), None, None)
.with_logs_handler(instance.instance.clone())
.with_otlp_handler(instance.instance.clone())
.with_greptime_config_options(instance.opts.to_toml().unwrap());
@@ -467,6 +469,7 @@ pub async fn setup_test_prom_app_with_frontend(
ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()),
Some(frontend_ref.clone()),
)
.with_logs_handler(instance.instance.clone())
.with_prom_handler(frontend_ref.clone(), true, is_strict_mode)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())

View File

@@ -22,6 +22,7 @@ use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use log_query::{ColumnFilters, Context, Limit, LogQuery, TimeFilter};
use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_api::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
@@ -40,6 +41,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use table::table_name::TableName;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -97,6 +99,7 @@ macro_rules! http_tests {
test_otlp_traces,
test_otlp_logs,
test_loki_logs,
test_log_query,
);
)*
};
@@ -1882,6 +1885,68 @@ pub async fn test_loki_logs(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_log_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_log_query").await;
let client = TestClient::new(app);
// prepare data with SQL API
let res = client
.get("/v1/sql?sql=create table logs (`ts` timestamp time index, message string);")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let res = client
.post("/v1/sql?sql=insert into logs values ('2024-11-07 10:53:50', 'hello');")
.header("Content-Type", "application/x-www-form-urlencoded")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
// test log query
let log_query = LogQuery {
table: TableName {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "logs".to_string(),
},
time_filter: TimeFilter {
start: Some("2024-11-07".to_string()),
end: None,
span: None,
},
limit: Limit {
skip: None,
fetch: Some(1),
},
columns: vec![
ColumnFilters {
column_name: "ts".to_string(),
filters: vec![],
},
ColumnFilters {
column_name: "message".to_string(),
filters: vec![],
},
],
context: Context::None,
};
let res = client
.post("/v1/logs")
.header("Content-Type", "application/json")
.body(serde_json::to_string(&log_query).unwrap())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await);
let resp = res.text().await;
let v = get_rows_from_output(&resp);
assert_eq!(v, "[[1730976830000,\"hello\"]]");
guard.remove_all().await;
}
async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())

View File

@@ -0,0 +1,70 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log (byte)
VALUES
(NULL),
(300);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
-- since ts defaults to now(), omit it when querying
SELECT
rate
FROM
approx_rate;
+------+
| rate |
+------+
| 0.0 |
+------+
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE bytes_log;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0

View File

@@ -0,0 +1,41 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log (byte)
VALUES
(NULL),
(300);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
-- since ts defaults to now(), omit it when querying
SELECT
rate
FROM
approx_rate;
DROP FLOW find_approx_rate;
DROP TABLE bytes_log;
DROP TABLE approx_rate;

View File

@@ -257,3 +257,149 @@ drop table fox;
Affected Rows: 0
create table fox_zh (
ts timestamp time index,
fox string,
);
Affected Rows: 0
insert into fox_zh values
(1, '快速的棕色狐狸跳过了懒狗'),
(2, '这只狐狸非常聪明,跳过了高高的栅栏'),
(3, '狐狸和狗是好朋友,它们一起玩耍'),
(4, '狐狸跳过了一条小溪,狗在后面追赶'),
(5, '狐狸和狗都喜欢在森林里探险'),
(6, '狐狸跳过了一个大石头,狗却没有跳过去'),
(7, '狐狸和狗在阳光下休息,享受着温暖的时光'),
(8, '狐狸跳过了一个小山坡,狗在后面慢慢地走'),
(9, '狐狸和狗一起找到了一颗闪闪发光的宝石'),
(10, '狐狸跳过了一个小水坑,狗在旁边看着');
Affected Rows: 10
select fox from fox_zh where matches(fox, '狐狸 AND 跳过') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 快速的棕色狐狸跳过了懒狗 |
| 这只狐狸非常聪明,跳过了高高的栅栏 |
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸跳过了一个大石头,狗却没有跳过去 |
| 狐狸跳过了一个小山坡,狗在后面慢慢地走 |
| 狐狸跳过了一个小水坑,狗在旁边看着 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 OR 狗') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 快速的棕色狐狸跳过了懒狗 |
| 这只狐狸非常聪明,跳过了高高的栅栏 |
| 狐狸和狗是好朋友,它们一起玩耍 |
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸和狗都喜欢在森林里探险 |
| 狐狸跳过了一个大石头,狗却没有跳过去 |
| 狐狸和狗在阳光下休息,享受着温暖的时光 |
| 狐狸跳过了一个小山坡,狗在后面慢慢地走 |
| 狐狸和狗一起找到了一颗闪闪发光的宝石 |
| 狐狸跳过了一个小水坑,狗在旁边看着 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 AND 狗') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 快速的棕色狐狸跳过了懒狗 |
| 狐狸和狗是好朋友,它们一起玩耍 |
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸和狗都喜欢在森林里探险 |
| 狐狸跳过了一个大石头,狗却没有跳过去 |
| 狐狸和狗在阳光下休息,享受着温暖的时光 |
| 狐狸跳过了一个小山坡,狗在后面慢慢地走 |
| 狐狸和狗一起找到了一颗闪闪发光的宝石 |
| 狐狸跳过了一个小水坑,狗在旁边看着 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 -跳过') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 狐狸和狗是好朋友,它们一起玩耍 |
| 狐狸和狗都喜欢在森林里探险 |
| 狐狸和狗在阳光下休息,享受着温暖的时光 |
| 狐狸和狗一起找到了一颗闪闪发光的宝石 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 AND 跳过 -石头') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 快速的棕色狐狸跳过了懒狗 |
| 这只狐狸非常聪明,跳过了高高的栅栏 |
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸跳过了一个小山坡,狗在后面慢慢地走 |
| 狐狸跳过了一个小水坑,狗在旁边看着 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '(狐狸 OR 狗) AND 森林') order by ts;
+----------------------------+
| fox |
+----------------------------+
| 狐狸和狗都喜欢在森林里探险 |
+----------------------------+
select fox from fox_zh where matches(fox, '狐狸 AND (跳过 OR 追赶)') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 快速的棕色狐狸跳过了懒狗 |
| 这只狐狸非常聪明,跳过了高高的栅栏 |
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸跳过了一个大石头,狗却没有跳过去 |
| 狐狸跳过了一个小山坡,狗在后面慢慢地走 |
| 狐狸跳过了一个小水坑,狗在旁边看着 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 AND -(跳过 OR 追赶)') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 狐狸和狗是好朋友,它们一起玩耍 |
| 狐狸和狗都喜欢在森林里探险 |
| 狐狸和狗在阳光下休息,享受着温暖的时光 |
| 狐狸和狗一起找到了一颗闪闪发光的宝石 |
+----------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 AND 跳过 AND (小溪 OR 石头)') order by ts;
+--------------------------------------+
| fox |
+--------------------------------------+
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸跳过了一个大石头,狗却没有跳过去 |
+--------------------------------------+
select fox from fox_zh where matches(fox, '狐狸 AND 跳过 AND -(石头 OR 栅栏)') order by ts;
+----------------------------------------+
| fox |
+----------------------------------------+
| 快速的棕色狐狸跳过了懒狗 |
| 狐狸跳过了一条小溪,狗在后面追赶 |
| 狐狸跳过了一个小山坡,狗在后面慢慢地走 |
| 狐狸跳过了一个小水坑,狗在旁边看着 |
+----------------------------------------+
drop table fox_zh;
Affected Rows: 0

View File

@@ -55,3 +55,42 @@ select fox from fox where matches(fox, 'over -(fox AND jumps)') order by ts;
select fox from fox where matches(fox, 'over AND -(-(fox OR jumps))') order by ts;
drop table fox;
create table fox_zh (
ts timestamp time index,
fox string,
);
insert into fox_zh values
(1, '快速的棕色狐狸跳过了懒狗'),
(2, '这只狐狸非常聪明,跳过了高高的栅栏'),
(3, '狐狸和狗是好朋友,它们一起玩耍'),
(4, '狐狸跳过了一条小溪,狗在后面追赶'),
(5, '狐狸和狗都喜欢在森林里探险'),
(6, '狐狸跳过了一个大石头,狗却没有跳过去'),
(7, '狐狸和狗在阳光下休息,享受着温暖的时光'),
(8, '狐狸跳过了一个小山坡,狗在后面慢慢地走'),
(9, '狐狸和狗一起找到了一颗闪闪发光的宝石'),
(10, '狐狸跳过了一个小水坑,狗在旁边看着');
select fox from fox_zh where matches(fox, '狐狸 AND 跳过') order by ts;
select fox from fox_zh where matches(fox, '狐狸 OR 狗') order by ts;
select fox from fox_zh where matches(fox, '狐狸 AND 狗') order by ts;
select fox from fox_zh where matches(fox, '狐狸 -跳过') order by ts;
select fox from fox_zh where matches(fox, '狐狸 AND 跳过 -石头') order by ts;
select fox from fox_zh where matches(fox, '(狐狸 OR 狗) AND 森林') order by ts;
select fox from fox_zh where matches(fox, '狐狸 AND (跳过 OR 追赶)') order by ts;
select fox from fox_zh where matches(fox, '狐狸 AND -(跳过 OR 追赶)') order by ts;
select fox from fox_zh where matches(fox, '狐狸 AND 跳过 AND (小溪 OR 石头)') order by ts;
select fox from fox_zh where matches(fox, '狐狸 AND 跳过 AND -(石头 OR 栅栏)') order by ts;
drop table fox_zh;