Compare commits

...

29 Commits

Author SHA1 Message Date
zyy17
f0e2d6e663 fix: use 'target' for 'actions-rust-lang/setup-rust-toolchain' to fix cross build failed (#4661) 2024-09-02 06:11:12 +00:00
Weny Xu
306bd25c64 fix: expose missing options for initializing regions (#4660)
* fix: expose `init_regions_in_background` and `init_regions_parallelism` opts

* fix: ci
2024-09-02 03:11:18 +00:00
zyy17
ddafcc678c ci: disable macos integration test and some minor refactoring (#4658) 2024-09-02 03:06:17 +00:00
Weny Xu
2564b5daee fix: correct otlp endpoint formatting (#4646) 2024-09-02 02:59:50 +00:00
Lei, HUANG
37dcf34bb9 fix(mito): avoid caching empty batches in row group (#4652)
* fix: avoid caching empty batches in row group

* fix: clippy

* Update tests/cases/standalone/common/select/last_value.sql

* fix: sqlness
2024-09-02 02:43:00 +00:00
Yingwen
8eda36bfe3 feat: remove files from the write cache in purger (#4655)
* feat: remove files from the write cache in purger

* chore: fix typo
2024-08-31 04:19:52 +00:00
Ruihang Xia
68b59e0e5e feat: remove the requirement that partition column must be PK (#4647)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-08-31 03:16:01 +00:00
Ruihang Xia
a37aeb2814 feat: initialize partition range from ScanInput (#4635)
* feat: initialize partition range from ScanInput

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use num_rows instead

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add todo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* setup unordered scan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/mito2/src/read/scan_region.rs

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* leave unordered scan unchanged

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2024-08-30 07:30:37 +00:00
jeremyhi
f641c562c2 feat: show create database (#4642)
* feat: show create database

* feat: add sqlness test

* chore: reorder mod and use

* feat: show create schema

* Update src/frontend/src/instance.rs
2024-08-30 03:58:11 +00:00
Lanqing Yang
9286e963e7 chore: adding heartbeat sent/recv counts in greptimedb nodes (#4624)
obs: adding heartbeat sent/recv counts in greptimedb nodes
2024-08-30 03:57:16 +00:00
LFC
8ea4f67e4b refactor: reduce a object store "stat" call (#4645) 2024-08-30 03:31:19 +00:00
jeremyhi
5e4bac2633 feat: import cli tool (#4639)
* feat: import create tables

* feat: import database

* fix: export view schema
2024-08-29 09:32:21 +00:00
LFC
d45b04180c feat: pre-download the ingested sst (#4636)
* refactor: pre-read the ingested sst file in object store to fill the local cache to accelerate first query

* feat: pre-download the ingested SST from remote to accelerate following reads

* resolve PR comments

* resolve PR comments
2024-08-29 08:36:41 +00:00
discord9
8c8499ce53 perf(flow): Map&Reduce Operator use batch to reduce alloc (#4567)
* feat: partial impl mfp

* feat: eval batch inner

* chore: fmt

* feat: mfp eval_batch

* WIP

* feat: Collection generic over row&Batch

* feat: render source batch

* chore: chore

* feat: render mfp batch

* feat: render reduce batch(WIP)

* feat(WIP): render reduce

* feat: reduce batch

* feat: render sink batch

* feat: render constant batch

* chore: error handling& mfp batch test

* test: mfp batch

* chore: rm import

* test: render reduce batch

* chore: add TODO

* chore: per bot review

* refactor: per review

* chore: cmt

* chore: rename

* docs: update no panic
2024-08-29 07:28:13 +00:00
Weny Xu
79f40a762b fix: set selector_result_cache_size in unit test again (#4641) 2024-08-29 07:14:40 +00:00
jeremyhi
b062d8515d feat: copy database ignores view and temporary tables (#4640)
feat: copy database ignores view and temporary tables
2024-08-29 06:17:51 +00:00
discord9
9f9c1dab60 feat(flow): use DataFusion's optimizer (#4489)
* feat: use datafusion optimization

refactor: mv `sql_to_flow_plan` elsewhere

feat(WIP): use df optimization

WIP analyzer rule

feat(WIP): avg expander

fix: transform avg expander

fix: avg expand

feat: names from substrait

fix: avg rewrite

test: update `test_avg`&`test_avg_group_by`

test: fix `test_sum`

test: fix some tests

chore: remove unused flow plan transform

feat: tumble expander

test: update tests

* chore: clippy

* fix: tumble lose `group expr`

* test: sqlness test update

* test: rm unused cast

* test: simplify sqlness

* refactor: per review

* chore: after rebase

* fix: remove a outdated test

* test: add comment

* fix: report error when not literal

* chore: update sqlness test after rebase

* refactor: per review
2024-08-29 02:52:00 +00:00
dennis zhuang
841e66c810 fix: config api and export metrics default database (#4633) 2024-08-28 14:28:49 +00:00
shuiyisong
d1c635085c chore: modify grafana config to accord with version 9 (#4634)
chore: update grafana config to accord with version 9
2024-08-28 12:53:35 +00:00
Weny Xu
47657ebbc8 feat: replay WAL entries respect index (#4565)
* feat(log_store): use new `Consumer`

* feat: add `from_peer_id`

* feat: read WAL entries respect index

* test: add test for `build_region_wal_index_iterator`

* fix: keep the handle

* fix: incorrect last index

* fix: replay last entry id may be greater than expected

* chore: remove unused code

* chore: apply suggestions from CR

* chore: rename `datanode_id` to `location_id`

* chore: rename `from_peer_id` to `location_id`

* chore: rename `from_peer_id` to `location_id`

* chore: apply suggestions from CR
2024-08-28 11:37:18 +00:00
Ruihang Xia
64ae32def0 feat: remove some redundent clone/conversion on constructing MergeScan stream (#4632)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-08-28 08:52:09 +00:00
Weny Xu
744946957e fix: set selector_result_cache_size in unit test (#4631) 2024-08-28 07:24:17 +00:00
Ruihang Xia
d5455db2d5 fix: update properties on updating partitions (#4627)
* fix: update properties on updating partitions

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test and handle insufficient ranges

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-08-28 07:06:01 +00:00
Yingwen
28bf549907 fix: fallback to window size in manifest (#4629) 2024-08-28 06:43:56 +00:00
zyy17
4ea412249a ci: add check-builder-rust-version job in release and change release-dev-builder-images trigger condition (#4615) 2024-08-27 16:59:01 +00:00
zyy17
eacc7bc471 refactor: add app in greptime_app_version metric (#4626)
refactor: add app in greptime_app_version metric
2024-08-27 11:19:01 +00:00
Ruihang Xia
b72d3bc71d build(deps): bump backon to 1.0 (#4625)
* build(deps): bump backon to 1.0

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-08-27 09:38:12 +00:00
Ning Sun
0b102ef846 ci: improve toolchain resolution in ci (#4614)
* ci: improve toolchain resolution in ci

* fix: yaml format
2024-08-27 07:46:51 +00:00
liyang
e404e9dafc chore: setting docker authentication in dev-build image (#4623) 2024-08-27 03:49:53 +00:00
125 changed files with 4294 additions and 1954 deletions

View File

@@ -4,9 +4,6 @@ inputs:
arch:
description: Architecture to build
required: true
rust-toolchain:
description: Rust toolchain to use
required: true
cargo-profile:
description: Cargo profile to build
required: true
@@ -43,10 +40,9 @@ runs:
brew install protobuf
- name: Install rust toolchain
uses: dtolnay/rust-toolchain@master
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ inputs.rust-toolchain }}
targets: ${{ inputs.arch }}
target: ${{ inputs.arch }}
- name: Start etcd # For integration tests.
if: ${{ inputs.disable-run-tests == 'false' }}

View File

@@ -4,9 +4,6 @@ inputs:
arch:
description: Architecture to build
required: true
rust-toolchain:
description: Rust toolchain to use
required: true
cargo-profile:
description: Cargo profile to build
required: true
@@ -28,10 +25,9 @@ runs:
- uses: arduino/setup-protoc@v3
- name: Install rust toolchain
uses: dtolnay/rust-toolchain@master
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ inputs.rust-toolchain }}
targets: ${{ inputs.arch }}
target: ${{ inputs.arch }}
components: llvm-tools-preview
- name: Rust Cache

View File

@@ -38,7 +38,7 @@ runs:
steps:
- name: Configure AWS credentials
if: startsWith(inputs.runner, 'ec2')
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ inputs.aws-access-key-id }}
aws-secret-access-key: ${{ inputs.aws-secret-access-key }}

View File

@@ -25,7 +25,7 @@ runs:
steps:
- name: Configure AWS credentials
if: ${{ inputs.label && inputs.ec2-instance-id }}
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ inputs.aws-access-key-id }}
aws-secret-access-key: ${{ inputs.aws-secret-access-key }}

View File

@@ -12,9 +12,6 @@ on:
name: Build API docs
env:
RUST_TOOLCHAIN: nightly-2024-06-06
jobs:
apidoc:
runs-on: ubuntu-20.04
@@ -23,9 +20,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- run: cargo doc --workspace --no-deps --document-private-items
- run: |
cat <<EOF > target/doc/index.html

View File

@@ -29,9 +29,6 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
env:
RUST_TOOLCHAIN: nightly-2024-06-06
jobs:
check-typos-and-docs:
name: Check typos and docs
@@ -64,9 +61,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -82,9 +77,7 @@ jobs:
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -107,9 +100,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
@@ -161,9 +152,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -181,7 +170,7 @@ jobs:
name: bins
path: .
- name: Unzip binaries
run: |
run: |
tar -xvf ./bins.tar.gz
rm ./bins.tar.gz
- name: Run GreptimeDB
@@ -221,9 +210,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -274,9 +261,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: Swatinem/rust-cache@v2
with:
# Shares across multiple jobs
@@ -287,7 +272,7 @@ jobs:
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime
run: cargo gc --profile ci -- --bin greptime
- name: Pack greptime binary
shell: bash
run: |
@@ -301,7 +286,7 @@ jobs:
artifacts-dir: bin
version: current
distributed-fuzztest:
distributed-fuzztest:
name: Fuzz Test (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -344,9 +329,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -416,12 +399,12 @@ jobs:
- name: Describe Nodes
if: failure()
shell: bash
run: |
kubectl describe nodes
run: |
kubectl describe nodes
- name: Export kind logs
if: failure()
shell: bash
run: |
run: |
kind export logs /tmp/kind
- name: Upload logs
if: failure()
@@ -433,13 +416,13 @@ jobs:
- name: Delete cluster
if: success()
shell: bash
run: |
run: |
kind delete cluster
docker stop $(docker ps -a -q)
docker rm $(docker ps -a -q)
docker system prune -f
distributed-fuzztest-with-chaos:
distributed-fuzztest-with-chaos:
name: Fuzz Test with Chaos (Distributed, ${{ matrix.mode.name }}, ${{ matrix.target }})
runs-on: ubuntu-latest
needs: build-greptime-ci
@@ -447,7 +430,7 @@ jobs:
strategy:
matrix:
target: ["fuzz_migrate_mito_regions", "fuzz_failover_mito_regions", "fuzz_failover_metric_regions"]
mode:
mode:
- name: "Remote WAL"
minio: true
kafka: true
@@ -484,9 +467,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
with:
@@ -557,12 +538,12 @@ jobs:
- name: Describe Nodes
if: failure()
shell: bash
run: |
kubectl describe nodes
run: |
kubectl describe nodes
- name: Export kind logs
if: failure()
shell: bash
run: |
run: |
kind export logs /tmp/kind
- name: Upload logs
if: failure()
@@ -574,7 +555,7 @@ jobs:
- name: Delete cluster
if: success()
shell: bash
run: |
run: |
kind delete cluster
docker stop $(docker ps -a -q)
docker rm $(docker ps -a -q)
@@ -627,9 +608,8 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: rustfmt
- name: Rust Cache
uses: Swatinem/rust-cache@v2
@@ -648,9 +628,8 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: clippy
- name: Rust Cache
uses: Swatinem/rust-cache@v2
@@ -674,9 +653,8 @@ jobs:
with:
version: "14.0"
- name: Install toolchain
uses: dtolnay/rust-toolchain@master
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: llvm-tools-preview
- name: Rust Cache
uses: Swatinem/rust-cache@v2

View File

@@ -9,9 +9,6 @@ concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
env:
RUST_TOOLCHAIN: nightly-2024-06-06
permissions:
issues: write
@@ -52,9 +49,7 @@ jobs:
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Rust Cache
uses: Swatinem/rust-cache@v2
- name: Run sqlness
@@ -85,9 +80,8 @@ jobs:
with:
version: "14.0"
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
components: llvm-tools-preview
- name: Rust Cache
uses: Swatinem/rust-cache@v2

View File

@@ -1,6 +1,12 @@
name: Release dev-builder images
on:
push:
branches:
- main
paths:
- rust-toolchain.toml
- 'docker/dev-builder/**'
workflow_dispatch: # Allows you to run this workflow manually.
inputs:
release_dev_builder_ubuntu_image:
@@ -80,28 +86,46 @@ jobs:
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image }}
run: |
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }}
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:latest \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:latest \
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-ubuntu:latest
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image }}
run: |
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }}
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:latest \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:latest \
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-centos:latest
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image }}
run: |
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }}
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:latest \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:latest \
docker://${{ vars.ECR_IMAGE_REGISTRY }}/${{ vars.ECR_IMAGE_NAMESPACE }}/dev-builder-android:latest
release-dev-builder-images-cn: # Note: Be careful issue: https://github.com/containers/skopeo/issues/1874 and we decide to use the latest stable skopeo container.
name: Release dev builder images to CN region
@@ -121,19 +145,28 @@ jobs:
shell: bash
if: ${{ inputs.release_dev_builder_ubuntu_image }}
run: |
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }} \
docker://${{ vars.ACR_IMAGE_REGISTRY }}/${{ vars.IMAGE_NAMESPACE }}/dev-builder-ubuntu:${{ needs.release-dev-builder-images.outputs.version }}
- name: Push dev-builder-centos image
shell: bash
if: ${{ inputs.release_dev_builder_centos_image }}
run: |
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }} \
docker://${{ vars.ACR_IMAGE_REGISTRY }}/${{ vars.IMAGE_NAMESPACE }}/dev-builder-centos:${{ needs.release-dev-builder-images.outputs.version }}
- name: Push dev-builder-android image
shell: bash
if: ${{ inputs.release_dev_builder_android_image }}
run: |
docker run quay.io/skopeo/stable:latest copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
docker run -v "${DOCKER_CONFIG:-$HOME/.docker}:/root/.docker:ro" \
-e "REGISTRY_AUTH_FILE=/root/.docker/config.json" \
quay.io/skopeo/stable:latest \
copy -a docker://docker.io/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }} \
docker://${{ vars.ACR_IMAGE_REGISTRY }}/${{ vars.IMAGE_NAMESPACE }}/dev-builder-android:${{ needs.release-dev-builder-images.outputs.version }}

View File

@@ -33,6 +33,7 @@ on:
description: The runner uses to build linux-arm64 artifacts
default: ec2-c6g.4xlarge-arm64
options:
- ubuntu-2204-32-cores-arm
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G
- ec2-c6g.4xlarge-arm64 # 16C32G
@@ -82,7 +83,6 @@ on:
# Use env variables to control all the release process.
env:
# The arguments of building greptime.
RUST_TOOLCHAIN: nightly-2024-06-06
CARGO_PROFILE: nightly
# Controls whether to run tests, include unit-test, integration-test and sqlness.
@@ -123,6 +123,11 @@ jobs:
with:
fetch-depth: 0
- name: Check Rust toolchain version
shell: bash
run: |
./scripts/check-builder-rust-version.sh
# The create-version will create a global variable named 'version' in the global workflows.
# - If it's a tag push release, the version is the tag name(${{ github.ref_name }});
# - If it's a scheduled release, the version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-$buildTime', like v0.2.0-nigthly-20230313;
@@ -244,11 +249,11 @@ jobs:
- uses: ./.github/actions/build-macos-artifacts
with:
arch: ${{ matrix.arch }}
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}
cargo-profile: ${{ env.CARGO_PROFILE }}
features: ${{ matrix.features }}
version: ${{ needs.allocate-runners.outputs.version }}
disable-run-tests: ${{ env.DISABLE_RUN_TESTS }}
# We decide to disable the integration tests on macOS because it's unnecessary and time-consuming.
disable-run-tests: true
artifacts-dir: ${{ matrix.artifacts-dir-prefix }}-${{ needs.allocate-runners.outputs.version }}
- name: Set build macos result
@@ -287,7 +292,6 @@ jobs:
- uses: ./.github/actions/build-windows-artifacts
with:
arch: ${{ matrix.arch }}
rust-toolchain: ${{ env.RUST_TOOLCHAIN }}
cargo-profile: ${{ env.CARGO_PROFILE }}
features: ${{ matrix.features }}
version: ${{ needs.allocate-runners.outputs.version }}

Cargo.lock (generated, 22 lines changed)
View File

@@ -897,6 +897,15 @@ dependencies = [
"tokio",
]
[[package]]
name = "backon"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2caf634d05fe0642d0fb1ab43497fa627088ecd93f84b2d0f2a5d7b91f7730db"
dependencies = [
"fastrand",
]
[[package]]
name = "backtrace"
version = "0.3.73"
@@ -2111,7 +2120,7 @@ version = "0.9.2"
dependencies = [
"async-stream",
"async-trait",
"backon",
"backon 1.0.2",
"common-base",
"common-error",
"common-macro",
@@ -4708,7 +4717,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.7",
"socket2 0.4.10",
"tokio",
"tower-service",
"tracing",
@@ -5817,6 +5826,7 @@ dependencies = [
"common-time",
"common-wal",
"delta-encoding",
"derive_builder 0.12.0",
"futures",
"futures-util",
"itertools 0.10.5",
@@ -7011,13 +7021,13 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.49.0"
version = "0.49.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494"
checksum = "ba615070686c8781ce97376fdafca29d7c42f47b31d2230d7c8c1642ec823950"
dependencies = [
"anyhow",
"async-trait",
"backon",
"backon 0.4.4",
"base64 0.22.1",
"bytes",
"chrono",
@@ -8502,7 +8512,7 @@ dependencies = [
"indoc",
"libc",
"memoffset 0.9.1",
"parking_lot 0.12.3",
"parking_lot 0.11.2",
"portable-atomic",
"pyo3-build-config",
"pyo3-ffi",

View File

@@ -8,6 +8,7 @@ CARGO_BUILD_OPTS := --locked
IMAGE_REGISTRY ?= docker.io
IMAGE_NAMESPACE ?= greptime
IMAGE_TAG ?= latest
DEV_BUILDER_IMAGE_TAG ?= 2024-06-06-b4b105ad-20240827021230
BUILDX_MULTI_PLATFORM_BUILD ?= false
BUILDX_BUILDER_NAME ?= gtbuilder
BASE_IMAGE ?= ubuntu
@@ -77,7 +78,7 @@ build: ## Build debug version greptime.
build-by-dev-builder: ## Build greptime by dev-builder.
docker run --network=host \
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} \
make build \
CARGO_EXTENSION="${CARGO_EXTENSION}" \
CARGO_PROFILE=${CARGO_PROFILE} \
@@ -91,7 +92,7 @@ build-by-dev-builder: ## Build greptime by dev-builder.
build-android-bin: ## Build greptime binary for android.
docker run --network=host \
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:latest \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:${DEV_BUILDER_IMAGE_TAG} \
make build \
CARGO_EXTENSION="ndk --platform 23 -t aarch64-linux-android" \
CARGO_PROFILE=release \
@@ -105,7 +106,7 @@ build-android-bin: ## Build greptime binary for android.
strip-android-bin: build-android-bin ## Strip greptime binary for android.
docker run --network=host \
-v ${PWD}:/greptimedb \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:latest \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-android:${DEV_BUILDER_IMAGE_TAG} \
bash -c '$${NDK_ROOT}/toolchains/llvm/prebuilt/linux-x86_64/bin/llvm-strip --strip-debug /greptimedb/target/aarch64-linux-android/release/greptime'
.PHONY: clean
@@ -145,7 +146,7 @@ dev-builder: multi-platform-buildx ## Build dev-builder image.
docker buildx build --builder ${BUILDX_BUILDER_NAME} \
--build-arg="RUST_TOOLCHAIN=${RUST_TOOLCHAIN}" \
-f docker/dev-builder/${BASE_IMAGE}/Dockerfile \
-t ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${IMAGE_TAG} ${BUILDX_MULTI_PLATFORM_BUILD_OPTS} .
-t ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} ${BUILDX_MULTI_PLATFORM_BUILD_OPTS} .
.PHONY: multi-platform-buildx
multi-platform-buildx: ## Create buildx multi-platform builder.
@@ -203,7 +204,7 @@ stop-etcd: ## Stop single node etcd for testing purpose.
run-it-in-container: start-etcd ## Run integration tests in dev-builder.
docker run --network=host \
-v ${PWD}:/greptimedb -v ${CARGO_REGISTRY_CACHE}:/root/.cargo/registry -v /tmp:/tmp \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:latest \
-w /greptimedb ${IMAGE_REGISTRY}/${IMAGE_NAMESPACE}/dev-builder-${BASE_IMAGE}:${DEV_BUILDER_IMAGE_TAG} \
make test sqlness-test BUILD_JOBS=${BUILD_JOBS}
.PHONY: start-cluster

View File

@@ -15,6 +15,8 @@
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -160,10 +162,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
@@ -245,10 +247,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
@@ -309,10 +311,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
@@ -333,6 +335,10 @@
| `rpc_runtime_size` | Integer | `None` | Deprecated, use `grpc.runtime_size` instead. |
| `rpc_max_recv_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_recv_message_size` instead. |
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
@@ -453,10 +459,10 @@
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. send to `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself |
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommend to collect metrics generated by itself<br/>You must create the database before enabling it. |
| `export_metrics.self_import.db` | String | `None` | -- |
| `export_metrics.remote_write` | -- | -- | -- |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`. |
| `export_metrics.remote_write.url` | String | `""` | The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | `None` | The tokio console address. |
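
The two new `init_regions_in_background` and `init_regions_parallelism` options documented above control whether the datanode starts serving before every region has been opened, and how many regions it opens concurrently during startup. For orientation only, they could be modeled by a serde-deserialized options struct along these lines; the struct name, derives, and placement below are assumptions for illustration, not the actual definition in the codebase:

use serde::{Deserialize, Serialize};

/// Illustrative sketch of the two new region-initialization knobs; the field
/// names and defaults match the documented TOML keys, everything else
/// (struct name, derives, placement) is assumed for the example.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionInitOptions {
    /// Start serving before all regions have been initialized.
    pub init_regions_in_background: bool,
    /// Number of regions to initialize concurrently during startup.
    pub init_regions_parallelism: usize,
}

impl Default for RegionInitOptions {
    fn default() -> Self {
        Self {
            init_regions_in_background: false,
            init_regions_parallelism: 16,
        }
    }
}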

View File

@@ -39,6 +39,18 @@ rpc_max_recv_message_size = "512MB"
## +toml2docs:none-default
rpc_max_send_message_size = "512MB"
## The HTTP server options.
[http]
## The address to bind the HTTP server.
addr = "127.0.0.1:4000"
## HTTP request timeout. Set to 0 to disable timeout.
timeout = "30s"
## HTTP request body limit.
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
@@ -552,12 +564,13 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "information_schema"
db = "greptime_metrics"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -199,12 +199,13 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "information_schema"
db = "greptime_metrics"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -186,12 +186,13 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "information_schema"
db = "greptime_metrics"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -8,6 +8,13 @@ enable_telemetry = true
## +toml2docs:none-default
default_timezone = "UTC"
## Initialize all regions in the background during the startup.
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
@@ -601,12 +608,13 @@ enable = false
write_interval = "30s"
## For `standalone` mode, `self_import` is recommend to collect metrics generated by itself
## You must create the database before enabling it.
[export_metrics.self_import]
## +toml2docs:none-default
db = "information_schema"
db = "greptime_metrics"
[export_metrics.remote_write]
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=information_schema`.
## The url the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
url = ""
## HTTP headers of Prometheus remote-write carry.

View File

@@ -157,26 +157,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -326,26 +306,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -497,26 +457,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -668,26 +608,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -852,26 +772,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1023,26 +923,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1194,26 +1074,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1365,26 +1225,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1536,26 +1376,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1707,26 +1527,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -1878,26 +1678,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2062,26 +1842,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2233,26 +1993,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2417,26 +2157,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2569,26 +2289,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -2751,26 +2451,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3091,28 +2771,7 @@
},
"unit": "s"
},
"overrides": [
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
"overrides": []
},
"gridPos": {
"h": 8,
@@ -3242,26 +2901,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3429,26 +3068,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3598,26 +3217,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -3740,28 +3339,7 @@
},
"unit": "s"
},
"overrides": [
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
"overrides": []
},
"gridPos": {
"h": 8,
@@ -4101,26 +3679,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4270,26 +3828,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4439,26 +3977,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4608,26 +4126,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4777,26 +4275,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -4946,26 +4424,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -5115,26 +4573,6 @@
}
}
]
},
{
"matcher": {
"id": "byValue",
"options": {
"op": "gte",
"reducer": "allIsZero",
"value": 0
}
},
"properties": [
{
"id": "custom.hideFrom",
"value": {
"legend": true,
"tooltip": true,
"viz": true
}
}
]
}
]
},
@@ -5241,4 +4679,4 @@
"uid": "ea35efe5-918e-44fa-9743-e9aa1a340a3f",
"version": 11,
"weekStart": ""
}
}

View File

@@ -0,0 +1,42 @@
#!/usr/bin/env bash
set -e
RUST_TOOLCHAIN_VERSION_FILE="rust-toolchain.toml"
DEV_BUILDER_UBUNTU_REGISTRY="docker.io"
DEV_BUILDER_UBUNTU_NAMESPACE="greptime"
DEV_BUILDER_UBUNTU_NAME="dev-builder-ubuntu"
function check_rust_toolchain_version() {
DEV_BUILDER_IMAGE_TAG=$(grep "DEV_BUILDER_IMAGE_TAG ?= " Makefile | cut -d= -f2 | sed 's/^[ \t]*//')
if [ -z "$DEV_BUILDER_IMAGE_TAG" ]; then
echo "Error: No DEV_BUILDER_IMAGE_TAG found in Makefile"
exit 1
fi
DEV_BUILDER_UBUNTU_IMAGE="$DEV_BUILDER_UBUNTU_REGISTRY/$DEV_BUILDER_UBUNTU_NAMESPACE/$DEV_BUILDER_UBUNTU_NAME:$DEV_BUILDER_IMAGE_TAG"
CURRENT_VERSION=$(grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}' "$RUST_TOOLCHAIN_VERSION_FILE")
if [ -z "$CURRENT_VERSION" ]; then
echo "Error: No rust toolchain version found in $RUST_TOOLCHAIN_VERSION_FILE"
exit 1
fi
RUST_TOOLCHAIN_VERSION_IN_BUILDER=$(docker run "$DEV_BUILDER_UBUNTU_IMAGE" rustc --version | grep -Eo '[0-9]{4}-[0-9]{2}-[0-9]{2}')
if [ -z "$RUST_TOOLCHAIN_VERSION_IN_BUILDER" ]; then
echo "Error: No rustc version found in $DEV_BUILDER_UBUNTU_IMAGE"
exit 1
fi
# Compare the version and the difference should be less than 1 day.
current_rust_toolchain_seconds=$(date -d "$CURRENT_VERSION" +%s)
rust_toolchain_in_dev_builder_ubuntu_seconds=$(date -d "$RUST_TOOLCHAIN_VERSION_IN_BUILDER" +%s)
date_diff=$(( (current_rust_toolchain_seconds - rust_toolchain_in_dev_builder_ubuntu_seconds) / 86400 ))
if [ $date_diff -gt 1 ]; then
echo "Error: The rust toolchain '$RUST_TOOLCHAIN_VERSION_IN_BUILDER' in builder '$DEV_BUILDER_UBUNTU_IMAGE' maybe outdated, please update it to '$CURRENT_VERSION'"
exit 1
fi
}
check_rust_toolchain_version

View File

@@ -313,7 +313,7 @@ struct SystemCatalog {
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,
// system_schema_provier for default catalog
// system_schema_provider for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,

View File

@@ -21,6 +21,8 @@ mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;
@@ -32,6 +34,7 @@ pub use repl::Repl;
use tracing_appender::non_blocking::WorkerGuard;
use self::export::ExportCommand;
use crate::cli::import::ImportCommand;
use crate::error::Result;
use crate::options::GlobalOptions;
use crate::App;
@@ -114,6 +117,7 @@ enum SubCommand {
// Attach(AttachCommand),
Bench(BenchTableMetadataCommand),
Export(ExportCommand),
Import(ImportCommand),
}
impl SubCommand {
@@ -122,6 +126,7 @@ impl SubCommand {
// SubCommand::Attach(cmd) => cmd.build().await,
SubCommand::Bench(cmd) => cmd.build(guard).await,
SubCommand::Export(cmd) => cmd.build(guard).await,
SubCommand::Import(cmd) => cmd.build(guard).await,
}
}
}

src/cmd/src/cli/database.rs (new file, 119 lines)
View File

@@ -0,0 +1,119 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu};
pub(crate) struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,
}
impl DatabaseClient {
pub fn new(addr: String, catalog: String, auth_basic: Option<String>) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Some(format!("basic {}", encoded))
} else {
None
};
Self {
addr,
catalog,
auth_header,
}
}
pub async fn sql_in_public(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
self.sql(sql, DEFAULT_SCHEMA_NAME).await
}
/// Execute sql query.
pub async fn sql(&self, sql: &str, schema: &str) -> Result<Option<Vec<Vec<Value>>>> {
let url = format!("http://{}/v1/sql", self.addr);
let params = [
("db", format!("{}-{}", self.catalog, schema)),
("sql", sql.to_string()),
];
let mut request = reqwest::Client::new()
.post(&url)
.form(&params)
.header("Content-Type", "application/x-www-form-urlencoded");
if let Some(ref auth) = self.auth_header {
request = request.header("Authorization", auth);
}
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
})?;
let response = response
.error_for_status()
.with_context(|_| HttpQuerySqlSnafu {
reason: format!("query failed: {}", sql),
})?;
let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
reason: "cannot get response text".to_string(),
})?;
let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
Ok(body.output().first().and_then(|output| match output {
GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
GreptimeQueryOutput::AffectedRows(_) => None,
}))
}
}
/// Split at `-`.
pub(crate) fn split_database(database: &str) -> Result<(String, Option<String>)> {
let (catalog, schema) = match database.split_once('-') {
Some((catalog, schema)) => (catalog, schema),
None => (DEFAULT_CATALOG_NAME, database),
};
if schema == "*" {
Ok((catalog.to_string(), None))
} else {
Ok((catalog.to_string(), Some(schema.to_string())))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_split_database() {
let result = split_database("catalog-schema").unwrap();
assert_eq!(result, ("catalog".to_string(), Some("schema".to_string())));
let result = split_database("schema").unwrap();
assert_eq!(result, ("greptime".to_string(), Some("schema".to_string())));
let result = split_database("catalog-*").unwrap();
assert_eq!(result, ("catalog".to_string(), None));
let result = split_database("*").unwrap();
assert_eq!(result, ("greptime".to_string(), None));
}
}
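
The new `DatabaseClient` above wraps GreptimeDB's HTTP `/v1/sql` endpoint and is used by the export tool (see the `export.rs` diff below) and presumably the new import tool. A minimal usage sketch, assuming a caller elsewhere in the `cmd` crate; the `list_tables` wrapper itself is hypothetical and not part of this change:

use crate::cli::database::{split_database, DatabaseClient};
use crate::error::Result;

/// Hypothetical helper showing how the client is driven: split the
/// `catalog-schema` argument, build a client, and run a query against the
/// `public` schema.
async fn list_tables(addr: &str, database: &str) -> Result<()> {
    let (catalog, _schema) = split_database(database)?;
    let client = DatabaseClient::new(addr.to_string(), catalog, None);

    // Returns rows as JSON values, or `None` for statements that only
    // report affected rows.
    if let Some(rows) = client.sql_in_public("SHOW TABLES").await? {
        for row in rows {
            println!("{:?}", row);
        }
    }
    Ok(())
}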

View File

@@ -17,26 +17,19 @@ use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use base64::engine::general_purpose;
use base64::Engine;
use clap::{Parser, ValueEnum};
use client::DEFAULT_SCHEMA_NAME;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_telemetry::{debug, error, info};
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
use crate::cli::{Instance, Tool};
use crate::error::{
EmptyResultSnafu, Error, FileIoSnafu, HttpQuerySqlSnafu, Result, SerdeJsonSnafu,
};
use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result};
type TableReference = (String, String, String);
@@ -94,26 +87,21 @@ pub struct ExportCommand {
impl ExportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = split_database(&self.database)?;
let (catalog, schema) = database::split_database(&self.database)?;
let auth_header = if let Some(basic) = &self.auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Some(format!("basic {}", encoded))
} else {
None
};
let database_client =
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
Ok(Instance::new(
Box::new(Export {
addr: self.addr.clone(),
catalog,
schema,
database_client,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
auth_header,
}),
guard,
))
@@ -121,78 +109,43 @@ impl ExportCommand {
}
pub struct Export {
addr: String,
catalog: String,
schema: Option<String>,
database_client: DatabaseClient,
output_dir: String,
parallelism: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
auth_header: Option<String>,
}
impl Export {
/// Execute one single sql query.
async fn sql(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
let url = format!(
"http://{}/v1/sql?db={}-{}&sql={}",
self.addr,
self.catalog,
self.schema.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME),
sql
);
let mut request = reqwest::Client::new()
.get(&url)
.header("Content-Type", "application/x-www-form-urlencoded");
if let Some(ref auth) = self.auth_header {
request = request.header("Authorization", auth);
async fn get_db_names(&self) -> Result<Vec<String>> {
if let Some(schema) = &self.schema {
Ok(vec![schema.clone()])
} else {
self.all_db_names().await
}
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
})?;
let response = response
.error_for_status()
.with_context(|_| HttpQuerySqlSnafu {
reason: format!("query failed: {}", sql),
})?;
let text = response.text().await.with_context(|_| HttpQuerySqlSnafu {
reason: "cannot get response text".to_string(),
})?;
let body = serde_json::from_str::<GreptimedbV1Response>(&text).context(SerdeJsonSnafu)?;
Ok(body.output().first().and_then(|output| match output {
GreptimeQueryOutput::Records(records) => Some(records.rows().clone()),
GreptimeQueryOutput::AffectedRows(_) => None,
}))
}
/// Iterate over all db names.
///
/// Newbie: `db_name` is catalog + schema.
async fn iter_db_names(&self) -> Result<Vec<(String, String)>> {
if let Some(schema) = &self.schema {
Ok(vec![(self.catalog.clone(), schema.clone())])
} else {
let result = self.sql("SHOW DATABASES").await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
async fn all_db_names(&self) -> Result<Vec<String>> {
let result = self.database_client.sql_in_public("SHOW DATABASES").await?;
let records = result.context(EmptyResultSnafu)?;
let mut result = Vec::with_capacity(records.len());
for value in records {
let Value::String(schema) = &value[0] else {
unreachable!()
};
let mut result = Vec::with_capacity(records.len());
for value in records {
let Value::String(schema) = &value[0] else {
unreachable!()
};
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
continue;
}
result.push((self.catalog.clone(), schema.clone()));
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
continue;
}
Ok(result)
if schema == common_catalog::consts::PG_CATALOG_NAME {
continue;
}
result.push(schema.clone());
}
Ok(result)
}
/// Return a list of [`TableReference`] to be exported.
@@ -201,7 +154,11 @@ impl Export {
&self,
catalog: &str,
schema: &str,
) -> Result<(Vec<TableReference>, Vec<TableReference>)> {
) -> Result<(
Vec<TableReference>,
Vec<TableReference>,
Vec<TableReference>,
)> {
// Puts all metric physical tables first
let sql = format!(
"SELECT table_catalog, table_schema, table_name \
@@ -210,15 +167,13 @@ impl Export {
and table_catalog = \'{catalog}\' \
and table_schema = \'{schema}\'"
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let result = self.database_client.sql_in_public(&sql).await?;
let records = result.context(EmptyResultSnafu)?;
let mut metric_physical_tables = HashSet::with_capacity(records.len());
for value in records {
let mut t = Vec::with_capacity(3);
for v in &value {
let serde_json::Value::String(value) = v else {
let Value::String(value) = v else {
unreachable!()
};
t.push(value);
@@ -228,54 +183,63 @@ impl Export {
// TODO: SQL injection hurts
let sql = format!(
"SELECT table_catalog, table_schema, table_name \
"SELECT table_catalog, table_schema, table_name, table_type \
FROM information_schema.tables \
WHERE table_type = \'BASE TABLE\' \
WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
and table_catalog = \'{catalog}\' \
and table_schema = \'{schema}\'",
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let result = self.database_client.sql_in_public(&sql).await?;
let records = result.context(EmptyResultSnafu)?;
debug!("Fetched table list: {:?}", records);
debug!("Fetched table/view list: {:?}", records);
if records.is_empty() {
return Ok((vec![], vec![]));
return Ok((vec![], vec![], vec![]));
}
let mut remaining_tables = Vec::with_capacity(records.len());
let mut views = Vec::new();
for value in records {
let mut t = Vec::with_capacity(3);
let mut t = Vec::with_capacity(4);
for v in &value {
let serde_json::Value::String(value) = v else {
let Value::String(value) = v else {
unreachable!()
};
t.push(value);
}
let table = (t[0].clone(), t[1].clone(), t[2].clone());
let table_type = t[3].as_str();
// Ignores the physical table
if !metric_physical_tables.contains(&table) {
remaining_tables.push(table);
if table_type == "VIEW" {
views.push(table);
} else {
remaining_tables.push(table);
}
}
}
Ok((
metric_physical_tables.into_iter().collect(),
remaining_tables,
views,
))
}
async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
async fn show_create(
&self,
show_type: &str,
catalog: &str,
schema: &str,
table: &str,
) -> Result<String> {
let sql = format!(
r#"SHOW CREATE TABLE "{}"."{}"."{}""#,
catalog, schema, table
r#"SHOW CREATE {} "{}"."{}"."{}""#,
show_type, catalog, schema, table
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let result = self.database_client.sql_in_public(&sql).await?;
let records = result.context(EmptyResultSnafu)?;
let Value::String(create_table) = &records[0][1] else {
unreachable!()
};
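For reference, a self-contained sketch (catalog, schema, and table names are made up, not taken from the change) of the statement shape this helper builds:

```rust
// Mirrors the format! call above; each identifier is double-quoted so reserved
// words and mixed-case names survive.
fn build_show_create(show_type: &str, catalog: &str, schema: &str, table: &str) -> String {
    format!(r#"SHOW CREATE {} "{}"."{}"."{}""#, show_type, catalog, schema, table)
}

fn main() {
    assert_eq!(
        build_show_create("VIEW", "greptime", "public", "cpu_usage"),
        r#"SHOW CREATE VIEW "greptime"."public"."cpu_usage""#
    );
}
```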
@@ -286,18 +250,19 @@ impl Export {
async fn export_create_table(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for (catalog, schema) in db_names {
for schema in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let (metric_physical_tables, remaining_tables) =
self.get_table_list(&catalog, &schema).await?;
let table_count = metric_physical_tables.len() + remaining_tables.len();
let (metric_physical_tables, remaining_tables, views) =
self.get_table_list(&self.catalog, &schema).await?;
let table_count =
metric_physical_tables.len() + remaining_tables.len() + views.len();
let output_dir = Path::new(&self.output_dir)
.join(&catalog)
.join(&self.catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
.await
@@ -305,7 +270,7 @@ impl Export {
let output_file = Path::new(&output_dir).join("create_tables.sql");
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
match self.show_create_table(&c, &s, &t).await {
match self.show_create("TABLE", &c, &s, &t).await {
Err(e) => {
error!(e; r#"Failed to export table "{}"."{}"."{}""#, c, s, t)
}
@@ -316,9 +281,22 @@ impl Export {
}
}
}
for (c, s, v) in views {
match self.show_create("VIEW", &c, &s, &v).await {
Err(e) => {
error!(e; r#"Failed to export view "{}"."{}"."{}""#, c, s, v)
}
Ok(create_view) => {
file.write_all(create_view.as_bytes())
.await
.context(FileIoSnafu)?;
}
}
}
info!(
"Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
"Finished exporting {}.{schema} with {table_count} table schemas to path: {}",
self.catalog,
output_dir.to_string_lossy()
);
@@ -332,7 +310,7 @@ impl Export {
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export job failed");
error!(e; "export schema job failed");
false
}
})
@@ -347,15 +325,15 @@ impl Export {
async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for (catalog, schema) in db_names {
let mut tasks = Vec::with_capacity(db_count);
for schema in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let output_dir = Path::new(&self.output_dir)
.join(&catalog)
.join(&self.catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
.await
@@ -379,7 +357,7 @@ impl Export {
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
catalog,
self.catalog,
schema,
output_dir.to_str().unwrap(),
with_options
@@ -387,10 +365,11 @@ impl Export {
info!("Executing sql: {sql}");
self.sql(&sql).await?;
self.database_client.sql_in_public(&sql).await?;
info!(
"Finished exporting {catalog}.{schema} data into path: {}",
"Finished exporting {}.{schema} data into path: {}",
self.catalog,
output_dir.to_string_lossy()
);
@@ -400,7 +379,7 @@ impl Export {
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
catalog,
self.catalog,
schema,
output_dir.to_str().unwrap()
);
@@ -410,7 +389,7 @@ impl Export {
.context(FileIoSnafu)?;
writer.flush().await.context(FileIoSnafu)?;
info!("Finished exporting {catalog}.{schema} copy_from.sql");
info!("Finished exporting {}.{schema} copy_from.sql", self.catalog);
Ok::<(), Error>(())
})
@@ -429,13 +408,12 @@ impl Export {
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");
Ok(())
}
}
#[allow(deprecated)]
#[async_trait]
impl Tool for Export {
async fn do_work(&self) -> Result<()> {
@@ -450,20 +428,6 @@ impl Tool for Export {
}
}
/// Split at `-`.
fn split_database(database: &str) -> Result<(String, Option<String>)> {
let (catalog, schema) = match database.split_once('-') {
Some((catalog, schema)) => (catalog, schema),
None => (DEFAULT_CATALOG_NAME, database),
};
if schema == "*" {
Ok((catalog.to_string(), None))
} else {
Ok((catalog.to_string(), Some(schema.to_string())))
}
}
#[cfg(test)]
mod tests {
use clap::Parser;
@@ -471,26 +435,10 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
use crate::cli::export::split_database;
use crate::error::Result as CmdResult;
use crate::options::GlobalOptions;
use crate::{cli, standalone, App};
#[test]
fn test_split_database() {
let result = split_database("catalog-schema").unwrap();
assert_eq!(result, ("catalog".to_string(), Some("schema".to_string())));
let result = split_database("schema").unwrap();
assert_eq!(result, ("greptime".to_string(), Some("schema".to_string())));
let result = split_database("catalog-*").unwrap();
assert_eq!(result, ("catalog".to_string(), None));
let result = split_database("*").unwrap();
assert_eq!(result, ("greptime".to_string(), None));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
let output_dir = tempfile::tempdir().unwrap();

src/cmd/src/cli/import.rs (new file, 204 lines)

@@ -0,0 +1,204 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_telemetry::{error, info, warn};
use snafu::ResultExt;
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::error::{Error, FileIoSnafu, Result};
#[derive(Debug, Default, Clone, ValueEnum)]
enum ImportTarget {
/// Import all table schemas into the database.
Schema,
/// Import all table data into the database.
Data,
/// Import all table schemas and data at once.
#[default]
All,
}
#[derive(Debug, Default, Parser)]
pub struct ImportCommand {
/// Server address to connect
#[clap(long)]
addr: String,
/// Directory of the data. E.g.: /tmp/greptimedb-backup
#[clap(long)]
input_dir: String,
/// The name of the catalog to import.
#[clap(long, default_value = "greptime-*")]
database: String,
/// Parallelism of the import.
#[clap(long, short = 'j', default_value = "1")]
import_jobs: usize,
/// Max retry times for each job.
#[clap(long, default_value = "3")]
max_retry: usize,
/// Things to import
#[clap(long, short = 't', value_enum, default_value = "all")]
target: ImportTarget,
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
}
impl ImportCommand {
pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let (catalog, schema) = database::split_database(&self.database)?;
let database_client =
DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
Ok(Instance::new(
Box::new(Import {
catalog,
schema,
database_client,
input_dir: self.input_dir.clone(),
parallelism: self.import_jobs,
target: self.target.clone(),
}),
guard,
))
}
}
pub struct Import {
catalog: String,
schema: Option<String>,
database_client: DatabaseClient,
input_dir: String,
parallelism: usize,
target: ImportTarget,
}
impl Import {
async fn import_create_table(&self) -> Result<()> {
self.do_sql_job("create_tables.sql").await
}
async fn import_database_data(&self) -> Result<()> {
self.do_sql_job("copy_from.sql").await
}
async fn do_sql_job(&self, filename: &str) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
for schema in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let database_input_dir = self.catalog_path().join(&schema);
let sql_file = database_input_dir.join(filename);
let sql = tokio::fs::read_to_string(sql_file)
.await
.context(FileIoSnafu)?;
if sql.is_empty() {
info!("Empty `{filename}` {database_input_dir:?}");
} else {
self.database_client.sql(&sql, &schema).await?;
info!("Imported `{filename}` for database {schema}");
}
Ok::<(), Error>(())
})
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "import {filename} job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} `{filename}` jobs, cost: {elapsed:?}");
Ok(())
}
fn catalog_path(&self) -> PathBuf {
PathBuf::from(&self.input_dir).join(&self.catalog)
}
async fn get_db_names(&self) -> Result<Vec<String>> {
if let Some(schema) = &self.schema {
Ok(vec![schema.clone()])
} else {
self.all_db_names().await
}
}
// Get all database names in the input directory.
// The directory structure should be like:
// /tmp/greptimedb-backup
// ├── greptime-1
// │ ├── db1
// │ └── db2
async fn all_db_names(&self) -> Result<Vec<String>> {
let mut db_names = vec![];
let path = self.catalog_path();
let mut entries = tokio::fs::read_dir(path).await.context(FileIoSnafu)?;
while let Some(entry) = entries.next_entry().await.context(FileIoSnafu)? {
let path = entry.path();
if path.is_dir() {
let db_name = match path.file_name() {
Some(name) => name.to_string_lossy().to_string(),
None => {
warn!("Failed to get the file name of {:?}", path);
continue;
}
};
db_names.push(db_name);
}
}
Ok(db_names)
}
}
#[async_trait]
impl Tool for Import {
async fn do_work(&self) -> Result<()> {
match self.target {
ImportTarget::Schema => self.import_create_table().await,
ImportTarget::Data => self.import_database_data().await,
ImportTarget::All => {
self.import_create_table().await?;
self.import_database_data().await
}
}
}
}
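Putting the two tools side by side, a minimal sketch (paths and names are hypothetical) of the per-schema layout that `Export` writes and `Import` resolves again via `catalog_path()`:

```rust
use std::path::PathBuf;

// Both tools agree on <dir>/<catalog>/<schema>/<file>, where <file> is
// "create_tables.sql" for schemas and "copy_from.sql" for data.
fn schema_file(dir: &str, catalog: &str, schema: &str, file: &str) -> PathBuf {
    PathBuf::from(dir).join(catalog).join(schema).join(file)
}

fn main() {
    let f = schema_file("/tmp/greptimedb-backup", "greptime", "public", "copy_from.sql");
    assert_eq!(f, PathBuf::from("/tmp/greptimedb-backup/greptime/public/copy_from.sql"));
}
```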


@@ -267,7 +267,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version());
log_versions(version(), short_version(), APP_NAME);
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);


@@ -215,7 +215,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version(), short_version());
log_versions(version(), short_version(), APP_NAME);
info!("Flownode start command: {:#?}", self);
info!("Flownode options: {:#?}", opts);


@@ -261,7 +261,7 @@ impl StartCommand {
&opts.component.tracing,
opts.component.node_id.clone(),
);
log_versions(version(), short_version());
log_versions(version(), short_version(), APP_NAME);
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);


@@ -30,7 +30,7 @@ pub mod standalone;
lazy_static::lazy_static! {
static ref APP_VERSION: prometheus::IntGaugeVec =
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version"]).unwrap();
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
}
#[async_trait]
@@ -76,10 +76,10 @@ pub trait App: Send {
/// Log the versions of the application, and the arguments passed to the cli.
/// `version` should be the same as the output of cli "--version";
/// and the `short_version` is the short version of the code, often consisting of the git branch and commit.
pub fn log_versions(version: &str, short_version: &str) {
pub fn log_versions(version: &str, short_version: &str, app: &str) {
// Report app version as gauge.
APP_VERSION
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version])
.with_label_values(&[env!("CARGO_PKG_VERSION"), short_version, app])
.inc();
// Log version and argument flags.


@@ -244,7 +244,7 @@ impl StartCommand {
&opts.component.tracing,
None,
);
log_versions(version(), short_version());
log_versions(version(), short_version(), APP_NAME);
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);


@@ -141,6 +141,8 @@ pub struct StandaloneOptions {
pub region_engine: Vec<RegionEngineConfig>,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
}
impl Default for StandaloneOptions {
@@ -168,6 +170,8 @@ impl Default for StandaloneOptions {
RegionEngineConfig::File(FileEngineConfig::default()),
],
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
}
}
}
@@ -178,6 +182,16 @@ impl Configurable for StandaloneOptions {
}
}
/// The [`StandaloneOptions`] is only defined in cmd crate,
/// we don't want to make `frontend` depend on it, so impl [`Into`]
/// rather than [`From`].
#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
fn into(self) -> FrontendOptions {
self.frontend_options()
}
}
impl StandaloneOptions {
pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone();
@@ -208,6 +222,9 @@ impl StandaloneOptions {
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
grpc: cloned_opts.grpc,
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
mode: Mode::Standalone,
..Default::default()
}
}
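For orientation, a rough sketch (not taken from the change) of setting the two newly exposed options in code; it assumes the `StandaloneOptions` type and the `Default` impl shown above are in scope, and in practice the values would normally come from the standalone configuration file.

```rust
// Hypothetical override: open regions asynchronously after startup and cap how
// many regions are opened concurrently; every other field keeps its default.
fn standalone_opts_with_background_init() -> StandaloneOptions {
    StandaloneOptions {
        init_regions_in_background: true,
        init_regions_parallelism: 8,
        ..Default::default()
    }
}
```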
@@ -415,7 +432,7 @@ impl StartCommand {
&opts.component.tracing,
None,
);
log_versions(version(), short_version());
log_versions(version(), short_version(), APP_NAME);
info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}");
@@ -510,7 +527,7 @@ impl StartCommand {
.build(),
);
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
opts.wal.into(),
opts.wal.clone().into(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
@@ -533,7 +550,7 @@ impl StartCommand {
.await?;
let mut frontend = FrontendBuilder::new(
fe_opts.clone(),
fe_opts,
kv_backend.clone(),
layered_cache_registry.clone(),
catalog_manager.clone(),
@@ -561,7 +578,7 @@ impl StartCommand {
let (tx, _rx) = broadcast::channel(1);
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), plugins)
let servers = Services::new(opts, Arc::new(frontend.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;


@@ -81,6 +81,7 @@ fn test_load_datanode_example_config() {
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
@@ -218,6 +219,7 @@ fn test_load_standalone_example_config() {
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
max_background_jobs: 4,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()


@@ -153,6 +153,9 @@ pub struct UpgradeRegion {
/// it's helpful to verify whether the leader region is ready.
#[serde(with = "humantime_serde")]
pub wait_for_replay_timeout: Option<Duration>,
/// The hint for replaying memtable.
#[serde(default)]
pub location_id: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]


@@ -89,6 +89,19 @@ impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
}
}
impl From<SchemaNameValue> for HashMap<String, String> {
fn from(value: SchemaNameValue) -> Self {
let mut opts = HashMap::new();
if let Some(ttl) = value.ttl {
opts.insert(
OPT_KEY_TTL.to_string(),
format!("{}", humantime::format_duration(ttl)),
);
}
opts
}
}
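A small usage sketch of the new conversion; it assumes `SchemaNameValue` derives `Default`, that `OPT_KEY_TTL` is in scope, and the one-hour TTL is made up for illustration:

```rust
use std::collections::HashMap;
use std::time::Duration;

// Illustrative only: the TTL is rendered through humantime, so a 1-hour TTL
// ends up in the map as the string "1h" under OPT_KEY_TTL.
fn ttl_as_options() -> HashMap<String, String> {
    let value = SchemaNameValue {
        ttl: Some(Duration::from_secs(3600)),
        ..Default::default() // assumes Default is derived for SchemaNameValue
    };
    value.into()
}
```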
impl<'a> SchemaNameKey<'a> {
pub fn new(catalog: &'a str, schema: &'a str) -> Self {
Self { catalog, schema }


@@ -13,7 +13,7 @@ workspace = true
[dependencies]
async-stream.workspace = true
async-trait.workspace = true
backon = "0.4"
backon = "1"
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true


@@ -373,7 +373,7 @@ impl Runner {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: self.exponential_builder.clone(),
exponential_builder: self.exponential_builder,
store: self.store.clone(),
rolling_back: false,
};


@@ -234,7 +234,13 @@ pub fn init_global_logging(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
opts.otlp_endpoint
.as_ref()
.map(|e| format!("http://{}", e))
.map(|e| {
if e.starts_with("http") {
e.to_string()
} else {
format!("http://{}", e)
}
})
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
),
)
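The effect of the new branch is easiest to see on concrete endpoints; a standalone sketch with a made-up function name:

```rust
// Mirrors the closure above: endpoints without a scheme get "http://" prepended,
// while anything already starting with "http" (http:// or https://) is left as-is.
fn normalize_otlp_endpoint(e: &str) -> String {
    if e.starts_with("http") {
        e.to_string()
    } else {
        format!("http://{}", e)
    }
}

#[test]
fn keeps_existing_scheme() {
    assert_eq!(normalize_otlp_endpoint("localhost:4317"), "http://localhost:4317");
    assert_eq!(
        normalize_otlp_endpoint("https://otel-collector:4317"),
        "https://otel-collector:4317"
    );
}
```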


@@ -37,7 +37,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
pub(crate) mod handler;
@@ -231,10 +231,12 @@ impl HeartbeatTask {
mailbox_message: Some(message),
..Default::default()
};
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
Some(req)
}
Err(e) => {
error!(e; "Failed to encode mailbox messages!");
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
None
}
}
@@ -304,6 +306,8 @@ impl HeartbeatTask {
error!(e; "Failed to reconnect to metasrv!");
}
}
} else {
HEARTBEAT_SENT_COUNT.inc();
}
}
}


@@ -206,6 +206,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))


@@ -27,6 +27,7 @@ impl HandlerContext {
region_id,
last_entry_id,
wait_for_replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
@@ -62,6 +63,7 @@ impl HandlerContext {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id,
}),
)
.await?;
@@ -151,6 +153,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -191,6 +194,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -232,6 +236,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -274,8 +279,9 @@ mod tests {
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
last_entry_id: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -293,6 +299,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(500)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -337,6 +344,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -354,6 +362,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(200)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));


@@ -54,4 +54,17 @@ lazy_static! {
&[REGION_ROLE]
)
.unwrap();
/// The number of heartbeats sent by the datanode.
pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
"greptime_datanode_heartbeat_send_count",
"datanode heartbeat sent",
)
.unwrap();
/// The number of heartbeats received by datanode, labeled with result type.
pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_heartbeat_recv_count",
"datanode heartbeat received",
&["result"]
)
.unwrap();
}


@@ -860,7 +860,7 @@ impl RegionServerInner {
// complains "higher-ranked lifetime error". Rust can't prove some future is legit.
// Possible related issue: https://github.com/rust-lang/rust/issues/102211
//
// The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// it here, collect the values first then use later separately.
let regions = self


@@ -49,13 +49,13 @@ use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
use crate::expr::GlobalId;
use crate::metrics::{
METRIC_FLOW_INPUT_BUF_SIZE, METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS,
};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};
use crate::transform::sql_to_flow_plan;
mod flownode_impl;
mod parse_expr;


@@ -28,7 +28,7 @@ use super::state::Scheduler;
use crate::compute::state::DataflowState;
use crate::compute::types::{Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu};
use crate::expr::{self, GlobalId, LocalId};
use crate::expr::{self, Batch, GlobalId, LocalId};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow};
@@ -87,9 +87,38 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
impl<'referred, 'df> Context<'referred, 'df> {
/// Interpret and execute plan
/// Like `render_plan` but in Batch Mode
pub fn render_plan_batch(&mut self, plan: TypedPlan) -> Result<CollectionBundle<Batch>, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant_batch(rows)),
Plan::Get { .. } => NotImplementedSnafu {
reason: "Get is still WIP in batchmode",
}
.fail(),
Plan::Let { .. } => NotImplementedSnafu {
reason: "Let is still WIP in batchmode",
}
.fail(),
Plan::Mfp { input, mfp } => self.render_mfp_batch(input, mfp),
Plan::Reduce {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce_batch(input, &key_val_plan, &reduce_plan, &plan.schema.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}
.fail(),
Plan::Union { .. } => NotImplementedSnafu {
reason: "Union is still WIP",
}
.fail(),
}
}
/// Interpret plan to dataflow and prepare them for execution
///
/// return the output of this plan
/// return the output handler of this plan
pub fn render_plan(&mut self, plan: TypedPlan) -> Result<CollectionBundle, Error> {
match plan.plan {
Plan::Constant { rows } => Ok(self.render_constant(rows)),
@@ -112,17 +141,61 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
/// render Constant, take all rows that have a timestamp not greater than the current time
/// This function is primarily used for testing
/// Always assume input is sorted by timestamp
pub fn render_constant_batch(&mut self, rows: Vec<DiffRow>) -> CollectionBundle<Batch> {
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("constant_batch");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}
let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let err_collector = self.err_collector.clone();
let subgraph_id =
self.df
.add_subgraph_source("ConstantBatch", send_port, move |_ctx, send_port| {
// split off everything after the first timestamp greater than now:
// emit the rows up to now, keep the rest for the next scheduled run
let mut after = per_time.split_off(&(*now.borrow() + 1));
// swap
std::mem::swap(&mut per_time, &mut after);
let not_great_than_now = after;
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
err_collector.run(|| {
let rows = rows.into_iter().map(|(row, _ts, _diff)| row).collect();
let batch = Batch::try_from_rows(rows)?;
send_port.give(vec![batch]);
Ok(())
});
});
// schedule the next run
if let Some(next_run_time) = per_time.keys().next().copied() {
scheduler_inner.schedule_at(next_run_time);
}
});
scheduler.set_cur_subgraph(subgraph_id);
CollectionBundle::from_collection(Collection::from_port(recv_port))
}
/// render Constant, take all rows that have a timestamp not greater than the current time
///
/// Always assume input is sorted by timestamp
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = rows
.into_iter()
.group_by(|(_row, ts, _diff)| *ts)
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect_vec()))
.collect();
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = Default::default();
for (key, group) in &rows.into_iter().group_by(|(_row, ts, _diff)| *ts) {
per_time.entry(key).or_default().extend(group);
}
let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();


@@ -23,12 +23,59 @@ use crate::compute::render::Context;
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::expr::{Batch, EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::TypedPlan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;
impl<'referred, 'df> Context<'referred, 'df> {
/// Like `render_mfp` but in batch mode
pub fn render_mfp_batch(
&mut self,
input: Box<TypedPlan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle<Batch>, Error> {
let input = self.render_plan_batch(*input)?;
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff<Batch>>("mfp_batch");
// This closure capture following variables:
let mfp_plan = MfpPlan::create_from(mfp)?;
let err_collector = self.err_collector.clone();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let subgraph = self.df.add_subgraph_in_out(
"mfp_batch",
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let src_data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
let output_batches = src_data
.filter_map(|mut input_batch| {
err_collector.run(|| {
let res_batch = mfp_plan.mfp.eval_batch_into(&mut input_batch)?;
Ok(res_batch)
})
})
.collect_vec();
send.give(output_batches);
},
);
// register current subgraph in scheduler for future scheduling
scheduler.set_cur_subgraph(subgraph);
let bundle =
CollectionBundle::from_collection(Collection::<Batch>::from_port(out_recv_port));
Ok(bundle)
}
/// render MapFilterProject, will only emit the `rows` once. Assumes every incoming row's sys time is `now` and ignores the row's stated sys time
/// TODO(discord9): schedule mfp operator to run when temporal filter need
///


@@ -14,23 +14,247 @@
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::{ListValue, Value};
use datatypes::vectors::NullVector;
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use crate::compute::render::{Context, SubgraphArg};
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::error::{Error, PlanSnafu};
use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{EvalError, ScalarExpr};
use crate::expr::{Batch, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
impl<'referred, 'df> Context<'referred, 'df> {
const REDUCE_BATCH: &'static str = "reduce_batch";
/// Like `render_reduce`, but for batch mode; only a barebones implementation,
/// with no support for distinct aggregation for now
// There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` have `bytes` variant
#[allow(clippy::mutable_key_type)]
pub fn render_reduce_batch(
&mut self,
input: Box<TypedPlan>,
key_val_plan: &KeyValPlan,
reduce_plan: &ReducePlan,
output_type: &RelationType,
) -> Result<CollectionBundle<Batch>, Error> {
let accum_plan = if let ReducePlan::Accumulable(accum_plan) = reduce_plan {
if !accum_plan.distinct_aggrs.is_empty() {
NotImplementedSnafu {
reason: "Distinct aggregation is not supported in batch mode",
}
.fail()?
}
accum_plan.clone()
} else {
NotImplementedSnafu {
reason: "Only accumulable reduce plan is supported in batch mode",
}
.fail()?
};
let input = self.render_plan_batch(*input)?;
// first assemble key & val into separate key and val columns (since this is batch mode)
// Then stream kvs through a reduce operator
// the output is concat from key and val
let output_key_arity = key_val_plan.key_plan.output_arity();
// TODO(discord9): config global expire time from self
let arrange_handler = self.compute_state.new_arrange(None);
if let (Some(time_index), Some(expire_after)) =
(output_type.time_index, self.compute_state.expire_after())
{
let expire_man =
KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
arrange_handler.write().set_expire_state(expire_man);
}
// reduce need full arrangement to be able to query all keys
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
reason: "No write is expected at this point",
})?;
let key_val_plan = key_val_plan.clone();
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let (out_send_port, out_recv_port) =
self.df.make_edge::<_, Toff<Batch>>(Self::REDUCE_BATCH);
let subgraph =
self.df.add_subgraph_in_out(
Self::REDUCE_BATCH,
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
let now = *(now.borrow());
let arrange = arrange_handler_inner.clone();
// mfp only need to passively receive updates from recvs
let src_data = recv
.take_inner()
.into_iter()
.flat_map(|v| v.into_iter())
.collect_vec();
let mut key_to_many_vals = BTreeMap::<Row, Batch>::new();
for batch in src_data {
err_collector.run(|| {
let (key_batch, val_batch) =
batch_split_by_key_val(&batch, &key_val_plan, &err_collector);
ensure!(
key_batch.row_count() == val_batch.row_count(),
InternalSnafu {
reason: format!(
"Key and val batch should have the same row count, found {} and {}",
key_batch.row_count(),
val_batch.row_count()
)
}
);
for row_idx in 0..key_batch.row_count() {
let key_row = key_batch.get_row(row_idx).unwrap();
let val_row = val_batch.slice(row_idx, 1)?;
let val_batch =
key_to_many_vals.entry(Row::new(key_row)).or_default();
val_batch.append_batch(val_row)?;
}
Ok(())
});
}
// write lock the arrange for the rest of the function body
// to prevent weird race conditions
let mut arrange = arrange.write();
let mut all_arrange_updates = Vec::with_capacity(key_to_many_vals.len());
let mut all_output_rows = Vec::with_capacity(key_to_many_vals.len());
for (key, val_batch) in key_to_many_vals {
err_collector.run(|| -> Result<(), _> {
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
let accum_list = from_accum_values_to_live_accums(
accums.unpack(),
accum_plan.simple_aggrs.len(),
)?;
let mut accum_output = AccumOutput::new();
for AggrWithIndex {
expr,
input_idx,
output_idx,
} in accum_plan.simple_aggrs.iter()
{
let cur_old_accum = accum_list.get(*output_idx).cloned().unwrap_or_default();
// if batch is empty, input null instead
let cur_input = val_batch.batch().get(*input_idx).cloned().unwrap_or_else(||Arc::new(NullVector::new(val_batch.row_count())));
let (output, new_accum) =
expr.func.eval_batch(cur_old_accum, cur_input, None)?;
accum_output.insert_accum(*output_idx, new_accum);
accum_output.insert_output(*output_idx, output);
}
let (new_accums, res_val_row) = accum_output.into_accum_output()?;
let arrange_update = ((key.clone(), Row::new(new_accums)), now, 1);
all_arrange_updates.push(arrange_update);
let mut key_val = key;
key_val.extend(res_val_row);
all_output_rows.push((key_val, now, 1));
Ok(())
});
}
err_collector.run(|| {
arrange.apply_updates(now, all_arrange_updates)?;
arrange.compact_to(now)
});
// this output part is not supposed to be resource intensive
// (because each batch usually doesn't produce that many output rows),
// so we can do some costly operation here
let output_types = all_output_rows.first().map(|(row, _, _)| {
row.iter()
.map(|v| v.data_type())
.collect::<Vec<ConcreteDataType>>()
});
if let Some(output_types) = output_types {
err_collector.run(|| {
let column_cnt = output_types.len();
let row_cnt = all_output_rows.len();
let mut output_builder = output_types
.into_iter()
.map(|t| t.create_mutable_vector(row_cnt))
.collect_vec();
for (row, _, _) in all_output_rows {
for (i, v) in row.into_iter().enumerate() {
output_builder
.get_mut(i)
.context(InternalSnafu{
reason: format!(
"Output builder should have the same length as the row, expected at most {} but got {}",
column_cnt-1,
i
)
})?
.try_push_value_ref(v.as_value_ref())
.context(DataTypeSnafu {
msg: "Failed to push value",
})?;
}
}
let output_columns = output_builder
.into_iter()
.map(|mut b| b.to_vector())
.collect_vec();
let output_batch = Batch::try_new(output_columns, row_cnt)?;
send.give(vec![output_batch]);
Ok(())
});
}
},
);
scheduler.set_cur_subgraph(subgraph);
// by default the output arrangement is keyed by the first `output_key_arity` columns
let arranged = BTreeMap::from([(
(0..output_key_arity).map(ScalarExpr::Column).collect_vec(),
Arranged::new(arrange_handler),
)]);
let bundle = CollectionBundle {
collection: Collection::from_port(out_recv_port),
arranged,
};
Ok(bundle)
}
const REDUCE: &'static str = "reduce";
/// render `Plan::Reduce` into executable dataflow
// There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` have `bytes` variant
@@ -151,6 +375,18 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
fn from_accum_values_to_live_accums(
accums: Vec<Value>,
len: usize,
) -> Result<Vec<Vec<Value>>, EvalError> {
let accum_ranges = from_val_to_slice_idx(accums.first().cloned(), len)?;
let mut accum_list = vec![];
for range in accum_ranges.iter() {
accum_list.push(accums.get(range.clone()).unwrap_or_default().to_vec());
}
Ok(accum_list)
}
/// All arrange(aka state) used in reduce operator
pub struct ReduceArrange {
/// The output arrange of reduce operator
@@ -160,33 +396,40 @@ pub struct ReduceArrange {
distinct_input: Option<Vec<ArrangeHandler>>,
}
/// split a row into key and val by evaluating the key and val plan
fn split_row_to_key_val(
row: Row,
sys_time: repr::Timestamp,
diff: repr::Diff,
fn batch_split_by_key_val(
batch: &Batch,
key_val_plan: &KeyValPlan,
row_buf: &mut Row,
) -> Result<Option<KeyValDiffRow>, EvalError> {
if let Some(key) = key_val_plan
.key_plan
.evaluate_into(&mut row.inner.clone(), row_buf)?
{
// val_plan is not supported to carry any filter predicate,
let val = key_val_plan
.val_plan
.evaluate_into(&mut row.inner.clone(), row_buf)?
.context(InternalSnafu {
reason: "val_plan should not contain any filter predicate",
})?;
Ok(Some(((key, val), sys_time, diff)))
} else {
Ok(None)
err_collector: &ErrCollector,
) -> (Batch, Batch) {
let row_count = batch.row_count();
let mut key_batch = Batch::empty();
let mut val_batch = Batch::empty();
err_collector.run(|| {
if key_val_plan.key_plan.output_arity() != 0 {
key_batch = key_val_plan.key_plan.eval_batch_into(&mut batch.clone())?;
}
if key_val_plan.val_plan.output_arity() != 0 {
val_batch = key_val_plan.val_plan.eval_batch_into(&mut batch.clone())?;
}
Ok(())
});
// deal with empty key or val
if key_batch.row_count() == 0 && key_batch.column_count() == 0 {
key_batch.set_row_count(row_count);
}
if val_batch.row_count() == 0 && val_batch.column_count() == 0 {
val_batch.set_row_count(row_count);
}
(key_batch, val_batch)
}
/// split a row into key and val by evaluating the key and val plan
fn batch_split_rows_to_key_val(
fn split_rows_to_key_val(
rows: impl IntoIterator<Item = DiffRow>,
key_val_plan: KeyValPlan,
err_collector: ErrCollector,
@@ -235,7 +478,7 @@ fn reduce_subgraph(
send,
}: SubgraphArg,
) {
let key_val = batch_split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
let key_val = split_rows_to_key_val(data, key_val_plan.clone(), err_collector.clone());
// from here for distinct reduce and accum reduce, things are drastically different
// for distinct reduce the arrange store the output,
// but for accum reduce the arrange store the accum state, and output is
@@ -1127,6 +1370,105 @@ mod test {
run_and_check(&mut state, &mut df, 6..7, expected, output);
}
/// Batch Mode Reduce Evaluation
/// SELECT SUM(col) FROM table
///
/// table schema:
/// | name | type |
/// |------|-------|
/// | col | Int64 |
#[test]
fn test_basic_batch_reduce_accum() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let now = state.current_time_ref();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
(Row::new(vec![1i64.into()]), 4, 1),
(Row::new(vec![2i64.into()]), 5, 1),
(Row::new(vec![3i64.into()]), 6, 1),
];
let input_plan = Plan::Constant { rows: rows.clone() };
let typ = RelationType::new(vec![ColumnType::new_nullable(
ConcreteDataType::int64_datatype(),
)]);
let key_val_plan = KeyValPlan {
key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
};
let simple_aggrs = vec![AggrWithIndex::new(
AggregateExpr {
func: AggregateFunc::SumInt64,
expr: ScalarExpr::Column(0),
distinct: false,
},
0,
0,
)];
let accum_plan = AccumulablePlan {
full_aggrs: vec![AggregateExpr {
func: AggregateFunc::SumInt64,
expr: ScalarExpr::Column(0),
distinct: false,
}],
simple_aggrs,
distinct_aggrs: vec![],
};
let reduce_plan = ReducePlan::Accumulable(accum_plan);
let bundle = ctx
.render_reduce_batch(
Box::new(input_plan.with_types(typ.into_unnamed())),
&key_val_plan,
&reduce_plan,
&RelationType::empty(),
)
.unwrap();
{
let now_inner = now.clone();
let expected = BTreeMap::<i64, Vec<i64>>::from([
(1, vec![1i64]),
(2, vec![3i64]),
(3, vec![6i64]),
(4, vec![7i64]),
(5, vec![9i64]),
(6, vec![12i64]),
]);
let collection = bundle.collection;
ctx.df
.add_subgraph_sink("test_sink", collection.into_inner(), move |_ctx, recv| {
let now = *now_inner.borrow();
let data = recv.take_inner();
let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
if let Some(expected) = expected.get(&now) {
let batch = expected.iter().map(|v| Value::from(*v)).collect_vec();
let batch = Batch::try_from_rows(vec![batch.into()]).unwrap();
assert_eq!(res.first(), Some(&batch));
}
});
drop(ctx);
for now in 1..7 {
state.set_current_ts(now);
state.run_available_with_schedule(&mut df);
if !state.get_err_collector().is_empty() {
panic!(
"Errors occur: {:?}",
state.get_err_collector().get_all_blocking()
)
}
}
}
}
/// SELECT SUM(col) FROM table
///
/// table schema:


@@ -27,11 +27,67 @@ use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::EvalError;
use crate::expr::{Batch, EvalError};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
/// simply send the batch to downstream, without fancy features like buffering
pub fn render_source_batch(
&mut self,
mut src_recv: broadcast::Receiver<Batch>,
) -> Result<CollectionBundle<Batch>, Error> {
debug!("Rendering Source Batch");
let (send_port, recv_port) = self.df.make_edge::<_, Toff<Batch>>("source_batch");
let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
let sub = self
.df
.add_subgraph_source("source_batch", send_port, move |_ctx, send| {
loop {
match src_recv.try_recv() {
Ok(batch) => {
send.give(vec![batch]);
}
Err(TryRecvError::Empty) => {
break;
}
Err(TryRecvError::Lagged(lag_offset)) => {
// use `err_collector` instead of `error!` to locate which operator caused the error
err_collector.run(|| -> Result<(), EvalError> {
InternalSnafu {
reason: format!("Flow missing {} rows behind", lag_offset),
}
.fail()
});
break;
}
Err(TryRecvError::Closed) => {
err_collector.run(|| -> Result<(), EvalError> {
InternalSnafu {
reason: "Source Batch Channel is closed".to_string(),
}
.fail()
});
break;
}
}
}
let now = *now.borrow();
// always schedule source to run at now so we can
// repeatedly run source if needed
inner_schd.schedule_at(now);
});
schd.set_cur_subgraph(sub);
let bundle = CollectionBundle::from_collection(Collection::<Batch>::from_port(recv_port));
Ok(bundle)
}
/// Render a source which comes from a broadcast channel into the dataflow
/// will immediately send updates not greater than `now` and buffer the rest in arrangement
pub fn render_source(
@@ -114,6 +170,32 @@ impl<'referred, 'df> Context<'referred, 'df> {
})
}
pub fn render_unbounded_sink_batch(
&mut self,
bundle: CollectionBundle<Batch>,
sender: mpsc::UnboundedSender<Batch>,
) {
let CollectionBundle {
collection,
arranged: _,
} = bundle;
let _sink = self.df.add_subgraph_sink(
"UnboundedSinkBatch",
collection.into_inner(),
move |_ctx, recv| {
let data = recv.take_inner();
for batch in data.into_iter().flat_map(|i| i.into_iter()) {
// if the sender is closed unexpectedly, stop sending
if sender.is_closed() || sender.send(batch).is_err() {
common_telemetry::error!("UnboundedSinkBatch is closed");
break;
}
}
},
);
}
pub fn render_unbounded_sink(
&mut self,
bundle: CollectionBundle,


@@ -105,11 +105,13 @@ impl Arranged {
/// This type maintains the invariant that it does contain at least one (or both) valid
/// source of data, either a collection or at least one arrangement. This is for convenience
/// of reading the data from the collection.
pub struct CollectionBundle {
///
// TODO(discord9): make T default to Batch and obsolete the Row Mode
pub struct CollectionBundle<T: 'static = DiffRow> {
/// This is useful for passively reading the new updates from the collection
///
/// Invariant: the timestamp of the updates should never be greater than now, since future updates should be stored in the arrangement
pub collection: Collection<DiffRow>,
pub collection: Collection<T>,
/// the key [`ScalarExpr`]s indicate how the keys (also a [`Row`]) used in Arranged are extracted from the collection's [`Row`]
/// So it is the "index" of the arrangement
///
@@ -121,13 +123,16 @@ pub struct CollectionBundle {
pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
}
impl CollectionBundle {
pub fn from_collection(collection: Collection<DiffRow>) -> Self {
impl<T: 'static> CollectionBundle<T> {
pub fn from_collection(collection: Collection<T>) -> Self {
Self {
collection,
arranged: BTreeMap::default(),
}
}
}
impl<T: 'static + Clone> CollectionBundle<T> {
pub fn clone(&self, df: &mut Hydroflow) -> Self {
Self {
collection: self.collection.clone(df),


@@ -0,0 +1,604 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Datafusion optimizer for flow plan
#![warn(unused)]
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::optimize_projections::OptimizeProjections;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::unwrap_cast_in_comparison::UnwrapCastInComparison;
use datafusion::optimizer::utils::NamePreserver;
use datafusion::optimizer::{Analyzer, AnalyzerRule, Optimizer, OptimizerContext};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::aggregate_function::AggregateFunction;
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use snafu::ResultExt;
/// note here we are using the `substrait_proto_df` crate from the `substrait` module and
/// rename it to `substrait_proto`
use substrait::DFLogicalSubstraitConvertor;
use crate::adapter::FlownodeContext;
use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu};
use crate::expr::{TUMBLE_END, TUMBLE_START};
use crate::plan::TypedPlan;
// TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed
pub async fn apply_df_optimizer(
plan: datafusion_expr::LogicalPlan,
) -> Result<datafusion_expr::LogicalPlan, Error> {
let cfg = ConfigOptions::new();
let analyzer = Analyzer::with_rules(vec![
Arc::new(AvgExpandRule::new()),
Arc::new(TumbleExpandRule::new()),
Arc::new(CheckGroupByRule::new()),
Arc::new(TypeCoercion::new()),
]);
let plan = analyzer
.execute_and_check(plan, &cfg, |p, r| {
debug!("After apply rule {}, get plan: \n{:?}", r.name(), p);
})
.context(DatafusionSnafu {
context: "Fail to apply analyzer",
})?;
let ctx = OptimizerContext::new();
let optimizer = Optimizer::with_rules(vec![
Arc::new(OptimizeProjections::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
]);
let plan = optimizer
.optimize(plan, &ctx, |_, _| {})
.context(DatafusionSnafu {
context: "Fail to apply optimizer",
})?;
Ok(plan)
}
/// To reuse the existing code for parsing SQL, the SQL is first parsed into a datafusion logical plan,
/// then to a substrait plan, and finally to a flow plan.
pub async fn sql_to_flow_plan(
ctx: &mut FlownodeContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
let query_ctx = ctx.query_context.clone().ok_or_else(|| {
UnexpectedSnafu {
reason: "Query context is missing",
}
.build()
})?;
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
let opted_plan = apply_df_optimizer(plan).await?;
// TODO(discord9): add df optimization
let sub_plan = DFLogicalSubstraitConvertor {}
.to_sub_plan(&opted_plan, DefaultSerializer)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan).await?;
Ok(flow_plan)
}
struct AvgExpandRule {}
impl AvgExpandRule {
pub fn new() -> Self {
Self {}
}
}
impl AnalyzerRule for AvgExpandRule {
fn analyze(
&self,
plan: datafusion_expr::LogicalPlan,
_config: &ConfigOptions,
) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
let transformed = plan
.transform_up_with_subqueries(expand_avg_analyzer)?
.data
.transform_down_with_subqueries(put_aggr_to_proj_analyzer)?
.data;
Ok(transformed)
}
fn name(&self) -> &str {
"avg_expand"
}
}
/// lift aggr's composite aggr_expr to outer proj, and leave aggr only with simple direct aggr expr
/// i.e.
/// ```ignore
/// proj: avg(x)
/// -- aggr: [sum(x)/count(x) as avg(x)]
/// ```
/// becomes:
/// ```ignore
/// proj: sum(x)/count(x) as avg(x)
/// -- aggr: [sum(x), count(x)]
/// ```
fn put_aggr_to_proj_analyzer(
plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan {
if let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() {
let mut replace_old_proj_exprs = HashMap::new();
let mut expanded_aggr_exprs = vec![];
for aggr_expr in &aggr.aggr_expr {
let mut is_composite = false;
if let Expr::AggregateFunction(_) = &aggr_expr {
expanded_aggr_exprs.push(aggr_expr.clone());
} else {
let old_name = aggr_expr.name_for_alias()?;
let new_proj_expr = aggr_expr
.clone()
.transform(|ch| {
if let Expr::AggregateFunction(_) = &ch {
is_composite = true;
expanded_aggr_exprs.push(ch.clone());
Ok(Transformed::yes(Expr::Column(Column::from_qualified_name(
ch.name_for_alias()?,
))))
} else {
Ok(Transformed::no(ch))
}
})?
.data;
replace_old_proj_exprs.insert(old_name, new_proj_expr);
}
}
if expanded_aggr_exprs.len() > aggr.aggr_expr.len() {
let mut aggr = aggr.clone();
aggr.aggr_expr = expanded_aggr_exprs;
let mut aggr_plan = datafusion_expr::LogicalPlan::Aggregate(aggr);
// important to recompute schema after changing aggr_expr
aggr_plan = aggr_plan.recompute_schema()?;
// reconstruct proj with new proj_exprs
let mut new_proj_exprs = proj.expr.clone();
for proj_expr in new_proj_exprs.iter_mut() {
if let Some(new_proj_expr) =
replace_old_proj_exprs.get(&proj_expr.name_for_alias()?)
{
*proj_expr = new_proj_expr.clone();
}
*proj_expr = proj_expr
.clone()
.transform(|expr| {
if let Some(new_expr) =
replace_old_proj_exprs.get(&expr.name_for_alias()?)
{
Ok(Transformed::yes(new_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
})?
.data;
}
let proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new(
new_proj_exprs,
Arc::new(aggr_plan),
)?);
return Ok(Transformed::yes(proj));
}
}
}
Ok(Transformed::no(plan))
}
/// expand the `avg(<expr>)` function into `cast(sum(<expr>) AS f64) / count(<expr>)`
fn expand_avg_analyzer(
plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
let mut schema = merge_schema(plan.inputs());
if let datafusion_expr::LogicalPlan::TableScan(ts) = &plan {
let source_schema =
DFSchema::try_from_qualified_schema(ts.table_name.clone(), &ts.source.schema())?;
schema.merge(&source_schema);
}
let mut expr_rewrite = ExpandAvgRewriter::new(&schema);
let name_preserver = NamePreserver::new(&plan);
// apply coercion rewrite all expressions in the plan individually
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr)?;
expr.rewrite(&mut expr_rewrite)?
.map_data(|expr| original_name.restore(expr))
})?
.map_data(|plan| plan.recompute_schema())
}
/// rewrite the `avg(<expr>)` function into `CASE WHEN count(<expr>) != 0 THEN cast(sum(<expr>) AS avg_return_type) / count(<expr>) ELSE NULL END`
///
/// TODO(discord9): support avg return type decimal128
///
/// see impl details at https://github.com/apache/datafusion/blob/4ad4f90d86c57226a4e0fb1f79dfaaf0d404c273/datafusion/expr/src/type_coercion/aggregates.rs#L457-L462
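/// For example (an illustrative sketch; `x` stands for any input column):
/// ```ignore
/// avg(x)
/// ```
/// becomes:
/// ```ignore
/// CASE WHEN count(x) != 0 THEN CAST(sum(x) AS Float64) / count(x) ELSE NULL END
/// ```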
pub(crate) struct ExpandAvgRewriter<'a> {
/// schema of the plan
#[allow(unused)]
pub(crate) schema: &'a DFSchema,
}
impl<'a> ExpandAvgRewriter<'a> {
fn new(schema: &'a DFSchema) -> Self {
Self { schema }
}
}
impl<'a> TreeNodeRewriter for ExpandAvgRewriter<'a> {
type Node = Expr;
fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>, DataFusionError> {
if let Expr::AggregateFunction(aggr_func) = &expr {
if let AggregateFunctionDefinition::BuiltIn(AggregateFunction::Avg) =
&aggr_func.func_def
{
let sum_expr = {
let mut tmp = aggr_func.clone();
tmp.func_def = AggregateFunctionDefinition::BuiltIn(AggregateFunction::Sum);
Expr::AggregateFunction(tmp)
};
let sum_cast = {
let mut tmp = sum_expr.clone();
tmp = Expr::Cast(datafusion_expr::Cast {
expr: Box::new(tmp),
data_type: arrow_schema::DataType::Float64,
});
tmp
};
let count_expr = {
let mut tmp = aggr_func.clone();
tmp.func_def = AggregateFunctionDefinition::BuiltIn(AggregateFunction::Count);
Expr::AggregateFunction(tmp)
};
let count_expr_ref =
Expr::Column(Column::from_qualified_name(count_expr.name_for_alias()?));
let div =
BinaryExpr::new(Box::new(sum_cast), Operator::Divide, Box::new(count_expr));
let div_expr = Box::new(Expr::BinaryExpr(div));
let zero = Box::new(Expr::Literal(ScalarValue::Int64(Some(0))));
let not_zero =
BinaryExpr::new(Box::new(count_expr_ref), Operator::NotEq, zero.clone());
let not_zero = Box::new(Expr::BinaryExpr(not_zero));
let null = Box::new(Expr::Literal(ScalarValue::Null));
let case_when =
datafusion_expr::Case::new(None, vec![(not_zero, div_expr)], Some(null));
let case_when_expr = Expr::Case(case_when);
return Ok(Transformed::yes(case_when_expr));
}
}
Ok(Transformed::no(expr))
}
}
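// Illustrative only: with this rewriter an `avg(x)` expression ends up as roughly
//
//     CASE WHEN count(x) != 0
//          THEN CAST(sum(x) AS Float64) / count(x)
//          ELSE NULL
//     END
//
// where the `count(x)` in the WHEN condition is a column reference (see `count_expr_ref`
// above), so it resolves against the aggregate expanded by `put_aggr_to_proj_analyzer`
// instead of nesting another aggregate call.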
/// expand `tumble` in aggregate exprs into `tumble_start` and `tumble_end`, with column names like `window_start`
struct TumbleExpandRule {}
impl TumbleExpandRule {
pub fn new() -> Self {
Self {}
}
}
impl AnalyzerRule for TumbleExpandRule {
fn analyze(
&self,
plan: datafusion_expr::LogicalPlan,
_config: &ConfigOptions,
) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
let transformed = plan
.transform_up_with_subqueries(expand_tumble_analyzer)?
.data;
Ok(transformed)
}
fn name(&self) -> &str {
"tumble_expand"
}
}
/// expand `tumble` in aggregate exprs into `tumble_start` and `tumble_end`, and expand related aliases and column refs as well
///
/// adds `tumble_start` and `tumble_end` to the outer projection if they do not already exist
fn expand_tumble_analyzer(
plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan {
if let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() {
let mut new_group_expr = vec![];
let mut alias_to_expand = HashMap::new();
let mut encountered_tumble = false;
for expr in aggr.group_expr.iter() {
match expr {
datafusion_expr::Expr::ScalarFunction(func) if func.name() == "tumble" => {
encountered_tumble = true;
let tumble_start = TumbleExpand::new(TUMBLE_START);
let tumble_start = datafusion_expr::expr::ScalarFunction::new_udf(
Arc::new(tumble_start.into()),
func.args.clone(),
);
let tumble_start = datafusion_expr::Expr::ScalarFunction(tumble_start);
let start_col_name = tumble_start.name_for_alias()?;
new_group_expr.push(tumble_start);
let tumble_end = TumbleExpand::new(TUMBLE_END);
let tumble_end = datafusion_expr::expr::ScalarFunction::new_udf(
Arc::new(tumble_end.into()),
func.args.clone(),
);
let tumble_end = datafusion_expr::Expr::ScalarFunction(tumble_end);
let end_col_name = tumble_end.name_for_alias()?;
new_group_expr.push(tumble_end);
alias_to_expand
.insert(expr.name_for_alias()?, (start_col_name, end_col_name));
}
_ => new_group_expr.push(expr.clone()),
}
}
if !encountered_tumble {
return Ok(Transformed::no(plan));
}
let mut new_aggr = aggr.clone();
new_aggr.group_expr = new_group_expr;
let new_aggr = datafusion_expr::LogicalPlan::Aggregate(new_aggr).recompute_schema()?;
// replace alias in projection if needed, and add new column ref if necessary
let mut new_proj_expr = vec![];
let mut have_expanded = false;
for proj_expr in proj.expr.iter() {
if let Some((start_col_name, end_col_name)) =
alias_to_expand.get(&proj_expr.name_for_alias()?)
{
let start_col = Column::from_qualified_name(start_col_name);
let end_col = Column::from_qualified_name(end_col_name);
new_proj_expr.push(datafusion_expr::Expr::Column(start_col));
new_proj_expr.push(datafusion_expr::Expr::Column(end_col));
have_expanded = true;
} else {
new_proj_expr.push(proj_expr.clone());
}
}
// append to the end of the projection if they don't already exist
if !have_expanded {
for (start_col_name, end_col_name) in alias_to_expand.values() {
let start_col = Column::from_qualified_name(start_col_name);
let end_col = Column::from_qualified_name(end_col_name);
new_proj_expr
.push(datafusion_expr::Expr::Column(start_col).alias("window_start"));
new_proj_expr.push(datafusion_expr::Expr::Column(end_col).alias("window_end"));
}
}
let new_proj = datafusion_expr::LogicalPlan::Projection(Projection::try_new(
new_proj_expr,
Arc::new(new_aggr),
)?);
return Ok(Transformed::yes(new_proj));
}
}
Ok(Transformed::no(plan))
}
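// Illustrative only: for a flow query such as
//
//     SELECT sum(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour');
//
// this analyzer rewrites the aggregate's group exprs to
// `tumble_start(ts, '1 hour'), tumble_end(ts, '1 hour')`, and because the original
// projection never referenced the tumble expression, it appends
// `tumble_start(..) AS window_start` and `tumble_end(..) AS window_end`
// to the outer projection.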
/// This is a placeholder for the tumble_start and tumble_end functions, so that datafusion can
/// recognize them as scalar functions
#[derive(Debug)]
pub struct TumbleExpand {
signature: Signature,
name: String,
}
impl TumbleExpand {
pub fn new(name: &str) -> Self {
Self {
signature: Signature::new(TypeSignature::UserDefined, Volatility::Immutable),
name: name.to_string(),
}
}
}
impl ScalarUDFImpl for TumbleExpand {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn name(&self) -> &str {
&self.name
}
/// elide the signature for now
fn signature(&self) -> &Signature {
&self.signature
}
fn coerce_types(
&self,
arg_types: &[arrow_schema::DataType],
) -> datafusion_common::Result<Vec<arrow_schema::DataType>> {
match (arg_types.first(), arg_types.get(1), arg_types.get(2)) {
(Some(ts), Some(window), opt) => {
use arrow_schema::DataType::*;
if !matches!(ts, Date32 | Date64 | Timestamp(_, _)) {
return Err(DataFusionError::Plan(
format!("Expect timestamp column as first arg for tumble_start, found {:?}", ts)
));
}
if !matches!(window, Utf8 | Interval(_)) {
return Err(DataFusionError::Plan(
format!("Expect second arg for window size's type being interval for tumble_start, found {:?}", window),
));
}
if let Some(start_time) = opt {
if !matches!(start_time, Utf8 | Date32 | Date64 | Timestamp(_, _)) {
return Err(DataFusionError::Plan(
format!("Expect start_time to be a date, timestamp or string, found {:?}", start_time)
));
}
}
Ok(arg_types.to_vec())
}
_ => Err(DataFusionError::Plan(
"Expect tumble function have at least two arg(timestamp column and window size) and a third optional arg for starting time".to_string(),
)),
}
}
fn return_type(
&self,
arg_types: &[arrow_schema::DataType],
) -> Result<arrow_schema::DataType, DataFusionError> {
arg_types.first().cloned().ok_or_else(|| {
DataFusionError::Plan(
"Expect tumble function have at least two arg(timestamp column and window size)"
.to_string(),
)
})
}
fn invoke(
&self,
_args: &[datafusion_expr::ColumnarValue],
) -> Result<datafusion_expr::ColumnarValue, DataFusionError> {
Err(DataFusionError::Plan(
"This function should not be executed by datafusion".to_string(),
))
}
}
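// A minimal sketch (hypothetical, not in this diff) of how the placeholder becomes a
// DataFusion `ScalarUDF`, as done in `expand_tumble_analyzer` above:
//
// let tumble_start: datafusion_expr::ScalarUDF = TumbleExpand::new(TUMBLE_START).into();
//
// `coerce_types` accepts (timestamp, interval-or-string[, start_time]) and `return_type`
// echoes the type of the first argument; `invoke` is never reached because the flow
// engine lowers these calls to `UnaryFunc`s before physical execution.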
/// This rule checks all group-by exprs and makes sure they also appear in the select clause of an aggregate query
struct CheckGroupByRule {}
impl CheckGroupByRule {
pub fn new() -> Self {
Self {}
}
}
impl AnalyzerRule for CheckGroupByRule {
fn analyze(
&self,
plan: datafusion_expr::LogicalPlan,
_config: &ConfigOptions,
) -> datafusion_common::Result<datafusion_expr::LogicalPlan> {
let transformed = plan
.transform_up_with_subqueries(check_group_by_analyzer)?
.data;
Ok(transformed)
}
fn name(&self) -> &str {
"check_groupby"
}
}
/// make sure every expr in group by also appears in the select list
fn check_group_by_analyzer(
plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
if let datafusion_expr::LogicalPlan::Projection(proj) = &plan {
if let datafusion_expr::LogicalPlan::Aggregate(aggr) = proj.input.as_ref() {
let mut found_column_used = FindColumn::new();
proj.expr
.iter()
.map(|i| i.visit(&mut found_column_used))
.count();
for expr in aggr.group_expr.iter() {
if !found_column_used
.names_for_alias
.contains(&expr.name_for_alias()?)
{
return Err(DataFusionError::Plan(format!("Expect {} expr in group by also exist in select list, but select list only contain {:?}",expr.name_for_alias()?, found_column_used.names_for_alias)));
}
}
}
}
Ok(Transformed::no(plan))
}
/// Find all column names in a plan
#[derive(Debug, Default)]
struct FindColumn {
names_for_alias: HashSet<String>,
}
impl FindColumn {
fn new() -> Self {
Default::default()
}
}
impl TreeNodeVisitor<'_> for FindColumn {
type Node = datafusion_expr::Expr;
fn f_down(
&mut self,
node: &datafusion_expr::Expr,
) -> Result<TreeNodeRecursion, DataFusionError> {
if let datafusion_expr::Expr::Column(_) = node {
self.names_for_alias.insert(node.name_for_alias()?);
}
Ok(TreeNodeRecursion::Continue)
}
}
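// Illustrative only: `CheckGroupByRule` is what rejects a query like
//
//     SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number;
//
// (see `test_missing_key_check` later in this PR): the group-by column `number` never
// appears in the select list, so `check_group_by_analyzer` returns a plan error instead
// of silently dropping the key column.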

View File

@@ -24,6 +24,7 @@ mod scalar;
mod signature;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
pub(crate) use df_func::{DfScalarFunction, RawDfScalarFn};
pub(crate) use error::{EvalError, InvalidArgumentSnafu};
@@ -37,42 +38,168 @@ use snafu::{ensure, ResultExt};
use crate::expr::error::DataTypeSnafu;
pub const TUMBLE_START: &str = "tumble_start";
pub const TUMBLE_END: &str = "tumble_end";
/// A batch of vectors with the same length but without a schema, only useful in dataflow
///
/// Cheap to clone since it just contains a list of `VectorRef`s (which are `Arc`s).
#[derive(Debug, Clone)]
pub struct Batch {
batch: Vec<VectorRef>,
row_count: usize,
/// describes whether the corresponding rows in the batch are inserts or deletes; `None` means all rows are inserts
diffs: Option<VectorRef>,
}
impl PartialEq for Batch {
fn eq(&self, other: &Self) -> bool {
let mut batch_eq = true;
if self.batch.len() != other.batch.len() {
return false;
}
for (left, right) in self.batch.iter().zip(other.batch.iter()) {
batch_eq = batch_eq
&& <dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array());
}
let diff_eq = match (&self.diffs, &other.diffs) {
(Some(left), Some(right)) => {
<dyn arrow::array::Array>::eq(&left.to_arrow_array(), &right.to_arrow_array())
}
(None, None) => true,
_ => false,
};
batch_eq && diff_eq && self.row_count == other.row_count
}
}
impl Eq for Batch {}
impl Default for Batch {
fn default() -> Self {
Self::empty()
}
}
impl Batch {
pub fn new(batch: Vec<VectorRef>, row_count: usize) -> Self {
Self { batch, row_count }
pub fn try_from_rows(rows: Vec<crate::repr::Row>) -> Result<Self, EvalError> {
if rows.is_empty() {
return Ok(Self::empty());
}
let len = rows.len();
let mut builder = rows
.first()
.unwrap()
.iter()
.map(|v| v.data_type().create_mutable_vector(len))
.collect_vec();
for row in rows {
ensure!(
row.len() == builder.len(),
InvalidArgumentSnafu {
reason: format!(
"row length not match, expect {}, found {}",
builder.len(),
row.len()
)
}
);
for (idx, value) in row.iter().enumerate() {
builder[idx]
.try_push_value_ref(value.as_value_ref())
.context(DataTypeSnafu {
msg: "Failed to convert rows to columns",
})?;
}
}
let columns = builder.into_iter().map(|mut b| b.to_vector()).collect_vec();
let batch = Self::try_new(columns, len)?;
Ok(batch)
}
pub fn empty() -> Self {
Self {
batch: vec![],
row_count: 0,
diffs: None,
}
}
pub fn try_new(batch: Vec<VectorRef>, row_count: usize) -> Result<Self, EvalError> {
ensure!(
batch.iter().map(|v| v.len()).all_equal()
&& batch.first().map(|v| v.len() == row_count).unwrap_or(true),
InvalidArgumentSnafu {
reason: "All columns should have same length".to_string()
}
);
Ok(Self {
batch,
row_count,
diffs: None,
})
}
pub fn new_unchecked(batch: Vec<VectorRef>, row_count: usize) -> Self {
Self {
batch,
row_count,
diffs: None,
}
}
pub fn batch(&self) -> &[VectorRef] {
&self.batch
}
pub fn batch_mut(&mut self) -> &mut Vec<VectorRef> {
&mut self.batch
}
pub fn row_count(&self) -> usize {
self.row_count
}
pub fn set_row_count(&mut self, row_count: usize) {
self.row_count = row_count;
}
pub fn column_count(&self) -> usize {
self.batch.len()
}
pub fn get_row(&self, idx: usize) -> Result<Vec<Value>, EvalError> {
ensure!(
idx < self.row_count,
InvalidArgumentSnafu {
reason: format!(
"Expect row index to be less than {}, found {}",
self.row_count, idx
)
}
);
Ok(self.batch.iter().map(|v| v.get(idx)).collect_vec())
}
/// Slices the `Batch`, returning a new `Batch`.
///
/// # Panics
/// This function panics if `offset + length > self.row_count()`.
pub fn slice(&self, offset: usize, length: usize) -> Batch {
pub fn slice(&self, offset: usize, length: usize) -> Result<Batch, EvalError> {
let batch = self
.batch()
.iter()
.map(|v| v.slice(offset, length))
.collect_vec();
Batch::new(batch, length)
Batch::try_new(batch, length)
}
/// append another batch to self
///
/// NOTE: This is expensive since it will create new vectors for each column
pub fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
ensure!(
self.batch.len() == other.batch.len(),
self.batch.len() == other.batch.len()
|| self.batch.is_empty()
|| other.batch.is_empty(),
InvalidArgumentSnafu {
reason: format!(
"Expect two batch to have same numbers of column, found {} and {} columns",
@@ -82,21 +209,31 @@ impl Batch {
}
);
let batch_builders = self
.batch
if self.batch.is_empty() {
self.batch = other.batch;
self.row_count = other.row_count;
return Ok(());
} else if other.batch.is_empty() {
return Ok(());
}
let dts = if self.batch.is_empty() {
other.batch.iter().map(|v| v.data_type()).collect_vec()
} else {
self.batch.iter().map(|v| v.data_type()).collect_vec()
};
let batch_builders = dts
.iter()
.map(|v| {
v.data_type()
.create_mutable_vector(self.row_count() + other.row_count())
})
.map(|dt| dt.create_mutable_vector(self.row_count() + other.row_count()))
.collect_vec();
let mut result = vec![];
let zelf_row_count = self.row_count();
let self_row_count = self.row_count();
let other_row_count = other.row_count();
for (idx, mut builder) in batch_builders.into_iter().enumerate() {
builder
.extend_slice_of(self.batch()[idx].as_ref(), 0, zelf_row_count)
.extend_slice_of(self.batch()[idx].as_ref(), 0, self_row_count)
.context(DataTypeSnafu {
msg: "Failed to extend vector",
})?;
@@ -108,7 +245,7 @@ impl Batch {
result.push(builder.to_vector());
}
self.batch = result;
self.row_count = zelf_row_count + other_row_count;
self.row_count = self_row_count + other_row_count;
Ok(())
}
}
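// A minimal usage sketch of the new `Batch` API (hypothetical test, not part of this
// diff); it only relies on the constructors and accessors shown above.
#[cfg(test)]
mod batch_usage_sketch {
    use datatypes::value::Value;

    use super::*;
    use crate::repr::Row;

    #[test]
    fn batch_roundtrip() {
        let batch = Batch::try_from_rows(vec![
            Row::from(vec![Value::from(1i32), Value::from(2i32)]),
            Row::from(vec![Value::from(3i32), Value::from(4i32)]),
        ])
        .unwrap();
        assert_eq!(batch.row_count(), 2);
        assert_eq!(batch.column_count(), 2);

        // `slice` now returns a `Result`, since it revalidates column lengths via `try_new`
        let mut first = batch.slice(0, 1).unwrap();
        assert_eq!(
            first.get_row(0).unwrap(),
            vec![Value::from(1i32), Value::from(2i32)]
        );

        // `append_batch` rebuilds every column, so the row count is the sum of both inputs
        first.append_batch(batch.slice(1, 1).unwrap()).unwrap();
        assert_eq!(first.row_count(), 2);
    }
}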

View File

@@ -35,13 +35,13 @@ use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use substrait::df_logical_plan::consumer::name_to_op;
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu};
use crate::expr::error::{
ArrowSnafu, CastValueSnafu, DataTypeSnafu, DivisionByZeroSnafu, EvalError, OverflowSnafu,
TryFromValueSnafu, TypeMismatchSnafu,
};
use crate::expr::signature::{GenericFn, Signature};
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr};
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START};
use crate::repr::{self, value_to_internal_ts};
/// UnmaterializableFunc is a function that can't be evaluated independently,
@@ -87,42 +87,10 @@ impl UnmaterializableFunc {
}
/// Create a UnmaterializableFunc from a string of the function name
pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> {
pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> {
match name.to_lowercase().as_str() {
"now" => Ok(Self::Now),
"current_schema" => Ok(Self::CurrentSchema),
"tumble" => {
let ts = args.first().context(InvalidQuerySnafu {
reason: "Tumble window function requires a timestamp argument",
})?;
let window_size = args
.get(1)
.and_then(|expr| expr.expr.as_literal())
.context(InvalidQuerySnafu {
reason: "Tumble window function requires a window size argument"
})?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead
.map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context(
ExternalSnafu
)?.and_then(|v|v.as_interval())
.with_context(||InvalidQuerySnafu {
reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1))
})?;
let start_time = match args.get(2) {
Some(start_time) => start_time.expr.as_literal(),
None => None,
}
.map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context(
||InvalidQuerySnafu {
reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2))
}
)).transpose()?;
Ok(Self::TumbleWindow {
ts: Box::new(ts.clone()),
window_size,
start_time,
})
}
_ => InvalidQuerySnafu {
reason: format!("Unknown unmaterializable function: {}", name),
}
@@ -347,6 +315,96 @@ impl UnaryFunc {
}
}
pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> {
match name.to_lowercase().as_str() {
TUMBLE_START | TUMBLE_END => {
let ts = args.first().context(InvalidQuerySnafu {
reason: "Tumble window function requires a timestamp argument",
})?;
let window_size = {
let window_size_untyped = args
.get(1)
.and_then(|expr| expr.expr.as_literal())
.context(InvalidQuerySnafu {
reason: "Tumble window function requires a window size argument",
})?;
if let Some(window_size) = window_size_untyped.as_string() {
// cast as interval
cast(
Value::from(window_size),
&ConcreteDataType::interval_month_day_nano_datatype(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_interval()
.context(UnexpectedSnafu {
reason: "Expect window size arg to be interval after successful cast"
.to_string(),
})?
} else if let Some(interval) = window_size_untyped.as_interval() {
interval
} else {
InvalidQuerySnafu {
reason: format!(
"Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}",
window_size_untyped
)
}.fail()?
}
};
// start time argument is optional
let start_time = match args.get(2) {
Some(start_time) => {
if let Some(value) = start_time.expr.as_literal() {
// cast as DateTime
let ret = cast(value, &ConcreteDataType::datetime_datatype())
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_datetime()
.context(UnexpectedSnafu {
reason:
"Expect start time arg to be datetime after successful cast"
.to_string(),
})?;
Some(ret)
} else {
UnexpectedSnafu {
reason: "Expect start time arg to be literal",
}
.fail()?
}
}
None => None,
};
if name == TUMBLE_START {
Ok((
Self::TumbleWindowFloor {
window_size,
start_time,
},
ts.clone(),
))
} else if name == TUMBLE_END {
Ok((
Self::TumbleWindowCeiling {
window_size,
start_time,
},
ts.clone(),
))
} else {
unreachable!()
}
}
_ => crate::error::InternalSnafu {
reason: format!("Unknown tumble kind function: {}", name),
}
.fail()?,
}
}
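// Illustrative only: `tumble_start(ts, '1 second', '2021-07-01 00:00:00')` lowers to
// `UnaryFunc::TumbleWindowFloor { window_size, start_time }` applied to `ts`, with the
// window size cast from the string to a month-day-nano interval and the optional third
// argument cast to a datetime; the matching `tumble_end(..)` call lowers to
// `UnaryFunc::TumbleWindowCeiling { .. }`. An interval literal can be passed directly
// instead of a string.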
/// Evaluate the function with given values and expression
///
/// # Arguments
@@ -712,8 +770,8 @@ impl BinaryFunc {
t1 == t2,
InvalidQuerySnafu {
reason: format!(
"Binary function {:?} requires both arguments to have the same type",
generic
"Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}",
generic, t1, t2
),
}
);

View File

@@ -16,13 +16,18 @@
use std::collections::{BTreeMap, BTreeSet};
use arrow::array::BooleanArray;
use arrow::compute::FilterBuilder;
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use snafu::ensure;
use datatypes::vectors::{BooleanVector, Helper};
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{Error, InvalidQuerySnafu};
use crate::expr::error::{EvalError, InternalSnafu};
use crate::expr::{InvalidArgumentSnafu, ScalarExpr};
use crate::expr::error::{ArrowSnafu, DataTypeSnafu, EvalError, InternalSnafu, TypeMismatchSnafu};
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr};
use crate::repr::{self, value_to_internal_ts, Diff, Row};
/// A compound operator that can be applied row-by-row.
@@ -473,6 +478,85 @@ impl SafeMfpPlan {
self.mfp.permute(map, new_arity)
}
/// similar to [`MapFilterProject::evaluate_into`], just in batch, and rows that don't pass the predicates are not included in the output.
///
/// so it's not guaranteed that the output will have the same number of rows as the input.
pub fn eval_batch_into(&self, batch: &mut Batch) -> Result<Batch, EvalError> {
ensure!(
batch.column_count() == self.mfp.input_arity,
InvalidArgumentSnafu {
reason: format!(
"batch column length {} is not equal to input_arity {}",
batch.column_count(),
self.mfp.input_arity
),
}
);
let passed_predicates = self.eval_batch_inner(batch)?;
let filter = FilterBuilder::new(passed_predicates.as_boolean_array());
let pred = filter.build();
let mut result = vec![];
for col in batch.batch() {
let filtered = pred
.filter(col.to_arrow_array().as_ref())
.context(ArrowSnafu {
context: format!("failed to filter column for mfp operator {:?}", self),
})?;
result.push(Helper::try_into_vector(filtered).context(DataTypeSnafu {
msg: "Failed to convert arrow array to vector",
})?);
}
let projected = self
.mfp
.projection
.iter()
.map(|c| result[*c].clone())
.collect_vec();
let row_count = pred.count();
Batch::try_new(projected, row_count)
}
/// similar to [`MapFilterProject::evaluate_into`], just in batch.
pub fn eval_batch_inner(&self, batch: &mut Batch) -> Result<BooleanVector, EvalError> {
// mark the columns that have been evaluated and appended to the `batch`
let mut expression = 0;
// preds default to true and will be updated as we evaluate each predicate
let mut all_preds = BooleanVector::from(vec![Some(true); batch.row_count()]);
// to compute the predicates, first compute all expressions they depend on
for (support, predicate) in self.mfp.predicates.iter() {
while self.mfp.input_arity + expression < *support {
let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
batch.batch_mut().push(expr_eval);
expression += 1;
}
let pred_vec = predicate.eval_batch(batch)?;
let pred_arr = pred_vec.to_arrow_array();
let pred_arr = pred_arr.as_any().downcast_ref::<BooleanArray>().context({
TypeMismatchSnafu {
expected: ConcreteDataType::boolean_datatype(),
actual: pred_vec.data_type(),
}
})?;
let all_arr = all_preds.as_boolean_array();
let res_arr = arrow::compute::and(all_arr, pred_arr).context(ArrowSnafu {
context: format!("failed to compute predicate for mfp operator {:?}", self),
})?;
all_preds = BooleanVector::from(res_arr);
}
// while evaluated expressions are less than total expressions, keep evaluating
while expression < self.mfp.expressions.len() {
let expr_eval = self.mfp.expressions[expression].eval_batch(batch)?;
batch.batch_mut().push(expr_eval);
expression += 1;
}
Ok(all_preds)
}
/// Evaluates the linear operator on a supplied list of datums.
///
/// The arguments are the initial datums associated with the row,
@@ -735,10 +819,15 @@ impl MfpPlan {
#[cfg(test)]
mod test {
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int32Vector, Int64Vector};
use pretty_assertions::assert_eq;
use super::*;
use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc};
#[test]
fn test_mfp_with_time() {
use crate::expr::func::BinaryFunc;
@@ -844,6 +933,21 @@ mod test {
.unwrap()
.unwrap();
assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)]));
// batch mode
let mut batch = Batch::try_from_rows(vec![Row::from(vec![
Value::from(4),
Value::from(2),
Value::from(3),
])])
.unwrap();
let ret = safe_mfp.eval_batch_into(&mut batch).unwrap();
assert_eq!(
ret,
Batch::try_from_rows(vec![Row::from(vec![Value::from(false), Value::from(true)])])
.unwrap()
);
}
#[test]
@@ -865,7 +969,7 @@ mod test {
BinaryFunc::Gt,
)])
.unwrap();
let mut input1 = vec![
let input1 = vec![
Value::from(4),
Value::from(2),
Value::from(3),
@@ -873,19 +977,34 @@ mod test {
];
let safe_mfp = SafeMfpPlan { mfp };
let ret = safe_mfp
.evaluate_into(&mut input1, &mut Row::empty())
.evaluate_into(&mut input1.clone(), &mut Row::empty())
.unwrap();
assert_eq!(ret, None);
let mut input2 = vec![
let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
assert_eq!(
ret_batch,
Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![]))], 0).unwrap()
);
let input2 = vec![
Value::from(5),
Value::from(2),
Value::from(4),
Value::from("abc"),
];
let ret = safe_mfp
.evaluate_into(&mut input2, &mut Row::empty())
.evaluate_into(&mut input2.clone(), &mut Row::empty())
.unwrap();
assert_eq!(ret, Some(Row::pack(vec![Value::from(11)])));
let mut input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap();
let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch).unwrap();
assert_eq!(
ret_batch,
Batch::try_new(vec![Arc::new(Int32Vector::from_vec(vec![11]))], 1).unwrap()
);
}
#[test]
@@ -923,27 +1042,50 @@ mod test {
.unwrap()
.project([0, 1, 2])
.unwrap();
let mut input1 = vec![
let input1 = vec![
Value::from(4i64),
Value::from(2),
Value::from(3),
Value::from(53),
];
let safe_mfp = SafeMfpPlan { mfp };
let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty());
let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
assert!(matches!(ret, Err(EvalError::InvalidArgument { .. })));
let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch);
assert!(matches!(ret_batch, Err(EvalError::InvalidArgument { .. })));
let input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)];
let ret = safe_mfp
.evaluate_into(&mut input2.clone(), &mut Row::empty())
.unwrap();
assert_eq!(ret, Some(Row::new(input2)));
assert_eq!(ret, Some(Row::new(input2.clone())));
let mut input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
let input2_batch = Batch::try_from_rows(vec![Row::new(input2)]).unwrap();
let ret_batch = safe_mfp.eval_batch_into(&mut input2_batch.clone()).unwrap();
assert_eq!(ret_batch, input2_batch);
let input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)];
let ret = safe_mfp
.evaluate_into(&mut input3, &mut Row::empty())
.evaluate_into(&mut input3.clone(), &mut Row::empty())
.unwrap();
assert_eq!(ret, None);
let input3_batch = Batch::try_from_rows(vec![Row::new(input3)]).unwrap();
let ret_batch = safe_mfp.eval_batch_into(&mut input3_batch.clone()).unwrap();
assert_eq!(
ret_batch,
Batch::try_new(
vec![
Arc::new(Int64Vector::from_vec(Default::default())),
Arc::new(Int32Vector::from_vec(Default::default())),
Arc::new(Int32Vector::from_vec(Default::default()))
],
0
)
.unwrap()
);
}
#[test]
@@ -961,10 +1103,18 @@ mod test {
.unwrap()
.project(vec![3])
.unwrap();
let mut input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
let input1 = vec![Value::from(2), Value::from(3), Value::from(4)];
let safe_mfp = SafeMfpPlan { mfp };
let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty());
let ret = safe_mfp.evaluate_into(&mut input1.clone(), &mut Row::empty());
assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)])));
let mut input1_batch = Batch::try_from_rows(vec![Row::new(input1)]).unwrap();
let ret_batch = safe_mfp.eval_batch_into(&mut input1_batch).unwrap();
assert_eq!(
ret_batch,
Batch::try_new(vec![Arc::new(BooleanVector::from(vec![false]))], 1).unwrap()
);
}
#[test]
fn test_mfp_chore() {

View File

@@ -18,15 +18,17 @@ use std::sync::OnceLock;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{IntoError, OptionExt};
use snafu::{ensure, IntoError, OptionExt};
use strum::{EnumIter, IntoEnumIterator};
use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
use crate::expr::error::EvalError;
use crate::expr::relation::accum::{Accum, Accumulator};
use crate::expr::signature::{GenericFn, Signature};
use crate::expr::InvalidArgumentSnafu;
use crate::repr::Diff;
/// Aggregate functions that can be applied to a group of rows.
@@ -131,6 +133,98 @@ impl AggregateFunc {
let res = accum.eval(self)?;
Ok((res, accum.into_state()))
}
/// Returns the output value and the new accumulator state
pub fn eval_batch<A>(
&self,
accum: A,
vector: VectorRef,
diff: Option<VectorRef>,
) -> Result<(Value, Vec<Value>), EvalError>
where
A: IntoIterator<Item = Value>,
{
let mut accum = accum.into_iter().peekable();
let mut accum = if accum.peek().is_none() {
Accum::new_accum(self)?
} else {
Accum::try_from_iter(self, &mut accum)?
};
let vector_diff = VectorDiff::try_new(vector, diff)?;
accum.update_batch(self, vector_diff)?;
let res = accum.eval(self)?;
Ok((res, accum.into_state()))
}
}
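// A minimal usage sketch (hypothetical): evaluating an aggregate over a column in batch
// mode, starting from an empty accumulator state. The concrete variant name `SumInt64`
// is an assumption for illustration; only the `eval_batch` signature above is relied on.
//
// let vector: VectorRef = Arc::new(Int64Vector::from_vec(vec![1, 2, 3]));
// let (value, state) = AggregateFunc::SumInt64.eval_batch(
//     Vec::<Value>::new(), // no prior state, so a fresh accumulator is created
//     vector,
//     None,                // no diff column: every row is treated as an insert (+1)
// )?;
// assert_eq!(value, Value::from(6i64));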
struct VectorDiff {
vector: VectorRef,
diff: Option<VectorRef>,
}
impl VectorDiff {
fn len(&self) -> usize {
self.vector.len()
}
fn try_new(vector: VectorRef, diff: Option<VectorRef>) -> Result<Self, EvalError> {
ensure!(
diff.as_ref()
.map_or(true, |diff| diff.len() == vector.len()),
InvalidArgumentSnafu {
reason: "Length of vector and diff should be the same"
}
);
Ok(Self { vector, diff })
}
}
impl IntoIterator for VectorDiff {
type Item = (Value, Diff);
type IntoIter = VectorDiffIter;
fn into_iter(self) -> Self::IntoIter {
VectorDiffIter {
vector: self.vector,
diff: self.diff,
idx: 0,
}
}
}
struct VectorDiffIter {
vector: VectorRef,
diff: Option<VectorRef>,
idx: usize,
}
impl std::iter::Iterator for VectorDiffIter {
type Item = (Value, Diff);
fn next(&mut self) -> Option<Self::Item> {
if self.idx >= self.vector.len() {
return None;
}
let value = self.vector.get(self.idx);
// +1 means insert, -1 means delete; defaults to +1 (insert) when no diff is provided
let diff = if let Some(diff) = self.diff.as_ref() {
if let Ok(diff_at) = diff.get(self.idx).try_into() {
diff_at
} else {
common_telemetry::warn!("Invalid diff value at index {}", self.idx);
return None;
}
} else {
1
};
self.idx += 1;
Some((value, diff))
}
}
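// Illustrative only: with a value vector [10, 20, 30] and a diff vector [1, -1, 1],
// iterating a `VectorDiff` yields (10, +1), (20, -1), (30, +1), i.e. insert 10,
// retract 20, insert 30; with no diff vector every row defaults to an insert (+1).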
/// Generate signature for each aggregate function

View File

@@ -30,7 +30,7 @@ use crate::expr::error::{
};
use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
use crate::expr::{Batch, DfScalarFunction};
use crate::repr::{ColumnType, RelationType};
use crate::repr::ColumnType;
/// A scalar expression with a known type.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Hash)]
pub struct TypedExpr {
@@ -46,77 +46,6 @@ impl TypedExpr {
}
}
impl TypedExpr {
/// expand multi-value expression to multiple expressions with new indices
///
/// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling`
///
/// TODO(discord9): test if nested reduce combine with df scalar function would cause problem
pub fn expand_multi_value(
input_typ: &RelationType,
exprs: &[TypedExpr],
) -> Result<Vec<TypedExpr>, Error> {
// old indices in mfp, expanded expr
let mut ret = vec![];
let input_arity = input_typ.column_types.len();
for (old_idx, expr) in exprs.iter().enumerate() {
if let ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts,
window_size,
start_time,
}) = &expr.expr
{
let floor = UnaryFunc::TumbleWindowFloor {
window_size: *window_size,
start_time: *start_time,
};
let ceil = UnaryFunc::TumbleWindowCeiling {
window_size: *window_size,
start_time: *start_time,
};
let floor = ScalarExpr::CallUnary {
func: floor,
expr: Box::new(ts.expr.clone()),
}
.with_type(ts.typ.clone());
ret.push((None, floor));
let ceil = ScalarExpr::CallUnary {
func: ceil,
expr: Box::new(ts.expr.clone()),
}
.with_type(ts.typ.clone());
ret.push((None, ceil));
} else {
ret.push((Some(input_arity + old_idx), expr.clone()))
}
}
// get shuffled index(old_idx -> new_idx)
// note index is offset by input_arity because mfp is designed to be first include input columns then intermediate columns
let shuffle = ret
.iter()
.map(|(old_idx, _)| *old_idx) // [Option<opt_idx>]
.enumerate()
.map(|(new, old)| (old, new + input_arity))
.flat_map(|(old, new)| old.map(|o| (o, new)))
.chain((0..input_arity).map(|i| (i, i))) // also remember to chain the input columns as not changed
.collect::<BTreeMap<_, _>>();
// shuffle expr's index
let exprs = ret
.into_iter()
.map(|(_, mut expr)| {
// invariant: it is expect that no expr will try to refer the column being expanded
expr.expr.permute_map(&shuffle)?;
Ok(expr)
})
.collect::<Result<Vec<_>, _>>()?;
Ok(exprs)
}
}
/// A scalar expression, which can be evaluated to a value.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ScalarExpr {
@@ -210,6 +139,13 @@ impl ScalarExpr {
}
impl ScalarExpr {
pub fn cast(self, typ: ConcreteDataType) -> Self {
ScalarExpr::CallUnary {
func: UnaryFunc::Cast(typ),
expr: Box::new(self),
}
}
/// apply optimization to the expression, like flatten variadic function
pub fn optimize(&mut self) {
self.flatten_varidic_fn();
@@ -341,7 +277,7 @@ impl ScalarExpr {
// put a slice to corresponding batch
let slice_offset = prev_cond_idx;
let slice_length = idx - prev_cond_idx;
let to_be_append = batch.slice(slice_offset, slice_length);
let to_be_append = batch.slice(slice_offset, slice_length)?;
let to_put_back = match prev_cond {
Some(true) => (
@@ -364,7 +300,7 @@ impl ScalarExpr {
if let Some(slice_offset) = prev_start_idx {
let prev_cond = prev_cond.unwrap();
let slice_length = bool_conds.len() - slice_offset;
let to_be_append = batch.slice(slice_offset, slice_length);
let to_be_append = batch.slice(slice_offset, slice_length)?;
let to_put_back = match prev_cond {
Some(true) => (
Some(true),
@@ -876,7 +812,7 @@ mod test {
let raw_len = raw.len();
let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
let batch = Batch::new(vectors, raw_len);
let batch = Batch::try_new(vectors, raw_len).unwrap();
let expected = Int32Vector::from(vec![
None,
Some(42),
@@ -895,7 +831,7 @@ mod test {
let raw_len = raw.len();
let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
let batch = Batch::new(vectors, raw_len);
let batch = Batch::try_new(vectors, raw_len).unwrap();
let expected = Int32Vector::from(vec![Some(42)]).slice(0, raw_len);
assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
@@ -903,7 +839,7 @@ mod test {
let raw_len = raw.len();
let vectors = vec![Int32Vector::from(raw).slice(0, raw_len)];
let batch = Batch::new(vectors, raw_len);
let batch = Batch::try_new(vectors, raw_len).unwrap();
let expected = NullVector::new(raw_len).slice(0, raw_len);
assert_eq!(expr.eval_batch(&batch).unwrap(), expected);
}

View File

@@ -19,6 +19,8 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
/// Function signature
///
/// TODO(discord9): use `common_query::signature::Signature` crate
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct Signature {
/// the input types, usually no more than two input args

View File

@@ -20,9 +20,14 @@
#![allow(dead_code)]
#![warn(clippy::missing_docs_in_private_items)]
#![warn(clippy::too_many_lines)]
// TODO(discord9): enable this lint to handle out of bound access
// #![cfg_attr(not(test), warn(clippy::indexing_slicing))]
// allow unused for now because it should be used later
mod adapter;
mod compute;
mod df_optimizer;
pub mod error;
mod expr;
pub mod heartbeat;

View File

@@ -115,6 +115,8 @@ impl TypedPlan {
/// TODO(discord9): support `TableFunc` by defining a FlatMap that maps 1 to n
/// Plan describe how to transform data in dataflow
///
/// This can be considered a physical plan in dataflow, which describes how to transform data in a streaming manner.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum Plan {
/// A constant collection of rows.

View File

@@ -177,6 +177,12 @@ impl Row {
}
}
impl From<Vec<Value>> for Row {
fn from(row: Vec<Value>) -> Self {
Row::new(row)
}
}
impl From<ProtoRow> for Row {
fn from(row: ProtoRow) -> Self {
Row::pack(

View File

@@ -374,10 +374,8 @@ impl RelationDesc {
.collect();
let arrow_schema = arrow_schema::Schema::new(fields);
DFSchema::try_from(arrow_schema.clone()).context({
DatafusionSnafu {
context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
}
DFSchema::try_from(arrow_schema.clone()).with_context(|_e| DatafusionSnafu {
context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
})
}

View File

@@ -17,24 +17,19 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use common_error::ext::BoxedError;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::{OptimizerContext, OptimizerRule};
use datatypes::data_type::ConcreteDataType as CDT;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
/// note here we are using the `substrait_proto_df` crate from the `substrait` module and
/// renaming it to `substrait_proto`
use substrait::{substrait_proto_df as substrait_proto, DFLogicalSubstraitConvertor};
use substrait::substrait_proto_df as substrait_proto;
use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
use crate::adapter::FlownodeContext;
use crate::error::{DatafusionSnafu, Error, ExternalSnafu, NotImplementedSnafu, UnexpectedSnafu};
use crate::plan::TypedPlan;
use crate::error::{Error, NotImplementedSnafu, UnexpectedSnafu};
use crate::expr::{TUMBLE_END, TUMBLE_START};
/// a simple macro to generate a not implemented error
macro_rules! not_impl_err {
($($arg:tt)*) => {
@@ -102,68 +97,39 @@ impl FunctionExtensions {
}
}
/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
/// then to a substrait plan, and finally to a flow plan.
pub async fn sql_to_flow_plan(
ctx: &mut FlownodeContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
let query_ctx = ctx.query_context.clone().ok_or_else(|| {
UnexpectedSnafu {
reason: "Query context is missing",
}
.build()
})?;
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
let plan = SimplifyExpressions::new()
.rewrite(plan, &OptimizerContext::default())
.context(DatafusionSnafu {
context: "Fail to apply `SimplifyExpressions` optimization",
})?
.data;
let sub_plan = DFLogicalSubstraitConvertor {}
.to_sub_plan(&plan, DefaultSerializer)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan).await?;
Ok(flow_plan)
}
/// register flow-specific functions to the query engine
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
engine.register_function(Arc::new(TumbleFunction {}));
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
}
#[derive(Debug)]
pub struct TumbleFunction {}
pub struct TumbleFunction {
name: String,
}
const TUMBLE_NAME: &str = "tumble";
impl TumbleFunction {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
}
impl std::fmt::Display for TumbleFunction {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", TUMBLE_NAME.to_ascii_uppercase())
write!(f, "{}", self.name.to_ascii_uppercase())
}
}
impl common_function::function::Function for TumbleFunction {
fn name(&self) -> &str {
TUMBLE_NAME
&self.name
}
fn return_type(&self, _input_types: &[CDT]) -> common_query::error::Result<CDT> {
Ok(CDT::datetime_datatype())
Ok(CDT::timestamp_millisecond_datatype())
}
fn signature(&self) -> common_query::prelude::Signature {
@@ -198,6 +164,7 @@ mod test {
use prost::Message;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use session::context::QueryContext;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
@@ -207,6 +174,7 @@ mod test {
use super::*;
use crate::adapter::node_context::IdToNameMap;
use crate::df_optimizer::apply_df_optimizer;
use crate::expr::GlobalId;
use crate::repr::{ColumnType, RelationType};
@@ -292,7 +260,7 @@ mod test {
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();
engine.register_function(Arc::new(TumbleFunction {}));
register_function_to_query_engine(&engine);
assert_eq!("datafusion", engine.name());
engine
@@ -307,6 +275,7 @@ mod test {
.await
.unwrap();
let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await.unwrap();
// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
let bytes = DFLogicalSubstraitConvertor {}
@@ -315,4 +284,22 @@ mod test {
proto::Plan::decode(bytes).unwrap()
}
/// TODO(discord9): add more illegal sql tests
#[tokio::test]
async fn test_missing_key_check() {
let engine = create_test_query_engine();
let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
let plan = engine
.planner()
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await;
assert!(plan.is_err());
}
}

File diff suppressed because it is too large

View File

@@ -20,7 +20,7 @@ use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion_physical_expr::PhysicalExpr;
use datatypes::data_type::ConcreteDataType as CDT;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use substrait_proto::proto::expression::field_reference::ReferenceType::DirectReference;
use substrait_proto::proto::expression::reference_segment::ReferenceType::StructField;
use substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction};
@@ -33,7 +33,7 @@ use crate::error::{
};
use crate::expr::{
BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc,
UnmaterializableFunc, VariadicFunc,
UnmaterializableFunc, VariadicFunc, TUMBLE_END, TUMBLE_START,
};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::transform::literal::{
@@ -167,6 +167,16 @@ fn rewrite_scalar_function(
arg_typed_exprs: &[TypedExpr],
) -> Result<ScalarFunction, Error> {
let mut f_rewrite = f.clone();
ensure!(
f_rewrite.arguments.len() == arg_typed_exprs.len(),
crate::error::InternalSnafu {
reason: format!(
"Expect `f_rewrite` and `arg_typed_expr` to be same length, found {} and {}",
f_rewrite.arguments.len(),
arg_typed_exprs.len()
)
}
);
for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
// only replace it with col(idx) if it is not literal
// will try best to determine if it is literal, i.e. for function like `cast(<literal>)` will try
@@ -351,7 +361,13 @@ impl TypedExpr {
Ok(TypedExpr::new(ret_expr, ret_type))
}
_var => {
if VariadicFunc::is_valid_func_name(fn_name) {
if fn_name == TUMBLE_START || fn_name == TUMBLE_END {
let (func, arg) = UnaryFunc::from_tumble_func(fn_name, &arg_typed_exprs)?;
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
Ok(TypedExpr::new(arg.expr.call_unary(func), ret_type))
} else if VariadicFunc::is_valid_func_name(fn_name) {
let func = VariadicFunc::from_str_and_types(fn_name, &arg_types)?;
let ret_type = ColumnType::new_nullable(func.signature().output.clone());
let mut expr = ScalarExpr::CallVariadic {
@@ -521,7 +537,6 @@ impl TypedExpr {
#[cfg(test)]
mod test {
use common_time::{DateTime, Interval};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use pretty_assertions::assert_eq;
@@ -562,7 +577,7 @@ mod test {
};
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
.into_named(vec![Some("numbers.number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
@@ -576,13 +591,7 @@ mod test {
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)])
.unwrap()
.filter(vec![filter])
.unwrap()
.project(vec![1])
.unwrap(),
mfp: MapFilterProject::new(1).filter(vec![filter]).unwrap(),
},
};
assert_eq!(flow_plan.unwrap(), expected);
@@ -600,7 +609,7 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::boolean_datatype(), true)])
.into_unnamed(),
.into_named(vec![Some("Int64(1) + Int64(1) * Int64(2) - Int64(1) / Int64(1) + Int64(1) % Int64(2) = Int64(3)".to_string())]),
plan: Plan::Constant {
rows: vec![(
repr::Row::new(vec![Value::from(true)]),
@@ -624,8 +633,8 @@ mod test {
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).await;
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)])
.into_unnamed(),
schema: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)])
.into_named(vec![Some("numbers.number + Int64(1)".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
@@ -640,10 +649,12 @@ mod test {
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0).call_binary(
ScalarExpr::Literal(Value::from(1u32), CDT::uint32_datatype()),
BinaryFunc::AddUInt32,
)])
.map(vec![ScalarExpr::Column(0)
.call_unary(UnaryFunc::Cast(CDT::int64_datatype()))
.call_binary(
ScalarExpr::Literal(Value::from(1i64), CDT::int64_datatype()),
BinaryFunc::AddInt64,
)])
.unwrap()
.project(vec![1])
.unwrap(),
@@ -663,7 +674,9 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::int16_datatype(), true)])
.into_unnamed(),
.into_named(vec![Some(
"arrow_cast(Int64(1),Utf8(\"Int16\"))".to_string(),
)]),
plan: Plan::Constant {
// cast of literal is constant folded
rows: vec![(repr::Row::new(vec![Value::from(1i16)]), i64::MIN, 1)],
@@ -683,7 +696,7 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), true)])
.into_unnamed(),
.into_named(vec![Some("numbers.number + numbers.number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
@@ -780,65 +793,5 @@ mod test {
},
}
);
let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
ColumnType::new(CDT::string_datatype(), false),
])
.into_unnamed();
let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]);
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions)
.await
.unwrap();
assert_eq!(
res,
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts: Box::new(
ScalarExpr::Column(0)
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
),
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
start_time: Some(DateTime::new(1625097600000))
})
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
);
let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![proto_col(0), lit("1 second")],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
ColumnType::new(CDT::string_datatype(), false),
])
.into_unnamed();
let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]);
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions)
.await
.unwrap();
assert_eq!(
res,
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts: Box::new(
ScalarExpr::Column(0)
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
),
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
start_time: None
})
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
)
}
}

View File

@@ -340,6 +340,8 @@ pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<C
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use super::*;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
@@ -356,7 +358,7 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::int64_datatype(), true)])
.into_unnamed(),
.into_named(vec![Some("Int64(1)".to_string())]),
plan: Plan::Constant {
rows: vec![(
repr::Row::new(vec![Value::Int64(1)]),

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::collections::HashSet;
use itertools::Itertools;
use snafu::OptionExt;
@@ -23,9 +23,9 @@ use substrait_proto::proto::rel::RelType;
use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel};
use crate::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc};
use crate::plan::{KeyValPlan, Plan, TypedPlan};
use crate::repr::{self, RelationDesc, RelationType};
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
impl TypedPlan {
@@ -49,7 +49,14 @@ impl TypedPlan {
let input = root.input.as_ref().with_context(|| InvalidQuerySnafu {
reason: "Root relation without input",
})?;
Ok(TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?)
let mut ret = TypedPlan::from_substrait_rel(ctx, input, &function_extension).await?;
if !root.names.is_empty() {
ret.schema = ret.schema.clone().try_with_names(root.names.clone())?;
}
Ok(ret)
}
},
None => plan_err!("Cannot parse plan relation: None")
@@ -115,17 +122,6 @@ impl TypedPlan {
plan,
})
} else {
match input.plan.clone() {
Plan::Reduce { key_val_plan, .. } => {
rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
}
Plan::Mfp { input, mfp: _ } => {
if let Plan::Reduce { key_val_plan, .. } = input.plan {
rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?;
}
}
_ => (),
}
input.projection(exprs)
}
}
@@ -233,120 +229,13 @@ impl TypedPlan {
}
}
/// if reduce_plan contains the special function like tumble floor/ceiling, add them to the proj_exprs
/// so the effect is the window_start, window_end column are auto added to output rows
///
/// This is to fix a problem that we have certain functions that return two values, but since substrait doesn't know that, it will assume it return one value
/// this function fix that and rewrite `proj_exprs` to correct form
fn rewrite_projection_after_reduce(
key_val_plan: KeyValPlan,
reduce_output_type: &RelationDesc,
proj_exprs: &mut Vec<TypedExpr>,
) -> Result<(), Error> {
// TODO(discord9): get keys correctly
let key_exprs = key_val_plan
.key_plan
.projection
.clone()
.into_iter()
.map(|i| {
if i < key_val_plan.key_plan.input_arity {
ScalarExpr::Column(i)
} else {
key_val_plan.key_plan.expressions[i - key_val_plan.key_plan.input_arity].clone()
}
})
.collect_vec();
let mut shift_offset = 0;
let mut shuffle: BTreeMap<usize, usize> = BTreeMap::new();
let special_keys = key_exprs
.clone()
.into_iter()
.enumerate()
.filter(|(idx, p)| {
shuffle.insert(*idx, *idx + shift_offset);
if matches!(
p,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
} | ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowCeiling { .. },
..
}
) {
if matches!(
p,
ScalarExpr::CallUnary {
func: UnaryFunc::TumbleWindowFloor { .. },
..
}
) {
shift_offset += 1;
}
true
} else {
false
}
})
.collect_vec();
let spec_key_arity = special_keys.len();
if spec_key_arity == 0 {
return Ok(());
}
// shuffle proj_exprs
// because substrait use offset while assume `tumble` only return one value
for proj_expr in proj_exprs.iter_mut() {
proj_expr.expr.permute_map(&shuffle)?;
} // add key to the end
for (key_idx, _key_expr) in special_keys {
// here we assume the output type of reduce operator(`reduce_output_type`) is just first keys columns, then append value columns
// so we can use `key_idx` to index `reduce_output_type` and get the keys we need to append to `proj_exprs`
proj_exprs.push(
ScalarExpr::Column(key_idx)
.with_type(reduce_output_type.typ().column_types[key_idx].clone()),
);
}
// check if normal expr in group exprs are all in proj_exprs
let all_cols_ref_in_proj: BTreeSet<usize> = proj_exprs
.iter()
.filter_map(|e| {
if let ScalarExpr::Column(i) = &e.expr {
Some(*i)
} else {
None
}
})
.collect();
for (key_idx, key_expr) in key_exprs.iter().enumerate() {
if let ScalarExpr::Column(_) = key_expr {
if !all_cols_ref_in_proj.contains(&key_idx) {
let err_msg = format!(
"Expect normal column in group by also appear in projection, but column {}(name is {}) is missing",
key_idx,
reduce_output_type
.get_name(key_idx)
.clone()
.map(|s|format!("'{}'",s))
.unwrap_or("unknown".to_string())
);
return InvalidQuerySnafu { reason: err_msg }.fail();
}
}
}
Ok(())
}
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use pretty_assertions::assert_eq;
use super::*;
use crate::expr::{GlobalId, ScalarExpr};
use crate::expr::GlobalId;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
@@ -363,7 +252,7 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("number".to_string())]),
.into_named(vec![Some("numbers.number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
@@ -377,11 +266,7 @@ mod test {
.into_named(vec![Some("number".to_string())]),
),
),
mfp: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)])
.unwrap()
.project(vec![1])
.unwrap(),
mfp: MapFilterProject::new(1),
},
};

View File

@@ -513,7 +513,7 @@ pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
/// A handle to the inner Arrangement; it can be cloned and shared, useful for querying its inner state
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ArrangeHandler {
inner: Arc<RwLock<Arrangement>>,
}

View File

@@ -32,6 +32,7 @@ use tokio::time::{Duration, Instant};
use crate::error;
use crate::error::Result;
use crate::frontend::FrontendOptions;
use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub mod handler;
@@ -94,10 +95,16 @@ impl HeartbeatTask {
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx).await {
error!(e; "Error while handling heartbeat response");
HEARTBEAT_RECV_COUNT
.with_label_values(&["processing_error"])
.inc();
} else {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => break,
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
@@ -180,6 +187,7 @@ impl HeartbeatTask {
error!(e; "Failed to send heartbeat to metasrv");
break;
} else {
HEARTBEAT_SENT_COUNT.inc();
debug!("Send a heartbeat request to metasrv, content: {:?}", req);
}
}

View File

@@ -79,9 +79,9 @@ pub use standalone::StandaloneDatanodeManager;
use self::prom_store::ExportMetricHandler;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
TableOperationSnafu,
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
StartServerSnafu, TableOperationSnafu,
};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
@@ -452,6 +452,9 @@ pub fn check_permission(
| Statement::DropDatabase(_)
| Statement::DropFlow(_)
| Statement::Use(_) => {}
Statement::ShowCreateDatabase(stmt) => {
validate_database(&stmt.database_name, query_ctx)?;
}
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
@@ -527,8 +530,8 @@ pub fn check_permission(
},
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
match copy_database {
CopyDatabase::To(stmt) => validate_param(&stmt.database_name, query_ctx)?,
CopyDatabase::From(stmt) => validate_param(&stmt.database_name, query_ctx)?,
CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
}
}
Statement::TruncateTable(stmt) => {
@@ -548,6 +551,26 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
.context(SqlExecInterceptedSnafu)
}
fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let (catalog, schema) = match &name.0[..] {
[schema] => (
query_ctx.current_catalog().to_string(),
schema.value.clone(),
),
[catalog, schema] => (catalog.value.clone(), schema.value.clone()),
_ => InvalidSqlSnafu {
err_msg: format!(
"expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
),
}
.fail()?,
};
validate_catalog_and_schema(&catalog, &schema, query_ctx)
.map_err(BoxedError::new)
.context(SqlExecInterceptedSnafu)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;

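For context, a minimal standalone sketch of the name-resolution rule that `validate_database` implements, using plain strings instead of the real `ObjectName`/`QueryContextRef` types (the helper name is hypothetical):

```rust
// Hypothetical helper mirroring validate_database's resolution rule:
// `<schema>` inherits the current catalog, `<catalog>.<schema>` is taken
// verbatim, and anything else is rejected as invalid SQL.
fn resolve_database(parts: &[&str], current_catalog: &str) -> Result<(String, String), String> {
    match parts {
        [schema] => Ok((current_catalog.to_string(), schema.to_string())),
        [catalog, schema] => Ok((catalog.to_string(), schema.to_string())),
        _ => Err(format!(
            "expect database name to be <catalog>.<schema> or <schema>, actual: {}",
            parts.join(".")
        )),
    }
}

fn main() {
    assert_eq!(
        resolve_database(&["public"], "greptime").unwrap(),
        ("greptime".to_string(), "public".to_string())
    );
    assert!(resolve_database(&["a", "b", "c"], "greptime").is_err());
}
```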
View File

@@ -51,4 +51,17 @@ lazy_static! {
"frontend otlp traces rows"
)
.unwrap();
/// The number of heartbeats sent by the frontend node.
pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
"greptime_frontend_heartbeat_send_count",
"frontend heartbeat sent",
)
.unwrap();
/// The number of heartbeats received by the frontend node, labeled with the result type.
pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_frontend_heartbeat_recv_count",
"frontend heartbeat received",
&["result"]
)
.unwrap();
}
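As a reference, a minimal sketch of how such a labeled receive counter is typically used with the `prometheus` and `lazy_static` crates (the metric name below is illustrative, not the one registered above):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    // Illustrative counter; the real ones live in the frontend metrics module.
    static ref DEMO_HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
        "demo_heartbeat_recv_count",
        "demo heartbeat received",
        &["result"]
    )
    .unwrap();
}

fn main() {
    // Success and error paths bump different label values.
    DEMO_HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
    DEMO_HEARTBEAT_RECV_COUNT.with_label_values(&["processing_error"]).inc();
    assert_eq!(DEMO_HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).get(), 1);
}
```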

View File

@@ -26,6 +26,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
derive_builder.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true

View File

@@ -304,6 +304,15 @@ pub enum Error {
error: object_store::Error,
},
#[snafu(display("Failed to read index, path: {path}"))]
ReadIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
path: String,
},
#[snafu(display(
"The length of meta exceeded the limit: {}, actual: {}",
limit,

View File

@@ -14,15 +14,10 @@
pub(crate) mod client_manager;
pub(crate) mod consumer;
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;
pub use index::{default_index_file, GlobalIndexCollector};

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
@@ -60,40 +61,61 @@ struct FetchResult {
used_offset: i64,
}
const MAX_BATCH_SIZE: usize = 52428800;
const AVG_RECORD_SIZE: usize = 256 * 1024;
/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster, yielding records respecting the [`RegionWalIndexIterator`].
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
#[builder(default = "-1")]
last_high_watermark: i64,
/// The client is used to fetch records from kafka topic.
client: Arc<dyn FetchClient>,
/// The max batch size in a single fetch request.
#[builder(default = "MAX_BATCH_SIZE")]
max_batch_size: usize,
/// The max wait milliseconds.
#[builder(default = "500")]
max_wait_ms: u32,
/// The avg record size
#[builder(default = "AVG_RECORD_SIZE")]
avg_record_size: usize,
/// Termination flag
#[builder(default = "false")]
terminated: bool,
/// The buffer of records.
buffer: RecordsBuffer,
/// The fetch future.
#[builder(default = "Fuse::terminated()")]
fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
}
struct RecordsBuffer {
pub(crate) struct RecordsBuffer {
buffer: VecDeque<RecordAndOffset>,
index: Box<dyn RegionWalIndexIterator>,
}
impl RecordsBuffer {
/// Creates an empty [`RecordsBuffer`]
pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
RecordsBuffer {
buffer: VecDeque::new(),
index,
}
}
}
impl RecordsBuffer {
fn pop_front(&mut self) -> Option<RecordAndOffset> {
while let Some(index) = self.index.peek() {

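The consumer now derives its builder via `derive_builder`; a minimal sketch of how the `owned` pattern and per-field defaults behave (the struct and field names here are illustrative stand-ins, not the crate's real types):

```rust
use derive_builder::Builder;

#[derive(Builder, Debug)]
#[builder(pattern = "owned")]
struct FetchConfig {
    // Defaults are Rust expressions given as strings.
    #[builder(default = "52428800")]
    max_batch_size: usize,
    #[builder(default = "500")]
    max_wait_ms: u32,
    // No default: build() fails unless this field is set.
    client_id: String,
}

fn main() {
    let cfg = FetchConfigBuilder::default()
        .client_id("demo".to_string())
        .build()
        .unwrap();
    println!("{cfg:?}");
}
```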
View File

@@ -20,10 +20,11 @@ pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
build_region_wal_index_iterator, NextBatchHint, RegionWalIndexIterator, MIN_BATCH_WINDOW_SIZE,
};
#[cfg(test)]
pub(crate) use iterator::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};
pub fn default_index_file(datanode_id: u64) -> String {
format!("__datanode/{datanode_id}/index.json")
pub fn default_index_file(location_id: u64) -> String {
format!("__wal/{location_id}/index.json")
}

View File

@@ -19,6 +19,7 @@ use std::time::Duration;
use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
@@ -28,8 +29,9 @@ use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;
use super::default_index_file;
use crate::error::{self, Result};
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
@@ -52,10 +54,11 @@ pub trait IndexCollector: Send + Sync {
/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct GlobalIndexCollector {
providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
task: CollectionTask,
operator: object_store::ObjectStore,
_handle: CollectionTaskHandle,
}
#[derive(Debug, Clone)]
@@ -103,7 +106,7 @@ impl CollectionTask {
/// The background task performs two main operations:
/// - Persists the WAL index to the specified `path` at every `dump_index_interval`.
/// - Updates the latest index ID for each WAL provider at every `checkpoint_interval`.
fn run(&self) {
fn run(self) -> CollectionTaskHandle {
let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
let running = self.running.clone();
let moved_self = self.clone();
@@ -122,15 +125,23 @@ impl CollectionTask {
}
}
});
CollectionTaskHandle {
running: self.running.clone(),
}
}
}
impl Drop for CollectionTask {
impl Drop for CollectionTaskHandle {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
}
}
#[derive(Debug, Default)]
struct CollectionTaskHandle {
running: Arc<AtomicBool>,
}
impl GlobalIndexCollector {
/// Constructs a [`GlobalIndexCollector`].
///
@@ -148,16 +159,65 @@ impl GlobalIndexCollector {
let task = CollectionTask {
providers: providers.clone(),
dump_index_interval,
operator,
operator: operator.clone(),
path,
running: Arc::new(AtomicBool::new(true)),
};
task.run();
Self { providers, task }
let handle = task.run();
Self {
providers,
operator,
_handle: handle,
}
}
#[cfg(test)]
pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
Self {
providers: Default::default(),
operator,
_handle: Default::default(),
}
}
}
impl GlobalIndexCollector {
/// Retrieve [`EntryId`]s for a specified `region_id` in `datanode_id`
/// that are greater than or equal to a given `entry_id`.
pub(crate) async fn read_remote_region_index(
&self,
location_id: u64,
provider: &KafkaProvider,
region_id: RegionId,
entry_id: EntryId,
) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
let path = default_index_file(location_id);
let bytes = match self.operator.read(&path).await {
Ok(bytes) => bytes.to_vec(),
Err(err) => {
if err.kind() == ErrorKind::NotFound {
return Ok(None);
} else {
return Err(err).context(error::ReadIndexSnafu { path });
}
}
};
match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
Some(indexes) => {
let last_index = indexes.last_index();
let indexes = indexes
.region(region_id)
.unwrap_or_default()
.split_off(&entry_id);
Ok(Some((indexes, last_index)))
}
None => Ok(None),
}
}
/// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
pub(crate) async fn provider_level_index_collector(
&self,
@@ -266,3 +326,92 @@ impl IndexCollector for NoopCollector {
fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use crate::kafka::index::collector::RegionIndexes;
use crate::kafka::index::encoder::IndexEncoder;
use crate::kafka::index::JsonIndexEncoder;
use crate::kafka::{default_index_file, GlobalIndexCollector};
#[tokio::test]
async fn test_read_remote_region_index() {
let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
.unwrap()
.finish();
let path = default_index_file(0);
let encoder = JsonIndexEncoder::default();
encoder.encode(
&KafkaProvider::new("my_topic_0".to_string()),
&RegionIndexes {
regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
latest_entry_id: 20,
},
);
let bytes = encoder.finish().unwrap();
let mut writer = operator.writer(&path).await.unwrap();
writer.write(bytes).await.unwrap();
writer.close().await.unwrap();
let collector = GlobalIndexCollector::new_for_test(operator.clone());
// Index file doesn't exist
let result = collector
.read_remote_region_index(
1,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
1,
)
.await
.unwrap();
assert!(result.is_none());
// RegionId doesn't exist
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 2),
5,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::new());
assert_eq!(last_index, 20);
// RegionId(1, 1), Start EntryId: 5
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
5,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::from([5, 15]));
assert_eq!(last_index, 20);
// RegionId(1, 1), Start EntryId: 20
let (indexes, last_index) = collector
.read_remote_region_index(
0,
&KafkaProvider::new("my_topic_0".to_string()),
RegionId::new(1, 1),
20,
)
.await
.unwrap()
.unwrap();
assert_eq!(indexes, BTreeSet::new());
assert_eq!(last_index, 20);
}
}
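As a small aside, the lookup above relies on `BTreeSet::split_off`, which returns the entries greater than or equal to the given entry id; a minimal sketch matching the test expectations:

```rust
use std::collections::BTreeSet;

fn main() {
    let mut indexes: BTreeSet<u64> = BTreeSet::from([1, 5, 15]);
    // split_off(&5) keeps entries >= 5 in the returned set ...
    let from_entry_5 = indexes.split_off(&5);
    assert_eq!(from_entry_5, BTreeSet::from([5, 15]));
    // ... and leaves entries < 5 behind.
    assert_eq!(indexes, BTreeSet::from([1]));
}
```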

View File

@@ -50,7 +50,7 @@ pub struct DeltaEncodedRegionIndexes {
impl DeltaEncodedRegionIndexes {
/// Retrieves the original (decoded) index values for a given region.
fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
pub(crate) fn region(&self, region_id: RegionId) -> Option<BTreeSet<u64>> {
let decoded = self
.regions
.get(&region_id)
@@ -60,7 +60,7 @@ impl DeltaEncodedRegionIndexes {
}
/// Retrieves the last index.
fn last_index(&self) -> u64 {
pub(crate) fn last_index(&self) -> u64 {
self.last_index
}
}
@@ -86,7 +86,7 @@ impl DatanodeWalIndexes {
value
}
fn decode(byte: &[u8]) -> Result<Self> {
pub(crate) fn decode(byte: &[u8]) -> Result<Self> {
serde_json::from_slice(byte).context(error::DecodeJsonSnafu)
}
@@ -118,7 +118,7 @@ impl IndexEncoder for JsonIndexEncoder {
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashMap};
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;

View File
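For readers unfamiliar with the delta-encoded layout, a minimal conceptual sketch of decoding as a prefix sum over stored differences; this illustrates the idea only and is not the `delta-encoding` crate's actual API:

```rust
// Recover original entry ids from stored deltas by accumulating them.
fn delta_decode(deltas: &[u64]) -> Vec<u64> {
    let mut acc = 0u64;
    deltas
        .iter()
        .map(|d| {
            acc += d;
            acc
        })
        .collect()
}

fn main() {
    // Entry ids 1, 5, 15 would be stored as deltas 1, 4, 10.
    assert_eq!(delta_decode(&[1, 4, 10]), vec![1, 5, 15]);
}
```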

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::min;
use std::collections::VecDeque;
use std::cmp::{max, min};
use std::collections::{BTreeSet, VecDeque};
use std::fmt::Debug;
use std::ops::Range;
use store_api::logstore::EntryId;
@@ -27,7 +28,7 @@ pub(crate) struct NextBatchHint {
}
/// An iterator over WAL (Write-Ahead Log) entries index for a region.
pub trait RegionWalIndexIterator: Send + Sync {
pub trait RegionWalIndexIterator: Send + Sync + Debug {
/// Returns next batch hint.
fn next_batch_hint(&self, avg_size: usize) -> Option<NextBatchHint>;
@@ -36,9 +37,13 @@ pub trait RegionWalIndexIterator: Send + Sync {
// Advances the iterator and returns the next EntryId.
fn next(&mut self) -> Option<EntryId>;
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any;
}
/// Represents a range [next_entry_id, end_entry_id) of WAL entries for a region.
#[derive(Debug)]
pub struct RegionWalRange {
current_entry_id: EntryId,
end_entry_id: EntryId,
@@ -96,10 +101,18 @@ impl RegionWalIndexIterator for RegionWalRange {
None
}
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
pub const MIN_BATCH_WINDOW_SIZE: usize = 4 * 1024 * 1024;
/// Represents an index of Write-Ahead Log entries for a region,
/// stored as a vector of [EntryId]s.
#[derive(Debug)]
pub struct RegionWalVecIndex {
index: VecDeque<EntryId>,
min_batch_window_size: usize,
@@ -134,11 +147,17 @@ impl RegionWalIndexIterator for RegionWalVecIndex {
fn next(&mut self) -> Option<EntryId> {
self.index.pop_front()
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Represents an iterator over multiple region WAL indexes.
///
/// It allows iterating through multiple region WAL indexes in sequence.
#[derive(Debug)]
pub struct MultipleRegionWalIndexIterator {
iterator: VecDeque<Box<dyn RegionWalIndexIterator>>,
}
@@ -185,6 +204,53 @@ impl RegionWalIndexIterator for MultipleRegionWalIndexIterator {
self.iterator.front_mut().and_then(|iter| iter.next())
}
#[cfg(test)]
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Builds [`RegionWalIndexIterator`].
///
/// Returning `None` means there are no entries to replay.
pub fn build_region_wal_index_iterator(
start_entry_id: EntryId,
end_entry_id: EntryId,
region_indexes: Option<(BTreeSet<EntryId>, EntryId)>,
max_batch_bytes: usize,
min_window_size: usize,
) -> Option<Box<dyn RegionWalIndexIterator>> {
if (start_entry_id..end_entry_id).is_empty() {
return None;
}
match region_indexes {
Some((region_indexes, last_index)) => {
if region_indexes.is_empty() && last_index >= end_entry_id {
return None;
}
let mut iterator: Vec<Box<dyn RegionWalIndexIterator>> = Vec::with_capacity(2);
if !region_indexes.is_empty() {
let index = RegionWalVecIndex::new(region_indexes, min_window_size);
iterator.push(Box::new(index));
}
let known_last_index = max(last_index, start_entry_id);
if known_last_index < end_entry_id {
let range = known_last_index..end_entry_id;
let index = RegionWalRange::new(range, max_batch_bytes);
iterator.push(Box::new(index));
}
Some(Box::new(MultipleRegionWalIndexIterator::new(iterator)))
}
None => {
let range = start_entry_id..end_entry_id;
Some(Box::new(RegionWalRange::new(range, max_batch_bytes)))
}
}
}
#[cfg(test)]
@@ -353,4 +419,69 @@ mod tests {
assert_eq!(iter.peek(), None);
assert_eq!(iter.next(), None);
}
#[test]
fn test_build_region_wal_index_iterator() {
let iterator = build_region_wal_index_iterator(1024, 1024, None, 5, 5);
assert!(iterator.is_none());
let iterator = build_region_wal_index_iterator(1024, 1023, None, 5, 5);
assert!(iterator.is_none());
let iterator =
build_region_wal_index_iterator(1024, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
assert!(iterator.is_none());
let iterator =
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1024)), 5, 5);
assert!(iterator.is_none());
let iterator =
build_region_wal_index_iterator(1, 1024, Some((BTreeSet::new(), 1025)), 5, 5);
assert!(iterator.is_none());
let iterator = build_region_wal_index_iterator(
1,
1024,
Some((BTreeSet::from([512, 756]), 1024)),
5,
5,
)
.unwrap();
let iter = iterator
.as_any()
.downcast_ref::<MultipleRegionWalIndexIterator>()
.unwrap();
assert_eq!(iter.iterator.len(), 1);
let vec_index = iter.iterator[0]
.as_any()
.downcast_ref::<RegionWalVecIndex>()
.unwrap();
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
let iterator = build_region_wal_index_iterator(
1,
1024,
Some((BTreeSet::from([512, 756]), 1023)),
5,
5,
)
.unwrap();
let iter = iterator
.as_any()
.downcast_ref::<MultipleRegionWalIndexIterator>()
.unwrap();
assert_eq!(iter.iterator.len(), 2);
let vec_index = iter.iterator[0]
.as_any()
.downcast_ref::<RegionWalVecIndex>()
.unwrap();
assert_eq!(vec_index.index, VecDeque::from([512, 756]));
let wal_range = iter.iterator[1]
.as_any()
.downcast_ref::<RegionWalRange>()
.unwrap();
assert_eq!(wal_range.current_entry_id, 1023);
assert_eq!(wal_range.end_entry_id, 1024);
}
}
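A simplified sketch of the decision logic in `build_region_wal_index_iterator`, using a plain enum and vectors instead of the real iterator types (names below are illustrative):

```rust
#[derive(Debug, PartialEq)]
enum ReplayPlan {
    Nothing,   // no entries to replay
    FromIndex, // use the remote index: indexed entries (if any) plus an uncovered tail (if any)
    FullRange, // no remote index: replay the whole [start, end) range
}

fn plan(start: u64, end: u64, index: Option<(Vec<u64>, u64)>) -> ReplayPlan {
    if start >= end {
        return ReplayPlan::Nothing;
    }
    match index {
        Some((ids, last_index)) if ids.is_empty() && last_index >= end => ReplayPlan::Nothing,
        Some(_) => ReplayPlan::FromIndex,
        None => ReplayPlan::FullRange,
    }
}

fn main() {
    assert_eq!(plan(1024, 1024, None), ReplayPlan::Nothing);
    assert_eq!(plan(1, 1024, Some((vec![], 1024))), ReplayPlan::Nothing);
    assert_eq!(plan(1, 1024, Some((vec![512, 756], 1023))), ReplayPlan::FromIndex);
    assert_eq!(plan(1, 1024, None), ReplayPlan::FullRange);
}
```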

View File

@@ -20,19 +20,20 @@ use common_telemetry::{debug, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
use rskafka::client::partition::OffsetAt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{
Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;
use super::index::build_region_wal_index_iterator;
use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::index::GlobalIndexCollector;
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::index::{GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE};
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
@@ -205,6 +206,7 @@ impl LogStore for KafkaLogStore {
&self,
provider: &Provider,
entry_id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let provider = provider
.as_kafka_provider()
@@ -232,35 +234,41 @@ impl LogStore for KafkaLogStore {
.await
.context(GetOffsetSnafu {
topic: &provider.topic,
})?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = entry_id as i64;
})?;
debug!(
"Start reading entries in range [{}, {}] for ns {}",
start_offset, end_offset, provider
);
let region_indexes = if let (Some(index), Some(collector)) =
(index, self.client_manager.global_index_collector())
{
collector
.read_remote_region_index(index.location_id, provider, index.region_id, entry_id)
.await?
} else {
None
};
// Abort if there're no new entries.
// FIXME(niebayes): how come this case happens?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {}]",
provider, start_offset, end_offset
);
let Some(iterator) = build_region_wal_index_iterator(
entry_id,
end_offset as u64,
region_indexes,
self.max_batch_bytes,
MIN_BATCH_WINDOW_SIZE,
) else {
let range = entry_id..end_offset as u64;
warn!("No new entries in range {:?} of ns {}", range, provider);
return Ok(futures_util::stream::empty().boxed());
}
};
let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.max_batch_bytes as i32)
.with_max_wait_ms(self.consumer_wait_timeout.as_millis() as i32)
.build();
debug!("Reading entries with {:?} of ns {}", iterator, provider);
debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
provider, start_offset, end_offset
);
// Safety: Must be ok.
let mut stream_consumer = ConsumerBuilder::default()
.client(client)
// Safety: checked before.
.buffer(RecordsBuffer::new(iterator))
.max_batch_size(self.max_batch_bytes)
.max_wait_ms(self.consumer_wait_timeout.as_millis() as u32)
.build()
.unwrap();
// A buffer is used to collect records to construct a complete entry.
let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
@@ -511,7 +519,7 @@ mod tests {
// 5 region
assert_eq!(response.last_entry_ids.len(), 5);
let got_entries = logstore
.read(&provider, 0)
.read(&provider, 0, None)
.await
.unwrap()
.try_collect::<Vec<_>>()
@@ -584,7 +592,7 @@ mod tests {
// 5 region
assert_eq!(response.last_entry_ids.len(), 5);
let got_entries = logstore
.read(&provider, 0)
.read(&provider, 0, None)
.await
.unwrap()
.try_collect::<Vec<_>>()

View File

@@ -25,7 +25,7 @@ use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMo
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry};
use store_api::logstore::provider::{Provider, RaftEngineProvider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;
use crate::error::{
@@ -252,6 +252,7 @@ impl LogStore for RaftEngineLogStore {
&self,
provider: &Provider,
entry_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let ns = provider
.as_raft_engine_provider()
@@ -545,7 +546,7 @@ mod tests {
}
let mut entries = HashSet::with_capacity(1024);
let mut s = logstore
.read(&Provider::raft_engine_provider(1), 0)
.read(&Provider::raft_engine_provider(1), 0, None)
.await
.unwrap();
while let Some(r) = s.next().await {
@@ -578,7 +579,7 @@ mod tests {
.await
.is_ok());
let entries = logstore
.read(&Provider::raft_engine_provider(1), 1)
.read(&Provider::raft_engine_provider(1), 1, None)
.await
.unwrap()
.collect::<Vec<_>>()
@@ -596,7 +597,7 @@ mod tests {
let entries = collect_entries(
logstore
.read(&Provider::raft_engine_provider(1), 1)
.read(&Provider::raft_engine_provider(1), 1, None)
.await
.unwrap(),
)
@@ -682,7 +683,7 @@ mod tests {
logstore.obsolete(&namespace, region_id, 100).await.unwrap();
assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
let res = logstore.read(&namespace, 100).await.unwrap();
let res = logstore.read(&namespace, 100, None).await.unwrap();
let mut vec = collect_entries(res).await;
vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap());
assert_eq!(101, vec.first().unwrap().entry_id());

View File

@@ -45,4 +45,7 @@ lazy_static! {
/// Meta kv cache miss counter.
pub static ref METRIC_META_KV_CACHE_MISS: IntCounterVec =
register_int_counter_vec!("greptime_meta_kv_cache_miss", "meta kv cache miss", &["op"]).unwrap();
/// Heartbeats received by metasrv.
pub static ref METRIC_META_HEARTBEAT_RECV: IntCounterVec =
register_int_counter_vec!("greptime_meta_heartbeat_recv", "heartbeats received by metasrv", &["pusher_key"]).unwrap();
}

View File

@@ -90,6 +90,7 @@ impl UpgradeCandidateRegion {
region_id,
last_entry_id,
wait_for_replay_timeout: Some(self.replay_timeout),
location_id: Some(ctx.persistent_ctx.from_peer.id),
})
}

View File

@@ -32,6 +32,7 @@ use crate::error;
use crate::error::Result;
use crate::handler::{HeartbeatHandlerGroup, Pusher};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
use crate::service::{GrpcResult, GrpcStream};
#[async_trait::async_trait]
@@ -65,7 +66,11 @@ impl heartbeat_server::Heartbeat for Metasrv {
if pusher_key.is_none() {
pusher_key = register_pusher(&handler_group, header, tx.clone()).await;
}
if let Some(k) = &pusher_key {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[k]).inc();
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]).inc();
}
let res = handler_group
.handle(req, ctx.clone())
.await

View File

@@ -41,6 +41,7 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: None,
location_id: req.location_id,
}),
)
.await
@@ -52,6 +53,7 @@ impl MetricEngineInner {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: req.set_writable,
entry_id: req.entry_id,
location_id: req.location_id,
}),
)
.await

View File

@@ -179,7 +179,6 @@ impl FileCache {
}
}
#[allow(unused)]
/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);

View File

@@ -27,7 +27,10 @@ use snafu::ResultExt;
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::metrics::{
FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
@@ -166,6 +169,81 @@ impl WriteCache {
Ok(Some(sst_info))
}
/// Removes a file from the cache by `index_key`.
pub(crate) async fn remove(&self, index_key: IndexKey) {
self.file_cache.remove(index_key).await
}
/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
let file_type = index_key.file_type;
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
}])
.start_timer();
let reader = remote_store
.reader_with(remote_path)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..file_size)
.await
.context(error::OpenDalSnafu)?;
let cache_path = self.file_cache.cache_file_path(index_key);
let mut writer = self
.file_cache
.local_store()
.writer(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();
let region_id = index_key.region_id;
let file_id = index_key.file_id;
let bytes_written =
futures::io::copy(reader, &mut writer)
.await
.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
let elapsed = timer.stop_and_record();
debug!(
"Successfully downloaded file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
remote_path, cache_path, bytes_written, region_id, elapsed,
);
let index_value = IndexValue {
file_size: bytes_written as _,
};
self.file_cache.put(index_key, index_value).await;
Ok(())
}
/// Uploads a Parquet file or a Puffin file to the remote object store.
async fn upload(
&self,
@@ -351,6 +429,13 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
// Removes the file from the cache.
let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
write_cache.remove(sst_index_key).await;
assert!(!write_cache.file_cache.contains_key(&sst_index_key));
write_cache.remove(index_key).await;
assert!(!write_cache.file_cache.contains_key(&index_key));
}
#[tokio::test]

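At its core, the download path is an async byte copy whose returned count becomes the cached file size; a minimal sketch with in-memory buffers standing in for the OpenDAL reader and writer:

```rust
use futures::executor::block_on;
use futures::io::{copy, Cursor};

fn main() {
    block_on(async {
        let remote = Cursor::new(vec![0u8; 1024]); // stands in for the remote SST reader
        let mut local = Cursor::new(Vec::new());   // stands in for the local cache writer
        let bytes_written = copy(remote, &mut local).await.unwrap();
        assert_eq!(bytes_written, 1024);
        assert_eq!(local.into_inner().len(), 1024);
    });
}
```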
View File

@@ -94,6 +94,7 @@ async fn test_catchup_with_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: last_entry_id,
location_id: None,
}),
)
.await;
@@ -125,6 +126,7 @@ async fn test_catchup_with_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id: None,
}),
)
.await;
@@ -191,6 +193,7 @@ async fn test_catchup_with_incorrect_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
location_id: None,
}),
)
.await
@@ -207,6 +210,7 @@ async fn test_catchup_with_incorrect_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: incorrect_last_entry_id,
location_id: None,
}),
)
.await;
@@ -255,6 +259,7 @@ async fn test_catchup_without_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -285,6 +290,7 @@ async fn test_catchup_without_last_entry_id() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -354,6 +360,7 @@ async fn test_catchup_with_manifest_update() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: false,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -390,6 +397,7 @@ async fn test_catchup_with_manifest_update() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await;
@@ -411,6 +419,7 @@ async fn test_catchup_not_exist() {
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: None,
location_id: None,
}),
)
.await

View File

@@ -12,21 +12,96 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use tokio::sync::Barrier;
use tokio::sync::{oneshot, Barrier};
use crate::config::MitoConfig;
use crate::engine::listener::EventListener;
use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<FileId>>>,
}
impl EventListener for EditRegionListener {
fn on_file_cache_filled(&self, file_id: FileId) {
let mut tx = self.tx.lock().unwrap();
tx.take().unwrap().send(file_id).unwrap();
}
}
let (tx, rx) = oneshot::channel();
let engine = env
.create_engine_with(
MitoConfig {
// Write cache must be enabled to download the ingested SST file.
enable_experimental_write_cache: true,
..Default::default()
},
None,
Some(Arc::new(EditRegionListener {
tx: Mutex::new(Some(tx)),
})),
)
.await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let file_id = FileId::random();
// Simulating the ingestion of an SST file.
env.get_object_store()
.unwrap()
.write(
&format!("{}/{}.parquet", region.region_dir(), file_id),
b"x".as_slice(),
)
.await
.unwrap();
let edit = RegionEdit {
files_to_add: vec![FileMeta {
region_id: region.region_id,
file_id,
level: 0,
..Default::default()
}],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine.edit_region(region.region_id, edit).await.unwrap();
// Asserts that the background downloading of the SST is succeeded.
let actual = tokio::time::timeout(Duration::from_secs(9), rx)
.await
.unwrap()
.unwrap();
assert_eq!(file_id, actual);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_edit_region_concurrently() {
const EDITS_PER_TASK: usize = 10;

View File

@@ -22,6 +22,8 @@ use common_telemetry::info;
use store_api::storage::RegionId;
use tokio::sync::Notify;
use crate::sst::file::FileId;
/// Mito engine background event listener.
#[async_trait]
pub trait EventListener: Send + Sync {
@@ -61,6 +63,9 @@ pub trait EventListener: Send + Sync {
fn on_recv_requests(&self, request_num: usize) {
let _ = request_num;
}
/// Notifies the listener that the file cache is filled, for example, when editing a region.
fn on_file_cache_filled(&self, _file_id: FileId) {}
}
pub type EventListenerRef = Arc<dyn EventListener>;

View File

@@ -639,6 +639,22 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Failed to download file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
file_id,
file_type,
))]
Download {
region_id: RegionId,
file_id: FileId,
file_type: FileType,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
@@ -965,7 +981,7 @@ impl ErrorExt for Error {
FilterRecordBatch { source, .. } => source.status_code(),
Upload { .. } => StatusCode::StorageUnavailable,
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,

View File

@@ -189,6 +189,17 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
/// Download bytes counter in the write cache.
pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_write_cache_download_bytes_total",
"mito write cache download bytes total",
).unwrap();
/// Timer of the downloading task in the write cache.
pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!(
"mito_write_cache_download_elapsed",
"mito write cache download elapsed",
&[TYPE_LABEL],
).unwrap();
/// Upload bytes counter.
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_upload_bytes_total",

View File

@@ -213,6 +213,10 @@ impl RowGroupLastRowReader {
/// Updates row group's last row cache if cache manager is present.
fn maybe_update_cache(&mut self) {
if let Some(cache) = &self.cache_manager {
if self.yielded_batches.is_empty() {
// We always expect that row groups yield batches.
return;
}
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self

View File

@@ -25,7 +25,7 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use smallvec::SmallVec;
use store_api::region_engine::RegionScannerRef;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
@@ -705,6 +705,37 @@ impl ScanInput {
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
rows_in_files + rows_in_memtables
}
/// Retrieves [`PartitionRange`] from memtable and files
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
let mut id = 0;
let mut container = Vec::with_capacity(self.memtables.len() + self.files.len());
for memtable in &self.memtables {
let range = PartitionRange {
// TODO(ruihang): filter out empty memtables in the future.
start: memtable.stats().time_range().unwrap().0,
end: memtable.stats().time_range().unwrap().1,
num_rows: memtable.stats().num_rows(),
identifier: id,
};
id += 1;
container.push(range);
}
for file in &self.files {
let range = PartitionRange {
start: file.meta_ref().time_range.0,
end: file.meta_ref().time_range.1,
num_rows: file.meta_ref().num_rows as usize,
identifier: id,
};
id += 1;
container.push(range);
}
container
}
}
#[cfg(test)]

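A simplified sketch of how the partition ranges are assembled: every memtable or file contributes one range and identifiers increase monotonically (plain tuples stand in for memtable stats and file metadata; timestamps are plain i64 here):

```rust
#[derive(Debug)]
struct PartitionRange {
    start: i64,
    end: i64,
    num_rows: usize,
    identifier: usize,
}

// Each source is (time range start, time range end, row count).
fn partition_ranges(sources: &[(i64, i64, usize)]) -> Vec<PartitionRange> {
    sources
        .iter()
        .enumerate()
        .map(|(id, &(start, end, num_rows))| PartitionRange {
            start,
            end,
            num_rows,
            identifier: id,
        })
        .collect()
}

fn main() {
    let ranges = partition_ranges(&[(0, 100, 42), (100, 200, 7)]);
    assert_eq!(ranges.len(), 2);
    assert_eq!(ranges[1].identifier, 1);
    assert_eq!(ranges[0].num_rows, 42);
}
```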
View File

@@ -67,10 +67,11 @@ impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));
Self {

View File

@@ -202,8 +202,8 @@ impl RegionOpener {
options.need_dedup(),
options.merge_mode(),
);
// Initial memtable id is 0.
let part_duration = options.compaction.time_window();
// Initial memtable id is 0.
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder.clone(),
@@ -313,7 +313,7 @@ impl RegionOpener {
let wal_entry_reader = self
.wal_entry_reader
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id));
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();
@@ -335,8 +335,13 @@ impl RegionOpener {
region_options.need_dedup(),
region_options.merge_mode(),
);
// Use compaction time window in the manifest if region doesn't provide
// the time window option.
let part_duration = region_options
.compaction
.time_window()
.or(manifest.compaction_time_window);
// Initial memtable id is 0.
let part_duration = region_options.compaction.time_window();
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder.clone(),

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::{error, info};
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileMeta;
@@ -77,9 +78,10 @@ impl FilePurger for LocalFilePurger {
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
}
let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
} else {
info!(
@@ -87,6 +89,28 @@ impl FilePurger for LocalFilePurger {
file_meta.file_id, file_meta.region_id
);
}
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
{
// Removes the inverted index from the cache.
if file_meta.inverted_index_available() {
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Puffin,
))
.await;
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Parquet,
))
.await;
}
})) {
error!(e; "Failed to schedule the file purge request");
}

View File

@@ -364,7 +364,7 @@ impl TestEnv {
.as_path()
.display()
.to_string();
let mut builder = Fs::default();
let builder = Fs::default();
let object_store = ObjectStore::new(builder.root(&data_path)).unwrap().finish();
object_store_manager.add(storage_name, object_store);
}

View File

@@ -30,7 +30,7 @@ use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, LogStore};
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
use store_api::storage::RegionId;
use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
@@ -102,15 +102,24 @@ impl<S: LogStore> Wal<S> {
&self,
provider: &Provider,
region_id: RegionId,
location_id: Option<u64>,
) -> Box<dyn WalEntryReader> {
match provider {
Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
)),
Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
LogStoreRawEntryReader::new(self.store.clone()),
region_id,
))),
Provider::Kafka(_) => {
let reader = if let Some(location_id) = location_id {
LogStoreRawEntryReader::new(self.store.clone())
.with_wal_index(WalIndex::new(region_id, location_id))
} else {
LogStoreRawEntryReader::new(self.store.clone())
};
Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
reader, region_id,
)))
}
}
}

View File

@@ -20,7 +20,7 @@ use futures::stream::BoxStream;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::logstore::{LogStore, WalIndex};
use store_api::storage::RegionId;
use tokio_stream::StreamExt;
@@ -38,11 +38,20 @@ pub(crate) trait RawEntryReader: Send + Sync {
/// Implement the [RawEntryReader] for the [LogStore].
pub struct LogStoreRawEntryReader<S> {
store: Arc<S>,
wal_index: Option<WalIndex>,
}
impl<S> LogStoreRawEntryReader<S> {
pub fn new(store: Arc<S>) -> Self {
Self { store }
Self {
store,
wal_index: None,
}
}
pub fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
self.wal_index = Some(wal_index);
self
}
}
@@ -50,9 +59,10 @@ impl<S: LogStore> RawEntryReader for LogStoreRawEntryReader<S> {
fn read(&self, provider: &Provider, start_id: EntryId) -> Result<EntryStream<'static>> {
let store = self.store.clone();
let provider = provider.clone();
let wal_index = self.wal_index;
let stream = try_stream!({
let mut stream = store
.read(&provider, start_id)
.read(&provider, start_id, wal_index)
.await
.map_err(BoxedError::new)
.with_context(|_| error::ReadWalSnafu {
@@ -119,7 +129,9 @@ mod tests {
use futures::{stream, TryStreamExt};
use store_api::logstore::entry::{Entry, NaiveEntry};
use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream};
use store_api::logstore::{
AppendBatchResponse, EntryId, LogStore, SendableEntryStream, WalIndex,
};
use store_api::storage::RegionId;
use super::*;
@@ -149,6 +161,7 @@ mod tests {
&self,
_provider: &Provider,
_id: EntryId,
_index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>, Self::Error> {
Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())])))
}

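The reader threads the optional WAL index through a chainable `with_wal_index` setter; a minimal sketch of that pattern with simplified stand-in types:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct WalIndex {
    region_id: u64,
    location_id: u64,
}

struct RawEntryReader {
    // None means "read without a remote index", as before this change.
    wal_index: Option<WalIndex>,
}

impl RawEntryReader {
    fn new() -> Self {
        Self { wal_index: None }
    }

    fn with_wal_index(mut self, wal_index: WalIndex) -> Self {
        self.wal_index = Some(wal_index);
        self
    }
}

fn main() {
    let reader = RawEntryReader::new().with_wal_index(WalIndex { region_id: 1, location_id: 42 });
    assert_eq!(reader.wal_index, Some(WalIndex { region_id: 1, location_id: 42 }));
}
```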
View File

@@ -57,6 +57,7 @@ use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
@@ -950,6 +951,13 @@ impl WorkerListener {
// Avoid compiler warning.
let _ = request_num;
}
pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_file_cache_filled(_file_id);
}
}
}
#[cfg(test)]

View File

@@ -74,7 +74,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let flushed_entry_id = region.version_control.current().last_entry_id;
info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}");
let timer = Instant::now();
let wal_entry_reader = self.wal.wal_entry_reader(&region.provider, region_id);
let wal_entry_reader =
self.wal
.wal_entry_reader(&region.provider, region_id, request.location_id);
let on_region_opened = self.wal.on_region_opened();
let last_entry_id = replay_memtable(
&region.provider,
@@ -93,7 +95,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
if let Some(expected_last_entry_id) = request.entry_id {
ensure!(
expected_last_entry_id == last_entry_id,
// The replayed last entry id may be greater than the `expected_last_entry_id`.
last_entry_id >= expected_last_entry_id,
error::UnexpectedReplaySnafu {
region_id,
expected_last_entry_id,

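The catch-up assertion is relaxed from strict equality to a lower bound, since replay may legitimately advance past the expected entry id; a tiny sketch of the new invariant (hypothetical helper, not the worker's real error handling):

```rust
fn check_replay(expected_last_entry_id: u64, replayed_last_entry_id: u64) -> Result<(), String> {
    // Only a regression (replayed < expected) is an error now.
    if replayed_last_entry_id >= expected_last_entry_id {
        Ok(())
    } else {
        Err(format!(
            "unexpected replay: expected at least {expected_last_entry_id}, got {replayed_last_entry_id}"
        ))
    }
}

fn main() {
    assert!(check_replay(10, 12).is_ok()); // replayed past the expectation: fine
    assert!(check_replay(10, 10).is_ok()); // exact match: fine
    assert!(check_replay(10, 9).is_err()); // fell short: error
}
```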
View File

@@ -21,6 +21,8 @@ use std::collections::{HashMap, VecDeque};
use common_telemetry::{info, warn};
use store_api::storage::RegionId;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
@@ -30,7 +32,8 @@ use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
TruncateResult, WorkerRequest,
};
use crate::worker::RegionWorkerLoop;
use crate::sst::location;
use crate::worker::{RegionWorkerLoop, WorkerListener};
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
@@ -105,10 +108,12 @@ impl<S> RegionWorkerLoop<S> {
}
let request_sender = self.sender.clone();
let cache_manager = self.cache_manager.clone();
let listener = self.listener.clone();
// Now the region is in editing state.
// Updates manifest in background.
common_runtime::spawn_global(async move {
let result = edit_region(&region, edit.clone()).await;
let result = edit_region(&region, edit.clone(), cache_manager, listener).await;
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
@@ -286,8 +291,41 @@ impl<S> RegionWorkerLoop<S> {
}
/// Checks the edit, writes and applies it.
async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> {
async fn edit_region(
region: &MitoRegionRef,
edit: RegionEdit,
cache_manager: CacheManagerRef,
listener: WorkerListener,
) -> Result<()> {
let region_id = region.region_id;
if let Some(write_cache) = cache_manager.write_cache() {
for file_meta in &edit.files_to_add {
let write_cache = write_cache.clone();
let layer = region.access_layer.clone();
let listener = listener.clone();
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
let file_size = file_meta.file_size;
common_runtime::spawn_global(async move {
if write_cache
.download(index_key, &remote_path, layer.object_store(), file_size)
.await
.is_ok()
{
// Triggers the filling of the parquet metadata cache.
// The parquet file is already downloaded.
let _ = write_cache
.file_cache()
.get_parquet_meta_data(index_key)
.await;
listener.on_file_cache_filled(index_key.file_id);
}
});
}
}
info!("Applying {edit:?} to region {}", region_id);
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));

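The cache fill fans out one background task per added file and treats failures as best-effort; a minimal sketch of that pattern using tokio (the `download` stub below is hypothetical, standing in for the write cache's download call):

```rust
use std::time::Duration;

// Hypothetical stand-in for WriteCache::download.
async fn download(file_id: u64) -> Result<(), String> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    if file_id % 2 == 0 { Ok(()) } else { Err("network error".into()) }
}

#[tokio::main]
async fn main() {
    let files_to_add = vec![1u64, 2, 3, 4];
    let handles: Vec<_> = files_to_add
        .into_iter()
        .map(|file_id| {
            tokio::spawn(async move {
                if download(file_id).await.is_ok() {
                    println!("file {file_id} filled into the local cache");
                } // errors are ignored: the cache fill is best-effort
            })
        })
        .collect();
    for h in handles {
        h.await.unwrap();
    }
}
```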
View File

@@ -676,18 +676,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Invalid partition columns when creating table '{}', reason: {}",
table,
reason
))]
InvalidPartitionColumns {
table: String,
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to prepare file table"))]
PrepareFileTable {
#[snafu(implicit)]
@@ -784,6 +772,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to upgrade catalog manager reference"))]
UpgradeCatalogManagerRef {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -806,7 +800,6 @@ impl ErrorExt for Error {
| Error::ProjectSchema { .. }
| Error::UnsupportedFormat { .. }
| Error::ColumnNoneDefaultValue { .. }
| Error::InvalidPartitionColumns { .. }
| Error::PrepareFileTable { .. }
| Error::InferFileTableSchema { .. }
| Error::SchemaIncompatible { .. }
@@ -931,6 +924,8 @@ impl ErrorExt for Error {
Error::ExecuteAdminFunction { source, .. } => source.status_code(),
Error::BuildRecordBatch { source, .. } => source.status_code(),
Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal,
}
}

View File

@@ -23,8 +23,10 @@ mod set;
mod show;
mod tql;
use std::collections::HashMap;
use std::sync::Arc;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use client::RecordBatches;
use common_error::ext::BoxedError;
@@ -32,6 +34,7 @@ use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -60,7 +63,8 @@ use table::TableRef;
use self::set::{set_bytea_output, set_datestyle, set_timezone, validate_client_encoding};
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UpgradeCatalogManagerRefSnafu,
};
use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
@@ -251,6 +255,29 @@ impl StatementExecutor {
)
.await
}
Statement::ShowCreateDatabase(show) => {
let (catalog, database) =
idents_to_full_database_name(&show.database_name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let table_metadata_manager = self
.catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|manager| manager.table_metadata_manager_ref().clone())
.context(UpgradeCatalogManagerRefSnafu)?;
let opts: HashMap<String, String> = table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(&catalog, &database))
.await
.context(TableMetadataManagerSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: &database,
})?
.into();
self.show_create_database(&database, opts.into()).await
}
Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
table_idents_to_full_name(&show.table_name, &query_ctx)

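SHOW CREATE DATABASE needs the concrete kv-backed catalog manager, reached through an `as_any` downcast; a minimal sketch of that pattern with simplified trait and types:

```rust
use std::any::Any;
use std::sync::Arc;

trait CatalogManager: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

struct KvBackendCatalogManager {
    name: &'static str,
}

impl CatalogManager for KvBackendCatalogManager {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let manager: Arc<dyn CatalogManager> = Arc::new(KvBackendCatalogManager { name: "kv" });
    // Fails (returns None) for any other CatalogManager implementation,
    // which the statement executor maps to an error.
    let concrete = manager
        .as_any()
        .downcast_ref::<KvBackendCatalogManager>()
        .expect("not a kv-backed catalog manager");
    assert_eq!(concrete.name, "kv");
}
```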
Some files were not shown because too many files have changed in this diff.