Compare commits

..

55 Commits

Author SHA1 Message Date
evenyag
6247de2d50 chore: Revert "feat: prune in each partition"
This reverts commit 3f9bf48161.
2024-11-08 23:57:18 +08:00
evenyag
a2eb46132f feat: tokio dump 2024-11-08 23:08:47 +08:00
evenyag
3f9bf48161 feat: prune in each partition 2024-11-08 21:31:03 +08:00
evenyag
9bd2e006b5 feat: file output thread id 2024-11-08 20:35:40 +08:00
evenyag
031421ca91 feat: add thread id to log 2024-11-08 19:09:08 +08:00
evenyag
999f3a40c2 chore: Revert "chore: Revert "feat: yield after get ranges""
This reverts commit 770a850437.
2024-11-08 17:12:54 +08:00
evenyag
50d28e0a00 feat: add timeout to build ranges 2024-11-08 16:23:03 +08:00
evenyag
770a850437 chore: Revert "feat: yield after get ranges"
This reverts commit 65e53b5bc4.
2024-11-08 15:35:23 +08:00
evenyag
65e53b5bc4 feat: yield after get ranges 2024-11-08 01:28:39 +08:00
evenyag
9a6c7aa4d6 chore: log label 2024-11-08 01:08:02 +08:00
evenyag
4f446b95d8 chore: logs 2024-11-08 01:01:27 +08:00
evenyag
9ad4200f55 feat: only log for unordered scan 2024-11-08 00:58:29 +08:00
evenyag
53d456651f chore: range builder logs 2024-11-08 00:11:54 +08:00
evenyag
f11c5acb0f feat: logs for debug prune cost 2024-11-07 22:10:46 +08:00
evenyag
8536a1ec6e chore: logs to debug hang 2024-11-07 20:36:12 +08:00
evenyag
fce8c968da feat: gauge for scan partition 2024-11-07 16:55:40 +08:00
evenyag
98a6ac973c feat: log on merge scan region start/end 2024-11-07 16:48:03 +08:00
evenyag
8f79e421c3 chore: Revert "feat: remove too large files"
This reverts commit a22667bf3c.
2024-11-07 16:20:39 +08:00
evenyag
e8b326382f chore: fix compile 2024-11-07 00:28:19 +08:00
evenyag
56781e7fbc fix: skip expired files 2024-11-07 00:25:52 +08:00
evenyag
7d342b3d95 feat: small max file size 2024-11-06 23:31:16 +08:00
evenyag
a22667bf3c feat: remove too large files 2024-11-06 22:08:43 +08:00
evenyag
29b9b7db0c feat: support compression method 2024-11-06 18:58:20 +08:00
evenyag
a66909a562 chore: fix compile 2024-11-06 16:29:18 +08:00
evenyag
8137b8ff3d chore: more logs 2024-11-06 15:25:49 +08:00
Ruihang Xia
7c5cd2922a fix split logic
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-05 23:43:35 +08:00
evenyag
a1d0dcf2c3 chore: more logs 2024-11-05 20:25:05 +08:00
evenyag
c391171f99 feat: more logs 2024-11-05 20:18:35 +08:00
evenyag
f44862aaac feat: update log 2024-11-05 17:47:32 +08:00
evenyag
8bf795d88c chore: more logs 2024-11-05 16:22:54 +08:00
evenyag
3bbf4e0232 feat: log range meta 2024-11-05 16:01:55 +08:00
evenyag
83da3950da chore: debug 2024-11-05 15:33:42 +08:00
evenyag
957b5effd5 chore: fix compile 2024-11-05 15:32:35 +08:00
evenyag
f59e28006a feat: assert precision 2024-11-05 15:24:40 +08:00
evenyag
3e5bbdf71e feat: enable batch checker 2024-11-05 15:24:40 +08:00
Ruihang Xia
b8ac19c480 log on wrong range index
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-05 14:57:44 +08:00
evenyag
92b274a856 chore: log compute cost 2024-11-05 14:57:54 +08:00
evenyag
6bdac25f0a chore: more logs 2024-11-05 13:02:16 +08:00
evenyag
a9f3c4b17c chore: page reader metrics 2024-11-04 20:08:56 +08:00
evenyag
e003eaab36 chore: more log 2024-11-04 20:06:20 +08:00
evenyag
6e590da412 chore: remove compaction skip log 2024-11-04 19:40:42 +08:00
evenyag
ff5fa40b85 feat: skip wal 2024-11-04 19:40:41 +08:00
Ruihang Xia
d4aa4159d4 feat: support windowed sort with where condition
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-04 19:34:03 +08:00
evenyag
960f6d821b feat: spawn block write wal 2024-11-04 17:35:12 +08:00
Ruihang Xia
9c5d044238 Merge branch 'main' into transform-count-min-max
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-01 17:45:28 +08:00
Ruihang Xia
70c354eed6 fix: the way to retrieve time index column
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-11-01 12:10:12 +08:00
Ruihang Xia
23bf663d58 feat: handle sort that wont preserving partition
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-10-31 22:13:36 +08:00
Ruihang Xia
817648eac5 Merge branch 'main' into transform-count-min-max
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-10-31 15:38:12 +08:00
Ruihang Xia
03b29439e2 Merge branch 'main' into transform-count-min-max
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-11 11:09:07 +08:00
Ruihang Xia
712f4ca0ef try sort partial commutative
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-09 21:08:59 +08:00
Ruihang Xia
60bacff57e ignore unmatched left and right greater
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-08 11:12:21 +08:00
Ruihang Xia
6208772ba4 Merge branch 'main' into transform-count-min-max
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-08 11:02:04 +08:00
Ruihang Xia
67184c0498 Merge branch 'main' into transform-count-min-max
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-05 14:30:47 +08:00
Ruihang Xia
1dd908fdf7 handle group by
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-05 12:50:13 +08:00
Ruihang Xia
8179b4798e feat: support transforming min/max/count aggr fn
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-09-04 22:17:31 +08:00
792 changed files with 14920 additions and 52775 deletions

View File

@@ -54,7 +54,7 @@ runs:
 PROFILE_TARGET: ${{ inputs.cargo-profile == 'dev' && 'debug' || inputs.cargo-profile }}
 with:
 artifacts-dir: ${{ inputs.artifacts-dir }}
-target-files: ./target/$PROFILE_TARGET/greptime
+target-file: ./target/$PROFILE_TARGET/greptime
 version: ${{ inputs.version }}
 working-dir: ${{ inputs.working-dir }}
@@ -72,6 +72,6 @@ runs:
 if: ${{ inputs.build-android-artifacts == 'true' }}
 with:
 artifacts-dir: ${{ inputs.artifacts-dir }}
-target-files: ./target/aarch64-linux-android/release/greptime
+target-file: ./target/aarch64-linux-android/release/greptime
 version: ${{ inputs.version }}
 working-dir: ${{ inputs.working-dir }}

View File

@@ -41,8 +41,8 @@ runs:
 image-name: ${{ inputs.image-name }}
 image-tag: ${{ inputs.version }}
 docker-file: docker/ci/ubuntu/Dockerfile
-amd64-artifact-name: greptime-linux-amd64-${{ inputs.version }}
+amd64-artifact-name: greptime-linux-amd64-pyo3-${{ inputs.version }}
-arm64-artifact-name: greptime-linux-arm64-${{ inputs.version }}
+arm64-artifact-name: greptime-linux-arm64-pyo3-${{ inputs.version }}
 platforms: linux/amd64,linux/arm64
 push-latest-tag: ${{ inputs.push-latest-tag }}

View File

@@ -48,7 +48,19 @@ runs:
 path: /tmp/greptime-*.log
 retention-days: 3
-- name: Build greptime
+- name: Build standard greptime
+uses: ./.github/actions/build-greptime-binary
+with:
+base-image: ubuntu
+features: pyo3_backend,servers/dashboard
+cargo-profile: ${{ inputs.cargo-profile }}
+artifacts-dir: greptime-linux-${{ inputs.arch }}-pyo3-${{ inputs.version }}
+version: ${{ inputs.version }}
+working-dir: ${{ inputs.working-dir }}
+image-registry: ${{ inputs.image-registry }}
+image-namespace: ${{ inputs.image-namespace }}
+- name: Build greptime without pyo3
 if: ${{ inputs.dev-mode == 'false' }}
 uses: ./.github/actions/build-greptime-binary
 with:

View File

@@ -90,5 +90,5 @@ runs:
 uses: ./.github/actions/upload-artifacts
 with:
 artifacts-dir: ${{ inputs.artifacts-dir }}
-target-files: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
+target-file: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
 version: ${{ inputs.version }}

View File

@@ -33,6 +33,15 @@ runs:
 - name: Rust Cache
 uses: Swatinem/rust-cache@v2
+- name: Install Python
+uses: actions/setup-python@v5
+with:
+python-version: "3.10"
+- name: Install PyArrow Package
+shell: pwsh
+run: pip install pyarrow numpy
 - name: Install WSL distribution
 uses: Vampire/setup-wsl@v2
 with:
@@ -67,5 +76,5 @@ runs:
 uses: ./.github/actions/upload-artifacts
 with:
 artifacts-dir: ${{ inputs.artifacts-dir }}
-target-files: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime,target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime.pdb
+target-file: target/${{ inputs.arch }}/${{ inputs.cargo-profile }}/greptime
 version: ${{ inputs.version }}

View File

@@ -8,7 +8,7 @@ inputs:
 default: 2
 description: "Number of Datanode replicas"
 meta-replicas:
-default: 1
+default: 3
 description: "Number of Metasrv replicas"
 image-registry:
 default: "docker.io"
@@ -58,7 +58,7 @@ runs:
 --set image.tag=${{ inputs.image-tag }} \
 --set base.podTemplate.main.resources.requests.cpu=50m \
 --set base.podTemplate.main.resources.requests.memory=256Mi \
---set base.podTemplate.main.resources.limits.cpu=2000m \
+--set base.podTemplate.main.resources.limits.cpu=1000m \
 --set base.podTemplate.main.resources.limits.memory=2Gi \
 --set frontend.replicas=${{ inputs.frontend-replicas }} \
 --set datanode.replicas=${{ inputs.datanode-replicas }} \

View File

@@ -5,7 +5,7 @@ meta:
 [datanode]
 [datanode.client]
-timeout = "120s"
+timeout = "60s"
 datanode:
 configData: |-
 [runtime]
@@ -21,7 +21,7 @@ frontend:
 global_rt_size = 4
 [meta_client]
-ddl_timeout = "120s"
+ddl_timeout = "60s"
 objectStorage:
 s3:
 bucket: default

View File

@@ -5,7 +5,7 @@ meta:
 [datanode]
 [datanode.client]
-timeout = "120s"
+timeout = "60s"
 datanode:
 configData: |-
 [runtime]
@@ -17,7 +17,7 @@ frontend:
 global_rt_size = 4
 [meta_client]
-ddl_timeout = "120s"
+ddl_timeout = "60s"
 objectStorage:
 s3:
 bucket: default

View File

@@ -11,7 +11,7 @@ meta:
 [datanode]
 [datanode.client]
-timeout = "120s"
+timeout = "60s"
 datanode:
 configData: |-
 [runtime]
@@ -28,7 +28,7 @@ frontend:
 global_rt_size = 4
 [meta_client]
-ddl_timeout = "120s"
+ddl_timeout = "60s"
 objectStorage:
 s3:
 bucket: default

View File

@@ -18,8 +18,6 @@ runs:
 --set controller.replicaCount=${{ inputs.controller-replicas }} \
 --set controller.resources.requests.cpu=50m \
 --set controller.resources.requests.memory=128Mi \
---set controller.resources.limits.cpu=2000m \
---set controller.resources.limits.memory=2Gi \
 --set listeners.controller.protocol=PLAINTEXT \
 --set listeners.client.protocol=PLAINTEXT \
 --create-namespace \

View File

@@ -4,8 +4,8 @@ inputs:
 artifacts-dir:
 description: Directory to store artifacts
 required: true
-target-files:
+target-file:
-description: The multiple target files to upload, separated by comma
+description: The path of the target artifact
 required: false
 version:
 description: Version of the artifact
@@ -18,16 +18,12 @@ runs:
 using: composite
 steps:
 - name: Create artifacts directory
-if: ${{ inputs.target-files != '' }}
+if: ${{ inputs.target-file != '' }}
 working-directory: ${{ inputs.working-dir }}
 shell: bash
 run: |
-set -e
-mkdir -p ${{ inputs.artifacts-dir }}
-IFS=',' read -ra FILES <<< "${{ inputs.target-files }}"
-for file in "${FILES[@]}"; do
-cp "$file" ${{ inputs.artifacts-dir }}/
-done
+mkdir -p ${{ inputs.artifacts-dir }} && \
+cp ${{ inputs.target-file }} ${{ inputs.artifacts-dir }}
 # The compressed artifacts will use the following layout:
 # greptime-linux-amd64-pyo3-v0.3.0sha256sum

View File

@@ -1,3 +0,0 @@
-native-tls
-openssl
-aws-lc-sys

View File

@@ -4,8 +4,7 @@ I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeT
 ## What's changed and what's your intention?
-<!--
 __!!! DO NOT LEAVE THIS BLOCK EMPTY !!!__
 Please explain IN DETAIL what the changes are in this PR and why they are needed:
@@ -13,14 +12,9 @@ Please explain IN DETAIL what the changes are in this PR and why they are needed
 - How does this PR work? Need a brief introduction for the changed logic (optional)
 - Describe clearly one logical change and avoid lazy messages (optional)
 - Describe any limitations of the current code (optional)
-- Describe if this PR will break **API or data compatibility** (optional)
--->
-## PR Checklist
+## Checklist
-Please convert it to a draft if some of the following conditions are not met.
 - [ ] I have written the necessary rustdoc comments.
 - [ ] I have added the necessary unit tests and integration tests.
 - [ ] This PR requires documentation updates.
-- [ ] API changes are backward compatible.
-- [ ] Schema or data changes are backward compatible.

View File

@@ -1,14 +0,0 @@
-#!/bin/sh
-set -e
-# Get the latest version of github.com/GreptimeTeam/greptimedb
-VERSION=$(curl -s https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest | jq -r '.tag_name')
-echo "Downloading the latest version: $VERSION"
-# Download the install script
-curl -fsSL https://raw.githubusercontent.com/greptimeteam/greptimedb/main/scripts/install.sh | sh -s $VERSION
-# Execute the `greptime` command
-./greptime --version

View File

@@ -1,36 +0,0 @@
-name: Check Dependencies
-on:
-push:
-branches:
-- main
-pull_request:
-branches:
-- main
-jobs:
-check-dependencies:
-runs-on: ubuntu-latest
-steps:
-- name: Checkout code
-uses: actions/checkout@v4
-- name: Set up Rust
-uses: actions-rust-lang/setup-rust-toolchain@v1
-- name: Run cargo tree
-run: cargo tree --prefix none > dependencies.txt
-- name: Extract dependency names
-run: awk '{print $1}' dependencies.txt > dependency_names.txt
-- name: Check for blacklisted crates
-run: |
-while read -r dep; do
-if grep -qFx "$dep" dependency_names.txt; then
-echo "Blacklisted crate '$dep' found in dependencies."
-exit 1
-fi
-done < .github/cargo-blacklist.txt
-echo "No blacklisted crates found."

View File

@@ -10,6 +10,17 @@ on:
 - 'docker/**'
 - '.gitignore'
 - 'grafana/**'
+push:
+branches:
+- main
+paths-ignore:
+- 'docs/**'
+- 'config/**'
+- '**.md'
+- '.dockerignore'
+- 'docker/**'
+- '.gitignore'
+- 'grafana/**'
 workflow_dispatch:
 name: CI
@@ -73,7 +84,7 @@ jobs:
 # Shares across multiple jobs
 shared-key: "check-toml"
 - name: Install taplo
-run: cargo +stable install taplo-cli --version ^0.9 --locked --force
+run: cargo +stable install taplo-cli --version ^0.9 --locked
 - name: Run taplo
 run: taplo format --check
@@ -96,7 +107,7 @@ jobs:
 shared-key: "build-binaries"
 - name: Install cargo-gc-bin
 shell: bash
-run: cargo install cargo-gc-bin --force
+run: cargo install cargo-gc-bin
 - name: Build greptime binaries
 shell: bash
 # `cargo gc` will invoke `cargo build` with specified args
@@ -152,7 +163,7 @@ jobs:
 run: |
 sudo apt-get install -y libfuzzer-14-dev
 rustup install nightly
-cargo +nightly install cargo-fuzz cargo-gc-bin --force
+cargo +nightly install cargo-fuzz cargo-gc-bin
 - name: Download pre-built binaries
 uses: actions/download-artifact@v4
 with:
@@ -209,7 +220,7 @@ jobs:
 shell: bash
 run: |
 sudo apt update && sudo apt install -y libfuzzer-14-dev
-cargo install cargo-fuzz cargo-gc-bin --force
+cargo install cargo-fuzz cargo-gc-bin
 - name: Download pre-built binariy
 uses: actions/download-artifact@v4
 with:
@@ -257,7 +268,14 @@ jobs:
 shared-key: "build-greptime-ci"
 - name: Install cargo-gc-bin
 shell: bash
-run: cargo install cargo-gc-bin --force
+run: cargo install cargo-gc-bin
+- name: Check aws-lc-sys will not build
+shell: bash
+run: |
+if cargo tree -i aws-lc-sys -e features | grep -q aws-lc-sys; then
+echo "Found aws-lc-sys, which has compilation problems on older gcc versions. Please replace it with ring until its building experience improves."
+exit 1
+fi
 - name: Build greptime bianry
 shell: bash
 # `cargo gc` will invoke `cargo build` with specified args
@@ -312,6 +330,8 @@ jobs:
 uses: ./.github/actions/setup-kafka-cluster
 - name: Setup Etcd cluser
 uses: ./.github/actions/setup-etcd-cluster
+- name: Setup Postgres cluser
+uses: ./.github/actions/setup-postgres-cluster
 # Prepares for fuzz tests
 - uses: arduino/setup-protoc@v3
 with:
@@ -327,7 +347,7 @@ jobs:
 run: |
 sudo apt-get install -y libfuzzer-14-dev
 rustup install nightly
-cargo +nightly install cargo-fuzz cargo-gc-bin --force
+cargo +nightly install cargo-fuzz cargo-gc-bin
 # Downloads ci image
 - name: Download pre-built binariy
 uses: actions/download-artifact@v4
@@ -461,6 +481,8 @@ jobs:
 uses: ./.github/actions/setup-kafka-cluster
 - name: Setup Etcd cluser
 uses: ./.github/actions/setup-etcd-cluster
+- name: Setup Postgres cluser
+uses: ./.github/actions/setup-postgres-cluster
 # Prepares for fuzz tests
 - uses: arduino/setup-protoc@v3
 with:
@@ -476,7 +498,7 @@ jobs:
 run: |
 sudo apt-get install -y libfuzzer-14-dev
 rustup install nightly
-cargo +nightly install cargo-fuzz cargo-gc-bin --force
+cargo +nightly install cargo-fuzz cargo-gc-bin
 # Downloads ci image
 - name: Download pre-built binariy
 uses: actions/download-artifact@v4
@@ -642,7 +664,6 @@ jobs:
 if: github.event.pull_request.draft == false
 runs-on: ubuntu-20.04-8-cores
 timeout-minutes: 60
-needs: [clippy, fmt]
 steps:
 - uses: actions/checkout@v4
 - uses: arduino/setup-protoc@v3
@@ -668,6 +689,12 @@ jobs:
 uses: taiki-e/install-action@nextest
 - name: Install cargo-llvm-cov
 uses: taiki-e/install-action@cargo-llvm-cov
+- name: Install Python
+uses: actions/setup-python@v5
+with:
+python-version: '3.10'
+- name: Install PyArrow Package
+run: pip install pyarrow numpy
 - name: Setup etcd server
 working-directory: tests-integration/fixtures/etcd
 run: docker compose -f docker-compose-standalone.yml up -d --wait
@@ -681,7 +708,7 @@ jobs:
 working-directory: tests-integration/fixtures/postgres
 run: docker compose -f docker-compose-standalone.yml up -d --wait
 - name: Run nextest cases
-run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
+run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
 env:
 CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
 RUST_BACKTRACE: 1

View File

@@ -12,7 +12,7 @@ on:
 linux_amd64_runner:
 type: choice
 description: The runner uses to build linux-amd64 artifacts
-default: ec2-c6i.4xlarge-amd64
+default: ec2-c6i.2xlarge-amd64
 options:
 - ubuntu-20.04
 - ubuntu-20.04-8-cores
@@ -27,7 +27,7 @@ on:
 linux_arm64_runner:
 type: choice
 description: The runner uses to build linux-arm64 artifacts
-default: ec2-c6g.4xlarge-arm64
+default: ec2-c6g.2xlarge-arm64
 options:
 - ec2-c6g.xlarge-arm64 # 4C8G
 - ec2-c6g.2xlarge-arm64 # 8C16G

View File

@@ -1,6 +1,6 @@
 on:
 schedule:
-- cron: "0 23 * * 1-4"
+- cron: "0 23 * * 1-5"
 workflow_dispatch:
 name: Nightly CI
@@ -22,10 +22,6 @@ jobs:
 uses: actions/checkout@v4
 with:
 fetch-depth: 0
-- name: Check install.sh
-run: ./.github/scripts/check-install-script.sh
 - name: Run sqlness test
 uses: ./.github/actions/sqlness-test
 with:
@@ -91,12 +87,18 @@ jobs:
 uses: Swatinem/rust-cache@v2
 - name: Install Cargo Nextest
 uses: taiki-e/install-action@nextest
+- name: Install Python
+uses: actions/setup-python@v5
+with:
+python-version: "3.10"
+- name: Install PyArrow Package
+run: pip install pyarrow numpy
 - name: Install WSL distribution
 uses: Vampire/setup-wsl@v2
 with:
 distribution: Ubuntu-22.04
 - name: Running tests
-run: cargo nextest run -F dashboard
+run: cargo nextest run -F pyo3_backend,dashboard
 env:
 CARGO_BUILD_RUSTFLAGS: "-C linker=lld-link"
 RUST_BACKTRACE: 1
@@ -108,16 +110,6 @@ jobs:
 GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
 UNITTEST_LOG_DIR: "__unittest_logs"
-cleanbuild-linux-nix:
-runs-on: ubuntu-latest-8-cores
-timeout-minutes: 60
-steps:
-- uses: actions/checkout@v4
-- uses: cachix/install-nix-action@v27
-with:
-nix_path: nixpkgs=channel:nixos-unstable
-- run: nix-shell --pure --run "cargo build"
 check-status:
 name: Check status
 needs: [sqlness-test, sqlness-windows, test-on-windows]

View File

@@ -31,7 +31,7 @@ on:
 linux_arm64_runner:
 type: choice
 description: The runner uses to build linux-arm64 artifacts
-default: ec2-c6g.8xlarge-arm64
+default: ec2-c6g.4xlarge-arm64
 options:
 - ubuntu-2204-32-cores-arm
 - ec2-c6g.xlarge-arm64 # 4C8G
@@ -91,7 +91,7 @@ env:
 # The scheduled version is '${{ env.NEXT_RELEASE_VERSION }}-nightly-YYYYMMDD', like v0.2.0-nigthly-20230313;
 NIGHTLY_RELEASE_PREFIX: nightly
 # Note: The NEXT_RELEASE_VERSION should be modified manually by every formal release.
-NEXT_RELEASE_VERSION: v0.12.0
+NEXT_RELEASE_VERSION: v0.10.0
 # Permission reference: https://docs.github.com/en/actions/using-jobs/assigning-permissions-to-jobs
 permissions:
@@ -222,10 +222,18 @@ jobs:
 arch: aarch64-apple-darwin
 features: servers/dashboard
 artifacts-dir-prefix: greptime-darwin-arm64
+- os: ${{ needs.allocate-runners.outputs.macos-runner }}
+arch: aarch64-apple-darwin
+features: pyo3_backend,servers/dashboard
+artifacts-dir-prefix: greptime-darwin-arm64-pyo3
 - os: ${{ needs.allocate-runners.outputs.macos-runner }}
 features: servers/dashboard
 arch: x86_64-apple-darwin
 artifacts-dir-prefix: greptime-darwin-amd64
+- os: ${{ needs.allocate-runners.outputs.macos-runner }}
+features: pyo3_backend,servers/dashboard
+arch: x86_64-apple-darwin
+artifacts-dir-prefix: greptime-darwin-amd64-pyo3
 runs-on: ${{ matrix.os }}
 outputs:
 build-macos-result: ${{ steps.set-build-macos-result.outputs.build-macos-result }}
@@ -263,6 +271,10 @@ jobs:
 arch: x86_64-pc-windows-msvc
 features: servers/dashboard
 artifacts-dir-prefix: greptime-windows-amd64
+- os: ${{ needs.allocate-runners.outputs.windows-runner }}
+arch: x86_64-pc-windows-msvc
+features: pyo3_backend,servers/dashboard
+artifacts-dir-prefix: greptime-windows-amd64-pyo3
 runs-on: ${{ matrix.os }}
 outputs:
 build-windows-result: ${{ steps.set-build-windows-result.outputs.build-windows-result }}

.gitignore (vendored, 6 changed lines)
View File

@@ -47,10 +47,6 @@ benchmarks/data
 venv/
 # Fuzz tests
 tests-fuzz/artifacts/
 tests-fuzz/corpus/
-# Nix
-.direnv
-.envrc

View File

@@ -17,6 +17,6 @@ repos:
 - id: fmt
 - id: clippy
 args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
-stages: [pre-push]
+stages: [push]
 - id: cargo-check
 args: ["--workspace", "--all-targets", "--all-features"]

View File

@@ -7,8 +7,6 @@
 * [NiwakaDev](https://github.com/NiwakaDev)
 * [etolbakov](https://github.com/etolbakov)
 * [irenjj](https://github.com/irenjj)
-* [tisonkun](https://github.com/tisonkun)
-* [Lanqing Yang](https://github.com/lyang24)
 ## Team Members (in alphabetical order)
@@ -32,6 +30,7 @@
 * [shuiyisong](https://github.com/shuiyisong)
 * [sunchanglong](https://github.com/sunchanglong)
 * [sunng87](https://github.com/sunng87)
+* [tisonkun](https://github.com/tisonkun)
 * [v0y4g3r](https://github.com/v0y4g3r)
 * [waynexia](https://github.com/waynexia)
 * [xtang](https://github.com/xtang)

Cargo.lock (generated, 1401 changed lines)

File diff suppressed because it is too large.

View File

@@ -2,26 +2,23 @@
 members = [
 "src/api",
 "src/auth",
-"src/cache",
 "src/catalog",
-"src/cli",
+"src/cache",
 "src/client",
 "src/cmd",
 "src/common/base",
 "src/common/catalog",
 "src/common/config",
 "src/common/datasource",
-"src/common/decimal",
 "src/common/error",
 "src/common/frontend",
 "src/common/function",
-"src/common/macro",
 "src/common/greptimedb-telemetry",
 "src/common/grpc",
 "src/common/grpc-expr",
+"src/common/macro",
 "src/common/mem-prof",
 "src/common/meta",
-"src/common/options",
 "src/common/plugins",
 "src/common/pprof",
 "src/common/procedure",
@@ -33,6 +30,7 @@ members = [
 "src/common/telemetry",
 "src/common/test-util",
 "src/common/time",
+"src/common/decimal",
 "src/common/version",
 "src/common/wal",
 "src/datanode",
@@ -40,8 +38,6 @@ members = [
 "src/file-engine",
 "src/flow",
 "src/frontend",
-"src/index",
-"src/log-query",
 "src/log-store",
 "src/meta-client",
 "src/meta-srv",
@@ -61,6 +57,7 @@ members = [
 "src/sql",
 "src/store-api",
 "src/table",
+"src/index",
 "tests-fuzz",
 "tests-integration",
 "tests/runner",
@@ -68,7 +65,7 @@
 resolver = "2"
 [workspace.package]
-version = "0.12.0"
+version = "0.9.5"
 edition = "2021"
 license = "Apache-2.0"
@@ -120,22 +117,19 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
 datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
 derive_builder = "0.12"
 dotenv = "0.15"
-etcd-client = "0.13"
+etcd-client = { version = "0.13" }
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a875e976441188028353f7274a46a7e6e065c5d4" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "255f87a3318ace3f88a67f76995a0e14910983f4" }
-hex = "0.4"
-http = "0.2"
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
-jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
+jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
 lazy_static = "1.4"
 meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
 mockall = "0.11.4"
 moka = "0.12"
-nalgebra = "0.33"
 notify = "6.1"
 num_cpus = "1.16"
 once_cell = "1.18"
@@ -157,7 +151,7 @@ raft-engine = { version = "0.4.1", default-features = false }
 rand = "0.8"
 ratelimit = "0.9"
 regex = "1.8"
-regex-automata = "0.4"
+regex-automata = { version = "0.4" }
 reqwest = { version = "0.12", default-features = false, features = [
 "json",
 "rustls-tls-native-roots",
@@ -171,6 +165,7 @@ rstest = "0.21"
 rstest_reuse = "0.7"
 rust_decimal = "1.33"
 rustc-hash = "2.0"
+schemars = "0.8"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0", features = ["float_roundtrip"] }
 serde_with = "3"
@@ -182,17 +177,16 @@ sysinfo = "0.30"
 # on branch v0.44.x
 sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "54a267ac89c09b11c0c88934690530807185d3e7", features = [
 "visitor",
-"serde",
 ] }
 strum = { version = "0.25", features = ["derive"] }
 tempfile = "3"
 tokio = { version = "1.40", features = ["full"] }
 tokio-postgres = "0.7"
-tokio-stream = "0.1"
+tokio-stream = { version = "0.1" }
 tokio-util = { version = "0.7", features = ["io-util", "compat"] }
 toml = "0.8.8"
 tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] }
-tower = "0.4"
+tower = { version = "0.4" }
 tracing-appender = "0.2"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
 typetag = "0.2"
@@ -204,7 +198,6 @@ api = { path = "src/api" }
 auth = { path = "src/auth" }
 cache = { path = "src/cache" }
 catalog = { path = "src/catalog" }
-cli = { path = "src/cli" }
 client = { path = "src/client" }
 cmd = { path = "src/cmd", default-features = false }
 common-base = { path = "src/common/base" }
@@ -221,7 +214,6 @@ common-grpc-expr = { path = "src/common/grpc-expr" }
 common-macro = { path = "src/common/macro" }
 common-mem-prof = { path = "src/common/mem-prof" }
 common-meta = { path = "src/common/meta" }
-common-options = { path = "src/common/options" }
 common-plugins = { path = "src/common/plugins" }
 common-pprof = { path = "src/common/pprof" }
 common-procedure = { path = "src/common/procedure" }
@@ -240,7 +232,6 @@ file-engine = { path = "src/file-engine" }
 flow = { path = "src/flow" }
 frontend = { path = "src/frontend", default-features = false }
 index = { path = "src/index" }
-log-query = { path = "src/log-query" }
 log-store = { path = "src/log-store" }
 meta-client = { path = "src/meta-client" }
 meta-srv = { path = "src/meta-srv" }
@@ -270,21 +261,21 @@ tokio-rustls = { git = "https://github.com/GreptimeTeam/tokio-rustls" }
 # This is commented, since we are not using aws-lc-sys, if we need to use it, we need to uncomment this line or use a release after this commit, or it wouldn't compile with gcc < 8.1
 # see https://github.com/aws/aws-lc-rs/pull/526
 # aws-lc-sys = { git ="https://github.com/aws/aws-lc-rs", rev = "556558441e3494af4b156ae95ebc07ebc2fd38aa" }
-# Apply a fix for pprof for unaligned pointer access
-pprof = { git = "https://github.com/GreptimeTeam/pprof-rs", rev = "1bd1e21" }
 [workspace.dependencies.meter-macros]
 git = "https://github.com/GreptimeTeam/greptime-meter.git"
 rev = "a10facb353b41460eeb98578868ebf19c2084fac"
 [profile.release]
-debug = 1
+# debug = 1
+split-debuginfo = "off"
 [profile.nightly]
 inherits = "release"
-strip = "debuginfo"
+split-debuginfo = "off"
+# strip = "debuginfo"
 lto = "thin"
-debug = false
+# debug = false
 incremental = false
 [profile.ci]

View File

@@ -6,7 +6,7 @@
 </picture>
 </p>
-<h2 align="center">Unified & Cost-Effective Time Series Database for Metrics, Logs, and Events</h2>
+<h2 align="center">Unified Time Series Database for Metrics, Logs, and Events</h2>
 <div align="center">
 <h3 align="center">
@@ -48,51 +48,37 @@
 </a>
 </div>
-- [Introduction](#introduction)
-- [**Features: Why GreptimeDB**](#why-greptimedb)
-- [Architecture](https://docs.greptime.com/contributor-guide/overview/#architecture)
-- [Try it for free](#try-greptimedb)
-- [Getting Started](#getting-started)
-- [Project Status](#project-status)
-- [Join the community](#community)
-- [Contributing](#contributing)
-- [Tools & Extensions](#tools--extensions)
-- [License](#license)
-- [Acknowledgement](#acknowledgement)
 ## Introduction
-**GreptimeDB** is an open-source unified & cost-effective time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at Any Scale.
+**GreptimeDB** is an open-source unified time-series database for **Metrics**, **Logs**, and **Events** (also **Traces** in plan). You can gain real-time insights from Edge to Cloud at any scale.
 ## Why GreptimeDB
-Our core developers have been building time-series data platforms for years. Based on our best practices, GreptimeDB was born to give you:
+Our core developers have been building time-series data platforms for years. Based on our best-practices, GreptimeDB is born to give you:
-* **Unified Processing of Metrics, Logs, and Events**
+* **Unified all kinds of time series**
-GreptimeDB unifies time series data processing by treating all data - whether metrics, logs, or events - as timestamped events with context. Users can analyze this data using either [SQL](https://docs.greptime.com/user-guide/query-data/sql) or [PromQL](https://docs.greptime.com/user-guide/query-data/promql) and leverage stream processing ([Flow](https://docs.greptime.com/user-guide/flow-computation/overview)) to enable continuous aggregation. [Read more](https://docs.greptime.com/user-guide/concepts/data-model).
+GreptimeDB treats all time series as contextual events with timestamp, and thus unifies the processing of metrics, logs, and events. It supports analyzing metrics, logs, and events with SQL and PromQL, and doing streaming with continuous aggregation.
-* **Cloud-native Distributed Database**
+* **Cloud-Edge collaboration**
-Built for [Kubernetes](https://docs.greptime.com/user-guide/deployments/deploy-on-kubernetes/greptimedb-operator-management). GreptimeDB achieves seamless scalability with its [cloud-native architecture](https://docs.greptime.com/user-guide/concepts/architecture) of separated compute and storage, built on object storage (AWS S3, Azure Blob Storage, etc.) while enabling cross-cloud deployment through a unified data access layer.
+GreptimeDB can be deployed on ARM architecture-compatible Android/Linux systems as well as cloud environments from various vendors. Both sides run the same software, providing identical APIs and control planes, so your application can run at the edge or on the cloud without modification, and data synchronization also becomes extremely easy and efficient.
+* **Cloud-native distributed database**
+By leveraging object storage (S3 and others), separating compute and storage, scaling stateless compute nodes arbitrarily, GreptimeDB implements seamless scalability. It also supports cross-cloud deployment with a built-in unified data access layer over different object storages.
 * **Performance and Cost-effective**
-Written in pure Rust for superior performance and reliability. GreptimeDB features a distributed query engine with intelligent indexing to handle high cardinality data efficiently. Its optimized columnar storage achieves 50x cost efficiency on cloud object storage through advanced compression. [Benchmark reports](https://www.greptime.com/blogs/2024-09-09-report-summary).
+Flexible indexing capabilities and distributed, parallel-processing query engine, tackling high cardinality issues down. Optimized columnar layout for handling time-series data; compacted, compressed, and stored on various storage backends, particularly cloud object storage with 50x cost efficiency.
-* **Cloud-Edge Collaboration**
+* **Compatible with InfluxDB, Prometheus and more protocols**
-GreptimeDB seamlessly operates across cloud and edge (ARM/Android/Linux), providing consistent APIs and control plane for unified data management and efficient synchronization. [Learn how to run on Android](https://docs.greptime.com/user-guide/deployments/run-on-android/).
+Widely adopted database protocols and APIs, including MySQL, PostgreSQL, and Prometheus Remote Storage, etc. [Read more](https://docs.greptime.com/user-guide/protocols/overview).
-* **Multi-protocol Ingestion, SQL & PromQL Ready**
-Widely adopted database protocols and APIs, including MySQL, PostgreSQL, InfluxDB, OpenTelemetry, Loki and Prometheus, etc. Effortless Adoption & Seamless Migration. [Supported Protocols Overview](https://docs.greptime.com/user-guide/protocols/overview).
+For more detailed info please read [Why GreptimeDB](https://docs.greptime.com/user-guide/concepts/why-greptimedb).
 ## Try GreptimeDB
-### 1. [Live Demo](https://greptime.com/playground)
+### 1. [GreptimePlay](https://greptime.com/playground)
 Try out the features of GreptimeDB right from your browser.
@@ -111,18 +97,9 @@ docker pull greptime/greptimedb
 Start a GreptimeDB container with:
 ```shell
-docker run -p 127.0.0.1:4000-4003:4000-4003 \
--v "$(pwd)/greptimedb:/tmp/greptimedb" \
---name greptime --rm \
-greptime/greptimedb:latest standalone start \
---http-addr 0.0.0.0:4000 \
---rpc-addr 0.0.0.0:4001 \
---mysql-addr 0.0.0.0:4002 \
---postgres-addr 0.0.0.0:4003
+docker run --rm --name greptime --net=host greptime/greptimedb standalone start
 ```
-Access the dashboard via `http://localhost:4000/dashboard`.
 Read more about [Installation](https://docs.greptime.com/getting-started/installation/overview) on docs.
 ## Getting Started
@@ -138,7 +115,7 @@ Check the prerequisite:
 * [Rust toolchain](https://www.rust-lang.org/tools/install) (nightly)
 * [Protobuf compiler](https://grpc.io/docs/protoc-installation/) (>= 3.15)
-* Python toolchain (optional): Required only if built with PyO3 backend. More details for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
+* Python toolchain (optional): Required only if built with PyO3 backend. More detail for compiling with PyO3 can be found in its [documentation](https://pyo3.rs/v0.18.1/building_and_distribution#configuring-the-python-version).
 Build GreptimeDB binary:
@@ -152,11 +129,7 @@ Run a standalone server:
 cargo run -- standalone start
 ```
-## Tools & Extensions
+## Extension
-### Kubernetes
-- [GreptimeDB Operator](https://github.com/GrepTimeTeam/greptimedb-operator)
 ### Dashboard
@@ -173,19 +146,14 @@ cargo run -- standalone start
 ### Grafana Dashboard
-Our official Grafana dashboard for monitoring GreptimeDB is available at [grafana](grafana/README.md) directory.
+Our official Grafana dashboard is available at [grafana](grafana/README.md) directory.
 ## Project Status
-GreptimeDB is currently in Beta. We are targeting GA (General Availability) with v1.0 release by Early 2025.
+The current version has not yet reached the standards for General Availability.
+According to our Greptime 2024 Roadmap, we aim to achieve a production-level version with the release of v1.0 by the end of 2024. [Join Us](https://github.com/GreptimeTeam/greptimedb/issues/3412)
-While in Beta, GreptimeDB is already:
+We welcome you to test and use GreptimeDB. Some users have already adopted it in their production environments. If you're interested in trying it out, please use the latest stable release available.
-* Being used in production by early adopters
-* Actively maintained with regular releases, [about version number](https://docs.greptime.com/nightly/reference/about-greptimedb-version)
-* Suitable for testing and evaluation
-For production use, we recommend using the latest stable release.
 ## Community
@@ -204,12 +172,12 @@ In addition, you may:
 - Connect us with [Linkedin](https://www.linkedin.com/company/greptime/)
 - Follow us on [Twitter](https://twitter.com/greptime)
-## Commercial Support
+## Commerial Support
 If you are running GreptimeDB OSS in your organization, we offer additional
-enterprise add-ons, installation services, training, and consulting. [Contact
+enterprise addons, installation service, training and consulting. [Contact
 us](https://greptime.com/contactus) and we will reach out to you with more
-detail of our commercial license.
+detail of our commerial license.
 ## License

View File

@@ -13,12 +13,11 @@
 | Key | Type | Default | Descriptions |
 | --- | -----| ------- | ----------- |
 | `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
+| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
 | `default_timezone` | String | Unset | The default timezone of the server. |
 | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
 | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
 | `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
-| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
-| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
 | `runtime` | -- | -- | The runtime options. |
 | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
 | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -62,9 +61,9 @@
 | `wal` | -- | -- | The WAL options. |
 | `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
 | `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
-| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
+| `wal.file_size` | String | `256MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
-| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
+| `wal.purge_threshold` | String | `4GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
-| `wal.purge_interval` | String | `1m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
+| `wal.purge_interval` | String | `10m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.read_batch_size` | Integer | `128` | The read batch size.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.sync_write` | Bool | `false` | Whether to use sync write.<br/>**It's only used when the provider is `raft_engine`**. |
 | `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
@@ -94,8 +93,8 @@
 | `storage` | -- | -- | The data storage options. |
 | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
 | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
-| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. |
+| `storage.cache_path` | String | Unset | Cache configuration for object storage such as 'S3' etc.<br/>The local file cache directory. |
-| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
+| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. |
 | `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
 | `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
 | `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. |
@@ -110,11 +109,6 @@
 | `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
 | `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
 | `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
-| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
-| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
-| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
-| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
-| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
 | `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
 | `region_engine.mito` | -- | -- | The Mito engine options. |
 | `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -132,11 +126,12 @@
 | `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
 | `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
 | `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
-| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. |
+| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
-| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/object_cache/write`. |
+| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
-| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
+| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. | | `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. | | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
@@ -151,17 +146,11 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | | `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. | | `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. | | `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. | | `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
@@ -201,7 +190,6 @@
| Key | Type | Default | Descriptions | | Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- | | --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. | | `default_timezone` | String | Unset | The default timezone of the server. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. | | `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -293,13 +281,13 @@
| `data_home` | String | `/tmp/metasrv/` | The working home directory. | | `data_home` | String | `/tmp/metasrv/` | The working home directory. |
| `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. | | `bind_addr` | String | `127.0.0.1:3002` | The bind address of metasrv. |
| `server_addr` | String | `127.0.0.1:3002` | The communication server address for frontend and datanode to connect to metasrv, "127.0.0.1:3002" by default for localhost. | | `server_addr` | String | `127.0.0.1:3002` | The communication server address for frontend and datanode to connect to metasrv, "127.0.0.1:3002" by default for localhost. |
| `store_addrs` | Array | -- | Store server address default to etcd store. | | `store_addr` | String | `127.0.0.1:2379` | Store server address default to etcd store. |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `EtcdStore` | The datastore for meta server. |
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. | | `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `backend` | String | `EtcdStore` | The datastore for meta server. |
| `runtime` | -- | -- | The runtime options. | | `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. | | `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -364,6 +352,7 @@
| `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. | | `node_id` | Integer | Unset | The datanode identifier and should be unique in the cluster. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. | | `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. | | `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. | | `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. |
@@ -371,7 +360,6 @@
| `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. | | `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. |
| `rpc_max_recv_message_size` | String | Unset | Deprecated, use `grpc.rpc_max_recv_message_size` instead. | | `rpc_max_recv_message_size` | String | Unset | Deprecated, use `grpc.rpc_max_recv_message_size` instead. |
| `rpc_max_send_message_size` | String | Unset | Deprecated, use `grpc.rpc_max_send_message_size` instead. | | `rpc_max_send_message_size` | String | Unset | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. | | `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. | | `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -406,9 +394,9 @@
| `wal` | -- | -- | The WAL options. | | `wal` | -- | -- | The WAL options. |
| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. | | `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
| `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.file_size` | String | `256MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.purge_threshold` | String | `4GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.purge_interval` | String | `10m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.read_batch_size` | Integer | `128` | The read batch size.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.read_batch_size` | Integer | `128` | The read batch size.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_write` | Bool | `false` | Whether to use sync write.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.sync_write` | Bool | `false` | Whether to use sync write.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
@@ -428,8 +416,8 @@
| `storage` | -- | -- | The data storage options. | | `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. | | `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. | | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. | | `storage.cache_path` | String | Unset | Cache configuration for object storage such as 'S3' etc.<br/>The local file cache directory. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. | | `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. | | `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. | | `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
| `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. | | `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. |
@@ -444,11 +432,6 @@
| `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. | | `storage.sas_token` | String | Unset | The sas token of the azure account.<br/>**It's only used when the storage type is `Azblob`**. |
| `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. | | `storage.endpoint` | String | Unset | The endpoint of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. | | `storage.region` | String | Unset | The region of the S3 service.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client` | -- | -- | The http client options to the storage.<br/>**It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**. |
| `storage.http_client.pool_max_idle_per_host` | Integer | `1024` | The maximum idle connection per host allowed in the pool. |
| `storage.http_client.connect_timeout` | String | `30s` | The timeout for only the connect phase of a http client. |
| `storage.http_client.timeout` | String | `30s` | The total request timeout, applied from when the request starts connecting until the response body has finished.<br/>Also considered a total deadline. |
| `storage.http_client.pool_idle_timeout` | String | `90s` | The timeout for idle sockets being kept-alive. |
| `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. | | `[[region_engine]]` | -- | -- | The region engine options. You can configure multiple region engines. |
| `region_engine.mito` | -- | -- | The Mito engine options. | | `region_engine.mito` | -- | -- | The Mito engine options. |
| `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. | | `region_engine.mito.num_workers` | Integer | `8` | Number of region workers. |
@@ -466,11 +449,12 @@
| `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | | `region_engine.mito.vector_cache_size` | String | Auto | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. | | `region_engine.mito.page_cache_size` | String | Auto | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/8 of OS memory. |
| `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. | | `region_engine.mito.selector_result_cache_size` | String | Auto | Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. | | `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. | | `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | | `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. | | `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. | | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
@@ -483,19 +467,11 @@
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | | `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. | | `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. |
| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
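
As a quick orientation for the table above, here is a minimal, illustrative TOML sketch of the storage read cache and the Mito experimental write cache options it documents. The option names are taken from the table; the bucket name, paths, and sizes are placeholders only, and the defaults differ between the two sides of this diff (for example `5GiB` versus `512MB` for the write cache capacity).

```toml
# Illustrative sketch only: option names come from the table above,
# values are placeholders, not recommendations.
[storage]
type = "S3"
bucket = "my-bucket"        # placeholder bucket name
root = "greptimedb"
# Local read cache for object storage; one side of this diff defaults it to
# `{data_home}/object_cache/read`, and an empty string disables it.
cache_path = "/var/lib/greptimedb/object_cache/read"
cache_capacity = "5GiB"

[[region_engine]]
[region_engine.mito]
# Experimental write cache; mainly useful when the storage type is object storage.
enable_experimental_write_cache = true
experimental_write_cache_path = ""       # defaults under `{data_home}`
experimental_write_cache_size = "5GiB"   # `512MB` on the other side of this diff
experimental_write_cache_ttl = "8h"
```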

View File

@@ -13,6 +13,9 @@ require_lease_before_startup = false
## By default, it provides services after all regions have been initialized. ## By default, it provides services after all regions have been initialized.
init_regions_in_background = false init_regions_in_background = false
## Enable telemetry to collect anonymous usage data.
enable_telemetry = true
## Parallelism of initializing regions. ## Parallelism of initializing regions.
init_regions_parallelism = 16 init_regions_parallelism = 16
@@ -39,8 +42,6 @@ rpc_max_recv_message_size = "512MB"
## @toml2docs:none-default ## @toml2docs:none-default
rpc_max_send_message_size = "512MB" rpc_max_send_message_size = "512MB"
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
## The HTTP server options. ## The HTTP server options.
[http] [http]
@@ -142,15 +143,15 @@ dir = "/tmp/greptimedb/wal"
## The size of the WAL segment file. ## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
file_size = "128MB" file_size = "256MB"
## The threshold of the WAL size to trigger a flush. ## The threshold of the WAL size to trigger a flush.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
purge_threshold = "1GB" purge_threshold = "4GB"
## The interval to trigger a flush. ## The interval to trigger a flush.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
purge_interval = "1m" purge_interval = "10m"
## The read batch size. ## The read batch size.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
@@ -293,14 +294,14 @@ data_home = "/tmp/greptimedb/"
## - `Oss`: the data is stored in the Aliyun OSS. ## - `Oss`: the data is stored in the Aliyun OSS.
type = "File" type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance. ## Cache configuration for object storage such as 'S3' etc.
## A local file directory, defaults to `{data_home}`. An empty string means disabling. ## The local file cache directory.
## @toml2docs:none-default ## @toml2docs:none-default
#+ cache_path = "" cache_path = "/path/local_cache"
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. ## The local file cache capacity in bytes.
## @toml2docs:none-default ## @toml2docs:none-default
cache_capacity = "5GiB" cache_capacity = "256MB"
## The S3 bucket name. ## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**. ## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
@@ -374,23 +375,6 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default ## @toml2docs:none-default
region = "us-west-2" region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options # Custom storage options
# [[storage.providers]] # [[storage.providers]]
# name = "S3" # name = "S3"
@@ -475,14 +459,14 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto" ## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB" #+ selector_result_cache_size = "512MB"
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. ## Whether to enable the experimental write cache.
enable_experimental_write_cache = false enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}`. ## File system path for write cache, defaults to `{data_home}/write_cache`.
experimental_write_cache_path = "" experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. ## Capacity for write cache.
experimental_write_cache_size = "5GiB" experimental_write_cache_size = "512MB"
## TTL for write cache. ## TTL for write cache.
## @toml2docs:none-default ## @toml2docs:none-default
@@ -491,6 +475,12 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing. ## Buffer size for SST writing.
sst_write_buffer_size = "8MB" sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task. ## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32 parallel_scan_channel_size = 32
@@ -543,15 +533,6 @@ mem_threshold_on_create = "auto"
## Deprecated, use `region_engine.mito.index.aux_path` instead. ## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = "" intermediate_path = ""
## Cache size for inverted index metadata.
metadata_cache_size = "64MiB"
## Cache size for inverted index content.
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine. ## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index] [region_engine.mito.fulltext_index]
@@ -576,30 +557,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold ## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto" mem_threshold_on_create = "auto"
## The options for bloom filter index in Mito engine.
[region_engine.mito.bloom_filter_index]
## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for the index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
[region_engine.mito.memtable] [region_engine.mito.memtable]
## Memtable type. ## Memtable type.
## - `time_series`: time-series memtable ## - `time_series`: time-series memtable

View File

@@ -2,10 +2,6 @@
## @toml2docs:none-default ## @toml2docs:none-default
default_timezone = "UTC" default_timezone = "UTC"
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"
## The runtime options. ## The runtime options.
#+ [runtime] #+ [runtime]
## The number of threads to execute the runtime for global read operations. ## The number of threads to execute the runtime for global read operations.

View File

@@ -8,13 +8,7 @@ bind_addr = "127.0.0.1:3002"
server_addr = "127.0.0.1:3002" server_addr = "127.0.0.1:3002"
## Store server address default to etcd store. ## Store server address default to etcd store.
store_addrs = ["127.0.0.1:2379"] store_addr = "127.0.0.1:2379"
## If it's not empty, the metasrv will store all data with this key prefix.
store_key_prefix = ""
## The datastore for meta server.
backend = "EtcdStore"
## Datanode selector type. ## Datanode selector type.
## - `round_robin` (default value) ## - `round_robin` (default value)
@@ -26,14 +20,20 @@ selector = "round_robin"
## Store data in memory. ## Store data in memory.
use_memory_store = false use_memory_store = false
## Whether to enable greptimedb telemetry.
enable_telemetry = true
## If it's not empty, the metasrv will store all data with this key prefix.
store_key_prefix = ""
## Whether to enable region failover. ## Whether to enable region failover.
## This feature is only available on GreptimeDB running on cluster mode and ## This feature is only available on GreptimeDB running on cluster mode and
## - Using Remote WAL ## - Using Remote WAL
## - Using shared storage (e.g., s3). ## - Using shared storage (e.g., s3).
enable_region_failover = false enable_region_failover = false
## Whether to enable greptimedb telemetry. Enabled by default. ## The datastore for meta server.
#+ enable_telemetry = true backend = "EtcdStore"
## The runtime options. ## The runtime options.
#+ [runtime] #+ [runtime]

View File

@@ -1,6 +1,9 @@
## The running mode of the datanode. It can be `standalone` or `distributed`. ## The running mode of the datanode. It can be `standalone` or `distributed`.
mode = "standalone" mode = "standalone"
## Enable telemetry to collect anonymous usage data.
enable_telemetry = true
## The default timezone of the server. ## The default timezone of the server.
## @toml2docs:none-default ## @toml2docs:none-default
default_timezone = "UTC" default_timezone = "UTC"
@@ -15,13 +18,6 @@ init_regions_parallelism = 16
## The maximum number of concurrent queries allowed to be executed. Zero means unlimited. ## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0 max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"
## The runtime options. ## The runtime options.
#+ [runtime] #+ [runtime]
## The number of threads to execute the runtime for global read operations. ## The number of threads to execute the runtime for global read operations.
@@ -151,15 +147,15 @@ dir = "/tmp/greptimedb/wal"
## The size of the WAL segment file. ## The size of the WAL segment file.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
file_size = "128MB" file_size = "256MB"
## The threshold of the WAL size to trigger a flush. ## The threshold of the WAL size to trigger a flush.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
purge_threshold = "1GB" purge_threshold = "4GB"
## The interval to trigger a flush. ## The interval to trigger a flush.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
purge_interval = "1m" purge_interval = "10m"
## The read batch size. ## The read batch size.
## **It's only used when the provider is `raft_engine`**. ## **It's only used when the provider is `raft_engine`**.
@@ -336,14 +332,14 @@ data_home = "/tmp/greptimedb/"
## - `Oss`: the data is stored in the Aliyun OSS. ## - `Oss`: the data is stored in the Aliyun OSS.
type = "File" type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance. ## Cache configuration for object storage such as 'S3' etc.
## A local file directory, defaults to `{data_home}/object_cache/read`. An empty string means disabling. ## The local file cache directory.
## @toml2docs:none-default ## @toml2docs:none-default
#+ cache_path = "" cache_path = "/path/local_cache"
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. ## The local file cache capacity in bytes.
## @toml2docs:none-default ## @toml2docs:none-default
cache_capacity = "5GiB" cache_capacity = "256MB"
## The S3 bucket name. ## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**. ## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
@@ -417,23 +413,6 @@ endpoint = "https://s3.amazonaws.com"
## @toml2docs:none-default ## @toml2docs:none-default
region = "us-west-2" region = "us-west-2"
## The http client options to the storage.
## **It's only used when the storage type is `S3`, `Oss`, `Gcs` and `Azblob`**.
[storage.http_client]
## The maximum idle connection per host allowed in the pool.
pool_max_idle_per_host = 1024
## The timeout for only the connect phase of a http client.
connect_timeout = "30s"
## The total request timeout, applied from when the request starts connecting until the response body has finished.
## Also considered a total deadline.
timeout = "30s"
## The timeout for idle sockets being kept-alive.
pool_idle_timeout = "90s"
# Custom storage options # Custom storage options
# [[storage.providers]] # [[storage.providers]]
# name = "S3" # name = "S3"
@@ -518,14 +497,14 @@ auto_flush_interval = "1h"
## @toml2docs:none-default="Auto" ## @toml2docs:none-default="Auto"
#+ selector_result_cache_size = "512MB" #+ selector_result_cache_size = "512MB"
## Whether to enable the experimental write cache, it's enabled by default when using object storage. It is recommended to enable it when using object storage for better performance. ## Whether to enable the experimental write cache.
enable_experimental_write_cache = false enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/object_cache/write`. ## File system path for write cache, defaults to `{data_home}/write_cache`.
experimental_write_cache_path = "" experimental_write_cache_path = ""
## Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. ## Capacity for write cache.
experimental_write_cache_size = "5GiB" experimental_write_cache_size = "512MB"
## TTL for write cache. ## TTL for write cache.
## @toml2docs:none-default ## @toml2docs:none-default
@@ -534,6 +513,12 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing. ## Buffer size for SST writing.
sst_write_buffer_size = "8MB" sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task. ## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32 parallel_scan_channel_size = 32
@@ -592,9 +577,6 @@ metadata_cache_size = "64MiB"
## Cache size for inverted index content. ## Cache size for inverted index content.
content_cache_size = "128MiB" content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine. ## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index] [region_engine.mito.fulltext_index]
@@ -619,30 +601,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold ## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto" mem_threshold_on_create = "auto"
## The options for bloom filter in Mito engine.
[region_engine.mito.bloom_filter_index]
## Whether to create the bloom filter on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the bloom filter on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the bloom filter on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for bloom filter creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
[region_engine.mito.memtable] [region_engine.mito.memtable]
## Memtable type. ## Memtable type.
## - `time_series`: time-series memtable ## - `time_series`: time-series memtable

View File

@@ -13,6 +13,8 @@ RUN yum install -y epel-release \
openssl \ openssl \
openssl-devel \ openssl-devel \
centos-release-scl \ centos-release-scl \
rh-python38 \
rh-python38-python-devel \
which which
# Install protoc # Install protoc
@@ -41,6 +43,8 @@ RUN yum install -y epel-release \
openssl \ openssl \
openssl-devel \ openssl-devel \
centos-release-scl \ centos-release-scl \
rh-python38 \
rh-python38-python-devel \
which which
WORKDIR /greptime WORKDIR /greptime

View File

@@ -20,7 +20,10 @@ RUN --mount=type=cache,target=/var/cache/apt \
curl \ curl \
git \ git \
build-essential \ build-essential \
pkg-config pkg-config \
python3.10 \
python3.10-dev \
python3-pip
# Install Rust. # Install Rust.
SHELL ["/bin/bash", "-c"] SHELL ["/bin/bash", "-c"]
@@ -43,8 +46,15 @@ ARG OUTPUT_DIR
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get \ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get \
-y install ca-certificates \ -y install ca-certificates \
python3.10 \
python3.10-dev \
python3-pip \
curl curl
COPY ./docker/python/requirements.txt /etc/greptime/requirements.txt
RUN python3 -m pip install -r /etc/greptime/requirements.txt
WORKDIR /greptime WORKDIR /greptime
COPY --from=builder /out/target/${OUTPUT_DIR}/greptime /greptime/bin/ COPY --from=builder /out/target/${OUTPUT_DIR}/greptime /greptime/bin/
ENV PATH /greptime/bin/:$PATH ENV PATH /greptime/bin/:$PATH

View File

@@ -7,7 +7,9 @@ RUN sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo
RUN yum install -y epel-release \ RUN yum install -y epel-release \
openssl \ openssl \
openssl-devel \ openssl-devel \
centos-release-scl centos-release-scl \
rh-python38 \
rh-python38-python-devel
ARG TARGETARCH ARG TARGETARCH

View File

@@ -8,8 +8,15 @@ ARG TARGET_BIN=greptime
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
ca-certificates \ ca-certificates \
python3.10 \
python3.10-dev \
python3-pip \
curl curl
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt
RUN python3 -m pip install -r /etc/greptime/requirements.txt
ARG TARGETARCH ARG TARGETARCH
ADD $TARGETARCH/$TARGET_BIN /greptime/bin/ ADD $TARGETARCH/$TARGET_BIN /greptime/bin/

View File

@@ -15,8 +15,8 @@ RUN apt-get update && \
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
libssl-dev \ libssl-dev \
tzdata \ tzdata \
protobuf-compiler \
curl \ curl \
unzip \
ca-certificates \ ca-certificates \
git \ git \
build-essential \ build-essential \
@@ -24,20 +24,6 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
python3.10 \ python3.10 \
python3.10-dev python3.10-dev
ARG TARGETPLATFORM
RUN echo "target platform: $TARGETPLATFORM"
# Install protobuf, because the one in the apt is too old (v3.12).
RUN if [ "$TARGETPLATFORM" = "linux/arm64" ]; then \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v29.1/protoc-29.1-linux-aarch_64.zip && \
unzip protoc-29.1-linux-aarch_64.zip -d protoc3; \
elif [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v29.1/protoc-29.1-linux-x86_64.zip && \
unzip protoc-29.1-linux-x86_64.zip -d protoc3; \
fi
RUN mv protoc3/bin/* /usr/local/bin/
RUN mv protoc3/include/* /usr/local/include/
# https://github.com/GreptimeTeam/greptimedb/actions/runs/10935485852/job/30357457188#step:3:7106 # https://github.com/GreptimeTeam/greptimedb/actions/runs/10935485852/job/30357457188#step:3:7106
# `aws-lc-sys` require gcc >= 10.3.0 to work, hence alias to use gcc-10 # `aws-lc-sys` require gcc >= 10.3.0 to work, hence alias to use gcc-10
RUN apt-get remove -y gcc-9 g++-9 cpp-9 && \ RUN apt-get remove -y gcc-9 g++-9 cpp-9 && \
@@ -63,7 +49,7 @@ RUN apt-get -y purge python3.8 && \
# wildcard here. However, that requires the git's config files and the submodules all owned by the very same user. # wildcard here. However, that requires the git's config files and the submodules all owned by the very same user.
# It's troublesome to do this since the dev build runs in Docker, which is under user "root"; while outside the Docker, # It's troublesome to do this since the dev build runs in Docker, which is under user "root"; while outside the Docker,
# it can be a different user that have prepared the submodules. # it can be a different user that have prepared the submodules.
RUN git config --global --add safe.directory '*' RUN git config --global --add safe.directory *
# Install Python dependencies. # Install Python dependencies.
COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt COPY $DOCKER_BUILD_ROOT/docker/python/requirements.txt /etc/greptime/requirements.txt

View File

@@ -4,13 +4,13 @@
example: example:
```bash ```bash
curl --data "trace,flow=debug" 127.0.0.1:4000/debug/log_level curl --data "trace;flow=debug" 127.0.0.1:4000/debug/log_level
``` ```
And the database will reply with something like: And the database will reply with something like:
```bash ```bash
Log Level changed from Some("info") to "trace,flow=debug"% Log Level changed from Some("info") to "trace;flow=debug"%
``` ```
The data is a string in the format of `global_level,module1=level1,module2=level2,...` that follows the same rules as `RUST_LOG`. The data is a string in the format of `global_level;module1=level1;module2=level2;...` that follows the same rules as `RUST_LOG`.
The module is the module name of the log, and the level is the log level. The log level can be one of the following: `trace`, `debug`, `info`, `warn`, `error`, `off` (case insensitive). The module is the module name of the log, and the level is the log level. The log level can be one of the following: `trace`, `debug`, `info`, `warn`, `error`, `off` (case insensitive).
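
For example, to keep the global level at `info` while raising two modules to `debug`, a request like the sketch below should work. The module names are only examples, and note that one side of this diff separates entries with commas while the other uses semicolons.

```bash
# Example only: `mito2` and `servers` stand in for whatever modules you want to tune.
# Use `;` as the separator if your build expects the semicolon form shown above.
curl --data "info,mito2=debug,servers=debug" 127.0.0.1:4000/debug/log_level
```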

View File

@@ -3,7 +3,7 @@
## HTTP API ## HTTP API
Sample at 99 Hertz, for 5 seconds, output report in [protobuf format](https://github.com/google/pprof/blob/master/proto/profile.proto). Sample at 99 Hertz, for 5 seconds, output report in [protobuf format](https://github.com/google/pprof/blob/master/proto/profile.proto).
```bash ```bash
curl -X POST -s '0:4000/debug/prof/cpu' > /tmp/pprof.out curl -s '0:4000/debug/prof/cpu' > /tmp/pprof.out
``` ```
Then you can use `pprof` command with the protobuf file. Then you can use `pprof` command with the protobuf file.
@@ -13,38 +13,10 @@ go tool pprof -top /tmp/pprof.out
Sample at 99 Hertz, for 60 seconds, output report in flamegraph format. Sample at 99 Hertz, for 60 seconds, output report in flamegraph format.
```bash ```bash
curl -X POST -s '0:4000/debug/prof/cpu?seconds=60&output=flamegraph' > /tmp/pprof.svg curl -s '0:4000/debug/prof/cpu?seconds=60&output=flamegraph' > /tmp/pprof.svg
``` ```
Sample at 49 Hertz, for 10 seconds, output report in text format. Sample at 49 Hertz, for 10 seconds, output report in text format.
```bash ```bash
curl -X POST -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt curl -s '0:4000/debug/prof/cpu?seconds=10&frequency=49&output=text' > /tmp/pprof.txt
```
## Using `perf`
First find the pid of GreptimeDB:
Use `perf record` to profile GreptimeDB at a sampling frequency of 99 Hz for a duration of 60 seconds:
```bash
perf record -p <pid> --call-graph dwarf -F 99 -- sleep 60
```
The result will be saved to the file `perf.data`.
Then convert it into readable stack traces:
```bash
perf script --no-inline > perf.out
```
Produce a flame graph out of it:
```bash
git clone https://github.com/brendangregg/FlameGraph
FlameGraph/stackcollapse-perf.pl perf.out > perf.folded
FlameGraph/flamegraph.pl perf.folded > perf.svg
``` ```


@@ -23,13 +23,13 @@ curl https://raw.githubusercontent.com/brendangregg/FlameGraph/master/flamegraph
Start GreptimeDB instance with environment variables: Start GreptimeDB instance with environment variables:
```bash ```bash
MALLOC_CONF=prof:true ./target/debug/greptime standalone start MALLOC_CONF=prof:true,lg_prof_interval:28 ./target/debug/greptime standalone start
``` ```
Dump memory profiling data through HTTP API: Dump memory profiling data through HTTP API:
```bash ```bash
curl -X POST localhost:4000/debug/prof/mem > greptime.hprof curl localhost:4000/debug/prof/mem > greptime.hprof
``` ```
You can periodically dump profiling data and compare the dumps to find the delta in memory usage. You can periodically dump profiling data and compare the dumps to find the delta in memory usage.
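As a sketch, two dumps taken some time apart can be diffed with jemalloc's `jeprof` tool (assuming `jeprof` is installed and the binary path matches the one you started, e.g. the debug build above):
```bash
# first.hprof and second.hprof are two dumps taken via the HTTP API above.
# --base subtracts the first dump, leaving only the allocations that appeared in between.
jeprof --text --base=first.hprof ./target/debug/greptime second.hprof
```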


@@ -5,13 +5,6 @@ GreptimeDB's official Grafana dashboard.
Status note: we are still working on this config. It's expected to change frequently in the coming days. Please feel free to submit your feedback and/or contributions to this dashboard 🤗 Status note: we are still working on this config. It's expected to change frequently in the coming days. Please feel free to submit your feedback and/or contributions to this dashboard 🤗
If you use Helm [chart](https://github.com/GreptimeTeam/helm-charts) to deploy GreptimeDB cluster, you can enable self-monitoring by setting the following values in your Helm chart:
- `monitoring.enabled=true`: Deploys a standalone GreptimeDB instance dedicated to monitoring the cluster;
- `grafana.enabled=true`: Deploys Grafana and automatically imports the monitoring dashboard;
The standalone GreptimeDB instance will collect metrics from your cluster and the dashboard will be available in the Grafana UI. For detailed deployment instructions, please refer to our [Kubernetes deployment guide](https://docs.greptime.com/nightly/user-guide/deployments/deploy-on-kubernetes/getting-started).
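As an illustration, the same values can be set from the Helm CLI (the release name, namespace, and chart reference are placeholders; check the helm-charts repository for the exact chart name):
```bash
# Enable the self-monitoring instance and the bundled Grafana dashboard.
helm upgrade --install mycluster greptime/greptimedb-cluster \
  --namespace greptimedb-cluster \
  --set monitoring.enabled=true \
  --set grafana.enabled=true
```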
# How to use # How to use
## `greptimedb.json` ## `greptimedb.json`

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,3 +1,2 @@
[toolchain] [toolchain]
channel = "nightly-2024-10-19" channel = "nightly-2024-10-19"
components = ["rust-analyzer"]


@@ -58,10 +58,8 @@ def main():
if not check_snafu_in_files(branch_name, other_rust_files) if not check_snafu_in_files(branch_name, other_rust_files)
] ]
if unused_snafu: for name in unused_snafu:
print("Unused error variants:") print(name)
for name in unused_snafu:
print(name)
if unused_snafu: if unused_snafu:
raise SystemExit(1) raise SystemExit(1)


@@ -1,4 +1,4 @@
#!/bin/sh #!/usr/bin/env bash
set -ue set -ue
@@ -15,7 +15,7 @@ GITHUB_ORG=GreptimeTeam
GITHUB_REPO=greptimedb GITHUB_REPO=greptimedb
BIN=greptime BIN=greptime
get_os_type() { function get_os_type() {
os_type="$(uname -s)" os_type="$(uname -s)"
case "$os_type" in case "$os_type" in
@@ -31,7 +31,7 @@ get_os_type() {
esac esac
} }
get_arch_type() { function get_arch_type() {
arch_type="$(uname -m)" arch_type="$(uname -m)"
case "$arch_type" in case "$arch_type" in
@@ -53,7 +53,7 @@ get_arch_type() {
esac esac
} }
download_artifact() { function download_artifact() {
if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then if [ -n "${OS_TYPE}" ] && [ -n "${ARCH_TYPE}" ]; then
# Use the latest stable released version. # Use the latest stable released version.
# GitHub API reference: https://docs.github.com/en/rest/releases/releases?apiVersion=2022-11-28#get-the-latest-release. # GitHub API reference: https://docs.github.com/en/rest/releases/releases?apiVersion=2022-11-28#get-the-latest-release.


@@ -1,27 +0,0 @@
let
nixpkgs = fetchTarball "https://github.com/NixOS/nixpkgs/tarball/nixos-unstable";
fenix = import (fetchTarball "https://github.com/nix-community/fenix/archive/main.tar.gz") {};
pkgs = import nixpkgs { config = {}; overlays = []; };
in
pkgs.mkShell rec {
nativeBuildInputs = with pkgs; [
pkg-config
git
clang
gcc
protobuf
mold
(fenix.fromToolchainFile {
dir = ./.;
})
cargo-nextest
taplo
];
buildInputs = with pkgs; [
libgit2
];
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath buildInputs;
}


@@ -36,14 +36,15 @@ use datatypes::vectors::{
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
UInt64Vector, VectorRef, UInt64Vector, VectorRef,
}; };
use greptime_proto::v1;
use greptime_proto::v1::column_data_type_extension::TypeExt; use greptime_proto::v1::column_data_type_extension::TypeExt;
use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::ddl_request::Expr;
use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::query_request::Query; use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::value::ValueData; use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ use greptime_proto::v1::{
self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, QueryRequest,
QueryRequest, Row, SemanticType, VectorTypeExtension, Row, SemanticType,
}; };
use paste::paste; use paste::paste;
use snafu::prelude::*; use snafu::prelude::*;
@@ -149,17 +150,6 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ConcreteDataType::decimal128_default_datatype() ConcreteDataType::decimal128_default_datatype()
} }
} }
ColumnDataType::Vector => {
if let Some(TypeExt::VectorType(d)) = datatype_wrapper
.datatype_ext
.as_ref()
.and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
{
ConcreteDataType::vector_datatype(d.dim)
} else {
ConcreteDataType::vector_default_datatype()
}
}
} }
} }
} }
@@ -241,15 +231,6 @@ impl ColumnDataTypeWrapper {
}), }),
} }
} }
pub fn vector_datatype(dim: u32) -> Self {
ColumnDataTypeWrapper {
datatype: ColumnDataType::Vector,
datatype_ext: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim })),
}),
}
}
} }
impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper { impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
@@ -268,7 +249,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32, ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64, ConcreteDataType::Float64(_) => ColumnDataType::Float64,
ConcreteDataType::Binary(_) => ColumnDataType::Binary, ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String, ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date, ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
@@ -290,8 +271,6 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano, IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano,
}, },
ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128, ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128,
ConcreteDataType::Json(_) => ColumnDataType::Json,
ConcreteDataType::Vector(_) => ColumnDataType::Vector,
ConcreteDataType::Null(_) ConcreteDataType::Null(_)
| ConcreteDataType::List(_) | ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) | ConcreteDataType::Dictionary(_)
@@ -310,17 +289,15 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
})), })),
}) })
} }
ColumnDataType::Json => datatype.as_json().map(|_| ColumnDataTypeExtension { ColumnDataType::Binary => {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), if datatype == ConcreteDataType::json_datatype() {
}), // Json is the same as binary in proto. The extension marks that the binary in proto is actually a json.
ColumnDataType::Vector => { Some(ColumnDataTypeExtension {
datatype type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
.as_vector()
.map(|vector_type| ColumnDataTypeExtension {
type_ext: Some(TypeExt::VectorType(VectorTypeExtension {
dim: vector_type.dim as _,
})),
}) })
} else {
None
}
} }
_ => None, _ => None,
}; };
@@ -445,10 +422,6 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
string_values: Vec::with_capacity(capacity), string_values: Vec::with_capacity(capacity),
..Default::default() ..Default::default()
}, },
ColumnDataType::Vector => Values {
binary_values: Vec::with_capacity(capacity),
..Default::default()
},
} }
} }
@@ -527,14 +500,13 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr { match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database", Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table", Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::AlterTable(_)) => "ddl.alter_table", Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow", Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow", Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view", Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view", Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
None => "ddl.empty", None => "ddl.empty",
} }
} }
@@ -701,7 +673,6 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
Decimal128::from_value_precision_scale(x.hi, x.lo, d.precision(), d.scale()).into() Decimal128::from_value_precision_scale(x.hi, x.lo, d.precision(), d.scale()).into()
}), }),
)), )),
ConcreteDataType::Vector(_) => Arc::new(BinaryVector::from_vec(values.binary_values)),
ConcreteDataType::Null(_) ConcreteDataType::Null(_)
| ConcreteDataType::List(_) | ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) | ConcreteDataType::Dictionary(_)
@@ -867,7 +838,6 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
)) ))
}) })
.collect(), .collect(),
ConcreteDataType::Vector(_) => values.binary_values.into_iter().map(|v| v.into()).collect(),
ConcreteDataType::Null(_) ConcreteDataType::Null(_)
| ConcreteDataType::List(_) | ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) | ConcreteDataType::Dictionary(_)
@@ -892,7 +862,10 @@ pub fn is_column_type_value_eq(
ColumnDataTypeWrapper::try_new(type_value, type_extension) ColumnDataTypeWrapper::try_new(type_value, type_extension)
.map(|wrapper| { .map(|wrapper| {
let datatype = ConcreteDataType::from(wrapper); let datatype = ConcreteDataType::from(wrapper);
expect_type == &datatype (datatype == *expect_type)
// Json type leverages the binary type in pb, so this is valid.
|| (datatype == ConcreteDataType::binary_datatype()
&& *expect_type == ConcreteDataType::json_datatype())
}) })
.unwrap_or(false) .unwrap_or(false)
} }
@@ -1179,10 +1152,6 @@ mod tests {
let values = values_with_capacity(ColumnDataType::Decimal128, 2); let values = values_with_capacity(ColumnDataType::Decimal128, 2);
let values = values.decimal128_values; let values = values.decimal128_values;
assert_eq!(2, values.capacity()); assert_eq!(2, values.capacity());
let values = values_with_capacity(ColumnDataType::Vector, 2);
let values = values.binary_values;
assert_eq!(2, values.capacity());
} }
#[test] #[test]
@@ -1270,11 +1239,7 @@ mod tests {
assert_eq!( assert_eq!(
ConcreteDataType::decimal128_datatype(10, 2), ConcreteDataType::decimal128_datatype(10, 2),
ColumnDataTypeWrapper::decimal128_datatype(10, 2).into() ColumnDataTypeWrapper::decimal128_datatype(10, 2).into()
); )
assert_eq!(
ConcreteDataType::vector_datatype(3),
ColumnDataTypeWrapper::vector_datatype(3).into()
);
} }
#[test] #[test]
@@ -1370,10 +1335,6 @@ mod tests {
.try_into() .try_into()
.unwrap() .unwrap()
); );
assert_eq!(
ColumnDataTypeWrapper::vector_datatype(3),
ConcreteDataType::vector_datatype(3).try_into().unwrap()
);
let result: Result<ColumnDataTypeWrapper> = ConcreteDataType::null_datatype().try_into(); let result: Result<ColumnDataTypeWrapper> = ConcreteDataType::null_datatype().try_into();
assert!(result.is_err()); assert!(result.is_err());


@@ -15,10 +15,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use datatypes::schema::{ use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FulltextOptions, COMMENT_KEY, FULLTEXT_KEY,
FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
}; };
use greptime_proto::v1::Analyzer;
use snafu::ResultExt; use snafu::ResultExt;
use crate::error::{self, Result}; use crate::error::{self, Result};
@@ -27,10 +25,6 @@ use crate::v1::{ColumnDef, ColumnOptions, SemanticType};
/// Key used to store fulltext options in gRPC column options. /// Key used to store fulltext options in gRPC column options.
const FULLTEXT_GRPC_KEY: &str = "fulltext"; const FULLTEXT_GRPC_KEY: &str = "fulltext";
/// Key used to store inverted index options in gRPC column options.
const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
/// Key used to store skip index options in gRPC column options.
const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
/// Tries to construct a `ColumnSchema` from the given `ColumnDef`. /// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> { pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
@@ -55,16 +49,10 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if !column_def.comment.is_empty() { if !column_def.comment.is_empty() {
metadata.insert(COMMENT_KEY.to_string(), column_def.comment.clone()); metadata.insert(COMMENT_KEY.to_string(), column_def.comment.clone());
} }
if let Some(options) = column_def.options.as_ref() { if let Some(options) = column_def.options.as_ref()
if let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY) { && let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY)
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.clone()); {
} metadata.insert(FULLTEXT_KEY.to_string(), fulltext.to_string());
if let Some(inverted_index) = options.options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), inverted_index.clone());
}
if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.clone());
}
} }
ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable) ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable)
@@ -82,17 +70,7 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
if let Some(fulltext) = column_schema.metadata().get(FULLTEXT_KEY) { if let Some(fulltext) = column_schema.metadata().get(FULLTEXT_KEY) {
options options
.options .options
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.clone()); .insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.to_string());
}
if let Some(inverted_index) = column_schema.metadata().get(INVERTED_INDEX_KEY) {
options
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), inverted_index.clone());
}
if let Some(skipping_index) = column_schema.metadata().get(SKIPPING_INDEX_KEY) {
options
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), skipping_index.clone());
} }
(!options.options.is_empty()).then_some(options) (!options.options.is_empty()).then_some(options)
@@ -115,14 +93,6 @@ pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result<Option<Column
Ok((!options.options.is_empty()).then_some(options)) Ok((!options.options.is_empty()).then_some(options))
} }
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
match analyzer {
Analyzer::English => FulltextAnalyzer::English,
Analyzer::Chinese => FulltextAnalyzer::Chinese,
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@@ -145,13 +115,10 @@ mod tests {
comment: "test_comment".to_string(), comment: "test_comment".to_string(),
datatype_extension: None, datatype_extension: None,
options: Some(ColumnOptions { options: Some(ColumnOptions {
options: HashMap::from([ options: HashMap::from([(
( FULLTEXT_GRPC_KEY.to_string(),
FULLTEXT_GRPC_KEY.to_string(), "{\"enable\":true}".to_string(),
"{\"enable\":true}".to_string(), )]),
),
(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string()),
]),
}), }),
}; };
@@ -172,7 +139,6 @@ mod tests {
..Default::default() ..Default::default()
} }
); );
assert!(schema.is_inverted_indexed());
} }
#[test] #[test]
@@ -187,17 +153,12 @@ mod tests {
analyzer: FulltextAnalyzer::English, analyzer: FulltextAnalyzer::English,
case_sensitive: false, case_sensitive: false,
}) })
.unwrap() .unwrap();
.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap(); let options = options_from_column_schema(&schema).unwrap();
assert_eq!( assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(), options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}" "{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
); );
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
"true"
);
} }
#[test] #[test]


@@ -25,7 +25,6 @@ pub enum PermissionReq<'a> {
GrpcRequest(&'a Request), GrpcRequest(&'a Request),
SqlStatement(&'a Statement), SqlStatement(&'a Statement),
PromQuery, PromQuery,
LogQuery,
Opentsdb, Opentsdb,
LineProtocol, LineProtocol,
PromStoreWrite, PromStoreWrite,


@@ -11,3 +11,4 @@ common-macro.workspace = true
common-meta.workspace = true common-meta.workspace = true
moka.workspace = true moka.workspace = true
snafu.workspace = true snafu.workspace = true
substrait.workspace = true

src/cache/src/lib.rs

@@ -19,9 +19,9 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache; use catalog::kvbackend::new_table_cache;
use common_meta::cache::{ use common_meta::cache::{
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry, new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
CacheRegistryBuilder, LayeredCacheRegistryBuilder, LayeredCacheRegistryBuilder,
}; };
use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder; use moka::future::CacheBuilder;
@@ -37,47 +37,9 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache"; pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache"; pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache"; pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const SCHEMA_CACHE_NAME: &str = "schema_cache";
pub const TABLE_SCHEMA_NAME_CACHE_NAME: &str = "table_schema_name_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache"; pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache"; pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";
/// Builds cache registry for datanode, including:
/// - Schema cache.
/// - Table id to schema name cache.
pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table id schema name cache that never expires.
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build();
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
// Builds schema cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
CacheRegistryBuilder::default()
.add_cache(table_id_schema_cache)
.add_cache(schema_cache)
.build()
}
/// Builds cache registry for frontend and datanode, including:
/// - Table info cache
/// - Table name cache
/// - Table route cache
/// - Table flow node cache
/// - View cache
/// - Schema cache
pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry { pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table info cache // Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
@@ -133,30 +95,12 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
kv_backend.clone(), kv_backend.clone(),
)); ));
// Builds schema cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(),
kv_backend,
));
CacheRegistryBuilder::default() CacheRegistryBuilder::default()
.add_cache(table_info_cache) .add_cache(table_info_cache)
.add_cache(table_name_cache) .add_cache(table_name_cache)
.add_cache(table_route_cache) .add_cache(table_route_cache)
.add_cache(view_info_cache) .add_cache(view_info_cache)
.add_cache(table_flownode_set_cache) .add_cache(table_flownode_set_cache)
.add_cache(schema_cache)
.add_cache(table_id_schema_cache)
.build() .build()
} }


@@ -18,6 +18,7 @@ async-stream.workspace = true
async-trait = "0.1" async-trait = "0.1"
bytes.workspace = true bytes.workspace = true
common-catalog.workspace = true common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true common-error.workspace = true
common-macro.workspace = true common-macro.workspace = true
common-meta.workspace = true common-meta.workspace = true
@@ -57,5 +58,7 @@ catalog = { workspace = true, features = ["testing"] }
chrono.workspace = true chrono.workspace = true
common-meta = { workspace = true, features = ["testing"] } common-meta = { workspace = true, features = ["testing"] }
common-query = { workspace = true, features = ["testing"] } common-query = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
log-store.workspace = true
object-store.workspace = true object-store.workspace = true
tokio.workspace = true tokio.workspace = true


@@ -64,13 +64,6 @@ pub enum Error {
source: BoxedError, source: BoxedError,
}, },
#[snafu(display("Failed to list flow stats"))]
ListFlowStats {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to list flows in catalog {catalog}"))] #[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows { ListFlows {
#[snafu(implicit)] #[snafu(implicit)]
@@ -185,12 +178,6 @@ pub enum Error {
location: Location, location: Location,
}, },
#[snafu(display("Partition manager not found, it's not expected."))]
PartitionManagerNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find table partitions"))] #[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error }, FindPartitions { source: partition::error::Error },
@@ -314,7 +301,6 @@ impl ErrorExt for Error {
| Error::CastManager { .. } | Error::CastManager { .. }
| Error::Json { .. } | Error::Json { .. }
| Error::GetInformationExtension { .. } | Error::GetInformationExtension { .. }
| Error::PartitionManagerNotFound { .. }
| Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected, | Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected,
Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments, Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
@@ -333,7 +319,6 @@ impl ErrorExt for Error {
| Error::ListSchemas { source, .. } | Error::ListSchemas { source, .. }
| Error::ListTables { source, .. } | Error::ListTables { source, .. }
| Error::ListFlows { source, .. } | Error::ListFlows { source, .. }
| Error::ListFlowStats { source, .. }
| Error::ListProcedures { source, .. } | Error::ListProcedures { source, .. }
| Error::ListRegionStats { source, .. } | Error::ListRegionStats { source, .. }
| Error::ConvertProtoData { source, .. } => source.status_code(), | Error::ConvertProtoData { source, .. } => source.status_code(),


@@ -1,101 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use crate::error;
use crate::information_schema::InformationExtension;
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}
#[async_trait::async_trait]
impl InformationExtension for DistributedInformationExtension {
type Error = crate::error::Error;
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
self.meta_client
.list_nodes(None)
.await
.map_err(BoxedError::new)
.context(error::ListNodesSnafu)
}
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
let procedures = self
.meta_client
.list_procedures(&ExecutorContext::default())
.await
.map_err(BoxedError::new)
.context(error::ListProceduresSnafu)?
.procedures;
let mut result = Vec::with_capacity(procedures.len());
for procedure in procedures {
let pid = match procedure.id {
Some(pid) => pid,
None => return error::ProcedureIdNotFoundSnafu {}.fail(),
};
let pid = procedure::pb_pid_to_pid(&pid)
.map_err(BoxedError::new)
.context(error::ConvertProtoDataSnafu)?;
let status = ProcedureStatus::try_from(procedure.status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown")
.to_string();
let procedure_info = ProcedureInfo {
id: pid,
type_name: procedure.type_name,
start_time_ms: procedure.start_time_ms,
end_time_ms: procedure.end_time_ms,
state: ProcedureState::Running,
lock_keys: procedure.lock_keys,
};
result.push((status, procedure_info));
}
Ok(result)
}
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
self.meta_client
.list_region_stats()
.await
.map_err(BoxedError::new)
.context(error::ListRegionStatsSnafu)
}
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
self.meta_client
.list_flow_stats()
.await
.map_err(BoxedError::new)
.context(crate::error::ListFlowStatsSnafu)
}
}


@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend}; pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
mod client; mod client;
mod manager; mod manager;


@@ -18,11 +18,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use common_error::ext::{BoxedError, ErrorExt}; use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator; use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet; use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result}; use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{ use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -37,27 +36,26 @@ use snafu::{OptionExt, ResultExt};
use crate::metrics::{ use crate::metrics::{
METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET, METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
METRIC_META_CLIENT_GET,
}; };
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000; const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60); const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60); const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
pub struct CachedKvBackendBuilder { pub struct CachedMetaKvBackendBuilder {
cache_max_capacity: Option<u64>, cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>, cache_ttl: Option<Duration>,
cache_tti: Option<Duration>, cache_tti: Option<Duration>,
inner: KvBackendRef, meta_client: Arc<MetaClient>,
} }
impl CachedKvBackendBuilder { impl CachedMetaKvBackendBuilder {
pub fn new(inner: KvBackendRef) -> Self { pub fn new(meta_client: Arc<MetaClient>) -> Self {
Self { Self {
cache_max_capacity: None, cache_max_capacity: None,
cache_ttl: None, cache_ttl: None,
cache_tti: None, cache_tti: None,
inner, meta_client,
} }
} }
@@ -76,7 +74,7 @@ impl CachedKvBackendBuilder {
self self
} }
pub fn build(self) -> CachedKvBackend { pub fn build(self) -> CachedMetaKvBackend {
let cache_max_capacity = self let cache_max_capacity = self
.cache_max_capacity .cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY); .unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
@@ -87,11 +85,14 @@ impl CachedKvBackendBuilder {
.time_to_live(cache_ttl) .time_to_live(cache_ttl)
.time_to_idle(cache_tti) .time_to_idle(cache_tti)
.build(); .build();
let kv_backend = self.inner;
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let name = format!("CachedKvBackend({})", kv_backend.name()); let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0); let version = AtomicUsize::new(0);
CachedKvBackend { CachedMetaKvBackend {
kv_backend, kv_backend,
cache, cache,
name, name,
@@ -111,29 +112,19 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related /// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
/// information. Note: If you read other information, you may read expired data, which depends on /// information. Note: If you read other information, you may read expired data, which depends on
/// TTL and TTI for cache. /// TTL and TTI for cache.
pub struct CachedKvBackend { pub struct CachedMetaKvBackend {
kv_backend: KvBackendRef, kv_backend: KvBackendRef,
cache: CacheBackend, cache: CacheBackend,
name: String, name: String,
version: AtomicUsize, version: AtomicUsize,
} }
#[async_trait::async_trait] impl TxnService for CachedMetaKvBackend {
impl TxnService for CachedKvBackend {
type Error = Error; type Error = Error;
async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
self.kv_backend.txn(txn).await
}
fn max_txn_ops(&self) -> usize {
self.kv_backend.max_txn_ops()
}
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl KvBackend for CachedKvBackend { impl KvBackend for CachedMetaKvBackend {
fn name(&self) -> &str { fn name(&self) -> &str {
&self.name &self.name
} }
@@ -293,7 +284,7 @@ impl KvBackend for CachedKvBackend {
} }
.map_err(|e| { .map_err(|e| {
GetKvCacheSnafu { GetKvCacheSnafu {
err_msg: e.output_msg(), err_msg: e.to_string(),
} }
.build() .build()
}); });
@@ -314,7 +305,7 @@ impl KvBackend for CachedKvBackend {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl KvCacheInvalidator for CachedKvBackend { impl KvCacheInvalidator for CachedMetaKvBackend {
async fn invalidate_key(&self, key: &[u8]) { async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version(); self.create_new_version();
self.cache.invalidate(key).await; self.cache.invalidate(key).await;
@@ -322,7 +313,7 @@ impl KvCacheInvalidator for CachedKvBackend {
} }
} }
impl CachedKvBackend { impl CachedMetaKvBackend {
// only for test // only for test
#[cfg(test)] #[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self { fn wrap(kv_backend: KvBackendRef) -> Self {
@@ -446,8 +437,6 @@ impl KvBackend for MetaKvBackend {
} }
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> { async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = METRIC_META_CLIENT_GET.start_timer();
let mut response = self let mut response = self
.client .client
.range(RangeRequest::new().with_key(key)) .range(RangeRequest::new().with_key(key))
@@ -477,7 +466,7 @@ mod tests {
use common_meta::rpc::KeyValue; use common_meta::rpc::KeyValue;
use dashmap::DashMap; use dashmap::DashMap;
use super::CachedKvBackend; use super::CachedMetaKvBackend;
#[derive(Default)] #[derive(Default)]
pub struct SimpleKvBackend { pub struct SimpleKvBackend {
@@ -551,7 +540,7 @@ mod tests {
async fn test_cached_kv_backend() { async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default()); let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone(); let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedKvBackend::wrap(simple_kv); let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
add_some_vals(&cached_kv).await; add_some_vals(&cached_kv).await;


@@ -38,7 +38,7 @@ pub fn new_table_cache(
) -> TableCache { ) -> TableCache {
let init = init_factory(table_info_cache, table_name_cache); let init = init_factory(table_info_cache, table_name_cache);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter) CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
} }
fn init_factory( fn init_factory(


@@ -30,7 +30,6 @@ use table::TableRef;
use crate::error::Result; use crate::error::Result;
pub mod error; pub mod error;
pub mod information_extension;
pub mod kvbackend; pub mod kvbackend;
pub mod memory; pub mod memory;
mod metrics; mod metrics;


@@ -34,6 +34,4 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap(); register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram = pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap(); register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
pub static ref METRIC_META_CLIENT_GET: Histogram =
register_histogram!("greptime_meta_client_get", "meta client get").unwrap();
} }


@@ -35,7 +35,6 @@ use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME
use common_error::ext::ErrorExt; use common_error::ext::ErrorExt;
use common_meta::cluster::NodeInfo; use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat; use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager; use common_meta::key::flow::FlowMetadataManager;
use common_procedure::ProcedureInfo; use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream; use common_recordbatch::SendableRecordBatchStream;
@@ -193,7 +192,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
)) as _), )) as _),
FLOWS => Some(Arc::new(InformationSchemaFlows::new( FLOWS => Some(Arc::new(InformationSchemaFlows::new(
self.catalog_name.clone(), self.catalog_name.clone(),
self.catalog_manager.clone(),
self.flow_metadata_manager.clone(), self.flow_metadata_manager.clone(),
)) as _), )) as _),
PROCEDURE_INFO => Some( PROCEDURE_INFO => Some(
@@ -340,9 +338,6 @@ pub trait InformationExtension {
/// Gets the region statistics. /// Gets the region statistics.
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>; async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
/// Get the flow statistics. If no flownode is available, return `None`.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
} }
pub struct NoopInformationExtension; pub struct NoopInformationExtension;
@@ -362,8 +357,4 @@ impl InformationExtension for NoopInformationExtension {
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> { async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
Ok(vec![]) Ok(vec![])
} }
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
} }


@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::{Arc, Weak}; use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID; use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue; use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager; use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId; use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -29,9 +28,7 @@ use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder; use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value; use datatypes::value::Value;
use datatypes::vectors::{ use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder, VectorRef,
};
use futures::TryStreamExt; use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId}; use store_api::storage::{ScanRequest, TableId};
@@ -41,8 +38,6 @@ use crate::error::{
}; };
use crate::information_schema::{Predicates, FLOWS}; use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable; use crate::system_schema::information_schema::InformationTable;
use crate::system_schema::utils;
use crate::CatalogManager;
const INIT_CAPACITY: usize = 42; const INIT_CAPACITY: usize = 42;
@@ -50,7 +45,6 @@ const INIT_CAPACITY: usize = 42;
// pk is (flow_name, flow_id, table_catalog) // pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name"; pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id"; pub const FLOW_ID: &str = "flow_id";
pub const STATE_SIZE: &str = "state_size";
pub const TABLE_CATALOG: &str = "table_catalog"; pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition"; pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment"; pub const COMMENT: &str = "comment";
@@ -61,24 +55,20 @@ pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options"; pub const OPTIONS: &str = "options";
/// The `information_schema.flows` table provides information about flows in databases. /// The `information_schema.flows` table provides information about flows in databases.
///
pub(super) struct InformationSchemaFlows { pub(super) struct InformationSchemaFlows {
schema: SchemaRef, schema: SchemaRef,
catalog_name: String, catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>, flow_metadata_manager: Arc<FlowMetadataManager>,
} }
impl InformationSchemaFlows { impl InformationSchemaFlows {
pub(super) fn new( pub(super) fn new(
catalog_name: String, catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>, flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self { ) -> Self {
Self { Self {
schema: Self::schema(), schema: Self::schema(),
catalog_name, catalog_name,
catalog_manager,
flow_metadata_manager, flow_metadata_manager,
} }
} }
@@ -90,7 +80,6 @@ impl InformationSchemaFlows {
vec![ vec![
(FLOW_NAME, CDT::string_datatype(), false), (FLOW_NAME, CDT::string_datatype(), false),
(FLOW_ID, CDT::uint32_datatype(), false), (FLOW_ID, CDT::uint32_datatype(), false),
(STATE_SIZE, CDT::uint64_datatype(), true),
(TABLE_CATALOG, CDT::string_datatype(), false), (TABLE_CATALOG, CDT::string_datatype(), false),
(FLOW_DEFINITION, CDT::string_datatype(), false), (FLOW_DEFINITION, CDT::string_datatype(), false),
(COMMENT, CDT::string_datatype(), true), (COMMENT, CDT::string_datatype(), true),
@@ -110,7 +99,6 @@ impl InformationSchemaFlows {
InformationSchemaFlowsBuilder::new( InformationSchemaFlowsBuilder::new(
self.schema.clone(), self.schema.clone(),
self.catalog_name.clone(), self.catalog_name.clone(),
self.catalog_manager.clone(),
&self.flow_metadata_manager, &self.flow_metadata_manager,
) )
} }
@@ -156,12 +144,10 @@ impl InformationTable for InformationSchemaFlows {
struct InformationSchemaFlowsBuilder { struct InformationSchemaFlowsBuilder {
schema: SchemaRef, schema: SchemaRef,
catalog_name: String, catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>, flow_metadata_manager: Arc<FlowMetadataManager>,
flow_names: StringVectorBuilder, flow_names: StringVectorBuilder,
flow_ids: UInt32VectorBuilder, flow_ids: UInt32VectorBuilder,
state_sizes: UInt64VectorBuilder,
table_catalogs: StringVectorBuilder, table_catalogs: StringVectorBuilder,
raw_sqls: StringVectorBuilder, raw_sqls: StringVectorBuilder,
comments: StringVectorBuilder, comments: StringVectorBuilder,
@@ -176,18 +162,15 @@ impl InformationSchemaFlowsBuilder {
fn new( fn new(
schema: SchemaRef, schema: SchemaRef,
catalog_name: String, catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: &Arc<FlowMetadataManager>, flow_metadata_manager: &Arc<FlowMetadataManager>,
) -> Self { ) -> Self {
Self { Self {
schema, schema,
catalog_name, catalog_name,
catalog_manager,
flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_manager: flow_metadata_manager.clone(),
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
state_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY), table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY), raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY), comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
@@ -212,11 +195,6 @@ impl InformationSchemaFlowsBuilder {
.flow_names(&catalog_name) .flow_names(&catalog_name)
.await; .await;
let flow_stat = {
let information_extension = utils::information_extension(&self.catalog_manager)?;
information_extension.flow_stats().await?
};
while let Some((flow_name, flow_id)) = stream while let Some((flow_name, flow_id)) = stream
.try_next() .try_next()
.await .await
@@ -235,7 +213,7 @@ impl InformationSchemaFlowsBuilder {
catalog_name: catalog_name.to_string(), catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(), flow_name: flow_name.to_string(),
})?; })?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?; self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
} }
self.finish() self.finish()
@@ -246,7 +224,6 @@ impl InformationSchemaFlowsBuilder {
predicates: &Predicates, predicates: &Predicates,
flow_id: FlowId, flow_id: FlowId,
flow_info: FlowInfoValue, flow_info: FlowInfoValue,
flow_stat: &Option<FlowStat>,
) -> Result<()> { ) -> Result<()> {
let row = [ let row = [
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())), (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
@@ -261,11 +238,6 @@ impl InformationSchemaFlowsBuilder {
} }
self.flow_names.push(Some(flow_info.flow_name())); self.flow_names.push(Some(flow_info.flow_name()));
self.flow_ids.push(Some(flow_id)); self.flow_ids.push(Some(flow_id));
self.state_sizes.push(
flow_stat
.as_ref()
.and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
);
self.table_catalogs.push(Some(flow_info.catalog_name())); self.table_catalogs.push(Some(flow_info.catalog_name()));
self.raw_sqls.push(Some(flow_info.raw_sql())); self.raw_sqls.push(Some(flow_info.raw_sql()));
self.comments.push(Some(flow_info.comment())); self.comments.push(Some(flow_info.comment()));
@@ -298,7 +270,6 @@ impl InformationSchemaFlowsBuilder {
let columns: Vec<VectorRef> = vec![ let columns: Vec<VectorRef> = vec![
Arc::new(self.flow_names.finish()), Arc::new(self.flow_names.finish()),
Arc::new(self.flow_ids.finish()), Arc::new(self.flow_ids.finish()),
Arc::new(self.state_sizes.finish()),
Arc::new(self.table_catalogs.finish()), Arc::new(self.table_catalogs.finish()),
Arc::new(self.raw_sqls.finish()), Arc::new(self.raw_sqls.finish()),
Arc::new(self.comments.finish()), Arc::new(self.comments.finish()),


@@ -54,10 +54,6 @@ const INIT_CAPACITY: usize = 42;
pub(crate) const PRI_CONSTRAINT_NAME: &str = "PRIMARY"; pub(crate) const PRI_CONSTRAINT_NAME: &str = "PRIMARY";
/// Time index constraint name /// Time index constraint name
pub(crate) const TIME_INDEX_CONSTRAINT_NAME: &str = "TIME INDEX"; pub(crate) const TIME_INDEX_CONSTRAINT_NAME: &str = "TIME INDEX";
/// Inverted index constraint name
pub(crate) const INVERTED_INDEX_CONSTRAINT_NAME: &str = "INVERTED INDEX";
/// Fulltext index constraint name
pub(crate) const FULLTEXT_INDEX_CONSTRAINT_NAME: &str = "FULLTEXT INDEX";
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`. /// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage { pub(super) struct InformationSchemaKeyColumnUsage {
@@ -220,13 +216,14 @@ impl InformationSchemaKeyColumnUsageBuilder {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
while let Some(table) = stream.try_next().await? { while let Some(table) = stream.try_next().await? {
let mut primary_constraints = vec![];
let table_info = table.table_info(); let table_info = table.table_info();
let table_name = &table_info.name; let table_name = &table_info.name;
let keys = &table_info.meta.primary_key_indices; let keys = &table_info.meta.primary_key_indices;
let schema = table.schema(); let schema = table.schema();
for (idx, column) in schema.column_schemas().iter().enumerate() { for (idx, column) in schema.column_schemas().iter().enumerate() {
let mut constraints = vec![];
if column.is_time_index() { if column.is_time_index() {
self.add_key_column_usage( self.add_key_column_usage(
&predicates, &predicates,
@@ -239,31 +236,30 @@ impl InformationSchemaKeyColumnUsageBuilder {
1, //always 1 for time index 1, //always 1 for time index
); );
} }
// TODO(dimbtp): foreign key constraint not supported yet
if keys.contains(&idx) { if keys.contains(&idx) {
constraints.push(PRI_CONSTRAINT_NAME); primary_constraints.push((
} catalog_name.clone(),
if column.is_inverted_indexed() { schema_name.clone(),
constraints.push(INVERTED_INDEX_CONSTRAINT_NAME); table_name.to_string(),
column.name.clone(),
));
} }
// TODO(dimbtp): foreign key constraint not supported yet
}
if column.has_fulltext_index_key() { for (i, (catalog_name, schema_name, table_name, column_name)) in
constraints.push(FULLTEXT_INDEX_CONSTRAINT_NAME); primary_constraints.into_iter().enumerate()
} {
self.add_key_column_usage(
if !constraints.is_empty() { &predicates,
let aggregated_constraints = constraints.join(", "); &schema_name,
self.add_key_column_usage( PRI_CONSTRAINT_NAME,
&predicates, &catalog_name,
&schema_name, &schema_name,
&aggregated_constraints, &table_name,
&catalog_name, &column_name,
&schema_name, i as u32 + 1,
table_name, );
&column.name,
idx as u32 + 1,
);
}
} }
} }
} }


@@ -34,14 +34,15 @@ use datatypes::vectors::{
}; };
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo; use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId}; use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType}; use table::metadata::{TableInfo, TableType};
use super::PARTITIONS; use super::PARTITIONS;
use crate::error::{ use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu, CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result,
Result, UpgradeWeakCatalogManagerRefSnafu, UpgradeWeakCatalogManagerRefSnafu,
}; };
use crate::kvbackend::KvBackendCatalogManager; use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::system_schema::information_schema::{InformationTable, Predicates};
@@ -235,8 +236,7 @@ impl InformationSchemaPartitionsBuilder {
let partition_manager = catalog_manager let partition_manager = catalog_manager
.as_any() .as_any()
.downcast_ref::<KvBackendCatalogManager>() .downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager()) .map(|catalog_manager| catalog_manager.partition_manager());
.context(PartitionManagerNotFoundSnafu)?;
let predicates = Predicates::from_scan_request(&request); let predicates = Predicates::from_scan_request(&request);
@@ -262,10 +262,27 @@ impl InformationSchemaPartitionsBuilder {
let table_ids: Vec<TableId> = let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect(); table_infos.iter().map(|info| info.ident.table_id).collect();
let mut table_partitions = partition_manager let mut table_partitions = if let Some(partition_manager) = &partition_manager {
.batch_find_table_partitions(&table_ids) partition_manager
.await .batch_find_table_partitions(&table_ids)
.context(FindPartitionsSnafu)?; .await
.context(FindPartitionsSnafu)?
} else {
// The current node must be a standalone instance, which contains only one partition by default.
// TODO(dennis): change it when we support multiple regions for standalone.
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};
for table_info in table_infos { for table_info in table_infos {
let partitions = table_partitions let partitions = table_partitions


@@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
.context(TableMetadataManagerSnafu)? .context(TableMetadataManagerSnafu)?
// information_schema is not available from this // information_schema is not available from this
// table_metadata_manager and we return None // table_metadata_manager and we return None
.map(|schema_opts| format!("{}", schema_opts.into_inner())) .map(|schema_opts| format!("{schema_opts}"))
} else { } else {
None None
}; };


@@ -12,16 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::collections::HashSet;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef; use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE}; use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext; use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
@@ -34,7 +31,7 @@ use datatypes::vectors::{
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId}; use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType}; use table::metadata::{TableInfo, TableType};
use super::TABLES; use super::TABLES;
@@ -42,7 +39,6 @@ use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
}; };
use crate::system_schema::information_schema::{InformationTable, Predicates}; use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::system_schema::utils;
use crate::CatalogManager; use crate::CatalogManager;
pub const TABLE_CATALOG: &str = "table_catalog"; pub const TABLE_CATALOG: &str = "table_catalog";
@@ -238,51 +234,17 @@ impl InformationSchemaTablesBuilder {
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
let information_extension = utils::information_extension(&self.catalog_manager)?;
// TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc.
// But we don't want the statements such as `show tables` fail,
// so using `unwrap_or_else` here instead of `?` operator.
let region_stats = information_extension
.region_stats()
.await
.map_err(|e| {
error!(e; "Failed to call region_stats");
e
})
.unwrap_or_else(|_| vec![]);
for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
// TODO(dennis): make it working for metric engine
let table_region_stats =
if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
let region_ids = table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.collect::<HashSet<_>>();
region_stats
.iter()
.filter(|stat| region_ids.contains(&stat.id))
.collect::<Vec<_>>()
} else {
vec![]
};
self.add_table(
&predicates,
&catalog_name,
&schema_name,
table_info,
table.table_type(),
&table_region_stats,
);
}
}
@@ -298,7 +260,6 @@ impl InformationSchemaTablesBuilder {
schema_name: &str,
table_info: Arc<TableInfo>,
table_type: TableType,
region_stats: &[&RegionStat],
) {
let table_name = table_info.name.as_ref();
let table_id = table_info.table_id();
@@ -312,9 +273,7 @@
let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_ID, &Value::from(table_id)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(ENGINE, &Value::from(engine)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type_text)),
];
@@ -328,39 +287,21 @@ impl InformationSchemaTablesBuilder {
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type_text));
self.table_ids.push(Some(table_id));
let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
let index_length = region_stats.iter().map(|stat| stat.index_size).sum();
// It's not precise, but it is acceptable for long-term data storage.
let avg_row_length = if table_rows > 0 {
let total_data_length = data_length
+ region_stats
.iter()
.map(|stat| stat.memtable_size)
.sum::<u64>();
total_data_length / table_rows
} else {
0
};
self.data_length.push(Some(data_length));
self.index_length.push(Some(index_length));
self.table_rows.push(Some(table_rows));
self.avg_row_length.push(Some(avg_row_length));
// TODO(sunng87): use real data for these fields
self.data_length.push(Some(0));
self.max_data_length.push(Some(0));
-self.checksum.push(Some(0));
+self.index_length.push(Some(0));
self.avg_row_length.push(Some(0));
self.max_index_length.push(Some(0));
self.checksum.push(Some(0));
self.table_rows.push(Some(0));
self.data_free.push(Some(0));
self.auto_increment.push(Some(0));
self.row_format.push(Some("Fixed"));
self.table_collation.push(Some("utf8_bin"));
self.update_time.push(None);
self.check_time.push(None);
// use mariadb default table version number here
self.version.push(Some(11));
self.table_comment.push(table_info.desc.as_deref());
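For reference, the statistics removed in this hunk are derived per table from the region stats, roughly as below. A minimal sketch based on the lines above, using a stand-in struct instead of `common_meta::datanode::RegionStat`:

// Returns (data_length, index_length, table_rows, avg_row_length).
struct RegionStat {
    sst_size: u64,
    index_size: u64,
    memtable_size: u64,
    num_rows: u64,
}

fn table_stats(region_stats: &[RegionStat]) -> (u64, u64, u64, u64) {
    let data_length: u64 = region_stats.iter().map(|s| s.sst_size).sum();
    let index_length: u64 = region_stats.iter().map(|s| s.index_size).sum();
    let table_rows: u64 = region_stats.iter().map(|s| s.num_rows).sum();
    // Approximation: bytes in SSTs plus bytes still in memtables, divided by rows.
    let avg_row_length = if table_rows > 0 {
        let total = data_length + region_stats.iter().map(|s| s.memtable_size).sum::<u64>();
        total / table_rows
    } else {
        0
    };
    (data_length, index_length, table_rows, avg_row_length)
}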

View File

@@ -1,62 +0,0 @@
[package]
name = "cli"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
clap.workspace = true
client = { workspace = true, features = ["testing"] }
common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry = { workspace = true, features = [
"deadlock_detection",
] }
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
futures.workspace = true
humantime.workspace = true
meta-client.workspace = true
nu-ansi-term = "0.46"
query.workspace = true
rand.workspace = true
reqwest.workspace = true
rustyline = "10.1"
serde.workspace = true
serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tracing-appender.workspace = true
[dev-dependencies]
common-test-util.workspace = true
common-version.workspace = true
serde.workspace = true
tempfile.workspace = true

View File

@@ -1,316 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use rustyline::error::ReadlineError;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to install ring crypto provider: {}", msg))]
InitTlsProvider {
#[snafu(implicit)]
location: Location,
msg: String,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to init DDL manager"))]
InitDdlManager {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to init default timezone"))]
InitTimezone {
#[snafu(implicit)]
location: Location,
source: common_time::error::Error,
},
#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Missing config, msg: {}", msg))]
MissingConfig {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Illegal config: {}", msg))]
IllegalConfig {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid REPL command: {reason}"))]
InvalidReplCommand { reason: String },
#[snafu(display("Cannot create REPL"))]
ReplCreation {
#[snafu(source)]
error: ReadlineError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error reading command"))]
Readline {
#[snafu(source)]
error: ReadlineError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to request database, sql: {sql}"))]
RequestDatabase {
sql: String,
#[snafu(source)]
source: client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect RecordBatches"))]
CollectRecordBatches {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to pretty print Recordbatches"))]
PrettyPrintRecordBatches {
#[snafu(implicit)]
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to start Meta client"))]
StartMetaClient {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Failed to parse SQL: {}", sql))]
ParseSql {
sql: String,
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Failed to plan statement"))]
PlanStatement {
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},
#[snafu(display("Failed to encode logical plan in substrait"))]
SubstraitEncodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
},
#[snafu(display("Failed to load layered config"))]
LoadLayeredConfig {
#[snafu(source(from(common_config::error::Error, Box::new)))]
source: Box<common_config::error::Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to connect to Etcd at {etcd_addr}"))]
ConnectEtcd {
etcd_addr: String,
#[snafu(source)]
error: etcd_client::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serde json"))]
SerdeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to run http request: {reason}"))]
HttpQuerySql {
reason: String,
#[snafu(source)]
error: reqwest::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty result from output"))]
EmptyResult {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to manipulate file"))]
FileIo {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to create directory {}", dir))]
CreateDir {
dir: String,
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Failed to spawn thread"))]
SpawnThread {
#[snafu(source)]
error: std::io::Error,
},
#[snafu(display("Other error"))]
Other {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build runtime"))]
BuildRuntime {
#[snafu(implicit)]
location: Location,
source: common_runtime::error::Error,
},
#[snafu(display("Failed to get cache from cache registry: {}", name))]
CacheRequired {
#[snafu(implicit)]
location: Location,
name: String,
},
#[snafu(display("Failed to build cache registry"))]
BuildCacheRegistry {
#[snafu(implicit)]
location: Location,
source: cache::error::Error,
},
#[snafu(display("Failed to initialize meta client"))]
MetaClientInit {
#[snafu(implicit)]
location: Location,
source: meta_client::error::Error,
},
#[snafu(display("Cannot find schema {schema} in catalog {catalog}"))]
SchemaNotFound {
catalog: String,
schema: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::MissingConfig { .. }
| Error::LoadLayeredConfig { .. }
| Error::IllegalConfig { .. }
| Error::InvalidReplCommand { .. }
| Error::InitTimezone { .. }
| Error::ConnectEtcd { .. }
| Error::CreateDir { .. }
| Error::EmptyResult { .. } => StatusCode::InvalidArguments,
Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } | Error::HttpQuerySql { .. } => {
StatusCode::Internal
}
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
| Error::PrettyPrintRecordBatches { source, .. } => source.status_code(),
Error::StartMetaClient { source, .. } => source.status_code(),
Error::ParseSql { source, .. } | Error::PlanStatement { source, .. } => {
source.status_code()
}
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
Error::SerdeJson { .. }
| Error::FileIo { .. }
| Error::SpawnThread { .. }
| Error::InitTlsProvider { .. } => StatusCode::Unexpected,
Error::Other { source, .. } => source.status_code(),
Error::BuildRuntime { source, .. } => source.status_code(),
Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal,
Error::MetaClientInit { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::DatabaseNotFound,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
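A minimal usage sketch for the error type above: call sites wrap fallible operations with the generated snafu context selectors, and `ErrorExt::status_code` later maps each variant to an API status (the helper below is hypothetical):

use snafu::ResultExt;

// Hypothetical helper: an std::io failure becomes Error::CreateDir (mapped to
// StatusCode::InvalidArguments by status_code()), keeping the directory name.
fn create_output_dir(dir: &str) -> Result<()> {
    std::fs::create_dir_all(dir).context(CreateDirSnafu { dir })?;
    Ok(())
}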

View File

@@ -1,60 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod bench;
pub mod error;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
pub use database::DatabaseClient;
use error::Result;
pub use repl::Repl;
pub use crate::bench::BenchTableMetadataCommand;
pub use crate::export::ExportCommand;
pub use crate::import::ImportCommand;
#[async_trait]
pub trait Tool: Send + Sync {
async fn do_work(&self) -> std::result::Result<(), BoxedError>;
}
#[derive(Debug, Parser)]
pub(crate) struct AttachCommand {
#[clap(long)]
pub(crate) grpc_addr: String,
#[clap(long)]
pub(crate) meta_addr: Option<String>,
#[clap(long, action)]
pub(crate) disable_helper: bool,
}
impl AttachCommand {
#[allow(dead_code)]
async fn build(self) -> Result<Box<dyn Tool>> {
unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}
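A minimal sketch of implementing the `Tool` trait above (the `Ping` tool and its output are made up for illustration):

use async_trait::async_trait;
use common_error::ext::BoxedError;

struct Ping;

#[async_trait]
impl Tool for Ping {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        // A real tool would do its work here and wrap failures in BoxedError.
        println!("pong");
        Ok(())
    }
}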

View File

@@ -42,6 +42,8 @@ tonic.workspace = true
[dev-dependencies]
common-grpc-expr.workspace = true
datanode.workspace = true
derive-new = "0.5"
tracing = "0.1"
[dev-dependencies.substrait_proto]

View File

@@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
-AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
+AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
QueryRequest, RequestHeader,
};
use arrow_flight::Ticket;
@@ -211,9 +211,9 @@ impl Database {
.await
}
-pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
+pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
-expr: Some(DdlExpr::AlterTable(expr)),
+expr: Some(DdlExpr::Alter(expr)),
}))
.await
}

View File

@@ -25,7 +25,6 @@ cache.workspace = true
catalog.workspace = true
chrono.workspace = true
clap.workspace = true
cli.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
@@ -34,7 +33,6 @@ common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
@@ -54,7 +52,6 @@ flow.workspace = true
frontend = { workspace = true, default-features = false }
futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
meta-srv.workspace = true

View File

@@ -12,17 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod bench;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
#[allow(unused)]
mod cmd;
mod export;
mod helper;
// Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373
mod database;
mod import;
#[allow(unused)]
mod repl;
use async_trait::async_trait;
use bench::BenchTableMetadataCommand;
use clap::Parser;
use cli::Tool;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
-use plugins::SubCommand;
+pub use repl::Repl;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use self::export::ExportCommand;
use crate::cli::import::ImportCommand;
use crate::error::Result;
use crate::options::GlobalOptions;
-use crate::{error, App, Result};
+use crate::App;
pub const APP_NAME: &str = "greptime-cli";
use async_trait::async_trait;
#[async_trait]
pub trait Tool: Send + Sync {
async fn do_work(&self) -> Result<()>;
}
pub struct Instance {
tool: Box<dyn Tool>,
@@ -32,16 +54,12 @@ pub struct Instance {
}
impl Instance {
-pub fn new(tool: Box<dyn Tool>, guard: Vec<WorkerGuard>) -> Self {
+fn new(tool: Box<dyn Tool>, guard: Vec<WorkerGuard>) -> Self {
Self {
tool,
_guard: guard,
}
}
pub async fn start(&mut self) -> Result<()> {
self.tool.do_work().await.context(error::StartCliSnafu)
}
}
#[async_trait]
@@ -51,8 +69,7 @@ impl App for Instance {
}
async fn start(&mut self) -> Result<()> {
-self.start().await.unwrap();
+self.tool.do_work().await
Ok(())
}
fn wait_signal(&self) -> bool {
@@ -79,12 +96,7 @@ impl Command {
None,
);
-let tool = self.cmd.build().await.context(error::BuildCliSnafu)?;
+self.cmd.build(guard).await
let instance = Instance {
tool,
_guard: guard,
};
Ok(instance)
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<LoggingOptions> {
@@ -100,81 +112,38 @@ impl Command {
}
}
-#[cfg(test)]
-mod tests {
-use clap::Parser;
-use client::{Client, Database};
-use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_telemetry::logging::LoggingOptions;
-use crate::error::Result as CmdResult;
-use crate::options::GlobalOptions;
-use crate::{cli, standalone, App};
-#[tokio::test(flavor = "multi_thread")]
-async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
-let output_dir = tempfile::tempdir().unwrap();
-let standalone = standalone::Command::parse_from([
-"standalone",
-"start",
-"--data-home",
-&*output_dir.path().to_string_lossy(),
-]);
-let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
-let mut instance = standalone.build(standalone_opts).await?;
-instance.start().await?;
-let client = Client::with_urls(["127.0.0.1:4001"]);
-let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
-database
-.sql(r#"CREATE DATABASE "cli.export.create_table";"#)
-.await
-.unwrap();
-database
-.sql(
-r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
-ts TIMESTAMP,
-TIME INDEX (ts)
-) engine=mito;
-"#,
-)
-.await
-.unwrap();
-let output_dir = tempfile::tempdir().unwrap();
-let cli = cli::Command::parse_from([
-"cli",
-"export",
-"--addr",
-"127.0.0.1:4000",
-"--output-dir",
-&*output_dir.path().to_string_lossy(),
-"--target",
-"schema",
-]);
-let mut cli_app = cli.build(LoggingOptions::default()).await?;
-cli_app.start().await?;
-instance.stop().await?;
-let output_file = output_dir
-.path()
-.join("greptime")
-.join("cli.export.create_table")
-.join("create_tables.sql");
-let res = std::fs::read_to_string(output_file).unwrap();
-let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
-"ts" TIMESTAMP(3) NOT NULL,
-TIME INDEX ("ts")
-)
-ENGINE=mito
-;
-"#;
-assert_eq!(res.trim(), expect.trim());
-Ok(())
+#[derive(Parser)]
+enum SubCommand {
+// Attach(AttachCommand),
+Bench(BenchTableMetadataCommand),
+Export(ExportCommand),
+Import(ImportCommand),
+}
+impl SubCommand {
+async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
+match self {
+// SubCommand::Attach(cmd) => cmd.build().await,
+SubCommand::Bench(cmd) => cmd.build(guard).await,
+SubCommand::Export(cmd) => cmd.build(guard).await,
+SubCommand::Import(cmd) => cmd.build(guard).await,
+}
+}
+}
+#[derive(Debug, Parser)]
+pub(crate) struct AttachCommand {
+#[clap(long)]
+pub(crate) grpc_addr: String,
+#[clap(long)]
+pub(crate) meta_addr: Option<String>,
+#[clap(long, action)]
+pub(crate) disable_helper: bool,
+}
+impl AttachCommand {
+#[allow(dead_code)]
+async fn build(self) -> Result<Instance> {
+unimplemented!("Wait for https://github.com/GreptimeTeam/greptimedb/issues/2373")
}
}

View File

@@ -19,7 +19,6 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
@@ -31,9 +30,11 @@ use rand::Rng;
use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use table::table_name::TableName;
use tracing_appender::non_blocking::WorkerGuard;
use self::metadata::TableMetadataBencher;
-use crate::Tool;
+use crate::cli::{Instance, Tool};
use crate::error::Result;
mod metadata;
@@ -61,7 +62,7 @@ pub struct BenchTableMetadataCommand {
}
impl BenchTableMetadataCommand {
-pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
+pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
let etcd_store = EtcdStore::with_endpoints([&self.etcd_addr], 128)
.await
.unwrap();
@@ -72,7 +73,7 @@ impl BenchTableMetadataCommand {
table_metadata_manager,
count: self.count,
};
-Ok(Box::new(tool))
+Ok(Instance::new(Box::new(tool), guard))
}
}
@@ -83,7 +84,7 @@ struct BenchTableMetadata {
#[async_trait]
impl Tool for BenchTableMetadata {
-async fn do_work(&self) -> std::result::Result<(), BoxedError> {
+async fn do_work(&self) -> Result<()> {
let bencher = TableMetadataBencher::new(self.table_metadata_manager.clone(), self.count);
bencher.bench_create().await;
bencher.bench_get().await;

View File

@@ -18,7 +18,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use table::table_name::TableName;
-use crate::bench::{
+use crate::cli::bench::{
bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info,
};

View File

@@ -12,35 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use humantime::format_duration;
use serde_json::Value;
-use servers::http::header::constants::GREPTIME_DB_HEADER_TIMEOUT;
-use servers::http::result::greptime_result_v1::GreptimedbV1Response;
+use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;
use crate::error::{HttpQuerySqlSnafu, Result, SerdeJsonSnafu};
-#[derive(Debug, Clone)]
-pub struct DatabaseClient {
+pub(crate) struct DatabaseClient {
addr: String,
catalog: String,
auth_header: Option<String>,
timeout: Duration,
}
impl DatabaseClient {
-pub fn new(
-addr: String,
-catalog: String,
-auth_basic: Option<String>,
-timeout: Duration,
-) -> Self {
+pub fn new(addr: String, catalog: String, auth_basic: Option<String>) -> Self {
let auth_header = if let Some(basic) = auth_basic {
let encoded = general_purpose::STANDARD.encode(basic);
Some(format!("basic {}", encoded))
@@ -52,7 +41,6 @@ impl DatabaseClient {
addr,
catalog,
auth_header,
timeout,
}
}
@@ -75,11 +63,6 @@ impl DatabaseClient {
request = request.header("Authorization", auth);
}
request = request.header(
GREPTIME_DB_HEADER_TIMEOUT,
format_duration(self.timeout).to_string(),
);
let response = request.send().await.with_context(|_| HttpQuerySqlSnafu {
reason: format!("bad url: {}", url),
})?;
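The left-hand version additionally forwards a per-request timeout header; its value is the humantime rendering of the configured duration, with `None` treated as zero to disable the server-side default. A small sketch of that rendering, assuming humantime's usual output:

use std::time::Duration;
use humantime::format_duration;

fn timeout_header_value(timeout: Option<Duration>) -> String {
    // None is treated as Duration::ZERO, which renders as "0s" (no timeout).
    format_duration(timeout.unwrap_or_default()).to_string()
}

fn main() {
    println!("{}", timeout_header_value(None));                          // expected: 0s
    println!("{}", timeout_header_value(Some(Duration::from_secs(90)))); // expected: 1m 30s
}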

View File

@@ -15,11 +15,9 @@
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
@@ -27,10 +25,11 @@ use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
-use crate::database::DatabaseClient;
+use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::error::{EmptyResultSnafu, Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
type TableReference = (String, String, String);
@@ -84,38 +83,28 @@ pub struct ExportCommand {
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ExportCommand {
-pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
-let (catalog, schema) =
-database::split_database(&self.database).map_err(BoxedError::new)?;
-let database_client = DatabaseClient::new(
-self.addr.clone(),
-catalog.clone(),
-self.auth_basic.clone(),
-// Treats `None` as `0s` to disable server-side default timeout.
-self.timeout.unwrap_or_default(),
-);
-Ok(Box::new(Export {
-catalog,
-schema,
-database_client,
-output_dir: self.output_dir.clone(),
-parallelism: self.export_jobs,
-target: self.target.clone(),
-start_time: self.start_time.clone(),
-end_time: self.end_time.clone(),
-}))
+pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
+let (catalog, schema) = database::split_database(&self.database)?;
+let database_client =
+DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
+Ok(Instance::new(
+Box::new(Export {
+catalog,
+schema,
+database_client,
+output_dir: self.output_dir.clone(),
+parallelism: self.export_jobs,
+target: self.target.clone(),
+start_time: self.start_time.clone(),
+end_time: self.end_time.clone(),
+}),
+guard,
+))
}
}
@@ -463,22 +452,97 @@ impl Export {
#[async_trait]
impl Tool for Export {
-async fn do_work(&self) -> std::result::Result<(), BoxedError> {
+async fn do_work(&self) -> Result<()> {
match self.target {
ExportTarget::Schema => {
-self.export_create_database()
-.await
-.map_err(BoxedError::new)?;
-self.export_create_table().await.map_err(BoxedError::new)
+self.export_create_database().await?;
+self.export_create_table().await
}
-ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
+ExportTarget::Data => self.export_database_data().await,
ExportTarget::All => {
-self.export_create_database()
-.await
-.map_err(BoxedError::new)?;
-self.export_create_table().await.map_err(BoxedError::new)?;
-self.export_database_data().await.map_err(BoxedError::new)
+self.export_create_database().await?;
+self.export_create_table().await?;
+self.export_database_data().await
}
}
}
}
#[cfg(test)]
mod tests {
use clap::Parser;
use client::{Client, Database};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::logging::LoggingOptions;
use crate::error::Result as CmdResult;
use crate::options::GlobalOptions;
use crate::{cli, standalone, App};
#[tokio::test(flavor = "multi_thread")]
async fn test_export_create_table_with_quoted_names() -> CmdResult<()> {
let output_dir = tempfile::tempdir().unwrap();
let standalone = standalone::Command::parse_from([
"standalone",
"start",
"--data-home",
&*output_dir.path().to_string_lossy(),
]);
let standalone_opts = standalone.load_options(&GlobalOptions::default()).unwrap();
let mut instance = standalone.build(standalone_opts).await?;
instance.start().await?;
let client = Client::with_urls(["127.0.0.1:4001"]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
database
.sql(r#"CREATE DATABASE "cli.export.create_table";"#)
.await
.unwrap();
database
.sql(
r#"CREATE TABLE "cli.export.create_table"."a.b.c"(
ts TIMESTAMP,
TIME INDEX (ts)
) engine=mito;
"#,
)
.await
.unwrap();
let output_dir = tempfile::tempdir().unwrap();
let cli = cli::Command::parse_from([
"cli",
"export",
"--addr",
"127.0.0.1:4000",
"--output-dir",
&*output_dir.path().to_string_lossy(),
"--target",
"schema",
]);
let mut cli_app = cli.build(LoggingOptions::default()).await?;
cli_app.start().await?;
instance.stop().await?;
let output_file = output_dir
.path()
.join("greptime")
.join("cli.export.create_table")
.join("create_tables.sql");
let res = std::fs::read_to_string(output_file).unwrap();
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
"ts" TIMESTAMP(3) NOT NULL,
TIME INDEX ("ts")
)
ENGINE=mito
;
"#;
assert_eq!(res.trim(), expect.trim());
Ok(())
}
}

View File

@@ -19,7 +19,7 @@ use rustyline::highlight::{Highlighter, MatchingBracketHighlighter};
use rustyline::hint::{Hinter, HistoryHinter};
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
-use crate::cmd::ReplCommand;
+use crate::cli::cmd::ReplCommand;
pub(crate) struct RustylineHelper {
hinter: HistoryHinter,

View File

@@ -14,20 +14,19 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_error::ext::BoxedError;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;
use tracing_appender::non_blocking::WorkerGuard;
-use crate::database::DatabaseClient;
+use crate::cli::database::DatabaseClient;
use crate::cli::{database, Instance, Tool};
use crate::error::{Error, FileIoSnafu, Result, SchemaNotFoundSnafu};
use crate::{database, Tool};
#[derive(Debug, Default, Clone, ValueEnum)]
enum ImportTarget {
@@ -69,35 +68,25 @@ pub struct ImportCommand {
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
/// The timeout of invoking the database.
///
/// It is used to override the server-side timeout setting.
/// The default behavior will disable server-side default timeout(i.e. `0s`).
#[clap(long, value_parser = humantime::parse_duration)]
timeout: Option<Duration>,
}
impl ImportCommand {
-pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
-let (catalog, schema) =
-database::split_database(&self.database).map_err(BoxedError::new)?;
-let database_client = DatabaseClient::new(
-self.addr.clone(),
-catalog.clone(),
-self.auth_basic.clone(),
-// Treats `None` as `0s` to disable server-side default timeout.
-self.timeout.unwrap_or_default(),
-);
-Ok(Box::new(Import {
-catalog,
-schema,
-database_client,
-input_dir: self.input_dir.clone(),
-parallelism: self.import_jobs,
-target: self.target.clone(),
-}))
+pub async fn build(&self, guard: Vec<WorkerGuard>) -> Result<Instance> {
+let (catalog, schema) = database::split_database(&self.database)?;
+let database_client =
+DatabaseClient::new(self.addr.clone(), catalog.clone(), self.auth_basic.clone());
+Ok(Instance::new(
+Box::new(Import {
+catalog,
+schema,
+database_client,
+input_dir: self.input_dir.clone(),
+parallelism: self.import_jobs,
+target: self.target.clone(),
+}),
+guard,
+))
}
}
@@ -216,13 +205,13 @@ impl Import {
#[async_trait]
impl Tool for Import {
-async fn do_work(&self) -> std::result::Result<(), BoxedError> {
+async fn do_work(&self) -> Result<()> {
match self.target {
-ImportTarget::Schema => self.import_create_table().await.map_err(BoxedError::new),
-ImportTarget::Data => self.import_database_data().await.map_err(BoxedError::new),
+ImportTarget::Schema => self.import_create_table().await,
+ImportTarget::Data => self.import_database_data().await,
ImportTarget::All => {
-self.import_create_table().await.map_err(BoxedError::new)?;
-self.import_database_data().await.map_err(BoxedError::new)
+self.import_create_table().await?;
+self.import_database_data().await
}
}
}

View File

@@ -20,21 +20,19 @@ use cache::{
build_fundamental_cache_registry, with_default_composite_cache_registry, TABLE_CACHE_NAME,
TABLE_ROUTE_CACHE_NAME,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{
-CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
+CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_config::Mode;
use common_error::ext::ErrorExt;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use either::Either;
-use meta_client::client::{ClusterKvBackend, MetaClientBuilder};
+use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
use query::query_engine::{DefaultSerializer, QueryEngineState};
@@ -45,14 +43,15 @@ use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
-use crate::cmd::ReplCommand;
+use crate::cli::cmd::ReplCommand;
use crate::cli::helper::RustylineHelper;
use crate::cli::AttachCommand;
use crate::error::{
CollectRecordBatchesSnafu, ParseSqlSnafu, PlanStatementSnafu, PrettyPrintRecordBatchesSnafu,
ReadlineSnafu, ReplCreationSnafu, RequestDatabaseSnafu, Result, StartMetaClientSnafu,
SubstraitEncodeLogicalPlanSnafu,
};
-use crate::helper::RustylineHelper;
-use crate::{error, AttachCommand};
+use crate::{error, DistributedInformationExtension};
/// Captures the state of the repl, gathers commands and executes them one by one
pub struct Repl {
@@ -259,9 +258,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
-let cached_meta_backend = Arc::new(
-CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
-);
+let cached_meta_backend =
+Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
CacheRegistryBuilder::default()
.add_cache(cached_meta_backend.clone())

View File

@@ -16,12 +16,10 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_version::{short_version, version};
@@ -59,16 +57,13 @@ impl Instance {
}
}
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
/// Get mutable Datanode instance for changing some internal state, before starting it.
// Useful for wrapping Datanode instance. Please do not remove this method even if you find
// nowhere it is called.
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
}
#[async_trait]
@@ -305,17 +300,9 @@ impl StartCommand {
client: meta_client.clone(),
});
// Builds cache registry for datanode.
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -114,20 +114,6 @@ pub enum Error {
source: frontend::error::Error,
},
#[snafu(display("Failed to build cli"))]
BuildCli {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to start cli"))]
StartCli {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to build meta server"))]
BuildMetaServer {
#[snafu(implicit)]
@@ -360,8 +346,6 @@ impl ErrorExt for Error {
Error::ShutdownMetaServer { source, .. } => source.status_code(),
Error::BuildMetaServer { source, .. } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::BuildCli { source, .. } => source.status_code(),
Error::StartCli { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()

View File

@@ -15,15 +15,13 @@
use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
-use catalog::information_extension::DistributedInformationExtension;
-use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
+use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
@@ -32,6 +30,7 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
@@ -42,7 +41,7 @@ use crate::error::{
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
-use crate::{log_versions, App};
+use crate::{log_versions, App, DistributedInformationExtension};
pub const APP_NAME: &str = "greptime-flownode";
@@ -63,6 +62,10 @@ impl Instance {
}
}
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}
@@ -243,12 +246,11 @@ impl StartCommand {
let cache_tti = meta_config.metadata_cache_tti;
// TODO(discord9): add helper function to ease the creation of cache registry&such
-let cached_meta_backend =
-CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
-.cache_max_capacity(cache_max_capacity)
-.cache_ttl(cache_ttl)
-.cache_tti(cache_tti)
-.build();
+let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
+.cache_max_capacity(cache_max_capacity)
+.cache_ttl(cache_ttl)
+.cache_tti(cache_tti)
+.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
@@ -285,7 +287,9 @@ impl StartCommand {
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
-Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
+Arc::new(InvalidateTableCacheHandler::new(
+layered_cache_registry.clone(),
+)),
]);
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(

View File

@@ -17,21 +17,20 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
-use catalog::information_extension::DistributedInformationExtension;
-use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
+use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
@@ -47,7 +46,7 @@ use crate::error::{
Result, StartFrontendSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
-use crate::{log_versions, App};
+use crate::{log_versions, App, DistributedInformationExtension};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
@@ -294,12 +293,11 @@ impl StartCommand {
.context(MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
-let cached_meta_backend =
-CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
-.cache_max_capacity(cache_max_capacity)
-.cache_ttl(cache_ttl)
-.cache_tti(cache_tti)
-.build();
+let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
+.cache_max_capacity(cache_max_capacity)
+.cache_ttl(cache_ttl)
+.cache_tti(cache_tti)
+.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
// Builds cache registry
@@ -329,7 +327,9 @@ impl StartCommand {
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
-Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
+Arc::new(InvalidateTableCacheHandler::new(
+layered_cache_registry.clone(),
+)),
]);
let heartbeat_task = HeartbeatTask::new(

View File

@@ -15,7 +15,17 @@
#![feature(assert_matches, let_chains)]
use async_trait::async_trait;
use catalog::information_schema::InformationExtension;
use client::api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use common_telemetry::{error, info};
use meta_client::MetaClientRef;
use snafu::ResultExt;
use crate::error::Result;
@@ -33,31 +43,6 @@
prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
}
/// wait for the close signal, for unix platform it's SIGINT or SIGTERM
#[cfg(unix)]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = sigint.recv() => {
info!("Received SIGINT, shutting down");
}
_ = sigterm.recv() => {
info!("Received SIGTERM, shutting down");
}
}
Ok(())
}
/// wait for the close signal, for non-unix platform it's ctrl-c
#[cfg(not(unix))]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
tokio::signal::ctrl_c().await
}
#[async_trait]
pub trait App: Send {
fn name(&self) -> &str;
@@ -84,9 +69,9 @@ pub trait App: Send {
self.start().await?;
if self.wait_signal() {
-if let Err(e) = start_wait_for_close_signal().await {
-error!(e; "Failed to listen for close signal");
-// It's unusual to fail to listen for close signal, maybe there's something unexpected in
+if let Err(e) = tokio::signal::ctrl_c().await {
+error!(e; "Failed to listen for ctrl-c signal");
+// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
@@ -120,3 +105,69 @@ fn log_env_flags() {
info!("argument: {}", argument);
}
}
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
}
}
#[async_trait::async_trait]
impl InformationExtension for DistributedInformationExtension {
type Error = catalog::error::Error;
async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
self.meta_client
.list_nodes(None)
.await
.map_err(BoxedError::new)
.context(catalog::error::ListNodesSnafu)
}
async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
let procedures = self
.meta_client
.list_procedures(&ExecutorContext::default())
.await
.map_err(BoxedError::new)
.context(catalog::error::ListProceduresSnafu)?
.procedures;
let mut result = Vec::with_capacity(procedures.len());
for procedure in procedures {
let pid = match procedure.id {
Some(pid) => pid,
None => return catalog::error::ProcedureIdNotFoundSnafu {}.fail(),
};
let pid = procedure::pb_pid_to_pid(&pid)
.map_err(BoxedError::new)
.context(catalog::error::ConvertProtoDataSnafu)?;
let status = ProcedureStatus::try_from(procedure.status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown")
.to_string();
let procedure_info = ProcedureInfo {
id: pid,
type_name: procedure.type_name,
start_time_ms: procedure.start_time_ms,
end_time_ms: procedure.end_time_ms,
state: ProcedureState::Running,
lock_keys: procedure.lock_keys,
};
result.push((status, procedure_info));
}
Ok(result)
}
async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
self.meta_client
.list_region_stats()
.await
.map_err(BoxedError::new)
.context(catalog::error::ListRegionStatsSnafu)
}
}


@@ -22,7 +22,6 @@ use catalog::information_schema::InformationExtension;
 use catalog::kvbackend::KvBackendCatalogManager;
 use clap::Parser;
 use client::api::v1::meta::RegionRole;
-use common_base::readable_size::ReadableSize;
 use common_base::Plugins;
 use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
 use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
@@ -35,7 +34,6 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe
 use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
 use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
 use common_meta::ddl_manager::DdlManager;
-use common_meta::key::flow::flow_state::FlowStat;
 use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
@@ -72,7 +70,7 @@ use servers::http::HttpOptions;
 use servers::tls::{TlsMode, TlsOption};
 use servers::Mode;
 use snafu::ResultExt;
-use tokio::sync::{broadcast, RwLock};
+use tokio::sync::broadcast;
 use tracing_appender::non_blocking::WorkerGuard;
 use crate::error::{
@@ -153,7 +151,6 @@ pub struct StandaloneOptions {
     pub tracing: TracingOptions,
     pub init_regions_in_background: bool,
     pub init_regions_parallelism: usize,
-    pub max_in_flight_write_bytes: Option<ReadableSize>,
 }
 impl Default for StandaloneOptions {
@@ -183,7 +180,6 @@ impl Default for StandaloneOptions {
             tracing: TracingOptions::default(),
             init_regions_in_background: false,
             init_regions_parallelism: 16,
-            max_in_flight_write_bytes: None,
         }
     }
 }
@@ -221,7 +217,6 @@ impl StandaloneOptions {
             user_provider: cloned_opts.user_provider,
            // Handle the export metrics task run by standalone to frontend for execution
             export_metrics: cloned_opts.export_metrics,
-            max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
             ..Default::default()
         }
     }
@@ -502,7 +497,6 @@ impl StartCommand {
         let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
             .with_kv_backend(kv_backend.clone())
-            .with_cache_registry(layered_cache_registry.clone())
             .build()
             .await
             .context(StartDatanodeSnafu)?;
@@ -512,7 +506,7 @@ impl StartCommand {
             procedure_manager.clone(),
         ));
         let catalog_manager = KvBackendCatalogManager::new(
-            information_extension.clone(),
+            information_extension,
             kv_backend.clone(),
             layered_cache_registry.clone(),
             Some(procedure_manager.clone()),
@@ -537,14 +531,6 @@ impl StartCommand {
             .context(OtherSnafu)?,
         );
-        // set the ref to query for the local flow state
-        {
-            let flow_worker_manager = flownode.flow_worker_manager();
-            information_extension
-                .set_flow_worker_manager(flow_worker_manager.clone())
-                .await;
-        }
         let node_manager = Arc::new(StandaloneDatanodeManager {
             region_server: datanode.region_server(),
             flow_server: flownode.flow_worker_manager(),
@@ -682,7 +668,6 @@ pub struct StandaloneInformationExtension {
     region_server: RegionServer,
     procedure_manager: ProcedureManagerRef,
     start_time_ms: u64,
-    flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>,
 }
 impl StandaloneInformationExtension {
@@ -691,15 +676,8 @@ impl StandaloneInformationExtension {
             region_server,
             procedure_manager,
             start_time_ms: common_time::util::current_time_millis() as u64,
-            flow_worker_manager: RwLock::new(None),
         }
     }
-    /// Set the flow worker manager for the standalone instance.
-    pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) {
-        let mut guard = self.flow_worker_manager.write().await;
-        *guard = Some(flow_worker_manager);
-    }
 }
 #[async_trait::async_trait]
@@ -771,18 +749,6 @@ impl InformationExtension for StandaloneInformationExtension {
             .collect::<Vec<_>>();
         Ok(stats)
     }
-    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
-        Ok(Some(
-            self.flow_worker_manager
-                .read()
-                .await
-                .as_ref()
-                .unwrap()
-                .gen_state_report()
-                .await,
-        ))
-    }
 }
 #[cfg(test)]
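
The removed lines above rely on a late-binding `RwLock<Option<Arc<...>>>` so the flow worker manager can be installed after the extension is constructed. A standalone sketch of that pattern, assuming only `tokio::sync::RwLock`; `Holder` and `Manager` are hypothetical names, not part of the diff:

use std::sync::Arc;
use tokio::sync::RwLock;

struct Holder<Manager> {
    manager: RwLock<Option<Arc<Manager>>>,
}

impl<Manager> Holder<Manager> {
    fn new() -> Self {
        Self { manager: RwLock::new(None) }
    }

    /// Install the manager after construction (the `set_flow_worker_manager` shape).
    async fn set(&self, manager: Arc<Manager>) {
        *self.manager.write().await = Some(manager);
    }

    /// Return the manager if it has been installed.
    async fn get(&self) -> Option<Arc<Manager>> {
        self.manager.read().await.clone()
    }
}
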


@@ -20,13 +20,13 @@ use common_config::Configurable;
 use common_grpc::channel_manager::{
     DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
 };
-use common_options::datanode::{ClientOptions, DatanodeClientOptions};
 use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, DEFAULT_OTLP_ENDPOINT};
 use common_wal::config::raft_engine::RaftEngineConfig;
 use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
 use file_engine::config::EngineConfig;
 use frontend::frontend::FrontendOptions;
+use frontend::service_config::datanode::DatanodeClientOptions;
 use meta_client::MetaClientOptions;
 use meta_srv::metasrv::MetasrvOptions;
 use meta_srv::selector::SelectorType;
@@ -69,6 +69,7 @@ fn test_load_datanode_example_config() {
         region_engine: vec![
             RegionEngineConfig::Mito(MitoConfig {
                 auto_flush_interval: Duration::from_secs(3600),
+                scan_parallelism: 0,
                 experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
                 ..Default::default()
             }),
@@ -125,11 +126,10 @@ fn test_load_frontend_example_config() {
            tracing_sample_ratio: Some(Default::default()),
            ..Default::default()
        },
-        datanode: DatanodeClientOptions {
-            client: ClientOptions {
+        datanode: frontend::service_config::DatanodeOptions {
+            client: DatanodeClientOptions {
                connect_timeout: Duration::from_secs(10),
                tcp_nodelay: true,
-                ..Default::default()
            },
        },
        export_metrics: ExportMetricsOption {
@@ -166,8 +166,8 @@ fn test_load_metasrv_example_config() {
            },
            ..Default::default()
        },
-        datanode: DatanodeClientOptions {
-            client: ClientOptions {
+        datanode: meta_srv::metasrv::DatanodeOptions {
+            client: meta_srv::metasrv::DatanodeClientOptions {
                timeout: Duration::from_secs(10),
                connect_timeout: Duration::from_secs(10),
                tcp_nodelay: true,
@@ -204,6 +204,7 @@ fn test_load_standalone_example_config() {
            RegionEngineConfig::Mito(MitoConfig {
                auto_flush_interval: Duration::from_secs(3600),
                experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
+                scan_parallelism: 0,
                ..Default::default()
            }),
            RegionEngineConfig::File(EngineConfig {}),


@@ -16,13 +16,9 @@ common-error.workspace = true
 common-macro.workspace = true
 futures.workspace = true
 paste = "1.0"
-pin-project.workspace = true
-rand.workspace = true
 serde = { version = "1.0", features = ["derive"] }
 snafu.workspace = true
-tokio.workspace = true
 zeroize = { version = "1.6", default-features = false, features = ["alloc"] }
 [dev-dependencies]
-common-test-util.workspace = true
 toml.workspace = true


@@ -12,20 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::future::Future;
 use std::io;
 use std::ops::Range;
-use std::path::Path;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
 use async_trait::async_trait;
 use bytes::{BufMut, Bytes};
-use futures::AsyncRead;
-use pin_project::pin_project;
-use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
-use tokio::sync::Mutex;
+use futures::{AsyncReadExt, AsyncSeekExt};
 /// `Metadata` contains the metadata of a source.
 pub struct Metadata {
@@ -36,11 +28,6 @@ pub struct Metadata {
 /// `RangeReader` reads a range of bytes from a source.
 #[async_trait]
 pub trait RangeReader: Send + Unpin {
-    /// Sets the file size hint for the reader.
-    ///
-    /// It's used to optimize the reading process by reducing the number of remote requests.
-    fn with_file_size_hint(&mut self, file_size_hint: u64);
     /// Returns the metadata of the source.
     async fn metadata(&mut self) -> io::Result<Metadata>;
@@ -74,11 +61,7 @@ pub trait RangeReader: Send + Unpin {
 }
 #[async_trait]
-impl<R: ?Sized + RangeReader> RangeReader for &mut R {
-    fn with_file_size_hint(&mut self, file_size_hint: u64) {
-        (*self).with_file_size_hint(file_size_hint)
-    }
+impl<R: RangeReader + Send + Unpin> RangeReader for &mut R {
     async fn metadata(&mut self) -> io::Result<Metadata> {
         (*self).metadata().await
     }
@@ -97,218 +80,26 @@ impl<R: ?Sized + RangeReader> RangeReader for &mut R {
     }
 }
-/// `AsyncReadAdapter` adapts a `RangeReader` to an `AsyncRead`.
-#[pin_project]
-pub struct AsyncReadAdapter<R> {
-    /// The inner `RangeReader`.
-    /// Use `Mutex` to get rid of the borrow checker issue.
-    inner: Arc<Mutex<R>>,
-    /// The current position from the view of the reader.
-    position: u64,
-    /// The buffer for the read bytes.
-    buffer: Vec<u8>,
-    /// The length of the content.
-    content_length: u64,
-    /// The future for reading the next bytes.
-    #[pin]
-    read_fut: Option<Pin<Box<dyn Future<Output = io::Result<Bytes>> + Send>>>,
-}
-impl<R: RangeReader + 'static> AsyncReadAdapter<R> {
-    pub async fn new(inner: R) -> io::Result<Self> {
-        let mut inner = inner;
-        let metadata = inner.metadata().await?;
-        Ok(AsyncReadAdapter {
-            inner: Arc::new(Mutex::new(inner)),
-            position: 0,
-            buffer: Vec::new(),
-            content_length: metadata.content_length,
-            read_fut: None,
-        })
-    }
-}
-/// The maximum size per read for the inner reader in `AsyncReadAdapter`.
-const MAX_SIZE_PER_READ: usize = 8 * 1024 * 1024; // 8MB
-impl<R: RangeReader + 'static> AsyncRead for AsyncReadAdapter<R> {
-    fn poll_read(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &mut [u8],
-    ) -> Poll<io::Result<usize>> {
-        let mut this = self.as_mut().project();
-        if *this.position >= *this.content_length {
-            return Poll::Ready(Ok(0));
-        }
-        if !this.buffer.is_empty() {
-            let to_read = this.buffer.len().min(buf.len());
-            buf[..to_read].copy_from_slice(&this.buffer[..to_read]);
-            this.buffer.drain(..to_read);
-            *this.position += to_read as u64;
-            return Poll::Ready(Ok(to_read));
-        }
-        if this.read_fut.is_none() {
-            let size = (*this.content_length - *this.position).min(MAX_SIZE_PER_READ as u64);
-            let range = *this.position..(*this.position + size);
-            let inner = this.inner.clone();
-            let fut = async move {
-                let mut inner = inner.lock().await;
-                inner.read(range).await
-            };
-            *this.read_fut = Some(Box::pin(fut));
-        }
-        match this
-            .read_fut
-            .as_mut()
-            .as_pin_mut()
-            .expect("checked above")
-            .poll(cx)
-        {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(Ok(bytes)) => {
-                *this.read_fut = None;
-                if !bytes.is_empty() {
-                    this.buffer.extend_from_slice(&bytes);
-                    self.poll_read(cx, buf)
-                } else {
-                    Poll::Ready(Ok(0))
-                }
-            }
-            Poll::Ready(Err(e)) => {
-                *this.read_fut = None;
-                Poll::Ready(Err(e))
-            }
-        }
-    }
-}
+/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`.
+pub struct RangeReaderAdapter<R>(pub R);
+/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
+///
+/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
+/// Until the codebase is fully ported to `RangeReader`, remove this implementation.
 #[async_trait]
-impl RangeReader for Vec<u8> {
-    fn with_file_size_hint(&mut self, _file_size_hint: u64) {
-        // do nothing
-    }
+impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader
+    for RangeReaderAdapter<R>
+{
     async fn metadata(&mut self) -> io::Result<Metadata> {
-        Ok(Metadata {
-            content_length: self.len() as u64,
-        })
+        let content_length = self.0.seek(io::SeekFrom::End(0)).await?;
+        Ok(Metadata { content_length })
     }
     async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
-        let bytes = Bytes::copy_from_slice(&self[range.start as usize..range.end as usize]);
-        Ok(bytes)
-    }
-}
-/// `FileReader` is a `RangeReader` for reading a file.
-pub struct FileReader {
-    content_length: u64,
-    position: u64,
-    file: tokio::fs::File,
-}
-impl FileReader {
-    /// Creates a new `FileReader` for the file at the given path.
-    pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
-        let file = tokio::fs::File::open(path).await?;
-        let metadata = file.metadata().await?;
-        Ok(FileReader {
-            content_length: metadata.len(),
-            position: 0,
-            file,
-        })
-    }
-}
-#[async_trait]
-impl RangeReader for FileReader {
-    fn with_file_size_hint(&mut self, _file_size_hint: u64) {
-        // do nothing
-    }
-    async fn metadata(&mut self) -> io::Result<Metadata> {
-        Ok(Metadata {
-            content_length: self.content_length,
-        })
-    }
-    async fn read(&mut self, mut range: Range<u64>) -> io::Result<Bytes> {
-        if range.start != self.position {
-            self.file.seek(io::SeekFrom::Start(range.start)).await?;
-            self.position = range.start;
-        }
-        range.end = range.end.min(self.content_length);
-        if range.end <= self.position {
-            return Err(io::Error::new(
-                io::ErrorKind::UnexpectedEof,
-                "Start of range is out of bounds",
-            ));
-        }
         let mut buf = vec![0; (range.end - range.start) as usize];
-        self.file.read_exact(&mut buf).await?;
-        self.position = range.end;
+        self.0.seek(io::SeekFrom::Start(range.start)).await?;
+        self.0.read_exact(&mut buf).await?;
         Ok(Bytes::from(buf))
     }
 }
-#[cfg(test)]
-mod tests {
-    use common_test_util::temp_dir::create_named_temp_file;
-    use futures::io::AsyncReadExt as _;
-    use super::*;
-    #[tokio::test]
-    async fn test_async_read_adapter() {
-        let data = b"hello world";
-        let reader = Vec::from(data);
-        let mut adapter = AsyncReadAdapter::new(reader).await.unwrap();
-        let mut buf = Vec::new();
-        adapter.read_to_end(&mut buf).await.unwrap();
-        assert_eq!(buf, data);
-    }
-    #[tokio::test]
-    async fn test_async_read_adapter_large() {
-        let data = (0..20 * 1024 * 1024).map(|i| i as u8).collect::<Vec<u8>>();
-        let mut adapter = AsyncReadAdapter::new(data.clone()).await.unwrap();
-        let mut buf = Vec::new();
-        adapter.read_to_end(&mut buf).await.unwrap();
-        assert_eq!(buf, data);
-    }
-    #[tokio::test]
-    async fn test_file_reader() {
-        let file = create_named_temp_file();
-        let path = file.path();
-        let data = b"hello world";
-        tokio::fs::write(path, data).await.unwrap();
-        let mut reader = FileReader::new(path).await.unwrap();
-        let metadata = reader.metadata().await.unwrap();
-        assert_eq!(metadata.content_length, data.len() as u64);
-        let bytes = reader.read(0..metadata.content_length).await.unwrap();
-        assert_eq!(&*bytes, data);
-        let bytes = reader.read(0..5).await.unwrap();
-        assert_eq!(&*bytes, &data[..5]);
-    }
-}
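
A usage sketch for the `RangeReaderAdapter` on the `+` side of this hunk; it assumes only the adapter and trait shown above, uses `futures::io::Cursor` as a stand-in `AsyncRead + AsyncSeek` source, and drives the future with `futures::executor::block_on` so no extra dev-dependency is needed (the test name and placement are hypothetical):

#[test]
fn range_reader_adapter_sketch() {
    futures::executor::block_on(async {
        let mut reader = RangeReaderAdapter(futures::io::Cursor::new(b"hello world".to_vec()));
        // `metadata` seeks to the end to learn the total length.
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.content_length, 11);
        // `read` seeks to `range.start` and reads exactly `range.end - range.start` bytes.
        let bytes = reader.read(0..5).await.unwrap();
        assert_eq!(bytes.as_ref(), &b"hello"[..]);
    });
}
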


@@ -19,7 +19,7 @@ pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE;
 pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE;
 pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE;
-#[derive(Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Default)]
+#[derive(Clone, Copy, PartialEq, Eq, Ord, PartialOrd)]
 pub struct ReadableSize(pub u64);
 impl ReadableSize {


@@ -8,5 +8,10 @@ license.workspace = true
 workspace = true
 [dependencies]
+common-error.workspace = true
+common-macro.workspace = true
+snafu.workspace = true
 [dev-dependencies]
+chrono.workspace = true
 tokio.workspace = true


@@ -48,4 +48,5 @@ url = "2.3"
 [dev-dependencies]
 common-telemetry.workspace = true
 common-test-util.workspace = true
+dotenv.workspace = true
 uuid.workspace = true


@@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
            DefaultLoggingInterceptor,
        ))
        .layer(object_store::layers::TracingLayer)
-        .layer(object_store::layers::build_prometheus_metrics_layer(true))
+        .layer(object_store::layers::PrometheusMetricsLayer::new(true))
        .finish();
    Ok(object_store)
 }


@@ -89,7 +89,7 @@ pub fn build_s3_backend(
            DefaultLoggingInterceptor,
        ))
        .layer(object_store::layers::TracingLayer)
-        .layer(object_store::layers::build_prometheus_metrics_layer(true))
+        .layer(object_store::layers::PrometheusMetricsLayer::new(true))
        .finish())
 }


@@ -8,7 +8,6 @@ license.workspace = true
 workspace = true
 [dependencies]
-http.workspace = true
 snafu.workspace = true
 strum.workspace = true
 tonic.workspace = true


@@ -18,30 +18,9 @@ pub mod ext;
 pub mod mock;
 pub mod status_code;
-use http::{HeaderMap, HeaderValue};
 pub use snafu;
 // HACK - these headers are here for shared in gRPC services. For common HTTP headers,
 // please define in `src/servers/src/http/header.rs`.
 pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
 pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
-/// Create a http header map from error code and message.
-/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys.
-pub fn from_err_code_msg_to_header(code: u32, msg: &str) -> HeaderMap {
-    let mut header = HeaderMap::new();
-    let msg = HeaderValue::from_str(msg).unwrap_or_else(|_| {
-        HeaderValue::from_bytes(
-            &msg.as_bytes()
-                .iter()
-                .flat_map(|b| std::ascii::escape_default(*b))
-                .collect::<Vec<u8>>(),
-        )
-        .expect("Already escaped string should be valid ascii")
-    });
-    header.insert(GREPTIME_DB_HEADER_ERROR_CODE, code.into());
-    header.insert(GREPTIME_DB_HEADER_ERROR_MSG, msg);
-    header
-}
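
A small usage sketch for the `from_err_code_msg_to_header` helper on the `-` side above, assuming only the `http` types it already uses (the test name is hypothetical):

#[test]
fn error_header_sketch() {
    let headers = from_err_code_msg_to_header(1004, "table not found: métrica");
    // The code is rendered as a plain decimal header value.
    assert_eq!(headers[GREPTIME_DB_HEADER_ERROR_CODE], "1004");
    // Non-ASCII bytes in the message are escaped via `std::ascii::escape_default`,
    // so the stored value is always visible ASCII.
    assert!(headers[GREPTIME_DB_HEADER_ERROR_MSG].to_str().unwrap().is_ascii());
}
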


@@ -5,7 +5,12 @@ edition.workspace = true
 license.workspace = true
 [dependencies]
+api.workspace = true
 async-trait.workspace = true
+common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
+common-query.workspace = true
+session.workspace = true
 snafu.workspace = true
+sql.workspace = true


@@ -9,7 +9,7 @@ workspace = true
 [features]
 default = ["geo"]
-geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"]
+geo = ["geohash", "h3o", "s2"]
 [dependencies]
 api.workspace = true
@@ -28,12 +28,9 @@ common-version.workspace = true
 datafusion.workspace = true
 datatypes.workspace = true
 derive_more = { version = "1", default-features = false, features = ["display"] }
-geo = { version = "0.29", optional = true }
-geo-types = { version = "0.7", optional = true }
 geohash = { version = "0.13", optional = true }
 h3o = { version = "0.6", optional = true }
 jsonb.workspace = true
-nalgebra.workspace = true
 num = "0.4"
 num-traits = "0.2"
 once_cell.workspace = true
@@ -47,9 +44,8 @@ sql.workspace = true
 statrs = "0.16"
 store-api.workspace = true
 table.workspace = true
-wkt = { version = "0.11", optional = true }
 [dev-dependencies]
-approx = "0.5"
+ron = "0.7"
 serde = { version = "1.0", features = ["derive"] }
 tokio.workspace = true


@@ -27,7 +27,6 @@ use crate::scalars::matches::MatchesFunction;
 use crate::scalars::math::MathFunction;
 use crate::scalars::numpy::NumpyFunction;
 use crate::scalars::timestamp::TimestampFunction;
-use crate::scalars::vector::VectorFunction;
 use crate::system::SystemFunction;
 use crate::table::TableFunction;
@@ -121,9 +120,6 @@ pub static FUNCTION_REGISTRY: Lazy<Arc<FunctionRegistry>> = Lazy::new(|| {
    // Json related functions
    JsonFunction::register(&function_registry);
-    // Vector related functions
-    VectorFunction::register(&function_registry);
    // Geo functions
    #[cfg(feature = "geo")]
    crate::scalars::geo::GeoFunctions::register(&function_registry);


@@ -26,4 +26,3 @@ pub mod function_registry;
 pub mod handlers;
 pub mod helper;
 pub mod state;
-pub mod utils;


@@ -21,7 +21,6 @@ pub mod json;
 pub mod matches;
 pub mod math;
 pub mod numpy;
-pub mod vector;
 #[cfg(test)]
 pub(crate) mod test;

Some files were not shown because too many files have changed in this diff.