mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00
Compare commits
51 Commits
chore/benc
...
chore/debu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1e37847f48 | ||
|
|
2b89970d45 | ||
|
|
53d006292d | ||
|
|
d18c8b5e16 | ||
|
|
e0949c4a11 | ||
|
|
5cf931c417 | ||
|
|
cc5b1d42b0 | ||
|
|
55b7656956 | ||
|
|
75e4f307c9 | ||
|
|
89f2e15ffb | ||
|
|
13ed10556a | ||
|
|
d1108ab581 | ||
|
|
1287d4cb9f | ||
|
|
109fe04d17 | ||
|
|
f1eb76f489 | ||
|
|
11bab0c47c | ||
|
|
588f6755f0 | ||
|
|
dad8ac6f71 | ||
|
|
ef13c52814 | ||
|
|
7471f55c2e | ||
|
|
f4b2d393be | ||
|
|
0cf44e1e47 | ||
|
|
00ad27dd2e | ||
|
|
5ba8bd09fb | ||
|
|
a9f21915ef | ||
|
|
039989f77b | ||
|
|
abf34b845c | ||
|
|
4051be4214 | ||
|
|
5e88c80394 | ||
|
|
6a46f391cc | ||
|
|
c96903e60c | ||
|
|
a23f269bb1 | ||
|
|
f33b378e45 | ||
|
|
267941bbb5 | ||
|
|
074846bbc2 | ||
|
|
88d46a38ae | ||
|
|
de0beabf34 | ||
|
|
68dd2916fb | ||
|
|
d51b65a8bf | ||
|
|
2082c4b6e4 | ||
|
|
c623404fff | ||
|
|
fa3b7ed5ea | ||
|
|
8ece853076 | ||
|
|
4245bff8f2 | ||
|
|
3d4121aefb | ||
|
|
1910d71cb3 | ||
|
|
a578eea801 | ||
|
|
6bf574f098 | ||
|
|
a4d61bcaf1 | ||
|
|
7ea8a44d3a | ||
|
|
2d6f63a504 |
@@ -54,7 +54,7 @@ runs:
|
||||
PROFILE_TARGET: ${{ inputs.cargo-profile == 'dev' && 'debug' || inputs.cargo-profile }}
|
||||
with:
|
||||
artifacts-dir: ${{ inputs.artifacts-dir }}
|
||||
target-file: ./target/$PROFILE_TARGET/greptime
|
||||
target-files: ./target/$PROFILE_TARGET/greptime
|
||||
version: ${{ inputs.version }}
|
||||
working-dir: ${{ inputs.working-dir }}
|
||||
|
||||
@@ -72,6 +72,6 @@ runs:
|
||||
if: ${{ inputs.build-android-artifacts == 'true' }}
|
||||
with:
|
||||
artifacts-dir: ${{ inputs.artifacts-dir }}
|
||||
target-file: ./target/aarch64-linux-android/release/greptime
|
||||
target-files: ./target/aarch64-linux-android/release/greptime
|
||||
version: ${{ inputs.version }}
|
||||
working-dir: ${{ inputs.working-dir }}
|
||||
|
||||
4
.github/actions/build-images/action.yml
vendored
4
.github/actions/build-images/action.yml
vendored
@@ -41,8 +41,8 @@ runs:
|
||||
image-name: ${{ inputs.image-name }}
|
||||
image-tag: ${{ inputs.version }}
|
||||
docker-file: docker/ci/ubuntu/Dockerfile
|
||||
amd64-artifact-name: greptime-linux-amd64-pyo3-${{ inputs.version }}
|
||||
arm64-artifact-name: greptime-linux-arm64-pyo3-${{ inputs.version }}
|
||||
amd64-artifact-name: greptime-linux-amd64-${{ inputs.version }}
|
||||
arm64-artifact-name: greptime-linux-arm64-${{ inputs.version }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
push-latest-tag: ${{ inputs.push-latest-tag }}
|
||||
|
||||
|
||||
14
.github/actions/build-linux-artifacts/action.yml
vendored
14
.github/actions/build-linux-artifacts/action.yml
vendored
@@ -48,19 +48,7 @@ runs:
|
||||
path: /tmp/greptime-*.log
|
||||
retention-days: 3
|
||||
|
||||
- name: Build standard greptime
|
||||
uses: ./.github/actions/build-greptime-binary
|
||||
with:
|
||||
base-image: ubuntu
|
||||
features: pyo3_backend,servers/dashboard
|
||||
cargo-profile: ${{ inputs.cargo-profile }}
|
||||
artifacts-dir: greptime-linux-${{ inputs.arch }}-pyo3-${{ inputs.version }}
|
||||
version: ${{ inputs.version }}
|
||||
working-dir: ${{ inputs.working-dir }}
|
||||
image-registry: ${{ inputs.image-registry }}
|
||||
image-namespace: ${{ inputs.image-namespace }}
|
||||
|
||||
- name: Build greptime without pyo3
|
||||
- name: Build greptime
|
||||
if: ${{ inputs.dev-mode == 'false' }}
|
||||
uses: ./.github/actions/build-greptime-binary
|
||||
with:
|
||||
|
||||
@@ -90,5 +90,5 @@ runs:
|
||||
uses: ./.github/actions/upload-artifacts
|
||||
with:
|
||||
artifacts-dir: ${{ inputs.artifacts-dir }}
|
||||
target-file: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
|
||||
target-files: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
|
||||
version: ${{ inputs.version }}
|
||||
|
||||
@@ -33,15 +33,6 @@ runs:
|
||||
- name: Rust Cache
|
||||
uses: Swatinem/rust-cache@v2
|
||||
|
||||
- name: Install Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
|
||||
- name: Install PyArrow Package
|
||||
shell: pwsh
|
||||
run: pip install pyarrow numpy
|
||||
|
||||
- name: Install WSL distribution
|
||||
uses: Vampire/setup-wsl@v2
|
||||
with:
|
||||
@@ -76,5 +67,5 @@ runs:
|
||||
uses: ./.github/actions/upload-artifacts
|
||||
with:
|
||||
artifacts-dir: ${{ inputs.artifacts-dir }}
|
||||
target-file: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
|
||||
target-files: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime,target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime.pdb
|
||||
version: ${{ inputs.version }}
|
||||
|
||||
@@ -5,7 +5,7 @@ meta:
|
||||
|
||||
[datanode]
|
||||
[datanode.client]
|
||||
timeout = "60s"
|
||||
timeout = "120s"
|
||||
datanode:
|
||||
configData: |-
|
||||
[runtime]
|
||||
@@ -21,7 +21,7 @@ frontend:
|
||||
global_rt_size = 4
|
||||
|
||||
[meta_client]
|
||||
ddl_timeout = "60s"
|
||||
ddl_timeout = "120s"
|
||||
objectStorage:
|
||||
s3:
|
||||
bucket: default
|
||||
|
||||
@@ -5,7 +5,7 @@ meta:
|
||||
|
||||
[datanode]
|
||||
[datanode.client]
|
||||
timeout = "60s"
|
||||
timeout = "120s"
|
||||
datanode:
|
||||
configData: |-
|
||||
[runtime]
|
||||
@@ -17,7 +17,7 @@ frontend:
|
||||
global_rt_size = 4
|
||||
|
||||
[meta_client]
|
||||
ddl_timeout = "60s"
|
||||
ddl_timeout = "120s"
|
||||
objectStorage:
|
||||
s3:
|
||||
bucket: default
|
||||
|
||||
@@ -11,7 +11,7 @@ meta:
|
||||
|
||||
[datanode]
|
||||
[datanode.client]
|
||||
timeout = "60s"
|
||||
timeout = "120s"
|
||||
datanode:
|
||||
configData: |-
|
||||
[runtime]
|
||||
@@ -28,7 +28,7 @@ frontend:
|
||||
global_rt_size = 4
|
||||
|
||||
[meta_client]
|
||||
ddl_timeout = "60s"
|
||||
ddl_timeout = "120s"
|
||||
objectStorage:
|
||||
s3:
|
||||
bucket: default
|
||||
|
||||
14
.github/actions/upload-artifacts/action.yml
vendored
14
.github/actions/upload-artifacts/action.yml
vendored
@@ -4,8 +4,8 @@ inputs:
|
||||
artifacts-dir:
|
||||
description: Directory to store artifacts
|
||||
required: true
|
||||
target-file:
|
||||
description: The path of the target artifact
|
||||
target-files:
|
||||
description: The multiple target files to upload, separated by comma
|
||||
required: false
|
||||
version:
|
||||
description: Version of the artifact
|
||||
@@ -18,12 +18,16 @@ runs:
|
||||
using: composite
|
||||
steps:
|
||||
- name: Create artifacts directory
|
||||
if: ${{ inputs.target-file != '' }}
|
||||
if: ${{ inputs.target-files != '' }}
|
||||
working-directory: ${{ inputs.working-dir }}
|
||||
shell: bash
|
||||
run: |
|
||||
mkdir -p ${{ inputs.artifacts-dir }} && \
|
||||
cp ${{ inputs.target-file }} ${{ inputs.artifacts-dir }}
|
||||
set -e
|
||||
mkdir -p ${{ inputs.artifacts-dir }}
|
||||
IFS=',' read -ra FILES <<< "${{ inputs.target-files }}"
|
||||
for file in "${FILES[@]}"; do
|
||||
cp "$file" ${{ inputs.artifacts-dir }}/
|
||||
done
|
||||
|
||||
# The compressed artifacts will use the following layout:
|
||||
# greptime-linux-amd64-pyo3-v0.3.0sha256sum
|
||||
|
||||
10
.github/pull_request_template.md
vendored
10
.github/pull_request_template.md
vendored
@@ -4,7 +4,8 @@ I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeT
|
||||
|
||||
## What's changed and what's your intention?
|
||||
|
||||
__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
|
||||
<!--
|
||||
__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
|
||||
|
||||
Please explain IN DETAIL what the changes are in this PR and why they are needed:
|
||||
|
||||
@@ -12,9 +13,14 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed
|
||||
- How does this PR work? Need a brief introduction for the changed logic (optional)
|
||||
- Describe clearly one logical change and avoid lazy messages (optional)
|
||||
- Describe any limitations of the current code (optional)
|
||||
- Describe if this PR will break **API or data compatibility** (optional)
|
||||
-->
|
||||
|
||||
## Checklist
|
||||
## PR Checklist
|
||||
Please convert it to a draft if some of the following conditions are not met.
|
||||
|
||||
- [ ] I have written the necessary rustdoc comments.
|
||||
- [ ] I have added the necessary unit tests and integration tests.
|
||||
- [ ] This PR requires documentation updates.
|
||||
- [ ] API changes are backward compatible.
|
||||
- [ ] Schema or data changes are backward compatible.
|
||||
|
||||
34
.github/workflows/develop.yml
vendored
34
.github/workflows/develop.yml
vendored
@@ -10,17 +10,6 @@ on:
|
||||
- 'docker/**'
|
||||
- '.gitignore'
|
||||
- 'grafana/**'
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
paths-ignore:
|
||||
- 'docs/**'
|
||||
- 'config/**'
|
||||
- '**.md'
|
||||
- '.dockerignore'
|
||||
- 'docker/**'
|
||||
- '.gitignore'
|
||||
- 'grafana/**'
|
||||
workflow_dispatch:
|
||||
|
||||
name: CI
|
||||
@@ -84,7 +73,7 @@ jobs:
|
||||
# Shares across multiple jobs
|
||||
shared-key: "check-toml"
|
||||
- name: Install taplo
|
||||
run: cargo +stable install taplo-cli --version ^0.9 --locked
|
||||
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
|
||||
- name: Run taplo
|
||||
run: taplo format --check
|
||||
|
||||
@@ -107,7 +96,7 @@ jobs:
|
||||
shared-key: "build-binaries"
|
||||
- name: Install cargo-gc-bin
|
||||
shell: bash
|
||||
run: cargo install cargo-gc-bin
|
||||
run: cargo install cargo-gc-bin --force
|
||||
- name: Build greptime binaries
|
||||
shell: bash
|
||||
# `cargo gc` will invoke `cargo build` with specified args
|
||||
@@ -163,7 +152,7 @@ jobs:
|
||||
run: |
|
||||
sudo apt-get install -y libfuzzer-14-dev
|
||||
rustup install nightly
|
||||
cargo +nightly install cargo-fuzz cargo-gc-bin
|
||||
cargo +nightly install cargo-fuzz cargo-gc-bin --force
|
||||
- name: Download pre-built binaries
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
@@ -220,7 +209,7 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
sudo apt update && sudo apt install -y libfuzzer-14-dev
|
||||
cargo install cargo-fuzz cargo-gc-bin
|
||||
cargo install cargo-fuzz cargo-gc-bin --force
|
||||
- name: Download pre-built binariy
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
@@ -268,7 +257,7 @@ jobs:
|
||||
shared-key: "build-greptime-ci"
|
||||
- name: Install cargo-gc-bin
|
||||
shell: bash
|
||||
run: cargo install cargo-gc-bin
|
||||
run: cargo install cargo-gc-bin --force
|
||||
- name: Build greptime bianry
|
||||
shell: bash
|
||||
# `cargo gc` will invoke `cargo build` with specified args
|
||||
@@ -338,7 +327,7 @@ jobs:
|
||||
run: |
|
||||
sudo apt-get install -y libfuzzer-14-dev
|
||||
rustup install nightly
|
||||
cargo +nightly install cargo-fuzz cargo-gc-bin
|
||||
cargo +nightly install cargo-fuzz cargo-gc-bin --force
|
||||
# Downloads ci image
|
||||
- name: Download pre-built binariy
|
||||
uses: actions/download-artifact@v4
|
||||
@@ -487,7 +476,7 @@ jobs:
|
||||
run: |
|
||||
sudo apt-get install -y libfuzzer-14-dev
|
||||
rustup install nightly
|
||||
cargo +nightly install cargo-fuzz cargo-gc-bin
|
||||
cargo +nightly install cargo-fuzz cargo-gc-bin --force
|
||||
# Downloads ci image
|
||||
- name: Download pre-built binariy
|
||||
uses: actions/download-artifact@v4
|
||||
@@ -653,6 +642,7 @@ jobs:
|
||||
if: github.event.pull_request.draft == false
|
||||
runs-on: ubuntu-20.04-8-cores
|
||||
timeout-minutes: 60
|
||||
needs: [clippy, fmt]
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: arduino/setup-protoc@v3
|
||||
@@ -678,12 +668,6 @@ jobs:
|
||||
uses: taiki-e/install-action@nextest
|
||||
- name: Install cargo-llvm-cov
|
||||
uses: taiki-e/install-action@cargo-llvm-cov
|
||||
- name: Install Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.10'
|
||||
- name: Install PyArrow Package
|
||||
run: pip install pyarrow numpy
|
||||
- name: Setup etcd server
|
||||
working-directory: tests-integration/fixtures/etcd
|
||||
run: docker compose -f docker-compose-standalone.yml up -d --wait
|
||||
@@ -697,7 +681,7 @@ jobs:
|
||||
working-directory: tests-integration/fixtures/postgres
|
||||
run: docker compose -f docker-compose-standalone.yml up -d --wait
|
||||
- name: Run nextest cases
|
||||
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
|
||||
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
|
||||
env:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
|
||||
RUST_BACKTRACE: 1
|
||||
|
||||
11
.github/workflows/nightly-ci.yml
vendored
11
.github/workflows/nightly-ci.yml
vendored
@@ -1,6 +1,6 @@
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 23 * * 1-5"
|
||||
- cron: "0 23 * * 1-4"
|
||||
workflow_dispatch:
|
||||
|
||||
name: Nightly CI
|
||||
@@ -91,18 +91,12 @@ jobs:
|
||||
uses: Swatinem/rust-cache@v2
|
||||
- name: Install Cargo Nextest
|
||||
uses: taiki-e/install-action@nextest
|
||||
- name: Install Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
- name: Install PyArrow Package
|
||||
run: pip install pyarrow numpy
|
||||
- name: Install WSL distribution
|
||||
uses: Vampire/setup-wsl@v2
|
||||
with:
|
||||
distribution: Ubuntu-22.04
|
||||
- name: Running tests
|
||||
run: cargo nextest run -F pyo3_backend,dashboard
|
||||
run: cargo nextest run -F dashboard
|
||||
env:
|
||||
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -117,7 +111,6 @@ jobs:
|
||||
cleanbuild-linux-nix:
|
||||
runs-on: ubuntu-latest-8-cores
|
||||
timeout-minutes: 60
|
||||
needs: [coverage, fmt, clippy, check]
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: cachix/install-nix-action@v27
|
||||
|
||||
14
.github/workflows/release.yml
vendored
14
.github/workflows/release.yml
vendored
@@ -31,7 +31,7 @@ on:
|
||||
linux_arm64_runner:
|
||||
type: choice
|
||||
description: The runner uses to build linux-arm64 artifacts
|
||||
default: ec2-c6g.4xlarge-arm64
|
||||
default: ec2-c6g.8xlarge-arm64
|
||||
options:
|
||||
- ubuntu-2204-32-cores-arm
|
||||
- ec2-c6g.xlarge-arm64 # 4C8G
|
||||
@@ -222,18 +222,10 @@ jobs:
|
||||
arch: aarch64-apple-darwin
|
||||
features: servers/dashboard
|
||||
artifacts-dir-prefix: greptime-darwin-arm64
|
||||
- os: ${{ needs.allocate-runners.outputs.macos-runner }}
|
||||
arch: aarch64-apple-darwin
|
||||
features: pyo3_backend,servers/dashboard
|
||||
artifacts-dir-prefix: greptime-darwin-arm64-pyo3
|
||||
- os: ${{ needs.allocate-runners.outputs.macos-runner }}
|
||||
features: servers/dashboard
|
||||
arch: x86_64-apple-darwin
|
||||
artifacts-dir-prefix: greptime-darwin-amd64
|
||||
- os: ${{ needs.allocate-runners.outputs.macos-runner }}
|
||||
features: pyo3_backend,servers/dashboard
|
||||
arch: x86_64-apple-darwin
|
||||
artifacts-dir-prefix: greptime-darwin-amd64-pyo3
|
||||
runs-on: ${{ matrix.os }}
|
||||
outputs:
|
||||
build-macos-result: ${{ steps.set-build-macos-result.outputs.build-macos-result }}
|
||||
@@ -271,10 +263,6 @@ jobs:
|
||||
arch: x86_64-pc-windows-msvc
|
||||
features: servers/dashboard
|
||||
artifacts-dir-prefix: greptime-windows-amd64
|
||||
- os: ${{ needs.allocate-runners.outputs.windows-runner }}
|
||||
arch: x86_64-pc-windows-msvc
|
||||
features: pyo3_backend,servers/dashboard
|
||||
artifacts-dir-prefix: greptime-windows-amd64-pyo3
|
||||
runs-on: ${{ matrix.os }}
|
||||
outputs:
|
||||
build-windows-result: ${{ steps.set-build-windows-result.outputs.build-windows-result }}
|
||||
|
||||
182
Cargo.lock
generated
182
Cargo.lock
generated
@@ -730,6 +730,36 @@ version = "1.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
|
||||
|
||||
[[package]]
|
||||
name = "attribute-derive"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1800e974930e9079c965b9ffbcb6667a40401063a26396c7b4f15edc92da690"
|
||||
dependencies = [
|
||||
"attribute-derive-macro",
|
||||
"derive-where",
|
||||
"manyhow",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "attribute-derive-macro"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d908eb786ef94296bff86f90130b3b748b49401dc81fd2bb8b3dccd44cfacbd"
|
||||
dependencies = [
|
||||
"collection_literals",
|
||||
"interpolator",
|
||||
"manyhow",
|
||||
"proc-macro-utils",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"quote-use",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
@@ -1845,6 +1875,12 @@ dependencies = [
|
||||
"tracing-appender",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "collection_literals"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "186dce98367766de751c42c4f03970fc60fc012296e706ccbb9d5df9b6c1e271"
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.2"
|
||||
@@ -1980,6 +2016,7 @@ dependencies = [
|
||||
name = "common-error"
|
||||
version = "0.12.0"
|
||||
dependencies = [
|
||||
"http 0.2.12",
|
||||
"snafu 0.8.5",
|
||||
"strum 0.25.0",
|
||||
"tonic 0.11.0",
|
||||
@@ -2314,6 +2351,8 @@ dependencies = [
|
||||
"snafu 0.8.5",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-metrics",
|
||||
"tokio-metrics-collector",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
]
|
||||
@@ -3346,6 +3385,17 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive-where"
|
||||
version = "1.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62d671cc41a825ebabc75757b62d3d168c577f9149b2d49ece1dad1f72119d25"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "derive_arbitrary"
|
||||
version = "1.3.2"
|
||||
@@ -4011,7 +4061,10 @@ dependencies = [
|
||||
"enum-as-inner",
|
||||
"enum_dispatch",
|
||||
"futures",
|
||||
"get-size-derive2",
|
||||
"get-size2",
|
||||
"greptime-proto",
|
||||
"http 0.2.12",
|
||||
"hydroflow",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
@@ -4103,6 +4156,7 @@ dependencies = [
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"lazy_static",
|
||||
"log-query",
|
||||
"log-store",
|
||||
"meta-client",
|
||||
"opentelemetry-proto 0.5.0",
|
||||
@@ -4415,6 +4469,23 @@ dependencies = [
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "get-size-derive2"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fd26d3a97ea14d289c8b54180243ecfe465f3fa9c279a6336d7a003698fc39d"
|
||||
dependencies = [
|
||||
"attribute-derive",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "get-size2"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "159c430715e540d2198fa981d39cd45563ccc60900de187f5b152b33b1cb408e"
|
||||
|
||||
[[package]]
|
||||
name = "gethostname"
|
||||
version = "0.2.3"
|
||||
@@ -5219,6 +5290,7 @@ dependencies = [
|
||||
"futures",
|
||||
"greptime-proto",
|
||||
"mockall",
|
||||
"parquet",
|
||||
"pin-project",
|
||||
"prost 0.12.6",
|
||||
"rand",
|
||||
@@ -5346,6 +5418,12 @@ version = "4.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d762194228a2f1c11063e46e32e5acb96e66e906382b9eb5441f2e0504bbd5a"
|
||||
|
||||
[[package]]
|
||||
name = "interpolator"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71dd52191aae121e8611f1e8dc3e324dd0dd1dee1e6dd91d10ee07a3cfb4d9d8"
|
||||
|
||||
[[package]]
|
||||
name = "inventory"
|
||||
version = "0.3.15"
|
||||
@@ -6050,6 +6128,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"serde",
|
||||
"snafu 0.8.5",
|
||||
"table",
|
||||
]
|
||||
@@ -6244,6 +6323,29 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "manyhow"
|
||||
version = "0.11.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b33efb3ca6d3b07393750d4030418d594ab1139cee518f0dc88db70fec873587"
|
||||
dependencies = [
|
||||
"manyhow-macros",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "manyhow-macros"
|
||||
version = "0.11.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46fce34d199b78b6e6073abf984c9cf5fd3e9330145a93ee0738a7443e371495"
|
||||
dependencies = [
|
||||
"proc-macro-utils",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "maplit"
|
||||
version = "1.0.2"
|
||||
@@ -7375,8 +7477,7 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
|
||||
[[package]]
|
||||
name = "opendal"
|
||||
version = "0.50.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44"
|
||||
source = "git+https://github.com/GreptimeTeam/opendal.git?rev=c82605177f2feec83e49dcaa537c505639d94024#c82605177f2feec83e49dcaa537c505639d94024"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -8065,7 +8166,7 @@ dependencies = [
|
||||
"rand",
|
||||
"ring 0.17.8",
|
||||
"rust_decimal",
|
||||
"thiserror 2.0.4",
|
||||
"thiserror 2.0.6",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
"tokio-util",
|
||||
@@ -8528,6 +8629,17 @@ dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-utils"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eeaf08a13de400bc215877b5bdc088f241b12eb42f0a548d3390dc1c56bb7071"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"smallvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.92"
|
||||
@@ -8992,8 +9104,10 @@ dependencies = [
|
||||
"humantime",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"log-query",
|
||||
"meter-core",
|
||||
"meter-macros",
|
||||
"nalgebra 0.33.2",
|
||||
"num",
|
||||
"num-traits",
|
||||
"object-store",
|
||||
@@ -9107,6 +9221,28 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote-use"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9619db1197b497a36178cfc736dc96b271fe918875fbf1344c436a7e93d0321e"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"quote-use-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote-use-macros"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "82ebfb7faafadc06a7ab141a6f67bcfb24cb8beb158c6fe933f2f035afa99f35"
|
||||
dependencies = [
|
||||
"proc-macro-utils",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "radium"
|
||||
version = "0.7.0"
|
||||
@@ -10824,6 +10960,7 @@ dependencies = [
|
||||
"json5",
|
||||
"jsonb",
|
||||
"lazy_static",
|
||||
"log-query",
|
||||
"loki-api",
|
||||
"mime_guess",
|
||||
"mysql_async",
|
||||
@@ -12306,11 +12443,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "2.0.4"
|
||||
version = "2.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490"
|
||||
checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47"
|
||||
dependencies = [
|
||||
"thiserror-impl 2.0.4",
|
||||
"thiserror-impl 2.0.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12326,9 +12463,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "2.0.4"
|
||||
version = "2.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061"
|
||||
checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -12484,9 +12621,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.40.0"
|
||||
version = "1.42.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
|
||||
checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
@@ -12522,6 +12659,31 @@ dependencies = [
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-metrics"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-metrics-collector"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8092b7a97ed5dac2f44892db190eca8f476ede0fa585bc87664de4151cd0b64"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"parking_lot 0.12.3",
|
||||
"prometheus",
|
||||
"tokio",
|
||||
"tokio-metrics",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.12"
|
||||
|
||||
@@ -126,6 +126,7 @@ futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
|
||||
hex = "0.4"
|
||||
http = "0.2"
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
@@ -134,6 +135,7 @@ lazy_static = "1.4"
|
||||
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
|
||||
mockall = "0.11.4"
|
||||
moka = "0.12"
|
||||
nalgebra = "0.33"
|
||||
notify = "6.1"
|
||||
num_cpus = "1.16"
|
||||
once_cell = "1.18"
|
||||
@@ -238,6 +240,7 @@ file-engine = { path = "src/file-engine" }
|
||||
flow = { path = "src/flow" }
|
||||
frontend = { path = "src/frontend", default-features = false }
|
||||
index = { path = "src/index" }
|
||||
log-query = { path = "src/log-query" }
|
||||
log-store = { path = "src/log-store" }
|
||||
meta-client = { path = "src/meta-client" }
|
||||
meta-srv = { path = "src/meta-srv" }
|
||||
|
||||
18
README.md
18
README.md
@@ -70,23 +70,23 @@ Our core developers have been building time-series data platforms for years. Bas
|
||||
|
||||
* **Unified Processing of Metrics, Logs, and Events**
|
||||
|
||||
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/continuous-aggregation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
|
||||
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
|
||||
|
||||
* **Cloud-native Distributed Database**
|
||||
|
||||
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
|
||||
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
|
||||
|
||||
* **Performance and Cost-effective**
|
||||
|
||||
Written in pure Rust for superior performance and reliability. GreptimeDB features a distributed query engine with intelligent indexing to handle high cardinality data efficiently. Its optimized columnar storage achieves 50x cost efficiency on cloud object storage through advanced compression. [Benchmark reports](https://www.greptime.com/blogs/2024-09-09-report-summary).
|
||||
Written in pure Rust for superior performance and reliability. GreptimeDB features a distributed query engine with intelligent indexing to handle high cardinality data efficiently. Its optimized columnar storage achieves 50x cost efficiency on cloud object storage through advanced compression. [Benchmark reports](https://www.greptime.com/blogs/2024-09-09-report-summary).
|
||||
|
||||
* **Cloud-Edge Collaboration**
|
||||
|
||||
GreptimeDB seamlessly operates across cloud and edge (ARM/Android/Linux), providing consistent APIs and control plane for unified data management and efficient synchronization. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
|
||||
GreptimeDB seamlessly operates across cloud and edge (ARM/Android/Linux), providing consistent APIs and control plane for unified data management and efficient synchronization. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
|
||||
|
||||
* **Multi-protocol Ingestion, SQL & PromQL Ready**
|
||||
|
||||
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, InfluxDB, OpenTelemetry, Loki and Prometheus, etc. Effortless Adoption & Seamless Migration. [Supported Protocols Overview](https://docs.greptime.com/user-guide/protocols/overview).
|
||||
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, InfluxDB, OpenTelemetry, Loki and Prometheus, etc. Effortless Adoption & Seamless Migration. [Supported Protocols Overview](https://docs.greptime.com/user-guide/protocols/overview).
|
||||
|
||||
For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
|
||||
|
||||
@@ -138,7 +138,7 @@ Check the prerequisite:
|
||||
|
||||
* [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
|
||||
* [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
|
||||
* Python toolchain (optional): Required only if built with PyO3 backend. More detail for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
|
||||
* Python toolchain (optional): Required only if built with PyO3 backend. More details for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
|
||||
|
||||
Build GreptimeDB binary:
|
||||
|
||||
@@ -154,6 +154,10 @@ cargo run -- standalone start
|
||||
|
||||
## Tools & Extensions
|
||||
|
||||
### Kubernetes
|
||||
|
||||
- [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
|
||||
|
||||
### Dashboard
|
||||
|
||||
- [The dashboard UI for GreptimeDB](https://github.com/GreptimeTeam/dashboard)
|
||||
@@ -173,7 +177,7 @@ Our official Grafana dashboard for monitoring GreptimeDB is available at [grafan
|
||||
|
||||
## Project Status
|
||||
|
||||
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
|
||||
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
|
||||
|
||||
While in Beta, GreptimeDB is already:
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
|
||||
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
|
||||
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
|
||||
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
@@ -156,6 +157,11 @@
|
||||
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
|
||||
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. |
|
||||
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
|
||||
| `region_engine.mito.memtable` | -- | -- | -- |
|
||||
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
|
||||
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
|
||||
@@ -195,6 +201,7 @@
|
||||
| Key | Type | Default | Descriptions |
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `default_timezone` | String | Unset | The default timezone of the server. |
|
||||
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
|
||||
| `runtime` | -- | -- | The runtime options. |
|
||||
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
|
||||
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
|
||||
@@ -421,7 +428,7 @@
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
|
||||
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
|
||||
@@ -460,7 +467,7 @@
|
||||
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
|
||||
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
|
||||
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
|
||||
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
|
||||
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
|
||||
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
|
||||
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
|
||||
@@ -484,6 +491,11 @@
|
||||
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
|
||||
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. |
|
||||
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
|
||||
| `region_engine.mito.memtable` | -- | -- | -- |
|
||||
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
|
||||
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
|
||||
|
||||
@@ -294,7 +294,7 @@ data_home = "/tmp/greptimedb/"
|
||||
type = "File"
|
||||
|
||||
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
|
||||
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
|
||||
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
|
||||
## @toml2docs:none-default
|
||||
#+ cache_path = ""
|
||||
|
||||
@@ -478,7 +478,7 @@ auto_flush_interval = "1h"
|
||||
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
|
||||
enable_experimental_write_cache = false
|
||||
|
||||
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
|
||||
## File system path for write cache, defaults to `{data_home}`.
|
||||
experimental_write_cache_path = ""
|
||||
|
||||
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
|
||||
@@ -576,6 +576,30 @@ apply_on_query = "auto"
|
||||
## - `[size]` e.g. `64MB`: fixed memory threshold
|
||||
mem_threshold_on_create = "auto"
|
||||
|
||||
## The options for bloom filter index in Mito engine.
|
||||
[region_engine.mito.bloom_filter_index]
|
||||
|
||||
## Whether to create the index on flush.
|
||||
## - `auto`: automatically (default)
|
||||
## - `disable`: never
|
||||
create_on_flush = "auto"
|
||||
|
||||
## Whether to create the index on compaction.
|
||||
## - `auto`: automatically (default)
|
||||
## - `disable`: never
|
||||
create_on_compaction = "auto"
|
||||
|
||||
## Whether to apply the index on query
|
||||
## - `auto`: automatically (default)
|
||||
## - `disable`: never
|
||||
apply_on_query = "auto"
|
||||
|
||||
## Memory threshold for the index creation.
|
||||
## - `auto`: automatically determine the threshold based on the system memory size (default)
|
||||
## - `unlimited`: no memory limit
|
||||
## - `[size]` e.g. `64MB`: fixed memory threshold
|
||||
mem_threshold_on_create = "auto"
|
||||
|
||||
[region_engine.mito.memtable]
|
||||
## Memtable type.
|
||||
## - `time_series`: time-series memtable
|
||||
|
||||
@@ -2,6 +2,10 @@
|
||||
## @toml2docs:none-default
|
||||
default_timezone = "UTC"
|
||||
|
||||
## The maximum in-flight write bytes.
|
||||
## @toml2docs:none-default
|
||||
#+ max_in_flight_write_bytes = "500MB"
|
||||
|
||||
## The runtime options.
|
||||
#+ [runtime]
|
||||
## The number of threads to execute the runtime for global read operations.
|
||||
|
||||
@@ -18,6 +18,10 @@ max_concurrent_queries = 0
|
||||
## Enable telemetry to collect anonymous usage data. Enabled by default.
|
||||
#+ enable_telemetry = true
|
||||
|
||||
## The maximum in-flight write bytes.
|
||||
## @toml2docs:none-default
|
||||
#+ max_in_flight_write_bytes = "500MB"
|
||||
|
||||
## The runtime options.
|
||||
#+ [runtime]
|
||||
## The number of threads to execute the runtime for global read operations.
|
||||
@@ -615,6 +619,30 @@ apply_on_query = "auto"
|
||||
## - `[size]` e.g. `64MB`: fixed memory threshold
|
||||
mem_threshold_on_create = "auto"
|
||||
|
||||
## The options for bloom filter in Mito engine.
|
||||
[region_engine.mito.bloom_filter_index]
|
||||
|
||||
## Whether to create the bloom filter on flush.
|
||||
## - `auto`: automatically (default)
|
||||
## - `disable`: never
|
||||
create_on_flush = "auto"
|
||||
|
||||
## Whether to create the bloom filter on compaction.
|
||||
## - `auto`: automatically (default)
|
||||
## - `disable`: never
|
||||
create_on_compaction = "auto"
|
||||
|
||||
## Whether to apply the bloom filter on query
|
||||
## - `auto`: automatically (default)
|
||||
## - `disable`: never
|
||||
apply_on_query = "auto"
|
||||
|
||||
## Memory threshold for bloom filter creation.
|
||||
## - `auto`: automatically determine the threshold based on the system memory size (default)
|
||||
## - `unlimited`: no memory limit
|
||||
## - `[size]` e.g. `64MB`: fixed memory threshold
|
||||
mem_threshold_on_create = "auto"
|
||||
|
||||
[region_engine.mito.memtable]
|
||||
## Memtable type.
|
||||
## - `time_series`: time-series memtable
|
||||
|
||||
@@ -13,8 +13,6 @@ RUN yum install -y epel-release \
|
||||
openssl \
|
||||
openssl-devel \
|
||||
centos-release-scl \
|
||||
rh-python38 \
|
||||
rh-python38-python-devel \
|
||||
which
|
||||
|
||||
# Install protoc
|
||||
@@ -43,8 +41,6 @@ RUN yum install -y epel-release \
|
||||
openssl \
|
||||
openssl-devel \
|
||||
centos-release-scl \
|
||||
rh-python38 \
|
||||
rh-python38-python-devel \
|
||||
which
|
||||
|
||||
WORKDIR /greptime
|
||||
|
||||
@@ -20,10 +20,7 @@ RUN --mount=type=cache,target=/var/cache/apt \
|
||||
curl \
|
||||
git \
|
||||
build-essential \
|
||||
pkg-config \
|
||||
python3.10 \
|
||||
python3.10-dev \
|
||||
python3-pip
|
||||
pkg-config
|
||||
|
||||
# Install Rust.
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
@@ -46,15 +43,8 @@ ARG OUTPUT_DIR
|
||||
|
||||
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get \
|
||||
-y install ca-certificates \
|
||||
python3.10 \
|
||||
python3.10-dev \
|
||||
python3-pip \
|
||||
curl
|
||||
|
||||
COPY ./docker/python/requirements.txt /etc/greptime/requirements.txt
|
||||
|
||||
RUN python3 -m pip install -r /etc/greptime/requirements.txt
|
||||
|
||||
WORKDIR /greptime
|
||||
COPY --from=builder /out/target/${OUTPUT_DIR}/greptime /greptime/bin/
|
||||
ENV PATH /greptime/bin/:$PATH
|
||||
|
||||
@@ -7,9 +7,7 @@ RUN sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo
|
||||
RUN yum install -y epel-release \
|
||||
openssl \
|
||||
openssl-devel \
|
||||
centos-release-scl \
|
||||
rh-python38 \
|
||||
rh-python38-python-devel
|
||||
centos-release-scl
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
|
||||
@@ -8,15 +8,8 @@ ARG TARGET_BIN=greptime
|
||||
|
||||
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
|
||||
ca-certificates \
|
||||
python3.10 \
|
||||
python3.10-dev \
|
||||
python3-pip \
|
||||
curl
|
||||
|
||||
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt
|
||||
|
||||
RUN python3 -m pip install -r /etc/greptime/requirements.txt
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
ADD $TARGETARCH/$TARGET_BIN /greptime/bin/
|
||||
|
||||
@@ -15,8 +15,8 @@ RUN apt-get update && \
|
||||
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
|
||||
libssl-dev \
|
||||
tzdata \
|
||||
protobuf-compiler \
|
||||
curl \
|
||||
unzip \
|
||||
ca-certificates \
|
||||
git \
|
||||
build-essential \
|
||||
@@ -24,6 +24,20 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
|
||||
python3.10 \
|
||||
python3.10-dev
|
||||
|
||||
ARG TARGETPLATFORM
|
||||
RUN echo "target platform: $TARGETPLATFORM"
|
||||
|
||||
# Install protobuf, because the one in the apt is too old (v3.12).
|
||||
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then \
|
||||
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v29.1/protoc-29.1-linux-aarch_64.zip && \
|
||||
unzip protoc-29.1-linux-aarch_64.zip -d protoc3; \
|
||||
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
|
||||
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v29.1/protoc-29.1-linux-x86_64.zip && \
|
||||
unzip protoc-29.1-linux-x86_64.zip -d protoc3; \
|
||||
fi
|
||||
RUN mv protoc3/bin/* /usr/local/bin/
|
||||
RUN mv protoc3/include/* /usr/local/include/
|
||||
|
||||
# https://github.com/GreptimeTeam/greptimedb/actions/runs/10935485852/job/30357457188#step:3:7106
|
||||
# `aws-lc-sys` require gcc >= 10.3.0 to work, hence alias to use gcc-10
|
||||
RUN apt-get remove -y gcc-9 g++-9 cpp-9 && \
|
||||
@@ -49,7 +63,7 @@ RUN apt-get -y purge python3.8 && \
|
||||
# wildcard here. However, that requires the git's config files and the submodules all owned by the very same user.
|
||||
# It's troublesome to do this since the dev build runs in Docker, which is under user "root"; while outside the Docker,
|
||||
# it can be a different user that have prepared the submodules.
|
||||
RUN git config --global --add safe.directory *
|
||||
RUN git config --global --add safe.directory '*'
|
||||
|
||||
# Install Python dependencies.
|
||||
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt
|
||||
|
||||
@@ -20,3 +20,31 @@ Sample at 49 Hertz, for 10 seconds, output report in text format.
|
||||
```bash
|
||||
curl -X POST -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
|
||||
```
|
||||
|
||||
## Using `perf`
|
||||
|
||||
First find the pid of GreptimeDB:
|
||||
|
||||
Using `perf record` to profile GreptimeDB, at the sampling frequency of 99 hertz, and a duration of 60 seconds:
|
||||
|
||||
```bash
|
||||
perf record -p <pid> --call-graph dwarf -F 99 -- sleep 60
|
||||
```
|
||||
|
||||
The result will be saved to file `perf.data`.
|
||||
|
||||
Then
|
||||
|
||||
```bash
|
||||
perf script --no-inline > perf.out
|
||||
```
|
||||
|
||||
Produce a flame graph out of it:
|
||||
|
||||
```bash
|
||||
git clone https://github.com/brendangregg/FlameGraph
|
||||
|
||||
FlameGraph/stackcollapse-perf.pl perf.out > perf.folded
|
||||
|
||||
FlameGraph/flamegraph.pl perf.folded > perf.svg
|
||||
```
|
||||
|
||||
@@ -25,6 +25,7 @@ pub enum PermissionReq<'a> {
|
||||
GrpcRequest(&'a Request),
|
||||
SqlStatement(&'a Statement),
|
||||
PromQuery,
|
||||
LogQuery,
|
||||
Opentsdb,
|
||||
LineProtocol,
|
||||
PromStoreWrite,
|
||||
|
||||
@@ -64,6 +64,13 @@ pub enum Error {
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list flow stats"))]
|
||||
ListFlowStats {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list flows in catalog {catalog}"))]
|
||||
ListFlows {
|
||||
#[snafu(implicit)]
|
||||
@@ -326,6 +333,7 @@ impl ErrorExt for Error {
|
||||
| Error::ListSchemas { source, .. }
|
||||
| Error::ListTables { source, .. }
|
||||
| Error::ListFlows { source, .. }
|
||||
| Error::ListFlowStats { source, .. }
|
||||
| Error::ListProcedures { source, .. }
|
||||
| Error::ListRegionStats { source, .. }
|
||||
| Error::ConvertProtoData { source, .. } => source.status_code(),
|
||||
|
||||
@@ -17,6 +17,7 @@ use common_error::ext::BoxedError;
|
||||
use common_meta::cluster::{ClusterInfo, NodeInfo};
|
||||
use common_meta::datanode::RegionStat;
|
||||
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_meta::rpc::procedure;
|
||||
use common_procedure::{ProcedureInfo, ProcedureState};
|
||||
use meta_client::MetaClientRef;
|
||||
@@ -89,4 +90,12 @@ impl InformationExtension for DistributedInformationExtension {
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ListRegionStatsSnafu)
|
||||
}
|
||||
|
||||
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
|
||||
self.meta_client
|
||||
.list_flow_stats()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(crate::error::ListFlowStatsSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_meta::cache_invalidator::KvCacheInvalidator;
|
||||
use common_meta::error::Error::CacheNotGet;
|
||||
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
|
||||
@@ -37,6 +37,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::metrics::{
|
||||
METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
|
||||
METRIC_META_CLIENT_GET,
|
||||
};
|
||||
|
||||
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
|
||||
@@ -292,7 +293,7 @@ impl KvBackend for CachedKvBackend {
|
||||
}
|
||||
.map_err(|e| {
|
||||
GetKvCacheSnafu {
|
||||
err_msg: e.to_string(),
|
||||
err_msg: e.output_msg(),
|
||||
}
|
||||
.build()
|
||||
});
|
||||
@@ -445,6 +446,8 @@ impl KvBackend for MetaKvBackend {
|
||||
}
|
||||
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
|
||||
let _timer = METRIC_META_CLIENT_GET.start_timer();
|
||||
|
||||
let mut response = self
|
||||
.client
|
||||
.range(RangeRequest::new().with_key(key))
|
||||
|
||||
@@ -38,7 +38,7 @@ pub fn new_table_cache(
|
||||
) -> TableCache {
|
||||
let init = init_factory(table_info_cache, table_name_cache);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(
|
||||
|
||||
@@ -34,4 +34,6 @@ lazy_static! {
|
||||
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
|
||||
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
|
||||
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
|
||||
pub static ref METRIC_META_CLIENT_GET: Histogram =
|
||||
register_histogram!("greptime_meta_client_get", "meta client get").unwrap();
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_meta::cluster::NodeInfo;
|
||||
use common_meta::datanode::RegionStat;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_procedure::ProcedureInfo;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
@@ -192,6 +193,7 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
|
||||
)) as _),
|
||||
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
self.flow_metadata_manager.clone(),
|
||||
)) as _),
|
||||
PROCEDURE_INFO => Some(
|
||||
@@ -338,6 +340,9 @@ pub trait InformationExtension {
|
||||
|
||||
/// Gets the region statistics.
|
||||
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
|
||||
|
||||
/// Get the flow statistics. If no flownode is available, return `None`.
|
||||
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
|
||||
}
|
||||
|
||||
pub struct NoopInformationExtension;
|
||||
@@ -357,4 +362,8 @@ impl InformationExtension for NoopInformationExtension {
|
||||
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::key::flow::flow_info::FlowInfoValue;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::FlowId;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
@@ -28,7 +29,9 @@ use datatypes::prelude::ConcreteDataType as CDT;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
|
||||
use datatypes::vectors::{
|
||||
Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
|
||||
};
|
||||
use futures::TryStreamExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
@@ -38,6 +41,8 @@ use crate::error::{
|
||||
};
|
||||
use crate::information_schema::{Predicates, FLOWS};
|
||||
use crate::system_schema::information_schema::InformationTable;
|
||||
use crate::system_schema::utils;
|
||||
use crate::CatalogManager;
|
||||
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
@@ -45,6 +50,7 @@ const INIT_CAPACITY: usize = 42;
|
||||
// pk is (flow_name, flow_id, table_catalog)
|
||||
pub const FLOW_NAME: &str = "flow_name";
|
||||
pub const FLOW_ID: &str = "flow_id";
|
||||
pub const STATE_SIZE: &str = "state_size";
|
||||
pub const TABLE_CATALOG: &str = "table_catalog";
|
||||
pub const FLOW_DEFINITION: &str = "flow_definition";
|
||||
pub const COMMENT: &str = "comment";
|
||||
@@ -55,20 +61,24 @@ pub const FLOWNODE_IDS: &str = "flownode_ids";
|
||||
pub const OPTIONS: &str = "options";
|
||||
|
||||
/// The `information_schema.flows` to provides information about flows in databases.
|
||||
///
|
||||
pub(super) struct InformationSchemaFlows {
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
}
|
||||
|
||||
impl InformationSchemaFlows {
|
||||
pub(super) fn new(
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
) -> Self {
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
flow_metadata_manager,
|
||||
}
|
||||
}
|
||||
@@ -80,6 +90,7 @@ impl InformationSchemaFlows {
|
||||
vec![
|
||||
(FLOW_NAME, CDT::string_datatype(), false),
|
||||
(FLOW_ID, CDT::uint32_datatype(), false),
|
||||
(STATE_SIZE, CDT::uint64_datatype(), true),
|
||||
(TABLE_CATALOG, CDT::string_datatype(), false),
|
||||
(FLOW_DEFINITION, CDT::string_datatype(), false),
|
||||
(COMMENT, CDT::string_datatype(), true),
|
||||
@@ -99,6 +110,7 @@ impl InformationSchemaFlows {
|
||||
InformationSchemaFlowsBuilder::new(
|
||||
self.schema.clone(),
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
&self.flow_metadata_manager,
|
||||
)
|
||||
}
|
||||
@@ -144,10 +156,12 @@ impl InformationTable for InformationSchemaFlows {
|
||||
struct InformationSchemaFlowsBuilder {
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
|
||||
flow_names: StringVectorBuilder,
|
||||
flow_ids: UInt32VectorBuilder,
|
||||
state_sizes: UInt64VectorBuilder,
|
||||
table_catalogs: StringVectorBuilder,
|
||||
raw_sqls: StringVectorBuilder,
|
||||
comments: StringVectorBuilder,
|
||||
@@ -162,15 +176,18 @@ impl InformationSchemaFlowsBuilder {
|
||||
fn new(
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: &Arc<FlowMetadataManager>,
|
||||
) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
flow_metadata_manager: flow_metadata_manager.clone(),
|
||||
|
||||
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
@@ -195,6 +212,11 @@ impl InformationSchemaFlowsBuilder {
|
||||
.flow_names(&catalog_name)
|
||||
.await;
|
||||
|
||||
let flow_stat = {
|
||||
let information_extension = utils::information_extension(&self.catalog_manager)?;
|
||||
information_extension.flow_stats().await?
|
||||
};
|
||||
|
||||
while let Some((flow_name, flow_id)) = stream
|
||||
.try_next()
|
||||
.await
|
||||
@@ -213,7 +235,7 @@ impl InformationSchemaFlowsBuilder {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
flow_name: flow_name.to_string(),
|
||||
})?;
|
||||
self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
|
||||
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?;
|
||||
}
|
||||
|
||||
self.finish()
|
||||
@@ -224,6 +246,7 @@ impl InformationSchemaFlowsBuilder {
|
||||
predicates: &Predicates,
|
||||
flow_id: FlowId,
|
||||
flow_info: FlowInfoValue,
|
||||
flow_stat: &Option<FlowStat>,
|
||||
) -> Result<()> {
|
||||
let row = [
|
||||
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
|
||||
@@ -238,6 +261,11 @@ impl InformationSchemaFlowsBuilder {
|
||||
}
|
||||
self.flow_names.push(Some(flow_info.flow_name()));
|
||||
self.flow_ids.push(Some(flow_id));
|
||||
self.state_sizes.push(
|
||||
flow_stat
|
||||
.as_ref()
|
||||
.and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
|
||||
);
|
||||
self.table_catalogs.push(Some(flow_info.catalog_name()));
|
||||
self.raw_sqls.push(Some(flow_info.raw_sql()));
|
||||
self.comments.push(Some(flow_info.comment()));
|
||||
@@ -270,6 +298,7 @@ impl InformationSchemaFlowsBuilder {
|
||||
let columns: Vec<VectorRef> = vec![
|
||||
Arc::new(self.flow_names.finish()),
|
||||
Arc::new(self.flow_ids.finish()),
|
||||
Arc::new(self.state_sizes.finish()),
|
||||
Arc::new(self.table_catalogs.finish()),
|
||||
Arc::new(self.raw_sqls.finish()),
|
||||
Arc::new(self.comments.finish()),
|
||||
|
||||
@@ -15,7 +15,7 @@ cache.workspace = true
|
||||
catalog.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
client.workspace = true
|
||||
client = { workspace = true, features = ["testing"] }
|
||||
common-base.workspace = true
|
||||
common-catalog.workspace = true
|
||||
common-config.workspace = true
|
||||
@@ -56,7 +56,6 @@ tokio.workspace = true
|
||||
tracing-appender.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
client = { workspace = true, features = ["testing"] }
|
||||
common-test-util.workspace = true
|
||||
common-version.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -34,7 +34,7 @@ use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use common_telemetry::debug;
|
||||
use either::Either;
|
||||
use meta_client::client::MetaClientBuilder;
|
||||
use meta_client::client::{ClusterKvBackend, MetaClientBuilder};
|
||||
use query::datafusion::DatafusionQueryEngine;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::query_engine::{DefaultSerializer, QueryEngineState};
|
||||
|
||||
@@ -62,6 +62,13 @@ impl Instance {
|
||||
pub fn datanode(&self) -> &Datanode {
|
||||
&self.datanode
|
||||
}
|
||||
|
||||
/// Get mutable Datanode instance for changing some internal state, before starting it.
|
||||
// Useful for wrapping Datanode instance. Please do not remove this method even if you find
|
||||
// nowhere it is called.
|
||||
pub fn datanode_mut(&mut self) -> &mut Datanode {
|
||||
&mut self.datanode
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -22,6 +22,7 @@ use catalog::information_schema::InformationExtension;
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use clap::Parser;
|
||||
use client::api::v1::meta::RegionRole;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
|
||||
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
|
||||
@@ -34,6 +35,7 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
|
||||
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
|
||||
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
|
||||
use common_meta::ddl_manager::DdlManager;
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
@@ -70,7 +72,7 @@ use servers::http::HttpOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use servers::Mode;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
|
||||
use crate::error::{
|
||||
@@ -151,6 +153,7 @@ pub struct StandaloneOptions {
|
||||
pub tracing: TracingOptions,
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
}
|
||||
|
||||
impl Default for StandaloneOptions {
|
||||
@@ -180,6 +183,7 @@ impl Default for StandaloneOptions {
|
||||
tracing: TracingOptions::default(),
|
||||
init_regions_in_background: false,
|
||||
init_regions_parallelism: 16,
|
||||
max_in_flight_write_bytes: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -217,6 +221,7 @@ impl StandaloneOptions {
|
||||
user_provider: cloned_opts.user_provider,
|
||||
// Handle the export metrics task run by standalone to frontend for execution
|
||||
export_metrics: cloned_opts.export_metrics,
|
||||
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
@@ -507,7 +512,7 @@ impl StartCommand {
|
||||
procedure_manager.clone(),
|
||||
));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
information_extension,
|
||||
information_extension.clone(),
|
||||
kv_backend.clone(),
|
||||
layered_cache_registry.clone(),
|
||||
Some(procedure_manager.clone()),
|
||||
@@ -532,6 +537,14 @@ impl StartCommand {
|
||||
.context(OtherSnafu)?,
|
||||
);
|
||||
|
||||
// set the ref to query for the local flow state
|
||||
{
|
||||
let flow_worker_manager = flownode.flow_worker_manager();
|
||||
information_extension
|
||||
.set_flow_worker_manager(flow_worker_manager.clone())
|
||||
.await;
|
||||
}
|
||||
|
||||
let node_manager = Arc::new(StandaloneDatanodeManager {
|
||||
region_server: datanode.region_server(),
|
||||
flow_server: flownode.flow_worker_manager(),
|
||||
@@ -669,6 +682,7 @@ pub struct StandaloneInformationExtension {
|
||||
region_server: RegionServer,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
start_time_ms: u64,
|
||||
flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>,
|
||||
}
|
||||
|
||||
impl StandaloneInformationExtension {
|
||||
@@ -677,8 +691,15 @@ impl StandaloneInformationExtension {
|
||||
region_server,
|
||||
procedure_manager,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
flow_worker_manager: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the flow worker manager for the standalone instance.
|
||||
pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) {
|
||||
let mut guard = self.flow_worker_manager.write().await;
|
||||
*guard = Some(flow_worker_manager);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -750,6 +771,18 @@ impl InformationExtension for StandaloneInformationExtension {
|
||||
.collect::<Vec<_>>();
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
|
||||
Ok(Some(
|
||||
self.flow_worker_manager
|
||||
.read()
|
||||
.await
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.gen_state_report()
|
||||
.await,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
http.workspace = true
|
||||
snafu.workspace = true
|
||||
strum.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -18,9 +18,30 @@ pub mod ext;
|
||||
pub mod mock;
|
||||
pub mod status_code;
|
||||
|
||||
use http::{HeaderMap, HeaderValue};
|
||||
pub use snafu;
|
||||
|
||||
// HACK - these headers are here for shared in gRPC services. For common HTTP headers,
|
||||
// please define in `src/servers/src/http/header.rs`.
|
||||
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
|
||||
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
|
||||
|
||||
/// Create a http header map from error code and message.
|
||||
/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys.
|
||||
pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
|
||||
let mut header = HeaderMap::new();
|
||||
|
||||
let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| {
|
||||
HeaderValue::from_bytes(
|
||||
&msg.as_bytes()
|
||||
.iter()
|
||||
.flat_map(|b| std::ascii::escape_default(*b))
|
||||
.collect::<Vec<u8>>(),
|
||||
)
|
||||
.expect("Already escaped string should be valid ascii")
|
||||
});
|
||||
|
||||
header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into());
|
||||
header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg);
|
||||
header
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ geo-types = { version = "0.7", optional = true }
|
||||
geohash = { version = "0.13", optional = true }
|
||||
h3o = { version = "0.6", optional = true }
|
||||
jsonb.workspace = true
|
||||
nalgebra = "0.33"
|
||||
nalgebra.workspace = true
|
||||
num = "0.4"
|
||||
num-traits = "0.2"
|
||||
once_cell.workspace = true
|
||||
|
||||
@@ -26,3 +26,4 @@ pub mod function_registry;
|
||||
pub mod handlers;
|
||||
pub mod helper;
|
||||
pub mod state;
|
||||
pub mod utils;
|
||||
|
||||
@@ -32,6 +32,7 @@ pub use scipy_stats_norm_cdf::ScipyStatsNormCdfAccumulatorCreator;
|
||||
pub use scipy_stats_norm_pdf::ScipyStatsNormPdfAccumulatorCreator;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
use crate::scalars::vector::sum::VectorSumCreator;
|
||||
|
||||
/// A function creates `AggregateFunctionCreator`.
|
||||
/// "Aggregator" *is* AggregatorFunction. Since the later one is long, we named an short alias for it.
|
||||
@@ -91,6 +92,7 @@ impl AggregateFunctions {
|
||||
register_aggr_func!("argmin", 1, ArgminAccumulatorCreator);
|
||||
register_aggr_func!("scipystatsnormcdf", 2, ScipyStatsNormCdfAccumulatorCreator);
|
||||
register_aggr_func!("scipystatsnormpdf", 2, ScipyStatsNormPdfAccumulatorCreator);
|
||||
register_aggr_func!("vec_sum", 1, VectorSumCreator);
|
||||
|
||||
#[cfg(feature = "geo")]
|
||||
register_aggr_func!(
|
||||
|
||||
@@ -204,20 +204,10 @@ impl PatternAst {
|
||||
fn convert_literal(column: &str, pattern: &str) -> Expr {
|
||||
logical_expr::col(column).like(logical_expr::lit(format!(
|
||||
"%{}%",
|
||||
Self::escape_pattern(pattern)
|
||||
crate::utils::escape_like_pattern(pattern)
|
||||
)))
|
||||
}
|
||||
|
||||
fn escape_pattern(pattern: &str) -> String {
|
||||
pattern
|
||||
.chars()
|
||||
.flat_map(|c| match c {
|
||||
'\\' | '%' | '_' => vec!['\\', c],
|
||||
_ => vec![c],
|
||||
})
|
||||
.collect::<String>()
|
||||
}
|
||||
|
||||
/// Transform this AST with preset rules to make it correct.
|
||||
fn transform_ast(self) -> Result<Self> {
|
||||
self.transform_up(Self::collapse_binary_branch_fn)
|
||||
|
||||
@@ -14,9 +14,14 @@
|
||||
|
||||
mod convert;
|
||||
mod distance;
|
||||
pub(crate) mod impl_conv;
|
||||
mod elem_sum;
|
||||
pub mod impl_conv;
|
||||
mod scalar_add;
|
||||
mod scalar_mul;
|
||||
mod sub;
|
||||
pub(crate) mod sum;
|
||||
mod vector_div;
|
||||
mod vector_mul;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -38,5 +43,11 @@ impl VectorFunction {
|
||||
// scalar calculation
|
||||
registry.register(Arc::new(scalar_add::ScalarAddFunction));
|
||||
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
|
||||
|
||||
// vector calculation
|
||||
registry.register(Arc::new(vector_mul::VectorMulFunction));
|
||||
registry.register(Arc::new(vector_div::VectorDivFunction));
|
||||
registry.register(Arc::new(sub::SubFunction));
|
||||
registry.register(Arc::new(elem_sum::ElemSumFunction));
|
||||
}
|
||||
}
|
||||
|
||||
129
src/common/function/src/scalars/vector/elem_sum.rs
Normal file
129
src/common/function/src/scalars/vector/elem_sum.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::InvalidFuncArgsSnafu;
|
||||
use common_query::prelude::{Signature, TypeSignature, Volatility};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
|
||||
use nalgebra::DVectorView;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
|
||||
|
||||
const NAME: &str = "vec_elem_sum";
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ElemSumFunction;
|
||||
|
||||
impl Function for ElemSumFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(
|
||||
&self,
|
||||
_input_types: &[ConcreteDataType],
|
||||
) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::float32_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
Signature::one_of(
|
||||
vec![
|
||||
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
|
||||
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
|
||||
],
|
||||
Volatility::Immutable,
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(
|
||||
&self,
|
||||
_func_ctx: FunctionContext,
|
||||
columns: &[VectorRef],
|
||||
) -> common_query::error::Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly one, have: {}",
|
||||
columns.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
let arg0 = &columns[0];
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = Float32VectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
}
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
let Some(arg0) = arg0 else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
result.push(Some(DVectorView::from_slice(&arg0, arg0.len()).sum()));
|
||||
}
|
||||
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ElemSumFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
use crate::function::FunctionContext;
|
||||
|
||||
#[test]
|
||||
fn test_elem_sum() {
|
||||
let func = ElemSumFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
None,
|
||||
]));
|
||||
|
||||
let result = func.eval(FunctionContext::default(), &[input0]).unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 3);
|
||||
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(6.0));
|
||||
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(15.0));
|
||||
assert_eq!(result.get_ref(2).as_f32().unwrap(), None);
|
||||
}
|
||||
}
|
||||
223
src/common/function/src/scalars/vector/sub.rs
Normal file
223
src/common/function/src/scalars/vector/sub.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::InvalidFuncArgsSnafu;
|
||||
use common_query::prelude::Signature;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
|
||||
use nalgebra::DVectorView;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::helper;
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
const NAME: &str = "vec_sub";
|
||||
|
||||
/// Subtracts corresponding elements of two vectors, returns a vector.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT vec_to_string(vec_sub("[1.0, 1.0]", "[1.0, 2.0]")) as result;
|
||||
///
|
||||
/// +---------------------------------------------------------------+
|
||||
/// | vec_to_string(vec_sub(Utf8("[1.0, 1.0]"),Utf8("[1.0, 2.0]"))) |
|
||||
/// +---------------------------------------------------------------+
|
||||
/// | [0,-1] |
|
||||
/// +---------------------------------------------------------------+
|
||||
///
|
||||
/// -- Negative scalar to simulate subtraction
|
||||
/// SELECT vec_to_string(vec_sub('[-1.0, -1.0]', '[1.0, 2.0]'));
|
||||
///
|
||||
/// +-----------------------------------------------------------------+
|
||||
/// | vec_to_string(vec_sub(Utf8("[-1.0, -1.0]"),Utf8("[1.0, 2.0]"))) |
|
||||
/// +-----------------------------------------------------------------+
|
||||
/// | [-2,-3] |
|
||||
/// +-----------------------------------------------------------------+
|
||||
///
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SubFunction;
|
||||
|
||||
impl Function for SubFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(
|
||||
&self,
|
||||
_input_types: &[ConcreteDataType],
|
||||
) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::binary_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
helper::one_of_sigs2(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(
|
||||
&self,
|
||||
_func_ctx: FunctionContext,
|
||||
columns: &[VectorRef],
|
||||
) -> common_query::error::Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
let arg0 = &columns[0];
|
||||
let arg1 = &columns[1];
|
||||
|
||||
ensure!(
|
||||
arg0.len() == arg1.len(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The lengths of the vector are not aligned, args 0: {}, args 1: {}",
|
||||
arg0.len(),
|
||||
arg1.len(),
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = BinaryVectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
}
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
let arg1_const = as_veclit_if_const(arg1)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
let arg1 = match arg1_const.as_ref() {
|
||||
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
|
||||
None => as_veclit(arg1.get_ref(i))?,
|
||||
};
|
||||
let (Some(arg0), Some(arg1)) = (arg0, arg1) else {
|
||||
result.push_null();
|
||||
continue;
|
||||
};
|
||||
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
|
||||
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
|
||||
|
||||
let vec_res = vec0 - vec1;
|
||||
let veclit = vec_res.as_slice();
|
||||
let binlit = veclit_to_binlit(veclit);
|
||||
result.push(Some(&binlit));
|
||||
}
|
||||
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for SubFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error::Error;
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_sub() {
|
||||
let func = SubFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
None,
|
||||
Some("[2.0,3.0,3.0]".to_string()),
|
||||
]));
|
||||
let input1 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,1.0,1.0]".to_string()),
|
||||
Some("[6.0,5.0,4.0]".to_string()),
|
||||
Some("[3.0,2.0,2.0]".to_string()),
|
||||
None,
|
||||
]));
|
||||
|
||||
let result = func
|
||||
.eval(FunctionContext::default(), &[input0, input1])
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(
|
||||
result.get_ref(0).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[0.0, 1.0, 2.0]).as_slice())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get_ref(1).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[-2.0, 0.0, 2.0]).as_slice())
|
||||
);
|
||||
assert!(result.get_ref(2).is_null());
|
||||
assert!(result.get_ref(3).is_null());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sub_error() {
|
||||
let func = SubFunction;
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
None,
|
||||
Some("[2.0,3.0,3.0]".to_string()),
|
||||
]));
|
||||
let input1 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,1.0,1.0]".to_string()),
|
||||
Some("[6.0,5.0,4.0]".to_string()),
|
||||
Some("[3.0,2.0,2.0]".to_string()),
|
||||
]));
|
||||
|
||||
let result = func.eval(FunctionContext::default(), &[input0, input1]);
|
||||
|
||||
match result {
|
||||
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
"The lengths of the vector are not aligned, args 0: 4, args 1: 3"
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
202
src/common/function/src/scalars/vector/sum.rs
Normal file
202
src/common/function/src/scalars/vector/sum.rs
Normal file
@@ -0,0 +1,202 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
|
||||
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
|
||||
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
|
||||
use common_query::prelude::AccumulatorCreatorFunction;
|
||||
use datatypes::prelude::{ConcreteDataType, Value, *};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use nalgebra::{Const, DVectorView, Dyn, OVector};
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct VectorSum {
|
||||
sum: Option<OVector<f32, Dyn>>,
|
||||
has_null: bool,
|
||||
}
|
||||
|
||||
#[as_aggr_func_creator]
|
||||
#[derive(Debug, Default, AggrFuncTypeStore)]
|
||||
pub struct VectorSumCreator {}
|
||||
|
||||
impl AggregateFunctionCreator for VectorSumCreator {
|
||||
fn creator(&self) -> AccumulatorCreatorFunction {
|
||||
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
|
||||
ensure!(
|
||||
types.len() == 1,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly one, have: {}",
|
||||
types.len()
|
||||
)
|
||||
}
|
||||
);
|
||||
let input_type = &types[0];
|
||||
match input_type {
|
||||
ConcreteDataType::String(_) | ConcreteDataType::Binary(_) => {
|
||||
Ok(Box::new(VectorSum::default()))
|
||||
}
|
||||
_ => {
|
||||
let err_msg = format!(
|
||||
"\"VEC_SUM\" aggregate function not support data type {:?}",
|
||||
input_type.logical_type_id(),
|
||||
);
|
||||
CreateAccumulatorSnafu { err_msg }.fail()?
|
||||
}
|
||||
}
|
||||
});
|
||||
creator
|
||||
}
|
||||
|
||||
fn output_type(&self) -> common_query::error::Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::binary_datatype())
|
||||
}
|
||||
|
||||
fn state_types(&self) -> common_query::error::Result<Vec<ConcreteDataType>> {
|
||||
Ok(vec![self.output_type()?])
|
||||
}
|
||||
}
|
||||
|
||||
impl VectorSum {
|
||||
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
|
||||
self.sum
|
||||
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
|
||||
}
|
||||
|
||||
fn update(&mut self, values: &[VectorRef], is_update: bool) -> Result<(), Error> {
|
||||
if values.is_empty() || self.has_null {
|
||||
return Ok(());
|
||||
};
|
||||
let column = &values[0];
|
||||
let len = column.len();
|
||||
|
||||
match as_veclit_if_const(column)? {
|
||||
Some(column) => {
|
||||
let vec_column = DVectorView::from_slice(&column, column.len()).scale(len as f32);
|
||||
*self.inner(vec_column.len()) += vec_column;
|
||||
}
|
||||
None => {
|
||||
for i in 0..len {
|
||||
let Some(arg0) = as_veclit(column.get_ref(i))? else {
|
||||
if is_update {
|
||||
self.has_null = true;
|
||||
self.sum = None;
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
let vec_column = DVectorView::from_slice(&arg0, arg0.len());
|
||||
*self.inner(vec_column.len()) += vec_column;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Accumulator for VectorSum {
|
||||
fn state(&self) -> common_query::error::Result<Vec<Value>> {
|
||||
self.evaluate().map(|v| vec![v])
|
||||
}
|
||||
|
||||
fn update_batch(&mut self, values: &[VectorRef]) -> common_query::error::Result<()> {
|
||||
self.update(values, true)
|
||||
}
|
||||
|
||||
fn merge_batch(&mut self, states: &[VectorRef]) -> common_query::error::Result<()> {
|
||||
self.update(states, false)
|
||||
}
|
||||
|
||||
fn evaluate(&self) -> common_query::error::Result<Value> {
|
||||
match &self.sum {
|
||||
None => Ok(Value::Null),
|
||||
Some(vector) => Ok(Value::from(veclit_to_binlit(vector.as_slice()))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::vectors::{ConstantVector, StringVector};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_update_batch() {
|
||||
// test update empty batch, expect not updating anything
|
||||
let mut vec_sum = VectorSum::default();
|
||||
vec_sum.update_batch(&[]).unwrap();
|
||||
assert!(vec_sum.sum.is_none());
|
||||
assert!(!vec_sum.has_null);
|
||||
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
|
||||
|
||||
// test update one not-null value
|
||||
let mut vec_sum = VectorSum::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Some(
|
||||
"[1.0,2.0,3.0]".to_string(),
|
||||
)]))];
|
||||
vec_sum.update_batch(&v).unwrap();
|
||||
assert_eq!(
|
||||
Value::from(veclit_to_binlit(&[1.0, 2.0, 3.0])),
|
||||
vec_sum.evaluate().unwrap()
|
||||
);
|
||||
|
||||
// test update one null value
|
||||
let mut vec_sum = VectorSum::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Option::<String>::None]))];
|
||||
vec_sum.update_batch(&v).unwrap();
|
||||
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
|
||||
|
||||
// test update no null-value batch
|
||||
let mut vec_sum = VectorSum::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[4.0,5.0,6.0]".to_string()),
|
||||
Some("[7.0,8.0,9.0]".to_string()),
|
||||
]))];
|
||||
vec_sum.update_batch(&v).unwrap();
|
||||
assert_eq!(
|
||||
Value::from(veclit_to_binlit(&[12.0, 15.0, 18.0])),
|
||||
vec_sum.evaluate().unwrap()
|
||||
);
|
||||
|
||||
// test update null-value batch
|
||||
let mut vec_sum = VectorSum::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
None,
|
||||
Some("[7.0,8.0,9.0]".to_string()),
|
||||
]))];
|
||||
vec_sum.update_batch(&v).unwrap();
|
||||
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
|
||||
|
||||
// test update with constant vector
|
||||
let mut vec_sum = VectorSum::default();
|
||||
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
|
||||
Arc::new(StringVector::from_vec(vec!["[1.0,2.0,3.0]".to_string()])),
|
||||
4,
|
||||
))];
|
||||
vec_sum.update_batch(&v).unwrap();
|
||||
assert_eq!(
|
||||
Value::from(veclit_to_binlit(&[4.0, 8.0, 12.0])),
|
||||
vec_sum.evaluate().unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
218
src/common/function/src/scalars/vector/vector_div.rs
Normal file
218
src/common/function/src/scalars/vector/vector_div.rs
Normal file
@@ -0,0 +1,218 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::{InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::Signature;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
|
||||
use nalgebra::DVectorView;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::helper;
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
const NAME: &str = "vec_div";
|
||||
|
||||
/// Divides corresponding elements of two vectors.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT vec_to_string(vec_div("[2, 4, 6]", "[2, 2, 2]")) as result;
|
||||
///
|
||||
/// +---------+
|
||||
/// | result |
|
||||
/// +---------+
|
||||
/// | [1,2,3] |
|
||||
/// +---------+
|
||||
///
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VectorDivFunction;
|
||||
|
||||
impl Function for VectorDivFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::binary_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
helper::one_of_sigs2(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let arg0 = &columns[0];
|
||||
let arg1 = &columns[1];
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = BinaryVectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
}
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
let arg1_const = as_veclit_if_const(arg1)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
|
||||
let arg1 = match arg1_const.as_ref() {
|
||||
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
|
||||
None => as_veclit(arg1.get_ref(i))?,
|
||||
};
|
||||
|
||||
if let (Some(arg0), Some(arg1)) = (arg0, arg1) {
|
||||
ensure!(
|
||||
arg0.len() == arg1.len(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the vectors must match for division, have: {} vs {}",
|
||||
arg0.len(),
|
||||
arg1.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
|
||||
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
|
||||
let vec_res = vec0.component_div(&vec1);
|
||||
|
||||
let veclit = vec_res.as_slice();
|
||||
let binlit = veclit_to_binlit(veclit);
|
||||
result.push(Some(&binlit));
|
||||
} else {
|
||||
result.push_null();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for VectorDivFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error;
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_vector_mul() {
|
||||
let func = VectorDivFunction;
|
||||
|
||||
let vec0 = vec![1.0, 2.0, 3.0];
|
||||
let vec1 = vec![1.0, 1.0];
|
||||
let (len0, len1) = (vec0.len(), vec1.len());
|
||||
let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))]));
|
||||
let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))]));
|
||||
|
||||
let err = func
|
||||
.eval(FunctionContext::default(), &[input0, input1])
|
||||
.unwrap_err();
|
||||
|
||||
match err {
|
||||
error::Error::InvalidFuncArgs { err_msg, .. } => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
format!(
|
||||
"The length of the vectors must match for division, have: {} vs {}",
|
||||
len0, len1
|
||||
)
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[8.0,10.0,12.0]".to_string()),
|
||||
Some("[7.0,8.0,9.0]".to_string()),
|
||||
None,
|
||||
]));
|
||||
|
||||
let input1 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,1.0,1.0]".to_string()),
|
||||
Some("[2.0,2.0,2.0]".to_string()),
|
||||
None,
|
||||
Some("[3.0,3.0,3.0]".to_string()),
|
||||
]));
|
||||
|
||||
let result = func
|
||||
.eval(FunctionContext::default(), &[input0, input1])
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(
|
||||
result.get_ref(0).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get_ref(1).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[4.0, 5.0, 6.0]).as_slice())
|
||||
);
|
||||
assert!(result.get_ref(2).is_null());
|
||||
assert!(result.get_ref(3).is_null());
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,-2.0]".to_string())]));
|
||||
let input1 = Arc::new(StringVector::from(vec![Some("[0.0,0.0]".to_string())]));
|
||||
|
||||
let result = func
|
||||
.eval(FunctionContext::default(), &[input0, input1])
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(
|
||||
result.get_ref(0).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[f64::INFINITY as f32, f64::NEG_INFINITY as f32]).as_slice())
|
||||
);
|
||||
}
|
||||
}
|
||||
205
src/common/function/src/scalars/vector/vector_mul.rs
Normal file
205
src/common/function/src/scalars/vector/vector_mul.rs
Normal file
@@ -0,0 +1,205 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::Display;
|
||||
|
||||
use common_query::error::{InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::Signature;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
|
||||
use nalgebra::DVectorView;
|
||||
use snafu::ensure;
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
use crate::helper;
|
||||
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
|
||||
|
||||
const NAME: &str = "vec_mul";
|
||||
|
||||
/// Multiplies corresponding elements of two vectors.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT vec_to_string(vec_mul("[1, 2, 3]", "[1, 2, 3]")) as result;
|
||||
///
|
||||
/// +---------+
|
||||
/// | result |
|
||||
/// +---------+
|
||||
/// | [1,4,9] |
|
||||
/// +---------+
|
||||
///
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VectorMulFunction;
|
||||
|
||||
impl Function for VectorMulFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::binary_datatype())
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
helper::one_of_sigs2(
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
vec![
|
||||
ConcreteDataType::string_datatype(),
|
||||
ConcreteDataType::binary_datatype(),
|
||||
],
|
||||
)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 2,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect exactly two, have: {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let arg0 = &columns[0];
|
||||
let arg1 = &columns[1];
|
||||
|
||||
let len = arg0.len();
|
||||
let mut result = BinaryVectorBuilder::with_capacity(len);
|
||||
if len == 0 {
|
||||
return Ok(result.to_vector());
|
||||
}
|
||||
|
||||
let arg0_const = as_veclit_if_const(arg0)?;
|
||||
let arg1_const = as_veclit_if_const(arg1)?;
|
||||
|
||||
for i in 0..len {
|
||||
let arg0 = match arg0_const.as_ref() {
|
||||
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
|
||||
None => as_veclit(arg0.get_ref(i))?,
|
||||
};
|
||||
|
||||
let arg1 = match arg1_const.as_ref() {
|
||||
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
|
||||
None => as_veclit(arg1.get_ref(i))?,
|
||||
};
|
||||
|
||||
if let (Some(arg0), Some(arg1)) = (arg0, arg1) {
|
||||
ensure!(
|
||||
arg0.len() == arg1.len(),
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the vectors must match for multiplying, have: {} vs {}",
|
||||
arg0.len(),
|
||||
arg1.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
|
||||
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
|
||||
let vec_res = vec1.component_mul(&vec0);
|
||||
|
||||
let veclit = vec_res.as_slice();
|
||||
let binlit = veclit_to_binlit(veclit);
|
||||
result.push(Some(&binlit));
|
||||
} else {
|
||||
result.push_null();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for VectorMulFunction {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", NAME.to_ascii_uppercase())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_query::error;
|
||||
use datatypes::vectors::StringVector;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_vector_mul() {
|
||||
let func = VectorMulFunction;
|
||||
|
||||
let vec0 = vec![1.0, 2.0, 3.0];
|
||||
let vec1 = vec![1.0, 1.0];
|
||||
let (len0, len1) = (vec0.len(), vec1.len());
|
||||
let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))]));
|
||||
let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))]));
|
||||
|
||||
let err = func
|
||||
.eval(FunctionContext::default(), &[input0, input1])
|
||||
.unwrap_err();
|
||||
|
||||
match err {
|
||||
error::Error::InvalidFuncArgs { err_msg, .. } => {
|
||||
assert_eq!(
|
||||
err_msg,
|
||||
format!(
|
||||
"The length of the vectors must match for multiplying, have: {} vs {}",
|
||||
len0, len1
|
||||
)
|
||||
)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
let input0 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,2.0,3.0]".to_string()),
|
||||
Some("[8.0,10.0,12.0]".to_string()),
|
||||
Some("[7.0,8.0,9.0]".to_string()),
|
||||
None,
|
||||
]));
|
||||
|
||||
let input1 = Arc::new(StringVector::from(vec![
|
||||
Some("[1.0,1.0,1.0]".to_string()),
|
||||
Some("[2.0,2.0,2.0]".to_string()),
|
||||
None,
|
||||
Some("[3.0,3.0,3.0]".to_string()),
|
||||
]));
|
||||
|
||||
let result = func
|
||||
.eval(FunctionContext::default(), &[input0, input1])
|
||||
.unwrap();
|
||||
|
||||
let result = result.as_ref();
|
||||
assert_eq!(result.len(), 4);
|
||||
assert_eq!(
|
||||
result.get_ref(0).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice())
|
||||
);
|
||||
assert_eq!(
|
||||
result.get_ref(1).as_binary().unwrap(),
|
||||
Some(veclit_to_binlit(&[16.0, 20.0, 24.0]).as_slice())
|
||||
);
|
||||
assert!(result.get_ref(2).is_null());
|
||||
assert!(result.get_ref(3).is_null());
|
||||
}
|
||||
}
|
||||
58
src/common/function/src/utils.rs
Normal file
58
src/common/function/src/utils.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/// Escapes special characters in the provided pattern string for `LIKE`.
|
||||
///
|
||||
/// Specifically, it prefixes the backslash (`\`), percent (`%`), and underscore (`_`)
|
||||
/// characters with an additional backslash to ensure they are treated literally.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// let escaped = escape_pattern("100%_some\\path");
|
||||
/// assert_eq!(escaped, "100\\%\\_some\\\\path");
|
||||
/// ```
|
||||
pub fn escape_like_pattern(pattern: &str) -> String {
|
||||
pattern
|
||||
.chars()
|
||||
.flat_map(|c| match c {
|
||||
'\\' | '%' | '_' => vec!['\\', c],
|
||||
_ => vec![c],
|
||||
})
|
||||
.collect::<String>()
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_escape_like_pattern() {
|
||||
assert_eq!(
|
||||
escape_like_pattern("100%_some\\path"),
|
||||
"100\\%\\_some\\\\path"
|
||||
);
|
||||
assert_eq!(escape_like_pattern(""), "");
|
||||
assert_eq!(escape_like_pattern("hello"), "hello");
|
||||
assert_eq!(escape_like_pattern("\\%_"), "\\\\\\%\\_");
|
||||
assert_eq!(escape_like_pattern("%%__\\\\"), "\\%\\%\\_\\_\\\\\\\\");
|
||||
assert_eq!(escape_like_pattern("abc123"), "abc123");
|
||||
assert_eq!(escape_like_pattern("%_\\"), "\\%\\_\\\\");
|
||||
assert_eq!(
|
||||
escape_like_pattern("%%__\\\\another%string"),
|
||||
"\\%\\%\\_\\_\\\\\\\\another\\%string"
|
||||
);
|
||||
assert_eq!(escape_like_pattern("foo%bar_"), "foo\\%bar\\_");
|
||||
assert_eq!(escape_like_pattern("\\_\\%"), "\\\\\\_\\\\\\%");
|
||||
}
|
||||
}
|
||||
44
src/common/meta/src/cache/container.rs
vendored
44
src/common/meta/src/cache/container.rs
vendored
@@ -43,7 +43,7 @@ pub struct CacheContainer<K, V, CacheToken> {
|
||||
cache: Cache<K, V>,
|
||||
invalidator: Invalidator<K, V, CacheToken>,
|
||||
initializer: Initializer<K, V>,
|
||||
token_filter: TokenFilter<CacheToken>,
|
||||
token_filter: fn(&CacheToken) -> bool,
|
||||
}
|
||||
|
||||
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>
|
||||
@@ -58,7 +58,7 @@ where
|
||||
cache: Cache<K, V>,
|
||||
invalidator: Invalidator<K, V, CacheToken>,
|
||||
initializer: Initializer<K, V>,
|
||||
token_filter: TokenFilter<CacheToken>,
|
||||
token_filter: fn(&CacheToken) -> bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
name,
|
||||
@@ -206,10 +206,13 @@ mod tests {
|
||||
name: &'a str,
|
||||
}
|
||||
|
||||
fn always_true_filter(_: &String) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get() {
|
||||
let cache: Cache<NameKey, String> = CacheBuilder::new(128).build();
|
||||
let filter: TokenFilter<String> = Box::new(|_| true);
|
||||
let counter = Arc::new(AtomicI32::new(0));
|
||||
let moved_counter = counter.clone();
|
||||
let init: Initializer<NameKey, String> = Arc::new(move |_| {
|
||||
@@ -219,7 +222,13 @@ mod tests {
|
||||
let invalidator: Invalidator<NameKey, String, String> =
|
||||
Box::new(|_, _| Box::pin(async { Ok(()) }));
|
||||
|
||||
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
|
||||
let adv_cache = CacheContainer::new(
|
||||
"test".to_string(),
|
||||
cache,
|
||||
invalidator,
|
||||
init,
|
||||
always_true_filter,
|
||||
);
|
||||
let key = NameKey { name: "key" };
|
||||
let value = adv_cache.get(key).await.unwrap().unwrap();
|
||||
assert_eq!(value, "hi");
|
||||
@@ -233,7 +242,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_get_by_ref() {
|
||||
let cache: Cache<String, String> = CacheBuilder::new(128).build();
|
||||
let filter: TokenFilter<String> = Box::new(|_| true);
|
||||
let counter = Arc::new(AtomicI32::new(0));
|
||||
let moved_counter = counter.clone();
|
||||
let init: Initializer<String, String> = Arc::new(move |_| {
|
||||
@@ -243,7 +251,13 @@ mod tests {
|
||||
let invalidator: Invalidator<String, String, String> =
|
||||
Box::new(|_, _| Box::pin(async { Ok(()) }));
|
||||
|
||||
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
|
||||
let adv_cache = CacheContainer::new(
|
||||
"test".to_string(),
|
||||
cache,
|
||||
invalidator,
|
||||
init,
|
||||
always_true_filter,
|
||||
);
|
||||
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
|
||||
assert_eq!(value, "hi");
|
||||
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
|
||||
@@ -257,13 +271,18 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_get_value_not_exits() {
|
||||
let cache: Cache<String, String> = CacheBuilder::new(128).build();
|
||||
let filter: TokenFilter<String> = Box::new(|_| true);
|
||||
let init: Initializer<String, String> =
|
||||
Arc::new(move |_| Box::pin(async { error::ValueNotExistSnafu {}.fail() }));
|
||||
let invalidator: Invalidator<String, String, String> =
|
||||
Box::new(|_, _| Box::pin(async { Ok(()) }));
|
||||
|
||||
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
|
||||
let adv_cache = CacheContainer::new(
|
||||
"test".to_string(),
|
||||
cache,
|
||||
invalidator,
|
||||
init,
|
||||
always_true_filter,
|
||||
);
|
||||
let value = adv_cache.get_by_ref("foo").await.unwrap();
|
||||
assert!(value.is_none());
|
||||
}
|
||||
@@ -271,7 +290,6 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_invalidate() {
|
||||
let cache: Cache<String, String> = CacheBuilder::new(128).build();
|
||||
let filter: TokenFilter<String> = Box::new(|_| true);
|
||||
let counter = Arc::new(AtomicI32::new(0));
|
||||
let moved_counter = counter.clone();
|
||||
let init: Initializer<String, String> = Arc::new(move |_| {
|
||||
@@ -285,7 +303,13 @@ mod tests {
|
||||
})
|
||||
});
|
||||
|
||||
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
|
||||
let adv_cache = CacheContainer::new(
|
||||
"test".to_string(),
|
||||
cache,
|
||||
invalidator,
|
||||
init,
|
||||
always_true_filter,
|
||||
);
|
||||
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
|
||||
assert_eq!(value, "hi");
|
||||
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
|
||||
|
||||
@@ -45,7 +45,7 @@ pub fn new_table_flownode_set_cache(
|
||||
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
|
||||
let init = init_factory(table_flow_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
|
||||
|
||||
22
src/common/meta/src/cache/registry.rs
vendored
22
src/common/meta/src/cache/registry.rs
vendored
@@ -151,12 +151,15 @@ mod tests {
|
||||
use crate::cache::*;
|
||||
use crate::instruction::CacheIdent;
|
||||
|
||||
fn always_true_filter(_: &CacheIdent) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn test_cache(
|
||||
name: &str,
|
||||
invalidator: Invalidator<String, String, CacheIdent>,
|
||||
) -> CacheContainer<String, String, CacheIdent> {
|
||||
let cache: Cache<String, String> = CacheBuilder::new(128).build();
|
||||
let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
|
||||
let counter = Arc::new(AtomicI32::new(0));
|
||||
let moved_counter = counter.clone();
|
||||
let init: Initializer<String, String> = Arc::new(move |_| {
|
||||
@@ -164,7 +167,13 @@ mod tests {
|
||||
Box::pin(async { Ok(Some("hi".to_string())) })
|
||||
});
|
||||
|
||||
CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
|
||||
CacheContainer::new(
|
||||
name.to_string(),
|
||||
cache,
|
||||
invalidator,
|
||||
init,
|
||||
always_true_filter,
|
||||
)
|
||||
}
|
||||
|
||||
fn test_i32_cache(
|
||||
@@ -172,7 +181,6 @@ mod tests {
|
||||
invalidator: Invalidator<i32, String, CacheIdent>,
|
||||
) -> CacheContainer<i32, String, CacheIdent> {
|
||||
let cache: Cache<i32, String> = CacheBuilder::new(128).build();
|
||||
let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
|
||||
let counter = Arc::new(AtomicI32::new(0));
|
||||
let moved_counter = counter.clone();
|
||||
let init: Initializer<i32, String> = Arc::new(move |_| {
|
||||
@@ -180,7 +188,13 @@ mod tests {
|
||||
Box::pin(async { Ok(Some("foo".to_string())) })
|
||||
});
|
||||
|
||||
CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
|
||||
CacheContainer::new(
|
||||
name.to_string(),
|
||||
cache,
|
||||
invalidator,
|
||||
init,
|
||||
always_true_filter,
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
2
src/common/meta/src/cache/table/schema.rs
vendored
2
src/common/meta/src/cache/table/schema.rs
vendored
@@ -36,7 +36,7 @@ pub fn new_schema_cache(
|
||||
let schema_manager = SchemaManager::new(kv_backend.clone());
|
||||
let init = init_factory(schema_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(schema_manager: SchemaManager) -> Initializer<SchemaName, Arc<SchemaNameValue>> {
|
||||
|
||||
@@ -41,7 +41,7 @@ pub fn new_table_info_cache(
|
||||
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
|
||||
let init = init_factory(table_info_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {
|
||||
|
||||
@@ -41,7 +41,7 @@ pub fn new_table_name_cache(
|
||||
let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
|
||||
let init = init_factory(table_name_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {
|
||||
|
||||
@@ -65,7 +65,7 @@ pub fn new_table_route_cache(
|
||||
let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
|
||||
let init = init_factory(table_info_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(
|
||||
|
||||
@@ -40,7 +40,7 @@ pub fn new_table_schema_cache(
|
||||
let table_info_manager = TableInfoManager::new(kv_backend);
|
||||
let init = init_factory(table_info_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {
|
||||
|
||||
2
src/common/meta/src/cache/table/view_info.rs
vendored
2
src/common/meta/src/cache/table/view_info.rs
vendored
@@ -40,7 +40,7 @@ pub fn new_view_info_cache(
|
||||
let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend));
|
||||
let init = init_factory(view_info_manager);
|
||||
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
|
||||
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
|
||||
}
|
||||
|
||||
fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer<TableId, Arc<ViewInfoValue>> {
|
||||
|
||||
@@ -137,6 +137,7 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
|
||||
use self::table_route::{TableRouteManager, TableRouteValue};
|
||||
use self::tombstone::TombstoneManager;
|
||||
use crate::error::{self, Result, SerdeJsonSnafu};
|
||||
use crate::key::flow::flow_state::FlowStateValue;
|
||||
use crate::key::node_address::NodeAddressValue;
|
||||
use crate::key::table_route::TableRouteKey;
|
||||
use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
@@ -1262,7 +1263,8 @@ impl_metadata_value! {
|
||||
FlowRouteValue,
|
||||
TableFlowValue,
|
||||
NodeAddressValue,
|
||||
SchemaNameValue
|
||||
SchemaNameValue,
|
||||
FlowStateValue
|
||||
}
|
||||
|
||||
impl_optional_metadata_value! {
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::DEFAULT_CATALOG_NAME;
|
||||
use futures::stream::BoxStream;
|
||||
@@ -146,7 +145,7 @@ impl CatalogManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(catalog_decoder),
|
||||
catalog_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
@@ -156,6 +155,8 @@ impl CatalogManager {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -166,7 +165,7 @@ impl DatanodeTableManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(datanode_table_value_decoder),
|
||||
datanode_table_value_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
pub mod flow_info;
|
||||
pub(crate) mod flow_name;
|
||||
pub(crate) mod flow_route;
|
||||
pub mod flow_state;
|
||||
pub(crate) mod flownode_flow;
|
||||
pub(crate) mod table_flow;
|
||||
|
||||
@@ -35,6 +36,7 @@ use crate::ensure_values;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::flow::flow_info::FlowInfoManager;
|
||||
use crate::key::flow::flow_name::FlowNameManager;
|
||||
use crate::key::flow::flow_state::FlowStateManager;
|
||||
use crate::key::flow::flownode_flow::FlownodeFlowManager;
|
||||
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
|
||||
use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
@@ -102,6 +104,8 @@ pub struct FlowMetadataManager {
|
||||
flownode_flow_manager: FlownodeFlowManager,
|
||||
table_flow_manager: TableFlowManager,
|
||||
flow_name_manager: FlowNameManager,
|
||||
/// only metasrv have access to itself's memory backend, so for other case it should be None
|
||||
flow_state_manager: Option<FlowStateManager>,
|
||||
kv_backend: KvBackendRef,
|
||||
}
|
||||
|
||||
@@ -114,6 +118,7 @@ impl FlowMetadataManager {
|
||||
flow_name_manager: FlowNameManager::new(kv_backend.clone()),
|
||||
flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
|
||||
table_flow_manager: TableFlowManager::new(kv_backend.clone()),
|
||||
flow_state_manager: None,
|
||||
kv_backend,
|
||||
}
|
||||
}
|
||||
@@ -123,6 +128,10 @@ impl FlowMetadataManager {
|
||||
&self.flow_name_manager
|
||||
}
|
||||
|
||||
pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
|
||||
self.flow_state_manager.as_ref()
|
||||
}
|
||||
|
||||
/// Returns the [`FlowInfoManager`].
|
||||
pub fn flow_info_manager(&self) -> &FlowInfoManager {
|
||||
&self.flow_info_manager
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
@@ -201,7 +199,7 @@ impl FlowNameManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(flow_name_decoder),
|
||||
flow_name_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
@@ -179,7 +177,7 @@ impl FlowRouteManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(flow_route_decoder),
|
||||
flow_route_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
162
src/common/meta/src/key/flow/flow_state.rs
Normal file
162
src/common/meta/src/key/flow/flow_state.rs
Normal file
@@ -0,0 +1,162 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::flow::FlowScoped;
|
||||
use crate::key::{FlowId, MetadataKey, MetadataValue};
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::rpc::store::PutRequest;
|
||||
|
||||
/// The entire FlowId to Flow Size's Map is stored directly in the value part of the key.
|
||||
const FLOW_STATE_KEY: &str = "state";
|
||||
|
||||
/// The key of flow state.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
struct FlowStateKeyInner;
|
||||
|
||||
impl FlowStateKeyInner {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MetadataKey<'a, FlowStateKeyInner> for FlowStateKeyInner {
|
||||
fn to_bytes(&self) -> Vec<u8> {
|
||||
FLOW_STATE_KEY.as_bytes().to_vec()
|
||||
}
|
||||
|
||||
fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKeyInner> {
|
||||
let key = std::str::from_utf8(bytes).map_err(|e| {
|
||||
error::InvalidMetadataSnafu {
|
||||
err_msg: format!(
|
||||
"FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
|
||||
String::from_utf8_lossy(bytes)
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
if key != FLOW_STATE_KEY {
|
||||
return Err(error::InvalidMetadataSnafu {
|
||||
err_msg: format!("Invalid FlowStateKeyInner '{key}'"),
|
||||
}
|
||||
.build());
|
||||
}
|
||||
Ok(FlowStateKeyInner::new())
|
||||
}
|
||||
}
|
||||
|
||||
/// The key stores the state size of the flow.
|
||||
///
|
||||
/// The layout: `__flow/state`.
|
||||
pub struct FlowStateKey(FlowScoped<FlowStateKeyInner>);
|
||||
|
||||
impl FlowStateKey {
|
||||
/// Returns the [FlowStateKey].
|
||||
pub fn new() -> FlowStateKey {
|
||||
let inner = FlowStateKeyInner::new();
|
||||
FlowStateKey(FlowScoped::new(inner))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FlowStateKey {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MetadataKey<'a, FlowStateKey> for FlowStateKey {
|
||||
fn to_bytes(&self) -> Vec<u8> {
|
||||
self.0.to_bytes()
|
||||
}
|
||||
|
||||
fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKey> {
|
||||
Ok(FlowStateKey(FlowScoped::<FlowStateKeyInner>::from_bytes(
|
||||
bytes,
|
||||
)?))
|
||||
}
|
||||
}
|
||||
|
||||
/// The value of flow state size
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct FlowStateValue {
|
||||
/// For each key, the bytes of the state in memory
|
||||
pub state_size: BTreeMap<FlowId, usize>,
|
||||
}
|
||||
|
||||
impl FlowStateValue {
|
||||
pub fn new(state_size: BTreeMap<FlowId, usize>) -> Self {
|
||||
Self { state_size }
|
||||
}
|
||||
}
|
||||
|
||||
pub type FlowStateManagerRef = Arc<FlowStateManager>;
|
||||
|
||||
/// The manager of [FlowStateKey]. Since state size changes frequently, we store it in memory.
|
||||
///
|
||||
/// This is only used in distributed mode. When meta-srv use heartbeat to update the flow stat report
|
||||
/// and frontned use get to get the latest flow stat report.
|
||||
pub struct FlowStateManager {
|
||||
in_memory: KvBackendRef,
|
||||
}
|
||||
|
||||
impl FlowStateManager {
|
||||
pub fn new(in_memory: KvBackendRef) -> Self {
|
||||
Self { in_memory }
|
||||
}
|
||||
|
||||
pub async fn get(&self) -> Result<Option<FlowStateValue>> {
|
||||
let key = FlowStateKey::new().to_bytes();
|
||||
self.in_memory
|
||||
.get(&key)
|
||||
.await?
|
||||
.map(|x| FlowStateValue::try_from_raw_value(&x.value))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub async fn put(&self, value: FlowStateValue) -> Result<()> {
|
||||
let key = FlowStateKey::new().to_bytes();
|
||||
let value = value.try_as_raw_value()?;
|
||||
let req = PutRequest::new().with_key(key).with_value(value);
|
||||
self.in_memory.put(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Flow's state report, send regularly through heartbeat message
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FlowStat {
|
||||
/// For each key, the bytes of the state in memory
|
||||
pub state_size: BTreeMap<u32, usize>,
|
||||
}
|
||||
|
||||
impl From<FlowStateValue> for FlowStat {
|
||||
fn from(value: FlowStateValue) -> Self {
|
||||
Self {
|
||||
state_size: value.state_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FlowStat> for FlowStateValue {
|
||||
fn from(value: FlowStat) -> Self {
|
||||
Self {
|
||||
state_size: value.state_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use futures::TryStreamExt;
|
||||
use lazy_static::lazy_static;
|
||||
@@ -179,7 +177,7 @@ impl FlownodeFlowManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(flownode_flow_key_decoder),
|
||||
flownode_flow_key_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
@@ -206,7 +206,7 @@ impl TableFlowManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(table_flow_decoder),
|
||||
table_flow_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
@@ -28,13 +28,10 @@ pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;
|
||||
pub struct SchemaMetadataManager {
|
||||
table_id_schema_cache: TableSchemaCacheRef,
|
||||
schema_cache: SchemaCacheRef,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
kv_backend: crate::kv_backend::KvBackendRef,
|
||||
}
|
||||
|
||||
impl SchemaMetadataManager {
|
||||
/// Creates a new database meta
|
||||
#[cfg(not(any(test, feature = "testing")))]
|
||||
pub fn new(table_id_schema_cache: TableSchemaCacheRef, schema_cache: SchemaCacheRef) -> Self {
|
||||
Self {
|
||||
table_id_schema_cache,
|
||||
@@ -42,20 +39,6 @@ impl SchemaMetadataManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new database meta
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub fn new(
|
||||
kv_backend: crate::kv_backend::KvBackendRef,
|
||||
table_id_schema_cache: TableSchemaCacheRef,
|
||||
schema_cache: SchemaCacheRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_id_schema_cache,
|
||||
schema_cache,
|
||||
kv_backend,
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets schema options by table id.
|
||||
pub async fn get_schema_options_by_table_id(
|
||||
&self,
|
||||
@@ -80,6 +63,7 @@ impl SchemaMetadataManager {
|
||||
schema_name: &str,
|
||||
catalog_name: &str,
|
||||
schema_value: Option<crate::key::schema_name::SchemaNameValue>,
|
||||
kv_backend: crate::kv_backend::KvBackendRef,
|
||||
) {
|
||||
use table::metadata::{RawTableInfo, TableType};
|
||||
let value = crate::key::table_info::TableInfoValue::new(RawTableInfo {
|
||||
@@ -91,19 +75,18 @@ impl SchemaMetadataManager {
|
||||
meta: Default::default(),
|
||||
table_type: TableType::Base,
|
||||
});
|
||||
let table_info_manager =
|
||||
crate::key::table_info::TableInfoManager::new(self.kv_backend.clone());
|
||||
let table_info_manager = crate::key::table_info::TableInfoManager::new(kv_backend.clone());
|
||||
let (txn, _) = table_info_manager
|
||||
.build_create_txn(table_id, &value)
|
||||
.unwrap();
|
||||
let resp = self.kv_backend.txn(txn).await.unwrap();
|
||||
let resp = kv_backend.txn(txn).await.unwrap();
|
||||
assert!(resp.succeeded, "Failed to create table metadata");
|
||||
let key = crate::key::schema_name::SchemaNameKey {
|
||||
catalog: catalog_name,
|
||||
schema: schema_name,
|
||||
};
|
||||
|
||||
crate::key::schema_name::SchemaManager::new(self.kv_backend.clone())
|
||||
crate::key::schema_name::SchemaManager::new(kv_backend.clone())
|
||||
.create(key, schema_value, false)
|
||||
.await
|
||||
.expect("Failed to create schema metadata");
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_time::DatabaseTimeToLive;
|
||||
@@ -30,6 +29,7 @@ use crate::error::{self, Error, InvalidMetadataSnafu, ParseOptionSnafu, Result};
|
||||
use crate::key::{MetadataKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
|
||||
use crate::kv_backend::txn::Txn;
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::metrics::METRIC_META_SCHEMA_INFO_GET;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use crate::rpc::store::RangeRequest;
|
||||
use crate::rpc::KeyValue;
|
||||
@@ -210,6 +210,8 @@ impl SchemaManager {
|
||||
&self,
|
||||
schema: SchemaNameKey<'_>,
|
||||
) -> Result<Option<DeserializedValueWithBytes<SchemaNameValue>>> {
|
||||
let _timer = METRIC_META_SCHEMA_INFO_GET.start_timer();
|
||||
|
||||
let raw_key = schema.to_bytes();
|
||||
self.kv_backend
|
||||
.get(&raw_key)
|
||||
@@ -283,7 +285,7 @@ impl SchemaManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(schema_decoder),
|
||||
schema_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
@@ -308,6 +310,7 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -29,6 +29,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
use crate::key::{DeserializedValueWithBytes, MetadataKey, MetadataValue, TABLE_INFO_KEY_PREFIX};
|
||||
use crate::kv_backend::txn::Txn;
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::metrics::METRIC_META_TABLE_INFO_GET;
|
||||
use crate::rpc::store::BatchGetRequest;
|
||||
|
||||
/// The key stores the metadata of the table.
|
||||
@@ -194,6 +195,8 @@ impl TableInfoManager {
|
||||
&self,
|
||||
table_id: TableId,
|
||||
) -> Result<Option<DeserializedValueWithBytes<TableInfoValue>>> {
|
||||
let _timer = METRIC_META_TABLE_INFO_GET.start_timer();
|
||||
|
||||
let key = TableInfoKey::new(table_id);
|
||||
let raw_key = key.to_bytes();
|
||||
self.kv_backend
|
||||
|
||||
@@ -269,7 +269,7 @@ impl TableNameManager {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(table_decoder),
|
||||
table_decoder,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ pub mod postgres;
|
||||
pub mod test;
|
||||
pub mod txn;
|
||||
|
||||
pub type KvBackendRef = Arc<dyn KvBackend<Error = Error> + Send + Sync>;
|
||||
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
|
||||
|
||||
#[async_trait]
|
||||
pub trait KvBackend: TxnService
|
||||
@@ -161,6 +161,9 @@ where
|
||||
Self::Error: ErrorExt,
|
||||
{
|
||||
fn reset(&self);
|
||||
|
||||
/// Upcast as `KvBackendRef`. Since https://github.com/rust-lang/rust/issues/65991 is not yet stable.
|
||||
fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
|
||||
}
|
||||
|
||||
pub type ResettableKvBackendRef = Arc<dyn ResettableKvBackend<Error = Error> + Send + Sync>;
|
||||
pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;
|
||||
|
||||
@@ -16,13 +16,13 @@ use std::any::Any;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use serde::Serializer;
|
||||
|
||||
use super::ResettableKvBackend;
|
||||
use super::{KvBackendRef, ResettableKvBackend};
|
||||
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
|
||||
use crate::kv_backend::{KvBackend, TxnService};
|
||||
use crate::metrics::METRIC_META_TXN_REQUEST;
|
||||
@@ -311,6 +311,10 @@ impl<T: ErrorExt + Send + Sync + 'static> ResettableKvBackend for MemoryKvBacken
|
||||
fn reset(&self) {
|
||||
self.clear();
|
||||
}
|
||||
|
||||
fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<T> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::any::Any;
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::error;
|
||||
use snafu::ResultExt;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::{Client, NoTls};
|
||||
@@ -97,7 +98,11 @@ impl PgStore {
|
||||
let (client, conn) = tokio_postgres::connect(url, NoTls)
|
||||
.await
|
||||
.context(ConnectPostgresSnafu)?;
|
||||
tokio::spawn(async move { conn.await.context(ConnectPostgresSnafu) });
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
error!(e; "connection error");
|
||||
}
|
||||
});
|
||||
Self::with_pg_client(client).await
|
||||
}
|
||||
|
||||
|
||||
@@ -108,4 +108,9 @@ lazy_static! {
|
||||
&["name"]
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
pub static ref METRIC_META_TABLE_INFO_GET: Histogram =
|
||||
register_histogram!("greptime_meta_table_info_get", "get table info from kvbackend").unwrap();
|
||||
pub static ref METRIC_META_SCHEMA_INFO_GET: Histogram =
|
||||
register_histogram!("greptime_meta_schema_info_get", "get schema info from kvbackend").unwrap();
|
||||
}
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use common_telemetry::debug;
|
||||
use futures::Stream;
|
||||
@@ -148,7 +146,7 @@ impl PaginationStreamFactory {
|
||||
}
|
||||
|
||||
pub struct PaginationStream<T> {
|
||||
decoder_fn: Arc<KeyValueDecoderFn<T>>,
|
||||
decoder_fn: fn(KeyValue) -> Result<T>,
|
||||
factory: PaginationStreamFactory,
|
||||
}
|
||||
|
||||
@@ -158,7 +156,7 @@ impl<T> PaginationStream<T> {
|
||||
kv: KvBackendRef,
|
||||
req: RangeRequest,
|
||||
page_size: usize,
|
||||
decoder_fn: Arc<KeyValueDecoderFn<T>>,
|
||||
decoder_fn: fn(KeyValue) -> Result<T>,
|
||||
) -> Self {
|
||||
Self {
|
||||
decoder_fn,
|
||||
@@ -191,6 +189,7 @@ mod tests {
|
||||
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::TryStreamExt;
|
||||
|
||||
@@ -250,7 +249,7 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(decoder),
|
||||
decoder,
|
||||
)
|
||||
.into_stream();
|
||||
let kv = stream.try_collect::<Vec<_>>().await.unwrap();
|
||||
@@ -290,7 +289,7 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
2,
|
||||
Arc::new(decoder),
|
||||
decoder,
|
||||
);
|
||||
let kv = stream
|
||||
.into_stream()
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
|
||||
@@ -171,7 +169,7 @@ impl StateStore for KvStateStore {
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
self.max_num_per_range_request.unwrap_or_default(),
|
||||
Arc::new(decode_kv),
|
||||
decode_kv,
|
||||
)
|
||||
.into_stream();
|
||||
|
||||
|
||||
@@ -39,3 +39,7 @@ tokio-util.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = "0.4"
|
||||
|
||||
[target.'cfg(tokio_unstable)'.dependencies]
|
||||
tokio-metrics = { version = "0.3" }
|
||||
tokio-metrics-collector = { version = "0.2" }
|
||||
|
||||
@@ -224,7 +224,6 @@ impl DatanodeBuilder {
|
||||
cache_registry.get().context(MissingCacheSnafu)?;
|
||||
|
||||
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
|
||||
kv_backend.clone(),
|
||||
table_id_schema_cache,
|
||||
schema_cache,
|
||||
));
|
||||
|
||||
@@ -28,7 +28,7 @@ use common_telemetry::{info, warn};
|
||||
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
|
||||
use object_store::services::Fs;
|
||||
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
|
||||
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
|
||||
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
|
||||
@@ -147,12 +147,10 @@ async fn build_cache_layer(
|
||||
};
|
||||
|
||||
// Enable object cache by default
|
||||
// Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
|
||||
// Set the cache_path to be `${data_home}` by default
|
||||
// if it's not present
|
||||
if cache_path.is_none() {
|
||||
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
|
||||
let read_cache_path = join_dir(&object_cache_path, "read");
|
||||
let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
|
||||
let read_cache_path = data_home.to_string();
|
||||
tokio::fs::create_dir_all(Path::new(&read_cache_path))
|
||||
.await
|
||||
.context(CreateDirSnafu {
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R
|
||||
use crate::prelude::ConcreteDataType;
|
||||
pub use crate::schema::column_schema::{
|
||||
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions,
|
||||
COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
|
||||
SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
|
||||
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
|
||||
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
|
||||
SKIPPING_INDEX_KEY, TIME_INDEX_KEY,
|
||||
|
||||
@@ -543,7 +543,7 @@ pub struct SkippingIndexOptions {
|
||||
pub granularity: u32,
|
||||
/// The type of the skip index.
|
||||
#[serde(default)]
|
||||
pub index_type: SkipIndexType,
|
||||
pub index_type: SkippingIndexType,
|
||||
}
|
||||
|
||||
impl fmt::Display for SkippingIndexOptions {
|
||||
@@ -556,15 +556,15 @@ impl fmt::Display for SkippingIndexOptions {
|
||||
|
||||
/// Skip index types.
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
|
||||
pub enum SkipIndexType {
|
||||
pub enum SkippingIndexType {
|
||||
#[default]
|
||||
BloomFilter,
|
||||
}
|
||||
|
||||
impl fmt::Display for SkipIndexType {
|
||||
impl fmt::Display for SkippingIndexType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
SkipIndexType::BloomFilter => write!(f, "BLOOM"),
|
||||
SkippingIndexType::BloomFilter => write!(f, "BLOOM"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -587,7 +587,7 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
|
||||
// Parse index type with default value BloomFilter
|
||||
let index_type = match options.get(COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE) {
|
||||
Some(typ) => match typ.to_ascii_uppercase().as_str() {
|
||||
"BLOOM" => SkipIndexType::BloomFilter,
|
||||
"BLOOM" => SkippingIndexType::BloomFilter,
|
||||
_ => {
|
||||
return error::InvalidSkippingIndexOptionSnafu {
|
||||
msg: format!("Invalid index type: {typ}, expected: 'BLOOM'"),
|
||||
@@ -595,7 +595,7 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
|
||||
.fail();
|
||||
}
|
||||
},
|
||||
None => SkipIndexType::default(),
|
||||
None => SkippingIndexType::default(),
|
||||
};
|
||||
|
||||
Ok(SkippingIndexOptions {
|
||||
|
||||
@@ -40,9 +40,12 @@ datatypes.workspace = true
|
||||
enum-as-inner = "0.6.0"
|
||||
enum_dispatch = "0.3"
|
||||
futures = "0.3"
|
||||
get-size-derive2 = "0.1.2"
|
||||
get-size2 = "0.1.2"
|
||||
greptime-proto.workspace = true
|
||||
# This fork of hydroflow is simply for keeping our dependency in our org, and pin the version
|
||||
# otherwise it is the same with upstream repo
|
||||
http.workspace = true
|
||||
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
|
||||
itertools.workspace = true
|
||||
lazy_static.workspace = true
|
||||
|
||||
@@ -30,7 +30,7 @@ use common_telemetry::{debug, info, trace};
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::Value;
|
||||
use greptime_proto::v1;
|
||||
use itertools::Itertools;
|
||||
use itertools::{EitherOrBoth, Itertools};
|
||||
use meta_client::MetaClientOptions;
|
||||
use query::QueryEngine;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -46,20 +46,23 @@ use tokio::sync::{broadcast, watch, Mutex, RwLock};
|
||||
|
||||
pub(crate) use crate::adapter::node_context::FlownodeContext;
|
||||
use crate::adapter::table_source::TableSource;
|
||||
use crate::adapter::util::column_schemas_to_proto;
|
||||
use crate::adapter::util::{
|
||||
relation_desc_to_column_schemas_with_fallback, table_info_value_to_relation_desc,
|
||||
};
|
||||
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
|
||||
use crate::compute::ErrCollector;
|
||||
use crate::df_optimizer::sql_to_flow_plan;
|
||||
use crate::error::{
|
||||
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu,
|
||||
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
|
||||
UnexpectedSnafu,
|
||||
};
|
||||
use crate::expr::{Batch, GlobalId};
|
||||
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
|
||||
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
|
||||
use crate::expr::Batch;
|
||||
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
|
||||
use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
|
||||
|
||||
mod flownode_impl;
|
||||
mod parse_expr;
|
||||
mod stat;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod util;
|
||||
@@ -69,6 +72,7 @@ pub(crate) mod node_context;
|
||||
mod table_source;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::utils::StateReportHandler;
|
||||
use crate::FrontendInvoker;
|
||||
|
||||
// `GREPTIME_TIMESTAMP` is not used to distinguish when table is created automatically by flow
|
||||
@@ -137,6 +141,8 @@ pub struct FlowWorkerManager {
|
||||
///
|
||||
/// So that a series of event like `inserts -> flush` can be handled correctly
|
||||
flush_lock: RwLock<()>,
|
||||
/// receive a oneshot sender to send state size report
|
||||
state_report_handler: RwLock<Option<StateReportHandler>>,
|
||||
}
|
||||
|
||||
/// Building FlownodeManager
|
||||
@@ -170,9 +176,15 @@ impl FlowWorkerManager {
|
||||
tick_manager,
|
||||
node_id,
|
||||
flush_lock: RwLock::new(()),
|
||||
state_report_handler: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
|
||||
*self.state_report_handler.write().await = Some(handler);
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a flownode manager with one worker
|
||||
pub fn new_with_worker<'s>(
|
||||
node_id: Option<u32>,
|
||||
@@ -235,16 +247,26 @@ impl FlowWorkerManager {
|
||||
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
|
||||
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
|
||||
|
||||
let (is_ts_placeholder, proto_schema) =
|
||||
self.try_fetch_or_create_table(&table_name).await?;
|
||||
let (is_ts_placeholder, proto_schema) = self
|
||||
.try_fetch_existing_table(&table_name)
|
||||
.await?
|
||||
.context(UnexpectedSnafu {
|
||||
reason: format!("Table not found: {}", table_name.join(".")),
|
||||
})?;
|
||||
let schema_len = proto_schema.len();
|
||||
|
||||
let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();
|
||||
trace!(
|
||||
"Sending {} writeback requests to table {}, reqs total rows={}",
|
||||
reqs.len(),
|
||||
table_name.join("."),
|
||||
reqs.iter().map(|r| r.len()).sum::<usize>()
|
||||
);
|
||||
|
||||
METRIC_FLOW_ROWS
|
||||
.with_label_values(&["out"])
|
||||
.inc_by(total_rows as u64);
|
||||
|
||||
let now = self.tick_manager.tick();
|
||||
for req in reqs {
|
||||
match req {
|
||||
@@ -380,14 +402,12 @@ impl FlowWorkerManager {
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
/// Fetch table info or create table from flow's schema if not exist
|
||||
async fn try_fetch_or_create_table(
|
||||
/// Fetch table schema and primary key from table info source, if table not exist return None
|
||||
async fn fetch_table_pk_schema(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
) -> Result<(bool, Vec<api::v1::ColumnSchema>), Error> {
|
||||
// TODO(discord9): instead of auto build table from request schema, actually build table
|
||||
// before `create flow` to be able to assign pk and ts etc.
|
||||
let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self
|
||||
) -> Result<Option<(Vec<String>, Option<usize>, Vec<ColumnSchema>)>, Error> {
|
||||
if let Some(table_id) = self
|
||||
.table_info_source
|
||||
.get_table_id_from_name(table_name)
|
||||
.await?
|
||||
@@ -404,102 +424,90 @@ impl FlowWorkerManager {
|
||||
.map(|i| meta.schema.column_schemas[i].name.clone())
|
||||
.collect_vec();
|
||||
let schema = meta.schema.column_schemas;
|
||||
// check if the last column is the auto created timestamp column, hence the table is auto created from
|
||||
// flow's plan type
|
||||
let is_auto_create = {
|
||||
let correct_name = schema
|
||||
.last()
|
||||
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
|
||||
.unwrap_or(false);
|
||||
let correct_time_index = meta.schema.timestamp_index == Some(schema.len() - 1);
|
||||
correct_name && correct_time_index
|
||||
};
|
||||
(primary_keys, schema, is_auto_create)
|
||||
let time_index = meta.schema.timestamp_index;
|
||||
Ok(Some((primary_keys, time_index, schema)))
|
||||
} else {
|
||||
// TODO(discord9): condiser remove buggy auto create by schema
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
let node_ctx = self.node_context.read().await;
|
||||
let gid: GlobalId = node_ctx
|
||||
.table_repr
|
||||
.get_by_name(table_name)
|
||||
.map(|x| x.1)
|
||||
.unwrap();
|
||||
let schema = node_ctx
|
||||
.schema
|
||||
.get(&gid)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: format!("Table name = {:?}", table_name),
|
||||
})?
|
||||
.clone();
|
||||
// TODO(discord9): use default key from schema
|
||||
let primary_keys = schema
|
||||
.typ()
|
||||
.keys
|
||||
.first()
|
||||
.map(|v| {
|
||||
v.column_indices
|
||||
.iter()
|
||||
.map(|i| {
|
||||
schema
|
||||
.get_name(*i)
|
||||
.clone()
|
||||
.unwrap_or_else(|| format!("col_{i}"))
|
||||
})
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let update_at = ColumnSchema::new(
|
||||
UPDATE_AT_TS_COL,
|
||||
/// return (primary keys, schema and if the table have a placeholder timestamp column)
|
||||
/// schema of the table comes from flow's output plan
|
||||
///
|
||||
/// adjust to add `update_at` column and ts placeholder if needed
|
||||
async fn adjust_auto_created_table_schema(
|
||||
&self,
|
||||
schema: &RelationDesc,
|
||||
) -> Result<(Vec<String>, Vec<ColumnSchema>, bool), Error> {
|
||||
// TODO(discord9): condiser remove buggy auto create by schema
|
||||
|
||||
// TODO(discord9): use default key from schema
|
||||
let primary_keys = schema
|
||||
.typ()
|
||||
.keys
|
||||
.first()
|
||||
.map(|v| {
|
||||
v.column_indices
|
||||
.iter()
|
||||
.map(|i| {
|
||||
schema
|
||||
.get_name(*i)
|
||||
.clone()
|
||||
.unwrap_or_else(|| format!("col_{i}"))
|
||||
})
|
||||
.collect_vec()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let update_at = ColumnSchema::new(
|
||||
UPDATE_AT_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
);
|
||||
|
||||
let original_schema = relation_desc_to_column_schemas_with_fallback(schema);
|
||||
|
||||
let mut with_auto_added_col = original_schema.clone();
|
||||
with_auto_added_col.push(update_at);
|
||||
|
||||
// if no time index, add one as placeholder
|
||||
let no_time_index = schema.typ().time_index.is_none();
|
||||
if no_time_index {
|
||||
let ts_col = ColumnSchema::new(
|
||||
AUTO_CREATED_PLACEHOLDER_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
);
|
||||
)
|
||||
.with_time_index(true);
|
||||
with_auto_added_col.push(ts_col);
|
||||
}
|
||||
|
||||
let original_schema = schema
|
||||
.typ()
|
||||
.column_types
|
||||
.clone()
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, typ)| {
|
||||
let name = schema
|
||||
.names
|
||||
.get(idx)
|
||||
.cloned()
|
||||
.flatten()
|
||||
.unwrap_or(format!("col_{}", idx));
|
||||
let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
|
||||
if schema.typ().time_index == Some(idx) {
|
||||
ret.with_time_index(true)
|
||||
} else {
|
||||
ret
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let mut with_auto_added_col = original_schema.clone();
|
||||
with_auto_added_col.push(update_at);
|
||||
|
||||
// if no time index, add one as placeholder
|
||||
let no_time_index = schema.typ().time_index.is_none();
|
||||
if no_time_index {
|
||||
let ts_col = ColumnSchema::new(
|
||||
AUTO_CREATED_PLACEHOLDER_TS_COL,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
true,
|
||||
)
|
||||
.with_time_index(true);
|
||||
with_auto_added_col.push(ts_col);
|
||||
}
|
||||
|
||||
(primary_keys, with_auto_added_col, no_time_index)
|
||||
};
|
||||
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
|
||||
Ok((is_ts_placeholder, proto_schema))
|
||||
Ok((primary_keys, with_auto_added_col, no_time_index))
|
||||
}
|
||||
}
|
||||
|
||||
/// Flow Runtime related methods
|
||||
impl FlowWorkerManager {
|
||||
/// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back
|
||||
///
|
||||
/// if heartbeat task is shutdown, this future will exit too
|
||||
async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
|
||||
let state_report_handler = self.state_report_handler.write().await.take();
|
||||
if let Some(mut handler) = state_report_handler {
|
||||
let zelf = self.clone();
|
||||
let handler = common_runtime::spawn_global(async move {
|
||||
while let Some(ret_handler) = handler.recv().await {
|
||||
let state_report = zelf.gen_state_report().await;
|
||||
ret_handler.send(state_report).unwrap_or_else(|err| {
|
||||
common_telemetry::error!(err; "Send state size report error");
|
||||
});
|
||||
}
|
||||
});
|
||||
Some(handler)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// run in common_runtime background runtime
|
||||
pub fn run_background(
|
||||
self: Arc<Self>,
|
||||
@@ -507,6 +515,7 @@ impl FlowWorkerManager {
|
||||
) -> JoinHandle<()> {
|
||||
info!("Starting flownode manager's background task");
|
||||
common_runtime::spawn_global(async move {
|
||||
let _state_report_handler = self.clone().start_state_report_handler().await;
|
||||
self.run(shutdown).await;
|
||||
})
|
||||
}
|
||||
@@ -533,6 +542,8 @@ impl FlowWorkerManager {
|
||||
let default_interval = Duration::from_secs(1);
|
||||
let mut avg_spd = 0; // rows/sec
|
||||
let mut since_last_run = tokio::time::Instant::now();
|
||||
let run_per_trace = 10;
|
||||
let mut run_cnt = 0;
|
||||
loop {
|
||||
// TODO(discord9): only run when new inputs arrive or scheduled to
|
||||
let row_cnt = self.run_available(true).await.unwrap_or_else(|err| {
|
||||
@@ -575,10 +586,19 @@ impl FlowWorkerManager {
|
||||
} else {
|
||||
(9 * avg_spd + cur_spd) / 10
|
||||
};
|
||||
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
|
||||
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
|
||||
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
|
||||
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
|
||||
|
||||
// print trace every `run_per_trace` times so that we can see if there is something wrong
|
||||
// but also not get flooded with trace
|
||||
if run_cnt >= run_per_trace {
|
||||
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
|
||||
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
|
||||
run_cnt = 0;
|
||||
} else {
|
||||
run_cnt += 1;
|
||||
}
|
||||
|
||||
METRIC_FLOW_RUN_INTERVAL_MS.set(new_wait.as_millis() as i64);
|
||||
since_last_run = tokio::time::Instant::now();
|
||||
tokio::time::sleep(new_wait).await;
|
||||
@@ -638,13 +658,18 @@ impl FlowWorkerManager {
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
rows: Vec<DiffRow>,
|
||||
batch_datatypes: &[ConcreteDataType],
|
||||
) -> Result<(), Error> {
|
||||
let rows_len = rows.len();
|
||||
let table_id = region_id.table_id();
|
||||
let _timer = METRIC_FLOW_INSERT_ELAPSED
|
||||
.with_label_values(&[table_id.to_string().as_str()])
|
||||
.start_timer();
|
||||
self.node_context.read().await.send(table_id, rows).await?;
|
||||
self.node_context
|
||||
.read()
|
||||
.await
|
||||
.send(table_id, rows, batch_datatypes)
|
||||
.await?;
|
||||
trace!(
|
||||
"Handling write request for table_id={} with {} rows",
|
||||
table_id,
|
||||
@@ -759,7 +784,85 @@ impl FlowWorkerManager {
|
||||
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
|
||||
|
||||
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
|
||||
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
|
||||
|
||||
// check schema against actual table schema if exists
|
||||
// if not exist create sink table immediately
|
||||
if let Some((_, _, real_schema)) = self.fetch_table_pk_schema(&sink_table_name).await? {
|
||||
let auto_schema = relation_desc_to_column_schemas_with_fallback(&flow_plan.schema);
|
||||
|
||||
// for column schema, only `data_type` need to be check for equality
|
||||
// since one can omit flow's column name when write flow query
|
||||
// print a user friendly error message about mismatch and how to correct them
|
||||
for (idx, zipped) in auto_schema
|
||||
.iter()
|
||||
.zip_longest(real_schema.iter())
|
||||
.enumerate()
|
||||
{
|
||||
match zipped {
|
||||
EitherOrBoth::Both(auto, real) => {
|
||||
if auto.data_type != real.data_type {
|
||||
InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"Column {}(name is '{}', flow inferred name is '{}')'s data type mismatch, expect {:?} got {:?}",
|
||||
idx,
|
||||
real.name,
|
||||
auto.name,
|
||||
real.data_type,
|
||||
auto.data_type
|
||||
),
|
||||
}
|
||||
.fail()?;
|
||||
}
|
||||
}
|
||||
EitherOrBoth::Right(real) if real.data_type.is_timestamp() => {
|
||||
// if table is auto created, the last one or two column should be timestamp(update at and ts placeholder)
|
||||
continue;
|
||||
}
|
||||
_ => InvalidQuerySnafu {
|
||||
reason: format!(
|
||||
"schema length mismatched, expected {} found {}",
|
||||
real_schema.len(),
|
||||
auto_schema.len()
|
||||
),
|
||||
}
|
||||
.fail()?,
|
||||
}
|
||||
}
|
||||
|
||||
let table_id = self
|
||||
.table_info_source
|
||||
.get_table_id_from_name(&sink_table_name)
|
||||
.await?
|
||||
.context(UnexpectedSnafu {
|
||||
reason: format!("Can't get table id for table name {:?}", sink_table_name),
|
||||
})?;
|
||||
let table_info_value = self
|
||||
.table_info_source
|
||||
.get_table_info_value(&table_id)
|
||||
.await?
|
||||
.context(UnexpectedSnafu {
|
||||
reason: format!("Can't get table info value for table id {:?}", table_id),
|
||||
})?;
|
||||
let real_schema = table_info_value_to_relation_desc(table_info_value)?;
|
||||
node_ctx.assign_table_schema(&sink_table_name, real_schema.clone())?;
|
||||
} else {
|
||||
// assign inferred schema to sink table
|
||||
// create sink table
|
||||
node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;
|
||||
let did_create = self
|
||||
.create_table_from_relation(
|
||||
&format!("flow-id={flow_id}"),
|
||||
&sink_table_name,
|
||||
&flow_plan.schema,
|
||||
)
|
||||
.await?;
|
||||
if !did_create {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("Failed to create table {:?}", sink_table_name),
|
||||
}
|
||||
.fail()?;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = comment;
|
||||
let _ = flow_options;
|
||||
|
||||
@@ -28,6 +28,7 @@ use itertools::Itertools;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::util::from_proto_to_data_type;
|
||||
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
|
||||
use crate::error::InternalSnafu;
|
||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
||||
@@ -137,7 +138,7 @@ impl Flownode for FlowWorkerManager {
|
||||
}
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse> {
|
||||
// using try_read makesure two things:
|
||||
// using try_read to ensure two things:
|
||||
// 1. flush wouldn't happen until inserts before it is inserted
|
||||
// 2. inserts happening concurrently with flush wouldn't be block by flush
|
||||
let _flush_lock = self.flush_lock.try_read();
|
||||
@@ -206,9 +207,17 @@ impl Flownode for FlowWorkerManager {
|
||||
})
|
||||
.map(|r| (r, now, 1))
|
||||
.collect_vec();
|
||||
self.handle_write_request(region_id.into(), rows)
|
||||
.await
|
||||
let batch_datatypes = insert_schema
|
||||
.iter()
|
||||
.map(from_proto_to_data_type)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(to_meta_err)?;
|
||||
self.handle_write_request(region_id.into(), rows, &batch_datatypes)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
common_telemetry::error!(err;"Failed to handle write request");
|
||||
to_meta_err(err)
|
||||
})?;
|
||||
}
|
||||
Ok(Default::default())
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::trace;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use session::context::QueryContext;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
@@ -131,7 +132,11 @@ impl SourceSender {
|
||||
}
|
||||
|
||||
/// return number of rows it actual send(including what's in the buffer)
|
||||
pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
|
||||
pub async fn send_rows(
|
||||
&self,
|
||||
rows: Vec<DiffRow>,
|
||||
batch_datatypes: &[ConcreteDataType],
|
||||
) -> Result<usize, Error> {
|
||||
METRIC_FLOW_INPUT_BUF_SIZE.add(rows.len() as _);
|
||||
while self.send_buf_row_cnt.load(Ordering::SeqCst) >= BATCH_SIZE * 4 {
|
||||
tokio::task::yield_now().await;
|
||||
@@ -139,8 +144,11 @@ impl SourceSender {
|
||||
// row count metrics is approx so relaxed order is ok
|
||||
self.send_buf_row_cnt
|
||||
.fetch_add(rows.len(), Ordering::SeqCst);
|
||||
let batch = Batch::try_from_rows(rows.into_iter().map(|(row, _, _)| row).collect())
|
||||
.context(EvalSnafu)?;
|
||||
let batch = Batch::try_from_rows_with_types(
|
||||
rows.into_iter().map(|(row, _, _)| row).collect(),
|
||||
batch_datatypes,
|
||||
)
|
||||
.context(EvalSnafu)?;
|
||||
common_telemetry::trace!("Send one batch to worker with {} rows", batch.row_count());
|
||||
self.send_buf_tx.send(batch).await.map_err(|e| {
|
||||
crate::error::InternalSnafu {
|
||||
@@ -157,14 +165,19 @@ impl FlownodeContext {
|
||||
/// return number of rows it actual send(including what's in the buffer)
|
||||
///
|
||||
/// TODO(discord9): make this concurrent
|
||||
pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
|
||||
pub async fn send(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
rows: Vec<DiffRow>,
|
||||
batch_datatypes: &[ConcreteDataType],
|
||||
) -> Result<usize, Error> {
|
||||
let sender = self
|
||||
.source_sender
|
||||
.get(&table_id)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
name: table_id.to_string(),
|
||||
})?;
|
||||
sender.send_rows(rows).await
|
||||
sender.send_rows(rows, batch_datatypes).await
|
||||
}
|
||||
|
||||
/// flush all sender's buf
|
||||
@@ -318,12 +331,14 @@ impl FlownodeContext {
|
||||
} else {
|
||||
let global_id = self.new_global_id();
|
||||
|
||||
// table id is Some meaning db must have created the table
|
||||
if let Some(table_id) = table_id {
|
||||
let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
|
||||
table_name = table_name.or(Some(known_table_name));
|
||||
self.schema.insert(global_id, schema);
|
||||
} // if we don't have table id, it means database havn't assign one yet or we don't need it
|
||||
} // if we don't have table id, it means database haven't assign one yet or we don't need it
|
||||
|
||||
// still update the mapping with new global id
|
||||
self.table_repr.insert(table_name, table_id, global_id);
|
||||
Ok(global_id)
|
||||
}
|
||||
@@ -345,6 +360,7 @@ impl FlownodeContext {
|
||||
})?;
|
||||
|
||||
self.schema.insert(gid, schema);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
40
src/flow/src/adapter/stat.rs
Normal file
40
src/flow/src/adapter/stat.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use common_meta::key::flow::flow_state::FlowStat;
|
||||
|
||||
use crate::FlowWorkerManager;
|
||||
|
||||
impl FlowWorkerManager {
|
||||
pub async fn gen_state_report(&self) -> FlowStat {
|
||||
let mut full_report = BTreeMap::new();
|
||||
for worker in self.worker_handles.iter() {
|
||||
let worker = worker.lock().await;
|
||||
match worker.get_state_size().await {
|
||||
Ok(state_size) => {
|
||||
full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)))
|
||||
}
|
||||
Err(err) => {
|
||||
common_telemetry::error!(err; "Get flow stat size error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FlowStat {
|
||||
state_size: full_report,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,11 +20,12 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::adapter::util::table_info_value_to_relation_desc;
|
||||
use crate::adapter::TableName;
|
||||
use crate::error::{
|
||||
Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
|
||||
};
|
||||
use crate::repr::{self, ColumnType, RelationDesc, RelationType};
|
||||
use crate::repr::RelationDesc;
|
||||
|
||||
/// mapping of table name <-> table id should be query from tableinfo manager
|
||||
pub struct TableSource {
|
||||
@@ -61,7 +62,7 @@ impl TableSource {
|
||||
.map(|id| id.table_id())
|
||||
}
|
||||
|
||||
/// If the table havn't been created in database, the tableId returned would be null
|
||||
/// If the table haven't been created in database, the tableId returned would be null
|
||||
pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
|
||||
let ret = self
|
||||
.table_name_manager
|
||||
@@ -121,38 +122,7 @@ impl TableSource {
|
||||
table_name.table_name,
|
||||
];
|
||||
|
||||
let raw_schema = table_info_value.table_info.meta.schema;
|
||||
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
|
||||
.column_schemas
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
(
|
||||
ColumnType {
|
||||
nullable: col.is_nullable(),
|
||||
scalar_type: col.data_type,
|
||||
},
|
||||
Some(col.name),
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
let key = table_info_value.table_info.meta.primary_key_indices;
|
||||
let keys = vec![repr::Key::from(key)];
|
||||
|
||||
let time_index = raw_schema.timestamp_index;
|
||||
Ok((
|
||||
table_name,
|
||||
RelationDesc {
|
||||
typ: RelationType {
|
||||
column_types,
|
||||
keys,
|
||||
time_index,
|
||||
// by default table schema's column are all non-auto
|
||||
auto_columns: vec![],
|
||||
},
|
||||
names: col_names,
|
||||
},
|
||||
))
|
||||
let desc = table_info_value_to_relation_desc(table_info_value)?;
|
||||
Ok((table_name, desc))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,15 +12,167 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::column_def::options_from_column_schema;
|
||||
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
|
||||
use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
use operator::expr_factory::CreateExprFactory;
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::error::{Error, ExternalSnafu};
|
||||
use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
|
||||
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
|
||||
use crate::repr::{ColumnType, RelationDesc, RelationType};
|
||||
use crate::FlowWorkerManager;
|
||||
|
||||
impl FlowWorkerManager {
|
||||
/// Create table from given schema(will adjust to add auto column if needed), return true if table is created
|
||||
pub(crate) async fn create_table_from_relation(
|
||||
&self,
|
||||
flow_name: &str,
|
||||
table_name: &TableName,
|
||||
relation_desc: &RelationDesc,
|
||||
) -> Result<bool, Error> {
|
||||
if self.fetch_table_pk_schema(table_name).await?.is_some() {
|
||||
return Ok(false);
|
||||
}
|
||||
let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?;
|
||||
|
||||
//create sink table using pks, column types and is_ts_auto
|
||||
|
||||
let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?;
|
||||
|
||||
// create sink table
|
||||
let create_expr = CreateExprFactory {}
|
||||
.create_table_expr_by_column_schemas(
|
||||
&TableReference {
|
||||
catalog: &table_name[0],
|
||||
schema: &table_name[1],
|
||||
table: &table_name[2],
|
||||
},
|
||||
&proto_schema,
|
||||
"mito",
|
||||
Some(&format!("Sink table for flow {}", flow_name)),
|
||||
)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
self.submit_create_sink_table_ddl(create_expr).await?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Try fetch table with adjusted schema(added auto column if needed)
|
||||
pub(crate) async fn try_fetch_existing_table(
|
||||
&self,
|
||||
table_name: &TableName,
|
||||
) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> {
|
||||
if let Some((primary_keys, time_index, schema)) =
|
||||
self.fetch_table_pk_schema(table_name).await?
|
||||
{
|
||||
// check if the last column is the auto created timestamp column, hence the table is auto created from
|
||||
// flow's plan type
|
||||
let is_auto_create = {
|
||||
let correct_name = schema
|
||||
.last()
|
||||
.map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
|
||||
.unwrap_or(false);
|
||||
let correct_time_index = time_index == Some(schema.len() - 1);
|
||||
correct_name && correct_time_index
|
||||
};
|
||||
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
|
||||
Ok(Some((is_auto_create, proto_schema)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// submit a create table ddl
|
||||
pub(crate) async fn submit_create_sink_table_ddl(
|
||||
&self,
|
||||
mut create_table: CreateTableExpr,
|
||||
) -> Result<(), Error> {
|
||||
let stmt_exec = {
|
||||
self.frontend_invoker
|
||||
.read()
|
||||
.await
|
||||
.as_ref()
|
||||
.map(|f| f.statement_executor())
|
||||
}
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to get statement executor",
|
||||
})?;
|
||||
let ctx = Arc::new(
|
||||
QueryContextBuilder::default()
|
||||
.current_catalog(create_table.catalog_name.clone())
|
||||
.current_schema(create_table.schema_name.clone())
|
||||
.build(),
|
||||
);
|
||||
stmt_exec
|
||||
.create_table_inner(&mut create_table, None, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn table_info_value_to_relation_desc(
|
||||
table_info_value: TableInfoValue,
|
||||
) -> Result<RelationDesc, Error> {
|
||||
let raw_schema = table_info_value.table_info.meta.schema;
|
||||
let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
|
||||
.column_schemas
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|col| {
|
||||
(
|
||||
ColumnType {
|
||||
nullable: col.is_nullable(),
|
||||
scalar_type: col.data_type,
|
||||
},
|
||||
Some(col.name),
|
||||
)
|
||||
})
|
||||
.unzip();
|
||||
|
||||
let key = table_info_value.table_info.meta.primary_key_indices;
|
||||
let keys = vec![crate::repr::Key::from(key)];
|
||||
|
||||
let time_index = raw_schema.timestamp_index;
|
||||
|
||||
Ok(RelationDesc {
|
||||
typ: RelationType {
|
||||
column_types,
|
||||
keys,
|
||||
time_index,
|
||||
// by default table schema's column are all non-auto
|
||||
auto_columns: vec![],
|
||||
},
|
||||
names: col_names,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn from_proto_to_data_type(
|
||||
column_schema: &api::v1::ColumnSchema,
|
||||
) -> Result<ConcreteDataType, Error> {
|
||||
let wrapper = ColumnDataTypeWrapper::try_new(
|
||||
column_schema.datatype,
|
||||
column_schema.datatype_extension.clone(),
|
||||
)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let cdt = ConcreteDataType::from(wrapper);
|
||||
|
||||
Ok(cdt)
|
||||
}
|
||||
|
||||
/// convert `ColumnSchema` lists to it's corresponding proto type
|
||||
pub fn column_schemas_to_proto(
|
||||
@@ -60,3 +212,29 @@ pub fn column_schemas_to_proto(
|
||||
.collect();
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// Convert `RelationDesc` to `ColumnSchema` list,
|
||||
/// if the column name is not present, use `col_{idx}` as the column name
|
||||
pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec<ColumnSchema> {
|
||||
schema
|
||||
.typ()
|
||||
.column_types
|
||||
.clone()
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, typ)| {
|
||||
let name = schema
|
||||
.names
|
||||
.get(idx)
|
||||
.cloned()
|
||||
.flatten()
|
||||
.unwrap_or(format!("col_{}", idx));
|
||||
let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
|
||||
if schema.typ().time_index == Some(idx) {
|
||||
ret.with_time_index(true)
|
||||
} else {
|
||||
ret
|
||||
}
|
||||
})
|
||||
.collect_vec()
|
||||
}
|
||||
|
||||
@@ -197,6 +197,21 @@ impl WorkerHandle {
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_state_size(&self) -> Result<BTreeMap<FlowId, usize>, Error> {
|
||||
let ret = self
|
||||
.itc_client
|
||||
.call_with_resp(Request::QueryStateSize)
|
||||
.await?;
|
||||
ret.into_query_state_size().map_err(|ret| {
|
||||
InternalSnafu {
|
||||
reason: format!(
|
||||
"Flow Node/Worker itc failed, expect Response::QueryStateSize, found {ret:?}"
|
||||
),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WorkerHandle {
|
||||
@@ -361,6 +376,13 @@ impl<'s> Worker<'s> {
|
||||
Some(Response::ContainTask { result: ret })
|
||||
}
|
||||
Request::Shutdown => return Err(()),
|
||||
Request::QueryStateSize => {
|
||||
let mut ret = BTreeMap::new();
|
||||
for (flow_id, task_state) in self.task_states.iter() {
|
||||
ret.insert(*flow_id, task_state.state.get_state_size());
|
||||
}
|
||||
Some(Response::QueryStateSize { result: ret })
|
||||
}
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -391,6 +413,7 @@ pub enum Request {
|
||||
flow_id: FlowId,
|
||||
},
|
||||
Shutdown,
|
||||
QueryStateSize,
|
||||
}
|
||||
|
||||
#[derive(Debug, EnumAsInner)]
|
||||
@@ -406,6 +429,10 @@ enum Response {
|
||||
result: bool,
|
||||
},
|
||||
RunAvail,
|
||||
QueryStateSize {
|
||||
/// each flow tasks' state size
|
||||
result: BTreeMap<FlowId, usize>,
|
||||
},
|
||||
}
|
||||
|
||||
fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer) {
|
||||
@@ -423,10 +450,12 @@ struct InterThreadCallClient {
|
||||
}
|
||||
|
||||
impl InterThreadCallClient {
|
||||
/// call without response
|
||||
fn call_no_resp(&self, req: Request) -> Result<(), Error> {
|
||||
self.arg_sender.send((req, None)).map_err(from_send_error)
|
||||
}
|
||||
|
||||
/// call with response
|
||||
async fn call_with_resp(&self, req: Request) -> Result<Response, Error> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.arg_sender
|
||||
@@ -527,6 +556,7 @@ mod test {
|
||||
);
|
||||
tx.send(Batch::empty()).unwrap();
|
||||
handle.run_available(0, true).await.unwrap();
|
||||
assert_eq!(handle.get_state_size().await.unwrap().len(), 1);
|
||||
assert_eq!(sink_rx.recv().await.unwrap(), Batch::empty());
|
||||
drop(handle);
|
||||
worker_thread_handle.join().unwrap();
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
|
||||
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
|
||||
use crate::expr::{self, Batch, GlobalId, LocalId};
|
||||
use crate::plan::{Plan, TypedPlan};
|
||||
use crate::repr::{self, DiffRow};
|
||||
use crate::repr::{self, DiffRow, RelationType};
|
||||
|
||||
mod map;
|
||||
mod reduce;
|
||||
@@ -124,10 +124,10 @@ impl Context<'_, '_> {
|
||||
/// Like `render_plan` but in Batch Mode
|
||||
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
|
||||
match plan.plan {
|
||||
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
|
||||
Plan::Constant { rows } => Ok(self.render_constant_batch(rows, &plan.schema.typ)),
|
||||
Plan::Get { id } => self.get_batch_by_id(id),
|
||||
Plan::Let { id, value, body } => self.eval_batch_let(id, value, body),
|
||||
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
|
||||
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp, &plan.schema.typ),
|
||||
Plan::Reduce {
|
||||
input,
|
||||
key_val_plan,
|
||||
@@ -172,7 +172,11 @@ impl Context<'_, '_> {
|
||||
/// render Constant, take all rows that have a timestamp not greater than the current time
|
||||
/// This function is primarily used for testing
|
||||
/// Always assume input is sorted by timestamp
|
||||
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
|
||||
pub fn render_constant_batch(
|
||||
&mut self,
|
||||
rows: Vec<DiffRow>,
|
||||
output_type: &RelationType,
|
||||
) -> CollectionBundle<Batch> {
|
||||
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
|
||||
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
|
||||
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
|
||||
@@ -185,6 +189,8 @@ impl Context<'_, '_> {
|
||||
let scheduler_inner = scheduler.clone();
|
||||
let err_collector = self.err_collector.clone();
|
||||
|
||||
let output_type = output_type.clone();
|
||||
|
||||
let subgraph_id =
|
||||
self.df
|
||||
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
|
||||
@@ -199,7 +205,14 @@ impl Context<'_, '_> {
|
||||
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
|
||||
err_collector.run(|| {
|
||||
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
|
||||
let batch = Batch::try_from_rows(rows)?;
|
||||
let batch = Batch::try_from_rows_with_types(
|
||||
rows,
|
||||
&output_type
|
||||
.column_types
|
||||
.iter()
|
||||
.map(|ty| ty.scalar_type().clone())
|
||||
.collect_vec(),
|
||||
)?;
|
||||
send_port.give(vec![batch]);
|
||||
Ok(())
|
||||
});
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector
|
||||
use crate::error::{Error, PlanSnafu};
|
||||
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
|
||||
use crate::plan::TypedPlan;
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
|
||||
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
|
||||
use crate::utils::ArrangeHandler;
|
||||
|
||||
impl Context<'_, '_> {
|
||||
@@ -34,6 +34,7 @@ impl Context<'_, '_> {
|
||||
&mut self,
|
||||
input: Box<TypedPlan>,
|
||||
mfp: MapFilterProject,
|
||||
_output_type: &RelationType,
|
||||
) -> Result<CollectionBundle<Batch>, Error> {
|
||||
let input = self.render_plan_batch(*input)?;
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::new_null_array;
|
||||
use common_telemetry::trace;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::DataType;
|
||||
@@ -87,6 +88,8 @@ impl Context<'_, '_> {
|
||||
})?;
|
||||
let key_val_plan = key_val_plan.clone();
|
||||
|
||||
let output_type = output_type.clone();
|
||||
|
||||
let now = self.compute_state.current_time_ref();
|
||||
|
||||
let err_collector = self.err_collector.clone();
|
||||
@@ -118,6 +121,7 @@ impl Context<'_, '_> {
|
||||
src_data,
|
||||
&key_val_plan,
|
||||
&accum_plan,
|
||||
&output_type,
|
||||
SubgraphArg {
|
||||
now,
|
||||
err_collector: &err_collector,
|
||||
@@ -354,6 +358,7 @@ fn reduce_batch_subgraph(
|
||||
src_data: impl IntoIterator<Item = Batch>,
|
||||
key_val_plan: &KeyValPlan,
|
||||
accum_plan: &AccumulablePlan,
|
||||
output_type: &RelationType,
|
||||
SubgraphArg {
|
||||
now,
|
||||
err_collector,
|
||||
@@ -394,20 +399,54 @@ fn reduce_batch_subgraph(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: here reduce numbers of eq to minimal by keeping slicing key/val batch
|
||||
let key_data_types = output_type
|
||||
.column_types
|
||||
.iter()
|
||||
.map(|t| t.scalar_type.clone())
|
||||
.collect_vec();
|
||||
|
||||
// TODO(discord9): here reduce numbers of eq to minimal by keeping slicing key/val batch
|
||||
for key_row in distinct_keys {
|
||||
let key_scalar_value = {
|
||||
let mut key_scalar_value = Vec::with_capacity(key_row.len());
|
||||
for key in key_row.iter() {
|
||||
for (key_idx, key) in key_row.iter().enumerate() {
|
||||
let v =
|
||||
key.try_to_scalar_value(&key.data_type())
|
||||
.context(DataTypeSnafu {
|
||||
msg: "can't convert key values to datafusion value",
|
||||
})?;
|
||||
let arrow_value =
|
||||
|
||||
let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
|
||||
reason: format!(
|
||||
"Key index out of bound, expected at most {} but got {}",
|
||||
output_type.column_types.len(),
|
||||
key_idx
|
||||
),
|
||||
})?;
|
||||
|
||||
// if incoming value's datatype is null, it need to be handled specially, see below
|
||||
if key_data_type.as_arrow_type() != v.data_type()
|
||||
&& !v.data_type().is_null()
|
||||
{
|
||||
crate::expr::error::InternalSnafu {
|
||||
reason: format!(
|
||||
"Key data type mismatch, expected {:?} but got {:?}",
|
||||
key_data_type.as_arrow_type(),
|
||||
v.data_type()
|
||||
),
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
// handle single null key
|
||||
let arrow_value = if v.data_type().is_null() {
|
||||
let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
|
||||
arrow::array::Scalar::new(ret)
|
||||
} else {
|
||||
v.to_scalar().context(crate::expr::error::DatafusionSnafu {
|
||||
context: "can't convert key values to arrow value",
|
||||
})?;
|
||||
})?
|
||||
};
|
||||
key_scalar_value.push(arrow_value);
|
||||
}
|
||||
key_scalar_value
|
||||
@@ -419,7 +458,19 @@ fn reduce_batch_subgraph(
|
||||
.zip(key_batch.batch().iter())
|
||||
.map(|(key, col)| {
|
||||
// TODO(discord9): this takes half of the cpu! And this is redundant amount of `eq`!
|
||||
arrow::compute::kernels::cmp::eq(&key, &col.to_arrow_array().as_ref() as _)
|
||||
|
||||
// note that if lhs is a null, we still need to get all rows that are null! But can't use `eq` since
|
||||
// it will return null if input have null, so we need to use `is_null` instead
|
||||
if arrow::array::Datum::get(&key).0.data_type().is_null() {
|
||||
arrow::compute::kernels::boolean::is_null(
|
||||
col.to_arrow_array().as_ref() as _
|
||||
)
|
||||
} else {
|
||||
arrow::compute::kernels::cmp::eq(
|
||||
&key,
|
||||
&col.to_arrow_array().as_ref() as _,
|
||||
)
|
||||
}
|
||||
})
|
||||
.try_collect::<_, Vec<_>, _>()
|
||||
.context(ArrowSnafu {
|
||||
@@ -535,17 +586,13 @@ fn reduce_batch_subgraph(
|
||||
// this output part is not supposed to be resource intensive
|
||||
// (because for every batch there wouldn't usually be as many output row?),
|
||||
// so we can do some costly operation here
|
||||
let output_types = all_output_dict.first_entry().map(|entry| {
|
||||
entry
|
||||
.key()
|
||||
.iter()
|
||||
.chain(entry.get().iter())
|
||||
.map(|v| v.data_type())
|
||||
.collect::<Vec<ConcreteDataType>>()
|
||||
});
|
||||
let output_types = output_type
|
||||
.column_types
|
||||
.iter()
|
||||
.map(|t| t.scalar_type.clone())
|
||||
.collect_vec();
|
||||
|
||||
if let Some(output_types) = output_types {
|
||||
err_collector.run(|| {
|
||||
err_collector.run(|| {
|
||||
let column_cnt = output_types.len();
|
||||
let row_cnt = all_output_dict.len();
|
||||
|
||||
@@ -585,7 +632,6 @@ fn reduce_batch_subgraph(
|
||||
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// reduce subgraph, reduce the input data into a single row
|
||||
@@ -1516,7 +1562,9 @@ mod test {
|
||||
let mut ctx = harness_test_ctx(&mut df, &mut state);
|
||||
|
||||
let rows = vec![
|
||||
(Row::new(vec![1i64.into()]), 1, 1),
|
||||
(Row::new(vec![Value::Null]), -1, 1),
|
||||
(Row::new(vec![1i64.into()]), 0, 1),
|
||||
(Row::new(vec![Value::Null]), 1, 1),
|
||||
(Row::new(vec![2i64.into()]), 2, 1),
|
||||
(Row::new(vec![3i64.into()]), 3, 1),
|
||||
(Row::new(vec![1i64.into()]), 4, 1),
|
||||
@@ -1558,13 +1606,15 @@ mod test {
|
||||
Box::new(input_plan.with_types(typ.into_unnamed())),
|
||||
&key_val_plan,
|
||||
&reduce_plan,
|
||||
&RelationType::empty(),
|
||||
&RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)]),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let now_inner = now.clone();
|
||||
let expected = BTreeMap::<i64, Vec<i64>>::from([
|
||||
(-1, vec![]),
|
||||
(0, vec![1i64]),
|
||||
(1, vec![1i64]),
|
||||
(2, vec![3i64]),
|
||||
(3, vec![6i64]),
|
||||
@@ -1581,7 +1631,11 @@ mod test {
|
||||
|
||||
if let Some(expected) = expected.get(&now) {
|
||||
let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
|
||||
let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
|
||||
let batch = Batch::try_from_rows_with_types(
|
||||
vec![batch.into()],
|
||||
&[CDT::int64_datatype()],
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(res.first(), Some(&batch));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::cell::RefCell;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::rc::Rc;
|
||||
|
||||
use get_size2::GetSize;
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::SubgraphId;
|
||||
|
||||
@@ -109,6 +110,10 @@ impl DataflowState {
|
||||
pub fn expire_after(&self) -> Option<Timestamp> {
|
||||
self.expire_after
|
||||
}
|
||||
|
||||
pub fn get_state_size(&self) -> usize {
|
||||
self.arrange_used.iter().map(|x| x.read().get_size()).sum()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::collections::{BTreeMap, VecDeque};
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use hydroflow::scheduled::graph::Hydroflow;
|
||||
use hydroflow::scheduled::handoff::TeeingHandoff;
|
||||
use hydroflow::scheduled::port::RecvPort;
|
||||
@@ -25,6 +26,7 @@ use itertools::Itertools;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::expr::{Batch, EvalError, ScalarExpr};
|
||||
use crate::metrics::METRIC_FLOW_ERRORS;
|
||||
use crate::repr::DiffRow;
|
||||
use crate::utils::ArrangeHandler;
|
||||
|
||||
@@ -185,6 +187,9 @@ impl ErrCollector {
|
||||
}
|
||||
|
||||
pub fn push_err(&self, err: EvalError) {
|
||||
METRIC_FLOW_ERRORS
|
||||
.with_label_values(&[err.status_code().as_ref()])
|
||||
.inc();
|
||||
self.inner.blocking_lock().push_back(err)
|
||||
}
|
||||
|
||||
|
||||
@@ -492,7 +492,7 @@ impl ScalarUDFImpl for TumbleExpand {
|
||||
if let Some(start_time) = opt{
|
||||
if !matches!(start_time, Utf8 | Date32 | Date64 | Timestamp(_, _)){
|
||||
return Err(DataFusionError::Plan(
|
||||
format!("Expect start_time to either be date, timestampe or string, found {:?}", start_time)
|
||||
format!("Expect start_time to either be date, timestamp or string, found {:?}", start_time)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,13 @@
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::define_into_tonic_status;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
|
||||
use common_macro::stack_trace_debug;
|
||||
use common_telemetry::common_error::ext::ErrorExt;
|
||||
use common_telemetry::common_error::status_code::StatusCode;
|
||||
use snafu::{Location, Snafu};
|
||||
use tonic::metadata::MetadataMap;
|
||||
|
||||
use crate::adapter::FlowId;
|
||||
use crate::expr::EvalError;
|
||||
@@ -186,6 +187,20 @@ pub enum Error {
|
||||
},
|
||||
}
|
||||
|
||||
/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user
|
||||
pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
|
||||
let msg = err.to_string();
|
||||
let last_err_msg = common_error::ext::StackError::last(&err).to_string();
|
||||
let code = err.status_code() as u32;
|
||||
let header = from_err_code_msg_to_header(code, &last_err_msg);
|
||||
|
||||
tonic::Status::with_metadata(
|
||||
tonic::Code::InvalidArgument,
|
||||
msg,
|
||||
MetadataMap::from_headers(header),
|
||||
)
|
||||
}
|
||||
|
||||
/// Result type for flow module
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
@@ -200,9 +215,8 @@ impl ErrorExt for Error {
|
||||
| Self::TableNotFoundMeta { .. }
|
||||
| Self::FlowNotFound { .. }
|
||||
| Self::ListFlows { .. } => StatusCode::TableNotFound,
|
||||
Self::InvalidQuery { .. } | Self::Plan { .. } | Self::Datatypes { .. } => {
|
||||
StatusCode::PlanQuery
|
||||
}
|
||||
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
|
||||
Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery,
|
||||
Self::Unexpected { .. } => StatusCode::Unexpected,
|
||||
Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
|
||||
StatusCode::Unsupported
|
||||
|
||||
@@ -24,7 +24,7 @@ mod scalar;
|
||||
mod signature;
|
||||
|
||||
use arrow::compute::FilterBuilder;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
|
||||
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
|
||||
@@ -85,16 +85,18 @@ impl Default for Batch {
|
||||
}
|
||||
|
||||
impl Batch {
|
||||
pub fn try_from_rows(rows: Vec<crate::repr::Row>) -> Result<Self, EvalError> {
|
||||
/// Get batch from rows, will try best to determine data type
|
||||
pub fn try_from_rows_with_types(
|
||||
rows: Vec<crate::repr::Row>,
|
||||
batch_datatypes: &[ConcreteDataType],
|
||||
) -> Result<Self, EvalError> {
|
||||
if rows.is_empty() {
|
||||
return Ok(Self::empty());
|
||||
}
|
||||
let len = rows.len();
|
||||
let mut builder = rows
|
||||
.first()
|
||||
.unwrap()
|
||||
let mut builder = batch_datatypes
|
||||
.iter()
|
||||
.map(|v| v.data_type().create_mutable_vector(len))
|
||||
.map(|ty| ty.create_mutable_vector(len))
|
||||
.collect_vec();
|
||||
for row in rows {
|
||||
ensure!(
|
||||
@@ -221,10 +223,25 @@ impl Batch {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let dts = if self.batch.is_empty() {
|
||||
other.batch.iter().map(|v| v.data_type()).collect_vec()
|
||||
} else {
|
||||
self.batch.iter().map(|v| v.data_type()).collect_vec()
|
||||
let dts = {
|
||||
let max_len = self.batch.len().max(other.batch.len());
|
||||
let mut dts = Vec::with_capacity(max_len);
|
||||
for i in 0..max_len {
|
||||
if let Some(v) = self.batch().get(i)
|
||||
&& !v.data_type().is_null()
|
||||
{
|
||||
dts.push(v.data_type())
|
||||
} else if let Some(v) = other.batch().get(i)
|
||||
&& !v.data_type().is_null()
|
||||
{
|
||||
dts.push(v.data_type())
|
||||
} else {
|
||||
// both are null, so we will push null type
|
||||
dts.push(datatypes::prelude::ConcreteDataType::null_datatype())
|
||||
}
|
||||
}
|
||||
|
||||
dts
|
||||
};
|
||||
|
||||
let batch_builders = dts
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user