Compare commits


35 Commits

Author SHA1 Message Date
Yingwen
eff07d5986 Merge pull request #16 from GreptimeTeam/feat/grpc-unary-insert
feat: GRPC unary insert method
2023-03-17 19:26:48 +08:00
luofucong
40c55e4da7 feat: GRPC unary insert method 2023-03-17 19:11:46 +08:00
Yingwen
8d113550cf Merge pull request #15 from GreptimeTeam/ci/release-yaml
ci: Adjust release yaml
2023-03-15 11:23:31 +08:00
evenyag
15a0ed0853 ci: Adjust release yaml 2023-03-14 19:36:55 +08:00
Ruihang Xia
44493e9d8c feat: impl flush on shutdown (#14)
* feat: impl flush on shutdown

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* powerful if-else!

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-03-14 18:29:38 +08:00
Lei, HUANG
efd15839d4 Merge pull request #13 from GreptimeTeam/chore/flush-message
chore(servers): Change flush message
2023-03-14 17:34:22 +08:00
evenyag
1f62b36537 chore(servers): Change flush message 2023-03-14 17:16:39 +08:00
Ruihang Xia
7b8e65ce93 chore: merge public repo (#12)
* feat: implement table flush (#1121)

* feat: add flush method for trait

* feat: implement flush via grpc

* chore: move table_dir/region_name/region_id to table crate

* chore: Update src/mito/src/table.rs

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: use correct env var (#1166)

* fix: use correct env var

* fix: move COPY up so rustup know it's nightly

* fix: add `pyo3_backend` in GHA yml

* chore: name for `TODO`

* temp: not set `pyo3_backend` before find DSO

* fix: release linux with pyo3_backend

* fix: failed to run subquery wrapped in two parentheses (#1157)

* refactor: add the separate GitHub Action job to push the image to the UCloud registry (#1170)

* refactor: make the cmd hold the application instance (#1159)

* fix: export 'PYO3_CROSS_LIB_DIR' when cargo build for aarch64-linux and refactor matrix opts (#1171)

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: LFC <luofucong@greptime.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
2023-03-14 16:42:40 +08:00
Yingwen
6475339ad0 Merge pull request #10 from GreptimeTeam/feat/manual-flush-http
feat: manual flush http API
2023-03-14 16:37:56 +08:00
Lei, HUANG
0bd802c70d Update src/servers/src/error.rs
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2023-03-14 16:26:47 +08:00
Lei, HUANG
28d07c7a2e Merge pull request #9 from GreptimeTeam/docs/edge-example-toml
docs(config): Add edge example
2023-03-14 16:25:29 +08:00
Lei, HUANG
dc33b0c0ce Merge pull request #11 from GreptimeTeam/chore/adjust-log
chore(storage): Adjust log level
2023-03-14 16:25:05 +08:00
evenyag
4b4f8f27e8 chore(storage): Adjust log level 2023-03-14 16:14:25 +08:00
Lei, HUANG
c994e0de88 fix: format 2023-03-14 16:13:57 +08:00
Lei, HUANG
d1ba9ca126 fix: unit tests 2023-03-14 16:11:25 +08:00
evenyag
0877dabce2 docs(config): Add edge example 2023-03-14 16:02:11 +08:00
Lei, HUANG
8b9671f376 feat: manual flush http API 2023-03-14 15:49:37 +08:00
Yingwen
dcf66d9d52 chore(datanode): derive serde default for Wal/CompactionConfig (#8) 2023-03-14 15:22:13 +08:00
Yingwen
65b61e78ad Merge pull request #6 from GreptimeTeam/docs/project-version
docs: Set greptimedb-edge version to 0.1.0
2023-03-14 15:01:29 +08:00
evenyag
3638704f95 docs: Set greptimedb-edge version to 0.1.0 2023-03-14 14:38:00 +08:00
Lei, HUANG
8a2f4256bf Merge pull request #5 from GreptimeTeam/feat/wait-flush-done
feat: Region writer wait flush done
2023-03-14 14:28:39 +08:00
evenyag
83aeadc506 feat: Region writer wait flush done 2023-03-14 14:09:22 +08:00
Lei, HUANG
f556052951 Merge pull request #3 from GreptimeTeam/feat/merge-public
feat: Merge develop branch of public repo
2023-03-14 14:05:38 +08:00
LFC
8658d428e0 fix: failed to run subquery wrapped in two parentheses (#1157) 2023-03-14 11:52:25 +08:00
discord9
e8e11072f8 fix: use correct env var (#1166)
* fix: use correct env var

* fix: move COPY up so rustup know it's nightly

* fix: add `pyo3_backend` in GHA yml

* chore: name for `TODO`

* temp: not set `pyo3_backend` before find DSO

* fix: release linux with pyo3_backend
2023-03-14 11:52:25 +08:00
Weny Xu
6f0f72c377 feat: implement table flush (#1121)
* feat: add flush method for trait

* feat: implement flush via grpc

* chore: move table_dir/region_name/region_id to table crate

* chore: Update src/mito/src/table.rs

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
2023-03-14 11:52:25 +08:00
Yingwen
32030a8194 Merge pull request #4 from GreptimeTeam/ci/remove-unnecessary
ci: Remove api doc ci and coverage statistics
2023-03-14 11:51:21 +08:00
evenyag
0f7cde2411 ci: Remove api doc ci and coverage statistics 2023-03-14 11:46:48 +08:00
Lei, HUANG
1ece402ec8 Merge pull request #2 from GreptimeTeam/feat/skip-wal
feat: skip wal for user table
2023-03-14 11:39:20 +08:00
Lei, HUANG
7ee54b3e69 fix: don't skip wal in test 2023-03-14 11:19:17 +08:00
Lei, HUANG
9b4dcba8cf fix: check errors 2023-03-14 10:51:16 +08:00
Lei, HUANG
c3bcb1111f fix: fmt 2023-03-13 20:24:25 +08:00
Lei, HUANG
a4ebd03a61 feat: skip wal for user table 2023-03-13 20:18:41 +08:00
Lei, HUANG
e7daf1226f feat: tune parquet parameters 2023-03-13 20:17:23 +08:00
Lei, HUANG
05c0ea9a59 feat: tune parquet parameters (#1)
* feat: tune parquet parameters

* Update src/storage/src/sst/parquet.rs

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
2023-03-13 20:11:06 +08:00
77 changed files with 1012 additions and 1425 deletions

View File

@@ -2,7 +2,7 @@
GT_S3_BUCKET=S3 bucket
GT_S3_ACCESS_KEY_ID=S3 access key id
GT_S3_ACCESS_KEY=S3 secret access key
GT_S3_ENDPOINT_URL=S3 endpoint url
# Settings for oss test
GT_OSS_BUCKET=OSS bucket
GT_OSS_ACCESS_KEY_ID=OSS access key id

View File

@@ -1,42 +1,42 @@
on:
push:
branches:
- develop
paths-ignore:
- 'docs/**'
- 'config/**'
- '**.md'
- '.dockerignore'
- 'docker/**'
- '.gitignore'
# on:
# push:
# branches:
# - develop
# paths-ignore:
# - 'docs/**'
# - 'config/**'
# - '**.md'
# - '.dockerignore'
# - 'docker/**'
# - '.gitignore'
name: Build API docs
# name: Build API docs
env:
RUST_TOOLCHAIN: nightly-2023-02-26
# env:
# RUST_TOOLCHAIN: nightly-2023-02-26
jobs:
apidoc:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.RUST_TOOLCHAIN }}
- run: cargo doc --workspace --no-deps --document-private-items
- run: |
cat <<EOF > target/doc/index.html
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="refresh" content="0; url='greptime/'" />
</head>
<body></body></html>
EOF
- name: Publish dist directory
uses: JamesIves/github-pages-deploy-action@v4
with:
folder: target/doc
# jobs:
# apidoc:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v3
# - uses: arduino/setup-protoc@v1
# with:
# repo-token: ${{ secrets.GITHUB_TOKEN }}
# - uses: dtolnay/rust-toolchain@master
# with:
# toolchain: ${{ env.RUST_TOOLCHAIN }}
# - run: cargo doc --workspace --no-deps --document-private-items
# - run: |
# cat <<EOF > target/doc/index.html
# <!DOCTYPE html>
# <html>
# <head>
# <meta http-equiv="refresh" content="0; url='greptime/'" />
# </head>
# <body></body></html>
# EOF
# - name: Publish dist directory
# uses: JamesIves/github-pages-deploy-action@v4
# with:
# folder: target/doc

View File

@@ -213,10 +213,11 @@ jobs:
python-version: '3.10'
- name: Install PyArrow Package
run: pip install pyarrow
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
# - name: Install cargo-llvm-cov
# uses: taiki-e/install-action@cargo-llvm-cov
- name: Collect coverage data
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend
run: cargo nextest run -F pyo3_backend
# run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
RUST_BACKTRACE: 1

View File

@@ -2,9 +2,9 @@ on:
push:
tags:
- "v*.*.*"
schedule:
# At 00:00 on Monday.
- cron: '0 0 * * 1'
# schedule:
# # At 00:00 on Monday.
# - cron: '0 0 * * 1'
workflow_dispatch:
name: Release
@@ -18,9 +18,6 @@ env:
CARGO_PROFILE: nightly
## FIXME(zyy17): Enable it after the tests are stabled.
DISABLE_RUN_TESTS: true
jobs:
build:
name: Build binary
@@ -32,25 +29,23 @@ jobs:
os: ubuntu-2004-16-cores
file: greptime-linux-amd64
continue-on-error: false
opts: "-F pyo3_backend"
# opts: "-F pyo3_backend"
- arch: aarch64-unknown-linux-gnu
os: ubuntu-2004-16-cores
file: greptime-linux-arm64
continue-on-error: false
opts: "-F pyo3_backend"
- arch: aarch64-apple-darwin
os: macos-latest
file: greptime-darwin-arm64
continue-on-error: false
opts: "-F pyo3_backend"
- arch: x86_64-apple-darwin
os: macos-latest
file: greptime-darwin-amd64
continue-on-error: false
opts: "-F pyo3_backend"
continue-on-error: true
# opts: "-F pyo3_backend"
# - arch: aarch64-apple-darwin
# os: macos-latest
# file: greptime-darwin-arm64
# continue-on-error: true
# - arch: x86_64-apple-darwin
# os: macos-latest
# file: greptime-darwin-amd64
# continue-on-error: true
runs-on: ${{ matrix.os }}
continue-on-error: ${{ matrix.continue-on-error }}
if: github.repository == 'GreptimeTeam/greptimedb'
if: github.repository == 'GreptimeTeam/greptimedb-edge'
steps:
- name: Checkout sources
uses: actions/checkout@v3
@@ -121,7 +116,6 @@ jobs:
run: protoc --version ; cargo version ; rustc --version ; gcc --version ; g++ --version
- name: Run tests
if: env.DISABLE_RUN_TESTS == 'false'
run: make unit-test integration-test sqlness-test
- name: Run cargo build for aarch64-linux
@@ -156,100 +150,11 @@ jobs:
with:
name: ${{ matrix.file }}.sha256sum
path: target/${{ matrix.arch }}/${{ env.CARGO_PROFILE }}/${{ matrix.file }}.sha256sum
docker:
name: Build docker image
needs: [build]
runs-on: ubuntu-latest
if: github.repository == 'GreptimeTeam/greptimedb'
steps:
- name: Checkout sources
uses: actions/checkout@v3
- name: Login to Dockerhub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
shell: bash
if: github.event_name == 'schedule'
run: |
buildTime=`date "+%Y%m%d"`
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV
- name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
shell: bash
if: github.event_name != 'schedule'
run: |
VERSION=${{ github.ref_name }}
echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up buildx
uses: docker/setup-buildx-action@v2
- name: Download amd64 binary
uses: actions/download-artifact@v3
with:
name: greptime-linux-amd64
path: amd64
- name: Unzip the amd64 artifacts
run: |
cd amd64
tar xvf greptime-linux-amd64.tgz
rm greptime-linux-amd64.tgz
- name: Download arm64 binary
id: download-arm64
uses: actions/download-artifact@v3
with:
name: greptime-linux-arm64
path: arm64
- name: Unzip the arm64 artifacts
id: unzip-arm64
if: success() || steps.download-arm64.conclusion == 'success'
run: |
cd arm64
tar xvf greptime-linux-arm64.tgz
rm greptime-linux-arm64.tgz
- name: Build and push all
uses: docker/build-push-action@v3
if: success() || steps.unzip-arm64.conclusion == 'success' # Build and push all platform if unzip-arm64 succeeds
with:
context: .
file: ./docker/ci/Dockerfile
push: true
platforms: linux/amd64,linux/arm64
tags: |
greptime/greptimedb:latest
greptime/greptimedb:${{ env.IMAGE_TAG }}
- name: Build and push amd64 only
uses: docker/build-push-action@v3
if: success() || steps.download-arm64.conclusion == 'failure' # Only build and push amd64 platform if download-arm64 fails
with:
context: .
file: ./docker/ci/Dockerfile
push: true
platforms: linux/amd64
tags: |
greptime/greptimedb:latest
greptime/greptimedb:${{ env.IMAGE_TAG }}
release:
name: Release artifacts
# Release artifacts only when all the artifacts are built successfully.
needs: [build,docker]
needs: [build]
runs-on: ubuntu-latest
if: github.repository == 'GreptimeTeam/greptimedb'
if: github.repository == 'GreptimeTeam/greptimedb-edge'
steps:
- name: Checkout sources
uses: actions/checkout@v3
@@ -288,48 +193,142 @@ jobs:
files: |
**/greptime-*
docker-push-uhub:
name: Push docker image to UCloud Container Registry
needs: [docker]
runs-on: ubuntu-latest
if: github.repository == 'GreptimeTeam/greptimedb'
# Push to uhub may fail(500 error), but we don't want to block the release process. The failed job will be retried manually.
continue-on-error: true
steps:
- name: Checkout sources
uses: actions/checkout@v3
# docker:
# name: Build docker image
# needs: [build]
# runs-on: ubuntu-latest
# if: github.repository == 'GreptimeTeam/greptimedb'
# steps:
# - name: Checkout sources
# uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
# - name: Login to UCloud Container Registry
# uses: docker/login-action@v2
# with:
# registry: uhub.service.ucloud.cn
# username: ${{ secrets.UCLOUD_USERNAME }}
# password: ${{ secrets.UCLOUD_PASSWORD }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
# - name: Login to Dockerhub
# uses: docker/login-action@v2
# with:
# username: ${{ secrets.DOCKERHUB_USERNAME }}
# password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Login to UCloud Container Registry
uses: docker/login-action@v2
with:
registry: uhub.service.ucloud.cn
username: ${{ secrets.UCLOUD_USERNAME }}
password: ${{ secrets.UCLOUD_PASSWORD }}
# - name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
# shell: bash
# if: github.event_name == 'schedule'
# run: |
# buildTime=`date "+%Y%m%d"`
# SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
# echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV
- name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
shell: bash
if: github.event_name == 'schedule'
run: |
buildTime=`date "+%Y%m%d"`
SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV
# - name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
# shell: bash
# if: github.event_name != 'schedule'
# run: |
# VERSION=${{ github.ref_name }}
# echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
- name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
shell: bash
if: github.event_name != 'schedule'
run: |
VERSION=${{ github.ref_name }}
echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
- name: Push image to uhub # Use 'docker buildx imagetools create' to create a new image base on source image.
run: |
docker buildx imagetools create \
--tag uhub.service.ucloud.cn/greptime/greptimedb:latest \
--tag uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} \
greptime/greptimedb:${{ env.IMAGE_TAG }}
# - name: Set up buildx
# uses: docker/setup-buildx-action@v2
# - name: Download amd64 binary
# uses: actions/download-artifact@v3
# with:
# name: greptime-linux-amd64
# path: amd64
# - name: Unzip the amd64 artifacts
# run: |
# cd amd64
# tar xvf greptime-linux-amd64.tgz
# rm greptime-linux-amd64.tgz
# - name: Download arm64 binary
# id: download-arm64
# uses: actions/download-artifact@v3
# with:
# name: greptime-linux-arm64
# path: arm64
# - name: Unzip the arm64 artifacts
# id: unzip-arm64
# if: success() || steps.download-arm64.conclusion == 'success'
# run: |
# cd arm64
# tar xvf greptime-linux-arm64.tgz
# rm greptime-linux-arm64.tgz
# - name: Build and push all
# uses: docker/build-push-action@v3
# if: success() || steps.unzip-arm64.conclusion == 'success' # Build and push all platform if unzip-arm64 succeeds
# with:
# context: .
# file: ./docker/ci/Dockerfile
# push: true
# platforms: linux/amd64,linux/arm64
# tags: |
# greptime/greptimedb:latest
# greptime/greptimedb:${{ env.IMAGE_TAG }}
# - name: Build and push amd64 only
# uses: docker/build-push-action@v3
# if: success() || steps.download-arm64.conclusion == 'failure' # Only build and push amd64 platform if download-arm64 fails
# with:
# context: .
# file: ./docker/ci/Dockerfile
# push: true
# platforms: linux/amd64
# tags: |
# greptime/greptimedb:latest
# greptime/greptimedb:${{ env.IMAGE_TAG }}
# docker-push-uhub:
# name: Push docker image to UCloud Container Registry
# needs: [docker]
# runs-on: ubuntu-latest
# if: github.repository == 'GreptimeTeam/greptimedb'
# # Push to uhub may fail(500 error), but we don't want to block the release process. The failed job will be retried manually.
# continue-on-error: true
# steps:
# - name: Checkout sources
# uses: actions/checkout@v3
# - name: Set up QEMU
# uses: docker/setup-qemu-action@v2
# - name: Set up Docker Buildx
# uses: docker/setup-buildx-action@v2
# - name: Login to UCloud Container Registry
# uses: docker/login-action@v2
# with:
# registry: uhub.service.ucloud.cn
# username: ${{ secrets.UCLOUD_USERNAME }}
# password: ${{ secrets.UCLOUD_PASSWORD }}
# - name: Configure scheduled build image tag # the tag would be ${SCHEDULED_BUILD_VERSION_PREFIX}-YYYYMMDD-${SCHEDULED_PERIOD}
# shell: bash
# if: github.event_name == 'schedule'
# run: |
# buildTime=`date "+%Y%m%d"`
# SCHEDULED_BUILD_VERSION=${{ env.SCHEDULED_BUILD_VERSION_PREFIX }}-$buildTime-${{ env.SCHEDULED_PERIOD }}
# echo "IMAGE_TAG=${SCHEDULED_BUILD_VERSION:1}" >> $GITHUB_ENV
# - name: Configure tag # If the release tag is v0.1.0, then the image version tag will be 0.1.0.
# shell: bash
# if: github.event_name != 'schedule'
# run: |
# VERSION=${{ github.ref_name }}
# echo "IMAGE_TAG=${VERSION:1}" >> $GITHUB_ENV
# - name: Push image to uhub # Use 'docker buildx imagetools create' to create a new image base on source image.
# run: |
# docker buildx imagetools create \
# --tag uhub.service.ucloud.cn/greptime/greptimedb:latest \
# --tag uhub.service.ucloud.cn/greptime/greptimedb:${{ env.IMAGE_TAG }} \
# greptime/greptimedb:${{ env.IMAGE_TAG }}

Cargo.lock (generated, 131 changed lines)
View File

@@ -135,7 +135,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72"
[[package]]
name = "api"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arrow-flight",
"common-base",
@@ -754,7 +754,7 @@ dependencies = [
[[package]]
name = "benchmarks"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arrow",
"clap 4.1.8",
@@ -1088,7 +1088,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"arc-swap",
@@ -1339,7 +1339,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"arrow-flight",
@@ -1362,7 +1362,7 @@ dependencies = [
"prost",
"rand",
"snafu",
"substrait 0.1.1",
"substrait 0.1.0",
"substrait 0.4.1",
"tokio",
"tonic",
@@ -1392,7 +1392,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"anymap",
"build-data",
@@ -1420,7 +1420,7 @@ dependencies = [
"servers",
"session",
"snafu",
"substrait 0.1.1",
"substrait 0.1.0",
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
@@ -1456,7 +1456,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"anymap",
"bitvec",
@@ -1470,7 +1470,7 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
@@ -1487,7 +1487,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"snafu",
"strum",
@@ -1495,7 +1495,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arc-swap",
"chrono-tz",
@@ -1518,7 +1518,7 @@ dependencies = [
[[package]]
name = "common-function-macro"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arc-swap",
"common-query",
@@ -1532,7 +1532,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"arrow-flight",
@@ -1558,7 +1558,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
@@ -1576,7 +1576,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"common-error",
"snafu",
@@ -1589,10 +1589,9 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-trait",
"backon 0.4.0",
"common-error",
"common-runtime",
"common-telemetry",
@@ -1610,7 +1609,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-trait",
"common-base",
@@ -1628,7 +1627,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"common-error",
"datafusion",
@@ -1644,7 +1643,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"common-error",
"common-telemetry",
@@ -1658,7 +1657,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"backtrace",
"common-error",
@@ -1680,14 +1679,14 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"tempfile",
]
[[package]]
name = "common-time"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"chrono",
"common-error",
@@ -2108,8 +2107,8 @@ dependencies = [
[[package]]
name = "datafusion"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2138,6 +2137,7 @@ dependencies = [
"object_store",
"parking_lot",
"parquet",
"paste",
"percent-encoding",
"pin-project-lite",
"rand",
@@ -2155,8 +2155,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow",
"chrono",
@@ -2168,8 +2168,8 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"dashmap",
"datafusion-common",
@@ -2185,19 +2185,20 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"ahash 0.8.3",
"arrow",
"datafusion-common",
"log",
"sqlparser",
]
[[package]]
name = "datafusion-optimizer"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow",
"async-trait",
@@ -2213,8 +2214,8 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -2232,6 +2233,7 @@ dependencies = [
"itertools",
"lazy_static",
"md-5",
"num-traits",
"paste",
"petgraph",
"rand",
@@ -2243,8 +2245,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow",
"datafusion-common",
@@ -2254,8 +2256,8 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "20.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=146a949218ec970784974137277cde3b4e547d0a#146a949218ec970784974137277cde3b4e547d0a"
version = "19.0.0"
source = "git+https://github.com/MichaelScofield/arrow-datafusion.git?rev=d7b3c730049f2561755f9d855f638cb580c38eff#d7b3c730049f2561755f9d855f638cb580c38eff"
dependencies = [
"arrow-schema",
"datafusion-common",
@@ -2266,7 +2268,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"async-compat",
@@ -2317,7 +2319,7 @@ dependencies = [
"sql",
"storage",
"store-api",
"substrait 0.1.1",
"substrait 0.1.0",
"table",
"table-procedure",
"tokio",
@@ -2327,12 +2329,11 @@ dependencies = [
"tower",
"tower-http",
"url",
"uuid",
]
[[package]]
name = "datatypes"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-schema",
@@ -2780,7 +2781,7 @@ dependencies = [
[[package]]
name = "frontend"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"async-stream",
@@ -2823,7 +2824,7 @@ dependencies = [
"sql",
"store-api",
"strfmt",
"substrait 0.1.1",
"substrait 0.1.0",
"table",
"tokio",
"toml",
@@ -3084,7 +3085,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3a715150563b89d5dfc81a5838eac1f66a5658a1#3a715150563b89d5dfc81a5838eac1f66a5658a1"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0a7b790ed41364b5599dff806d1080bd59c5c9f6#0a7b790ed41364b5599dff806d1080bd59c5c9f6"
dependencies = [
"prost",
"tonic",
@@ -3731,7 +3732,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arc-swap",
"async-stream",
@@ -3974,7 +3975,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
@@ -4001,7 +4002,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"anymap",
"api",
@@ -4136,7 +4137,7 @@ dependencies = [
[[package]]
name = "mito"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"anymap",
"arc-swap",
@@ -4539,7 +4540,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
@@ -4861,7 +4862,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"common-catalog",
"common-error",
@@ -5382,7 +5383,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -5614,7 +5615,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"approx_eq",
"arc-swap",
@@ -6627,7 +6628,7 @@ checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1"
[[package]]
name = "script"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arrow",
"async-trait",
@@ -6857,7 +6858,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"aide",
"api",
@@ -6933,7 +6934,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arc-swap",
"common-catalog",
@@ -7170,7 +7171,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"catalog",
@@ -7205,7 +7206,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-trait",
"client",
@@ -7283,7 +7284,7 @@ dependencies = [
[[package]]
name = "storage"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"arc-swap",
"arrow",
@@ -7331,7 +7332,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-stream",
"async-trait",
@@ -7463,7 +7464,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -7557,7 +7558,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"anymap",
"async-trait",
@@ -7593,7 +7594,7 @@ dependencies = [
[[package]]
name = "table-procedure"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"async-trait",
"catalog",
@@ -7675,7 +7676,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.1.1"
version = "0.1.0"
dependencies = [
"api",
"axum",

View File

@@ -45,7 +45,7 @@ members = [
]
[workspace.package]
version = "0.1.1"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
@@ -57,12 +57,13 @@ arrow-schema = { version = "34.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
# TODO(LFC): Use official DataFusion, when https://github.com/apache/arrow-datafusion/pull/5542 got merged
datafusion = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-common = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-expr = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-optimizer = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-physical-expr = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
datafusion-sql = { git = "https://github.com/MichaelScofield/arrow-datafusion.git", rev = "d7b3c730049f2561755f9d855f638cb580c38eff" }
futures = "0.3"
futures-util = "0.3"
parquet = "34.0"

View File

@@ -44,7 +44,5 @@ max_purge_tasks = 32
# Procedure storage options, see `standalone.example.toml`.
# [procedure.store]
# type = "File"
# data_dir = "/tmp/greptimedb/procedure/"
# max_retry_times = 3
# retry_delay = "500ms"
# type = 'File'
# data_dir = '/tmp/greptimedb/procedure/'

config/edge.example.toml (new file, 11 lines)
View File

@@ -0,0 +1,11 @@
# WAL options.
[wal]
# WAL data directory.
dir = "/tmp/greptimedb/wal"
# Storage options.
[storage]
# Storage type.
type = "File"
# Data directory, "/tmp/greptimedb/data" by default.
data_dir = "/tmp/greptimedb/data"

View File

@@ -114,7 +114,3 @@ max_purge_tasks = 32
# type = "File"
# # Procedure data path.
# data_dir = "/tmp/greptimedb/procedure/"
# # Procedure max retry time.
# max_retry_times = 3
# # Initial retry delay of procedures, increases exponentially
# retry_delay = "500ms"

View File

@@ -1,196 +0,0 @@
---
Feature Name: "Fault Tolerance for Region"
Tracking Issue: https://github.com/GreptimeTeam/greptimedb/issues/1126
Date: 2023-03-08
Author: "Luo Fucong <luofucong@greptime.com>"
---
Fault Tolerance for Region
----------------------
# Summary
This RFC proposes a method to achieve fault tolerance for regions in GreptimeDB's distributed mode, or, put another way, region high availability ("HA") for the GreptimeDB cluster.
In this RFC, we mainly describe two aspects of region HA: how region availability is detected, and what recovery process needs to be taken. We also discuss some alternatives and future work.
When this feature is done, users can expect a GreptimeDB cluster that can always handle their requests to regions, although some requests may fail during region failover. Optimizations to reduce the MTTR (Mean Time To Recovery) are not a concern of this RFC and are left for future work.
# Motivation
Fault tolerance for regions is a critical feature for our clients to use the GreptimeDB cluster confidently. High availability of access to stored data is a "must have" for any TSDB product, including the GreptimeDB cluster.
# Details
## Background
Some background about regions in distributed mode:
- A table is logically split into multiple regions. Each region stores a non-overlapping part of the table's data.
- Regions are distributed across Datanodes; the mappings are not static and are assigned and governed by Metasrv.
- In distributed mode, client requests are scoped to regions. To be more specific, when a request that needs to scan multiple regions arrives at the Frontend, the Frontend splits it into multiple sub-requests, each of which scans one region only, and submits them to the Datanodes that hold the corresponding regions.
In conclusion, as long as regions remain available, and can regain availability when failures do occur, overall region HA is achieved. With this in mind, let's first see how region failures are detected.
## Failure Detection
We detect region failures in Metasrv, both passively and actively. Passively means that Metasrv does not fire "are you healthy" requests at regions. Instead, region health information is carried in the heartbeat requests that Datanodes submit to Metasrv.
The Datanode already carries its region stats in the heartbeat request (irrelevant fields are omitted):
```protobuf
message HeartbeatRequest {
  ...
  // Region stats on this node
  repeated RegionStat region_stats = 6;
  ...
}

message RegionStat {
  uint64 region_id = 1;
  TableName table_name = 2;
  ...
}
```
For the sake of simplicity, we don't add another field `bool available = 3` to the `RegionStat` message; instead, if a region is unavailable in the view of the Datanode that contains it, the Datanode simply omits that region's `RegionStat` from the heartbeat request. And if the Datanode itself is unavailable, the heartbeat request is not submitted at all, which is effectively the same as not carrying the `RegionStat`.
> The heartbeat interval is now hardcoded to five seconds.
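To make the omission scheme concrete, here is a minimal, hypothetical sketch of a Datanode-side collector assembling the `region_stats` list; the `Region` struct, its `available` flag, and the function name are illustrative stand-ins, not actual greptimedb types.
```rust
/// Illustrative region descriptor; the real Datanode keeps far richer state.
struct Region {
    region_id: u64,
    table_name: String,
    available: bool,
}

/// Mirrors the protobuf `RegionStat` fields shown above.
struct RegionStat {
    region_id: u64,
    table_name: String,
}

/// Build the heartbeat payload: unavailable regions are simply left out,
/// and their absence is what lets Metasrv suspect a failure.
fn collect_region_stats(regions: &[Region]) -> Vec<RegionStat> {
    regions
        .iter()
        .filter(|r| r.available)
        .map(|r| RegionStat {
            region_id: r.region_id,
            table_name: r.table_name.clone(),
        })
        .collect()
}

fn main() {
    let regions = vec![
        Region { region_id: 1, table_name: "metrics".into(), available: true },
        Region { region_id: 2, table_name: "metrics".into(), available: false },
    ];
    // Only region 1 is reported; region 2's absence is the failure signal.
    assert_eq!(collect_region_stats(&regions).len(), 1);
}
```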
Metasrv gathers the heartbeat requests, extracts the `RegionStat`s, and treats them as region heartbeats. In this way, Metasrv maintains health information for all regions. If a region's heartbeats are not received for a period of time, Metasrv suspects the region might be unavailable. To decide whether a region has failed, Metasrv uses a failure detection algorithm called "[Phi φ Accrual Failure Detection](https://medium.com/@arpitbhayani/phi-%CF%86-accrual-failure-detection-79c21ce53a7a)". Basically, the algorithm calculates a value called "phi" that represents the likelihood of a region's unavailability, based on the arrival rate of historical heartbeats. Once "phi" rises above a pre-defined threshold, Metasrv considers the region failed.
> This algorithm has been widely adopted in well-known products such as Akka and Cassandra.
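For intuition, here is a minimal sketch of an accrual-style detector. It is not the Metasrv implementation: it assumes the exponential approximation of the heartbeat inter-arrival distribution (a common simplification of the phi accrual algorithm), under which phi grows linearly with the time elapsed since the last heartbeat.
```rust
use std::collections::VecDeque;

/// A simplified accrual failure detector (illustrative only). The real
/// algorithm models the heartbeat inter-arrival distribution; this sketch
/// uses the exponential approximation, so phi = elapsed / (mean * ln(10)).
struct PhiAccrualDetector {
    intervals: VecDeque<f64>, // recent inter-arrival times, in seconds
    last_heartbeat: f64,      // timestamp of the last heartbeat, in seconds
    window: usize,            // how many intervals to remember
}

impl PhiAccrualDetector {
    fn new(now: f64) -> Self {
        Self { intervals: VecDeque::new(), last_heartbeat: now, window: 100 }
    }

    /// Record a heartbeat arrival.
    fn heartbeat(&mut self, now: f64) {
        self.intervals.push_back(now - self.last_heartbeat);
        if self.intervals.len() > self.window {
            self.intervals.pop_front();
        }
        self.last_heartbeat = now;
    }

    /// The larger phi is, the more likely the region is down.
    fn phi(&self, now: f64) -> f64 {
        if self.intervals.is_empty() {
            return 0.0;
        }
        let mean = self.intervals.iter().sum::<f64>() / self.intervals.len() as f64;
        let elapsed = now - self.last_heartbeat;
        // -log10(P(T > elapsed)) with T ~ Exp(1/mean)
        elapsed / (mean * std::f64::consts::LN_10)
    }
}

fn main() {
    let mut d = PhiAccrualDetector::new(0.0);
    for t in 1..=10 {
        d.heartbeat(t as f64 * 5.0); // a heartbeat every five seconds
    }
    println!("phi right after a heartbeat: {:.2}", d.phi(50.0)); // ~0.0
    println!("phi after 30s of silence:    {:.2}", d.phi(80.0)); // ~2.6
}
```
With five-second heartbeats, phi stays near zero while they keep arriving and crosses a threshold such as 8 only after roughly 90 seconds of silence, which is how the pre-defined threshold mentioned above translates into a detection delay.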
When Metasrv concludes from heartbeats that a region has failed, that is not the final decision. Here comes the "active" detection: before Metasrv starts a region failover, it actively invokes the health check interface of the Datanode on which the failed region resides. Only if this health check also fails does Metasrv actually start the failover of the region.
To conclude, the failure detection pseudocode looks like this:
```rust
// in Metasrv:
fn failure_detection() {
    loop {
        // passive detection
        let failed_regions = all_regions
            .iter()
            .filter(|r| r.estimated_failure_possibility() > config.phi)
            .collect();

        // find the datanodes that contains the failed regions
        let datanodes_and_regions = find_region_resides_datanodes(failed_regions);

        // active detection
        for (datanode, regions) in datanodes_and_regions {
            if !datanode.is_healthy(regions) {
                do_failover(datanode, regions);
            }
        }

        sleep(config.detect_interval);
    }
}
```
Some design considerations:
- Why detect actively when we already have passive detection? Because the network can sometimes be connectable in only one direction (especially in complex cloud environments): the Datanode's heartbeats cannot reach Metasrv, yet Metasrv can still reach the Datanode. Active detection avoids this false-positive situation.
- Why does detection work on regions instead of Datanodes? Because it may happen that only some of the regions on a Datanode are unavailable, not ALL of them, especially when Datanodes are shared by multiple tenants. In that case it is better to fail over only the affected regions rather than every region residing on the Datanode. All in all, we want finer-grained control over region failover.
So we have detected that some regions are unavailable. How do we regain their availability?
## Region Failover
Region failover largely relies on the remote WAL, aka "[Bunshin](https://github.com/GreptimeTeam/bunshin)". I'm not including its details in this RFC; let's just assume we already have it.
In general, region failover is fairly simple. Once Metasrv decides to fail over some regions, it first chooses one or more Datanodes to hold the failed regions. This is easy, as Metasrv already has the whole picture of the Datanodes: it knows which Datanode has the fewest regions, which Datanode historically had the lowest CPU usage and IO rate, and how the Datanodes are assigned to tenants, among other information that helps Metasrv choose the most suitable Datanodes. Let's call these chosen Datanodes the "candidates".
> The strategy for choosing the most suitable candidates requires careful design, but that's another RFC.
Then, Metasrv sets the state of these failed regions to "passive". We should add a field to `Region`:
```protobuf
message Region {
  uint64 id = 1;
  string name = 2;
  Partition partition = 3;

  enum State {
    ACTIVE = 0;
    PASSIVE = 1;
  }
  State state = 4;

  map<string, string> attrs = 100;
}
```
Here `Region` is used in the message `RegionRoute`, which indicates how a write request is split among regions. When a region is set to "passive", the Frontend knows that writes to it should be rejected for the moment (reads from the region are not blocked, however).
> Making a region "passive" effectively blocks writes to it. That's fine in the failover situation, since the region has failed anyway. However, for active maintenance operations the region state requires a more refined design. But that's another story.
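As an illustration of the write gate described in the note above, here is a small hypothetical sketch; the `RegionRoute` struct, the state enum, and the string error are simplified stand-ins for whatever the Frontend actually uses for routing.
```rust
#[derive(Clone, Copy, PartialEq)]
enum RegionState {
    Active,
    Passive,
}

/// Simplified routing entry: which region a write targets and its state.
struct RegionRoute {
    region_id: u64,
    state: RegionState,
}

/// Frontend-side gate: writes to "passive" regions are rejected immediately,
/// while reads (not shown) would pass through regardless of state.
fn check_writable(route: &RegionRoute) -> Result<(), String> {
    match route.state {
        RegionState::Active => Ok(()),
        RegionState::Passive => Err(format!(
            "region {} is failing over, writes are temporarily rejected",
            route.region_id
        )),
    }
}

fn main() {
    let route = RegionRoute { region_id: 42, state: RegionState::Passive };
    assert!(check_writable(&route).is_err());
}
```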
Third, Metasrv fires "close region" requests at the failed Datanodes, and "open region" requests at the candidates. "Close region" requests might fail due to the unavailability of the Datanodes, but that's fine: they are just a best-effort attempt to reduce the chance that any in-flight writes get handled unintentionally after the region is set to "passive". The "open region" requests must succeed, though. Datanodes open regions from the remote WAL.
> Currently "close region" is undefined in the Datanode. It could be a local cleanup of cached region data or other resource tidying.
Finally, when a candidate successfully opens its region, it calls back to Metasrv, indicating that it is ready to handle the region. The "call back" here is carried by its heartbeat to Metasrv. Metasrv then updates the region's state to "active", letting the Frontend lift the write restriction on the region (again, reads are untouched).
All the above steps should be managed by the remote procedure framework. This is another implementation challenge of the region failover feature (the other being the remote WAL, of course).
A picture is worth a thousand words:
```text
+-------------------------+
| Metasrv detects region  |
| failure                 |
+-------------------------+
            |
            v
+----------------------------+
| Metasrv chooses candidates |
| to hold failed regions     |
+----------------------------+
            |
            v
+-------------------------+      +-------------------------+
| Metasrv "passive" the   |----->| Frontend rejects writes |
| failed regions          |      | to "passive" regions    |
+-------------------------+      +-------------------------+
            |
            v
+--------------------------+     +---------------------------+
| Candidate Datanodes open |<----| Metasrv fires "close" and |
| regions from remote WAL  |     | "open" region requests    |
+--------------------------+     +---------------------------+
            |
            v
+-------------------------+      +-------------------------+
| Metasrv "active" the    |----->| Frontend lifts write    |
| failed regions          |      | restriction to regions  |
+-------------------------+      +-------------------------+
            |
            v
+-------------------------+
| Region failover done,   |
| HA regain               |
+-------------------------+
```
# Alternatives
## The "Neon" Way
Remote WAL raises a problem that could hurt the write throughput of the GreptimeDB cluster: each write request has to make at least two remote calls, one from the Frontend to the Datanode and one from the Datanode to the remote WAL. What if we did it the "[Neon](https://github.com/neondatabase/neon)" way, placing the remote WAL between the Frontend and the Datanode; couldn't that improve our write throughput? It could, though there are consistency issues such as "read-your-writes" to solve.
However, the main reasons we don't adopt this method are two-fold:
1. The remote WAL is planned to be quorum-based, so it can be written efficiently;
2. More importantly, we plan to make the remote WAL optional, so users can choose not to enable it (at the cost of some reduction in reliability).
## No WAL, Replication instead
This method replicates regions across Datanodes directly, as is common in shared-nothing databases. Were the main region to fail, a standby region in the replication group would be elected as the new "main" and take over the read/write requests. The main concern with this method is its incompatibility with our current architecture and code structure: it requires a major redesign, yet gains no significant advantage over the remote WAL method.
However, replication does have advantages of its own that we can learn from to optimize the failover procedure.
# Future Work
Some optimizations we could pursue:
- To reduce the MTTR, we could have Metasrv choose a candidate for each region ahead of time, during normal operation. The candidate does some preparation work to reduce the region-open time, effectively accelerating the failover procedure.
- We could adopt the replication method to the degree that region replicas serve as fast catch-up candidates. Since the data difference among replicas is minor, region failover does not need to load or exchange much data, which greatly reduces the failover time.

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3a715150563b89d5dfc81a5838eac1f66a5658a1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0a7b790ed41364b5599dff806d1080bd59c5c9f6" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -204,21 +204,6 @@ pub enum Error {
#[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
QueryAccessDenied { catalog: String, schema: String },
#[snafu(display(
"Failed to get region stats, catalog: {}, schema: {}, table: {}, source: {}",
catalog,
schema,
table,
source
))]
RegionStats {
catalog: String,
schema: String,
table: String,
#[snafu(backtrace)]
source: table::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -253,8 +238,7 @@ impl ErrorExt for Error {
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. }
| Error::RegionStats { source, .. } => source.status_code(),
| Error::DeregisterTable { source, .. } => source.status_code(),
Error::MetaSrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source } => source.status_code(),

View File

@@ -18,7 +18,6 @@ use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use api::v1::meta::{RegionStat, TableName};
use common_telemetry::info;
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
@@ -226,10 +225,10 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
Ok(())
}
/// The stat of regions in the datanode node.
/// The number of regions can be got from len of vec.
pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result<Vec<RegionStat>> {
let mut region_stats = Vec::new();
/// The number of regions in the datanode node.
pub async fn region_number(catalog_manager: &CatalogManagerRef) -> Result<u64> {
let mut region_number: u64 = 0;
for catalog_name in catalog_manager.catalog_names()? {
let catalog =
catalog_manager
@@ -255,28 +254,10 @@ pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result<Vec<Reg
table_info: &table_name,
})?;
region_stats.extend(
table
.region_stats()
.context(error::RegionStatsSnafu {
catalog: &catalog_name,
schema: &schema_name,
table: &table_name,
})?
.into_iter()
.map(|stat| RegionStat {
region_id: stat.region_id,
table_name: Some(TableName {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
}),
approximate_bytes: stat.disk_usage_bytes as i64,
..Default::default()
}),
);
let region_numbers = &table.table_info().meta.region_numbers;
region_number += region_numbers.len() as u64;
}
}
}
Ok(region_stats)
Ok(region_number)
}

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::ChannelManager;
use parking_lot::RwLock;
@@ -23,6 +24,10 @@ use tonic::transport::Channel;
use crate::load_balance::{LoadBalance, Loadbalancer};
use crate::{error, Result};
pub(crate) struct DatabaseClient {
pub(crate) inner: GreptimeDatabaseClient<Channel>,
}
pub(crate) struct FlightClient {
addr: String,
client: FlightServiceClient<Channel>,
@@ -118,7 +123,7 @@ impl Client {
self.inner.set_peers(urls);
}
pub(crate) fn make_client(&self) -> Result<FlightClient> {
fn find_channel(&self) -> Result<(String, Channel)> {
let addr = self
.inner
.get_peer()
@@ -131,11 +136,23 @@ impl Client {
.channel_manager
.get(&addr)
.context(error::CreateChannelSnafu { addr: &addr })?;
Ok((addr, channel))
}
pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {
let (addr, channel) = self.find_channel()?;
Ok(FlightClient {
addr,
client: FlightServiceClient::new(channel),
})
}
pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
let (_, channel) = self.find_channel()?;
Ok(DatabaseClient {
inner: GreptimeDatabaseClient::new(channel),
})
}
}
#[cfg(test)]

View File

@@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::str::FromStr;
use api::v1::auth_header::AuthScheme;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, FlushTableExpr,
GreptimeRequest, InsertRequest, PromRangeQuery, QueryRequest, RequestHeader,
greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest,
DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery, QueryRequest,
RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
use common_error::prelude::*;
@@ -31,7 +30,9 @@ use futures_util::{TryFutureExt, TryStreamExt};
use prost::Message;
use snafu::{ensure, ResultExt};
use crate::error::{ConvertFlightDataSnafu, IllegalFlightMessagesSnafu};
use crate::error::{
ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
};
use crate::{error, Client, Result};
#[derive(Clone, Debug)]
@@ -78,8 +79,26 @@ impl Database {
});
}
pub async fn insert(&self, request: InsertRequest) -> Result<Output> {
self.do_get(Request::Insert(request)).await
pub async fn insert(&self, request: InsertRequest) -> Result<u32> {
let mut client = self.client.make_database_client()?.inner;
let request = GreptimeRequest {
header: Some(RequestHeader {
catalog: self.catalog.clone(),
schema: self.schema.clone(),
authorization: self.ctx.auth_header.clone(),
}),
request: Some(Request::Insert(request)),
};
let response = client
.handle(request)
.await?
.into_inner()
.response
.context(IllegalDatabaseResponseSnafu {
err_msg: "GreptimeResponse is empty",
})?;
let greptime_response::Response::AffectedRows(AffectedRows { value }) = response;
Ok(value)
}
pub async fn sql(&self, sql: &str) -> Result<Output> {
@@ -155,7 +174,7 @@ impl Database {
ticket: request.encode_to_vec().into(),
};
let mut client = self.client.make_client()?;
let mut client = self.client.make_flight_client()?;
// TODO(LFC): Streaming get flight data.
let flight_data: Vec<FlightData> = client
@@ -164,22 +183,22 @@ impl Database {
.and_then(|response| response.into_inner().try_collect())
.await
.map_err(|e| {
let code = get_metadata_value(&e, INNER_ERROR_CODE)
.and_then(|s| StatusCode::from_str(&s).ok())
.unwrap_or(StatusCode::Unknown);
let msg = get_metadata_value(&e, INNER_ERROR_MSG).unwrap_or(e.to_string());
error::ExternalSnafu { code, msg }
let tonic_code = e.code();
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
error::ServerSnafu { code, msg }
.fail::<()>()
.map_err(BoxedError::new)
.context(error::FlightGetSnafu {
tonic_code: e.code(),
tonic_code,
addr: client.addr(),
})
.map_err(|error| {
logging::error!(
"Failed to do Flight get, addr: {}, code: {}, source: {}",
client.addr(),
e.code(),
tonic_code,
error
);
error
@@ -210,12 +229,6 @@ impl Database {
}
}
fn get_metadata_value(e: &tonic::Status, key: &str) -> Option<String> {
e.metadata()
.get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
#[derive(Default, Debug, Clone)]
pub struct FlightContext {
auth_header: Option<AuthHeader>,

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use std::any::Any;
use std::str::FromStr;
use common_error::prelude::*;
use tonic::Code;
use tonic::{Code, Status};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -68,6 +69,13 @@ pub enum Error {
/// Error deserialized from gRPC metadata
#[snafu(display("{}", msg))]
ExternalError { code: StatusCode, msg: String },
// Server error carried in Tonic Status's metadata.
#[snafu(display("{}", msg))]
Server { code: StatusCode, msg: String },
#[snafu(display("Illegal Database response: {err_msg}"))]
IllegalDatabaseResponse { err_msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -77,7 +85,10 @@ impl ErrorExt for Error {
match self {
Error::IllegalFlightMessages { .. }
| Error::ColumnDataType { .. }
| Error::MissingField { .. } => StatusCode::Internal,
| Error::MissingField { .. }
| Error::IllegalDatabaseResponse { .. } => StatusCode::Internal,
Error::Server { code, .. } => *code,
Error::FlightGet { source, .. } => source.status_code(),
Error::CreateChannel { source, .. } | Error::ConvertFlightData { source } => {
source.status_code()
@@ -95,3 +106,21 @@ impl ErrorExt for Error {
self
}
}
impl From<Status> for Error {
fn from(e: Status) -> Self {
fn get_metadata_value(e: &Status, key: &str) -> Option<String> {
e.metadata()
.get(key)
.and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
}
let code = get_metadata_value(&e, INNER_ERROR_CODE)
.and_then(|s| StatusCode::from_str(&s).ok())
.unwrap_or(StatusCode::Unknown);
let msg = get_metadata_value(&e, INNER_ERROR_MSG).unwrap_or(e.to_string());
Self::Server { code, msg }
}
}

View File

@@ -150,6 +150,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(wal_dir) = cmd.wal_dir {
opts.wal.dir = wal_dir;
}
if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}

View File

@@ -26,6 +26,12 @@ pub enum Error {
source: datanode::error::Error,
},
#[snafu(display("Failed to stop datanode, source: {}", source))]
StopDatanode {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to start frontend, source: {}", source))]
StartFrontend {
#[snafu(backtrace)]
@@ -163,6 +169,7 @@ impl ErrorExt for Error {
source.status_code()
}
Error::SubstraitEncodeLogicalPlan { source } => source.status_code(),
Error::StopDatanode { source } => source.status_code(),
}
}

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use clap::Parser;
use common_base::Plugins;
use common_error::prelude::BoxedError;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
@@ -36,7 +37,9 @@ use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use crate::error::{Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu};
use crate::error::{
Error, IllegalConfigSnafu, Result, StartDatanodeSnafu, StartFrontendSnafu, StopDatanodeSnafu,
};
use crate::frontend::load_frontend_plugins;
use crate::toml_loader;
@@ -152,7 +155,17 @@ impl Instance {
}
pub async fn stop(&self) -> Result<()> {
// TODO: handle standalone shutdown
self.datanode
.shutdown()
.await
.map_err(BoxedError::new)
.context(StopDatanodeSnafu)?;
self.frontend
.shutdown()
.await
.map_err(BoxedError::new)
.context(StopDatanodeSnafu)?;
Ok(())
}
}

View File

@@ -12,10 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
mod from_unixtime;
use from_unixtime::FromUnixtimeFunction;
use crate::scalars::function_registry::FunctionRegistry;
pub(crate) struct TimestampFunction;
impl TimestampFunction {
pub fn register(_registry: &FunctionRegistry) {}
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(FromUnixtimeFunction::default()));
}
}

View File

@@ -0,0 +1,133 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! from_unixtime function.
/// TODO(dennis) It can be removed after we upgrade datafusion.
use std::fmt;
use std::sync::Arc;
use common_query::error::{
ArrowComputeSnafu, IntoVectorSnafu, Result, TypeCastSnafu, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use datatypes::arrow::compute;
use datatypes::arrow::datatypes::{DataType as ArrowDatatype, Int64Type};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use snafu::ResultExt;
use crate::scalars::function::{Function, FunctionContext};
#[derive(Clone, Debug, Default)]
pub struct FromUnixtimeFunction;
const NAME: &str = "from_unixtime";
impl Function for FromUnixtimeFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::timestamp_millisecond_datatype())
}
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::int64_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
match columns[0].data_type() {
ConcreteDataType::Int64(_) => {
let array = columns[0].to_arrow_array();
// Our timestamp vector's time unit is millisecond
let array = compute::multiply_scalar_dyn::<Int64Type>(&array, 1000i64)
.context(ArrowComputeSnafu)?;
let arrow_datatype = &self.return_type(&[]).unwrap().as_arrow_type();
Ok(Arc::new(
TimestampMillisecondVector::try_from_arrow_array(
compute::cast(&array, arrow_datatype).context(TypeCastSnafu {
typ: ArrowDatatype::Int64,
})?,
)
.context(IntoVectorSnafu {
data_type: arrow_datatype.clone(),
})?,
))
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
}
}
impl fmt::Display for FromUnixtimeFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "FROM_UNIXTIME")
}
}
#[cfg(test)]
mod tests {
use common_query::prelude::TypeSignature;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use super::*;
#[test]
fn test_from_unixtime() {
let f = FromUnixtimeFunction::default();
assert_eq!("from_unixtime", f.name());
assert_eq!(
ConcreteDataType::timestamp_millisecond_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::int64_datatype()]
));
let times = vec![Some(1494410783), None, Some(1494410983)];
let args: Vec<VectorRef> = vec![Arc::new(Int64Vector::from(times.clone()))];
let vector = f.eval(FunctionContext::default(), &args).unwrap();
assert_eq!(3, vector.len());
for (i, t) in times.iter().enumerate() {
let v = vector.get(i);
if i == 1 {
assert_eq!(Value::Null, v);
continue;
}
match v {
Value::Timestamp(ts) => {
assert_eq!(ts.value(), t.unwrap() * 1000);
}
_ => unreachable!(),
}
}
}
}

View File

@@ -14,7 +14,6 @@ object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
backon = "0.4.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -97,16 +97,6 @@ pub enum Error {
source: Arc<Error>,
backtrace: Backtrace,
},
#[snafu(display(
"Procedure retry exceeded max times, procedure_id: {}, source:{}",
procedure_id,
source
))]
RetryTimesExceeded {
source: Arc<Error>,
procedure_id: ProcedureId,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -121,7 +111,6 @@ impl ErrorExt for Error {
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {

View File

@@ -17,10 +17,8 @@ mod runner;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
@@ -293,16 +291,12 @@ impl ManagerContext {
pub struct ManagerConfig {
/// Object store
pub object_store: ObjectStore,
pub max_retry_times: usize,
pub retry_delay: Duration,
}
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
max_retry_times: usize,
retry_delay: Duration,
}
impl LocalManager {
@@ -311,8 +305,6 @@ impl LocalManager {
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
state_store: Arc::new(ObjectStateStore::new(config.object_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
}
}
@@ -329,11 +321,7 @@ impl LocalManager {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: ExponentialBuilder::default()
.with_min_delay(self.retry_delay)
.with_max_times(self.max_retry_times),
store: ProcedureStore::new(self.state_store.clone()),
rolling_back: false,
};
let watcher = meta.state_receiver.clone();
@@ -555,8 +543,6 @@ mod tests {
let dir = create_temp_dir("register");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
@@ -576,8 +562,6 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
object_store: object_store.clone(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
@@ -622,8 +606,6 @@ mod tests {
let dir = create_temp_dir("submit");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
@@ -670,8 +652,6 @@ mod tests {
let dir = create_temp_dir("on_err");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);

View File

@@ -15,15 +15,15 @@
use std::sync::Arc;
use std::time::Duration;
use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use tokio::time;
use crate::error::{ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::store::ProcedureStore;
use crate::ProcedureState::Retrying;
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};
use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};
const ERR_WAIT_DURATION: Duration = Duration::from_secs(30);
#[derive(Debug)]
enum ExecResult {
@@ -108,9 +108,7 @@ pub(crate) struct Runner {
pub(crate) procedure: BoxedProcedure,
pub(crate) manager_ctx: Arc<ManagerContext>,
pub(crate) step: u32,
pub(crate) exponential_builder: ExponentialBuilder,
pub(crate) store: ProcedureStore,
pub(crate) rolling_back: bool,
}
impl Runner {
@@ -166,56 +164,18 @@ impl Runner {
provider: self.manager_ctx.clone(),
};
self.rolling_back = false;
self.execute_once_with_retry(&ctx).await;
}
async fn execute_once_with_retry(&mut self, ctx: &Context) {
let mut retry = self.exponential_builder.build();
let mut retry_times = 0;
loop {
match self.execute_once(ctx).await {
ExecResult::Done | ExecResult::Failed => return,
match self.execute_once(&ctx).await {
ExecResult::Continue => (),
ExecResult::Done | ExecResult::Failed => return,
ExecResult::RetryLater => {
retry_times += 1;
if let Some(d) = retry.next() {
self.wait_on_err(d, retry_times).await;
} else {
assert!(self.meta.state().is_retrying());
if let Retrying { error } = self.meta.state() {
self.meta.set_state(ProcedureState::failed(Arc::new(
Error::RetryTimesExceeded {
source: error,
procedure_id: self.meta.id,
},
)))
}
return;
}
self.wait_on_err().await;
}
}
}
}
async fn rollback(&mut self, error: Arc<Error>) -> ExecResult {
if let Err(e) = self.rollback_procedure().await {
self.rolling_back = true;
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}
self.meta.set_state(ProcedureState::failed(error));
ExecResult::Failed
}
async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
// if rolling_back, there is no need to execute again.
if self.rolling_back {
// We can definitely get the previous error here.
let state = self.meta.state();
let err = state.error().unwrap();
return self.rollback(err.clone()).await;
}
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
@@ -226,11 +186,8 @@ impl Runner {
status.need_persist(),
);
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
return ExecResult::RetryLater;
}
if status.need_persist() && self.persist_procedure().await.is_err() {
return ExecResult::RetryLater;
}
match status {
@@ -239,8 +196,7 @@ impl Runner {
self.on_suspended(subprocedures).await;
}
Status::Done => {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
if self.commit_procedure().await.is_err() {
return ExecResult::RetryLater;
}
@@ -261,12 +217,17 @@ impl Runner {
);
if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}
self.meta.set_state(ProcedureState::failed(Arc::new(e)));
// Write rollback key so we can skip this procedure while recovering procedures.
self.rollback(Arc::new(e)).await
if self.rollback_procedure().await.is_err() {
return ExecResult::RetryLater;
}
ExecResult::Failed
}
}
}
@@ -300,9 +261,7 @@ impl Runner {
procedure,
manager_ctx: self.manager_ctx.clone(),
step,
exponential_builder: self.exponential_builder.clone(),
store: self.store.clone(),
rolling_back: false,
};
// Insert the procedure. We already checked the procedure existence before inserting
@@ -326,16 +285,8 @@ impl Runner {
});
}
/// Extend the retry time to wait for the next retry.
async fn wait_on_err(&self, d: Duration, i: u64) {
logging::info!(
"Procedure {}-{} retry for the {} times after {} millis",
self.procedure.type_name(),
self.meta.id,
i,
d.as_millis(),
);
time::sleep(d).await;
async fn wait_on_err(&self) {
time::sleep(ERR_WAIT_DURATION).await;
}
async fn on_suspended(&self, subprocedures: Vec<ProcedureWithId>) {
@@ -465,9 +416,7 @@ mod tests {
procedure,
manager_ctx: Arc::new(ManagerContext::new()),
step: 0,
exponential_builder: ExponentialBuilder::default(),
store,
rolling_back: false,
}
}
@@ -795,7 +744,7 @@ mod tests {
let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
assert!(meta.state().is_retrying());
assert!(meta.state().is_running());
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
@@ -803,36 +752,6 @@ mod tests {
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}
#[tokio::test]
async fn test_execute_exceed_max_retry_later() {
let exec_fn =
|_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = create_temp_dir("exceed_max_retry_later");
let meta = exceed_max_retry_later.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(
meta.clone(),
Box::new(exceed_max_retry_later),
procedure_store,
);
runner.exponential_builder = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(1))
.with_max_times(3);
// Run the runner and execute the procedure.
runner.execute_procedure_in_loop().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("Procedure retry exceeded max times"));
}
#[tokio::test]
async fn test_child_error() {
let mut times = 0;
@@ -900,7 +819,7 @@ mod tests {
// Replace the manager ctx.
runner.manager_ctx = manager_ctx;
// Run the runner and execute the procedure.
// Run the runner and execute the procedure.
runner.run().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("subprocedure failed"), "{err}");

View File

@@ -206,8 +206,6 @@ pub enum ProcedureState {
Running,
/// The procedure is finished.
Done,
/// The procedure is failed and can be retried.
Retrying { error: Arc<Error> },
/// The procedure is failed and cannot proceed anymore.
Failed { error: Arc<Error> },
}
@@ -218,11 +216,6 @@ impl ProcedureState {
ProcedureState::Failed { error }
}
/// Returns a [ProcedureState] with retrying state.
pub fn retrying(error: Arc<Error>) -> ProcedureState {
ProcedureState::Retrying { error }
}
/// Returns true if the procedure state is running.
pub fn is_running(&self) -> bool {
matches!(self, ProcedureState::Running)
@@ -238,16 +231,10 @@ impl ProcedureState {
matches!(self, ProcedureState::Failed { .. })
}
/// Returns true if the procedure state is retrying.
pub fn is_retrying(&self) -> bool {
matches!(self, ProcedureState::Retrying { .. })
}
/// Returns the error.
pub fn error(&self) -> Option<&Arc<Error>> {
match self {
ProcedureState::Failed { error } => Some(error),
ProcedureState::Retrying { error } => Some(error),
_ => None,
}
}

View File

@@ -33,9 +33,6 @@ pub async fn wait(watcher: &mut Watcher) -> Result<()> {
ProcedureState::Failed { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
}
ProcedureState::Retrying { error } => {
return Err(error.clone()).context(ProcedureExecSnafu);
}
}
}
}

View File

@@ -64,7 +64,6 @@ tonic.workspace = true
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
url = "2.3.1"
uuid.workspace = true
[dev-dependencies]
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }

View File

@@ -151,22 +151,11 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
pub struct ProcedureConfig {
/// Storage config for procedure manager.
pub store: ObjectStoreConfig,
/// Max retry times of procedure.
pub max_retry_times: usize,
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
}
impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig {
data_dir: "/tmp/greptimedb/procedure/".to_string(),
}),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string())
}
}
@@ -174,7 +163,6 @@ impl ProcedureConfig {
pub fn from_file_path(path: String) -> ProcedureConfig {
ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig { data_dir: path }),
..Default::default()
}
}
}
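With this change `ProcedureConfig` only carries the store location, so pointing the procedure store at a custom directory reduces to `ProcedureConfig::from_file_path`, mirroring the `--procedure-dir` handling earlier in this diff. A minimal sketch, assuming `DatanodeOptions` implements `Default` and keeps the `procedure: Option<ProcedureConfig>` field used above:

use datanode::datanode::{DatanodeOptions, ProcedureConfig};

// Sketch only: build datanode options with a custom procedure directory.
fn with_procedure_dir(dir: String) -> DatanodeOptions {
    let mut opts = DatanodeOptions::default(); // the Default impl is assumed here
    opts.procedure = Some(ProcedureConfig::from_file_path(dir));
    opts
}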

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer};
use catalog::{region_stats, CatalogManagerRef};
use catalog::{region_number, CatalogManagerRef};
use common_telemetry::{error, info, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
@@ -106,11 +106,11 @@ impl HeartbeatTask {
let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
common_runtime::spawn_bg(async move {
while running.load(Ordering::Acquire) {
let (region_num, region_stats) = match region_stats(&catalog_manager_clone).await {
Ok(region_stats) => (region_stats.len() as i64, region_stats),
let region_num = match region_number(&catalog_manager_clone).await {
Ok(region_num) => region_num as i64,
Err(e) => {
error!("failed to get region status, err: {e:?}");
(-1, vec![])
error!("failed to get region number, err: {e:?}");
-1
}
};
@@ -123,7 +123,6 @@ impl HeartbeatTask {
region_num,
..Default::default()
}),
region_stats,
..Default::default()
};

View File

@@ -37,12 +37,14 @@ use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use session::context::QueryContext;
use snafu::prelude::*;
use storage::compaction::{CompactionHandler, CompactionSchedulerRef, SimplePicker};
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::requests::FlushTableRequest;
use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;
@@ -56,7 +58,7 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::sql::SqlHandler;
use crate::sql::{SqlHandler, SqlRequest};
mod grpc;
mod script;
@@ -233,6 +235,8 @@ impl Instance {
.context(ShutdownInstanceSnafu)?;
}
self.flush_tables().await?;
self.sql_handler
.close()
.await
@@ -240,6 +244,42 @@ impl Instance {
.context(ShutdownInstanceSnafu)
}
pub async fn flush_tables(&self) -> Result<()> {
info!("going to flush all schemas");
let schema_list = self
.catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?
.expect("Default schema not found")
.schema_names()
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
let flush_requests = schema_list
.into_iter()
.map(|schema_name| {
SqlRequest::FlushTable(FlushTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name,
table_name: None,
region_number: None,
})
})
.collect::<Vec<_>>();
let flush_result = futures::future::try_join_all(
flush_requests
.into_iter()
.map(|request| self.sql_handler.execute(request, QueryContext::arc())),
)
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu);
info!("flush success: {}", flush_result.is_ok());
flush_result?;
Ok(())
}
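`flush_tables` above flushes every schema in the default catalog on shutdown; flushing a single table goes through the same `SqlRequest::FlushTable` path. A sketch under the same assumptions as the hunk above (notably that `table_name` is an `Option<String>` and that the handler error is mapped the same way `flush_tables` does):

// Sketch only, not part of this diff: flush one table in the default catalog via
// the same code path as `flush_tables`; `instance` is assumed to be this Instance.
async fn flush_one_table(instance: &Instance, schema: &str, table: &str) -> Result<()> {
    let request = SqlRequest::FlushTable(FlushTableRequest {
        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
        schema_name: schema.to_string(),
        table_name: Some(table.to_string()),
        region_number: None,
    });
    instance
        .sql_handler()
        .execute(request, QueryContext::arc())
        .await
        .map_err(BoxedError::new)
        .context(ShutdownInstanceSnafu)?;
    Ok(())
}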
pub fn sql_handler(&self) -> &SqlHandler {
&self.sql_handler
}
@@ -460,11 +500,7 @@ pub(crate) async fn create_procedure_manager(
);
let object_store = new_object_store(&procedure_config.store).await?;
let manager_config = ManagerConfig {
object_store,
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
};
let manager_config = ManagerConfig { object_store };
Ok(Some(Arc::new(LocalManager::new(manager_config))))
}

View File

@@ -163,14 +163,14 @@ impl Instance {
QueryStatement::Sql(Statement::Copy(copy_table)) => match copy_table {
CopyTable::To(copy_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&copy_table.table_name, query_ctx.clone())?;
let file_name = copy_table.file_name;
table_idents_to_full_name(copy_table.table_name(), query_ctx.clone())?;
let file_name = copy_table.file_name().to_string();
let req = CopyTableRequest {
catalog_name,
schema_name,
table_name,
file_name,
connection: copy_table.connection,
};
self.sql_handler

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::pin::Pin;
use common_query::physical_plan::SessionContext;
@@ -23,54 +22,16 @@ use datafusion::parquet::basic::{Compression, Encoding};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::RecordBatchStream;
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::CopyTableRequest;
use url::{ParseError, Url};
use super::copy_table_from::{build_fs_backend, build_s3_backend, S3_SCHEMA};
use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
fn build_backend(
&self,
url: &str,
connection: HashMap<String, String>,
) -> Result<(ObjectStore, String)> {
let result = Url::parse(url);
match result {
Ok(url) => {
let host = url.host_str();
let schema = url.scheme();
let path = url.path();
match schema.to_uppercase().as_str() {
S3_SCHEMA => {
let object_store = build_s3_backend(host, "/", connection)?;
Ok((object_store, path.to_string()))
}
_ => error::UnsupportedBackendProtocolSnafu {
protocol: schema.to_string(),
}
.fail(),
}
}
Err(ParseError::RelativeUrlWithoutBase) => {
let object_store = build_fs_backend("/")?;
Ok((object_store, url.to_string()))
}
Err(err) => Err(error::Error::InvalidUrl {
url: url.to_string(),
source: err,
}),
}
}
pub(crate) async fn copy_table(&self, req: CopyTableRequest) -> Result<Output> {
let table_ref = TableReference {
catalog: &req.catalog_name,
@@ -91,9 +52,13 @@ impl SqlHandler {
.context(error::TableScanExecSnafu)?;
let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
let (object_store, file_name) = self.build_backend(&req.file_name, req.connection)?;
let accessor = Builder::default()
.root("/")
.build()
.context(error::BuildBackendSnafu)?;
let object_store = ObjectStore::new(accessor).finish();
let mut parquet_writer = ParquetWriter::new(file_name, stream, object_store);
let mut parquet_writer = ParquetWriter::new(req.file_name, stream, object_store);
// TODO(jiachun):
// For now, COPY is implemented synchronously.
// When copying large table, it will be blocked for a long time.

View File

@@ -34,7 +34,7 @@ use url::{ParseError, Url};
use crate::error::{self, Result};
use crate::sql::SqlHandler;
pub const S3_SCHEMA: &str = "S3";
const S3_SCHEMA: &str = "S3";
const ENDPOINT_URL: &str = "ENDPOINT_URL";
const ACCESS_KEY_ID: &str = "ACCESS_KEY_ID";
const SECRET_ACCESS_KEY: &str = "SECRET_ACCESS_KEY";
@@ -165,10 +165,13 @@ impl DataSource {
Source::Dir
};
let object_store = build_fs_backend(&path)?;
let accessor = Fs::default()
.root(&path)
.build()
.context(error::BuildBackendSnafu)?;
Ok(DataSource {
object_store,
object_store: ObjectStore::new(accessor).finish(),
source,
path,
regex,
@@ -181,6 +184,59 @@ impl DataSource {
}
}
fn build_s3_backend(
host: Option<&str>,
path: &str,
connection: HashMap<String, String>,
) -> Result<ObjectStore> {
let mut builder = S3::default();
builder.root(path);
if let Some(bucket) = host {
builder.bucket(bucket);
}
if let Some(endpoint) = connection.get(ENDPOINT_URL) {
builder.endpoint(endpoint);
}
if let Some(region) = connection.get(REGION) {
builder.region(region);
}
if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
builder.access_key_id(key_id);
}
if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
builder.secret_access_key(key);
}
if let Some(session_token) = connection.get(SESSION_TOKEN) {
builder.security_token(session_token);
}
if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
error::InvalidConnectionSnafu {
msg: format!(
"failed to parse the option {}={}, {}",
ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
),
}
.build()
})?;
if enable {
builder.enable_virtual_host_style();
}
}
let accessor = builder.build().context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
}
fn from_url(
url: Url,
regex: Option<Regex>,
@@ -201,7 +257,7 @@ impl DataSource {
};
let object_store = match schema.to_uppercase().as_str() {
S3_SCHEMA => build_s3_backend(host, &dir, connection)?,
S3_SCHEMA => DataSource::build_s3_backend(host, &dir, connection)?,
_ => {
return error::UnsupportedBackendProtocolSnafu {
protocol: schema.to_string(),
@@ -292,68 +348,6 @@ impl DataSource {
}
}
pub fn build_s3_backend(
host: Option<&str>,
path: &str,
connection: HashMap<String, String>,
) -> Result<ObjectStore> {
let mut builder = S3::default();
builder.root(path);
if let Some(bucket) = host {
builder.bucket(bucket);
}
if let Some(endpoint) = connection.get(ENDPOINT_URL) {
builder.endpoint(endpoint);
}
if let Some(region) = connection.get(REGION) {
builder.region(region);
}
if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
builder.access_key_id(key_id);
}
if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
builder.secret_access_key(key);
}
if let Some(session_token) = connection.get(SESSION_TOKEN) {
builder.security_token(session_token);
}
if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
error::InvalidConnectionSnafu {
msg: format!(
"failed to parse the option {}={}, {}",
ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
),
}
.build()
})?;
if enable {
builder.enable_virtual_host_style();
}
}
let accessor = builder.build().context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
}
pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
let accessor = Fs::default()
.root(root)
.build()
.context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
}
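The hunk above moves `build_s3_backend` into `DataSource` as a module-private helper and drops the previously public free functions; `from_url` now dispatches to `DataSource::build_s3_backend` for the `s3://` scheme. A rough in-module usage sketch with placeholder bucket and credentials (the connection keys mirror the `COPY ... CONNECTION` options exercised elsewhere in this diff):

use std::collections::HashMap;

// Sketch only: build the ObjectStore behind `COPY ... FROM 's3://my-bucket/export/'`.
// The bucket name and credentials are placeholders, not values from this diff.
fn example_s3_store() -> Result<ObjectStore> {
    let mut connection = HashMap::new();
    connection.insert("ACCESS_KEY_ID".to_string(), "<access key id>".to_string());
    connection.insert("SECRET_ACCESS_KEY".to_string(), "<secret access key>".to_string());
    connection.insert("ENDPOINT_URL".to_string(), "https://s3.amazonaws.com".to_string());
    DataSource::build_s3_backend(Some("my-bucket"), "/export/", connection)
}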
#[cfg(test)]
mod tests {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::Output;
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
@@ -63,6 +64,10 @@ impl SqlHandler {
table: &str,
region: Option<u32>,
) -> Result<()> {
if schema == DEFAULT_SCHEMA_NAME && table == "numbers" {
return Ok(());
}
let table_ref = TableReference {
catalog,
schema,

View File

@@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::logging;
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use query::parser::{QueryLanguageParser, QueryStatement};
@@ -799,45 +797,6 @@ async fn test_execute_copy_to() {
assert!(matches!(output, Output::AffectedRows(2)));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_copy_to_s3() {
logging::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = setup_test_instance("test_execute_copy_to_s3").await;
// setups
execute_sql(
&instance,
"create table demo(host string, cpu double, memory double, ts timestamp time index);",
)
.await;
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
let key = env::var("GT_S3_ACCESS_KEY").unwrap();
let url =
env::var("GT_S3_ENDPOINT_URL").unwrap_or("https://s3.amazonaws.com".to_string());
let root = uuid::Uuid::new_v4().to_string();
// exports
let copy_to_stmt = format!("Copy demo TO 's3://{}/{}/export/demo.parquet' CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',ENDPOINT_URL='{}')", bucket, root, key_id, key, url);
let output = execute_sql(&instance, &copy_to_stmt).await;
assert!(matches!(output, Output::AffectedRows(2)));
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_copy_from() {
let instance = setup_test_instance("test_execute_copy_from").await;
@@ -923,106 +882,6 @@ async fn test_execute_copy_from() {
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_copy_from_s3() {
logging::init_default_ut_logging();
if let Ok(bucket) = env::var("GT_S3_BUCKET") {
if !bucket.is_empty() {
let instance = setup_test_instance("test_execute_copy_from_s3").await;
// setups
execute_sql(
&instance,
"create table demo(host string, cpu double, memory double, ts timestamp time index);",
)
.await;
let output = execute_sql(
&instance,
r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
// export
let root = uuid::Uuid::new_v4().to_string();
let key_id = env::var("GT_S3_ACCESS_KEY_ID").unwrap();
let key = env::var("GT_S3_ACCESS_KEY").unwrap();
let url =
env::var("GT_S3_ENDPOINT_URL").unwrap_or("https://s3.amazonaws.com".to_string());
let copy_to_stmt = format!("Copy demo TO 's3://{}/{}/export/demo.parquet' CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',ENDPOINT_URL='{}')", bucket, root, key_id, key, url);
logging::info!("Copy table to s3: {}", copy_to_stmt);
let output = execute_sql(&instance, &copy_to_stmt).await;
assert!(matches!(output, Output::AffectedRows(2)));
struct Test<'a> {
sql: &'a str,
table_name: &'a str,
}
let tests = [
Test {
sql: &format!(
"Copy with_filename FROM 's3://{}/{}/export/demo.parquet_1_2'",
bucket, root
),
table_name: "with_filename",
},
Test {
sql: &format!("Copy with_path FROM 's3://{}/{}/export/'", bucket, root),
table_name: "with_path",
},
Test {
sql: &format!(
"Copy with_pattern FROM 's3://{}/{}/export/' WITH (PATTERN = 'demo.*')",
bucket, root
),
table_name: "with_pattern",
},
];
for test in tests {
// import
execute_sql(
&instance,
&format!(
"create table {}(host string, cpu double, memory double, ts timestamp time index);",
test.table_name
),
)
.await;
let sql = format!(
"{} CONNECTION (ACCESS_KEY_ID='{}',SECRET_ACCESS_KEY='{}',ENDPOINT_URL='{}')",
test.sql, key_id, key, url
);
logging::info!("Running sql: {}", sql);
let output = execute_sql(&instance, &sql).await;
assert!(matches!(output, Output::AffectedRows(2)));
let output = execute_sql(
&instance,
&format!("select * from {} order by ts", test.table_name),
)
.await;
let expected = "\
+-------+------+--------+---------------------+
| host | cpu | memory | ts |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+"
.to_string();
check_output_stream(output, expected).await;
}
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_create_by_procedure() {
common_telemetry::init_default_ut_logging();

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;
@@ -65,8 +64,6 @@ impl MockInstance {
store: ObjectStoreConfig::File(FileConfig {
data_dir: procedure_dir.path().to_str().unwrap().to_string(),
}),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
});
let instance = Instance::with_mock_meta_client(&opts).await.unwrap();

View File

@@ -677,7 +677,7 @@ pub fn check_permission(
validate_param(delete.table_name(), query_ctx)?;
}
Statement::Copy(stmd) => match stmd {
CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
CopyTable::To(copy_table_to) => validate_param(copy_table_to.table_name(), query_ctx)?,
CopyTable::From(copy_table_from) => {
validate_param(&copy_table_from.table_name, query_ctx)?
}

View File

@@ -74,8 +74,7 @@ impl DistTable {
let mut success = 0;
for join in joins {
let object_result = join.await.context(error::JoinTaskSnafu)??;
let Output::AffectedRows(rows) = object_result else { unreachable!() };
let rows = join.await.context(error::JoinTaskSnafu)?? as usize;
success += rows;
}
Ok(Output::AffectedRows(success))

View File

@@ -47,7 +47,7 @@ impl DatanodeInstance {
Self { table, db }
}
pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result<Output> {
pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result<u32> {
self.db.insert(request).await
}

View File

@@ -125,15 +125,15 @@ pub(crate) async fn create_datanode_client(
// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service = GrpcServer::new(
let grpc_server = GrpcServer::new(
ServerGrpcQueryHandlerAdaptor::arc(datanode_instance),
None,
runtime,
)
.create_service();
);
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)
.add_service(grpc_server.create_flight_service())
.add_service(grpc_server.create_database_service())
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});

View File

@@ -47,7 +47,7 @@ use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
};
use table::table::scan::SimpleTableScan;
use table::table::{AlterContext, RegionStat, Table};
use table::table::{AlterContext, Table};
use tokio::sync::Mutex;
use crate::error;
@@ -350,17 +350,6 @@ impl<R: Region> Table for MitoTable<R> {
Ok(())
}
fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
Ok(self
.regions
.values()
.map(|region| RegionStat {
region_id: region.id(),
disk_usage_bytes: region.disk_usage_bytes(),
})
.collect())
}
}
struct ChunkStream {

View File

@@ -957,12 +957,7 @@ impl PromPlanner {
.tag_columns
.iter()
.chain(self.ctx.time_index_column.iter())
.map(|col| {
Ok(DfExpr::Column(Column::new(
self.ctx.table_name.clone(),
col,
)))
});
.map(|col| Ok(DfExpr::Column(Column::from(col))));
// build computation exprs
let result_value_columns = self
@@ -1490,7 +1485,7 @@ mod test {
.unwrap();
let expected = String::from(
"Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
"Projection: lhs.tag_0, lhs.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
\n Inner Join: lhs.tag_0 = some_metric.tag_0, lhs.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n SubqueryAlias: lhs [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\

View File

@@ -34,8 +34,6 @@ use datatypes::arrow::datatypes::DataType;
pub struct TypeConversionRule;
impl OptimizerRule for TypeConversionRule {
// TODO(ruihang): fix this warning
#[allow(deprecated)]
fn try_optimize(
&self,
plan: &LogicalPlan,

View File

@@ -379,8 +379,7 @@ import greptime as gt
@copr(args=["number"], returns = ["number"], sql = "select * from numbers")
def test(number) -> vector[u32]:
from greptime import query
return query().sql("select * from numbers")[0][0]
return query.sql("select * from numbers")[0][0]
"#;
let script = script_engine
.compile(script, CompileContext::default())
@@ -439,8 +438,7 @@ from greptime import col
@copr(args=["number"], returns = ["number"], sql = "select * from numbers")
def test(number) -> vector[u32]:
from greptime import dataframe
return dataframe().filter(col("number")==col("number")).collect()[0][0]
return dataframe.filter(col("number")==col("number")).collect()[0][0]
"#;
let script = script_engine
.compile(script, CompileContext::default())

View File

@@ -140,7 +140,7 @@ impl Coprocessor {
cols.len() == names.len() && names.len() == anno.len(),
OtherSnafu {
reason: format!(
"Unmatched length for cols({}), names({}) and annotation({})",
"Unmatched length for cols({}), names({}) and anno({})",
cols.len(),
names.len(),
anno.len()
@@ -335,7 +335,7 @@ pub fn exec_coprocessor(script: &str, rb: &Option<RecordBatch>) -> Result<Record
#[cfg_attr(feature = "pyo3_backend", pyo3class(name = "query_engine"))]
#[rspyclass(module = false, name = "query_engine")]
#[derive(Debug, PyPayload, Clone)]
#[derive(Debug, PyPayload)]
pub struct PyQueryEngine {
inner: QueryEngineWeakRef,
}

View File

@@ -184,7 +184,7 @@ fn eval_rspy(case: CodeBlockTestCase) {
#[cfg(feature = "pyo3_backend")]
fn eval_pyo3(case: CodeBlockTestCase) {
init_cpython_interpreter().unwrap();
init_cpython_interpreter();
Python::with_gil(|py| {
let locals = {
let locals_dict = PyDict::new(py);

View File

@@ -38,55 +38,6 @@ pub(super) fn generate_copr_intgrate_tests() -> Vec<CoprTestCase> {
vec![
CoprTestCase {
script: r#"
from greptime import vector
@copr(returns=["value"])
def boolean_array() -> vector[f64]:
v = vector([1.0, 2.0, 3.0])
# This returns a vector([2.0])
return v[(v > 1) & (v < 3)]
"#
.to_string(),
expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))),
},
#[cfg(feature = "pyo3_backend")]
CoprTestCase {
script: r#"
@copr(returns=["value"], backend="pyo3")
def boolean_array() -> vector[f64]:
from greptime import vector
from greptime import query, dataframe
try:
print("query()=", query())
except KeyError as e:
print("query()=", e)
try:
print("dataframe()=", dataframe())
except KeyError as e:
print("dataframe()=", e)
v = vector([1.0, 2.0, 3.0])
# This returns a vector([2.0])
return v[(v > 1) & (v < 3)]
"#
.to_string(),
expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))),
},
#[cfg(feature = "pyo3_backend")]
CoprTestCase {
script: r#"
@copr(returns=["value"], backend="pyo3")
def boolean_array() -> vector[f64]:
from greptime import vector
v = vector([1.0, 2.0, 3.0])
# This returns a vector([2.0])
return v[(v > 1) & (v < 3)]
"#
.to_string(),
expect: Some(ronish!("value": vector!(Float64Vector, [2.0f64]))),
},
CoprTestCase {
script: r#"
@copr(args=["number", "number"],
returns=["value"],
sql="select number from numbers limit 5", backend="rspy")
@@ -156,9 +107,9 @@ def answer() -> vector[i64]:
script: r#"
@copr(args=[], returns = ["number"], sql = "select * from numbers", backend="rspy")
def answer() -> vector[i64]:
from greptime import vector, col, lit, dataframe
from greptime import vector, col, lit
expr_0 = (col("number")<lit(3)) & (col("number")>0)
ret = dataframe().select([col("number")]).filter(expr_0).collect()[0][0]
ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0]
return ret
"#
.to_string(),
@@ -169,10 +120,10 @@ def answer() -> vector[i64]:
script: r#"
@copr(args=[], returns = ["number"], sql = "select * from numbers", backend="pyo3")
def answer() -> vector[i64]:
from greptime import vector, col, lit, dataframe
from greptime import vector, col, lit
# Bitwise Operator pred comparison operator
expr_0 = (col("number")<lit(3)) & (col("number")>0)
ret = dataframe().select([col("number")]).filter(expr_0).collect()[0][0]
ret = dataframe.select([col("number")]).filter(expr_0).collect()[0][0]
return ret
"#
.to_string(),

View File

@@ -133,7 +133,7 @@ fn get_test_cases() -> Vec<TestCase> {
}
#[cfg(feature = "pyo3_backend")]
fn eval_pyo3(testcase: TestCase, locals: HashMap<String, PyVector>) {
init_cpython_interpreter().unwrap();
init_cpython_interpreter();
Python::with_gil(|py| {
let locals = {
let locals_dict = PyDict::new(py);

View File

@@ -20,13 +20,10 @@ use datafusion::physical_plan::expressions;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::{math_expressions, AggregateExpr};
use datatypes::vectors::VectorRef;
use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyDict;
use super::dataframe_impl::PyDataFrame;
use super::utils::scalar_value_to_py_any;
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::utils::all_to_f64;
use crate::python::ffi_types::PyVector;
use crate::python::pyo3::dataframe_impl::{col, lit};
@@ -61,12 +58,9 @@ macro_rules! batch_import {
#[pyo3(name = "greptime")]
pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<PyVector>()?;
use self::query_engine;
batch_import!(
m,
[
dataframe,
query_engine,
lit,
col,
pow,
@@ -118,34 +112,6 @@ pub(crate) fn greptime_builtins(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
Ok(())
}
fn get_globals(py: Python) -> PyResult<&PyDict> {
// TODO(discord9): check if this is sound(in python)
let py_main = PyModule::import(py, "__main__")?;
let globals = py_main.dict();
Ok(globals)
}
#[pyfunction]
fn dataframe(py: Python) -> PyResult<PyDataFrame> {
let globals = get_globals(py)?;
let df = globals
.get_item("__dataframe__")
.ok_or(PyKeyError::new_err("No __dataframe__ variable is found"))?
.extract::<PyDataFrame>()?;
Ok(df)
}
#[pyfunction]
#[pyo3(name = "query")]
fn query_engine(py: Python) -> PyResult<PyQueryEngine> {
let globals = get_globals(py)?;
let query = globals
.get_item("__query__")
.ok_or(PyKeyError::new_err("No __query__ variable is found"))?
.extract::<PyQueryEngine>()?;
Ok(query)
}
fn eval_func(py: Python<'_>, name: &str, v: &[&PyObject]) -> PyResult<PyVector> {
let v = to_array_of_py_vec(py, v)?;
py.allow_threads(|| {

View File

@@ -24,6 +24,7 @@ use snafu::{ensure, Backtrace, GenerateImplicitData, ResultExt};
use crate::python::error::{self, NewRecordBatchSnafu, OtherSnafu, Result};
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::{check_args_anno_real_type, select_from_rb, Coprocessor, PyVector};
use crate::python::pyo3::builtins::greptime_builtins;
use crate::python::pyo3::dataframe_impl::PyDataFrame;
use crate::python::pyo3::utils::{init_cpython_interpreter, pyo3_obj_try_to_typed_val};
@@ -56,7 +57,6 @@ impl PyQueryEngine {
}
// TODO: put this into greptime module
}
/// Execute a `Coprocessor` with given `RecordBatch`
pub(crate) fn pyo3_exec_parsed(
copr: &Coprocessor,
@@ -73,7 +73,7 @@ pub(crate) fn pyo3_exec_parsed(
Vec::new()
};
// Just in case cpython is not inited
init_cpython_interpreter().unwrap();
init_cpython_interpreter();
Python::with_gil(|py| -> Result<_> {
let mut cols = (|| -> PyResult<_> {
let dummy_decorator = "
@@ -86,6 +86,7 @@ def copr(*dummy, **kwdummy):
return func
return inner
coprocessor = copr
from greptime import vector
";
let gen_call = format!("\n_return_from_coprocessor = {}(*_args_for_coprocessor, **_kwargs_for_coprocessor)", copr.name);
let script = format!("{}{}{}", dummy_decorator, copr.script, gen_call);
@@ -108,11 +109,14 @@ coprocessor = copr
let globals = py_main.dict();
let locals = PyDict::new(py);
let greptime = PyModule::new(py, "greptime")?;
greptime_builtins(py, greptime)?;
locals.set_item("greptime", greptime)?;
if let Some(engine) = &copr.query_engine {
let query_engine = PyQueryEngine::from_weakref(engine.clone());
let query_engine = PyCell::new(py, query_engine)?;
globals.set_item("__query__", query_engine)?;
globals.set_item("query", query_engine)?;
}
// TODO(discord9): find out why `dataframe` is not in scope
@@ -125,12 +129,12 @@ coprocessor = copr
)
)?;
let dataframe = PyCell::new(py, dataframe)?;
globals.set_item("__dataframe__", dataframe)?;
globals.set_item("dataframe", dataframe)?;
}
locals.set_item("_args_for_coprocessor", args)?;
locals.set_item("_kwargs_for_coprocessor", kwargs)?;
// `greptime` is already imported when the interpreter is initialized, so no need to set it here
// TODO(discord9): find a better way to set `dataframe` and `query` in scope, or set them into a module (the latter might be impossible and not idiomatic even in Python)
// set `dataframe` and `query` in scope/ or set it into module
@@ -215,10 +219,10 @@ mod copr_test {
@copr(args=["cpu", "mem"], returns=["ref"], backend="pyo3")
def a(cpu, mem, **kwargs):
import greptime as gt
from greptime import vector, log2, sum, pow, col, lit, dataframe
from greptime import vector, log2, sum, pow, col, lit
for k, v in kwargs.items():
print("%s == %s" % (k, v))
print(dataframe().select([col("cpu")<lit(0.3)]).collect())
print(dataframe.select([col("cpu")<lit(0.3)]).collect())
return (0.5 < cpu) & ~( cpu >= 0.75)
"#;
let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]);

View File

@@ -26,8 +26,6 @@ use crate::python::ffi_types::PyVector;
use crate::python::pyo3::utils::pyo3_obj_try_to_typed_scalar_value;
use crate::python::utils::block_on_async;
type PyExprRef = Py<PyExpr>;
#[derive(Debug, Clone)]
#[pyclass]
pub(crate) struct PyDataFrame {
inner: DfDataFrame,
@@ -49,9 +47,6 @@ impl PyDataFrame {
#[pymethods]
impl PyDataFrame {
fn __call__(&self) -> PyResult<Self> {
Ok(self.clone())
}
fn select_columns(&self, columns: Vec<String>) -> PyResult<Self> {
Ok(self
.inner
@@ -257,9 +252,6 @@ pub(crate) fn col(name: String) -> PyExpr {
#[pymethods]
impl PyExpr {
fn __call__(&self) -> PyResult<Self> {
Ok(self.clone())
}
fn __richcmp__(&self, py: Python<'_>, other: PyObject, op: CompareOp) -> PyResult<Self> {
let other = other.extract::<Self>(py).or_else(|_| lit(py, other))?;
let op = match op {

View File

@@ -36,9 +36,7 @@ static START_PYO3: Lazy<Mutex<bool>> = Lazy::new(|| Mutex::new(false));
pub(crate) fn to_py_err(err: impl ToString) -> PyErr {
PyArrowException::new_err(err.to_string())
}
/// init cpython interpreter with `greptime` builtins, if already inited, do nothing
pub(crate) fn init_cpython_interpreter() -> PyResult<()> {
pub(crate) fn init_cpython_interpreter() {
let mut start = START_PYO3.lock().unwrap();
if !*start {
pyo3::append_to_inittab!(greptime_builtins);
@@ -46,7 +44,6 @@ pub(crate) fn init_cpython_interpreter() -> PyResult<()> {
*start = true;
info!("Started CPython Interpreter");
}
Ok(())
}
pub fn val_to_py_any(py: Python<'_>, val: Value) -> PyResult<PyObject> {

View File

@@ -21,12 +21,11 @@ use datatypes::arrow::array::{Array, ArrayRef};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::vectors::Helper;
use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyValueError};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::pyclass::CompareOp;
use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString, PyType};
use super::utils::val_to_py_any;
use crate::python::ffi_types::vector::{arrow_rtruediv, wrap_bool_result, wrap_result, PyVector};
use crate::python::pyo3::utils::{pyo3_obj_try_to_typed_val, to_py_err};
@@ -279,45 +278,6 @@ impl PyVector {
let v = Helper::try_into_vector(array).map_err(to_py_err)?;
Ok(v.into())
}
/// PyO3's Magic Method for slicing and indexing
fn __getitem__(&self, py: Python, needle: PyObject) -> PyResult<PyObject> {
if let Ok(needle) = needle.extract::<PyVector>(py) {
let mask = needle.to_arrow_array();
let mask = mask
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
PyValueError::new_err(
"A Boolean Array is requested for slicing, found {mask:?}",
)
})?;
let result = compute::filter(&self.to_arrow_array(), mask)
.map_err(|err| PyRuntimeError::new_err(format!("Arrow Error: {err:#?}")))?;
let ret = Helper::try_into_vector(result.clone()).map_err(|e| {
PyRuntimeError::new_err(format!("Can't cast result into vector, err: {e:?}"))
})?;
let ret = Self::from(ret).into_py(py);
Ok(ret)
} else if let Ok(index) = needle.extract::<isize>(py) {
// deal with negative index
let len = self.len() as isize;
let index = if index < 0 { len + index } else { index };
if index < 0 || index >= len {
return Err(PyIndexError::new_err(format!(
"Index out of bound, index: {index}, len: {len}",
index = index,
len = len
)));
}
let val = self.as_vector_ref().get(index as usize);
val_to_py_any(py, val)
} else {
Err(PyValueError::new_err(
"{needle:?} is neither a Vector nor a int, can't use for slicing or indexing",
))
}
}
}
#[cfg(test)]
@@ -357,7 +317,7 @@ mod test {
}
#[test]
fn test_py_vector_api() {
init_cpython_interpreter().unwrap();
init_cpython_interpreter();
Python::with_gil(|py| {
let module = PyModule::new(py, "gt").unwrap();
module.add_class::<PyVector>().unwrap();

View File

@@ -306,38 +306,13 @@ pub(crate) mod greptime_builtin {
all_to_f64, eval_aggr_fn, from_df_err, try_into_columnar_value, try_into_py_obj,
type_cast_error,
};
use crate::python::ffi_types::copr::PyQueryEngine;
use crate::python::ffi_types::vector::val_to_pyobj;
use crate::python::ffi_types::PyVector;
use crate::python::rspython::dataframe_impl::data_frame::{PyDataFrame, PyExpr, PyExprRef};
use crate::python::rspython::dataframe_impl::data_frame::{PyExpr, PyExprRef};
use crate::python::rspython::utils::{
is_instance, py_obj_to_value, py_obj_to_vec, PyVectorRef,
};
/// get `__dataframe__` from globals and return it
/// TODO(discord9): this is a terrible hack, we should find a better way to get `__dataframe__`
#[pyfunction]
fn dataframe(vm: &VirtualMachine) -> PyResult<PyDataFrame> {
let df = vm.current_globals().get_item("__dataframe__", vm)?;
let df = df
.payload::<PyDataFrame>()
.ok_or_else(|| vm.new_type_error(format!("object {:?} is not a DataFrame", df)))?;
let df = df.clone();
Ok(df)
}
/// get `__query__` from globals and return it
/// TODO(discord9): this is a terrible hack, we should find a better way to get `__dataframe__`
#[pyfunction]
fn query(vm: &VirtualMachine) -> PyResult<PyQueryEngine> {
let query_engine = vm.current_globals().get_item("__query__", vm)?;
let query_engine = query_engine.payload::<PyQueryEngine>().ok_or_else(|| {
vm.new_type_error(format!("object {:?} is not a QueryEngine", query_engine))
})?;
let query_engine = query_engine.clone();
Ok(query_engine)
}
#[pyfunction]
fn vector(args: OptionalArg<PyObjectRef>, vm: &VirtualMachine) -> PyResult<PyVector> {
PyVector::new(args, vm)

View File

@@ -81,13 +81,12 @@ fn set_items_in_scope(
fn set_query_engine_in_scope(
scope: &Scope,
vm: &VirtualMachine,
name: &str,
query_engine: PyQueryEngine,
) -> Result<()> {
scope
.locals
.as_object()
.set_item(name, query_engine.to_pyobject(vm), vm)
.set_item("query", query_engine.to_pyobject(vm), vm)
.map_err(|e| format_py_error(e, vm))
}
@@ -102,7 +101,7 @@ pub(crate) fn exec_with_cached_vm(
// set arguments with given name and values
let scope = vm.new_scope_with_builtins();
if let Some(rb) = rb {
set_dataframe_in_scope(&scope, vm, "__dataframe__", rb)?;
set_dataframe_in_scope(&scope, vm, "dataframe", rb)?;
}
if let Some(arg_names) = &copr.deco_args.arg_names {
@@ -114,7 +113,7 @@ pub(crate) fn exec_with_cached_vm(
let query_engine = PyQueryEngine::from_weakref(engine.clone());
// put a object named with query of class PyQueryEngine in scope
set_query_engine_in_scope(&scope, vm, "__query__", query_engine)?;
set_query_engine_in_scope(&scope, vm, query_engine)?;
}
if let Some(kwarg) = &copr.kwarg {

View File

@@ -38,7 +38,7 @@ pub(crate) mod data_frame {
use crate::python::rspython::builtins::greptime_builtin::lit;
use crate::python::utils::block_on_async;
#[rspyclass(module = "data_frame", name = "DataFrame")]
#[derive(PyPayload, Debug, Clone)]
#[derive(PyPayload, Debug)]
pub struct PyDataFrame {
pub inner: DfDataFrame,
}

View File

@@ -287,7 +287,7 @@ def a(cpu, mem):
abc = vector([v[0] > v[1] for v in zip(cpu, mem)])
fed = cpu.filter(abc)
ref = log2(fed/prev(fed))
return cpu[(cpu > 0.5) & ~( cpu >= 0.75)]
return (0.5 < cpu) & ~( cpu >= 0.75)
"#;
let cpu_array = Float32Vector::from_slice([0.9f32, 0.8, 0.7, 0.3]);
let mem_array = Float64Vector::from_slice([0.1f64, 0.2, 0.3, 0.4]);

View File

@@ -559,11 +559,11 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
// constant column(int)
name: "test_data_frame",
code: r#"
from greptime import col, dataframe
from greptime import col
@copr(args=["cpu", "mem"], returns=["perf", "what"])
def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
vector[f32]):
ret = dataframe().select([col("cpu"), col("mem")]).collect()[0]
ret = dataframe.select([col("cpu"), col("mem")]).collect()[0]
return ret[0], ret[1]
"#,
predicate: ExecIsOk(
@@ -593,11 +593,11 @@ def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
// constant column(int)
name: "test_data_frame",
code: r#"
from greptime import col, dataframe
from greptime import col
@copr(args=["cpu", "mem"], returns=["perf", "what"])
def a(cpu: vector[f32], mem: vector[f64])->(vector[f64|None],
vector[f32]):
ret = dataframe().filter(col("cpu")>col("mem")).collect()[0]
ret = dataframe.filter(col("cpu")>col("mem")).collect()[0]
return ret[0], ret[1]
"#,
predicate: ExecIsOk(

View File

@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod database;
pub mod flight;
pub mod handler;
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::greptime_database_server::{GreptimeDatabase, GreptimeDatabaseServer};
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use async_trait::async_trait;
use common_runtime::Runtime;
@@ -27,18 +30,21 @@ use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Sender};
use tokio::sync::Mutex;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::Status;
use crate::auth::UserProviderRef;
use crate::error::{AlreadyStartedSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
use crate::grpc::database::DatabaseService;
use crate::grpc::flight::FlightHandler;
use crate::grpc::handler::GreptimeRequestHandler;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::server::Server;
type TonicResult<T> = std::result::Result<T, Status>;
pub struct GrpcServer {
query_handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
shutdown_tx: Mutex<Option<Sender<()>>>,
runtime: Arc<Runtime>,
request_handler: Arc<GreptimeRequestHandler>,
}
impl GrpcServer {
@@ -47,21 +53,23 @@ impl GrpcServer {
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
) -> Self {
Self {
let request_handler = Arc::new(GreptimeRequestHandler::new(
query_handler,
user_provider,
shutdown_tx: Mutex::new(None),
runtime,
));
Self {
shutdown_tx: Mutex::new(None),
request_handler,
}
}
pub fn create_service(&self) -> FlightServiceServer<impl FlightService> {
let service = FlightHandler::new(
self.query_handler.clone(),
self.user_provider.clone(),
self.runtime.clone(),
);
FlightServiceServer::new(service)
pub fn create_flight_service(&self) -> FlightServiceServer<impl FlightService> {
FlightServiceServer::new(FlightHandler::new(self.request_handler.clone()))
}
pub fn create_database_service(&self) -> GreptimeDatabaseServer<impl GreptimeDatabase> {
GreptimeDatabaseServer::new(DatabaseService::new(self.request_handler.clone()))
}
}
@@ -103,7 +111,8 @@ impl Server for GrpcServer {
// Would block to serve requests.
tonic::transport::Server::builder()
.add_service(self.create_service())
.add_service(self.create_flight_service())
.add_service(self.create_database_service())
.serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop))
.await
.context(StartGrpcSnafu)?;

View File

@@ -0,0 +1,57 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::{greptime_response, AffectedRows, GreptimeRequest, GreptimeResponse};
use async_trait::async_trait;
use common_query::Output;
use tonic::{Request, Response, Status};
use crate::grpc::handler::GreptimeRequestHandler;
use crate::grpc::TonicResult;
pub(crate) struct DatabaseService {
handler: Arc<GreptimeRequestHandler>,
}
impl DatabaseService {
pub(crate) fn new(handler: Arc<GreptimeRequestHandler>) -> Self {
Self { handler }
}
}
#[async_trait]
impl GreptimeDatabase for DatabaseService {
async fn handle(
&self,
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let response = match output {
Output::AffectedRows(rows) => GreptimeResponse {
header: None,
response: Some(greptime_response::Response::AffectedRows(AffectedRows {
value: rows as _,
})),
},
Output::Stream(_) | Output::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::handle for query"));
}
};
Ok(Response::new(response))
}
}

View File

@@ -17,8 +17,7 @@ mod stream;
use std::pin::Pin;
use std::sync::Arc;
use api::v1::auth_header::AuthScheme;
use api::v1::{Basic, GreptimeRequest, RequestHeader};
use api::v1::GreptimeRequest;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
@@ -27,40 +26,25 @@ use arrow_flight::{
use async_trait::async_trait;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::Output;
use common_runtime::Runtime;
use futures::Stream;
use prost::Message;
use session::context::{QueryContext, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tonic::{Request, Response, Status, Streaming};
use crate::auth::{Identity, UserProviderRef};
use crate::error;
use crate::error::Error::Auth;
use crate::error::{NotFoundAuthHeaderSnafu, UnsupportedAuthSchemeSnafu};
use crate::grpc::flight::stream::FlightRecordBatchStream;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::grpc::handler::GreptimeRequestHandler;
use crate::grpc::TonicResult;
type TonicResult<T> = Result<T, Status>;
type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + Sync + 'static>>;
pub struct FlightHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
handler: Arc<GreptimeRequestHandler>,
}
impl FlightHandler {
pub fn new(
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
) -> Self {
Self {
handler,
user_provider,
runtime,
}
pub fn new(handler: Arc<GreptimeRequestHandler>) -> Self {
Self { handler }
}
}
@@ -105,40 +89,8 @@ impl FlightService for FlightHandler {
let request =
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
let query = request.request.context(error::InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;
let query_ctx = create_query_context(request.header.as_ref());
let output = self.handler.handle_request(request).await?;
auth(
self.user_provider.as_ref(),
request.header.as_ref(),
&query_ctx,
)
.await?;
let handler = self.handler.clone();
// Executes requests in another runtime to:
// 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
// - Refer to our blog for the rationale behind it:
// https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
// - Obtaining a `JoinHandle` also lets us get the panic message (if there is any).
// Per its docs, `JoinHandle` is cancel safe: the task keeps running even if its handle has been dropped.
// 2. avoid the handler incidentally blocking the gRPC runtime.
let handle = self
.runtime
.spawn(async move { handler.do_query(query, query_ctx).await });
let output = handle.await.map_err(|e| {
if e.is_cancelled() {
Status::cancelled(e.to_string())
} else if e.is_panic() {
Status::internal(format!("{:?}", e.into_panic()))
} else {
Status::unknown(e.to_string())
}
})??;
let stream = to_flight_data_stream(output);
Ok(Response::new(stream))
}
@@ -195,56 +147,3 @@ fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
}
}
}
fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
let ctx = QueryContext::arc();
if let Some(header) = header {
if !header.catalog.is_empty() {
ctx.set_current_catalog(&header.catalog);
}
if !header.schema.is_empty() {
ctx.set_current_schema(&header.schema);
}
};
ctx
}
async fn auth(
user_provider: Option<&UserProviderRef>,
request_header: Option<&RequestHeader>,
query_ctx: &QueryContextRef,
) -> TonicResult<()> {
let Some(user_provider) = user_provider else { return Ok(()) };
let user_info = match request_header
.context(NotFoundAuthHeaderSnafu)?
.clone()
.authorization
.context(NotFoundAuthHeaderSnafu)?
.auth_scheme
.context(NotFoundAuthHeaderSnafu)?
{
AuthScheme::Basic(Basic { username, password }) => user_provider
.authenticate(
Identity::UserId(&username, None),
crate::auth::Password::PlainText(&password),
)
.await
.map_err(|e| Auth { source: e }),
AuthScheme::Token(_) => UnsupportedAuthSchemeSnafu {
name: "Token AuthScheme",
}
.fail(),
}
.map_err(|e| Status::unauthenticated(e.to_string()))?;
user_provider
.authorize(
&query_ctx.current_catalog(),
&query_ctx.current_schema(),
&user_info,
)
.await
.map_err(|e| Status::permission_denied(e.to_string()))
}

View File

@@ -0,0 +1,137 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::auth_header::AuthScheme;
use api::v1::{Basic, GreptimeRequest, RequestHeader};
use common_query::Output;
use common_runtime::Runtime;
use session::context::{QueryContext, QueryContextRef};
use snafu::OptionExt;
use tonic::Status;
use crate::auth::{Identity, Password, UserProviderRef};
use crate::error::Error::{Auth, UnsupportedAuthScheme};
use crate::error::{InvalidQuerySnafu, NotFoundAuthHeaderSnafu};
use crate::grpc::TonicResult;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
pub struct GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
}
impl GreptimeRequestHandler {
pub fn new(
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
) -> Self {
Self {
handler,
user_provider,
runtime,
}
}
pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> TonicResult<Output> {
let query = request.request.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;
let header = request.header.as_ref();
let query_ctx = create_query_context(header);
self.auth(header, &query_ctx).await?;
let handler = self.handler.clone();
// Executes requests in another runtime to:
// 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
// - Refer to our blog for the rationale behind it:
// https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
// - Obtaining a `JoinHandle` also lets us get the panic message (if there is any).
// Per its docs, `JoinHandle` is cancel safe: the task keeps running even if its handle has been dropped.
// 2. avoid the handler incidentally blocking the gRPC runtime.
let handle = self
.runtime
.spawn(async move { handler.do_query(query, query_ctx).await });
let output = handle.await.map_err(|e| {
if e.is_cancelled() {
Status::cancelled(e.to_string())
} else if e.is_panic() {
Status::internal(format!("{:?}", e.into_panic()))
} else {
Status::unknown(e.to_string())
}
})??;
Ok(output)
}
async fn auth(
&self,
header: Option<&RequestHeader>,
query_ctx: &QueryContextRef,
) -> TonicResult<()> {
let Some(user_provider) = self.user_provider.as_ref() else { return Ok(()) };
let auth_scheme = header
.and_then(|header| {
header
.authorization
.as_ref()
.and_then(|x| x.auth_scheme.clone())
})
.context(NotFoundAuthHeaderSnafu)?;
let user_info = match auth_scheme {
AuthScheme::Basic(Basic { username, password }) => user_provider
.authenticate(
Identity::UserId(&username, None),
Password::PlainText(&password),
)
.await
.map_err(|e| Auth { source: e }),
AuthScheme::Token(_) => Err(UnsupportedAuthScheme {
name: "Token AuthScheme".to_string(),
}),
}
.map_err(|e| Status::unauthenticated(e.to_string()))?;
user_provider
.authorize(
&query_ctx.current_catalog(),
&query_ctx.current_schema(),
&user_info,
)
.await
.map_err(|e| Status::permission_denied(e.to_string()))
}
}
fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
let ctx = QueryContext::arc();
if let Some(header) = header {
if !header.catalog.is_empty() {
ctx.set_current_catalog(&header.catalog);
}
if !header.schema.is_empty() {
ctx.set_current_schema(&header.schema);
}
};
ctx
}
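
As a quick illustration, not part of the diff: create_query_context is meant to keep the defaults from QueryContext::arc() when the header is absent or has empty fields, and to adopt the caller's catalog and schema otherwise. A minimal, hypothetical check (RequestHeader is prost-generated, so Default is assumed to be available):

#[test]
fn query_context_adopts_header_catalog_and_schema() {
    use api::v1::RequestHeader;

    let header = RequestHeader {
        catalog: "catalog0".to_string(),
        schema: "schema0".to_string(),
        ..Default::default()
    };
    let ctx = create_query_context(Some(&header));
    assert_eq!(ctx.current_catalog(), "catalog0");
    assert_eq!(ctx.current_schema(), "schema0");

    // Without a header, the context keeps the QueryContext::arc() defaults.
    let _default_ctx = create_query_context(None);
}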

View File

@@ -65,5 +65,5 @@ pub async fn flush(
});
grpc_handler.do_query(request, QueryContext::arc()).await?;
Ok((HttpStatusCode::OK, Json::from("hello, world".to_string())))
Ok((HttpStatusCode::OK, Json::from("done".to_string())))
}

View File

@@ -24,6 +24,7 @@ use common_runtime::{Builder as RuntimeBuilder, Runtime};
use servers::auth::UserProviderRef;
use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu};
use servers::grpc::flight::FlightHandler;
use servers::grpc::handler::GreptimeRequestHandler;
use servers::query_handler::grpc::ServerGrpcQueryHandlerRef;
use servers::server::Server;
use snafu::ResultExt;
@@ -54,11 +55,11 @@ impl MockGrpcServer {
}
fn create_service(&self) -> FlightServiceServer<impl FlightService> {
let service = FlightHandler::new(
let service = FlightHandler::new(Arc::new(GreptimeRequestHandler::new(
self.query_handler.clone(),
self.user_provider.clone(),
self.runtime.clone(),
);
)));
FlightServiceServer::new(service)
}
}

View File

@@ -130,24 +130,8 @@ impl<'a> ParserContext<'a> {
}
}
let connection_options = self
.parser
.parse_options(Keyword::CONNECTION)
.context(error::SyntaxSnafu { sql: self.sql })?;
let connection = connection_options
.into_iter()
.filter_map(|option| {
if let Some(v) = ParserContext::parse_option_string(option.value) {
Some((option.name.value.to_uppercase(), v))
} else {
None
}
})
.collect();
Ok(CopyTable::To(CopyTableTo::new(
table_name, file_name, format, connection,
table_name, file_name, format,
)))
}
@@ -183,7 +167,7 @@ mod tests {
match statement {
Statement::Copy(CopyTable::To(copy_table)) => {
let (catalog, schema, table) =
if let [catalog, schema, table] = &copy_table.table_name.0[..] {
if let [catalog, schema, table] = &copy_table.table_name().0[..] {
(
catalog.value.clone(),
schema.value.clone(),
@@ -197,11 +181,11 @@ mod tests {
assert_eq!("schema0", schema);
assert_eq!("tbl", table);
let file_name = copy_table.file_name;
let file_name = copy_table.file_name();
assert_eq!("tbl_file.parquet", file_name);
let format = copy_table.format;
assert_eq!(Format::Parquet, format);
let format = copy_table.format();
assert_eq!(Format::Parquet, *format);
}
_ => unreachable!(),
}
@@ -291,44 +275,6 @@ mod tests {
}
}
#[test]
fn test_parse_copy_table_to() {
struct Test<'a> {
sql: &'a str,
expected_connection: HashMap<String, String>,
}
let tests = [
Test {
sql: "COPY catalog0.schema0.tbl TO 'tbl_file.parquet' ",
expected_connection: HashMap::new(),
},
Test {
sql: "COPY catalog0.schema0.tbl TO 'tbl_file.parquet' CONNECTION (FOO='Bar', ONE='two')",
expected_connection: [("FOO","Bar"),("ONE","two")].into_iter().map(|(k,v)|{(k.to_string(),v.to_string())}).collect()
},
Test {
sql:"COPY catalog0.schema0.tbl TO 'tbl_file.parquet' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')",
expected_connection: [("FOO","Bar"),("ONE","two")].into_iter().map(|(k,v)|{(k.to_string(),v.to_string())}).collect()
},
];
for test in tests {
let mut result =
ParserContext::create_with_dialect(test.sql, &GenericDialect {}).unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
assert_matches!(statement, Statement::Copy { .. });
match statement {
Statement::Copy(CopyTable::To(copy_table)) => {
assert_eq!(copy_table.connection.clone(), test.expected_connection);
}
_ => unreachable!(),
}
}
}
#[test]
fn test_parse_copy_table_with_unsupopoted_format() {
let results = [

View File

@@ -26,26 +26,31 @@ pub enum CopyTable {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopyTableTo {
pub table_name: ObjectName,
pub file_name: String,
pub format: Format,
pub connection: HashMap<String, String>,
table_name: ObjectName,
file_name: String,
format: Format,
}
impl CopyTableTo {
pub(crate) fn new(
table_name: ObjectName,
file_name: String,
format: Format,
connection: HashMap<String, String>,
) -> Self {
pub(crate) fn new(table_name: ObjectName, file_name: String, format: Format) -> Self {
Self {
table_name,
file_name,
format,
connection,
}
}
pub fn table_name(&self) -> &ObjectName {
&self.table_name
}
pub fn file_name(&self) -> &str {
&self.file_name
}
pub fn format(&self) -> &Format {
&self.format
}
}
// TODO: Combine the CopyTableFrom and CopyTableTo structs

View File

@@ -266,7 +266,14 @@ impl RegionWriter {
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner.manual_flush(writer_ctx).await
inner.manual_flush(writer_ctx).await?;
// Wait for the flush to finish.
if let Some(handle) = inner.flush_handle.take() {
handle.join().await?;
}
Ok(())
}
/// Cancel flush task if any
@@ -384,6 +391,7 @@ impl WriterInner {
let next_sequence = committed_sequence + 1;
let version = version_control.current();
let wal_header = WalHeader::with_last_manifest_version(version.manifest_version());
writer_ctx
.wal

View File

@@ -43,7 +43,7 @@ use parquet::basic::{Compression, Encoding};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::schema::types::{ColumnPath, SchemaDescriptor};
use snafu::{OptionExt, ResultExt};
use table::predicate::Predicate;
use tokio::io::BufReader;
@@ -71,7 +71,7 @@ impl<'a> ParquetWriter<'a> {
file_path,
source,
object_store,
max_row_group_size: 4096, // TODO(hl): make this configurable
max_row_group_size: 64 * 1024, // TODO(hl): make this configurable
}
}
@@ -88,9 +88,25 @@ impl<'a> ParquetWriter<'a> {
let schema = store_schema.arrow_schema().clone();
let object = self.object_store.object(self.file_path);
let ts_col_name = store_schema
.schema()
.timestamp_column()
.unwrap()
.name
.clone();
let writer_props = WriterProperties::builder()
.set_compression(Compression::ZSTD)
.set_encoding(Encoding::PLAIN)
.set_column_dictionary_enabled(ColumnPath::new(vec![ts_col_name.clone()]), false)
.set_column_encoding(
ColumnPath::new(vec![ts_col_name]),
Encoding::DELTA_BINARY_PACKED,
)
.set_column_dictionary_enabled(ColumnPath::new(vec!["__sequence".to_string()]), false)
.set_column_encoding(
ColumnPath::new(vec!["__sequence".to_string()]),
Encoding::DELTA_BINARY_PACKED,
)
.set_max_row_group_size(self.max_row_group_size)
.set_key_value_metadata(extra_meta.map(|map| {
map.iter()
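
A note on the writer properties above (my reading, not something the diff states): in the parquet crate, an explicit per-column encoding only becomes the data-page encoding once dictionary encoding is disabled for that column, which is why each set_column_encoding call is paired with set_column_dictionary_enabled(.., false). Timestamps and the internal __sequence column are close to monotonic, so DELTA_BINARY_PACKED tends to store them compactly. A stripped-down builder with the same shape, where "ts" is a placeholder column name:

use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn example_writer_props() -> WriterProperties {
    let ts_col = ColumnPath::new(vec!["ts".to_string()]);
    WriterProperties::builder()
        // Dictionary must be off for the explicit encoding below to apply to data pages.
        .set_column_dictionary_enabled(ts_col.clone(), false)
        .set_column_encoding(ts_col, Encoding::DELTA_BINARY_PACKED)
        .set_max_row_group_size(64 * 1024)
        .build()
}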

View File

@@ -24,7 +24,7 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use common_telemetry::info;
use common_telemetry::logging;
use store_api::manifest::ManifestVersion;
use store_api::storage::{SchemaRef, SequenceNumber};
@@ -248,7 +248,7 @@ impl Version {
.ssts
.merge(handles_to_add, edit.files_to_remove.into_iter());
info!(
logging::debug!(
"After apply edit, region: {}, SST files: {:?}",
self.metadata.id(),
merged_ssts

View File

@@ -106,6 +106,9 @@ impl<S: LogStore> Wal<S> {
mut header: WalHeader,
payload: Option<&Payload>,
) -> Result<Id> {
if !cfg!(test) && (self.region_id >> 32) >= 1024 {
return Ok(seq);
}
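// Annotation (not present in the source): the upper 32 bits of a region id carry
// the table id, so outside of tests this early return appears to skip WAL writes
// for user tables (whose table ids start at 1024).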
if let Some(p) = payload {
header.mutation_types = wal::gen_mutation_types(p);
}

View File

@@ -197,7 +197,7 @@ impl CreateTableProcedure {
};
match sub_state {
ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
ProcedureState::Running => Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
}),

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
@@ -60,11 +59,7 @@ impl TestEnv {
let accessor = Fs::default().root(&procedure_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig {
object_store,
max_retry_times: 3,
retry_delay: Duration::from_secs(500),
}));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store }));
let catalog_manager = Arc::new(MemoryCatalogManager::default());

View File

@@ -197,7 +197,6 @@ pub struct CopyTableRequest {
pub schema_name: String,
pub table_name: String,
pub file_name: String,
pub connection: HashMap<String, String>,
}
#[derive(Debug)]

View File

@@ -105,14 +105,6 @@ pub trait Table: Send + Sync {
async fn close(&self) -> Result<()> {
Ok(())
}
/// Get region stats in this table.
fn region_stats(&self) -> Result<Vec<RegionStat>> {
UnsupportedSnafu {
operation: "REGION_STATS",
}
.fail()?
}
}
pub type TableRef = Arc<dyn Table>;
@@ -123,9 +115,3 @@ pub trait TableIdProvider {
}
pub type TableIdProviderRef = Arc<dyn TableIdProvider + Send + Sync>;
#[derive(Default, Debug)]
pub struct RegionStat {
pub region_id: u64,
pub disk_usage_bytes: u64,
}

View File

@@ -183,7 +183,7 @@ async fn insert_and_assert(db: &Database) {
row_count: 4,
};
let result = db.insert(request).await;
result.unwrap();
assert_eq!(result.unwrap(), 4);
let result = db
.sql(

View File

@@ -121,13 +121,7 @@ SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i;
SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i IN ((SELECT i FROM integers)) AND i1.i=i2.i ORDER BY 1;
+---+---+
| i | i |
+---+---+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
+---+---+
Error: 3000(PlanQuery), Error during planning: column reference i is ambiguous
SELECT * FROM integers i1 WHERE EXISTS(SELECT i FROM integers WHERE i=i1.i) ORDER BY i1.i;