Compare commits

..

24 Commits

Author SHA1 Message Date
Niwaka
358d5e1d63 fix: support alter table ~ add ~ custom_type (#5165) 2024-12-15 09:05:29 +00:00
Yingwen
579059d99f ci: use 4xlarge for nightly build (#5158) 2024-12-13 12:53:11 +00:00
localhost
53d55c0b6b fix: loki write row len error (#5161) 2024-12-13 10:10:59 +00:00
Yingwen
bef6896280 docs: Add index panels to standalone grafana dashboard (#5140)
* docs: Add index panels to standalone grafana dashboard

* docs: fix flush/compaction op
2024-12-13 08:17:49 +00:00
Yohan Wal
4b4c6dbb66 refactor: cache inverted index with fixed-size page (#5114)
* feat: cache inverted index by page instead of file

* fix: add unit test and fix bugs

* chore: typo

* chore: ci

* fix: math

* chore: apply review comments

* chore: renames

* test: add unit test for index key calculation

* refactor: use ReadableSize

* feat: add config for inverted index page size

* chore: update config file

* refactor: handle multiple range read and fix some related bugs

* fix: add config

* test: turn to a fs reader to match behaviors of object store
2024-12-13 07:34:24 +00:00
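
The page-size knob added by this change surfaces as a Mito engine option; the same keys appear in the config diffs further down this compare. A minimal sketch of the relevant TOML section, with values that are illustrative rather than authoritative defaults:

[region_engine.mito.inverted_index]
## Cache size for inverted index content.
content_cache_size = "128MiB"
## Page size for inverted index content cache: entries now cache fixed-size
## pages of the index file instead of the whole file.
content_cache_page_size = "64KiB"
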
localhost
e8e9526738 chore: pipeline dryrun api can currently receive pipeline raw content (#5142)
* chore: pipeline dryrun api can currently receive pipeline raw content

* chore: remove dryrun v1 and add test

* chore: change dryrun pipeline api body schema

* chore: remove useless struct PipelineInfo

* chore: update PipelineDryrunParams doc

* chore: increase code readability

* chore: add some comment for pipeline dryrun test

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: format code

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
2024-12-12 11:47:21 +00:00
Yingwen
fee75a1fad feat: collect reader metrics from prune reader (#5152) 2024-12-12 11:27:22 +00:00
localhost
b8a78b7838 chore: decide tag column in log api follows table schema if table exists (#5138)
* chore: decide tag column in log api follows table schema if table exists

* chore: add more test for greptime_identity pipeline

* chore: change pipeline get_table function signature

* chore: change identity_pipeline_inner tag_column_names type
2024-12-12 09:01:21 +00:00
Weny Xu
2137c53274 feat(index): add file_size_hint for remote blob reader (#5147)
feat(index): add file_size_hint for remote blob reader
2024-12-12 04:45:40 +00:00
Yohan Wal
03ad6e2a8d feat(fuzz): add alter table options for alter fuzzer (#5074)
* feat(fuzz): add set table options to alter fuzzer

* chore: clippy is happy, I'm sad

* chore: happy ci happy

* fix: unit test

* feat(fuzz): add unset table options to alter fuzzer

* fix: unit test

* feat(fuzz): add table option validator

* fix: make clippy happy

* chore: add comments

* chore: apply review comments

* fix: unit test

* feat(fuzz): add more ttl options

* fix: #5108

* chore: add comments

* chore: add comments
2024-12-12 04:21:38 +00:00
Weny Xu
d53fbcb936 feat: introduce PuffinMetadataCache (#5148)
* feat: introduce `PuffinMetadataCache`

* refactor: remove too_many_arguments

* chore: fmt toml
2024-12-12 04:09:36 +00:00
Weny Xu
8c1959c580 feat: add prefetch support to InvertedIndexFooterReader for reduced I/O time (#5146)
* feat: add prefetch support to `InvertedIndeFooterReader`

* chore: correct struct name

* chore: apply suggestions from CR
2024-12-12 03:49:54 +00:00
Weny Xu
e2a41ccaec feat: add prefetch support to PuffinFileFooterReader for reduced I/O time (#5145)
* feat: introduce `PuffinFileFooterReader`

* refactor: remove `SyncReader` trait and impl

* refactor: replace `FooterParser` with `PuffinFileFooterReader`

* chore: remove unused errors
2024-12-12 03:13:36 +00:00
Niwaka
a8012147ab feat: support push down IN filter (#5129)
* feat: support push down IN filter

* chore: move tests to prune.sql
2024-12-11 13:46:23 +00:00
Ruihang Xia
60f8dbf7f0 feat: implement v1/sql/parse endpoint to parse GreptimeDB's SQL dialect (#5144)
* derive ser/de

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deserialize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-12-11 13:33:54 +00:00
ZonaHe
9da2e17d0e feat: update dashboard to v0.7.2 (#5141)
Co-authored-by: sunchanglong <sunchanglong@users.noreply.github.com>
2024-12-11 12:47:59 +00:00
Yohan Wal
1a8e77a480 test: part of parser test migrated from duckdb (#5125)
* test: update test

* fix: fix test
2024-12-11 09:28:13 +00:00
Zhenchi
e1e39993f7 feat(vector): add scalar add function (#5119)
* refactor: extract implicit conversion helper functions of vector

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(vector): add scalar add function

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
2024-12-11 09:25:56 +00:00
Lei, HUANG
a30d918df2 perf: avoid cache during compaction (#5135)
* Revert "refactor: Avoid wrapping Option for CacheManagerRef (#4996)"

This reverts commit 42bf7e9965.

* fix: memory usage during log ingestion

* fix: fmt
2024-12-11 08:24:41 +00:00
dennis zhuang
2c4ac76754 feat: adjust WAL purge default configurations (#5107)
* feat: adjust WAL purge default configurations

* fix: config

* feat: change raft engine file_size default to 128MiB
2024-12-11 08:08:05 +00:00
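
For reference, these purge defaults live in the `[wal]` section of the datanode/standalone config. A minimal sketch assuming the usual raft-engine WAL keys (the 128MB file size follows the commit message; the other values are purely illustrative):

[wal]
provider = "raft_engine"
## Size of a single WAL segment file; this change sets the raft-engine default to 128MiB.
file_size = "128MB"
## Older segments are purged once the total WAL size crosses this threshold (illustrative).
purge_threshold = "1GB"
## How often the purge task runs (illustrative).
purge_interval = "1m"
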
jeremyhi
a6893aad42 chore: set store_key_prefix for all kvbackend (#5132) 2024-12-11 08:04:02 +00:00
discord9
d91517688a chore: fix aws_lc not in depend tree check in CI (#5121)
* chore: fix aws_lc check in CI

* chore: update lock file
2024-12-11 07:02:03 +00:00
shuiyisong
3d1b8c4fac chore: add /ready api for health checking (#5124)
* chore: add ready endpoint for health checking

* chore: add test
2024-12-11 02:56:48 +00:00
Yingwen
7c69ca0502 chore: bump main branch version to 0.12 (#5133)
chore: bump version to v0.12.0
2024-12-10 13:10:37 +00:00
421 changed files with 4607 additions and 20431 deletions

View File

@@ -54,7 +54,7 @@ runs:
PROFILE_TARGET: ${{ inputs.cargo-profile == 'dev' && 'debug' || inputs.cargo-profile }}
with:
artifacts-dir: ${{ inputs.artifacts-dir }}
target-files: ./target/$PROFILE_TARGET/greptime
target-file: ./target/$PROFILE_TARGET/greptime
version: ${{ inputs.version }}
working-dir: ${{ inputs.working-dir }}
@@ -72,6 +72,6 @@ runs:
if: ${{ inputs.build-android-artifacts == 'true' }}
with:
artifacts-dir: ${{ inputs.artifacts-dir }}
target-files: ./target/aarch64-linux-android/release/greptime
target-file: ./target/aarch64-linux-android/release/greptime
version: ${{ inputs.version }}
working-dir: ${{ inputs.working-dir }}

View File

@@ -41,8 +41,8 @@ runs:
image-name: ${{ inputs.image-name }}
image-tag: ${{ inputs.version }}
docker-file: docker/ci/ubuntu/Dockerfile
amd64-artifact-name: greptime-linux-amd64-${{ inputs.version }}
arm64-artifact-name: greptime-linux-arm64-${{ inputs.version }}
amd64-artifact-name: greptime-linux-amd64-pyo3-${{ inputs.version }}
arm64-artifact-name: greptime-linux-arm64-pyo3-${{ inputs.version }}
platforms: linux/amd64,linux/arm64
push-latest-tag: ${{ inputs.push-latest-tag }}

View File

@@ -48,7 +48,19 @@ runs:
path: /tmp/greptime-*.log
retention-days: 3
- name: Build greptime
- name: Build standard greptime
uses: ./.github/actions/build-greptime-binary
with:
base-image: ubuntu
features: pyo3_backend,servers/dashboard
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-pyo3-${{ inputs.version }}
version: ${{ inputs.version }}
working-dir: ${{ inputs.working-dir }}
image-registry: ${{ inputs.image-registry }}
image-namespace: ${{ inputs.image-namespace }}
- name: Build greptime without pyo3
if: ${{ inputs.dev-mode == 'false' }}
uses: ./.github/actions/build-greptime-binary
with:

View File

@@ -90,5 +90,5 @@ runs:
uses: ./.github/actions/upload-artifacts
with:
artifacts-dir: ${{ inputs.artifacts-dir }}
target-files: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
target-file: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
version: ${{ inputs.version }}

View File

@@ -33,6 +33,15 @@ runs:
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: Install PyArrow Package
shell: pwsh
run: pip install pyarrow numpy
- name: Install WSL distribution
uses: Vampire/setup-wsl@v2
with:
@@ -67,5 +76,5 @@ runs:
uses: ./.github/actions/upload-artifacts
with:
artifacts-dir: ${{ inputs.artifacts-dir }}
target-files: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime,target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime.pdb
target-file: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
version: ${{ inputs.version }}

View File

@@ -5,7 +5,7 @@ meta:
[datanode]
[datanode.client]
timeout = "120s"
timeout = "60s"
datanode:
configData: |-
[runtime]
@@ -21,7 +21,7 @@ frontend:
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
ddl_timeout = "60s"
objectStorage:
s3:
bucket: default

View File

@@ -5,7 +5,7 @@ meta:
[datanode]
[datanode.client]
timeout = "120s"
timeout = "60s"
datanode:
configData: |-
[runtime]
@@ -17,7 +17,7 @@ frontend:
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
ddl_timeout = "60s"
objectStorage:
s3:
bucket: default

View File

@@ -11,7 +11,7 @@ meta:
[datanode]
[datanode.client]
timeout = "120s"
timeout = "60s"
datanode:
configData: |-
[runtime]
@@ -28,7 +28,7 @@ frontend:
global_rt_size = 4
[meta_client]
ddl_timeout = "120s"
ddl_timeout = "60s"
objectStorage:
s3:
bucket: default

View File

@@ -18,8 +18,6 @@ runs:
--set controller.replicaCount=${{ inputs.controller-replicas }} \
--set controller.resources.requests.cpu=50m \
--set controller.resources.requests.memory=128Mi \
--set controller.resources.limits.cpu=2000m \
--set controller.resources.limits.memory=2Gi \
--set listeners.controller.protocol=PLAINTEXT \
--set listeners.client.protocol=PLAINTEXT \
--create-namespace \

View File

@@ -4,8 +4,8 @@ inputs:
artifacts-dir:
description: Directory to store artifacts
required: true
target-files:
description: The multiple target files to upload, separated by comma
target-file:
description: The path of the target artifact
required: false
version:
description: Version of the artifact
@@ -18,16 +18,12 @@ runs:
using: composite
steps:
- name: Create artifacts directory
if: ${{ inputs.target-files != '' }}
if: ${{ inputs.target-file != '' }}
working-directory: ${{ inputs.working-dir }}
shell: bash
run: |
set -e
mkdir -p ${{ inputs.artifacts-dir }}
IFS=',' read -ra FILES <<< "${{ inputs.target-files }}"
for file in "${FILES[@]}"; do
cp "$file" ${{ inputs.artifacts-dir }}/
done
mkdir -p ${{ inputs.artifacts-dir }} && \
cp ${{ inputs.target-file }} ${{ inputs.artifacts-dir }}
# The compressed artifacts will use the following layout:
# greptime-linux-amd64-pyo3-v0.3.0sha256sum

View File

@@ -4,8 +4,7 @@ I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeT
## What's changed and what's your intention?
<!--
__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
__!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
Please explain IN DETAIL what the changes are in this PR and why they are needed:
@@ -13,14 +12,9 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed
- How does this PR work? Need a brief introduction for the changed logic (optional)
- Describe clearly one logical change and avoid lazy messages (optional)
- Describe any limitations of the current code (optional)
- Describe if this PR will break **API or data compatibility** (optional)
-->
## PR Checklist
Please convert it to a draft if some of the following conditions are not met.
## Checklist
- [ ] I have written the necessary rustdoc comments.
- [ ] I have added the necessary unit tests and integration tests.
- [ ] This PR requires documentation updates.
- [ ] API changes are backward compatible.
- [ ] Schema or data changes are backward compatible.

View File

@@ -29,7 +29,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.8xlarge-arm64
default: ec2-c6g.4xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -10,6 +10,17 @@ on:
- 'docker/**'
- '.gitignore'
- 'grafana/**'
push:
branches:
- main
paths-ignore:
- 'docs/**'
- 'config/**'
- '**.md'
- '.dockerignore'
- 'docker/**'
- '.gitignore'
- 'grafana/**'
workflow_dispatch:
name: CI
@@ -73,7 +84,7 @@ jobs:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
run: cargo +stable install taplo-cli --version ^0.9 --locked
- name: Run taplo
run: taplo format --check
@@ -96,7 +107,7 @@ jobs:
shared-key: "build-binaries"
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
run: cargo install cargo-gc-bin
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
@@ -152,7 +163,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin --force
cargo +nightly install cargo-fuzz cargo-gc-bin
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
@@ -209,7 +220,7 @@ jobs:
shell: bash
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz cargo-gc-bin --force
cargo install cargo-fuzz cargo-gc-bin
- name: Download pre-built binariy
uses: actions/download-artifact@v4
with:
@@ -257,7 +268,7 @@ jobs:
shared-key: "build-greptime-ci"
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin --force
run: cargo install cargo-gc-bin
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
@@ -312,6 +323,8 @@ jobs:
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluser
uses: ./.github/actions/setup-etcd-cluster
- name: Setup Postgres cluser
uses: ./.github/actions/setup-postgres-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
with:
@@ -327,7 +340,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin --force
cargo +nightly install cargo-fuzz cargo-gc-bin
# Downloads ci image
- name: Download pre-built binariy
uses: actions/download-artifact@v4
@@ -461,6 +474,8 @@ jobs:
uses: ./.github/actions/setup-kafka-cluster
- name: Setup Etcd cluser
uses: ./.github/actions/setup-etcd-cluster
- name: Setup Postgres cluser
uses: ./.github/actions/setup-postgres-cluster
# Prepares for fuzz tests
- uses: arduino/setup-protoc@v3
with:
@@ -476,7 +491,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin --force
cargo +nightly install cargo-fuzz cargo-gc-bin
# Downloads ci image
- name: Download pre-built binariy
uses: actions/download-artifact@v4
@@ -642,7 +657,6 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-20.04-8-cores
timeout-minutes: 60
needs: [clippy, fmt]
steps:
- uses: actions/checkout@v4
- uses: arduino/setup-protoc@v3
@@ -668,6 +682,12 @@ jobs:
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install PyArrow Package
run: pip install pyarrow numpy
- name: Setup etcd server
working-directory: tests-integration/fixtures/etcd
run: docker compose -f docker-compose-standalone.yml up -d --wait
@@ -681,7 +701,7 @@ jobs:
working-directory: tests-integration/fixtures/postgres
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
RUST_BACKTRACE: 1

View File

@@ -27,7 +27,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.8xlarge-arm64
default: ec2-c6g.4xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -1,6 +1,6 @@
on:
schedule:
- cron: "0 23 * * 1-4"
- cron: "0 23 * * 1-5"
workflow_dispatch:
name: Nightly CI
@@ -91,12 +91,18 @@ jobs:
uses: Swatinem/rust-cache@v2
- name: Install Cargo Nextest
uses: taiki-e/install-action@nextest
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: Install PyArrow Package
run: pip install pyarrow numpy
- name: Install WSL distribution
uses: Vampire/setup-wsl@v2
with:
distribution: Ubuntu-22.04
- name: Running tests
run: cargo nextest run -F dashboard
run: cargo nextest run -F pyo3_backend,dashboard
env:
CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
RUST_BACKTRACE: 1
@@ -108,16 +114,6 @@ jobs:
GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
UNITTEST_LOG_DIR: "__unittest_logs"
cleanbuild-linux-nix:
runs-on: ubuntu-latest-8-cores
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-unstable
- run: nix-shell --pure --run "cargo build"
check-status:
name: Check status
needs: [sqlness-test, sqlness-windows, test-on-windows]

View File

@@ -31,7 +31,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.8xlarge-arm64
default: ec2-c6g.4xlarge-arm64
options:
- ubuntu-2204-32-cores-arm
- ec2-c6g.xlarge-arm64 # 4C8G
@@ -91,7 +91,7 @@ env:
# The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
NIGHTLY_RELEASE_PREFIX: nightly
# Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
NEXT_RELEASE_VERSION: v0.11.0
NEXT_RELEASE_VERSION: v0.12.0
# Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
permissions:
@@ -222,10 +222,18 @@ jobs:
arch: aarch64-apple-darwin
features: servers/dashboard
artifacts-dir-prefix: greptime-darwin-arm64
- os: ${{ needs.allocate-runners.outputs.macos-runner }}
arch: aarch64-apple-darwin
features: pyo3_backend,servers/dashboard
artifacts-dir-prefix: greptime-darwin-arm64-pyo3
- os: ${{ needs.allocate-runners.outputs.macos-runner }}
features: servers/dashboard
arch: x86_64-apple-darwin
artifacts-dir-prefix: greptime-darwin-amd64
- os: ${{ needs.allocate-runners.outputs.macos-runner }}
features: pyo3_backend,servers/dashboard
arch: x86_64-apple-darwin
artifacts-dir-prefix: greptime-darwin-amd64-pyo3
runs-on: ${{ matrix.os }}
outputs:
build-macos-result: ${{ steps.set-build-macos-result.outputs.build-macos-result }}
@@ -263,6 +271,10 @@ jobs:
arch: x86_64-pc-windows-msvc
features: servers/dashboard
artifacts-dir-prefix: greptime-windows-amd64
- os: ${{ needs.allocate-runners.outputs.windows-runner }}
arch: x86_64-pc-windows-msvc
features: pyo3_backend,servers/dashboard
artifacts-dir-prefix: greptime-windows-amd64-pyo3
runs-on: ${{ matrix.os }}
outputs:
build-windows-result: ${{ steps.set-build-windows-result.outputs.build-windows-result }}

6
.gitignore vendored
View File

@@ -47,10 +47,6 @@ benchmarks/data
venv/
# Fuzz tests
# Fuzz tests
tests-fuzz/artifacts/
tests-fuzz/corpus/
# Nix
.direnv
.envrc

533
Cargo.lock generated

File diff suppressed because it is too large

View File

@@ -68,7 +68,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.11.2"
version = "0.12.0"
edition = "2021"
license = "Apache-2.0"
@@ -124,9 +124,8 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "43ddd8dea69f4df0fe2e8b5cdc0044d2cfa35908" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
hex = "0.4"
http = "0.2"
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
@@ -135,7 +134,6 @@ lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
mockall = "0.11.4"
moka = "0.12"
nalgebra = "0.33"
notify = "6.1"
num_cpus = "1.16"
once_cell = "1.18"
@@ -240,7 +238,6 @@ file-engine = { path = "src/file-engine" }
flow = { path = "src/flow" }
frontend = { path = "src/frontend", default-features = false }
index = { path = "src/index" }
log-query = { path = "src/log-query" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }

View File

@@ -70,23 +70,23 @@ Our core developers have been building time-series data platforms for years. Bas
* **Unified Processing of Metrics, Logs, and Events**
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/continuous-aggregation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
* **Cloud-native Distributed Database**
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
* **Performance and Cost-effective**
Written in pure Rust for superior performance and reliability. GreptimeDB features a distributed query engine with intelligent indexing to handle high cardinality data efficiently. Its optimized columnar storage achieves 50x cost efficiency on cloud object storage through advanced compression. [Benchmark reports](https://www.greptime.com/blogs/2024-09-09-report-summary).
Written in pure Rust for superior performance and reliability. GreptimeDB features a distributed query engine with intelligent indexing to handle high cardinality data efficiently. Its optimized columnar storage achieves 50x cost efficiency on cloud object storage through advanced compression. [Benchmark reports](https://www.greptime.com/blogs/2024-09-09-report-summary).
* **Cloud-Edge Collaboration**
GreptimeDB seamlessly operates across cloud and edge (ARM/Android/Linux), providing consistent APIs and control plane for unified data management and efficient synchronization. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
GreptimeDB seamlessly operates across cloud and edge (ARM/Android/Linux), providing consistent APIs and control plane for unified data management and efficient synchronization. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
* **Multi-protocol Ingestion, SQL & PromQL Ready**
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, InfluxDB, OpenTelemetry, Loki and Prometheus, etc. Effortless Adoption & Seamless Migration. [Supported Protocols Overview](https://docs.greptime.com/user-guide/protocols/overview).
Widely adopted database protocols and APIs, including MySQL, PostgreSQL, InfluxDB, OpenTelemetry, Loki and Prometheus, etc. Effortless Adoption & Seamless Migration. [Supported Protocols Overview](https://docs.greptime.com/user-guide/protocols/overview).
For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
@@ -138,7 +138,7 @@ Check the prerequisite:
* [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
* [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
* Python toolchain (optional): Required only if built with PyO3 backend. More details for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
* Python toolchain (optional): Required only if built with PyO3 backend. More detail for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
Build GreptimeDB binary:
@@ -154,10 +154,6 @@ cargo run -- standalone start
## Tools & Extensions
### Kubernetes
- [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
### Dashboard
- [The dashboard UI for GreptimeDB](https://github.com/GreptimeTeam/dashboard)
@@ -177,7 +173,7 @@ Our official Grafana dashboard for monitoring GreptimeDB is available at [grafan
## Project Status
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
While in Beta, GreptimeDB is already:

View File

@@ -18,7 +18,6 @@
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -151,17 +150,12 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
@@ -201,7 +195,6 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -428,7 +421,7 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
@@ -467,7 +460,7 @@
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
@@ -485,17 +478,12 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
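
The read and write cache paths in the table above can also be set explicitly. A minimal sketch using the option names from this table (paths and sizes are illustrative; when left unset they fall back to the `{data_home}`-based defaults described above):

[storage]
type = "S3"
## Local directory for the object-storage read cache; an empty string disables it.
cache_path = "/data/greptimedb/read_cache"
cache_capacity = "5GiB"

[region_engine.mito]
## Enabled by default when using object storage.
enable_experimental_write_cache = true
## Local directory for the write cache.
experimental_write_cache_path = "/data/greptimedb/write_cache"
experimental_write_cache_size = "5GiB"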

View File

@@ -294,7 +294,7 @@ data_home = "/tmp/greptimedb/"
type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
@@ -478,7 +478,7 @@ auto_flush_interval = "1h"
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}`.
## File system path for write cache, defaults to `{data_home}/object_cache/write`.
experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger.
@@ -550,7 +550,7 @@ metadata_cache_size = "64MiB"
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "64KiB"
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]
@@ -576,30 +576,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
## The options for bloom filter index in Mito engine.
[region_engine.mito.bloom_filter_index]
## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for the index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable

View File

@@ -2,10 +2,6 @@
## @toml2docs:none-default
default_timezone = "UTC"
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"
## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.

View File

@@ -18,10 +18,6 @@ max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"
## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
@@ -593,7 +589,7 @@ metadata_cache_size = "64MiB"
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "64KiB"
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]
@@ -619,30 +615,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
## The options for bloom filter in Mito engine.
[region_engine.mito.bloom_filter_index]
## Whether to create the bloom filter on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the bloom filter on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the bloom filter on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for bloom filter creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable

View File

@@ -13,6 +13,8 @@ RUN yum install -y epel-release \
openssl \
openssl-devel \
centos-release-scl \
rh-python38 \
rh-python38-python-devel \
which
# Install protoc
@@ -41,6 +43,8 @@ RUN yum install -y epel-release \
openssl \
openssl-devel \
centos-release-scl \
rh-python38 \
rh-python38-python-devel \
which
WORKDIR /greptime

View File

@@ -20,7 +20,10 @@ RUN --mount=type=cache,target=/var/cache/apt \
curl \
git \
build-essential \
pkg-config
pkg-config \
python3.10 \
python3.10-dev \
python3-pip
# Install Rust.
SHELL ["/bin/bash", "-c"]
@@ -43,8 +46,15 @@ ARG OUTPUT_DIR
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get \
-y install ca-certificates \
python3.10 \
python3.10-dev \
python3-pip \
curl
COPY ./docker/python/requirements.txt /etc/greptime/requirements.txt
RUN python3 -m pip install -r /etc/greptime/requirements.txt
WORKDIR /greptime
COPY --from=builder /out/target/${OUTPUT_DIR}/greptime /greptime/bin/
ENV PATH /greptime/bin/:$PATH

View File

@@ -7,7 +7,9 @@ RUN sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo
RUN yum install -y epel-release \
openssl \
openssl-devel \
centos-release-scl
centos-release-scl \
rh-python38 \
rh-python38-python-devel
ARG TARGETARCH

View File

@@ -8,8 +8,15 @@ ARG TARGET_BIN=greptime
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
ca-certificates \
python3.10 \
python3.10-dev \
python3-pip \
curl
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt
RUN python3 -m pip install -r /etc/greptime/requirements.txt
ARG TARGETARCH
ADD $TARGETARCH/$TARGET_BIN /greptime/bin/

View File

@@ -15,8 +15,8 @@ RUN apt-get update && \
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
libssl-dev \
tzdata \
protobuf-compiler \
curl \
unzip \
ca-certificates \
git \
build-essential \
@@ -24,20 +24,6 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
python3.10 \
python3.10-dev
ARG TARGETPLATFORM
RUN echo "target platform: $TARGETPLATFORM"
# Install protobuf, because the one in the apt is too old (v3.12).
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v29.1/protoc-29.1-linux-aarch_64.zip && \
unzip protoc-29.1-linux-aarch_64.zip -d protoc3; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v29.1/protoc-29.1-linux-x86_64.zip && \
unzip protoc-29.1-linux-x86_64.zip -d protoc3; \
fi
RUN mv protoc3/bin/* /usr/local/bin/
RUN mv protoc3/include/* /usr/local/include/
# https://github.com/GreptimeTeam/greptimedb/actions/runs/10935485852/job/30357457188#step:3:7106
# `aws-lc-sys` require gcc >= 10.3.0 to work, hence alias to use gcc-10
RUN apt-get remove -y gcc-9 g++-9 cpp-9 && \
@@ -63,7 +49,7 @@ RUN apt-get -y purge python3.8 && \
# wildcard here. However, that requires the git's config files and the submodules all owned by the very same user.
# It's troublesome to do this since the dev build runs in Docker, which is under user "root"; while outside the Docker,
# it can be a different user that have prepared the submodules.
RUN git config --global --add safe.directory '*'
RUN git config --global --add safe.directory *
# Install Python dependencies.
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt

File diff suppressed because it is too large

View File

@@ -1,3 +1,2 @@
[toolchain]
channel = "nightly-2024-10-19"
components = ["rust-analyzer"]

View File

@@ -14,7 +14,6 @@
import os
import re
from multiprocessing import Pool
def find_rust_files(directory):
@@ -34,11 +33,13 @@ def extract_branch_names(file_content):
return pattern.findall(file_content)
def check_snafu_in_files(branch_name, rust_files_content):
def check_snafu_in_files(branch_name, rust_files):
branch_name_snafu = f"{branch_name}Snafu"
for content in rust_files_content.values():
if branch_name_snafu in content:
return True
for rust_file in rust_files:
with open(rust_file, "r") as file:
content = file.read()
if branch_name_snafu in content:
return True
return False
@@ -48,24 +49,19 @@ def main():
for error_file in error_files:
with open(error_file, "r") as file:
branch_names.extend(extract_branch_names(file.read()))
content = file.read()
branch_names.extend(extract_branch_names(content))
# Read all rust files into memory once
rust_files_content = {}
for rust_file in other_rust_files:
with open(rust_file, "r") as file:
rust_files_content[rust_file] = file.read()
unused_snafu = [
branch_name
for branch_name in branch_names
if not check_snafu_in_files(branch_name, other_rust_files)
]
with Pool() as pool:
results = pool.starmap(
check_snafu_in_files, [(bn, rust_files_content) for bn in branch_names]
)
unused_snafu = [bn for bn, found in zip(branch_names, results) if not found]
for name in unused_snafu:
print(name)
if unused_snafu:
print("Unused error variants:")
for name in unused_snafu:
print(name)
raise SystemExit(1)

View File

@@ -1,29 +0,0 @@
let
nixpkgs = fetchTarball "https://github.com/NixOS/nixpkgs/tarball/nixos-24.11";
fenix = import (fetchTarball "https://github.com/nix-community/fenix/archive/main.tar.gz") {};
pkgs = import nixpkgs { config = {}; overlays = []; };
in
pkgs.mkShell rec {
nativeBuildInputs = with pkgs; [
pkg-config
git
clang
gcc
protobuf
mold
(fenix.fromToolchainFile {
dir = ./.;
})
cargo-nextest
taplo
curl
];
buildInputs = with pkgs; [
libgit2
libz
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;
}

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, COMMENT_KEY,
FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
FULLTEXT_KEY, INVERTED_INDEX_KEY,
};
use greptime_proto::v1::Analyzer;
use snafu::ResultExt;
@@ -29,8 +29,6 @@ use crate::v1::{ColumnDef, ColumnOptions, SemanticType};
const FULLTEXT_GRPC_KEY: &str = "fulltext";
/// Key used to store inverted index options in gRPC column options.
const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
/// Key used to store skip index options in gRPC column options.
const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
@@ -62,9 +60,6 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if let Some(inverted_index) = options.options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), inverted_index.clone());
}
if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.clone());
}
}
ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable)
@@ -89,11 +84,6 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), inverted_index.clone());
}
if let Some(skipping_index) = column_schema.metadata().get(SKIPPING_INDEX_KEY) {
options
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), skipping_index.clone());
}
(!options.options.is_empty()).then_some(options)
}

View File

@@ -25,7 +25,6 @@ pub enum PermissionReq<'a> {
GrpcRequest(&'a Request),
SqlStatement(&'a Statement),
PromQuery,
LogQuery,
Opentsdb,
LineProtocol,
PromStoreWrite,

View File

@@ -11,3 +11,4 @@ common-macro.workspace = true
common-meta.workspace = true
moka.workspace = true
snafu.workspace = true
substrait.workspace = true

View File

@@ -18,6 +18,7 @@ async-stream.workspace = true
async-trait = "0.1"
bytes.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
@@ -57,5 +58,7 @@ catalog = { workspace = true, features = ["testing"] }
chrono.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-query = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
log-store.workspace = true
object-store.workspace = true
tokio.workspace = true

View File

@@ -64,13 +64,6 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to list flow stats"))]
ListFlowStats {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows {
#[snafu(implicit)]
@@ -333,7 +326,6 @@ impl ErrorExt for Error {
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. }
| Error::ListFlowStats { source, .. }
| Error::ListProcedures { source, .. }
| Error::ListRegionStats { source, .. }
| Error::ConvertProtoData { source, .. } => source.status_code(),

View File

@@ -17,7 +17,6 @@ use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use meta_client::MetaClientRef;
@@ -90,12 +89,4 @@ impl InformationExtension for DistributedInformationExtension {
.map_err(BoxedError::new)
.context(error::ListRegionStatsSnafu)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
self.meta_client
.list_flow_stats()
.await
.map_err(BoxedError::new)
.context(crate::error::ListFlowStatsSnafu)
}
}

View File

@@ -38,7 +38,7 @@ pub fn new_table_cache(
) -> TableCache {
let init = init_factory(table_info_cache, table_name_cache);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(

View File

@@ -35,7 +35,6 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
@@ -193,7 +192,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
)) as _),
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.flow_metadata_manager.clone(),
)) as _),
PROCEDURE_INFO => Some(
@@ -340,9 +338,6 @@ pub trait InformationExtension {
/// Gets the region statistics.
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
/// Get the flow statistics. If no flownode is available, return `None`.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
}
pub struct NoopInformationExtension;
@@ -362,8 +357,4 @@ impl InformationExtension for NoopInformationExtension {
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
Ok(vec![])
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
}

View File

@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Weak};
use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -29,9 +28,7 @@ use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{
Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
@@ -41,8 +38,6 @@ use crate::error::{
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42;
@@ -50,7 +45,6 @@ const INIT_CAPACITY: usize = 42;
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
@@ -61,24 +55,20 @@ pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
/// The `information_schema.flows` to provides information about flows in databases.
///
pub(super) struct InformationSchemaFlows {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
}
impl InformationSchemaFlows {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
flow_metadata_manager,
}
}
@@ -90,7 +80,6 @@ impl InformationSchemaFlows {
vec![
(FLOW_NAME, CDT::string_datatype(), false),
(FLOW_ID, CDT::uint32_datatype(), false),
(STATE_SIZE, CDT::uint64_datatype(), true),
(TABLE_CATALOG, CDT::string_datatype(), false),
(FLOW_DEFINITION, CDT::string_datatype(), false),
(COMMENT, CDT::string_datatype(), true),
@@ -110,7 +99,6 @@ impl InformationSchemaFlows {
InformationSchemaFlowsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
&self.flow_metadata_manager,
)
}
@@ -156,12 +144,10 @@ impl InformationTable for InformationSchemaFlows {
struct InformationSchemaFlowsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
flow_names: StringVectorBuilder,
flow_ids: UInt32VectorBuilder,
state_sizes: UInt64VectorBuilder,
table_catalogs: StringVectorBuilder,
raw_sqls: StringVectorBuilder,
comments: StringVectorBuilder,
@@ -176,18 +162,15 @@ impl InformationSchemaFlowsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: &Arc<FlowMetadataManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
flow_metadata_manager: flow_metadata_manager.clone(),
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -212,11 +195,6 @@ impl InformationSchemaFlowsBuilder {
.flow_names(&catalog_name)
.await;
let flow_stat = {
let information_extension = utils::information_extension(&self.catalog_manager)?;
information_extension.flow_stats().await?
};
while let Some((flow_name, flow_id)) = stream
.try_next()
.await
@@ -235,7 +213,7 @@ impl InformationSchemaFlowsBuilder {
catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(),
})?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
}
self.finish()
@@ -246,7 +224,6 @@ impl InformationSchemaFlowsBuilder {
predicates: &Predicates,
flow_id: FlowId,
flow_info: FlowInfoValue,
flow_stat: &Option<FlowStat>,
) -> Result<()> {
let row = [
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
@@ -261,11 +238,6 @@ impl InformationSchemaFlowsBuilder {
}
self.flow_names.push(Some(flow_info.flow_name()));
self.flow_ids.push(Some(flow_id));
self.state_sizes.push(
flow_stat
.as_ref()
.and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
);
self.table_catalogs.push(Some(flow_info.catalog_name()));
self.raw_sqls.push(Some(flow_info.raw_sql()));
self.comments.push(Some(flow_info.comment()));
@@ -298,7 +270,6 @@ impl InformationSchemaFlowsBuilder {
let columns: Vec<VectorRef> = vec![
Arc::new(self.flow_names.finish()),
Arc::new(self.flow_ids.finish()),
Arc::new(self.state_sizes.finish()),
Arc::new(self.table_catalogs.finish()),
Arc::new(self.raw_sqls.finish()),
Arc::new(self.comments.finish()),

View File

@@ -54,10 +54,6 @@ const INIT_CAPACITY: usize = 42;
pub(crate) const PRI_CONSTRAINT_NAME: &str = "PRIMARY";
/// Time index constraint name
pub(crate) const TIME_INDEX_CONSTRAINT_NAME: &str = "TIME INDEX";
/// Inverted index constraint name
pub(crate) const INVERTED_INDEX_CONSTRAINT_NAME: &str = "INVERTED INDEX";
/// Fulltext index constraint name
pub(crate) const FULLTEXT_INDEX_CONSTRAINT_NAME: &str = "FULLTEXT INDEX";
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage {
@@ -220,13 +216,14 @@ impl InformationSchemaKeyColumnUsageBuilder {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
while let Some(table) = stream.try_next().await? {
let mut primary_constraints = vec![];
let table_info = table.table_info();
let table_name = &table_info.name;
let keys = &table_info.meta.primary_key_indices;
let schema = table.schema();
for (idx, column) in schema.column_schemas().iter().enumerate() {
let mut constraints = vec![];
if column.is_time_index() {
self.add_key_column_usage(
&predicates,
@@ -239,31 +236,30 @@ impl InformationSchemaKeyColumnUsageBuilder {
1, //always 1 for time index
);
}
// TODO(dimbtp): foreign key constraint not supported yet
if keys.contains(&idx) {
constraints.push(PRI_CONSTRAINT_NAME);
}
if column.is_inverted_indexed() {
constraints.push(INVERTED_INDEX_CONSTRAINT_NAME);
primary_constraints.push((
catalog_name.clone(),
schema_name.clone(),
table_name.to_string(),
column.name.clone(),
));
}
// TODO(dimbtp): foreign key constraint not supported yet
}
if column.has_fulltext_index_key() {
constraints.push(FULLTEXT_INDEX_CONSTRAINT_NAME);
}
if !constraints.is_empty() {
let aggregated_constraints = constraints.join(", ");
self.add_key_column_usage(
&predicates,
&schema_name,
&aggregated_constraints,
&catalog_name,
&schema_name,
table_name,
&column.name,
idx as u32 + 1,
);
}
for (i, (catalog_name, schema_name, table_name, column_name)) in
primary_constraints.into_iter().enumerate()
{
self.add_key_column_usage(
&predicates,
&schema_name,
PRI_CONSTRAINT_NAME,
&catalog_name,
&schema_name,
&table_name,
&column_name,
i as u32 + 1,
);
}
}
}

View File

@@ -15,7 +15,7 @@ cache.workspace = true
catalog.workspace = true
chrono.workspace = true
clap.workspace = true
client = { workspace = true, features = ["testing"] }
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
@@ -23,6 +23,7 @@ common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
@@ -56,7 +57,9 @@ tokio.workspace = true
tracing-appender.workspace = true
[dev-dependencies]
client = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true
serde.workspace = true
temp-env = "0.3"
tempfile.workspace = true

View File

@@ -34,7 +34,7 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use either::Either;
use meta_client::client::{ClusterKvBackend, MetaClientBuilder};
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
use query::query_engine::{DefaultSerializer, QueryEngineState};

View File

@@ -42,6 +42,8 @@ tonic.workspace = true
[dev-dependencies]
common-grpc-expr.workspace = true
datanode.workspace = true
derive-new = "0.5"
tracing = "0.1"
[dev-dependencies.substrait_proto]

View File

@@ -59,6 +59,10 @@ impl Instance {
}
}
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
pub fn datanode(&self) -> &Datanode {
&self.datanode
}

View File

@@ -63,6 +63,10 @@ impl Instance {
}
}
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}

View File

@@ -22,7 +22,6 @@ use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
@@ -35,7 +34,6 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -72,7 +70,7 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::{broadcast, RwLock};
use tokio::sync::broadcast;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
@@ -153,7 +151,6 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
}
impl Default for StandaloneOptions {
@@ -183,7 +180,6 @@ impl Default for StandaloneOptions {
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
}
}
}
@@ -221,7 +217,6 @@ impl StandaloneOptions {
user_provider: cloned_opts.user_provider,
// Hand the export metrics task run by standalone over to the frontend for execution
export_metrics: cloned_opts.export_metrics,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
..Default::default()
}
}
@@ -512,7 +507,7 @@ impl StartCommand {
procedure_manager.clone(),
));
let catalog_manager = KvBackendCatalogManager::new(
information_extension.clone(),
information_extension,
kv_backend.clone(),
layered_cache_registry.clone(),
Some(procedure_manager.clone()),
@@ -537,14 +532,6 @@ impl StartCommand {
.context(OtherSnafu)?,
);
// set the reference used to query the local flow state
{
let flow_worker_manager = flownode.flow_worker_manager();
information_extension
.set_flow_worker_manager(flow_worker_manager.clone())
.await;
}
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
@@ -682,7 +669,6 @@ pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>,
}
impl StandaloneInformationExtension {
@@ -691,15 +677,8 @@ impl StandaloneInformationExtension {
region_server,
procedure_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
flow_worker_manager: RwLock::new(None),
}
}
/// Set the flow worker manager for the standalone instance.
pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) {
let mut guard = self.flow_worker_manager.write().await;
*guard = Some(flow_worker_manager);
}
}
#[async_trait::async_trait]
@@ -771,18 +750,6 @@ impl InformationExtension for StandaloneInformationExtension {
.collect::<Vec<_>>();
Ok(stats)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(Some(
self.flow_worker_manager
.read()
.await
.as_ref()
.unwrap()
.gen_state_report()
.await,
))
}
}
#[cfg(test)]

View File

@@ -17,7 +17,6 @@ common-macro.workspace = true
futures.workspace = true
paste = "1.0"
pin-project.workspace = true
rand.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true

View File

@@ -8,5 +8,10 @@ license.workspace = true
workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true
snafu.workspace = true
[dev-dependencies]
chrono.workspace = true
tokio.workspace = true

View File

@@ -48,4 +48,5 @@ url = "2.3"
[dev-dependencies]
common-telemetry.workspace = true
common-test-util.workspace = true
dotenv.workspace = true
uuid.workspace = true

View File

@@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish();
Ok(object_store)
}

View File

@@ -89,7 +89,7 @@ pub fn build_s3_backend(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish())
}

View File

@@ -35,23 +35,10 @@ data = {
"bigint_other": [5, -5, 1, 5, 5],
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [
datetime.datetime(2023, 4, 1, 20, 15, 30, 2000),
datetime.datetime.fromtimestamp(int("1629617204525777000") / 1000000000),
datetime.datetime(2023, 1, 1),
datetime.datetime(2023, 2, 1),
datetime.datetime(2023, 3, 1),
],
"date_simple": [
datetime.date(2023, 4, 1),
datetime.date(2023, 3, 1),
datetime.date(2023, 1, 1),
datetime.date(2023, 2, 1),
datetime.date(2023, 3, 1),
],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
}
def infer_schema(data):
schema = "struct<"
for key, value in data.items():
@@ -69,7 +56,7 @@ def infer_schema(data):
elif key.startswith("date"):
dt = "date"
else:
print(key, value, dt)
print(key,value,dt)
raise NotImplementedError
if key.startswith("double"):
dt = "double"
@@ -81,6 +68,7 @@ def infer_schema(data):
return schema
def _write(
schema: str,
data,

View File

@@ -8,7 +8,6 @@ license.workspace = true
workspace = true
[dependencies]
http.workspace = true
snafu.workspace = true
strum.workspace = true
tonic.workspace = true

View File

@@ -18,30 +18,9 @@ pub mod ext;
pub mod mock;
pub mod status_code;
use http::{HeaderMap, HeaderValue};
pub use snafu;
// HACK - these headers are defined here so they can be shared by gRPC services. For common HTTP headers,
// please define them in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
/// Creates an HTTP header map from an error code and message,
/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as the keys.
pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
let mut header = HeaderMap::new();
let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| {
HeaderValue::from_bytes(
&msg.as_bytes()
.iter()
.flat_map(|b| std::ascii::escape_default(*b))
.collect::<Vec<u8>>(),
)
.expect("Already escaped string should be valid ascii")
});
header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into());
header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg);
header
}

View File

@@ -5,7 +5,12 @@ edition.workspace = true
license.workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true

View File

@@ -33,7 +33,7 @@ geo-types = { version = "0.7", optional = true }
geohash = { version = "0.13", optional = true }
h3o = { version = "0.6", optional = true }
jsonb.workspace = true
nalgebra.workspace = true
nalgebra = "0.33"
num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
@@ -51,5 +51,6 @@ wkt = { version = "0.11", optional = true }
[dev-dependencies]
approx = "0.5"
ron = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio.workspace = true

View File

@@ -26,4 +26,3 @@ pub mod function_registry;
pub mod handlers;
pub mod helper;
pub mod state;
pub mod utils;

View File

@@ -32,7 +32,6 @@ pub use scipy_stats_norm_cdf::ScipyStatsNormCdfAccumulatorCreator;
pub use scipy_stats_norm_pdf::ScipyStatsNormPdfAccumulatorCreator;
use crate::function_registry::FunctionRegistry;
use crate::scalars::vector::sum::VectorSumCreator;
/// A function that creates an `AggregateFunctionCreator`.
/// "Aggregator" *is* AggregatorFunction. Since the latter is long, we named a short alias for it.
@@ -92,7 +91,6 @@ impl AggregateFunctions {
register_aggr_func!("argmin", 1, ArgminAccumulatorCreator);
register_aggr_func!("scipystatsnormcdf", 2, ScipyStatsNormCdfAccumulatorCreator);
register_aggr_func!("scipystatsnormpdf", 2, ScipyStatsNormPdfAccumulatorCreator);
register_aggr_func!("vec_sum", 1, VectorSumCreator);
#[cfg(feature = "geo")]
register_aggr_func!(

View File

@@ -204,10 +204,20 @@ impl PatternAst {
fn convert_literal(column: &str, pattern: &str) -> Expr {
logical_expr::col(column).like(logical_expr::lit(format!(
"%{}%",
crate::utils::escape_like_pattern(pattern)
Self::escape_pattern(pattern)
)))
}
fn escape_pattern(pattern: &str) -> String {
pattern
.chars()
.flat_map(|c| match c {
'\\' | '%' | '_' => vec!['\\', c],
_ => vec![c],
})
.collect::<String>()
}
/// Transform this AST with preset rules to make it correct.
fn transform_ast(self) -> Result<Self> {
self.transform_up(Self::collapse_binary_branch_fn)

View File

@@ -14,14 +14,8 @@
mod convert;
mod distance;
mod elem_sum;
pub mod impl_conv;
pub(crate) mod impl_conv;
mod scalar_add;
mod scalar_mul;
mod sub;
pub(crate) mod sum;
mod vector_div;
mod vector_mul;
use std::sync::Arc;
@@ -42,12 +36,5 @@ impl VectorFunction {
// scalar calculation
registry.register(Arc::new(scalar_add::ScalarAddFunction));
registry.register(Arc::new(scalar_mul::ScalarMulFunction));
// vector calculation
registry.register(Arc::new(vector_mul::VectorMulFunction));
registry.register(Arc::new(vector_div::VectorDivFunction));
registry.register(Arc::new(sub::SubFunction));
registry.register(Arc::new(elem_sum::ElemSumFunction));
}
}

View File

@@ -1,129 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{Float32VectorBuilder, MutableVector, VectorRef};
use nalgebra::DVectorView;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const};
const NAME: &str = "vec_elem_sum";
#[derive(Debug, Clone, Default)]
pub struct ElemSumFunction;
impl Function for ElemSumFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::float32_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
TypeSignature::Exact(vec![ConcreteDataType::binary_datatype()]),
],
Volatility::Immutable,
)
}
fn eval(
&self,
_func_ctx: FunctionContext,
columns: &[VectorRef],
) -> common_query::error::Result<VectorRef> {
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
columns.len()
)
}
);
let arg0 = &columns[0];
let len = arg0.len();
let mut result = Float32VectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
result.push(Some(DVectorView::from_slice(&arg0, arg0.len()).sum()));
}
Ok(result.to_vector())
}
}
impl Display for ElemSumFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::StringVector;
use super::*;
use crate::function::FunctionContext;
#[test]
fn test_elem_sum() {
let func = ElemSumFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
]));
let result = func.eval(FunctionContext::default(), &[input0]).unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 3);
assert_eq!(result.get_ref(0).as_f32().unwrap(), Some(6.0));
assert_eq!(result.get_ref(1).as_f32().unwrap(), Some(15.0));
assert_eq!(result.get_ref(2).as_f32().unwrap(), None);
}
}

View File

@@ -1,173 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use nalgebra::DVectorView;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
const NAME: &str = "vec_scalar_mul";
/// Multiplies each element of a vector by a scalar.
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_scalar_mul(2, "[1, 2, 3]")) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [2,4,6] |
/// +---------+
///
/// -- 1/scalar to simulate division
/// SELECT vec_to_string(vec_scalar_mul(0.5, "[2, 4, 6]")) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [1,2,3] |
/// +---------+
/// ```
#[derive(Debug, Clone, Default)]
pub struct ScalarMulFunction;
impl Function for ScalarMulFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![ConcreteDataType::float64_datatype()],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg1_const = as_veclit_if_const(arg1)?;
for i in 0..len {
let arg0 = arg0.get(i).as_f64_lossy();
let Some(arg0) = arg0 else {
result.push_null();
continue;
};
let arg1 = match arg1_const.as_ref() {
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};
let Some(arg1) = arg1 else {
result.push_null();
continue;
};
let vec = DVectorView::from_slice(&arg1, arg1.len());
let vec_res = vec.scale(arg0 as _);
let veclit = vec_res.as_slice();
let binlit = veclit_to_binlit(veclit);
result.push(Some(&binlit));
}
Ok(result.to_vector())
}
}
impl Display for ScalarMulFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{Float32Vector, StringVector};
use super::*;
#[test]
fn test_scalar_mul() {
let func = ScalarMulFunction;
let input0 = Arc::new(Float32Vector::from(vec![
Some(2.0),
Some(-0.5),
None,
Some(3.0),
]));
let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[8.0,10.0,12.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let result = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[2.0, 4.0, 6.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[-4.0, -5.0, -6.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
}
}

View File

@@ -1,223 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::InvalidFuncArgsSnafu;
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use nalgebra::DVectorView;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
const NAME: &str = "vec_sub";
/// Subtracts corresponding elements of two vectors, returns a vector.
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_sub("[1.0, 1.0]", "[1.0, 2.0]")) as result;
///
/// +---------------------------------------------------------------+
/// | vec_to_string(vec_sub(Utf8("[1.0, 1.0]"),Utf8("[1.0, 2.0]"))) |
/// +---------------------------------------------------------------+
/// | [0,-1] |
/// +---------------------------------------------------------------+
///
/// -- Negative scalar to simulate subtraction
/// SELECT vec_to_string(vec_sub('[-1.0, -1.0]', '[1.0, 2.0]'));
///
/// +-----------------------------------------------------------------+
/// | vec_to_string(vec_sub(Utf8("[-1.0, -1.0]"),Utf8("[1.0, 2.0]"))) |
/// +-----------------------------------------------------------------+
/// | [-2,-3] |
/// +-----------------------------------------------------------------+
/// ```
#[derive(Debug, Clone, Default)]
pub struct SubFunction;
impl Function for SubFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(
&self,
_input_types: &[ConcreteDataType],
) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}
fn eval(
&self,
_func_ctx: FunctionContext,
columns: &[VectorRef],
) -> common_query::error::Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
)
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
ensure!(
arg0.len() == arg1.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The lengths of the vector are not aligned, args 0: {}, args 1: {}",
arg0.len(),
arg1.len(),
)
}
);
let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
let arg1_const = as_veclit_if_const(arg1)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let arg1 = match arg1_const.as_ref() {
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};
let (Some(arg0), Some(arg1)) = (arg0, arg1) else {
result.push_null();
continue;
};
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
let vec_res = vec0 - vec1;
let veclit = vec_res.as_slice();
let binlit = veclit_to_binlit(veclit);
result.push(Some(&binlit));
}
Ok(result.to_vector())
}
}
impl Display for SubFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_sub() {
let func = SubFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
Some("[2.0,3.0,3.0]".to_string()),
]));
let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,1.0,1.0]".to_string()),
Some("[6.0,5.0,4.0]".to_string()),
Some("[3.0,2.0,2.0]".to_string()),
None,
]));
let result = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[0.0, 1.0, 2.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[-2.0, 0.0, 2.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
}
#[test]
fn test_sub_error() {
let func = SubFunction;
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
None,
Some("[2.0,3.0,3.0]".to_string()),
]));
let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,1.0,1.0]".to_string()),
Some("[6.0,5.0,4.0]".to_string()),
Some("[3.0,2.0,2.0]".to_string()),
]));
let result = func.eval(FunctionContext::default(), &[input0, input1]);
match result {
Err(Error::InvalidFuncArgs { err_msg, .. }) => {
assert_eq!(
err_msg,
"The lengths of the vector are not aligned, args 0: 4, args 1: 3"
)
}
_ => unreachable!(),
}
}
}

View File

@@ -1,202 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_macro::{as_aggr_func_creator, AggrFuncTypeStore};
use common_query::error::{CreateAccumulatorSnafu, Error, InvalidFuncArgsSnafu};
use common_query::logical_plan::{Accumulator, AggregateFunctionCreator};
use common_query::prelude::AccumulatorCreatorFunction;
use datatypes::prelude::{ConcreteDataType, Value, *};
use datatypes::vectors::VectorRef;
use nalgebra::{Const, DVectorView, Dyn, OVector};
use snafu::ensure;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
#[derive(Debug, Default)]
pub struct VectorSum {
sum: Option<OVector<f32, Dyn>>,
has_null: bool,
}
#[as_aggr_func_creator]
#[derive(Debug, Default, AggrFuncTypeStore)]
pub struct VectorSumCreator {}
impl AggregateFunctionCreator for VectorSumCreator {
fn creator(&self) -> AccumulatorCreatorFunction {
let creator: AccumulatorCreatorFunction = Arc::new(move |types: &[ConcreteDataType]| {
ensure!(
types.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly one, have: {}",
types.len()
)
}
);
let input_type = &types[0];
match input_type {
ConcreteDataType::String(_) | ConcreteDataType::Binary(_) => {
Ok(Box::new(VectorSum::default()))
}
_ => {
let err_msg = format!(
"\"VEC_SUM\" aggregate function not support data type {:?}",
input_type.logical_type_id(),
);
CreateAccumulatorSnafu { err_msg }.fail()?
}
}
});
creator
}
fn output_type(&self) -> common_query::error::Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn state_types(&self) -> common_query::error::Result<Vec<ConcreteDataType>> {
Ok(vec![self.output_type()?])
}
}
impl VectorSum {
fn inner(&mut self, len: usize) -> &mut OVector<f32, Dyn> {
self.sum
.get_or_insert_with(|| OVector::zeros_generic(Dyn(len), Const::<1>))
}
fn update(&mut self, values: &[VectorRef], is_update: bool) -> Result<(), Error> {
if values.is_empty() || self.has_null {
return Ok(());
};
let column = &values[0];
let len = column.len();
match as_veclit_if_const(column)? {
Some(column) => {
let vec_column = DVectorView::from_slice(&column, column.len()).scale(len as f32);
*self.inner(vec_column.len()) += vec_column;
}
None => {
for i in 0..len {
let Some(arg0) = as_veclit(column.get_ref(i))? else {
if is_update {
self.has_null = true;
self.sum = None;
}
return Ok(());
};
let vec_column = DVectorView::from_slice(&arg0, arg0.len());
*self.inner(vec_column.len()) += vec_column;
}
}
}
Ok(())
}
}
impl Accumulator for VectorSum {
fn state(&self) -> common_query::error::Result<Vec<Value>> {
self.evaluate().map(|v| vec![v])
}
fn update_batch(&mut self, values: &[VectorRef]) -> common_query::error::Result<()> {
self.update(values, true)
}
fn merge_batch(&mut self, states: &[VectorRef]) -> common_query::error::Result<()> {
self.update(states, false)
}
fn evaluate(&self) -> common_query::error::Result<Value> {
match &self.sum {
None => Ok(Value::Null),
Some(vector) => Ok(Value::from(veclit_to_binlit(vector.as_slice()))),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::vectors::{ConstantVector, StringVector};
use super::*;
#[test]
fn test_update_batch() {
// test update empty batch, expect not updating anything
let mut vec_sum = VectorSum::default();
vec_sum.update_batch(&[]).unwrap();
assert!(vec_sum.sum.is_none());
assert!(!vec_sum.has_null);
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
// test update one not-null value
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Some(
"[1.0,2.0,3.0]".to_string(),
)]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[1.0, 2.0, 3.0])),
vec_sum.evaluate().unwrap()
);
// test update one null value
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![Option::<String>::None]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
// test update no null-value batch
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[4.0,5.0,6.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[12.0, 15.0, 18.0])),
vec_sum.evaluate().unwrap()
);
// test update null-value batch
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
None,
Some("[7.0,8.0,9.0]".to_string()),
]))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(Value::Null, vec_sum.evaluate().unwrap());
// test update with constant vector
let mut vec_sum = VectorSum::default();
let v: Vec<VectorRef> = vec![Arc::new(ConstantVector::new(
Arc::new(StringVector::from_vec(vec!["[1.0,2.0,3.0]".to_string()])),
4,
))];
vec_sum.update_batch(&v).unwrap();
assert_eq!(
Value::from(veclit_to_binlit(&[4.0, 8.0, 12.0])),
vec_sum.evaluate().unwrap()
);
}
}

View File

@@ -1,218 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use nalgebra::DVectorView;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
const NAME: &str = "vec_div";
/// Divides corresponding elements of two vectors.
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_div("[2, 4, 6]", "[2, 2, 2]")) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [1,2,3] |
/// +---------+
///
/// ```
#[derive(Debug, Clone, Default)]
pub struct VectorDivFunction;
impl Function for VectorDivFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
let arg1_const = as_veclit_if_const(arg1)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let arg1 = match arg1_const.as_ref() {
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};
if let (Some(arg0), Some(arg1)) = (arg0, arg1) {
ensure!(
arg0.len() == arg1.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the vectors must match for division, have: {} vs {}",
arg0.len(),
arg1.len()
),
}
);
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
let vec_res = vec0.component_div(&vec1);
let veclit = vec_res.as_slice();
let binlit = veclit_to_binlit(veclit);
result.push(Some(&binlit));
} else {
result.push_null();
}
}
Ok(result.to_vector())
}
}
impl Display for VectorDivFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_vector_mul() {
let func = VectorDivFunction;
let vec0 = vec![1.0, 2.0, 3.0];
let vec1 = vec![1.0, 1.0];
let (len0, len1) = (vec0.len(), vec1.len());
let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))]));
let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))]));
let err = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!(
"The length of the vectors must match for division, have: {} vs {}",
len0, len1
)
)
}
_ => unreachable!(),
}
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[8.0,10.0,12.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,1.0,1.0]".to_string()),
Some("[2.0,2.0,2.0]".to_string()),
None,
Some("[3.0,3.0,3.0]".to_string()),
]));
let result = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[4.0, 5.0, 6.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
let input0 = Arc::new(StringVector::from(vec![Some("[1.0,-2.0]".to_string())]));
let input1 = Arc::new(StringVector::from(vec![Some("[0.0,0.0]".to_string())]));
let result = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[f64::INFINITY as f32, f64::NEG_INFINITY as f32]).as_slice())
);
}
}

View File

@@ -1,205 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::fmt::Display;
use common_query::error::{InvalidFuncArgsSnafu, Result};
use common_query::prelude::Signature;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::{BinaryVectorBuilder, MutableVector, VectorRef};
use nalgebra::DVectorView;
use snafu::ensure;
use crate::function::{Function, FunctionContext};
use crate::helper;
use crate::scalars::vector::impl_conv::{as_veclit, as_veclit_if_const, veclit_to_binlit};
const NAME: &str = "vec_mul";
/// Multiplies corresponding elements of two vectors.
///
/// # Example
///
/// ```sql
/// SELECT vec_to_string(vec_mul("[1, 2, 3]", "[1, 2, 3]")) as result;
///
/// +---------+
/// | result |
/// +---------+
/// | [1,4,9] |
/// +---------+
///
/// ```
#[derive(Debug, Clone, Default)]
pub struct VectorMulFunction;
impl Function for VectorMulFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::binary_datatype())
}
fn signature(&self) -> Signature {
helper::one_of_sigs2(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::binary_datatype(),
],
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly two, have: {}",
columns.len()
),
}
);
let arg0 = &columns[0];
let arg1 = &columns[1];
let len = arg0.len();
let mut result = BinaryVectorBuilder::with_capacity(len);
if len == 0 {
return Ok(result.to_vector());
}
let arg0_const = as_veclit_if_const(arg0)?;
let arg1_const = as_veclit_if_const(arg1)?;
for i in 0..len {
let arg0 = match arg0_const.as_ref() {
Some(arg0) => Some(Cow::Borrowed(arg0.as_ref())),
None => as_veclit(arg0.get_ref(i))?,
};
let arg1 = match arg1_const.as_ref() {
Some(arg1) => Some(Cow::Borrowed(arg1.as_ref())),
None => as_veclit(arg1.get_ref(i))?,
};
if let (Some(arg0), Some(arg1)) = (arg0, arg1) {
ensure!(
arg0.len() == arg1.len(),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the vectors must match for multiplying, have: {} vs {}",
arg0.len(),
arg1.len()
),
}
);
let vec0 = DVectorView::from_slice(&arg0, arg0.len());
let vec1 = DVectorView::from_slice(&arg1, arg1.len());
let vec_res = vec1.component_mul(&vec0);
let veclit = vec_res.as_slice();
let binlit = veclit_to_binlit(veclit);
result.push(Some(&binlit));
} else {
result.push_null();
}
}
Ok(result.to_vector())
}
}
impl Display for VectorMulFunction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::error;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_vector_mul() {
let func = VectorMulFunction;
let vec0 = vec![1.0, 2.0, 3.0];
let vec1 = vec![1.0, 1.0];
let (len0, len1) = (vec0.len(), vec1.len());
let input0 = Arc::new(StringVector::from(vec![Some(format!("{vec0:?}"))]));
let input1 = Arc::new(StringVector::from(vec![Some(format!("{vec1:?}"))]));
let err = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap_err();
match err {
error::Error::InvalidFuncArgs { err_msg, .. } => {
assert_eq!(
err_msg,
format!(
"The length of the vectors must match for multiplying, have: {} vs {}",
len0, len1
)
)
}
_ => unreachable!(),
}
let input0 = Arc::new(StringVector::from(vec![
Some("[1.0,2.0,3.0]".to_string()),
Some("[8.0,10.0,12.0]".to_string()),
Some("[7.0,8.0,9.0]".to_string()),
None,
]));
let input1 = Arc::new(StringVector::from(vec![
Some("[1.0,1.0,1.0]".to_string()),
Some("[2.0,2.0,2.0]".to_string()),
None,
Some("[3.0,3.0,3.0]".to_string()),
]));
let result = func
.eval(FunctionContext::default(), &[input0, input1])
.unwrap();
let result = result.as_ref();
assert_eq!(result.len(), 4);
assert_eq!(
result.get_ref(0).as_binary().unwrap(),
Some(veclit_to_binlit(&[1.0, 2.0, 3.0]).as_slice())
);
assert_eq!(
result.get_ref(1).as_binary().unwrap(),
Some(veclit_to_binlit(&[16.0, 20.0, 24.0]).as_slice())
);
assert!(result.get_ref(2).is_null());
assert!(result.get_ref(3).is_null());
}
}

View File

@@ -1,58 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Escapes special characters in the provided pattern string for `LIKE`.
///
/// Specifically, it prefixes the backslash (`\`), percent (`%`), and underscore (`_`)
/// characters with an additional backslash to ensure they are treated literally.
///
/// # Examples
///
/// ```rust
/// let escaped = escape_pattern("100%_some\\path");
/// assert_eq!(escaped, "100\\%\\_some\\\\path");
/// ```
pub fn escape_like_pattern(pattern: &str) -> String {
pattern
.chars()
.flat_map(|c| match c {
'\\' | '%' | '_' => vec!['\\', c],
_ => vec![c],
})
.collect::<String>()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_escape_like_pattern() {
assert_eq!(
escape_like_pattern("100%_some\\path"),
"100\\%\\_some\\\\path"
);
assert_eq!(escape_like_pattern(""), "");
assert_eq!(escape_like_pattern("hello"), "hello");
assert_eq!(escape_like_pattern("\\%_"), "\\\\\\%\\_");
assert_eq!(escape_like_pattern("%%__\\\\"), "\\%\\%\\_\\_\\\\\\\\");
assert_eq!(escape_like_pattern("abc123"), "abc123");
assert_eq!(escape_like_pattern("%_\\"), "\\%\\_\\\\");
assert_eq!(
escape_like_pattern("%%__\\\\another%string"),
"\\%\\%\\_\\_\\\\\\\\another\\%string"
);
assert_eq!(escape_like_pattern("foo%bar_"), "foo\\%bar\\_");
assert_eq!(escape_like_pattern("\\_\\%"), "\\\\\\_\\\\\\%");
}
}
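As a quick illustration of the escaping above, here is a standalone sketch (it duplicates the helper so it runs on its own) using raw strings, which are easier to read than the escaped literals in the unit tests:

```rust
// Standalone copy of the LIKE-pattern escaping shown above, for illustration.
fn escape_like_pattern(pattern: &str) -> String {
    pattern
        .chars()
        .flat_map(|c| match c {
            '\\' | '%' | '_' => vec!['\\', c],
            _ => vec![c],
        })
        .collect()
}

fn main() {
    // Raw strings make the doubled backslashes visible at a glance.
    assert_eq!(escape_like_pattern(r"100%_some\path"), r"100\%\_some\\path");
    // Wrapped in wildcards, the escaped pattern matches the literal text only.
    let like_pattern = format!("%{}%", escape_like_pattern("50%_off"));
    assert_eq!(like_pattern, r"%50\%\_off%");
}
```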

View File

@@ -60,7 +60,6 @@ pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<
column_schema: schema,
is_key: column_def.semantic_type == SemanticType::Tag as i32,
location: parse_location(ac.location)?,
add_if_not_exists: ac.add_if_not_exists,
})
})
.collect::<Result<Vec<_>>>()?;
@@ -221,7 +220,6 @@ mod tests {
..Default::default()
}),
location: None,
add_if_not_exists: true,
}],
})),
};
@@ -242,7 +240,6 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(None, add_column.location);
assert!(add_column.add_if_not_exists);
}
#[test]
@@ -268,7 +265,6 @@ mod tests {
location_type: LocationType::First.into(),
after_column_name: String::default(),
}),
add_if_not_exists: false,
},
AddColumn {
column_def: Some(ColumnDef {
@@ -284,7 +280,6 @@ mod tests {
location_type: LocationType::After.into(),
after_column_name: "ts".to_string(),
}),
add_if_not_exists: true,
},
],
})),
@@ -313,7 +308,6 @@ mod tests {
}),
add_column.location
);
assert!(add_column.add_if_not_exists);
let add_column = add_columns.pop().unwrap();
assert!(!add_column.is_key);
@@ -323,7 +317,6 @@ mod tests {
add_column.column_schema.data_type
);
assert_eq!(Some(AddColumnLocation::First), add_column.location);
assert!(!add_column.add_if_not_exists);
}
#[test]

View File

@@ -299,7 +299,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let memory_column = &add_columns.add_columns[1];
assert_eq!(
@@ -312,7 +311,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let time_column = &add_columns.add_columns[2];
assert_eq!(
@@ -325,7 +323,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let interval_column = &add_columns.add_columns[3];
assert_eq!(
@@ -338,7 +335,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
let decimal_column = &add_columns.add_columns[4];
assert_eq!(
@@ -356,7 +352,6 @@ mod tests {
.unwrap()
)
);
assert!(host_column.add_if_not_exists);
}
#[test]

View File

@@ -192,9 +192,6 @@ pub fn build_create_table_expr(
Ok(expr)
}
/// Find columns that are not present in the schema and return them as `AddColumns`
/// for adding columns automatically.
/// It always sets `add_if_not_exists` to `true` for now.
pub fn extract_new_columns(
schema: &Schema,
column_exprs: Vec<ColumnExpr>,
@@ -216,7 +213,6 @@ pub fn extract_new_columns(
AddColumn {
column_def,
location: None,
add_if_not_exists: true,
}
})
.collect::<Vec<_>>();

View File

@@ -43,7 +43,7 @@ pub struct CacheContainer<K, V, CacheToken> {
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
token_filter: fn(&CacheToken) -> bool,
token_filter: TokenFilter<CacheToken>,
}
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>
@@ -58,7 +58,7 @@ where
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
token_filter: fn(&CacheToken) -> bool,
token_filter: TokenFilter<CacheToken>,
) -> Self {
Self {
name,
@@ -206,13 +206,10 @@ mod tests {
name: &'a str,
}
fn always_true_filter(_: &String) -> bool {
true
}
#[tokio::test]
async fn test_get() {
let cache: Cache<NameKey, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<NameKey, String> = Arc::new(move |_| {
@@ -222,13 +219,7 @@ mod tests {
let invalidator: Invalidator<NameKey, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let adv_cache = CacheContainer::new(
"test".to_string(),
cache,
invalidator,
init,
always_true_filter,
);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let key = NameKey { name: "key" };
let value = adv_cache.get(key).await.unwrap().unwrap();
assert_eq!(value, "hi");
@@ -242,6 +233,7 @@ mod tests {
#[tokio::test]
async fn test_get_by_ref() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
@@ -251,13 +243,7 @@ mod tests {
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let adv_cache = CacheContainer::new(
"test".to_string(),
cache,
invalidator,
init,
always_true_filter,
);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
@@ -271,18 +257,13 @@ mod tests {
#[tokio::test]
async fn test_get_value_not_exits() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let init: Initializer<String, String> =
Arc::new(move |_| Box::pin(async { error::ValueNotExistSnafu {}.fail() }));
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let adv_cache = CacheContainer::new(
"test".to_string(),
cache,
invalidator,
init,
always_true_filter,
);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap();
assert!(value.is_none());
}
@@ -290,6 +271,7 @@ mod tests {
#[tokio::test]
async fn test_invalidate() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
@@ -303,13 +285,7 @@ mod tests {
})
});
let adv_cache = CacheContainer::new(
"test".to_string(),
cache,
invalidator,
init,
always_true_filter,
);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
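The hunks above change `token_filter` from a plain `fn(&CacheToken) -> bool` pointer to the `TokenFilter<CacheToken>` alias, which is why the tests now pass `Box::new(|_| true)` instead of a free function. A minimal standalone sketch follows, assuming `TokenFilter` is a boxed `Fn` trait object roughly like the alias below (the real definition lives in the cache module):

```rust
// Assumed shape of the alias; illustrative only.
type TokenFilter<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;

struct CacheContainer<T> {
    token_filter: TokenFilter<T>,
}

impl<T> CacheContainer<T> {
    fn accepts(&self, token: &T) -> bool {
        (self.token_filter)(token)
    }
}

fn main() {
    // A plain function still works once boxed...
    let always_true: TokenFilter<String> = Box::new(|_| true);
    // ...but a boxed closure can also capture environment,
    // which a `fn(&T) -> bool` field could not.
    let prefix = "table_".to_string();
    let by_prefix: TokenFilter<String> = Box::new(move |t: &String| t.starts_with(&prefix));

    let c1 = CacheContainer { token_filter: always_true };
    let c2 = CacheContainer { token_filter: by_prefix };
    assert!(c1.accepts(&"anything".to_string()));
    assert!(c2.accepts(&"table_info".to_string()));
    assert!(!c2.accepts(&"schema_name".to_string()));
}
```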

View File

@@ -45,7 +45,7 @@ pub fn new_table_flownode_set_cache(
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
let init = init_factory(table_flow_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {

View File

@@ -151,15 +151,12 @@ mod tests {
use crate::cache::*;
use crate::instruction::CacheIdent;
fn always_true_filter(_: &CacheIdent) -> bool {
true
}
fn test_cache(
name: &str,
invalidator: Invalidator<String, String, CacheIdent>,
) -> CacheContainer<String, String, CacheIdent> {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
@@ -167,13 +164,7 @@ mod tests {
Box::pin(async { Ok(Some("hi".to_string())) })
});
CacheContainer::new(
name.to_string(),
cache,
invalidator,
init,
always_true_filter,
)
CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
}
fn test_i32_cache(
@@ -181,6 +172,7 @@ mod tests {
invalidator: Invalidator<i32, String, CacheIdent>,
) -> CacheContainer<i32, String, CacheIdent> {
let cache: Cache<i32, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<CacheIdent> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<i32, String> = Arc::new(move |_| {
@@ -188,13 +180,7 @@ mod tests {
Box::pin(async { Ok(Some("foo".to_string())) })
});
CacheContainer::new(
name.to_string(),
cache,
invalidator,
init,
always_true_filter,
)
CacheContainer::new(name.to_string(), cache, invalidator, init, filter)
}
#[tokio::test]

View File

@@ -36,7 +36,7 @@ pub fn new_schema_cache(
let schema_manager = SchemaManager::new(kv_backend.clone());
let init = init_factory(schema_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(schema_manager: SchemaManager) -> Initializer<SchemaName, Arc<SchemaNameValue>> {

View File

@@ -41,7 +41,7 @@ pub fn new_table_info_cache(
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {

View File

@@ -41,7 +41,7 @@ pub fn new_table_name_cache(
let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
let init = init_factory(table_name_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {

View File

@@ -49,6 +49,14 @@ impl TableRoute {
TableRoute::Logical(_) => None,
}
}
/// Returns a [LogicalTableRouteValue] reference if it's [TableRoute::Logical]; otherwise returns [None].
pub fn as_logical_table_route_ref(&self) -> Option<&Arc<LogicalTableRouteValue>> {
match self {
TableRoute::Physical(_) => None,
TableRoute::Logical(table_route) => Some(table_route),
}
}
}
/// [TableRouteCache] caches the [TableId] to [TableRoute] mapping.
@@ -65,7 +73,7 @@ pub fn new_table_route_cache(
let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(

View File

@@ -40,7 +40,7 @@ pub fn new_table_schema_cache(
let table_info_manager = TableInfoManager::new(kv_backend);
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {

View File

@@ -40,7 +40,7 @@ pub fn new_view_info_cache(
let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend));
let init = init_factory(view_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer<TableId, Arc<ViewInfoValue>> {

View File

@@ -105,7 +105,7 @@ impl AlterLogicalTablesProcedure {
.context(ConvertAlterTableRequestSnafu)?;
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.builder_with_alter_kind(table_ref.table, &request.alter_kind, true)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {

View File

@@ -28,13 +28,13 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
@@ -51,14 +51,10 @@ use crate::{metrics, ClusterId};
/// The alter table procedure
pub struct AlterTableProcedure {
/// The runtime context.
// The runtime context.
context: DdlContext,
/// The serialized data.
// The serialized data.
data: AlterTableData,
/// Cached new table metadata in the prepare step.
/// If we recover the procedure from json, then the table info value is not cached.
/// But we already validated it in the prepare step.
new_table_info: Option<TableInfo>,
}
impl AlterTableProcedure {
@@ -74,31 +70,18 @@ impl AlterTableProcedure {
Ok(Self {
context,
data: AlterTableData::new(task, table_id, cluster_id),
new_table_info: None,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(AlterTableProcedure {
context,
data,
new_table_info: None,
})
Ok(AlterTableProcedure { context, data })
}
// Checks whether the table exists.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
self.check_alter().await?;
self.fill_table_info().await?;
// Validates the request and builds the new table info.
// We need to build the new table info here because the alteration must already be
// valid by the `UpdateMeta` state, since the regions have been altered at that point.
// Safety: `fill_table_info()` already set it.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
if matches!(alter_kind, Kind::RenameTable { .. }) {
@@ -123,14 +106,6 @@ impl AlterTableProcedure {
let leaders = find_leaders(&physical_table_route.region_routes);
let mut alter_region_tasks = Vec::with_capacity(leaders.len());
let alter_kind = self.make_region_alter_kind()?;
info!(
"Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
self.data.table_ref(),
table_id,
alter_kind,
);
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
@@ -138,7 +113,7 @@ impl AlterTableProcedure {
for region in regions {
let region_id = RegionId::new(table_id, region);
let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
let request = self.make_alter_region_request(region_id)?;
debug!("Submitting {request:?} to {datanode}");
let datanode = datanode.clone();
@@ -175,15 +150,7 @@ impl AlterTableProcedure {
let table_ref = self.data.table_ref();
// Safety: checked before.
let table_info_value = self.data.table_info_value.as_ref().unwrap();
// Gets the table info from the cache or builds it.
let new_info = match &self.new_table_info {
Some(cached) => cached.clone(),
None => self.build_new_table_info(&table_info_value.table_info)
.inspect_err(|e| {
// We already checked the table info in the prepare step, so this should not happen.
error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
})?,
};
let new_info = self.build_new_table_info(&table_info_value.table_info)?;
debug!(
"Starting update table: {} metadata, new table info {:?}",
@@ -207,7 +174,7 @@ impl AlterTableProcedure {
.await?;
}
info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
info!("Updated table metadata for table {table_ref}, table_id: {table_id}");
self.data.state = AlterTableState::InvalidateTableCache;
Ok(Status::executing(true))
}
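The tail of this hunk advances the procedure to AlterTableState::InvalidateTableCache and returns Status::executing(true). A bare-bones sketch of that step-and-advance state machine; the variants other than InvalidateTableCache and the Status type are guesses for illustration, not the real common-procedure API:

#[derive(Debug, PartialEq, Clone, Copy)]
enum AlterTableState {
    Prepare,
    SubmitAlterRegionRequests,
    UpdateMetadata,
    InvalidateTableCache,
    Done,
}

#[derive(Debug, PartialEq)]
enum Status {
    Executing { persist: bool },
    Done,
}

struct AlterTableProcedure {
    state: AlterTableState,
}

impl AlterTableProcedure {
    // Each call does one step's work, moves to the next state, and asks the
    // framework to persist the procedure and keep driving it.
    fn step(&mut self) -> Status {
        self.state = match self.state {
            AlterTableState::Prepare => AlterTableState::SubmitAlterRegionRequests,
            AlterTableState::SubmitAlterRegionRequests => AlterTableState::UpdateMetadata,
            AlterTableState::UpdateMetadata => AlterTableState::InvalidateTableCache,
            AlterTableState::InvalidateTableCache => AlterTableState::Done,
            AlterTableState::Done => return Status::Done,
        };
        Status::Executing { persist: true }
    }
}

fn main() {
    let mut procedure = AlterTableProcedure { state: AlterTableState::Prepare };
    while procedure.step() != Status::Done {
        println!("advanced to {:?}", procedure.state);
    }
}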


@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{
@@ -29,15 +27,13 @@ use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
/// Makes an alter region request from an existing alter kind.
/// The region alter request always adds columns if they do not exist.
pub(crate) fn make_alter_region_request(
&self,
region_id: RegionId,
kind: Option<alter_request::Kind>,
) -> Result<RegionRequest> {
/// Makes alter region request.
pub(crate) fn make_alter_region_request(&self, region_id: RegionId) -> Result<RegionRequest> {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
// Safety: checked
let table_info = self.data.table_info().unwrap();
let kind = create_proto_alter_kind(table_info, alter_kind)?;
Ok(RegionRequest {
header: Some(RegionRequestHeader {
@@ -51,66 +47,45 @@ impl AlterTableProcedure {
})),
})
}
/// Makes an alter kind proto that all regions can reuse.
/// The region alter request always adds columns if they do not exist.
pub(crate) fn make_region_alter_kind(&self) -> Result<Option<alter_request::Kind>> {
// Safety: Checked in `AlterTableProcedure::new`.
let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
// Safety: checked
let table_info = self.data.table_info().unwrap();
let kind = create_proto_alter_kind(table_info, alter_kind)?;
Ok(kind)
}
}
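One side of this diff builds the proto alter kind once (make_region_alter_kind) and passes a clone of it into every per-region request, while the other side rebuilds the kind inside make_alter_region_request for each region. A toy sketch of the build-once-and-clone variant, with hypothetical simplified types instead of the real proto messages:

#[derive(Clone, Debug)]
struct AlterKind {
    add_columns: Vec<String>,
}

#[derive(Debug)]
struct RegionRequest {
    region_id: u64,
    kind: AlterKind,
}

// Built once from the table metadata; every region reuses the same kind.
fn make_region_alter_kind() -> AlterKind {
    AlterKind { add_columns: vec!["my_tag3".to_string()] }
}

fn make_alter_region_request(region_id: u64, kind: AlterKind) -> RegionRequest {
    RegionRequest { region_id, kind }
}

fn main() {
    let kind = make_region_alter_kind();
    let requests: Vec<RegionRequest> = (0..3u64)
        .map(|region_id| make_alter_region_request(region_id, kind.clone()))
        .collect();
    assert_eq!(requests.len(), 3);
    println!("{requests:?}");
}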
/// Creates a region proto alter kind from `table_info` and `alter_kind`.
///
/// It always adds a column if it does not exist and drops a column if it exists.
/// It skips the column if it already exists in the table.
/// Returns the kind and next column id if it adds new columns.
fn create_proto_alter_kind(
table_info: &RawTableInfo,
alter_kind: &Kind,
) -> Result<Option<alter_request::Kind>> {
match alter_kind {
Kind::AddColumns(x) => {
// Construct a set of existing columns in the table.
let existing_columns: HashSet<_> = table_info
.meta
.schema
.column_schemas
.iter()
.map(|col| &col.name)
.collect();
let mut next_column_id = table_info.meta.next_column_id;
let mut add_columns = Vec::with_capacity(x.add_columns.len());
for add_column in &x.add_columns {
let column_def = add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
let add_columns = x
.add_columns
.iter()
.map(|add_column| {
let column_def =
add_column
.column_def
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'column_def' is absent",
})?;
// Skips existing columns.
if existing_columns.contains(&column_def.name) {
continue;
}
let column_id = next_column_id;
next_column_id += 1;
let column_id = next_column_id;
next_column_id += 1;
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
let column_def = RegionColumnDef {
column_def: Some(column_def.clone()),
column_id,
};
add_columns.push(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
});
}
Ok(AddColumn {
column_def: Some(column_def),
location: add_column.location.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Some(alter_request::Kind::AddColumns(AddColumns {
add_columns,
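One side of this hunk builds a set of the table's existing column names, skips any add-column request whose column is already present, and advances next_column_id only for columns actually added. A small sketch of that filtering, using plain strings as a hypothetical simplification of the real column definitions:

use std::collections::HashSet;

/// Assigns ids to the columns that are genuinely new, skipping ones the table already has.
fn assign_new_column_ids(
    existing: &[&str],
    to_add: &[&str],
    mut next_column_id: u32,
) -> Vec<(String, u32)> {
    let existing: HashSet<&str> = existing.iter().copied().collect();
    let mut added = Vec::new();
    for name in to_add {
        if existing.contains(name) {
            // Already present: nothing to add for this column.
            continue;
        }
        added.push((name.to_string(), next_column_id));
        next_column_id += 1;
    }
    added
}

fn main() {
    let added = assign_new_column_ids(&["ts", "host", "cpu"], &["host", "my_tag3"], 3);
    // Only `my_tag3` is new; it takes the next free column id.
    assert_eq!(added, vec![("my_tag3".to_string(), 3)]);
    println!("{added:?}");
}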
@@ -168,7 +143,6 @@ mod tests {
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
/// Prepares a region with schema `[ts: Timestamp, host: Tag, cpu: Field]`.
async fn prepare_ddl_context() -> (DdlContext, u64, TableId, RegionId, String) {
let datanode_manager = Arc::new(MockDatanodeManager::new(()));
let ddl_context = new_ddl_context(datanode_manager);
@@ -197,7 +171,6 @@ mod tests {
.name("cpu")
.data_type(ColumnDataType::Float64)
.semantic_type(SemanticType::Field)
.is_nullable(true)
.build()
.unwrap()
.into(),
@@ -252,16 +225,15 @@ mod tests {
name: "my_tag3".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Vec::new(),
default_constraint: b"hello".to_vec(),
semantic_type: SemanticType::Tag as i32,
comment: String::new(),
..Default::default()
}),
location: Some(AddColumnLocation {
location_type: LocationType::After as i32,
after_column_name: "host".to_string(),
after_column_name: "my_tag2".to_string(),
}),
add_if_not_exists: false,
}],
})),
},
@@ -270,11 +242,8 @@ mod tests {
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
else {
unreachable!()
};
@@ -290,7 +259,7 @@ mod tests {
name: "my_tag3".to_string(),
data_type: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Vec::new(),
default_constraint: b"hello".to_vec(),
semantic_type: SemanticType::Tag as i32,
comment: String::new(),
..Default::default()
@@ -299,7 +268,7 @@ mod tests {
}),
location: Some(AddColumnLocation {
location_type: LocationType::After as i32,
after_column_name: "host".to_string(),
after_column_name: "my_tag2".to_string(),
}),
}]
}
@@ -330,11 +299,8 @@ mod tests {
let mut procedure =
AlterTableProcedure::new(cluster_id, table_id, task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let alter_kind = procedure.make_region_alter_kind().unwrap();
let Some(Body::Alter(alter_region_request)) = procedure
.make_alter_region_request(region_id, alter_kind)
.unwrap()
.body
let Some(Body::Alter(alter_region_request)) =
procedure.make_alter_region_request(region_id).unwrap().body
else {
unreachable!()
};


@@ -23,9 +23,7 @@ use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
impl AlterTableProcedure {
/// Builds new table info after alteration.
/// It bumps the column id of the table by the number of the add column requests.
/// So there may be holes in the column id sequence.
/// Builds new_meta
pub(crate) fn build_new_table_info(&self, table_info: &RawTableInfo) -> Result<TableInfo> {
let table_info =
TableInfo::try_from(table_info.clone()).context(error::ConvertRawTableInfoSnafu)?;
@@ -36,7 +34,7 @@ impl AlterTableProcedure {
let new_meta = table_info
.meta
.builder_with_alter_kind(table_ref.table, &request.alter_kind)
.builder_with_alter_kind(table_ref.table, &request.alter_kind, false)
.context(error::TableSnafu)?
.build()
.with_context(|_| error::BuildTableMetaSnafu {
@@ -48,9 +46,6 @@ impl AlterTableProcedure {
new_info.ident.version = table_info.ident.version + 1;
match request.alter_kind {
AlterKind::AddColumns { columns } => {
// Bumps the column id for the new columns.
// It may bump more than the actual number of columns added if there are
// existing columns, but it's fine.
new_info.meta.next_column_id += columns.len() as u32;
}
AlterKind::RenameTable { new_table_name } => {


@@ -30,8 +30,6 @@ pub struct TestAlterTableExpr {
add_columns: Vec<ColumnDef>,
#[builder(setter(into, strip_option))]
new_table_name: Option<String>,
#[builder(setter)]
add_if_not_exists: bool,
}
impl From<TestAlterTableExpr> for AlterTableExpr {
@@ -55,7 +53,6 @@ impl From<TestAlterTableExpr> for AlterTableExpr {
.map(|col| AddColumn {
column_def: Some(col),
location: None,
add_if_not_exists: value.add_if_not_exists,
})
.collect(),
})),


@@ -56,7 +56,6 @@ fn make_alter_logical_table_add_column_task(
let alter_table = alter_table
.table_name(table.to_string())
.add_columns(add_columns)
.add_if_not_exists(true)
.build()
.unwrap();


@@ -139,7 +139,7 @@ async fn test_on_submit_alter_request() {
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
name: "my_field_column".to_string(),
}],
})),
},
@@ -225,7 +225,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
name: "my_field_column".to_string(),
}],
})),
},
@@ -330,7 +330,6 @@ async fn test_on_update_metadata_add_columns() {
..Default::default()
}),
location: None,
add_if_not_exists: false,
}],
})),
},


@@ -137,7 +137,6 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::key::flow::flow_state::FlowStateValue;
use crate::key::node_address::NodeAddressValue;
use crate::key::table_route::TableRouteKey;
use crate::key::txn_helper::TxnOpGetResponseSet;
@@ -1263,8 +1262,7 @@ impl_metadata_value! {
FlowRouteValue,
TableFlowValue,
NodeAddressValue,
SchemaNameValue,
FlowStateValue
SchemaNameValue
}
impl_optional_metadata_value! {


@@ -13,6 +13,7 @@
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use futures::stream::BoxStream;
@@ -145,7 +146,7 @@ impl CatalogManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
catalog_decoder,
Arc::new(catalog_decoder),
)
.into_stream();
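Several hunks in this range wrap the row decoder in Arc::new(..) before handing it to the paginated stream, which suggests the stream now stores a shared, clonable decoder rather than a plain function. A sketch under that assumption, with a made-up Pager type standing in for the real PaginationStream:

use std::sync::Arc;

// Decoder shared by every page of results.
type Decoder<T> = Arc<dyn Fn(&[u8]) -> T + Send + Sync>;

struct Pager<T> {
    page_size: usize,
    decoder: Decoder<T>,
}

impl<T> Pager<T> {
    fn new(page_size: usize, decoder: Decoder<T>) -> Self {
        Self { page_size, decoder }
    }

    // Decodes one page of raw payloads with the shared decoder.
    fn decode_page(&self, raw: &[Vec<u8>]) -> Vec<T> {
        raw.iter().map(|bytes| (self.decoder)(bytes.as_slice())).collect()
    }
}

fn main() {
    fn catalog_decoder(bytes: &[u8]) -> String {
        String::from_utf8_lossy(bytes).into_owned()
    }

    // Mirrors the call sites above: the decoder function is wrapped in Arc::new(..).
    let pager: Pager<String> = Pager::new(1024, Arc::new(catalog_decoder));
    let names = pager.decode_page(&[b"greptime".to_vec(), b"greptime_private".to_vec()]);
    assert_eq!(names, vec!["greptime".to_string(), "greptime_private".to_string()]);
    println!("page_size = {}, decoded {:?}", pager.page_size, names);
}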
@@ -155,8 +156,6 @@ impl CatalogManager {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;


@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
@@ -165,7 +166,7 @@ impl DatanodeTableManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
datanode_table_value_decoder,
Arc::new(datanode_table_value_decoder),
)
.into_stream();


@@ -15,7 +15,6 @@
pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flow_route;
pub mod flow_state;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
@@ -36,7 +35,6 @@ use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoManager;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::flow::flow_state::FlowStateManager;
use crate::key::flow::flownode_flow::FlownodeFlowManager;
pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::txn_helper::TxnOpGetResponseSet;
@@ -104,8 +102,6 @@ pub struct FlowMetadataManager {
flownode_flow_manager: FlownodeFlowManager,
table_flow_manager: TableFlowManager,
flow_name_manager: FlowNameManager,
/// Only metasrv has access to its own memory backend, so in other cases it should be None.
flow_state_manager: Option<FlowStateManager>,
kv_backend: KvBackendRef,
}
@@ -118,7 +114,6 @@ impl FlowMetadataManager {
flow_name_manager: FlowNameManager::new(kv_backend.clone()),
flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
table_flow_manager: TableFlowManager::new(kv_backend.clone()),
flow_state_manager: None,
kv_backend,
}
}
@@ -128,10 +123,6 @@ impl FlowMetadataManager {
&self.flow_name_manager
}
pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
self.flow_state_manager.as_ref()
}
/// Returns the [`FlowInfoManager`].
pub fn flow_info_manager(&self) -> &FlowInfoManager {
&self.flow_info_manager


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
@@ -199,7 +201,7 @@ impl FlowNameManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
flow_name_decoder,
Arc::new(flow_name_decoder),
)
.into_stream();


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
@@ -177,7 +179,7 @@ impl FlowRouteManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
flow_route_decoder,
Arc::new(flow_route_decoder),
)
.into_stream();


@@ -1,162 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::{FlowId, MetadataKey, MetadataValue};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
/// The entire FlowId-to-flow-size map is stored directly in the value part of the key.
const FLOW_STATE_KEY: &str = "state";
/// The key of flow state.
#[derive(Debug, Clone, Copy, PartialEq)]
struct FlowStateKeyInner;
impl FlowStateKeyInner {
pub fn new() -> Self {
Self
}
}
impl<'a> MetadataKey<'a, FlowStateKeyInner> for FlowStateKeyInner {
fn to_bytes(&self) -> Vec<u8> {
FLOW_STATE_KEY.as_bytes().to_vec()
}
fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKeyInner> {
let key = std::str::from_utf8(bytes).map_err(|e| {
error::InvalidMetadataSnafu {
err_msg: format!(
"FlowInfoKeyInner '{}' is not a valid UTF8 string: {e}",
String::from_utf8_lossy(bytes)
),
}
.build()
})?;
if key != FLOW_STATE_KEY {
return Err(error::InvalidMetadataSnafu {
err_msg: format!("Invalid FlowStateKeyInner '{key}'"),
}
.build());
}
Ok(FlowStateKeyInner::new())
}
}
/// The key stores the state size of the flow.
///
/// The layout: `__flow/state`.
pub struct FlowStateKey(FlowScoped<FlowStateKeyInner>);
impl FlowStateKey {
/// Returns the [FlowStateKey].
pub fn new() -> FlowStateKey {
let inner = FlowStateKeyInner::new();
FlowStateKey(FlowScoped::new(inner))
}
}
impl Default for FlowStateKey {
fn default() -> Self {
Self::new()
}
}
impl<'a> MetadataKey<'a, FlowStateKey> for FlowStateKey {
fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes()
}
fn from_bytes(bytes: &'a [u8]) -> Result<FlowStateKey> {
Ok(FlowStateKey(FlowScoped::<FlowStateKeyInner>::from_bytes(
bytes,
)?))
}
}
/// The value of flow state size
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowStateValue {
/// For each flow id, the in-memory size of its state in bytes
pub state_size: BTreeMap<FlowId, usize>,
}
impl FlowStateValue {
pub fn new(state_size: BTreeMap<FlowId, usize>) -> Self {
Self { state_size }
}
}
pub type FlowStateManagerRef = Arc<FlowStateManager>;
/// The manager of [FlowStateKey]. Since state size changes frequently, we store it in memory.
///
/// This is only used in distributed mode, where meta-srv uses heartbeats to update the flow
/// stat report and the frontend uses get to fetch the latest flow stat report.
pub struct FlowStateManager {
in_memory: KvBackendRef,
}
impl FlowStateManager {
pub fn new(in_memory: KvBackendRef) -> Self {
Self { in_memory }
}
pub async fn get(&self) -> Result<Option<FlowStateValue>> {
let key = FlowStateKey::new().to_bytes();
self.in_memory
.get(&key)
.await?
.map(|x| FlowStateValue::try_from_raw_value(&x.value))
.transpose()
}
pub async fn put(&self, value: FlowStateValue) -> Result<()> {
let key = FlowStateKey::new().to_bytes();
let value = value.try_as_raw_value()?;
let req = PutRequest::new().with_key(key).with_value(value);
self.in_memory.put(req).await?;
Ok(())
}
}
/// Flow's state report, sent regularly through heartbeat messages
#[derive(Debug, Clone)]
pub struct FlowStat {
/// For each flow id, the in-memory size of its state in bytes
pub state_size: BTreeMap<u32, usize>,
}
impl From<FlowStateValue> for FlowStat {
fn from(value: FlowStateValue) -> Self {
Self {
state_size: value.state_size,
}
}
}
impl From<FlowStat> for FlowStateValue {
fn from(value: FlowStat) -> Self {
Self {
state_size: value.state_size,
}
}
}
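The deleted module above keeps the whole flow-id-to-state-size map under one fixed key in an in-memory kv backend, re-serializing the map on every put and decoding it on every get. A toy illustration of that single-key pattern, with a HashMap standing in for the backend and a hand-rolled encoding instead of the real serde value (both hypothetical):

use std::collections::{BTreeMap, HashMap};

const FLOW_STATE_KEY: &str = "__flow/state";

/// In-memory stand-in for the kv backend used by the state manager.
struct InMemoryKv {
    entries: HashMap<String, String>,
}

impl InMemoryKv {
    fn put(&mut self, key: &str, value: String) {
        self.entries.insert(key.to_string(), value);
    }

    fn get(&self, key: &str) -> Option<&String> {
        self.entries.get(key)
    }
}

fn main() {
    let mut kv = InMemoryKv { entries: HashMap::new() };

    // "Serialize" the per-flow state sizes (the real code stores a serde-encoded FlowStateValue).
    let state_size: BTreeMap<u32, usize> = BTreeMap::from([(1, 1024), (2, 4096)]);
    let encoded = state_size
        .iter()
        .map(|(id, size)| format!("{id}:{size}"))
        .collect::<Vec<_>>()
        .join(",");
    kv.put(FLOW_STATE_KEY, encoded);

    // Later readers fetch the single key and decode the whole map.
    let decoded = kv.get(FLOW_STATE_KEY).expect("state report present");
    println!("latest flow state report: {decoded}");
}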


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
@@ -177,7 +179,7 @@ impl FlownodeFlowManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
flownode_flow_key_decoder,
Arc::new(flownode_flow_key_decoder),
)
.into_stream();


@@ -206,7 +206,7 @@ impl TableFlowManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
table_flow_decoder,
Arc::new(table_flow_decoder),
)
.into_stream();


@@ -28,10 +28,13 @@ pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;
pub struct SchemaMetadataManager {
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
#[cfg(any(test, feature = "testing"))]
kv_backend: crate::kv_backend::KvBackendRef,
}
impl SchemaMetadataManager {
/// Creates a new database meta
#[cfg(not(any(test, feature = "testing")))]
pub fn new(table_id_schema_cache: TableSchemaCacheRef, schema_cache: SchemaCacheRef) -> Self {
Self {
table_id_schema_cache,
@@ -39,6 +42,20 @@ impl SchemaMetadataManager {
}
}
/// Creates a new database meta
#[cfg(any(test, feature = "testing"))]
pub fn new(
kv_backend: crate::kv_backend::KvBackendRef,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
) -> Self {
Self {
table_id_schema_cache,
schema_cache,
kv_backend,
}
}
/// Gets schema options by table id.
pub async fn get_schema_options_by_table_id(
&self,
@@ -63,7 +80,6 @@ impl SchemaMetadataManager {
schema_name: &str,
catalog_name: &str,
schema_value: Option<crate::key::schema_name::SchemaNameValue>,
kv_backend: crate::kv_backend::KvBackendRef,
) {
use table::metadata::{RawTableInfo, TableType};
let value = crate::key::table_info::TableInfoValue::new(RawTableInfo {
@@ -75,18 +91,19 @@ impl SchemaMetadataManager {
meta: Default::default(),
table_type: TableType::Base,
});
let table_info_manager = crate::key::table_info::TableInfoManager::new(kv_backend.clone());
let table_info_manager =
crate::key::table_info::TableInfoManager::new(self.kv_backend.clone());
let (txn, _) = table_info_manager
.build_create_txn(table_id, &value)
.unwrap();
let resp = kv_backend.txn(txn).await.unwrap();
let resp = self.kv_backend.txn(txn).await.unwrap();
assert!(resp.succeeded, "Failed to create table metadata");
let key = crate::key::schema_name::SchemaNameKey {
catalog: catalog_name,
schema: schema_name,
};
crate::key::schema_name::SchemaManager::new(kv_backend.clone())
crate::key::schema_name::SchemaManager::new(self.kv_backend.clone())
.create(key, schema_value, false)
.await
.expect("Failed to create schema metadata");


@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::DatabaseTimeToLive;
@@ -282,7 +283,7 @@ impl SchemaManager {
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
schema_decoder,
Arc::new(schema_decoder),
)
.into_stream();
@@ -307,7 +308,6 @@ impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use super::*;

Some files were not shown because too many files have changed in this diff.